PostgreSQL Source Code  git master
walsender.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/printtup.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "access/xlogreader.h"
#include "access/xlogutils.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
#include "commands/defrem.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/basebackup.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/dest.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/portal.h"
#include "utils/ps_status.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
Include dependency graph for walsender.c:

Go to the source code of this file.

Data Structures

struct  WalTimeSample
 
struct  LagTracker
 

Macros

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)
 
#define LAG_TRACKER_BUFFER_SIZE   8192
 
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000
 
#define PG_STAT_GET_WAL_SENDERS_COLS   12
 

Typedefs

typedef void(* WalSndSendDataCallback) (void)
 

Functions

static void WalSndLastCycleHandler (SIGNAL_ARGS)
 
static void WalSndLoop (WalSndSendDataCallback send_data)
 
static void InitWalSenderSlot (void)
 
static void WalSndKill (int code, Datum arg)
 
static void WalSndShutdown (void)
 
void WalSndErrorCleanup (void)
 
void WalSndResourceCleanup (bool isCommit)
 
static void IdentifySystem (void)
 
static void SendTimeLineHistory (TimeLineHistoryCmd *cmd)
 
static void StartReplication (StartReplicationCmd *cmd)
 
static int logical_read_xlog_page (XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
 
static void parseCreateReplSlotOptions (CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase)
 
static void CreateReplicationSlot (CreateReplicationSlotCmd *cmd)
 
static void DropReplicationSlot (DropReplicationSlotCmd *cmd)
 
static void StartLogicalReplication (StartReplicationCmd *cmd)
 
static void WalSndPrepareWrite (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndWriteData (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndUpdateProgress (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
 
static XLogRecPtr WalSndWaitForWal (XLogRecPtr loc)
 
bool exec_replication_command (const char *cmd_string)
 
static void ProcessRepliesIfAny (void)
 
static void ProcessStandbyMessage (void)
 
static void PhysicalConfirmReceivedLocation (XLogRecPtr lsn)
 
static void ProcessStandbyReplyMessage (void)
 
static void PhysicalReplicationSlotNewXmin (TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
 
static bool TransactionIdInRecentPast (TransactionId xid, uint32 epoch)
 
static void ProcessStandbyHSFeedbackMessage (void)
 
static long WalSndComputeSleeptime (TimestampTz now)
 
static void WalSndCheckTimeOut (void)
 
static void WalSndSegmentOpen (XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
 
static void XLogSendPhysical (void)
 
static void XLogSendLogical (void)
 
static void WalSndDone (WalSndSendDataCallback send_data)
 
static XLogRecPtr GetStandbyFlushRecPtr (void)
 
void WalSndRqstFileReload (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (void)
 
static void WalSndWait (uint32 socket_events, long timeout, uint32 wait_event)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void WalSndSetState (WalSndState state)
 
static const char * WalSndGetStateString (WalSndState state)
 
static Interval *offset_to_interval (TimeOffset offset)
 
Datum pg_stat_get_wal_senders (PG_FUNCTION_ARGS)
 
static void WalSndKeepalive (bool requestReply)
 
static void WalSndKeepaliveIfNecessary (void)
 
static void LagTrackerWrite (XLogRecPtr lsn, TimestampTz local_flush_time)
 
static TimeOffset LagTrackerRead (int head, XLogRecPtr lsn, TimestampTz now)
 

Variables

WalSndCtlData *WalSndCtl = NULL
 
WalSnd *MyWalSnd = NULL
 
bool am_walsender = false
 
bool am_cascading_walsender = false
 
bool am_db_walsender = false
 
int max_wal_senders = 0
 
int wal_sender_timeout = 60 * 1000
 
bool log_replication_commands = false
 
bool wake_wal_senders = false
 
static XLogReaderState *xlogreader = NULL
 
static TimeLineID sendTimeLine = 0
 
static TimeLineID sendTimeLineNextTLI = 0
 
static bool sendTimeLineIsHistoric = false
 
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
 
static XLogRecPtr sentPtr = InvalidXLogRecPtr
 
static StringInfoData output_message
 
static StringInfoData reply_message
 
static StringInfoData tmpbuf
 
static TimestampTz last_processing = 0
 
static TimestampTz last_reply_timestamp = 0
 
static bool waiting_for_ping_response = false
 
static bool streamingDoneSending
 
static bool streamingDoneReceiving
 
static bool WalSndCaughtUp = false
 
static volatile sig_atomic_t got_SIGUSR2 = false
 
static volatile sig_atomic_t got_STOPPING = false
 
static volatile sig_atomic_t replication_active = false
 
static LogicalDecodingContext *logical_decoding_ctx = NULL
 
static LagTracker *lag_tracker
 

Macro Definition Documentation

◆ LAG_TRACKER_BUFFER_SIZE

#define LAG_TRACKER_BUFFER_SIZE   8192

Definition at line 207 of file walsender.c.

Referenced by LagTrackerRead(), and LagTrackerWrite().

◆ MAX_SEND_SIZE

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)

Definition at line 106 of file walsender.c.

Referenced by XLogSendPhysical().

◆ PG_STAT_GET_WAL_SENDERS_COLS

#define PG_STAT_GET_WAL_SENDERS_COLS   12

Referenced by pg_stat_get_wal_senders().

◆ WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS

#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000

Referenced by WalSndUpdateProgress().

Typedef Documentation

◆ WalSndSendDataCallback

typedef void(* WalSndSendDataCallback) (void)

Definition at line 225 of file walsender.c.

Function Documentation

◆ CreateReplicationSlot()

static void CreateReplicationSlot ( CreateReplicationSlotCmd *cmd)
static

Definition at line 924 of file walsender.c.

References Assert, begin_tup_output_tupdesc(), CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDestReceiver(), CreateInitDecodingContext(), CreateTemplateTupleDesc(), CRS_EXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, CStringGetTextDatum, ReplicationSlot::data, DecodingContextFindStartpoint(), generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errmsg(), ERROR, FirstSnapshotSet, FreeDecodingContext(), InvalidXLogRecPtr, IsSubTransaction(), IsTransactionBlock(), CreateReplicationSlotCmd::kind, last_reply_timestamp, logical_read_xlog_page(), LSN_FORMAT_ARGS, MAXFNAMELEN, MemSet, MyProc, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, NIL, parseCreateReplSlotOptions(), CreateReplicationSlotCmd::plugin, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, ReplicationSlotCreate(), ReplicationSlotMarkDirty(), ReplicationSlotPersist(), ReplicationSlotRelease(), ReplicationSlotReserveWal(), ReplicationSlotSave(), RestoreTransactionSnapshot(), RS_EPHEMERAL, RS_PERSISTENT, RS_TEMPORARY, sendTimeLine, sendTimeLineIsHistoric, CreateReplicationSlotCmd::slotname, SnapBuildExportSnapshot(), SnapBuildInitialSnapshot(), LogicalDecodingContext::snapshot_builder, snprintf, CreateReplicationSlotCmd::temporary, ThisTimeLineID, TTSOpsVirtual, TupleDescInitBuiltinEntry(), two_phase, values, wal_segment_close(), WalSndPrepareWrite(), WalSndSegmentOpen(), WalSndUpdateProgress(), WalSndWriteData(), XACT_REPEATABLE_READ, XactIsoLevel, and XL_ROUTINE.

Referenced by exec_replication_command(), main(), and StartLogStreamer().

925 {
926  const char *snapshot_name = NULL;
927  char xloc[MAXFNAMELEN];
928  char *slot_name;
929  bool reserve_wal = false;
930  bool two_phase = false;
931  CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
932  DestReceiver *dest;
933  TupOutputState *tstate;
934  TupleDesc tupdesc;
935  Datum values[4];
936  bool nulls[4];
937 
938  Assert(!MyReplicationSlot);
939 
940  parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase);
941 
942  /* setup state for WalSndSegmentOpen */
943  sendTimeLineIsHistoric = false;
944  sendTimeLine = ThisTimeLineID;
945 
946  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
947  {
948  ReplicationSlotCreate(cmd->slotname, false,
949  cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT,
950  false);
951  }
952  else
953  {
954  CheckLogicalDecodingRequirements();
955 
956  /*
957  * Initially create persistent slot as ephemeral - that allows us to
958  * nicely handle errors during initialization because it'll get
959  * dropped if this transaction fails. We'll make it persistent at the
960  * end. Temporary slots can be created as temporary from beginning as
961  * they get dropped on error as well.
962  */
963  ReplicationSlotCreate(cmd->slotname, true,
964  cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
965  two_phase);
966  }
967 
968  if (cmd->kind == REPLICATION_KIND_LOGICAL)
969  {
970  LogicalDecodingContext *ctx;
971  bool need_full_snapshot = false;
972 
973  /*
974  * Do options check early so that we can bail before calling the
975  * DecodingContextFindStartpoint which can take long time.
976  */
977  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
978  {
979  if (IsTransactionBlock())
980  ereport(ERROR,
981  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
982  (errmsg("%s must not be called inside a transaction",
983  "CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT")));
984 
985  need_full_snapshot = true;
986  }
987  else if (snapshot_action == CRS_USE_SNAPSHOT)
988  {
989  if (!IsTransactionBlock())
990  ereport(ERROR,
991  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
992  (errmsg("%s must be called inside a transaction",
993  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
994 
995  if (XactIsoLevel != XACT_REPEATABLE_READ)
996  ereport(ERROR,
997  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
998  (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
999  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
1000 
1001  if (FirstSnapshotSet)
1002  ereport(ERROR,
1003  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1004  (errmsg("%s must be called before any query",
1005  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
1006 
1007  if (IsSubTransaction())
1008  ereport(ERROR,
1009  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1010  (errmsg("%s must not be called in a subtransaction",
1011  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
1012 
1013  need_full_snapshot = true;
1014  }
1015 
1016  ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
1017  InvalidXLogRecPtr,
1018  XL_ROUTINE(.page_read = logical_read_xlog_page,
1019  .segment_open = WalSndSegmentOpen,
1020  .segment_close = wal_segment_close),
1021  WalSndPrepareWrite, WalSndWriteData,
1022  WalSndUpdateProgress);
1023 
1024  /*
1025  * Signal that we don't need the timeout mechanism. We're just
1026  * creating the replication slot and don't yet accept feedback
1027  * messages or send keepalives. As we possibly need to wait for
1028  * further WAL the walsender would otherwise possibly be killed too
1029  * soon.
1030  */
1031  last_reply_timestamp = 0;
1032 
1033  /* build initial snapshot, might take a while */
1034  DecodingContextFindStartpoint(ctx);
1035 
1036  /*
1037  * Export or use the snapshot if we've been asked to do so.
1038  *
1039  * NB. We will convert the snapbuild.c kind of snapshot to normal
1040  * snapshot when doing this.
1041  */
1042  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1043  {
1044  snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
1045  }
1046  else if (snapshot_action == CRS_USE_SNAPSHOT)
1047  {
1048  Snapshot snap;
1049 
1050  snap = SnapBuildInitialSnapshot(ctx->snapshot_builder);
1051  RestoreTransactionSnapshot(snap, MyProc);
1052  }
1053 
1054  /* don't need the decoding context anymore */
1055  FreeDecodingContext(ctx);
1056 
1057  if (!cmd->temporary)
1058  ReplicationSlotPersist();
1059  }
1060  else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
1061  {
1062  ReplicationSlotReserveWal();
1063 
1064  ReplicationSlotMarkDirty();
1065 
1066  /* Write this slot to disk if it's a permanent one. */
1067  if (!cmd->temporary)
1068  ReplicationSlotSave();
1069  }
1070 
1071  snprintf(xloc, sizeof(xloc), "%X/%X",
1072  LSN_FORMAT_ARGS(MyReplicationSlot->data.confirmed_flush));
1073 
1074  dest = CreateDestReceiver(DestRemoteSimple);
1075  MemSet(nulls, false, sizeof(nulls));
1076 
1077  /*----------
1078  * Need a tuple descriptor representing four columns:
1079  * - first field: the slot name
1080  * - second field: LSN at which we became consistent
1081  * - third field: exported snapshot's name
1082  * - fourth field: output plugin
1083  *----------
1084  */
1085  tupdesc = CreateTemplateTupleDesc(4);
1086  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
1087  TEXTOID, -1, 0);
1088  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
1089  TEXTOID, -1, 0);
1090  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
1091  TEXTOID, -1, 0);
1092  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1093  TEXTOID, -1, 0);
1094 
1095  /* prepare for projection of tuples */
1096  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
1097 
1098  /* slot_name */
1099  slot_name = NameStr(MyReplicationSlot->data.name);
1100  values[0] = CStringGetTextDatum(slot_name);
1101 
1102  /* consistent wal location */
1103  values[1] = CStringGetTextDatum(xloc);
1104 
1105  /* snapshot name, or NULL if none */
1106  if (snapshot_name != NULL)
1107  values[2] = CStringGetTextDatum(snapshot_name);
1108  else
1109  nulls[2] = true;
1110 
1111  /* plugin, or NULL if none */
1112  if (cmd->plugin != NULL)
1113  values[3] = CStringGetTextDatum(cmd->plugin);
1114  else
1115  nulls[3] = true;
1116 
1117  /* send it to dest */
1118  do_tup_output(tstate, values, nulls);
1119  end_tup_output(tstate);
1120 
1121  ReplicationSlotRelease();
1122 }
#define NIL
Definition: pg_list.h:65
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:829
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:45
PGPROC * MyProc
Definition: proc.c:68
#define XACT_REPEATABLE_READ
Definition: xact.h:38
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2286
#define MemSet(start, val, len)
Definition: c.h:1008
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase)
Definition: slot.c:226
void ReplicationSlotSave(void)
Definition: slot.c:710
void RestoreTransactionSnapshot(Snapshot snapshot, void *source_pgproc)
Definition: snapmgr.c:2226
ReplicationSlotPersistentData data
Definition: slot.h:147
XLogRecPtr confirmed_flush
Definition: slot.h:84
ReplicationKind kind
Definition: replnodes.h:56
bool IsTransactionBlock(void)
Definition: xact.c:4683
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:622
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
void ReplicationSlotReserveWal(void)
Definition: slot.c:1069
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:589
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2344
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:657
#define ERROR
Definition: elog.h:46
bool FirstSnapshotSet
Definition: snapmgr.c:149
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2266
void ReplicationSlotPersist(void)
Definition: slot.c:745
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1265
void ReplicationSlotRelease(void)
Definition: slot.c:469
#define XL_ROUTINE(...)
Definition: xlogreader.h:116
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1238
#define MAXFNAMELEN
uintptr_t Datum
Definition: postgres.h:411
static bool two_phase
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
Definition: walsender.c:2471
TimeLineID ThisTimeLineID
Definition: xlog.c:194
#define ereport(elevel,...)
Definition: elog.h:157
struct SnapBuild * snapshot_builder
Definition: logical.h:43
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
static TimeLineID sendTimeLine
Definition: walsender.c:145
#define Assert(condition)
Definition: c.h:804
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:633
int XactIsoLevel
Definition: xact.c:75
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase)
Definition: walsender.c:860
bool IsSubTransaction(void)
Definition: xact.c:4756
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition: snapbuild.c:530
static Datum values[MAXATTR]
Definition: bootstrap.c:166
int errmsg(const char *fmt,...)
Definition: elog.c:909
CRSSnapshotAction
Definition: walsender.h:20
#define NameStr(name)
Definition: c.h:681
#define CStringGetTextDatum(s)
Definition: builtins.h:82
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition: walsender.c:806
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:103
#define snprintf
Definition: port.h:216
LogicalDecodingContext * CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:318
int16 AttrNumber
Definition: attnum.h:21
static TimestampTz last_reply_timestamp
Definition: walsender.c:168
static bool sendTimeLineIsHistoric
Definition: walsender.c:147
void ReplicationSlotMarkDirty(void)
Definition: slot.c:728
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
Definition: walsender.c:1349

◆ DropReplicationSlot()

static void DropReplicationSlot ( DropReplicationSlotCmd *cmd)
static

Definition at line 1128 of file walsender.c.

References ReplicationSlotDrop(), DropReplicationSlotCmd::slotname, and DropReplicationSlotCmd::wait.

Referenced by exec_replication_command(), and main().

1129 {
1130  ReplicationSlotDrop(cmd->slotname, !cmd->wait); /* the command's WAIT flag is passed inverted as ReplicationSlotDrop's nowait argument */
1131 }
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:563

◆ exec_replication_command()

bool exec_replication_command ( const char *  cmd_string)

Definition at line 1509 of file walsender.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, debug_query_string, generate_unaccent_rules::dest, DestRemoteSimple, DropReplicationSlot(), elog, EndReplicationCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsA, IsAbortedTransactionBlockState(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextDelete(), MemoryContextSwitchTo(), MyDatabaseId, VariableShowStmt::name, pgstat_report_activity(), PreventInTransactionBlock(), REPLICATION_KIND_PHYSICAL, replication_parse_result, replication_scanner_finish(), replication_scanner_init(), replication_yyparse(), SendBaseBackup(), SendTimeLineHistory(), set_ps_display(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), StartTransactionCommand(), WalSnd::state, STATE_RUNNING, T_BaseBackupCmd, T_CreateReplicationSlotCmd, T_DropReplicationSlotCmd, T_IdentifySystemCmd, T_StartReplicationCmd, T_TimeLineHistoryCmd, T_VariableShowStmt, TimeLineHistoryCmd::type, Node::type, WalSndSetState(), and WALSNDSTATE_STOPPING.

Referenced by PostgresMain().

1510 {
1511  int parse_rc;
1512  Node *cmd_node;
1513  const char *cmdtag;
1514  MemoryContext cmd_context;
1515  MemoryContext old_context;
1516 
1517  /*
1518  * If WAL sender has been told that shutdown is getting close, switch its
1519  * status accordingly to handle the next replication commands correctly.
1520  */
1521  if (got_STOPPING)
1522  WalSndSetState(WALSNDSTATE_STOPPING);
1523 
1524  /*
1525  * Throw error if in stopping mode. We need prevent commands that could
1526  * generate WAL while the shutdown checkpoint is being written. To be
1527  * safe, we just prohibit all new commands.
1528  */
1529  if (MyWalSnd->state == WALSNDSTATE_STOPPING)
1530  ereport(ERROR,
1531  (errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1532 
1533  /*
1534  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1535  * command arrives. Clean up the old stuff if there's anything.
1536  */
1537  SnapBuildClearExportedSnapshot();
1538 
1539  CHECK_FOR_INTERRUPTS();
1540 
1541  /*
1542  * Parse the command.
1543  */
1544  cmd_context = AllocSetContextCreate(CurrentMemoryContext,
1545  "Replication command context",
1546  ALLOCSET_DEFAULT_SIZES);
1547  old_context = MemoryContextSwitchTo(cmd_context);
1548 
1549  replication_scanner_init(cmd_string);
1550  parse_rc = replication_yyparse();
1551  if (parse_rc != 0)
1552  ereport(ERROR,
1553  (errcode(ERRCODE_SYNTAX_ERROR),
1554  errmsg_internal("replication command parser returned %d",
1555  parse_rc)));
1556  replication_scanner_finish();
1557 
1558  cmd_node = replication_parse_result;
1559 
1560  /*
1561  * If it's a SQL command, just clean up our mess and return false; the
1562  * caller will take care of executing it.
1563  */
1564  if (IsA(cmd_node, SQLCmd))
1565  {
1566  if (MyDatabaseId == InvalidOid)
1567  ereport(ERROR,
1568  (errmsg("cannot execute SQL commands in WAL sender for physical replication")));
1569 
1570  MemoryContextSwitchTo(old_context);
1571  MemoryContextDelete(cmd_context);
1572 
1573  /* Tell the caller that this wasn't a WalSender command. */
1574  return false;
1575  }
1576 
1577  /*
1578  * Report query to various monitoring facilities. For this purpose, we
1579  * report replication commands just like SQL commands.
1580  */
1581  debug_query_string = cmd_string;
1582 
1583  pgstat_report_activity(STATE_RUNNING, cmd_string);
1584 
1585  /*
1586  * Log replication command if log_replication_commands is enabled. Even
1587  * when it's disabled, log the command with DEBUG1 level for backward
1588  * compatibility.
1589  */
1590  ereport(log_replication_commands ? LOG : DEBUG1,
1591  (errmsg("received replication command: %s", cmd_string)));
1592 
1593  /*
1594  * Disallow replication commands in aborted transaction blocks.
1595  */
1596  if (IsAbortedTransactionBlockState())
1597  ereport(ERROR,
1598  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1599  errmsg("current transaction is aborted, "
1600  "commands ignored until end of transaction block")));
1601 
1602  CHECK_FOR_INTERRUPTS();
1603 
1604  /*
1605  * Allocate buffers that will be used for each outgoing and incoming
1606  * message. We do this just once per command to reduce palloc overhead.
1607  */
1608  initStringInfo(&output_message);
1609  initStringInfo(&reply_message);
1610  initStringInfo(&tmpbuf);
1611 
1612  switch (cmd_node->type)
1613  {
1614  case T_IdentifySystemCmd:
1615  cmdtag = "IDENTIFY_SYSTEM";
1616  set_ps_display(cmdtag);
1617  IdentifySystem();
1618  EndReplicationCommand(cmdtag);
1619  break;
1620 
1621  case T_BaseBackupCmd:
1622  cmdtag = "BASE_BACKUP";
1623  set_ps_display(cmdtag);
1624  PreventInTransactionBlock(true, cmdtag);
1625  SendBaseBackup((BaseBackupCmd *) cmd_node);
1626  EndReplicationCommand(cmdtag);
1627  break;
1628 
1629  case T_CreateReplicationSlotCmd:
1630  cmdtag = "CREATE_REPLICATION_SLOT";
1631  set_ps_display(cmdtag);
1632  CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
1633  EndReplicationCommand(cmdtag);
1634  break;
1635 
1636  case T_DropReplicationSlotCmd:
1637  cmdtag = "DROP_REPLICATION_SLOT";
1638  set_ps_display(cmdtag);
1639  DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
1640  EndReplicationCommand(cmdtag);
1641  break;
1642 
1643  case T_StartReplicationCmd:
1644  {
1645  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1646 
1647  cmdtag = "START_REPLICATION";
1648  set_ps_display(cmdtag);
1649  PreventInTransactionBlock(true, cmdtag);
1650 
1651  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1652  StartReplication(cmd);
1653  else
1654  StartLogicalReplication(cmd);
1655 
1656  /* dupe, but necessary per libpqrcv_endstreaming */
1657  EndReplicationCommand(cmdtag);
1658 
1659  Assert(xlogreader != NULL);
1660  break;
1661  }
1662 
1663  case T_TimeLineHistoryCmd:
1664  cmdtag = "TIMELINE_HISTORY";
1665  set_ps_display(cmdtag);
1666  PreventInTransactionBlock(true, cmdtag);
1667  SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
1668  EndReplicationCommand(cmdtag);
1669  break;
1670 
1671  case T_VariableShowStmt:
1672  {
1673  DestReceiver *dest = CreateDestReceiver(DestRemoteSimple);
1674  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1675 
1676  cmdtag = "SHOW";
1677  set_ps_display(cmdtag);
1678 
1679  /* syscache access needs a transaction environment */
1680  StartTransactionCommand();
1681  GetPGVariable(n->name, dest);
1682  CommitTransactionCommand();
1683  EndReplicationCommand(cmdtag);
1684  }
1685  break;
1686 
1687  default:
1688  elog(ERROR, "unrecognized replication command node tag: %u",
1689  cmd_node->type);
1690  }
1691 
1692  /* done */
1693  MemoryContextSwitchTo(old_context);
1694  MemoryContextDelete(cmd_context);
1695 
1696  /*
1697  * We need not update ps display or pg_stat_activity, because PostgresMain
1698  * will reset those to "idle". But we must reset debug_query_string to
1699  * ensure it doesn't become a dangling pointer.
1700  */
1701  debug_query_string = NULL;
1702 
1703  return true;
1704 }
#define IsA(nodeptr, _type_)
Definition: nodes.h:590
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:218
#define AllocSetContextCreate
Definition: memutils.h:173
#define DEBUG1
Definition: elog.h:25
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:465
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1128
void CommitTransactionCommand(void)
Definition: xact.c:2939
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:391
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Definition: nodes.h:539
static StringInfoData output_message
Definition: walsender.c:157
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
int errcode(int sqlerrcode)
Definition: elog.c:698
#define LOG
Definition: elog.h:26
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc.c:9316
Node * replication_parse_result
void set_ps_display(const char *activity)
Definition: ps_status.c:349
ReplicationKind kind
Definition: replnodes.h:82
#define ERROR
Definition: elog.h:46
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:932
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:195
WalSndState state
NodeTag type
Definition: nodes.h:541
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:571
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3379
MemoryContext CurrentMemoryContext
Definition: mcxt.c:42
static XLogReaderState * xlogreader
Definition: walsender.c:137
void EndReplicationCommand(const char *commandTag)
Definition: dest.c:221
int replication_yyparse(void)
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
const char * debug_query_string
Definition: postgres.c:89
static StringInfoData reply_message
Definition: walsender.c:158
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1138
Oid MyDatabaseId
Definition: globals.c:88
WalSnd * MyWalSnd
Definition: walsender.c:112
#define InvalidOid
Definition: postgres_ext.h:36
#define ereport(elevel,...)
Definition: elog.h:157
int errmsg_internal(const char *fmt,...)
Definition: elog.c:996
#define Assert(condition)
Definition: c.h:804
void WalSndSetState(WalSndState state)
Definition: walsender.c:3229
void StartTransactionCommand(void)
Definition: xact.c:2838
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:924
static StringInfoData tmpbuf
Definition: walsender.c:159
bool log_replication_commands
Definition: walsender.c:125
int errmsg(const char *fmt,...)
Definition: elog.c:909
void pgstat_report_activity(BackendState state, const char *cmd_str)
#define elog(elevel,...)
Definition: elog.h:232
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:120
static void IdentifySystem(void)
Definition: walsender.c:376
void replication_scanner_finish(void)
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:683
void replication_scanner_init(const char *query_string)

◆ GetStandbyFlushRecPtr()

static XLogRecPtr GetStandbyFlushRecPtr ( void  )
static

Definition at line 2970 of file walsender.c.

References GetWalRcvFlushRecPtr(), GetXLogReplayRecPtr(), receiveTLI, and ThisTimeLineID.

Referenced by IdentifySystem(), StartReplication(), and XLogSendPhysical().

2971 {
2972  XLogRecPtr replayPtr;
2973  TimeLineID replayTLI;
2974  XLogRecPtr receivePtr;
2975  TimeLineID receiveTLI;
2976  XLogRecPtr result;
2977 
2978  /*
2979  * We can safely send what's already been replayed. Also, if walreceiver
2980  * is streaming WAL from the same timeline, we can send anything that it
2981  * has streamed, but hasn't been replayed yet.
2982  */
2983 
2984  receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
2985  replayPtr = GetXLogReplayRecPtr(&replayTLI);
2986 
2987  ThisTimeLineID = replayTLI;
2988 
2989  result = replayPtr;
2990  if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
2991  result = receivePtr;
2992 
2993  return result;
2994 }
uint32 TimeLineID
Definition: xlogdefs.h:59
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11742
static TimeLineID receiveTLI
Definition: xlog.c:200
TimeLineID ThisTimeLineID
Definition: xlog.c:194
uint64 XLogRecPtr
Definition: xlogdefs.h:21
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 3023 of file walsender.c.

References am_walsender, Assert, got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

3024 {
3025  Assert(am_walsender);
3026 
3027  /*
3028  * If replication has not yet started, die like with SIGTERM. If
3029  * replication is active, only set a flag and wake up the main loop. It
3030  * will send any outstanding WAL, wait for it to be replicated to the
3031  * standby, and then exit gracefully.
3032  */
3033  if (!replication_active)
3034  kill(MyProcPid, SIGTERM);
3035  else
3036  got_STOPPING = true;
3037 }
int MyProcPid
Definition: globals.c:43
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
#define kill(pid, sig)
Definition: win32_port.h:454
bool am_walsender
Definition: walsender.c:115
static volatile sig_atomic_t replication_active
Definition: walsender.c:195
#define Assert(condition)
Definition: c.h:804

◆ IdentifySystem()

static void IdentifySystem ( void  )
static

Definition at line 376 of file walsender.c.

References am_cascading_walsender, begin_tup_output_tupdesc(), CommitTransactionCommand(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, cur, CurrentMemoryContext, dbname, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), get_database_name(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetSystemIdentifier(), Int32GetDatum, InvalidOid, LSN_FORMAT_ARGS, MAXFNAMELEN, MemoryContextSwitchTo(), MemSet, MyDatabaseId, RecoveryInProgress(), snprintf, StartTransactionCommand(), ThisTimeLineID, TTSOpsVirtual, TupleDescInitBuiltinEntry(), UINT64_FORMAT, and values.

Referenced by exec_replication_command().

377 {
378  char sysid[32];
379  char xloc[MAXFNAMELEN];
380  XLogRecPtr logptr;
381  char *dbname = NULL;
382  DestReceiver *dest;
383  TupOutputState *tstate;
384  TupleDesc tupdesc;
385  Datum values[4];
386  bool nulls[4];
387 
388  /*
389  * Reply with a result set with one row, four columns. First col is system
390  * ID, second is timeline ID, third is current xlog location and the
391  * fourth contains the database name if we are connected to one.
392  */
393 
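394  snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
395  GetSystemIdentifier());
396 
397  am_cascading_walsender = RecoveryInProgress();
398  if (am_cascading_walsender)
399  {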
400  /* this also updates ThisTimeLineID */
401  logptr = GetStandbyFlushRecPtr();
402  }
403  else
404  logptr = GetFlushRecPtr();
405 
406  snprintf(xloc, sizeof(xloc), "%X/%X", LSN_FORMAT_ARGS(logptr));
407 
408  if (MyDatabaseId != InvalidOid)
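409  {
410  MemoryContext cur = CurrentMemoryContext;
411 
412  /* syscache access needs a transaction env. */
413  StartTransactionCommand();
414  /* make dbname live outside TX context */
415  MemoryContextSwitchTo(cur);
416  dbname = get_database_name(MyDatabaseId);
417  CommitTransactionCommand();
418  /* CommitTransactionCommand switches to TopMemoryContext */
419  MemoryContextSwitchTo(cur);
420  }
421 
422  dest = CreateDestReceiver(DestRemoteSimple);
423  MemSet(nulls, false, sizeof(nulls));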
424 
425  /* need a tuple descriptor representing four columns */
426  tupdesc = CreateTemplateTupleDesc(4);
427  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
428  TEXTOID, -1, 0);
429  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
430  INT4OID, -1, 0);
431  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
432  TEXTOID, -1, 0);
433  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
434  TEXTOID, -1, 0);
435 
436  /* prepare for projection of tuples */
437  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
438 
439  /* column 1: system identifier */
440  values[0] = CStringGetTextDatum(sysid);
441 
442  /* column 2: timeline */
443  values[1] = Int32GetDatum(ThisTimeLineID);
444 
445  /* column 3: wal location */
446  values[2] = CStringGetTextDatum(xloc);
447 
448  /* column 4: database name, or NULL if none */
449  if (dbname)
450  values[3] = CStringGetTextDatum(dbname);
451  else
452  nulls[3] = true;
453 
454  /* send it to dest */
455  do_tup_output(tstate, values, nulls);
456 
457  end_tup_output(tstate);
458 }
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:45
void CommitTransactionCommand(void)
Definition: xact.c:2939
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2286
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
struct cursor * cur
Definition: ecpg.c:28
#define MemSet(start, val, len)
Definition: c.h:1008
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8569
bool RecoveryInProgress(void)
Definition: xlog.c:8217
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2344
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:657
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2266
char * get_database_name(Oid dbid)
Definition: dbcommands.c:2113
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
MemoryContext CurrentMemoryContext
Definition: mcxt.c:42
#define MAXFNAMELEN
uintptr_t Datum
Definition: postgres.h:411
Oid MyDatabaseId
Definition: globals.c:88
#define InvalidOid
Definition: postgres_ext.h:36
TimeLineID ThisTimeLineID
Definition: xlog.c:194
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void StartTransactionCommand(void)
Definition: xact.c:2838
char * dbname
Definition: streamutil.c:51
static Datum values[MAXATTR]
Definition: bootstrap.c:166
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4958
#define Int32GetDatum(X)
Definition: postgres.h:523
#define CStringGetTextDatum(s)
Definition: builtins.h:82
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2970
#define snprintf
Definition: port.h:216
int16 AttrNumber
Definition: attnum.h:21
#define UINT64_FORMAT
Definition: c.h:484
bool am_cascading_walsender
Definition: walsender.c:116

◆ InitWalSenderSlot()

static void InitWalSenderSlot ( void  )
static

Definition at line 2393 of file walsender.c.

References WalSnd::apply, WalSnd::applyLag, Assert, WalSnd::flush, WalSnd::flushLag, i, InvalidXLogRecPtr, WalSnd::latch, max_wal_senders, WalSnd::mutex, MyProc, MyProcPid, WalSnd::needreload, on_shmem_exit(), WalSnd::pid, PGPROC::procLatch, WalSnd::replyTime, WalSnd::sentPtr, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSnd::sync_standby_priority, WalSndKill(), WalSndCtlData::walsnds, WALSNDSTATE_STARTUP, WalSnd::write, and WalSnd::writeLag.

2394 {
2395  int i;
2396 
2397  /*
2398  * WalSndCtl should be set up already (we inherit this by fork() or
2399  * EXEC_BACKEND mechanism from the postmaster).
2400  */
2401  Assert(WalSndCtl != NULL);
2402  Assert(MyWalSnd == NULL);
2403 
2404  /*
2405  * Find a free walsender slot and reserve it. This must not fail due to
2406  * the prior check for free WAL senders in InitProcess().
2407  */
2408  for (i = 0; i < max_wal_senders; i++)
2409  {
2410  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2411 
2412  SpinLockAcquire(&walsnd->mutex);
2413 
2414  if (walsnd->pid != 0)
2415  {
2416  SpinLockRelease(&walsnd->mutex);
2417  continue;
2418  }
2419  else
2420  {
2421  /*
2422  * Found a free slot. Reserve it for us.
2423  */
2424  walsnd->pid = MyProcPid;
2425  walsnd->state = WALSNDSTATE_STARTUP;
2426  walsnd->sentPtr = InvalidXLogRecPtr;
2427  walsnd->needreload = false;
2428  walsnd->write = InvalidXLogRecPtr;
2429  walsnd->flush = InvalidXLogRecPtr;
2430  walsnd->apply = InvalidXLogRecPtr;
2431  walsnd->writeLag = -1;
2432  walsnd->flushLag = -1;
2433  walsnd->applyLag = -1;
2434  walsnd->sync_standby_priority = 0;
2435  walsnd->latch = &MyProc->procLatch;
2436  walsnd->replyTime = 0;
2437  SpinLockRelease(&walsnd->mutex);
2438  /* don't need the lock anymore */
2439  MyWalSnd = (WalSnd *) walsnd;
2440 
2441  break;
2442  }
2443  }
2444 
2445  Assert(MyWalSnd != NULL);
2446 
2447  /* Arrange to clean up at walsender exit */
2448  on_shmem_exit(WalSndKill, 0);
2449 }
XLogRecPtr write
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
int MyProcPid
Definition: globals.c:43
PGPROC * MyProc
Definition: proc.c:68
TimeOffset flushLag
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
static void WalSndKill(int code, Datum arg)
Definition: walsender.c:2453
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
TimeOffset writeLag
slock_t mutex
Latch procLatch
Definition: proc.h:130
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
Latch * latch
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
WalSndState state
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
XLogRecPtr sentPtr
WalSnd * MyWalSnd
Definition: walsender.c:112
TimeOffset applyLag
#define Assert(condition)
Definition: c.h:804
int sync_standby_priority
bool needreload
int i
TimestampTz replyTime
XLogRecPtr apply

◆ LagTrackerRead()

static TimeOffset LagTrackerRead ( int  head,
XLogRecPtr  lsn,
TimestampTz  now 
)
static

Definition at line 3605 of file walsender.c.

References Assert, LagTracker::buffer, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_read, WalTimeSample::lsn, next, LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by ProcessStandbyReplyMessage().

3606 {
3607  TimestampTz time = 0;
3608 
3609  /* Read all unread samples up to this LSN or end of buffer. */
3610  while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
3611  lag_tracker->buffer[lag_tracker->read_heads[head]].lsn <= lsn)
3612  {
3613  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
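3614  lag_tracker->last_read[head] =
3615  lag_tracker->buffer[lag_tracker->read_heads[head]];
3616  lag_tracker->read_heads[head] =
3617  (lag_tracker->read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE;
3618  }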
3619 
3620  /*
3621  * If the lag tracker is empty, that means the standby has processed
3622  * everything we've ever sent so we should now clear 'last_read'. If we
3623  * didn't do that, we'd risk using a stale and irrelevant sample for
3624  * interpolation at the beginning of the next burst of WAL after a period
3625  * of idleness.
3626  */
3627  if (lag_tracker->read_heads[head] == lag_tracker->write_head)
3628  lag_tracker->last_read[head].time = 0;
3629 
3630  if (time > now)
3631  {
3632  /* If the clock somehow went backwards, treat as not found. */
3633  return -1;
3634  }
3635  else if (time == 0)
3636  {
3637  /*
3638  * We didn't cross a time. If there is a future sample that we
3639  * haven't reached yet, and we've already reached at least one sample,
3640  * let's interpolate the local flushed time. This is mainly useful
3641  * for reporting a completely stuck apply position as having
3642  * increasing lag, since otherwise we'd have to wait for it to
3643  * eventually start moving again and cross one of our samples before
3644  * we can show the lag increasing.
3645  */
3646  if (lag_tracker->read_heads[head] == lag_tracker->write_head)
3647  {
3648  /* There are no future samples, so we can't interpolate. */
3649  return -1;
3650  }
3651  else if (lag_tracker->last_read[head].time != 0)
3652  {
3653  /* We can interpolate between last_read and the next sample. */
3654  double fraction;
3655  WalTimeSample prev = lag_tracker->last_read[head];
3656  WalTimeSample next = lag_tracker->buffer[lag_tracker->read_heads[head]];
3657 
3658  if (lsn < prev.lsn)
3659  {
3660  /*
3661  * Reported LSNs shouldn't normally go backwards, but it's
3662  * possible when there is a timeline change. Treat as not
3663  * found.
3664  */
3665  return -1;
3666  }
3667 
3668  Assert(prev.lsn < next.lsn);
3669 
3670  if (prev.time > next.time)
3671  {
3672  /* If the clock somehow went backwards, treat as not found. */
3673  return -1;
3674  }
3675 
3676  /* See how far we are between the previous and next samples. */
3677  fraction =
3678  (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
3679 
3680  /* Scale the local flush time proportionally. */
3681  time = (TimestampTz)
3682  ((double) prev.time + (next.time - prev.time) * fraction);
3683  }
3684  else
3685  {
3686  /*
3687  * We have only a future sample, implying that we were entirely
3688  * caught up but and now there is a new burst of WAL and the
3689  * standby hasn't processed the first sample yet. Until the
3690  * standby reaches the future sample the best we can do is report
3691  * the hypothetical lag if that sample were to be replayed now.
3692  */
3693  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3694  }
3695  }
3696 
3697  /* Return the elapsed time since local flush time in microseconds. */
3698  Assert(time != 0);
3699  return now - time;
3700 }
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:213
static int32 next
Definition: blutils.c:219
int write_head
Definition: walsender.c:214
int64 TimestampTz
Definition: timestamp.h:39
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:216
TimestampTz time
Definition: walsender.c:203
static LagTracker * lag_tracker
Definition: walsender.c:219
XLogRecPtr lsn
Definition: walsender.c:202
#define Assert(condition)
Definition: c.h:804
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:215
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:207
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1544

◆ LagTrackerWrite()

static void LagTrackerWrite ( XLogRecPtr  lsn,
TimestampTz  local_flush_time 
)
static

Definition at line 3540 of file walsender.c.

References am_walsender, LagTracker::buffer, i, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_lsn, WalTimeSample::lsn, NUM_SYNC_REP_WAIT_MODE, LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by WalSndUpdateProgress(), and XLogSendPhysical().

3541 {
3542  bool buffer_full;
3543  int new_write_head;
3544  int i;
3545 
3546  if (!am_walsender)
3547  return;
3548 
3549  /*
3550  * If the lsn hasn't advanced since last time, then do nothing. This way
3551  * we only record a new sample when new WAL has been written.
3552  */
3553  if (lag_tracker->last_lsn == lsn)
3554  return;
3555  lag_tracker->last_lsn = lsn;
3556 
3557  /*
3558  * If advancing the write head of the circular buffer would crash into any
3559  * of the read heads, then the buffer is full. In other words, the
3560  * slowest reader (presumably apply) is the one that controls the release
3561  * of space.
3562  */
3563  new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
3564  buffer_full = false;
3565  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
3566  {
3567  if (new_write_head == lag_tracker->read_heads[i])
3568  buffer_full = true;
3569  }
3570 
3571  /*
3572  * If the buffer is full, for now we just rewind by one slot and overwrite
3573  * the last sample, as a simple (if somewhat uneven) way to lower the
3574  * sampling rate. There may be better adaptive compaction algorithms.
3575  */
3576  if (buffer_full)
3577  {
3578  new_write_head = lag_tracker->write_head;
3579  if (lag_tracker->write_head > 0)
3580  lag_tracker->write_head--;
3581  else
3582  lag_tracker->write_head = LAG_TRACKER_BUFFER_SIZE - 1;
3583  }
3584 
3585  /* Store a sample at the current write head position. */
3586  lag_tracker->buffer[lag_tracker->write_head].lsn = lsn;
3587  lag_tracker->buffer[lag_tracker->write_head].time = local_flush_time;
3588  lag_tracker->write_head = new_write_head;
3589 }
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:213
int write_head
Definition: walsender.c:214
TimestampTz time
Definition: walsender.c:203
static LagTracker * lag_tracker
Definition: walsender.c:219
bool am_walsender
Definition: walsender.c:115
XLogRecPtr lsn
Definition: walsender.c:202
XLogRecPtr last_lsn
Definition: walsender.c:212
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:215
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:207
int i
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28

◆ logical_read_xlog_page()

static int logical_read_xlog_page ( XLogReaderState *  state,
XLogRecPtr  targetPagePtr,
int  reqLen,
XLogRecPtr  targetRecPtr,
char *  cur_page 
)
static

Definition at line 806 of file walsender.c.

References CheckXLogRemoved(), XLogReaderState::currTLI, XLogReaderState::currTLIValidUntil, XLogReaderState::nextTLI, XLogReaderState::seg, XLogReaderState::segcxt, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, ThisTimeLineID, WALRead(), WALReadRaiseError(), WalSndWaitForWal(), WALSegmentContext::ws_segsize, WALOpenSegment::ws_tli, XLByteToSeg, and XLogReadDetermineTimeline().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

808 {
809  XLogRecPtr flushptr;
810  int count;
811  WALReadError errinfo;
812  XLogSegNo segno;
813 
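814  XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
815  sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID);
816  sendTimeLine = state->currTLI;
817  sendTimeLineValidUpto = state->currTLIValidUntil;
818  sendTimeLineNextTLI = state->nextTLI;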
819 
820  /* make sure we have enough WAL available */
821  flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
822 
823  /* fail if not (implies we are going to shut down) */
824  if (flushptr < targetPagePtr + reqLen)
825  return -1;
826 
827  if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
828  count = XLOG_BLCKSZ; /* more than one block available */
829  else
830  count = flushptr - targetPagePtr; /* part of the page available */
831 
832  /* now actually read the data, we know it's there */
833  if (!WALRead(state,
834  cur_page,
835  targetPagePtr,
836  XLOG_BLCKSZ,
837  state->seg.ws_tli, /* Pass the current TLI because only
838  * WalSndSegmentOpen controls whether new
839  * TLI is needed. */
840  &errinfo))
841  WALReadRaiseError(&errinfo);
842 
843  /*
844  * After reading into the buffer, check that what we read was valid. We do
845  * this after reading, because even though the segment was present when we
846  * opened it, it might get recycled or removed while we read it. The
847  * read() succeeds in that case, but the data we tried to read might
848  * already have been overwritten with new WAL records.
849  */
850  XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize);
851  CheckXLogRemoved(segno, state->seg.ws_tli);
852 
853  return count;
854 }
void WALReadRaiseError(WALReadError *errinfo)
Definition: xlogutils.c:975
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
Definition: xlogutils.c:707
WALOpenSegment seg
Definition: xlogreader.h:215
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:3972
uint64 XLogSegNo
Definition: xlogdefs.h:48
XLogRecPtr currTLIValidUntil
Definition: xlogreader.h:238
TimeLineID nextTLI
Definition: xlogreader.h:244
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition: walsender.c:1375
TimeLineID ThisTimeLineID
Definition: xlog.c:194
TimeLineID currTLI
Definition: xlogreader.h:228
static TimeLineID sendTimeLine
Definition: walsender.c:145
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:146
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:148
TimeLineID ws_tli
Definition: xlogreader.h:48
bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALReadError *errinfo)
Definition: xlogreader.c:1062
WALSegmentContext segcxt
Definition: xlogreader.h:214
static bool sendTimeLineIsHistoric
Definition: walsender.c:147
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

◆ offset_to_interval()

static Interval* offset_to_interval ( TimeOffset  offset)
static

Definition at line 3267 of file walsender.c.

References Interval::day, Interval::month, palloc(), and Interval::time.

Referenced by pg_stat_get_wal_senders().

3268 {
3269  Interval *result = palloc(sizeof(Interval));
3270 
3271  result->month = 0;
3272  result->day = 0;
3273  result->time = offset;
3274 
3275  return result;
3276 }
int32 day
Definition: timestamp.h:47
TimeOffset time
Definition: timestamp.h:45
int32 month
Definition: timestamp.h:48
void * palloc(Size size)
Definition: mcxt.c:1062

◆ parseCreateReplSlotOptions()

static void parseCreateReplSlotOptions ( CreateReplicationSlotCmd *  cmd,
bool *  reserve_wal,
CRSSnapshotAction *  snapshot_action,
bool *  two_phase 
static

Definition at line 860 of file walsender.c.

References CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, defGetBoolean(), DefElem::defname, elog, ereport, errcode(), errmsg(), ERROR, CreateReplicationSlotCmd::kind, lfirst, CreateReplicationSlotCmd::options, REPLICATION_KIND_LOGICAL, and REPLICATION_KIND_PHYSICAL.

Referenced by CreateReplicationSlot().

864 {
865  ListCell *lc;
866  bool snapshot_action_given = false;
867  bool reserve_wal_given = false;
868  bool two_phase_given = false;
869 
870  /* Parse options */
871  foreach(lc, cmd->options)
872  {
873  DefElem *defel = (DefElem *) lfirst(lc);
874 
875  if (strcmp(defel->defname, "export_snapshot") == 0)
876  {
877  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
878  ereport(ERROR,
879  (errcode(ERRCODE_SYNTAX_ERROR),
880  errmsg("conflicting or redundant options")));
881 
882  snapshot_action_given = true;
883  *snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT :
884  CRS_NOEXPORT_SNAPSHOT;
885  }
886  else if (strcmp(defel->defname, "use_snapshot") == 0)
887  {
888  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
889  ereport(ERROR,
890  (errcode(ERRCODE_SYNTAX_ERROR),
891  errmsg("conflicting or redundant options")));
892 
893  snapshot_action_given = true;
894  *snapshot_action = CRS_USE_SNAPSHOT;
895  }
896  else if (strcmp(defel->defname, "reserve_wal") == 0)
897  {
898  if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
899  ereport(ERROR,
900  (errcode(ERRCODE_SYNTAX_ERROR),
901  errmsg("conflicting or redundant options")));
902 
903  reserve_wal_given = true;
904  *reserve_wal = true;
905  }
906  else if (strcmp(defel->defname, "two_phase") == 0)
907  {
908  if (two_phase_given || cmd->kind != REPLICATION_KIND_LOGICAL)
909  ereport(ERROR,
910  (errcode(ERRCODE_SYNTAX_ERROR),
911  errmsg("conflicting or redundant options")));
912  two_phase_given = true;
913  *two_phase = true;
914  }
915  else
916  elog(ERROR, "unrecognized option: %s", defel->defname);
917  }
918 }
int errcode(int sqlerrcode)
Definition: elog.c:698
ReplicationKind kind
Definition: replnodes.h:56
bool defGetBoolean(DefElem *def)
Definition: define.c:111
#define ERROR
Definition: elog.h:46
static bool two_phase
#define ereport(elevel,...)
Definition: elog.h:157
#define lfirst(lc)
Definition: pg_list.h:169
int errmsg(const char *fmt,...)
Definition: elog.c:909
#define elog(elevel,...)
Definition: elog.h:232
char * defname
Definition: parsenodes.h:746

◆ pg_stat_get_wal_senders()

Datum pg_stat_get_wal_senders ( PG_FUNCTION_ARGS  )

Definition at line 3283 of file walsender.c.

References ReturnSetInfo::allowedModes, WalSnd::apply, WalSnd::applyLag, CStringGetTextDatum, ReturnSetInfo::econtext, ExprContext::ecxt_per_query_memory, elog, ereport, errcode(), errmsg(), ERROR, WalSnd::flush, WalSnd::flushLag, get_call_result_type(), GetUserId(), i, Int32GetDatum, IntervalPGetDatum, is_member_of_role(), IsA, LSNGetDatum, max_wal_senders, MemoryContextSwitchTo(), MemSet, WalSnd::mutex, offset_to_interval(), PG_STAT_GET_WAL_SENDERS_COLS, WalSnd::pid, WalSnd::replyTime, ReturnSetInfo::returnMode, WalSnd::sentPtr, sentPtr, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SFRM_Materialize, SpinLockAcquire, SpinLockRelease, WalSnd::state, SYNC_REP_PRIORITY, WalSnd::sync_standby_priority, SyncRepConfigData::syncrep_method, SyncRepConfig, SyncRepGetCandidateStandbys(), TimestampTzGetDatum, tuplestore_begin_heap(), tuplestore_donestoring, tuplestore_putvalues(), TYPEFUNC_COMPOSITE, values, WalSndGetStateString(), WalSndCtlData::walsnds, work_mem, write, WalSnd::write, WalSnd::writeLag, and XLogRecPtrIsInvalid.

3284 {
3285 #define PG_STAT_GET_WAL_SENDERS_COLS 12
3286  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3287  TupleDesc tupdesc;
3288  Tuplestorestate *tupstore;
3289  MemoryContext per_query_ctx;
3290  MemoryContext oldcontext;
3291  SyncRepStandbyData *sync_standbys;
3292  int num_standbys;
3293  int i;
3294 
3295  /* check to see if caller supports us returning a tuplestore */
3296  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
3297  ereport(ERROR,
3298  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3299  errmsg("set-valued function called in context that cannot accept a set")));
3300  if (!(rsinfo->allowedModes & SFRM_Materialize))
3301  ereport(ERROR,
3302  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3303  errmsg("materialize mode required, but it is not allowed in this context")));
3304 
3305  /* Build a tuple descriptor for our result type */
3306  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
3307  elog(ERROR, "return type must be a row type");
3308 
3309  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
3310  oldcontext = MemoryContextSwitchTo(per_query_ctx);
3311 
3312  tupstore = tuplestore_begin_heap(true, false, work_mem);
3313  rsinfo->returnMode = SFRM_Materialize;
3314  rsinfo->setResult = tupstore;
3315  rsinfo->setDesc = tupdesc;
3316 
3317  MemoryContextSwitchTo(oldcontext);
3318 
3319  /*
3320  * Get the currently active synchronous standbys. This could be out of
3321  * date before we're done, but we'll use the data anyway.
3322  */
3323  num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
3324 
3325  for (i = 0; i < max_wal_senders; i++)
3326  {
3327  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3328  XLogRecPtr sentPtr;
3329  XLogRecPtr write;
3330  XLogRecPtr flush;
3331  XLogRecPtr apply;
3332  TimeOffset writeLag;
3333  TimeOffset flushLag;
3334  TimeOffset applyLag;
3335  int priority;
3336  int pid;
3337  WalSndState state;
3338  TimestampTz replyTime;
3339  bool is_sync_standby;
3340  Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
3341  bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
3342  int j;
3343 
3344  /* Collect data from shared memory */
3345  SpinLockAcquire(&walsnd->mutex);
3346  if (walsnd->pid == 0)
3347  {
3348  SpinLockRelease(&walsnd->mutex);
3349  continue;
3350  }
3351  pid = walsnd->pid;
3352  sentPtr = walsnd->sentPtr;
3353  state = walsnd->state;
3354  write = walsnd->write;
3355  flush = walsnd->flush;
3356  apply = walsnd->apply;
3357  writeLag = walsnd->writeLag;
3358  flushLag = walsnd->flushLag;
3359  applyLag = walsnd->applyLag;
3360  priority = walsnd->sync_standby_priority;
3361  replyTime = walsnd->replyTime;
3362  SpinLockRelease(&walsnd->mutex);
3363 
3364  /*
3365  * Detect whether walsender is/was considered synchronous. We can
3366  * provide some protection against stale data by checking the PID
3367  * along with walsnd_index.
3368  */
3369  is_sync_standby = false;
3370  for (j = 0; j < num_standbys; j++)
3371  {
3372  if (sync_standbys[j].walsnd_index == i &&
3373  sync_standbys[j].pid == pid)
3374  {
3375  is_sync_standby = true;
3376  break;
3377  }
3378  }
3379 
3380  memset(nulls, 0, sizeof(nulls));
3381  values[0] = Int32GetDatum(pid);
3382 
3383  if (!is_member_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS))
3384  {
3385  /*
3386  * Only superusers and members of pg_read_all_stats can see
3387  * details. Other users only get the pid value to know it's a
3388  * walsender, but no details.
3389  */
3390  MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3391  }
3392  else
3393  {
3394  values[1] = CStringGetTextDatum(WalSndGetStateString(state));
3395 
3396  if (XLogRecPtrIsInvalid(sentPtr))
3397  nulls[2] = true;
3398  values[2] = LSNGetDatum(sentPtr);
3399 
3400  if (XLogRecPtrIsInvalid(write))
3401  nulls[3] = true;
3402  values[3] = LSNGetDatum(write);
3403 
3404  if (XLogRecPtrIsInvalid(flush))
3405  nulls[4] = true;
3406  values[4] = LSNGetDatum(flush);
3407 
3408  if (XLogRecPtrIsInvalid(apply))
3409  nulls[5] = true;
3410  values[5] = LSNGetDatum(apply);
3411 
3412  /*
3413  * Treat a standby such as a pg_basebackup background process
3414  * which always returns an invalid flush location, as an
3415  * asynchronous standby.
3416  */
3417  priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
3418 
3419  if (writeLag < 0)
3420  nulls[6] = true;
3421  else
3422  values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
3423 
3424  if (flushLag < 0)
3425  nulls[7] = true;
3426  else
3427  values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
3428 
3429  if (applyLag < 0)
3430  nulls[8] = true;
3431  else
3432  values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
3433 
3434  values[9] = Int32GetDatum(priority);
3435 
3436  /*
3437  * More easily understood version of standby state. This is purely
3438  * informational.
3439  *
3440  * In quorum-based sync replication, the role of each standby
3441  * listed in synchronous_standby_names can be changing very
3442  * frequently. Any standbys considered as "sync" at one moment can
3443  * be switched to "potential" ones at the next moment. So, it's
3444  * basically useless to report "sync" or "potential" as their sync
3445  * states. We report just "quorum" for them.
3446  */
3447  if (priority == 0)
3448  values[10] = CStringGetTextDatum("async");
3449  else if (is_sync_standby)
3450  values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
3451  CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3452  else
3453  values[10] = CStringGetTextDatum("potential");
3454 
3455  if (replyTime == 0)
3456  nulls[11] = true;
3457  else
3458  values[11] = TimestampTzGetDatum(replyTime);
3459  }
3460 
3461  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
3462  }
3463 
3464  /* clean up and return the tuplestore */
3465  tuplestore_donestoring(tupstore);
3466 
3467  return (Datum) 0;
3468 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
XLogRecPtr write
#define IsA(nodeptr, _type_)
Definition: nodes.h:590
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:207
#define SYNC_REP_PRIORITY
Definition: syncrep.h:36
Oid GetUserId(void)
Definition: miscinit.c:478
#define write(a, b, c)
Definition: win32.h:14
int64 TimestampTz
Definition: timestamp.h:39
uint8 syncrep_method
Definition: syncrep.h:69
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
TimeOffset flushLag
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
int errcode(int sqlerrcode)
Definition: elog.c:698
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
#define MemSet(start, val, len)
Definition: c.h:1008
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
TimeOffset writeLag
SyncRepConfigData * SyncRepConfig
Definition: syncrep.c:97
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
#define ERROR
Definition: elog.h:46
#define IntervalPGetDatum(X)
Definition: timestamp.h:33
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
WalSndState state
int max_wal_senders
Definition: walsender.c:121
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
int64 TimeOffset
Definition: timestamp.h:40
uintptr_t Datum
Definition: postgres.h:411
static const char * WalSndGetStateString(WalSndState state)
Definition: walsender.c:3248
XLogRecPtr sentPtr
int work_mem
Definition: globals.c:124
static XLogRecPtr sentPtr
Definition: walsender.c:154
#define ereport(elevel,...)
Definition: elog.h:157
int allowedModes
Definition: execnodes.h:305
bool is_member_of_role(Oid member, Oid role)
Definition: acl.c:4869
TimeOffset applyLag
SetFunctionReturnMode returnMode
Definition: execnodes.h:307
uint64 XLogRecPtr
Definition: xlogdefs.h:21
int sync_standby_priority
Definition: regguts.h:317
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:233
WalSndState
Tuplestorestate * setResult
Definition: execnodes.h:310
int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
Definition: syncrep.c:725
static Datum values[MAXATTR]
Definition: bootstrap.c:166
#define PG_STAT_GET_WAL_SENDERS_COLS
ExprContext * econtext
Definition: execnodes.h:303
#define Int32GetDatum(X)
Definition: postgres.h:523
TupleDesc setDesc
Definition: execnodes.h:311
int errmsg(const char *fmt,...)
Definition: elog.c:909
#define elog(elevel,...)
Definition: elog.h:232
int i
#define CStringGetTextDatum(s)
Definition: builtins.h:82
TimestampTz replyTime
static Interval * offset_to_interval(TimeOffset offset)
Definition: walsender.c:3267
XLogRecPtr apply

◆ PhysicalConfirmReceivedLocation()

static void PhysicalConfirmReceivedLocation ( XLogRecPtr  lsn)
static

Definition at line 1855 of file walsender.c.

References Assert, ReplicationSlot::data, InvalidXLogRecPtr, ReplicationSlot::mutex, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, SpinLockAcquire, and SpinLockRelease.

Referenced by ProcessStandbyReplyMessage().

1856 {
1857  bool changed = false;
1858  ReplicationSlot *slot = MyReplicationSlot;
1859 
1860  Assert(lsn != InvalidXLogRecPtr);
1861  SpinLockAcquire(&slot->mutex);
1862  if (slot->data.restart_lsn != lsn)
1863  {
1864  changed = true;
1865  slot->data.restart_lsn = lsn;
1866  }
1867  SpinLockRelease(&slot->mutex);
1868 
1869  if (changed)
1870  {
1871  ReplicationSlotMarkDirty();
1872  ReplicationSlotsComputeRequiredLSN();
1873  }
1874 
1875  /*
1876  * One could argue that the slot should be saved to disk now, but that'd
1877  * be energy wasted - the worst lost information can do here is give us
1878  * wrong information in a statistics view - we'll just potentially be more
1879  * conservative in removing files.
1880  */
1881 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
ReplicationSlotPersistentData data
Definition: slot.h:147
#define SpinLockAcquire(lock)
Definition: spin.h:62
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:817
#define SpinLockRelease(lock)
Definition: spin.h:64
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:804
XLogRecPtr restart_lsn
Definition: slot.h:73
slock_t mutex
Definition: slot.h:120
void ReplicationSlotMarkDirty(void)
Definition: slot.c:728

◆ PhysicalReplicationSlotNewXmin()

static void PhysicalReplicationSlotNewXmin ( TransactionId  feedbackXmin,
TransactionId  feedbackCatalogXmin 
)
static

Definition at line 1992 of file walsender.c.

References ReplicationSlotPersistentData::catalog_xmin, ReplicationSlot::data, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, InvalidTransactionId, ReplicationSlot::mutex, MyProc, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredXmin(), SpinLockAcquire, SpinLockRelease, TransactionIdIsNormal, TransactionIdPrecedes(), ReplicationSlotPersistentData::xmin, and PGPROC::xmin.

Referenced by ProcessStandbyHSFeedbackMessage().

1993 {
1994  bool changed = false;
1995  ReplicationSlot *slot = MyReplicationSlot;
1996 
1997  SpinLockAcquire(&slot->mutex);
1998  MyProc->xmin = InvalidTransactionId;
1999 
2000  /*
2001  * For physical replication we don't need the interlock provided by xmin
2002  * and effective_xmin since the consequences of a missed increase are
2003  * limited to query cancellations, so set both at once.
2004  */
2005  if (!TransactionIdIsNormal(slot->data.xmin) ||
2006  !TransactionIdIsNormal(feedbackXmin) ||
2007  TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
2008  {
2009  changed = true;
2010  slot->data.xmin = feedbackXmin;
2011  slot->effective_xmin = feedbackXmin;
2012  }
2013  if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
2014  !TransactionIdIsNormal(feedbackCatalogXmin) ||
2015  TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
2016  {
2017  changed = true;
2018  slot->data.catalog_xmin = feedbackCatalogXmin;
2019  slot->effective_catalog_xmin = feedbackCatalogXmin;
2020  }
2021  SpinLockRelease(&slot->mutex);
2022 
2023  if (changed)
2024  {
2025  ReplicationSlotMarkDirty();
2026  ReplicationSlotsComputeRequiredXmin(false);
2027  }
2028 }
PGPROC * MyProc
Definition: proc.c:68
ReplicationSlotPersistentData data
Definition: slot.h:147
#define SpinLockAcquire(lock)
Definition: spin.h:62
TransactionId effective_xmin
Definition: slot.h:143
TransactionId xmin
Definition: proc.h:138
TransactionId catalog_xmin
Definition: slot.h:70
#define InvalidTransactionId
Definition: transam.h:31
TransactionId xmin
Definition: slot.h:62
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
#define SpinLockRelease(lock)
Definition: spin.h:64
TransactionId effective_catalog_xmin
Definition: slot.h:144
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
slock_t mutex
Definition: slot.h:120
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:767
void ReplicationSlotMarkDirty(void)
Definition: slot.c:728

◆ ProcessRepliesIfAny()

static void ProcessRepliesIfAny ( void  )
static

Definition at line 1711 of file walsender.c.

References Assert, COMMERROR, ereport, errcode(), errmsg(), FATAL, GetCurrentTimestamp(), last_processing, last_reply_timestamp, pq_endmsgread(), pq_getbyte_if_available(), pq_getmessage(), PQ_LARGE_MESSAGE_LIMIT, pq_putmessage_noblock, PQ_SMALL_MESSAGE_LIMIT, pq_startmsgread(), proc_exit(), ProcessStandbyMessage(), resetStringInfo(), streamingDoneReceiving, streamingDoneSending, and waiting_for_ping_response.

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

1712 {
1713  unsigned char firstchar;
1714  int maxmsglen;
1715  int r;
1716  bool received = false;
1717 
1718  last_processing = GetCurrentTimestamp();
1719 
1720  /*
1721  * If we already received a CopyDone from the frontend, any subsequent
1722  * message is the beginning of a new command, and should be processed in
1723  * the main processing loop.
1724  */
1725  while (!streamingDoneReceiving)
1726  {
1727  pq_startmsgread();
1728  r = pq_getbyte_if_available(&firstchar);
1729  if (r < 0)
1730  {
1731  /* unexpected error or EOF */
1732  ereport(COMMERROR,
1733  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1734  errmsg("unexpected EOF on standby connection")));
1735  proc_exit(0);
1736  }
1737  if (r == 0)
1738  {
1739  /* no data available without blocking */
1740  pq_endmsgread();
1741  break;
1742  }
1743 
1744  /* Validate message type and set packet size limit */
1745  switch (firstchar)
1746  {
1747  case 'd':
1748  maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
1749  break;
1750  case 'c':
1751  case 'X':
1752  maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
1753  break;
1754  default:
1755  ereport(FATAL,
1756  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1757  errmsg("invalid standby message type \"%c\"",
1758  firstchar)));
1759  maxmsglen = 0; /* keep compiler quiet */
1760  break;
1761  }
1762 
1763  /* Read the message contents */
1764  resetStringInfo(&reply_message);
1765  if (pq_getmessage(&reply_message, maxmsglen))
1766  {
1767  ereport(COMMERROR,
1768  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1769  errmsg("unexpected EOF on standby connection")));
1770  proc_exit(0);
1771  }
1772 
1773  /* ... and process it */
1774  switch (firstchar)
1775  {
1776  /*
1777  * 'd' means a standby reply wrapped in a CopyData packet.
1778  */
1779  case 'd':
1780  ProcessStandbyMessage();
1781  received = true;
1782  break;
1783 
1784  /*
1785  * CopyDone means the standby requested to finish streaming.
1786  * Reply with CopyDone, if we had not sent that already.
1787  */
1788  case 'c':
1789  if (!streamingDoneSending)
1790  {
1791  pq_putmessage_noblock('c', NULL, 0);
1792  streamingDoneSending = true;
1793  }
1794 
1795  streamingDoneReceiving = true;
1796  received = true;
1797  break;
1798 
1799  /*
1800  * 'X' means that the standby is closing down the socket.
1801  */
1802  case 'X':
1803  proc_exit(0);
1804 
1805  default:
1806  Assert(false); /* NOT REACHED */
1807  }
1808  }
1809 
1810  /*
1811  * Save the last reply timestamp if we've received at least one reply.
1812  */
1813  if (received)
1814  {
1815  last_reply_timestamp = last_processing;
1816  waiting_for_ping_response = false;
1817  }
1818 }
static void ProcessStandbyMessage(void)
Definition: walsender.c:1824
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
#define PQ_SMALL_MESSAGE_LIMIT
Definition: libpq.h:30
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:698
#define PQ_LARGE_MESSAGE_LIMIT
Definition: libpq.h:31
static TimestampTz last_processing
Definition: walsender.c:162
void pq_startmsgread(void)
Definition: pqcomm.c:1152
#define FATAL
Definition: elog.h:49
int pq_getbyte_if_available(unsigned char *c)
Definition: pqcomm.c:1034
static bool streamingDoneSending
Definition: walsender.c:179
#define COMMERROR
Definition: elog.h:30
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1214
static StringInfoData reply_message
Definition: walsender.c:158
void pq_endmsgread(void)
Definition: pqcomm.c:1176
#define ereport(elevel,...)
Definition: elog.h:157
#define Assert(condition)
Definition: c.h:804
static bool streamingDoneReceiving
Definition: walsender.c:180
int errmsg(const char *fmt,...)
Definition: elog.c:909
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:51
static bool waiting_for_ping_response
Definition: walsender.c:171
static TimestampTz last_reply_timestamp
Definition: walsender.c:168

◆ ProcessStandbyHSFeedbackMessage()

static void ProcessStandbyHSFeedbackMessage ( void  )
static

Definition at line 2072 of file walsender.c.

References DEBUG2, elog, InvalidTransactionId, message_level_is_interesting(), WalSnd::mutex, MyProc, MyReplicationSlot, MyWalSnd, pfree(), PhysicalReplicationSlotNewXmin(), pq_getmsgint(), pq_getmsgint64(), pstrdup(), WalSnd::replyTime, SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), TransactionIdInRecentPast(), TransactionIdIsNormal, TransactionIdPrecedes(), and PGPROC::xmin.

Referenced by ProcessStandbyMessage().

2073 {
2074  TransactionId feedbackXmin;
2075  uint32 feedbackEpoch;
2076  TransactionId feedbackCatalogXmin;
2077  uint32 feedbackCatalogEpoch;
2078  TimestampTz replyTime;
2079 
2080  /*
2081  * Decipher the reply message. The caller already consumed the msgtype
2082  * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
2083  * of this message.
2084  */
2085  replyTime = pq_getmsgint64(&reply_message);
2086  feedbackXmin = pq_getmsgint(&reply_message, 4);
2087  feedbackEpoch = pq_getmsgint(&reply_message, 4);
2088  feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
2089  feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
2090 
2091  if (message_level_is_interesting(DEBUG2))
2092  {
2093  char *replyTimeStr;
2094 
2095  /* Copy because timestamptz_to_str returns a static buffer */
2096  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2097 
2098  elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
2099  feedbackXmin,
2100  feedbackEpoch,
2101  feedbackCatalogXmin,
2102  feedbackCatalogEpoch,
2103  replyTimeStr);
2104 
2105  pfree(replyTimeStr);
2106  }
2107 
2108  /*
2109  * Update shared state for this WalSender process based on reply data from
2110  * standby.
2111  */
2112  {
2113  WalSnd *walsnd = MyWalSnd;
2114 
2115  SpinLockAcquire(&walsnd->mutex);
2116  walsnd->replyTime = replyTime;
2117  SpinLockRelease(&walsnd->mutex);
2118  }
2119 
2120  /*
2121  * Unset WalSender's xmins if the feedback message values are invalid.
2122  * This happens when the downstream turned hot_standby_feedback off.
2123  */
2124  if (!TransactionIdIsNormal(feedbackXmin)
2125  && !TransactionIdIsNormal(feedbackCatalogXmin))
2126  {
2127  MyProc->xmin = InvalidTransactionId;
2128  if (MyReplicationSlot != NULL)
2129  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2130  return;
2131  }
2132 
2133  /*
2134  * Check that the provided xmin/epoch are sane, that is, not in the future
2135  * and not so far back as to be already wrapped around. Ignore if not.
2136  */
2137  if (TransactionIdIsNormal(feedbackXmin) &&
2138  !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
2139  return;
2140 
2141  if (TransactionIdIsNormal(feedbackCatalogXmin) &&
2142  !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
2143  return;
2144 
2145  /*
2146  * Set the WalSender's xmin equal to the standby's requested xmin, so that
2147  * the xmin will be taken into account by GetSnapshotData() /
2148  * ComputeXidHorizons(). This will hold back the removal of dead rows and
2149  * thereby prevent the generation of cleanup conflicts on the standby
2150  * server.
2151  *
2152  * There is a small window for a race condition here: although we just
2153  * checked that feedbackXmin precedes nextXid, the nextXid could have
2154  * gotten advanced between our fetching it and applying the xmin below,
2155  * perhaps far enough to make feedbackXmin wrap around. In that case the
2156  * xmin we set here would be "in the future" and have no effect. No point
2157  * in worrying about this since it's too late to save the desired data
2158  * anyway. Assuming that the standby sends us an increasing sequence of
2159  * xmins, this could only happen during the first reply cycle, else our
2160  * own xmin would prevent nextXid from advancing so far.
2161  *
2162  * We don't bother taking the ProcArrayLock here. Setting the xmin field
2163  * is assumed atomic, and there's no real need to prevent concurrent
2164  * horizon determinations. (If we're moving our xmin forward, this is
2165  * obviously safe, and if we're moving it backwards, well, the data is at
2166  * risk already since a VACUUM could already have determined the horizon.)
2167  *
2168  * If we're using a replication slot we reserve the xmin via that,
2169  * otherwise via the walsender's PGPROC entry. We can only track the
2170  * catalog xmin separately when using a slot, so we store the least of the
2171  * two provided when not using a slot.
2172  *
2173  * XXX: It might make sense to generalize the ephemeral slot concept and
2174  * always use the slot mechanism to handle the feedback xmin.
2175  */
2176  if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
2177  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2178  else
2179  {
2180  if (TransactionIdIsNormal(feedbackCatalogXmin)
2181  && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
2182  MyProc->xmin = feedbackCatalogXmin;
2183  else
2184  MyProc->xmin = feedbackXmin;
2185  }
2186 }
uint32 TransactionId
Definition: c.h:587
PGPROC * MyProc
Definition: proc.c:68
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1299
slock_t mutex
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition: walsender.c:2041
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pfree(void *pointer)
Definition: mcxt.c:1169
TransactionId xmin
Definition: proc.h:138
#define DEBUG2
Definition: elog.h:24
bool message_level_is_interesting(int elevel)
Definition: elog.c:270
#define InvalidTransactionId
Definition: transam.h:31
unsigned int uint32
Definition: c.h:441
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static StringInfoData reply_message
Definition: walsender.c:158
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:112
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define elog(elevel,...)
Definition: elog.h:232
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition: walsender.c:1992
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
TimestampTz replyTime
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1774

◆ ProcessStandbyMessage()

static void ProcessStandbyMessage ( void  )
static

Definition at line 1824 of file walsender.c.

References COMMERROR, ereport, errcode(), errmsg(), pq_getmsgbyte(), proc_exit(), ProcessStandbyHSFeedbackMessage(), and ProcessStandbyReplyMessage().

Referenced by ProcessRepliesIfAny().

1825 {
1826  char msgtype;
1827 
1828  /*
1829  * Check message type from the first byte.
1830  */
1831  msgtype = pq_getmsgbyte(&reply_message);
1832 
1833  switch (msgtype)
1834  {
1835  case 'r':
1836  ProcessStandbyReplyMessage();
1837  break;
1838 
1839  case 'h':
1840  ProcessStandbyHSFeedbackMessage();
1841  break;
1842 
1843  default:
1844  ereport(COMMERROR,
1845  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1846  errmsg("unexpected message type \"%c\"", msgtype)));
1847  proc_exit(0);
1848  }
1849 }
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:698
#define COMMERROR
Definition: elog.h:30
static StringInfoData reply_message
Definition: walsender.c:158
#define ereport(elevel,...)
Definition: elog.h:157
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
static void ProcessStandbyReplyMessage(void)
Definition: walsender.c:1887
int errmsg(const char *fmt,...)
Definition: elog.c:909
static void ProcessStandbyHSFeedbackMessage(void)
Definition: walsender.c:2072

◆ ProcessStandbyReplyMessage()

static void ProcessStandbyReplyMessage ( void  )
static

Definition at line 1887 of file walsender.c.

References am_cascading_walsender, WalSnd::apply, WalSnd::applyLag, DEBUG2, elog, WalSnd::flush, WalSnd::flushLag, GetCurrentTimestamp(), InvalidXLogRecPtr, LagTrackerRead(), LogicalConfirmReceivedLocation(), LSN_FORMAT_ARGS, message_level_is_interesting(), WalSnd::mutex, MyReplicationSlot, MyWalSnd, now(), pfree(), PhysicalConfirmReceivedLocation(), pq_getmsgbyte(), pq_getmsgint64(), pstrdup(), WalSnd::replyTime, sentPtr, SlotIsLogical, SpinLockAcquire, SpinLockRelease, SYNC_REP_WAIT_APPLY, SYNC_REP_WAIT_FLUSH, SYNC_REP_WAIT_WRITE, SyncRepReleaseWaiters(), timestamptz_to_str(), WalSndKeepalive(), WalSnd::write, and WalSnd::writeLag.

Referenced by ProcessStandbyMessage().

1888 {
1889  XLogRecPtr writePtr,
1890  flushPtr,
1891  applyPtr;
1892  bool replyRequested;
1893  TimeOffset writeLag,
1894  flushLag,
1895  applyLag;
1896  bool clearLagTimes;
1897  TimestampTz now;
1898  TimestampTz replyTime;
1899 
1900  static bool fullyAppliedLastTime = false;
1901 
1902  /* the caller already consumed the msgtype byte */
1903  writePtr = pq_getmsgint64(&reply_message);
1904  flushPtr = pq_getmsgint64(&reply_message);
1905  applyPtr = pq_getmsgint64(&reply_message);
1906  replyTime = pq_getmsgint64(&reply_message);
1907  replyRequested = pq_getmsgbyte(&reply_message);
1908 
1909  if (message_level_is_interesting(DEBUG2))
1910  {
1911  char *replyTimeStr;
1912 
1913  /* Copy because timestamptz_to_str returns a static buffer */
1914  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
1915 
1916  elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
1917  LSN_FORMAT_ARGS(writePtr),
1918  LSN_FORMAT_ARGS(flushPtr),
1919  LSN_FORMAT_ARGS(applyPtr),
1920  replyRequested ? " (reply requested)" : "",
1921  replyTimeStr);
1922 
1923  pfree(replyTimeStr);
1924  }
1925 
1926  /* See if we can compute the round-trip lag for these positions. */
1927  now = GetCurrentTimestamp();
1928  writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
1929  flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
1930  applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
1931 
1932  /*
1933  * If the standby reports that it has fully replayed the WAL in two
1934  * consecutive reply messages, then the second such message must result
1935  * from wal_receiver_status_interval expiring on the standby. This is a
1936  * convenient time to forget the lag times measured when it last
1937  * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
1938  * until more WAL traffic arrives.
1939  */
1940  clearLagTimes = false;
1941  if (applyPtr == sentPtr)
1942  {
1943  if (fullyAppliedLastTime)
1944  clearLagTimes = true;
1945  fullyAppliedLastTime = true;
1946  }
1947  else
1948  fullyAppliedLastTime = false;
1949 
1950  /* Send a reply if the standby requested one. */
1951  if (replyRequested)
1952  WalSndKeepalive(false);
1953 
1954  /*
1955  * Update shared state for this WalSender process based on reply data from
1956  * standby.
1957  */
1958  {
1959  WalSnd *walsnd = MyWalSnd;
1960 
1961  SpinLockAcquire(&walsnd->mutex);
1962  walsnd->write = writePtr;
1963  walsnd->flush = flushPtr;
1964  walsnd->apply = applyPtr;
1965  if (writeLag != -1 || clearLagTimes)
1966  walsnd->writeLag = writeLag;
1967  if (flushLag != -1 || clearLagTimes)
1968  walsnd->flushLag = flushLag;
1969  if (applyLag != -1 || clearLagTimes)
1970  walsnd->applyLag = applyLag;
1971  walsnd->replyTime = replyTime;
1972  SpinLockRelease(&walsnd->mutex);
1973  }
1974 
1975  if (!am_cascading_walsender)
1976  SyncRepReleaseWaiters();
1977 
1978  /*
1979  * Advance our local xmin horizon when the client confirmed a flush.
1980  */
1981  if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
1982  {
1983  if (SlotIsLogical(MyReplicationSlot))
1984  LogicalConfirmReceivedLocation(flushPtr);
1985  else
1986  PhysicalConfirmReceivedLocation(flushPtr);
1987  }
1988 }
XLogRecPtr write
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1299
TimeOffset flushLag
TimeOffset writeLag
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
XLogRecPtr flush
void pfree(void *pointer)
Definition: mcxt.c:1169
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3479
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: walsender.c:1855
#define SYNC_REP_WAIT_APPLY
Definition: syncrep.h:26
#define DEBUG2
Definition: elog.h:24
bool message_level_is_interesting(int elevel)
Definition: elog.c:270
#define SlotIsLogical(slot)
Definition: slot.h:169
#define SYNC_REP_WAIT_FLUSH
Definition: syncrep.h:25
static StringInfoData reply_message
Definition: walsender.c:158
#define SpinLockRelease(lock)
Definition: spin.h:64
int64 TimeOffset
Definition: timestamp.h:40
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition: walsender.c:3605
WalSnd * MyWalSnd
Definition: walsender.c:112
static XLogRecPtr sentPtr
Definition: walsender.c:154
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
TimeOffset applyLag
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:1695
#define elog(elevel,...)
Definition: elog.h:232
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
TimestampTz replyTime
XLogRecPtr apply
void SyncRepReleaseWaiters(void)
Definition: syncrep.c:440
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1544
bool am_cascading_walsender
Definition: walsender.c:116
#define SYNC_REP_WAIT_WRITE
Definition: syncrep.h:24
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1774

◆ SendTimeLineHistory()

static void SendTimeLineHistory ( TimeLineHistoryCmd cmd)
static

Definition at line 465 of file walsender.c.

References buf, CloseTransientFile(), PGAlignedBlock::data, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg(), ERROR, fd(), MAXFNAMELEN, MAXPGPATH, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), pq_beginmessage(), pq_endmessage(), pq_sendbytes(), pq_sendint16(), pq_sendint32(), pq_sendstring(), read, TimeLineHistoryCmd::timeline, TLHistoryFileName, TLHistoryFilePath, and WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ.

Referenced by exec_replication_command().

466 {
467  StringInfoData buf;
468  char histfname[MAXFNAMELEN];
469  char path[MAXPGPATH];
470  int fd;
471  off_t histfilelen;
472  off_t bytesleft;
473  Size len;
474 
475  /*
476  * Reply with a result set with one row, and two columns. The first col is
477  * the name of the history file, 2nd is the contents.
478  */
479 
480  TLHistoryFileName(histfname, cmd->timeline);
481  TLHistoryFilePath(path, cmd->timeline);
482 
483  /* Send a RowDescription message */
484  pq_beginmessage(&buf, 'T');
485  pq_sendint16(&buf, 2); /* 2 fields */
486 
487  /* first field */
488  pq_sendstring(&buf, "filename"); /* col name */
489  pq_sendint32(&buf, 0); /* table oid */
490  pq_sendint16(&buf, 0); /* attnum */
491  pq_sendint32(&buf, TEXTOID); /* type oid */
492  pq_sendint16(&buf, -1); /* typlen */
493  pq_sendint32(&buf, 0); /* typmod */
494  pq_sendint16(&buf, 0); /* format code */
495 
496  /* second field */
497  pq_sendstring(&buf, "content"); /* col name */
498  pq_sendint32(&buf, 0); /* table oid */
499  pq_sendint16(&buf, 0); /* attnum */
500  pq_sendint32(&buf, TEXTOID); /* type oid */
501  pq_sendint16(&buf, -1); /* typlen */
502  pq_sendint32(&buf, 0); /* typmod */
503  pq_sendint16(&buf, 0); /* format code */
504  pq_endmessage(&buf);
505 
506  /* Send a DataRow message */
507  pq_beginmessage(&buf, 'D');
508  pq_sendint16(&buf, 2); /* # of columns */
509  len = strlen(histfname);
510  pq_sendint32(&buf, len); /* col1 len */
511  pq_sendbytes(&buf, histfname, len);
512 
513  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
514  if (fd < 0)
515  ereport(ERROR,
516  (errcode_for_file_access(),
517  errmsg("could not open file \"%s\": %m", path)));
518 
519  /* Determine file length and send it to client */
520  histfilelen = lseek(fd, 0, SEEK_END);
521  if (histfilelen < 0)
522  ereport(ERROR,
523  (errcode_for_file_access(),
524  errmsg("could not seek to end of file \"%s\": %m", path)));
525  if (lseek(fd, 0, SEEK_SET) != 0)
526  ereport(ERROR,
527  (errcode_for_file_access(),
528  errmsg("could not seek to beginning of file \"%s\": %m", path)));
529 
530  pq_sendint32(&buf, histfilelen); /* col2 len */
531 
532  bytesleft = histfilelen;
533  while (bytesleft > 0)
534  {
535  PGAlignedBlock rbuf;
536  int nread;
537 
538  pgstat_report_wait_start(WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ);
539  nread = read(fd, rbuf.data, sizeof(rbuf));
540  pgstat_report_wait_end();
541  if (nread < 0)
542  ereport(ERROR,
543  (errcode_for_file_access(),
544  errmsg("could not read file \"%s\": %m",
545  path)));
546  else if (nread == 0)
547  ereport(ERROR,
548  (errcode(ERRCODE_DATA_CORRUPTED),
549  errmsg("could not read file \"%s\": read %d of %zu",
550  path, nread, (Size) bytesleft)));
551 
552  pq_sendbytes(&buf, rbuf.data, nread);
553  bytesleft -= nread;
554  }
555 
556  if (CloseTransientFile(fd) != 0)
557  ereport(ERROR,
558  (errcode_for_file_access(),
559  errmsg("could not close file \"%s\": %m", path)));
560 
561  pq_endmessage(&buf);
562 }
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
static void pgstat_report_wait_end(void)
Definition: wait_event.h:278
int errcode(int sqlerrcode)
Definition: elog.c:698
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1271
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
TimeLineID timeline
Definition: replnodes.h:97
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
char data[BLCKSZ]
Definition: c.h:1141
#define ERROR
Definition: elog.h:46
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2467
#define MAXPGPATH
#define TLHistoryFileName(fname, tli)
static char * buf
Definition: pg_test_fsync.c:68
int errcode_for_file_access(void)
Definition: elog.c:721
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:262
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:47
int CloseTransientFile(int fd)
Definition: fd.c:2644
#define MAXFNAMELEN
#define ereport(elevel,...)
Definition: elog.h:157
size_t Size
Definition: c.h:540
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:125
int errmsg(const char *fmt,...)
Definition: elog.c:909
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
#define read(a, b, c)
Definition: win32.h:13
#define TLHistoryFilePath(path, tli)

◆ StartLogicalReplication()

static void StartLogicalReplication ( StartReplicationCmd cmd)
static

Definition at line 1138 of file walsender.c.

References am_cascading_walsender, Assert, buf, CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDecodingContext(), ReplicationSlot::data, DestRemote, EndCommand(), ereport, errcode(), errdetail(), errmsg(), ERROR, FreeDecodingContext(), got_STOPPING, LOG, logical_read_xlog_page(), WalSnd::mutex, MyReplicationSlot, StartReplicationCmd::options, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), proc_exit(), LogicalDecodingContext::reader, RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), ReplicationSlotPersistentData::restart_lsn, WalSnd::sentPtr, sentPtr, SetQueryCompletion(), StartReplicationCmd::slotname, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, SyncRepInitConfig(), wal_segment_close(), WalSndLoop(), WalSndPrepareWrite(), WalSndSegmentOpen(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WalSndUpdateProgress(), WalSndWriteData(), XL_ROUTINE, XLogBeginRead(), XLogRecPtrIsInvalid, and XLogSendLogical().

Referenced by exec_replication_command().

1139 {
1140  StringInfoData buf;
1141  QueryCompletion qc;
1142 
1143  /* make sure that our requirements are still fulfilled */
1144  CheckLogicalDecodingRequirements();
1145 
1146  Assert(!MyReplicationSlot);
1147 
1148  ReplicationSlotAcquire(cmd->slotname, true);
1149 
1150  if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
1151  ereport(ERROR,
1152  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1153  errmsg("cannot read from logical replication slot \"%s\"",
1154  cmd->slotname),
1155  errdetail("This slot has been invalidated because it exceeded the maximum reserved size.")));
1156 
1157  /*
1158  * Force a disconnect, so that the decoding code doesn't need to care
1159  * about an eventual switch from running in recovery, to running in a
1160  * normal environment. Client code is expected to handle reconnects.
1161  */
1162  if (am_cascading_walsender && !RecoveryInProgress())
1163  {
1164  ereport(LOG,
1165  (errmsg("terminating walsender process after promotion")));
1166  got_STOPPING = true;
1167  }
1168 
1169  /*
1170  * Create our decoding context, making it start at the previously ack'ed
1171  * position.
1172  *
1173  * Do this before sending a CopyBothResponse message, so that any errors
1174  * are reported early.
1175  */
1176  logical_decoding_ctx =
1177  CreateDecodingContext(cmd->startpoint, cmd->options, false,
1178  XL_ROUTINE(.page_read = logical_read_xlog_page,
1179  .segment_open = WalSndSegmentOpen,
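1180  .segment_close = wal_segment_close),
1181  WalSndPrepareWrite, WalSndWriteData,
1182  WalSndUpdateProgress);
1183  xlogreader = logical_decoding_ctx->reader;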
1184 
1185  WalSndSetState(WALSNDSTATE_CATCHUP);
1186 
1187  /* Send a CopyBothResponse message, and start streaming */
1188  pq_beginmessage(&buf, 'W');
1189  pq_sendbyte(&buf, 0);
1190  pq_sendint16(&buf, 0);
1191  pq_endmessage(&buf);
1192  pq_flush();
1193 
1194  /* Start reading WAL from the oldest required WAL. */
1195  XLogBeginRead(logical_decoding_ctx->reader,
1196  MyReplicationSlot->data.restart_lsn);
1197 
1198  /*
1199  * Report the location after which we'll send out further commits as the
1200  * current sentPtr.
1201  */
1202  sentPtr = MyReplicationSlot->data.confirmed_flush;
1203 
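1204  /* Also update the sent position status in shared memory */
1205  SpinLockAcquire(&MyWalSnd->mutex);
1206  MyWalSnd->sentPtr = MyReplicationSlot->data.restart_lsn;
1207  SpinLockRelease(&MyWalSnd->mutex);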
1208 
1209  replication_active = true;
1210 
1211  SyncRepInitConfig();
1212 
1213  /* Main loop of walsender */
1214  WalSndLoop(XLogSendLogical);
1215 
1216  FreeDecodingContext(logical_decoding_ctx);
1217  ReplicationSlotRelease();
1218 
1219  replication_active = false;
1220  if (got_STOPPING)
1221  proc_exit(0);
1222  WalSndSetState(WALSNDSTATE_STARTUP);
1223 
1224  /* Get out of COPY mode (CommandComplete). */
1225  SetQueryCompletion(&qc, CMDTAG_COPY, 0);
1226  EndCommand(&qc, DestRemote, false);
1227 }
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
XLogRecPtr startpoint
Definition: replnodes.h:85
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:829
#define pq_flush()
Definition: libpq.h:46
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:380
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:698
ReplicationSlotPersistentData data
Definition: slot.h:147
#define LOG
Definition: elog.h:26
bool RecoveryInProgress(void)
Definition: xlog.c:8217
slock_t mutex
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
XLogRecPtr confirmed_flush
Definition: slot.h:84
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define ERROR
Definition: elog.h:46
void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
Definition: dest.c:166
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:197
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:478
static char * buf
Definition: pg_test_fsync.c:68
int errdetail(const char *fmt,...)
Definition: elog.c:1042
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
Definition: xlogreader.c:243
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1265
void ReplicationSlotRelease(void)
Definition: slot.c:469
static XLogReaderState * xlogreader
Definition: walsender.c:137
void SyncRepInitConfig(void)
Definition: syncrep.c:411
#define XL_ROUTINE(...)
Definition: xlogreader.h:116
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2267
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1238
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
static volatile sig_atomic_t replication_active
Definition: walsender.c:195
XLogRecPtr sentPtr
WalSnd * MyWalSnd
Definition: walsender.c:112
static XLogRecPtr sentPtr
Definition: walsender.c:154
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
Definition: walsender.c:2471
#define ereport(elevel,...)
Definition: elog.h:157
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:804
void WalSndSetState(WalSndState state)
Definition: walsender.c:3229
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:633
static void XLogSendLogical(void)
Definition: walsender.c:2850
XLogRecPtr restart_lsn
Definition: slot.h:73
static void SetQueryCompletion(QueryCompletion *qc, CommandTag commandTag, uint64 nprocessed)
Definition: cmdtag.h:36
int errmsg(const char *fmt,...)
Definition: elog.c:909
XLogReaderState * reader
Definition: logical.h:41
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition: walsender.c:806
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:103
bool am_cascading_walsender
Definition: walsender.c:116
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
Definition: walsender.c:1349

◆ StartReplication()

static void StartReplication ( StartReplicationCmd cmd)
static

Definition at line 571 of file walsender.c.

References am_cascading_walsender, Assert, begin_tup_output_tupdesc(), buf, CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, dest, DestRemoteSimple, do_tup_output(), end_tup_output(), EndReplicationCommand(), ereport, errcode(), errdetail(), errmsg(), ERROR, GetFlushRecPtr(), GetStandbyFlushRecPtr(), got_STOPPING, Int64GetDatum(), InvalidXLogRecPtr, list_free_deep(), LSN_FORMAT_ARGS, MemSet, WalSnd::mutex, MyReplicationSlot, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), proc_exit(), readTimeLineHistory(), RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, WalSnd::sentPtr, sentPtr, SlotIsLogical, StartReplicationCmd::slotname, snprintf, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), ThisTimeLineID, StartReplicationCmd::timeline, tliSwitchPoint(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), values, wal_segment_close(), wal_segment_size, WalSndLoop(), WalSndSegmentOpen(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, XL_ROUTINE, XLogReaderAllocate(), XLogRecPtrIsInvalid, and XLogSendPhysical().

Referenced by exec_replication_command().

572 {
574  XLogRecPtr FlushPtr;
575 
576  /* create xlogreader for physical replication */
577  xlogreader =
579  XL_ROUTINE(.segment_open = WalSndSegmentOpen,
580  .segment_close = wal_segment_close),
581  NULL);
582 
583  if (!xlogreader)
584  ereport(ERROR,
585  (errcode(ERRCODE_OUT_OF_MEMORY),
586  errmsg("out of memory")));
587 
588  /*
589  * We assume here that we're logging enough information in the WAL for
590  * log-shipping, since this is checked in PostmasterMain().
591  *
592  * NOTE: wal_level can only change at shutdown, so in most cases it is
593  * difficult for there to be WAL data that we can still see that was
594  * written at wal_level='minimal'.
595  */
596 
597  if (cmd->slotname)
598  {
599  ReplicationSlotAcquire(cmd->slotname, true);
601  ereport(ERROR,
602  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
603  errmsg("cannot use a logical replication slot for physical replication")));
604 
605  /*
606  * We don't need to verify the slot's restart_lsn here; instead we
607  * rely on the caller requesting the starting point to use. If the
608  * WAL segment doesn't exist, we'll fail later.
609  */
610  }
611 
612  /*
613  * Select the timeline. If it was given explicitly by the client, use
614  * that. Otherwise use the timeline of the last replayed record, which is
615  * kept in ThisTimeLineID.
616  */
619  {
620  /* this also updates ThisTimeLineID */
621  FlushPtr = GetStandbyFlushRecPtr();
622  }
623  else
624  FlushPtr = GetFlushRecPtr();
625 
626  if (cmd->timeline != 0)
627  {
628  XLogRecPtr switchpoint;
629 
630  sendTimeLine = cmd->timeline;
632  {
633  sendTimeLineIsHistoric = false;
635  }
636  else
637  {
638  List *timeLineHistory;
639 
640  sendTimeLineIsHistoric = true;
641 
642  /*
643  * Check that the timeline the client requested exists, and the
644  * requested start location is on that timeline.
645  */
646  timeLineHistory = readTimeLineHistory(ThisTimeLineID);
647  switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
649  list_free_deep(timeLineHistory);
650 
651  /*
652  * Found the requested timeline in the history. Check that
653  * requested startpoint is on that timeline in our history.
654  *
655  * This is quite loose on purpose. We only check that we didn't
656  * fork off the requested timeline before the switchpoint. We
657  * don't check that we switched *to* it before the requested
658  * starting point. This is because the client can legitimately
659  * request to start replication from the beginning of the WAL
660  * segment that contains switchpoint, but on the new timeline, so
661  * that it doesn't end up with a partial segment. If you ask for
662  * too old a starting point, you'll get an error later when we
663  * fail to find the requested WAL segment in pg_wal.
664  *
665  * XXX: we could be more strict here and only allow a startpoint
666  * that's older than the switchpoint, if it's still in the same
667  * WAL segment.
668  */
669  if (!XLogRecPtrIsInvalid(switchpoint) &&
670  switchpoint < cmd->startpoint)
671  {
672  ereport(ERROR,
673  (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
675  cmd->timeline),
676  errdetail("This server's history forked from timeline %u at %X/%X.",
677  cmd->timeline,
678  LSN_FORMAT_ARGS(switchpoint))));
679  }
680  sendTimeLineValidUpto = switchpoint;
681  }
682  }
683  else
684  {
687  sendTimeLineIsHistoric = false;
688  }
689 
691 
692  /* If there is nothing to stream, don't even enter COPY mode */
694  {
695  /*
696  * When we first start replication the standby will be behind the
697  * primary. For some applications, for example synchronous
698  * replication, it is important to have a clear state for this initial
699  * catchup mode, so we can trigger actions when we change streaming
700  * state later. We may stay in this state for a long time, which is
701  * exactly why we want to be able to monitor whether or not we are
702  * still here.
703  */
705 
706  /* Send a CopyBothResponse message, and start streaming */
707  pq_beginmessage(&buf, 'W');
708  pq_sendbyte(&buf, 0);
709  pq_sendint16(&buf, 0);
710  pq_endmessage(&buf);
711  pq_flush();
712 
713  /*
714  * Don't allow a request to stream from a future point in WAL that
715  * hasn't been flushed to disk in this server yet.
716  */
717  if (FlushPtr < cmd->startpoint)
718  {
719  ereport(ERROR,
720  (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
722  LSN_FORMAT_ARGS(FlushPtr))));
723  }
724 
725  /* Start streaming from the requested point */
726  sentPtr = cmd->startpoint;
727 
728  /* Initialize shared memory status, too */
732 
734 
735  /* Main loop of walsender */
736  replication_active = true;
737 
739 
740  replication_active = false;
741  if (got_STOPPING)
742  proc_exit(0);
744 
746  }
747 
748  if (cmd->slotname)
750 
751  /*
752  * Copy is finished now. Send a single-row result set indicating the next
753  * timeline.
754  */
756  {
757  char startpos_str[8 + 1 + 8 + 1];
759  TupOutputState *tstate;
760  TupleDesc tupdesc;
761  Datum values[2];
762  bool nulls[2];
763 
764  snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
766 
768  MemSet(nulls, false, sizeof(nulls));
769 
770  /*
771  * Need a tuple descriptor representing two columns. int8 may seem
772  * like a surprising data type for this, but in theory int4 would not
773  * be wide enough for this, as TimeLineID is unsigned.
774  */
775  tupdesc = CreateTemplateTupleDesc(2);
776  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
777  INT8OID, -1, 0);
778  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
779  TEXTOID, -1, 0);
780 
781  /* prepare for projection of tuple */
782  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
783 
784  values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
785  values[1] = CStringGetTextDatum(startpos_str);
786 
787  /* send it to dest */
788  do_tup_output(tstate, values, nulls);
789 
790  end_tup_output(tstate);
791  }
792 
793  /* Send CommandComplete message */
794  EndReplicationCommand("START_STREAMING");
795 }
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr startpoint
Definition: replnodes.h:85
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:829
#define pq_flush()
Definition: libpq.h:46
int wal_segment_size
Definition: xlog.c:119
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:45
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:380
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:76
static void XLogSendPhysical(void)
Definition: walsender.c:2549
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2286
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:698
#define MemSet(start, val, len)
Definition: c.h:1008
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8569
bool RecoveryInProgress(void)
Definition: xlog.c:8217
void list_free_deep(List *list)
Definition: list.c:1405
slock_t mutex
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2344
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:657
#define ERROR
Definition: elog.h:46
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2266
StringInfoData buf
Local variable of StartReplication(); used to build the CopyBothResponse message.
static bool streamingDoneSending
Definition: walsender.c:179
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
int errdetail(const char *fmt,...)
Definition: elog.c:1042
void ReplicationSlotRelease(void)
Definition: slot.c:469
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
Definition: xlogreader.c:78
#define SlotIsLogical(slot)
Definition: slot.h:169
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1697
static XLogReaderState * xlogreader
Definition: walsender.c:137
void SyncRepInitConfig(void)
Definition: syncrep.c:411
void EndReplicationCommand(const char *commandTag)
Definition: dest.c:221
#define XL_ROUTINE(...)
Definition: xlogreader.h:116
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2267
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
static volatile sig_atomic_t replication_active
Definition: walsender.c:195
uintptr_t Datum
Definition: postgres.h:411
XLogRecPtr sentPtr
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:580
WalSnd * MyWalSnd
Definition: walsender.c:112
static XLogRecPtr sentPtr
Definition: walsender.c:154
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
Definition: walsender.c:2471
TimeLineID ThisTimeLineID
Definition: xlog.c:194
#define ereport(elevel,...)
Definition: elog.h:157
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
static TimeLineID sendTimeLine
Definition: walsender.c:145
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:804
void WalSndSetState(WalSndState state)
Definition: walsender.c:3229
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:146
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:148
static bool streamingDoneReceiving
Definition: walsender.c:180
Datum values[2]
Local variable of StartReplication(); holds next_tli and next_tli_startpos.
int errmsg(const char *fmt,...)
Definition: elog.c:909
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
TimeLineID timeline
Definition: replnodes.h:84
#define CStringGetTextDatum(s)
Definition: builtins.h:82
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2970
Definition: pg_list.h:50
#define snprintf
Definition: port.h:216
int16 AttrNumber
Definition: attnum.h:21
static bool sendTimeLineIsHistoric
Definition: walsender.c:147
bool am_cascading_walsender
Definition: walsender.c:116

◆ TransactionIdInRecentPast()

static bool TransactionIdInRecentPast ( TransactionId  xid,
uint32  epoch 
)
static

Definition at line 2041 of file walsender.c.

References EpochFromFullTransactionId, ReadNextFullTransactionId(), TransactionIdPrecedesOrEquals(), and XidFromFullTransactionId.

Referenced by ProcessStandbyHSFeedbackMessage().

2042 {
2043  FullTransactionId nextFullXid;
2044  TransactionId nextXid;
2045  uint32 nextEpoch;
2046 
2047  nextFullXid = ReadNextFullTransactionId();
2048  nextXid = XidFromFullTransactionId(nextFullXid);
2049  nextEpoch = EpochFromFullTransactionId(nextFullXid);
2050 
2051  if (xid <= nextXid)
2052  {
2053  if (epoch != nextEpoch)
2054  return false;
2055  }
2056  else
2057  {
2058  if (epoch + 1 != nextEpoch)
2059  return false;
2060  }
2061 
2062  if (!TransactionIdPrecedesOrEquals(xid, nextXid))
2063  return false; /* epoch OK, but it's wrapped around */
2064 
2065  return true;
2066 }
uint32 TransactionId
Definition: c.h:587
#define XidFromFullTransactionId(x)
Definition: transam.h:48
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:319
unsigned int uint32
Definition: c.h:441
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:261
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
uint32 epoch
Parameter of TransactionIdInRecentPast().

◆ WalSndCheckTimeOut()

static void WalSndCheckTimeOut ( void  )
static

Definition at line 2240 of file walsender.c.

References COMMERROR, ereport, errmsg(), last_processing, last_reply_timestamp, TimestampTzPlusMilliseconds, wal_sender_timeout, and WalSndShutdown().

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

2241 {
2242  TimestampTz timeout;
2243 
2244  /* don't bail out if we're doing something that doesn't require timeouts */
2245  if (last_reply_timestamp <= 0)
2246  return;
2247 
2250 
2251  if (wal_sender_timeout > 0 && last_processing >= timeout)
2252  {
2253  /*
2254  * Since typically expiration of replication timeout means
2255  * communication problem, we don't send the error message to the
2256  * standby.
2257  */
2259  (errmsg("terminating walsender process due to replication timeout")));
2260 
2261  WalSndShutdown();
2262  }
2263 }
int wal_sender_timeout
Definition: walsender.c:123
int64 TimestampTz
Definition: timestamp.h:39
static TimestampTz last_processing
Definition: walsender.c:162
#define COMMERROR
Definition: elog.h:30
static void WalSndShutdown(void)
Definition: walsender.c:229
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
#define ereport(elevel,...)
Definition: elog.h:157
int errmsg(const char *fmt,...)
Definition: elog.c:909
static TimestampTz last_reply_timestamp
Definition: walsender.c:168

◆ WalSndComputeSleeptime()

static long WalSndComputeSleeptime ( TimestampTz  now)
static

Definition at line 2196 of file walsender.c.

References last_reply_timestamp, TimestampDifferenceMilliseconds(), TimestampTzPlusMilliseconds, waiting_for_ping_response, and wal_sender_timeout.

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

2197 {
2198  long sleeptime = 10000; /* 10 s */
2199 
2201  {
2202  TimestampTz wakeup_time;
2203 
2204  /*
2205  * At the latest stop sleeping once wal_sender_timeout has been
2206  * reached.
2207  */
2210 
2211  /*
2212  * If no ping has been sent yet, wakeup when it's time to do so.
2213  * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2214  * the timeout passed without a response.
2215  */
2218  wal_sender_timeout / 2);
2219 
2220  /* Compute relative time until wakeup. */
2221  sleeptime = TimestampDifferenceMilliseconds(now, wakeup_time);
2222  }
2223 
2224  return sleeptime;
2225 }
int wal_sender_timeout
Definition: walsender.c:123
int64 TimestampTz
Definition: timestamp.h:39
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
static bool waiting_for_ping_response
Definition: walsender.c:171
static TimestampTz last_reply_timestamp
Definition: walsender.c:168
TimestampTz now
Parameter of WalSndComputeSleeptime().
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1693

◆ WalSndDone()

static void WalSndDone ( WalSndSendDataCallback  send_data)
static

Definition at line 2930 of file walsender.c.

References DestRemote, EndCommand(), WalSnd::flush, pq_flush, pq_is_send_pending, proc_exit(), sentPtr, SetQueryCompletion(), waiting_for_ping_response, WalSndCaughtUp, WalSndKeepalive(), WalSnd::write, and XLogRecPtrIsInvalid.

Referenced by WalSndLoop().

2931 {
2932  XLogRecPtr replicatedPtr;
2933 
2934  /* ... let's just be real sure we're caught up ... */
2935  send_data();
2936 
2937  /*
2938  * To figure out whether all WAL has successfully been replicated, check
2939  * flush location if valid, write otherwise. Tools like pg_receivewal will
2940  * usually (unless in synchronous mode) return an invalid flush location.
2941  */
2942  replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
2944 
2945  if (WalSndCaughtUp && sentPtr == replicatedPtr &&
2946  !pq_is_send_pending())
2947  {
2948  QueryCompletion qc;
2949 
2950  /* Inform the standby that XLOG streaming is done */
2951  SetQueryCompletion(&qc, CMDTAG_COPY, 0);
2952  EndCommand(&qc, DestRemote, false);
2953  pq_flush();
2954 
2955  proc_exit(0);
2956  }
2958  WalSndKeepalive(true);
2959 }
#define pq_is_send_pending()
Definition: libpq.h:48
XLogRecPtr write
#define pq_flush()
Definition: libpq.h:46
void proc_exit(int code)
Definition: ipc.c:104
XLogRecPtr flush
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3479
void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
Definition: dest.c:166
static bool WalSndCaughtUp
Definition: walsender.c:183
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
WalSnd * MyWalSnd
Definition: walsender.c:112
static XLogRecPtr sentPtr
Definition: walsender.c:154
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static void SetQueryCompletion(QueryCompletion *qc, CommandTag commandTag, uint64 nprocessed)
Definition: cmdtag.h:36
static bool waiting_for_ping_response
Definition: walsender.c:171

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 296 of file walsender.c.

References ConditionVariableCancelSleep(), got_SIGUSR2, got_STOPPING, IsTransactionOrTransactionBlock(), LWLockReleaseAll(), MyReplicationSlot, pgstat_report_wait_end(), proc_exit(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), XLogReaderState::seg, wal_segment_close(), WalSndResourceCleanup(), WalSndSetState(), WALSNDSTATE_STARTUP, and WALOpenSegment::ws_file.

Referenced by PostgresMain().

297 {
301 
302  if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
304 
305  if (MyReplicationSlot != NULL)
307 
309 
310  replication_active = false;
311 
312  /*
313  * If there is a transaction in progress, it will clean up our
314  * ResourceOwner, but if a replication command set up a resource owner
315  * without a transaction, we've got to clean that up now.
316  */
318  WalSndResourceCleanup(false);
319 
320  if (got_STOPPING || got_SIGUSR2)
321  proc_exit(0);
322 
323  /* Revert back to startup state */
325 }
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:829
static void pgstat_report_wait_end(void)
Definition: wait_event.h:278
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
void proc_exit(int code)
Definition: ipc.c:104
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4701
WALOpenSegment seg
Definition: xlogreader.h:215
void ConditionVariableCancelSleep(void)
void ReplicationSlotRelease(void)
Definition: slot.c:469
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:186
static XLogReaderState * xlogreader
Definition: walsender.c:137
static volatile sig_atomic_t replication_active
Definition: walsender.c:195
void WalSndResourceCleanup(bool isCommit)
Definition: walsender.c:331
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
void WalSndSetState(WalSndState state)
Definition: walsender.c:3229
void ReplicationSlotCleanup(void)
Definition: slot.c:525
void LWLockReleaseAll(void)
Definition: lwlock.c:1902

◆ WalSndGetStateString()

static const char* WalSndGetStateString ( WalSndState  state)
static

Definition at line 3248 of file walsender.c.

References WALSNDSTATE_BACKUP, WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WALSNDSTATE_STOPPING, and WALSNDSTATE_STREAMING.

Referenced by pg_stat_get_wal_senders().

3249 {
3250  switch (state)
3251  {
3252  case WALSNDSTATE_STARTUP:
3253  return "startup";
3254  case WALSNDSTATE_BACKUP:
3255  return "backup";
3256  case WALSNDSTATE_CATCHUP:
3257  return "catchup";
3258  case WALSNDSTATE_STREAMING:
3259  return "streaming";
3260  case WALSNDSTATE_STOPPING:
3261  return "stopping";
3262  }
3263  return "UNKNOWN";
3264 }
WalSndState state
Parameter of WalSndGetStateString().

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 3165 of file walsender.c.

References i, InvalidBackendId, max_wal_senders, WalSnd::mutex, WalSnd::pid, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

3166 {
3167  int i;
3168 
3169  for (i = 0; i < max_wal_senders; i++)
3170  {
3171  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3172  pid_t pid;
3173 
3174  SpinLockAcquire(&walsnd->mutex);
3175  pid = walsnd->pid;
3176  SpinLockRelease(&walsnd->mutex);
3177 
3178  if (pid == 0)
3179  continue;
3180 
3182  }
3183 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:261
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
#define InvalidBackendId
Definition: backendid.h:23
int i

◆ WalSndKeepalive()

static void WalSndKeepalive ( bool  requestReply)
static

Definition at line 3479 of file walsender.c.

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), StringInfoData::len, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), resetStringInfo(), sentPtr, and waiting_for_ping_response.

Referenced by ProcessStandbyReplyMessage(), WalSndDone(), WalSndKeepaliveIfNecessary(), and WalSndWaitForWal().

3480 {
3481  elog(DEBUG2, "sending replication keepalive");
3482 
3483  /* construct the message... */
3485  pq_sendbyte(&output_message, 'k');
3488  pq_sendbyte(&output_message, requestReply ? 1 : 0);
3489 
3490  /* ... and send it wrapped in CopyData */
3492 
3493  /* Set local flag */
3494  if (requestReply)
3496 }
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
static StringInfoData output_message
Definition: walsender.c:157
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
#define DEBUG2
Definition: elog.h:24
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
static XLogRecPtr sentPtr
Definition: walsender.c:154
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:51
#define elog(elevel,...)
Definition: elog.h:232
static bool waiting_for_ping_response
Definition: walsender.c:171

◆ WalSndKeepaliveIfNecessary()

static void WalSndKeepaliveIfNecessary ( void  )
static

Definition at line 3502 of file walsender.c.

References last_processing, last_reply_timestamp, pq_flush_if_writable, TimestampTzPlusMilliseconds, waiting_for_ping_response, wal_sender_timeout, WalSndKeepalive(), and WalSndShutdown().

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

3503 {
3504  TimestampTz ping_time;
3505 
3506  /*
3507  * Don't send keepalive messages if timeouts are globally disabled or
3508  * we're doing something not partaking in timeouts.
3509  */
3510  if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3511  return;
3512 
3514  return;
3515 
3516  /*
3517  * If half of wal_sender_timeout has lapsed without receiving any reply
3518  * from the standby, send a keep-alive message to the standby requesting
3519  * an immediate reply.
3520  */
3522  wal_sender_timeout / 2);
3523  if (last_processing >= ping_time)
3524  {
3525  WalSndKeepalive(true);
3526 
3527  /* Try to flush pending output to the client */
3528  if (pq_flush_if_writable() != 0)
3529  WalSndShutdown();
3530  }
3531 }
int wal_sender_timeout
Definition: walsender.c:123
int64 TimestampTz
Definition: timestamp.h:39
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3479
static TimestampTz last_processing
Definition: walsender.c:162
#define pq_flush_if_writable()
Definition: libpq.h:47
static void WalSndShutdown(void)
Definition: walsender.c:229
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
static bool waiting_for_ping_response
Definition: walsender.c:171
static TimestampTz last_reply_timestamp
Definition: walsender.c:168

◆ WalSndKill()

static void WalSndKill ( int  code,
Datum  arg 
)
static

Definition at line 2453 of file walsender.c.

References Assert, WalSnd::latch, WalSnd::mutex, MyWalSnd, WalSnd::pid, SpinLockAcquire, and SpinLockRelease.

Referenced by InitWalSenderSlot().

2454 {
2455  WalSnd *walsnd = MyWalSnd;
2456 
2457  Assert(walsnd != NULL);
2458 
2459  MyWalSnd = NULL;
2460 
2461  SpinLockAcquire(&walsnd->mutex);
2462  /* clear latch while holding the spinlock, so it can safely be read */
2463  walsnd->latch = NULL;
2464  /* Mark WalSnd struct as no longer being in use. */
2465  walsnd->pid = 0;
2466  SpinLockRelease(&walsnd->mutex);
2467 }
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch * latch
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:112
#define Assert(condition)
Definition: c.h:804

◆ WalSndLastCycleHandler()

static void WalSndLastCycleHandler ( SIGNAL_ARGS  )
static

Definition at line 3045 of file walsender.c.

References got_SIGUSR2, MyLatch, and SetLatch().

Referenced by WalSndSignals().

3046 {
3047  int save_errno = errno;
3048 
3049  got_SIGUSR2 = true;
3050  SetLatch(MyLatch);
3051 
3052  errno = save_errno;
3053 }
void SetLatch(Latch *latch)
Definition: latch.c:567
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:186
struct Latch * MyLatch
Definition: globals.c:57

◆ WalSndLoop()

static void WalSndLoop ( WalSndSendDataCallback  send_data)
static

Definition at line 2267 of file walsender.c.

References application_name, CHECK_FOR_INTERRUPTS, ConfigReloadPending, DEBUG1, ereport, errmsg_internal(), GetCurrentTimestamp(), got_SIGUSR2, last_reply_timestamp, MyLatch, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), WalSnd::state, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_MAIN, waiting_for_ping_response, WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndDone(), WalSndKeepaliveIfNecessary(), WalSndSetState(), WalSndShutdown(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STREAMING, WalSndWait(), WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, and XLogSendLogical().

Referenced by StartLogicalReplication(), and StartReplication().

2268 {
2269  /*
2270  * Initialize the last reply timestamp. That enables timeout processing
2271  * from hereon.
2272  */
2274  waiting_for_ping_response = false;
2275 
2276  /*
2277  * Loop until we reach the end of this timeline or the client requests to
2278  * stop streaming.
2279  */
2280  for (;;)
2281  {
2282  /* Clear any already-pending wakeups */
2284 
2286 
2287  /* Process any requests or signals received recently */
2288  if (ConfigReloadPending)
2289  {
2290  ConfigReloadPending = false;
2293  }
2294 
2295  /* Check for input from the client */
2297 
2298  /*
2299  * If we have received CopyDone from the client, sent CopyDone
2300  * ourselves, and the output buffer is empty, it's time to exit
2301  * streaming.
2302  */
2304  !pq_is_send_pending())
2305  break;
2306 
2307  /*
2308  * If we don't have any pending data in the output buffer, try to send
2309  * some more. If there is some, we don't bother to call send_data
2310  * again until we've flushed it ... but we'd better assume we are not
2311  * caught up.
2312  */
2313  if (!pq_is_send_pending())
2314  send_data();
2315  else
2316  WalSndCaughtUp = false;
2317 
2318  /* Try to flush pending output to the client */
2319  if (pq_flush_if_writable() != 0)
2320  WalSndShutdown();
2321 
2322  /* If nothing remains to be sent right now ... */
2324  {
2325  /*
2326  * If we're in catchup state, move to streaming. This is an
2327  * important state change for users to know about, since before
2328  * this point data loss might occur if the primary dies and we
2329  * need to failover to the standby. The state change is also
2330  * important for synchronous replication, since commits that
2331  * started to wait at that point might wait for some time.
2332  */
2334  {
2335  ereport(DEBUG1,
2336  (errmsg_internal("\"%s\" has now caught up with upstream server",
2337  application_name)));
2339  }
2340 
2341  /*
2342  * When SIGUSR2 arrives, we send any outstanding logs up to the
2343  * shutdown checkpoint record (i.e., the latest record), wait for
2344  * them to be replicated to the standby, and exit. This may be a
2345  * normal termination at shutdown, or a promotion, the walsender
2346  * is not sure which.
2347  */
2348  if (got_SIGUSR2)
2349  WalSndDone(send_data);
2350  }
2351 
2352  /* Check for replication timeout. */
2354 
2355  /* Send keepalive if the time has come */
2357 
2358  /*
2359  * Block if we have unsent data. XXX For logical replication, let
2360  * WalSndWaitForWal() handle any other blocking; idle receivers need
2361  * its additional actions. For physical replication, also block if
2362  * caught up; its send_data does not block.
2363  */
2364  if ((WalSndCaughtUp && send_data != XLogSendLogical &&
2367  {
2368  long sleeptime;
2369  int wakeEvents;
2370 
2372  wakeEvents = WL_SOCKET_READABLE;
2373  else
2374  wakeEvents = 0;
2375 
2376  /*
2377  * Use fresh timestamp, not last_processing, to reduce the chance
2378  * of reaching wal_sender_timeout before sending a keepalive.
2379  */
2381 
2382  if (pq_is_send_pending())
2383  wakeEvents |= WL_SOCKET_WRITEABLE;
2384 
2385  /* Sleep until something happens or we time out */
2386  WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
2387  }
2388  }
2389 }
#define pq_is_send_pending()
Definition: libpq.h:48
#define WL_SOCKET_WRITEABLE
Definition: latch.h:127
static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
Definition: walsender.c:3148
#define DEBUG1
Definition: elog.h:25
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
static void WalSndDone(WalSndSendDataCallback send_data)
Definition: walsender.c:2930
#define WL_SOCKET_READABLE
Definition: latch.h:126
void ResetLatch(Latch *latch)
Definition: latch.c:660
WalSndState state
static bool streamingDoneSending
Definition: walsender.c:179
#define pq_flush_if_writable()
Definition: libpq.h:47
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:186
void SyncRepInitConfig(void)
Definition: syncrep.c:411
static bool WalSndCaughtUp
Definition: walsender.c:183
Definition: guc.h:72
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3502
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2240
static void WalSndShutdown(void)
Definition: walsender.c:229
WalSnd * MyWalSnd
Definition: walsender.c:112
#define ereport(elevel,...)
Definition: elog.h:157
int errmsg_internal(const char *fmt,...)
Definition: elog.c:996
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2196
void WalSndSetState(WalSndState state)
Definition: walsender.c:3229
static void XLogSendLogical(void)
Definition: walsender.c:2850
static bool streamingDoneReceiving
Definition: walsender.c:180
char * application_name
Definition: guc.c:621
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:26
struct Latch * MyLatch
Definition: globals.c:57
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:120
static bool waiting_for_ping_response
Definition: walsender.c:171
static TimestampTz last_reply_timestamp
Definition: walsender.c:168
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1711

◆ WalSndPrepareWrite()

static void WalSndPrepareWrite ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1238 of file walsender.c.

References InvalidXLogRecPtr, LogicalDecodingContext::out, pq_sendbyte(), pq_sendint64(), and resetStringInfo().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

1239 {
 /*
  * Begin a new logical-replication output message in ctx->out. The buffer is
  * reset, then filled with: the message type byte 'w', the dataStart and
  * walEnd LSNs (both InvalidXLogRecPtr unless last_write is set), and an
  * 8-byte zero placeholder for the send timestamp. The placeholder is
  * overwritten just before sending (WalSndWriteData() copies the timestamp to
  * offset 1 + 8 + 8 in ctx->out->data).
  */
1240  /* can't have sync rep confused by sending the same LSN several times */
1241  if (!last_write)
1242  lsn = InvalidXLogRecPtr;
1243 
1244  resetStringInfo(ctx->out);
1245 
1246  pq_sendbyte(ctx->out, 'w');
1247  pq_sendint64(ctx->out, lsn); /* dataStart */
1248  pq_sendint64(ctx->out, lsn); /* walEnd */
1249 
1250  /*
1251  * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1252  * reserve space here.
1253  */
1254  pq_sendint64(ctx->out, 0); /* sendtime */
1255 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
StringInfo out
Definition: logical.h:70

◆ WalSndResourceCleanup()

void WalSndResourceCleanup ( bool  isCommit)

Definition at line 331 of file walsender.c.

References CurrentResourceOwner, DestNone, DestRemote, proc_exit(), RESOURCE_RELEASE_AFTER_LOCKS, RESOURCE_RELEASE_BEFORE_LOCKS, RESOURCE_RELEASE_LOCKS, ResourceOwnerDelete(), ResourceOwnerRelease(), WalSndShutdown(), and whereToSendOutput.

Referenced by perform_base_backup(), and WalSndErrorCleanup().

332 {
 /*
  * Release everything owned by CurrentResourceOwner, then delete it.
  * Release runs in three phases: before-locks, locks, after-locks.
  * Returns immediately when there is no current resource owner.
  * isCommit is forwarded to ResourceOwnerRelease(); the trailing 'true' is
  * that function's isTopLevel argument.
  * Callers: perform_base_backup(), WalSndErrorCleanup().
  */
333  ResourceOwner resowner;
334 
335  if (CurrentResourceOwner == NULL)
336  return;
337 
338  /*
339  * Deleting CurrentResourceOwner is not allowed, so we must save a pointer
340  * in a local variable and clear it first.
341  */
342  resowner = CurrentResourceOwner;
343  CurrentResourceOwner = NULL;
344 
345  /* Now we can release resources and delete it. */
346  ResourceOwnerRelease(resowner,
347  RESOURCE_RELEASE_BEFORE_LOCKS, isCommit, true);
348  ResourceOwnerRelease(resowner,
349  RESOURCE_RELEASE_LOCKS, isCommit, true);
350  ResourceOwnerRelease(resowner,
351  RESOURCE_RELEASE_AFTER_LOCKS, isCommit, true);
352  ResourceOwnerDelete(resowner);
353 }
ResourceOwner CurrentResourceOwner
Definition: resowner.c:146
void ResourceOwnerDelete(ResourceOwner owner)
Definition: resowner.c:737
void ResourceOwnerRelease(ResourceOwner owner, ResourceReleasePhase phase, bool isCommit, bool isTopLevel)
Definition: resowner.c:486

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 3000 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

3001 {
 /*
  * Set needreload = true on every in-use walsender slot (pid != 0).
  * Free slots are skipped. Each slot's spinlock is held while its pid is
  * checked and the flag is stored.
  * This function only sets the flag; it does not set any latch or otherwise
  * wake the walsenders. The code that acts on needreload is elsewhere
  * (TODO confirm where). Caller: KeepFileRestoredFromArchive().
  */
3002  int i;
3003 
3004  for (i = 0; i < max_wal_senders; i++)
3005  {
3006  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3007 
3008  SpinLockAcquire(&walsnd->mutex);
3009  if (walsnd->pid == 0)
3010  {
3011  SpinLockRelease(&walsnd->mutex);
3012  continue;
3013  }
3014  walsnd->needreload = true;
3015  SpinLockRelease(&walsnd->mutex);
3016  }
3017 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
bool needreload
int i

◆ WalSndSegmentOpen()

static void WalSndSegmentOpen ( XLogReaderState state,
XLogSegNo  nextSegNo,
TimeLineID tli_p 
)
static

Definition at line 2471 of file walsender.c.

References BasicOpenFile(), ereport, errcode_for_file_access(), errmsg(), ERROR, MAXFNAMELEN, MAXPGPATH, PG_BINARY, XLogReaderState::seg, XLogReaderState::segcxt, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, wal_segment_size, WALOpenSegment::ws_file, WALSegmentContext::ws_segsize, XLByteToSeg, XLogFileName, and XLogFilePath.

Referenced by CreateReplicationSlot(), StartLogicalReplication(), and StartReplication().

2473 {
2474  char path[MAXPGPATH];
2475 
2476  /*-------
2477  * When reading from a historic timeline, and there is a timeline switch
2478  * within this segment, read from the WAL segment belonging to the new
2479  * timeline.
2480  *
2481  * For example, imagine that this server is currently on timeline 5, and
2482  * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
2483  * 0/13002088. In pg_wal, we have these files:
2484  *
2485  * ...
2486  * 000000040000000000000012
2487  * 000000040000000000000013
2488  * 000000050000000000000013
2489  * 000000050000000000000014
2490  * ...
2491  *
2492  * In this situation, when requested to send the WAL from segment 0x13, on
2493  * timeline 4, we read the WAL from file 000000050000000000000013. Archive
2494  * recovery prefers files from newer timelines, so if the segment was
2495  * restored from the archive on this server, the file belonging to the old
2496  * timeline, 000000040000000000000013, might not exist. Their contents are
2497  * equal up to the switchpoint, because at a timeline switch, the used
2498  * portion of the old segment is copied to the new file. -------
2499  */
2500  *tli_p = sendTimeLine;
2502  {
2503  XLogSegNo endSegNo;
2504 
2506  if (nextSegNo == endSegNo)
2507  *tli_p = sendTimeLineNextTLI;
2508  }
2509 
2510  XLogFilePath(path, *tli_p, nextSegNo, state->segcxt.ws_segsize);
2511  state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
2512  if (state->seg.ws_file >= 0)
2513  return;
2514 
2515  /*
2516  * If the file is not found, assume it's because the standby asked for a
2517  * too old WAL segment that has already been removed or recycled.
2518  */
2519  if (errno == ENOENT)
2520  {
2521  char xlogfname[MAXFNAMELEN];
2522  int save_errno = errno;
2523 
2524  XLogFileName(xlogfname, *tli_p, nextSegNo, wal_segment_size);
2525  errno = save_errno;
2526  ereport(ERROR,
2528  errmsg("requested WAL segment %s has already been removed",
2529  xlogfname)));
2530  }
2531  else
2532  ereport(ERROR,
2534  errmsg("could not open file \"%s\": %m",
2535  path)));
2536 }
int wal_segment_size
Definition: xlog.c:119
#define PG_BINARY
Definition: c.h:1271
WALOpenSegment seg
Definition: xlogreader.h:215
#define ERROR
Definition: elog.h:46
#define MAXPGPATH
uint64 XLogSegNo
Definition: xlogdefs.h:48
int errcode_for_file_access(void)
Definition: elog.c:721
#define MAXFNAMELEN
#define ereport(elevel,...)
Definition: elog.h:157
static TimeLineID sendTimeLine
Definition: walsender.c:145
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes)
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:146
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:148
int BasicOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1033
#define XLogFilePath(path, tli, logSegNo, wal_segsz_bytes)
int errmsg(const char *fmt,...)
Definition: elog.c:909
WALSegmentContext segcxt
Definition: xlogreader.h:214
static bool sendTimeLineIsHistoric
Definition: walsender.c:147
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

◆ WalSndSetState()

void WalSndSetState ( WalSndState  state)

Definition at line 3229 of file walsender.c.

References am_walsender, Assert, WalSnd::mutex, MyWalSnd, SpinLockAcquire, SpinLockRelease, and WalSnd::state.

Referenced by exec_replication_command(), SendBaseBackup(), StartLogicalReplication(), StartReplication(), WalSndErrorCleanup(), WalSndLoop(), and XLogSendPhysical().

3230 {
3231  WalSnd *walsnd = MyWalSnd;
3232 
3234 
3235  if (walsnd->state == state)
3236  return;
3237 
3238  SpinLockAcquire(&walsnd->mutex);
3239  walsnd->state = state;
3240  SpinLockRelease(&walsnd->mutex);
3241 }
slock_t mutex
bool am_walsender
Definition: walsender.c:115
#define SpinLockAcquire(lock)
Definition: spin.h:62
WalSndState state
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:112
#define Assert(condition)
Definition: c.h:804
Definition: regguts.h:317

◆ WalSndShmemInit()

void WalSndShmemInit ( void  )

Definition at line 3088 of file walsender.c.

References i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SHMQueueInit(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

3089 {
3090  bool found;
3091  int i;
3092 
3093  WalSndCtl = (WalSndCtlData *)
3094  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3095 
3096  if (!found)
3097  {
3098  /* First time through, so initialize */
3100 
3101  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3103 
3104  for (i = 0; i < max_wal_senders; i++)
3105  {
3106  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3107 
3108  SpinLockInit(&walsnd->mutex);
3109  }
3110  }
3111 }
Size WalSndShmemSize(void)
Definition: walsender.c:3076
#define SpinLockInit(lock)
Definition: spin.h:60
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
#define MemSet(start, val, len)
Definition: c.h:1008
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:396
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
int max_wal_senders
Definition: walsender.c:121
void SHMQueueInit(SHM_QUEUE *queue)
Definition: shmqueue.c:36
int i
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28

◆ WalSndShmemSize()

Size WalSndShmemSize ( void  )

Definition at line 3076 of file walsender.c.

References add_size(), max_wal_senders, mul_size(), and offsetof.

Referenced by CreateSharedMemoryAndSemaphores(), and WalSndShmemInit().

3077 {
 /*
  * Number of shared-memory bytes needed for WalSndCtl. This is the fixed
  * part of WalSndCtlData up to its flexible walsnds[] array, plus one WalSnd
  * entry per max_wal_senders.
  * The terms are combined with add_size()/mul_size() instead of plain + and
  * *, presumably so that overflow is detected (confirm in shmem.c).
  * The initial "= 0" is overwritten on the next statement.
  * Callers: CreateSharedMemoryAndSemaphores(), WalSndShmemInit().
  */
3078  Size size = 0;
3079 
3080  size = offsetof(WalSndCtlData, walsnds);
3081  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3082 
3083  return size;
3084 }
int max_wal_senders
Definition: walsender.c:121
Size mul_size(Size s1, Size s2)
Definition: shmem.c:519
Size add_size(Size s1, Size s2)
Definition: shmem.c:502
size_t Size
Definition: c.h:540
#define offsetof(type, field)
Definition: c.h:727

◆ WalSndShutdown()

static void WalSndShutdown ( void  )
static

Definition at line 229 of file walsender.c.

Referenced by WalSndCheckTimeOut(), WalSndKeepaliveIfNecessary(), WalSndLoop(), WalSndResourceCleanup(), WalSndWaitForWal(), and WalSndWriteData().

263 {
265 
266  /* Create a per-walsender data structure in shared memory */
268 
269  /*
270  * We don't currently need any ResourceOwner in a walsender process, but
271  * if we did, we could call CreateAuxProcessResourceOwner here.
272  */
273 
274  /*
275  * Let postmaster know that we're a WAL sender. Once we've declared us as
276  * a WAL sender process, postmaster will let us outlive the bgwriter and
277  * kill us last in the shutdown sequence, so we get a chance to stream all
278  * remaining WAL at shutdown, including the shutdown checkpoint. Note that
279  * there's no going back, and we mustn't write any WAL records after this.
280  */
283 
284  /* Initialize empty timestamp buffer for lag tracking. */
286 }
static void InitWalSenderSlot(void)
Definition: walsender.c:2393
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:307
bool RecoveryInProgress(void)
Definition: xlog.c:8217
static LagTracker * lag_tracker
Definition: walsender.c:219
MemoryContext TopMemoryContext
Definition: mcxt.c:48
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:906
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:153
bool am_cascading_walsender
Definition: walsender.c:116

◆ WalSndSignals()

void WalSndSignals ( void  )

Definition at line 3057 of file walsender.c.

References die, InitializeTimeouts(), pqsignal(), procsignal_sigusr1_handler(), SIG_DFL, SIG_IGN, SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGUSR1, SIGUSR2, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

3058 {
3059  /* Set up signal handlers */
3061  pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3062  pqsignal(SIGTERM, die); /* request shutdown */
3063  /* SIGQUIT handler was already set up by InitPostmasterChild */
3064  InitializeTimeouts(); /* establishes SIGALRM handler */
3067  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3068  * shutdown */
3069 
3070  /* Reset some signals that are accepted by postmaster but not here */
3072 }
void InitializeTimeouts(void)
Definition: timeout.c:435
#define SIGUSR1
Definition: win32_port.h:171
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:3045
#define SIGCHLD
Definition: win32_port.h:169
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:56
#define SIGPIPE
Definition: win32_port.h:164
#define SIGUSR2
Definition: win32_port.h:172
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:2949
#define SIGHUP
Definition: win32_port.h:159
#define SIG_IGN
Definition: win32_port.h:156
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
#define SIG_DFL
Definition: win32_port.h:154
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:642
#define die(msg)
Definition: pg_test_fsync.c:97

◆ WalSndUpdateProgress()

static void WalSndUpdateProgress ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid 
)
static

Definition at line 1349 of file walsender.c.

References GetCurrentTimestamp(), LagTrackerWrite(), now(), TimestampDifferenceExceeds(), and WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS.

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

1350 {
1351  static TimestampTz sendTime = 0;
1353 
1354  /*
1355  * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1356  * avoid flooding the lag tracker when we commit frequently.
1357  */
1358 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1359  if (!TimestampDifferenceExceeds(sendTime, now,
1361  return;
1362 
1363  LagTrackerWrite(lsn, now);
1364  sendTime = now;
1365 }
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
int64 TimestampTz
Definition: timestamp.h:39
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1711
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition: walsender.c:3540
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1544

◆ WalSndWait()

static void WalSndWait ( uint32  socket_events,
long  timeout,
uint32  wait_event 
)
static

Definition at line 3148 of file walsender.c.

References WaitEvent::events, FeBeWaitSet, FeBeWaitSetSocketPos, ModifyWaitEvent(), proc_exit(), WaitEventSetWait(), and WL_POSTMASTER_DEATH.

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

3149 {
 /*
  * Wait on FeBeWaitSet. First, its client-socket entry (FeBeWaitSetSocketPos)
  * is set to socket_events. timeout and wait_event are passed straight to
  * WaitEventSetWait(), which collects at most one event.
  * If the collected event includes WL_POSTMASTER_DEATH, the process exits
  * via proc_exit(1). Any other wakeup, or a timeout, just returns to the
  * caller.
  * Callers: WalSndLoop(), WalSndWaitForWal(), WalSndWriteData().
  */
3150  WaitEvent event;
3151 
3152  ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL);
3153  if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
3154  (event.events & WL_POSTMASTER_DEATH))
3155  proc_exit(1);
3156 }
#define FeBeWaitSetSocketPos
Definition: libpq.h:63
void proc_exit(int code)
Definition: ipc.c:104
void ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
Definition: latch.c:948
WaitEventSet * FeBeWaitSet
Definition: pqcomm.c:168
uint32 events
Definition: latch.h:145
#define WL_POSTMASTER_DEATH
Definition: latch.h:129
int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents, uint32 wait_event_info)
Definition: latch.c:1306

◆ WalSndWaitForWal()

static XLogRecPtr WalSndWaitForWal ( XLogRecPtr  loc)
static

Definition at line 1375 of file walsender.c.

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, WalSnd::flush, GetCurrentTimestamp(), GetFlushRecPtr(), GetXLogReplayRecPtr(), got_STOPPING, InvalidXLogRecPtr, MyLatch, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), RecoveryInProgress(), ResetLatch(), sentPtr, SetLatch(), streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_WAIT_WAL, waiting_for_ping_response, WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepalive(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WalSndWait(), WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, WalSnd::write, and XLogBackgroundFlush().

Referenced by logical_read_xlog_page().

1376 {
1377  int wakeEvents;
1378  static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1379 
1380  /*
1381  * Fast path to avoid acquiring the spinlock in case we already know we
1382  * have enough WAL available. This is particularly interesting if we're
1383  * far behind.
1384  */
1385  if (RecentFlushPtr != InvalidXLogRecPtr &&
1386  loc <= RecentFlushPtr)
1387  return RecentFlushPtr;
1388 
1389  /* Get a more recent flush pointer. */
1390  if (!RecoveryInProgress())
1391  RecentFlushPtr = GetFlushRecPtr();
1392  else
1393  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1394 
1395  for (;;)
1396  {
1397  long sleeptime;
1398 
1399  /* Clear any already-pending wakeups */
1401 
1403 
1404  /* Process any requests or signals received recently */
1405  if (ConfigReloadPending)
1406  {
1407  ConfigReloadPending = false;
1410  }
1411 
1412  /* Check for input from the client */
1414 
1415  /*
1416  * If we're shutting down, trigger pending WAL to be written out,
1417  * otherwise we'd possibly end up waiting for WAL that never gets
1418  * written, because walwriter has shut down already.
1419  */
1420  if (got_STOPPING)
1422 
1423  /* Update our idea of the currently flushed position. */
1424  if (!RecoveryInProgress())
1425  RecentFlushPtr = GetFlushRecPtr();
1426  else
1427  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1428 
1429  /*
1430  * If postmaster asked us to stop, don't wait anymore.
1431  *
1432  * It's important to do this check after the recomputation of
1433  * RecentFlushPtr, so we can send all remaining data before shutting
1434  * down.
1435  */
1436  if (got_STOPPING)
1437  break;
1438 
1439  /*
1440  * We only send regular messages to the client for full decoded
1441  * transactions, but a synchronous replication and walsender shutdown
1442  * possibly are waiting for a later location. So, before sleeping, we
1443  * send a ping containing the flush location. If the receiver is
1444  * otherwise idle, this keepalive will trigger a reply. Processing the
1445  * reply will update these MyWalSnd locations.
1446  */
1447  if (MyWalSnd->flush < sentPtr &&
1448  MyWalSnd->write < sentPtr &&
1450  WalSndKeepalive(false);
1451 
1452  /* check whether we're done */
1453  if (loc <= RecentFlushPtr)
1454  break;
1455 
1456  /* Waiting for new WAL. Since we need to wait, we're now caught up. */
1457  WalSndCaughtUp = true;
1458 
1459  /*
1460  * Try to flush any pending output to the client.
1461  */
1462  if (pq_flush_if_writable() != 0)
1463  WalSndShutdown();
1464 
1465  /*
1466  * If we have received CopyDone from the client, sent CopyDone
1467  * ourselves, and the output buffer is empty, it's time to exit
1468  * streaming, so fail the current WAL fetch request.
1469  */
1471  !pq_is_send_pending())
1472  break;
1473 
1474  /* die if timeout was reached */
1476 
1477  /* Send keepalive if the time has come */
1479 
1480  /*
1481  * Sleep until something happens or we time out. Also wait for the
1482  * socket becoming writable, if there's still pending output.
1483  * Otherwise we might sit on sendable output data while waiting for
1484  * new WAL to be generated. (But if we have nothing to send, we don't
1485  * want to wake on socket-writable.)
1486  */
1488 
1489  wakeEvents = WL_SOCKET_READABLE;
1490 
1491  if (pq_is_send_pending())
1492  wakeEvents |= WL_SOCKET_WRITEABLE;
1493 
1494  WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_WAL);
1495  }
1496 
1497  /* reactivate latch so WalSndLoop knows to continue */
1498  SetLatch(MyLatch);
1499  return RecentFlushPtr;
1500 }
#define pq_is_send_pending()
Definition: libpq.h:48
XLogRecPtr write
#define WL_SOCKET_WRITEABLE
Definition: latch.h:127
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
Definition: walsender.c:3148
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
#define WL_SOCKET_READABLE
Definition: latch.h:126
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8569
bool RecoveryInProgress(void)
Definition: xlog.c:8217
void SetLatch(Latch *latch)
Definition: latch.c:567
void ResetLatch(Latch *latch)
Definition: latch.c:660
XLogRecPtr flush
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3479
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11742
bool XLogBackgroundFlush(void)
Definition: xlog.c:3049
static bool streamingDoneSending
Definition: walsender.c:179
#define pq_flush_if_writable()
Definition: libpq.h:47
void SyncRepInitConfig(void)
Definition: syncrep.c:411
static bool WalSndCaughtUp
Definition: walsender.c:183
Definition: guc.h:72
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3502
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2240
static void WalSndShutdown(void)
Definition: walsender.c:229
WalSnd * MyWalSnd
Definition: walsender.c:112
static XLogRecPtr sentPtr
Definition: walsender.c:154
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2196
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool streamingDoneReceiving
Definition: walsender.c:180
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:26
struct Latch * MyLatch
Definition: globals.c:57
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:120
static bool waiting_for_ping_response
Definition: walsender.c:171
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1711

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 3191 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, pg_usleep(), WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

3192 {
 /*
  * Block until every in-use walsender slot reports WALSNDSTATE_STOPPING.
  * All max_wal_senders slots are rescanned, with a 10 ms sleep between
  * scans. Free slots (pid == 0) are ignored. A scan stops at the first slot
  * that is not yet stopping, because one is enough to require another pass.
  * There is no timeout: the function returns only once the condition holds.
  * Caller: ShutdownXLOG().
  */
3193  for (;;)
3194  {
3195  int i;
3196  bool all_stopped = true;
3197 
3198  for (i = 0; i < max_wal_senders; i++)
3199  {
3200  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3201 
3202  SpinLockAcquire(&walsnd->mutex);
3203 
3204  if (walsnd->pid == 0)
3205  {
3206  SpinLockRelease(&walsnd->mutex);
3207  continue;
3208  }
3209 
3210  if (walsnd->state != WALSNDSTATE_STOPPING)
3211  {
3212  all_stopped = false;
3213  SpinLockRelease(&walsnd->mutex);
3214  break;
3215  }
3216  SpinLockRelease(&walsnd->mutex);
3217  }
3218 
3219  /* safe to leave if confirmation is done for all WAL senders */
3220  if (all_stopped)
3221  return;
3222 
3223  pg_usleep(10000L); /* wait for 10 msec */
3224  }
3225 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pg_usleep(long microsec)
Definition: signal.c:53
WalSndState state
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
int i

◆ WalSndWakeup()

void WalSndWakeup ( void  )

Definition at line 3120 of file walsender.c.

References i, WalSnd::latch, max_wal_senders, WalSnd::mutex, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive(), StartupXLOG(), and XLogWalRcvFlush().

3121 {
 /*
  * Set the latch of every walsender slot that currently has one
  * (latch != NULL). The latch pointer is copied while the slot's spinlock is
  * held, but SetLatch() is called only after the lock is released.
  * Callers: StartupXLOG(), XLogWalRcvFlush(), KeepFileRestoredFromArchive().
  */
3122  int i;
3123 
3124  for (i = 0; i < max_wal_senders; i++)
3125  {
3126  Latch *latch;
3127  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3128 
3129  /*
3130  * Get latch pointer with spinlock held, for the unlikely case that
3131  * pointer reads aren't atomic (as they're 8 bytes).
3132  */
3133  SpinLockAcquire(&walsnd->mutex);
3134  latch = walsnd->latch;
3135  SpinLockRelease(&walsnd->mutex);
3136 
3137  if (latch != NULL)
3138  SetLatch(latch);
3139  }
3140 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
void SetLatch(Latch *latch)
Definition: latch.c:567
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch * latch
Definition: latch.h:110
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
int i

◆ WalSndWriteData()

static void WalSndWriteData ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1265 of file walsender.c.

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, StringInfoData::data, GetCurrentTimestamp(), last_reply_timestamp, StringInfoData::len, MyLatch, now(), LogicalDecodingContext::out, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, pq_putmessage_noblock, pq_sendint64(), ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), resetStringInfo(), SetLatch(), SyncRepInitConfig(), TimestampTzPlusMilliseconds, WAIT_EVENT_WAL_SENDER_WRITE_DATA, wal_sender_timeout, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WalSndWait(), WL_SOCKET_READABLE, and WL_SOCKET_WRITEABLE.

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

1267 {
1268  TimestampTz now;
1269 
1270  /*
1271  * Fill the send timestamp last, so that it is taken as late as possible.
1272  * This is somewhat ugly, but the protocol is set as it's already used for
1273  * several releases by streaming physical replication.
1274  */
1276  now = GetCurrentTimestamp();
1277  pq_sendint64(&tmpbuf, now);
1278  memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1279  tmpbuf.data, sizeof(int64));
1280 
1281  /* output previously gathered data in a CopyData packet */
1282  pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1283 
1285 
1286  /* Try to flush pending output to the client */
1287  if (pq_flush_if_writable() != 0)
1288  WalSndShutdown();
1289 
1290  /* Try taking fast path unless we get too close to walsender timeout. */
1292  wal_sender_timeout / 2) &&
1293  !pq_is_send_pending())
1294  {
1295  return;
1296  }
1297 
1298  /* If we have pending write here, go to slow path */
1299  for (;;)
1300  {
1301  long sleeptime;
1302 
1303  /* Check for input from the client */
1305 
1306  /* die if timeout was reached */
1308 
1309  /* Send keepalive if the time has come */
1311 
1312  if (!pq_is_send_pending())
1313  break;
1314 
1316 
1317  /* Sleep until something happens or we time out */
1320 
1321  /* Clear any already-pending wakeups */
1323 
1325 
1326  /* Process any requests or signals received recently */
1327  if (ConfigReloadPending)
1328  {
1329  ConfigReloadPending = false;
1332  }
1333 
1334  /* Try to flush pending output to the client */
1335  if (pq_flush_if_writable() != 0)
1336  WalSndShutdown();
1337  }
1338 
1339  /* reactivate latch so WalSndLoop knows to continue */
1340  SetLatch(MyLatch);
1341 }
#define pq_is_send_pending()
Definition: libpq.h:48
#define WL_SOCKET_WRITEABLE
Definition: latch.h:127
static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
Definition: walsender.c:3148
int wal_sender_timeout
Definition: walsender.c:123
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
int64 TimestampTz
Definition: timestamp.h:39
#define WL_SOCKET_READABLE
Definition: latch.h:126
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
void SetLatch(Latch *latch)
Definition: latch.c:567
void ResetLatch(Latch *latch)
Definition: latch.c:660
#define pq_flush_if_writable()
Definition: libpq.h:47
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
void SyncRepInitConfig(void)
Definition: syncrep.c:411
Definition: guc.h:72
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3502
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2240
static void WalSndShutdown(void)
Definition: walsender.c:229
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2196
static StringInfoData tmpbuf
Definition: walsender.c:159
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:51
StringInfo out
Definition: logical.h:70
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:26
struct Latch * MyLatch
Definition: globals.c:57
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:120
static TimestampTz last_reply_timestamp
Definition: walsender.c:168
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1711
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1544

◆ XLogSendLogical()

static void XLogSendLogical ( void  )
static

Definition at line 2850 of file walsender.c.

References elog, XLogReaderState::EndRecPtr, ERROR, GetFlushRecPtr(), got_SIGUSR2, got_STOPPING, InvalidXLogRecPtr, LogicalDecodingProcessRecord(), WalSnd::mutex, MyWalSnd, LogicalDecodingContext::reader, WalSnd::sentPtr, sentPtr, SpinLockAcquire, SpinLockRelease, WalSndCaughtUp, and XLogReadRecord().

Referenced by StartLogicalReplication(), and WalSndLoop().

2851 {
2852  XLogRecord *record;
2853  char *errm;
2854 
2855  /*
2856  * We'll use the current flush point to determine whether we've caught up.
2857  * This variable is static in order to cache it across calls. Caching is
2858  * helpful because GetFlushRecPtr() needs to acquire a heavily-contended
2859  * spinlock.
2860  */
2861  static XLogRecPtr flushPtr = InvalidXLogRecPtr;
2862 
2863  /*
2864  * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
2865  * true in WalSndWaitForWal, if we're actually waiting. We also set to
2866  * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
2867  * didn't wait - i.e. when we're shutting down.
2868  */
2869  WalSndCaughtUp = false;
2870 
2871  record = XLogReadRecord(logical_decoding_ctx->reader, &errm);
2872 
2873  /* xlog record was invalid */
2874  if (errm != NULL)
2875  elog(ERROR, "%s", errm);
2876 
2877  if (record != NULL)
2878  {
2879  /*
2880  * Note the lack of any call to LagTrackerWrite() which is handled by
2881  * WalSndUpdateProgress which is called by output plugin through
2882  * logical decoding write api.
2883  */
2885 
2887  }
2888 
2889  /*
2890  * If first time through in this session, initialize flushPtr. Otherwise,
2891  * we only need to update flushPtr if EndRecPtr is past it.
2892  */
2893  if (flushPtr == InvalidXLogRecPtr)
2894  flushPtr = GetFlushRecPtr();
2895  else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
2896  flushPtr = GetFlushRecPtr();
2897 
2898  /* If EndRecPtr is still past our flushPtr, it means we caught up. */
2899  if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
2900  WalSndCaughtUp = true;
2901 
2902  /*
2903  * If we're caught up and have been requested to stop, have WalSndLoop()
2904  * terminate the connection in an orderly manner, after writing out all
2905  * the pending data.
2906  */
2908  got_SIGUSR2 = true;
2909 
2910  /* Update shared memory status */
2911  {
2912  WalSnd *walsnd = MyWalSnd;
2913 
2914  SpinLockAcquire(&walsnd->mutex);
2915  walsnd->sentPtr = sentPtr;
2916  SpinLockRelease(&walsnd->mutex);
2917  }
2918 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8569
slock_t mutex
XLogRecPtr EndRecPtr
Definition: xlogreader.h:176
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
Definition: xlogreader.c:271
#define ERROR
Definition: elog.h:46
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:106
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:197