PostgreSQL Source Code  git master
walsender.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/printtup.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "access/xlogreader.h"
#include "access/xlogutils.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
#include "commands/defrem.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/basebackup.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/dest.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/portal.h"
#include "utils/ps_status.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
Include dependency graph for walsender.c:

Go to the source code of this file.

Data Structures

struct  WalTimeSample
 
struct  LagTracker
 

Macros

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)
 
#define LAG_TRACKER_BUFFER_SIZE   8192
 
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000
 
#define PG_STAT_GET_WAL_SENDERS_COLS   15
 

Typedefs

typedef void(* WalSndSendDataCallback) (void)
 

Functions

static void WalSndLastCycleHandler (SIGNAL_ARGS)
 
static void WalSndLoop (WalSndSendDataCallback send_data)
 
static void InitWalSenderSlot (void)
 
static void WalSndKill (int code, Datum arg)
 
static void WalSndShutdown (void)
 
void WalSndErrorCleanup (void)
 
void WalSndResourceCleanup (bool isCommit)
 
static void IdentifySystem (void)
 
static void SendTimeLineHistory (TimeLineHistoryCmd *cmd)
 
static void StartReplication (StartReplicationCmd *cmd)
 
static int logical_read_xlog_page (XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
 
static void parseCreateReplSlotOptions (CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action)
 
static void CreateReplicationSlot (CreateReplicationSlotCmd *cmd)
 
static void DropReplicationSlot (DropReplicationSlotCmd *cmd)
 
static void StartLogicalReplication (StartReplicationCmd *cmd)
 
static void WalSndPrepareWrite (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndWriteData (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndUpdateProgress (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
 
static XLogRecPtr WalSndWaitForWal (XLogRecPtr loc)
 
bool exec_replication_command (const char *cmd_string)
 
static void ProcessRepliesIfAny (void)
 
static void ProcessStandbyMessage (void)
 
static void PhysicalConfirmReceivedLocation (XLogRecPtr lsn)
 
static void ProcessStandbyReplyMessage (void)
 
static void PhysicalReplicationSlotNewXmin (TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
 
static bool TransactionIdInRecentPast (TransactionId xid, uint32 epoch)
 
static void ProcessStandbyHSFeedbackMessage (void)
 
static long WalSndComputeSleeptime (TimestampTz now)
 
static void WalSndCheckTimeOut (void)
 
static void WalSndSegmentOpen (XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
 
static void XLogSendPhysical (void)
 
static void XLogSendLogical (void)
 
static void WalSndDone (WalSndSendDataCallback send_data)
 
static XLogRecPtr GetStandbyFlushRecPtr (void)
 
void WalSndRqstFileReload (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (void)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void WalSndSetState (WalSndState state)
 
static const char * WalSndGetStateString (WalSndState state)
 
static Interval * offset_to_interval (TimeOffset offset)
 
Datum pg_stat_get_wal_senders (PG_FUNCTION_ARGS)
 
static void WalSndKeepalive (bool requestReply)
 
static void WalSndKeepaliveIfNecessary (void)
 
static void LagTrackerWrite (XLogRecPtr lsn, TimestampTz local_flush_time)
 
static TimeOffset LagTrackerRead (int head, XLogRecPtr lsn, TimestampTz now)
 
static void UpdateSpillStats (LogicalDecodingContext *ctx)
 

Variables

WalSndCtlData * WalSndCtl = NULL
 
WalSnd * MyWalSnd = NULL
 
bool am_walsender = false
 
bool am_cascading_walsender = false
 
bool am_db_walsender = false
 
int max_wal_senders = 0
 
int wal_sender_timeout = 60 * 1000
 
bool log_replication_commands = false
 
bool wake_wal_senders = false
 
static XLogReaderState * xlogreader = NULL
 
static TimeLineID sendTimeLine = 0
 
static TimeLineID sendTimeLineNextTLI = 0
 
static bool sendTimeLineIsHistoric = false
 
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
 
static XLogRecPtr sentPtr = 0
 
static StringInfoData output_message
 
static StringInfoData reply_message
 
static StringInfoData tmpbuf
 
static TimestampTz last_processing = 0
 
static TimestampTz last_reply_timestamp = 0
 
static bool waiting_for_ping_response = false
 
static bool streamingDoneSending
 
static bool streamingDoneReceiving
 
static bool WalSndCaughtUp = false
 
static volatile sig_atomic_t got_SIGUSR2 = false
 
static volatile sig_atomic_t got_STOPPING = false
 
static volatile sig_atomic_t replication_active = false
 
static LogicalDecodingContext * logical_decoding_ctx = NULL
 
static LagTracker * lag_tracker
 

Macro Definition Documentation

◆ LAG_TRACKER_BUFFER_SIZE

#define LAG_TRACKER_BUFFER_SIZE   8192

Definition at line 207 of file walsender.c.

Referenced by LagTrackerRead(), and LagTrackerWrite().

◆ MAX_SEND_SIZE

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)

Definition at line 106 of file walsender.c.

Referenced by XLogSendPhysical().

◆ PG_STAT_GET_WAL_SENDERS_COLS

#define PG_STAT_GET_WAL_SENDERS_COLS   15

Referenced by pg_stat_get_wal_senders().

◆ WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS

#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000

Referenced by WalSndUpdateProgress().

Typedef Documentation

◆ WalSndSendDataCallback

typedef void(* WalSndSendDataCallback) (void)

Definition at line 225 of file walsender.c.

Function Documentation

◆ CreateReplicationSlot()

static void CreateReplicationSlot ( CreateReplicationSlotCmd * cmd)
static

Definition at line 922 of file walsender.c.

References Assert, begin_tup_output_tupdesc(), CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDestReceiver(), CreateInitDecodingContext(), CreateTemplateTupleDesc(), CRS_EXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, CStringGetTextDatum, ReplicationSlot::data, DecodingContextFindStartpoint(), generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errmsg(), ERROR, FirstSnapshotSet, FreeDecodingContext(), InvalidXLogRecPtr, IsSubTransaction(), IsTransactionBlock(), CreateReplicationSlotCmd::kind, last_reply_timestamp, logical_read_xlog_page(), MAXFNAMELEN, MemSet, MyProc, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, NIL, parseCreateReplSlotOptions(), CreateReplicationSlotCmd::plugin, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, ReplicationSlotCreate(), ReplicationSlotMarkDirty(), ReplicationSlotPersist(), ReplicationSlotRelease(), ReplicationSlotReserveWal(), ReplicationSlotSave(), RestoreTransactionSnapshot(), RS_EPHEMERAL, RS_PERSISTENT, RS_TEMPORARY, sendTimeLine, sendTimeLineIsHistoric, CreateReplicationSlotCmd::slotname, SnapBuildExportSnapshot(), SnapBuildInitialSnapshot(), LogicalDecodingContext::snapshot_builder, snprintf, CreateReplicationSlotCmd::temporary, ThisTimeLineID, TTSOpsVirtual, TupleDescInitBuiltinEntry(), values, wal_segment_close(), WalSndPrepareWrite(), WalSndSegmentOpen(), WalSndUpdateProgress(), WalSndWriteData(), XACT_REPEATABLE_READ, XactIsoLevel, and XL_ROUTINE.

Referenced by exec_replication_command(), main(), and StartLogStreamer().

923 {
924  const char *snapshot_name = NULL;
925  char xloc[MAXFNAMELEN];
926  char *slot_name;
927  bool reserve_wal = false;
928  CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
930  TupOutputState *tstate;
931  TupleDesc tupdesc;
932  Datum values[4];
933  bool nulls[4];
934 
936 
937  parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action);
938 
939  /* setup state for WalSndSegmentOpen */
940  sendTimeLineIsHistoric = false;
942 
943  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
944  {
945  ReplicationSlotCreate(cmd->slotname, false,
947  }
948  else
949  {
951 
952  /*
953  * Initially create persistent slot as ephemeral - that allows us to
954  * nicely handle errors during initialization because it'll get
955  * dropped if this transaction fails. We'll make it persistent at the
956  * end. Temporary slots can be created as temporary from beginning as
957  * they get dropped on error as well.
958  */
959  ReplicationSlotCreate(cmd->slotname, true,
961  }
962 
963  if (cmd->kind == REPLICATION_KIND_LOGICAL)
964  {
966  bool need_full_snapshot = false;
967 
968  /*
969  * Do options check early so that we can bail before calling the
970  * DecodingContextFindStartpoint which can take long time.
971  */
972  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
973  {
974  if (IsTransactionBlock())
975  ereport(ERROR,
976  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
977  (errmsg("%s must not be called inside a transaction",
978  "CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT")));
979 
980  need_full_snapshot = true;
981  }
982  else if (snapshot_action == CRS_USE_SNAPSHOT)
983  {
984  if (!IsTransactionBlock())
985  ereport(ERROR,
986  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
987  (errmsg("%s must be called inside a transaction",
988  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
989 
991  ereport(ERROR,
992  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
993  (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
994  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
995 
996  if (FirstSnapshotSet)
997  ereport(ERROR,
998  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
999  (errmsg("%s must be called before any query",
1000  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
1001 
1002  if (IsSubTransaction())
1003  ereport(ERROR,
1004  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1005  (errmsg("%s must not be called in a subtransaction",
1006  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
1007 
1008  need_full_snapshot = true;
1009  }
1010 
1011  ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
1013  XL_ROUTINE(.page_read = logical_read_xlog_page,
1014  .segment_open = WalSndSegmentOpen,
1015  .segment_close = wal_segment_close),
1018 
1019  /*
1020  * Signal that we don't need the timeout mechanism. We're just
1021  * creating the replication slot and don't yet accept feedback
1022  * messages or send keepalives. As we possibly need to wait for
1023  * further WAL the walsender would otherwise possibly be killed too
1024  * soon.
1025  */
1027 
1028  /* build initial snapshot, might take a while */
1030 
1031  /*
1032  * Export or use the snapshot if we've been asked to do so.
1033  *
1034  * NB. We will convert the snapbuild.c kind of snapshot to normal
1035  * snapshot when doing this.
1036  */
1037  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1038  {
1039  snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
1040  }
1041  else if (snapshot_action == CRS_USE_SNAPSHOT)
1042  {
1043  Snapshot snap;
1044 
1047  }
1048 
1049  /* don't need the decoding context anymore */
1050  FreeDecodingContext(ctx);
1051 
1052  if (!cmd->temporary)
1054  }
1055  else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
1056  {
1058 
1060 
1061  /* Write this slot to disk if it's a permanent one. */
1062  if (!cmd->temporary)
1064  }
1065 
1066  snprintf(xloc, sizeof(xloc), "%X/%X",
1069 
1071  MemSet(nulls, false, sizeof(nulls));
1072 
1073  /*----------
1074  * Need a tuple descriptor representing four columns:
1075  * - first field: the slot name
1076  * - second field: LSN at which we became consistent
1077  * - third field: exported snapshot's name
1078  * - fourth field: output plugin
1079  *----------
1080  */
1081  tupdesc = CreateTemplateTupleDesc(4);
1082  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
1083  TEXTOID, -1, 0);
1084  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
1085  TEXTOID, -1, 0);
1086  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
1087  TEXTOID, -1, 0);
1088  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1089  TEXTOID, -1, 0);
1090 
1091  /* prepare for projection of tuples */
1092  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
1093 
1094  /* slot_name */
1095  slot_name = NameStr(MyReplicationSlot->data.name);
1096  values[0] = CStringGetTextDatum(slot_name);
1097 
1098  /* consistent wal location */
1099  values[1] = CStringGetTextDatum(xloc);
1100 
1101  /* snapshot name, or NULL if none */
1102  if (snapshot_name != NULL)
1103  values[2] = CStringGetTextDatum(snapshot_name);
1104  else
1105  nulls[2] = true;
1106 
1107  /* plugin, or NULL if none */
1108  if (cmd->plugin != NULL)
1109  values[3] = CStringGetTextDatum(cmd->plugin);
1110  else
1111  nulls[3] = true;
1112 
1113  /* send it to dest */
1114  do_tup_output(tstate, values, nulls);
1115  end_tup_output(tstate);
1116 
1118 }
#define NIL
Definition: pg_list.h:65
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:813
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:44
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action)
Definition: walsender.c:869
PGPROC * MyProc
Definition: proc.c:67
#define XACT_REPEATABLE_READ
Definition: xact.h:38
void RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
Definition: snapmgr.c:2225
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2256
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency)
Definition: slot.c:222
#define MemSet(start, val, len)
Definition: c.h:971
void ReplicationSlotSave(void)
Definition: slot.c:697
ReplicationSlotPersistentData data
Definition: slot.h:143
XLogRecPtr confirmed_flush
Definition: slot.h:91
ReplicationKind kind
Definition: replnodes.h:56
bool IsTransactionBlock(void)
Definition: xact.c:4656
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:631
void ReplicationSlotReserveWal(void)
Definition: slot.c:1056
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:468
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2314
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:676
#define ERROR
Definition: elog.h:43
bool FirstSnapshotSet
Definition: snapmgr.c:205
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2236
void ReplicationSlotPersist(void)
Definition: slot.c:732
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1265
unsigned int uint32
Definition: c.h:367
void ReplicationSlotRelease(void)
Definition: slot.c:476
LogicalDecodingContext * CreateInitDecodingContext(char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:230
#define XL_ROUTINE(...)
Definition: xlogreader.h:116
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1238
#define MAXFNAMELEN
uintptr_t Datum
Definition: postgres.h:367
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
Definition: walsender.c:2458
TimeLineID ThisTimeLineID
Definition: xlog.c:191
#define ereport(elevel,...)
Definition: elog.h:144
struct SnapBuild * snapshot_builder
Definition: logical.h:43
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
static TimeLineID sendTimeLine
Definition: walsender.c:145
#define Assert(condition)
Definition: c.h:738
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:511
int XactIsoLevel
Definition: xact.c:75
bool IsSubTransaction(void)
Definition: xact.c:4729
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition: snapbuild.c:539
static Datum values[MAXATTR]
Definition: bootstrap.c:167
int errmsg(const char *fmt,...)
Definition: elog.c:824
CRSSnapshotAction
Definition: walsender.h:20
#define NameStr(name)
Definition: c.h:615
#define CStringGetTextDatum(s)
Definition: builtins.h:87
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition: walsender.c:815
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:75
#define snprintf
Definition: port.h:193
int16 AttrNumber
Definition: attnum.h:21
static TimestampTz last_reply_timestamp
Definition: walsender.c:168
static bool sendTimeLineIsHistoric
Definition: walsender.c:147
void ReplicationSlotMarkDirty(void)
Definition: slot.c:715
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
Definition: walsender.c:1355

◆ DropReplicationSlot()

static void DropReplicationSlot ( DropReplicationSlotCmd * cmd)
static

Definition at line 1124 of file walsender.c.

References DestRemote, EndCommand(), ReplicationSlotDrop(), SetQueryCompletion(), DropReplicationSlotCmd::slotname, and DropReplicationSlotCmd::wait.

Referenced by exec_replication_command(), and main().

1125 {
1126  QueryCompletion qc;
1127 
1128  ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1129  SetQueryCompletion(&qc, CMDTAG_DROP_REPLICATION_SLOT, 0);
1130  EndCommand(&qc, DestRemote, false);
1131 }
void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
Definition: dest.c:166
static void SetQueryCompletion(QueryCompletion *qc, CommandTag commandTag, uint64 nprocessed)
Definition: cmdtag.h:36
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:569

◆ exec_replication_command()

bool exec_replication_command ( const char *  cmd_string)

Definition at line 1526 of file walsender.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, generate_unaccent_rules::dest, DestRemote, DestRemoteSimple, DropReplicationSlot(), elog, EndCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsA, IsAbortedTransactionBlockState(), IsTransactionBlock(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextDelete(), MemoryContextSwitchTo(), MyDatabaseId, VariableShowStmt::name, pgstat_report_activity(), PreventInTransactionBlock(), REPLICATION_KIND_PHYSICAL, replication_parse_result, replication_scanner_init(), replication_yyparse(), SendBaseBackup(), SendTimeLineHistory(), SetQueryCompletion(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), StartTransactionCommand(), WalSnd::state, STATE_IDLE, STATE_RUNNING, T_BaseBackupCmd, T_CreateReplicationSlotCmd, T_DropReplicationSlotCmd, T_IdentifySystemCmd, T_SQLCmd, T_StartReplicationCmd, T_TimeLineHistoryCmd, T_VariableShowStmt, TimeLineHistoryCmd::type, Node::type, WalSndSetState(), and WALSNDSTATE_STOPPING.

Referenced by PostgresMain().

1527 {
1528  int parse_rc;
1529  Node *cmd_node;
1530  MemoryContext cmd_context;
1531  MemoryContext old_context;
1532  QueryCompletion qc;
1533 
1534  /*
1535  * If WAL sender has been told that shutdown is getting close, switch its
1536  * status accordingly to handle the next replication commands correctly.
1537  */
1538  if (got_STOPPING)
1540 
1541  /*
1542  * Throw error if in stopping mode. We need to prevent commands that could
1543  * generate WAL while the shutdown checkpoint is being written. To be
1544  * safe, we just prohibit all new commands.
1545  */
1547  ereport(ERROR,
1548  (errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1549 
1550  /*
1551  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1552  * command arrives. Clean up the old stuff if there's anything.
1553  */
1555 
1557 
1559  "Replication command context",
1561  old_context = MemoryContextSwitchTo(cmd_context);
1562 
1563  replication_scanner_init(cmd_string);
1564  parse_rc = replication_yyparse();
1565  if (parse_rc != 0)
1566  ereport(ERROR,
1567  (errcode(ERRCODE_SYNTAX_ERROR),
1568  errmsg_internal("replication command parser returned %d",
1569  parse_rc)));
1570 
1571  cmd_node = replication_parse_result;
1572 
1573  /*
1574  * Log replication command if log_replication_commands is enabled. Even
1575  * when it's disabled, log the command with DEBUG1 level for backward
1576  * compatibility. Note that SQL commands are not logged here, and will be
1577  * logged later if log_statement is enabled.
1578  */
1579  if (cmd_node->type != T_SQLCmd)
1581  (errmsg("received replication command: %s", cmd_string)));
1582 
1583  /*
1584  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was
1585  * called outside of transaction the snapshot should be cleared here.
1586  */
1587  if (!IsTransactionBlock())
1589 
1590  /*
1591  * For aborted transactions, don't allow anything except pure SQL, the
1592  * exec_simple_query() will handle it correctly.
1593  */
1594  if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
1595  ereport(ERROR,
1596  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1597  errmsg("current transaction is aborted, "
1598  "commands ignored until end of transaction block")));
1599 
1601 
1602  /*
1603  * Allocate buffers that will be used for each outgoing and incoming
1604  * message. We do this just once per command to reduce palloc overhead.
1605  */
1609 
1610  /* Report to pgstat that this process is running */
1612 
1613  switch (cmd_node->type)
1614  {
1615  case T_IdentifySystemCmd:
1616  IdentifySystem();
1617  break;
1618 
1619  case T_BaseBackupCmd:
1620  PreventInTransactionBlock(true, "BASE_BACKUP");
1621  SendBaseBackup((BaseBackupCmd *) cmd_node);
1622  break;
1623 
1626  break;
1627 
1630  break;
1631 
1632  case T_StartReplicationCmd:
1633  {
1634  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1635 
1636  PreventInTransactionBlock(true, "START_REPLICATION");
1637 
1638  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1639  StartReplication(cmd);
1640  else
1642 
1643  Assert(xlogreader != NULL);
1644  break;
1645  }
1646 
1647  case T_TimeLineHistoryCmd:
1648  PreventInTransactionBlock(true, "TIMELINE_HISTORY");
1650  break;
1651 
1652  case T_VariableShowStmt:
1653  {
1655  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1656 
1657  /* syscache access needs a transaction environment */
1659  GetPGVariable(n->name, dest);
1661  }
1662  break;
1663 
1664  case T_SQLCmd:
1665  if (MyDatabaseId == InvalidOid)
1666  ereport(ERROR,
1667  (errmsg("cannot execute SQL commands in WAL sender for physical replication")));
1668 
1669  /* Report to pgstat that this process is now idle */
1671 
1672  /* Tell the caller that this wasn't a WalSender command. */
1673  return false;
1674 
1675  default:
1676  elog(ERROR, "unrecognized replication command node tag: %u",
1677  cmd_node->type);
1678  }
1679 
1680  /* done */
1681  MemoryContextSwitchTo(old_context);
1682  MemoryContextDelete(cmd_context);
1683 
1684  /* Send CommandComplete message */
1685  SetQueryCompletion(&qc, CMDTAG_SELECT, 0);
1686  EndCommand(&qc, DestRemote, true);
1687 
1688  /* Report to pgstat that this process is now idle */
1690 
1691  return true;
1692 }
#define IsA(nodeptr, _type_)
Definition: nodes.h:580
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:211
#define AllocSetContextCreate
Definition: memutils.h:170
#define DEBUG1
Definition: elog.h:25
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:3132
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:465
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1124
void CommitTransactionCommand(void)
Definition: xact.c:2919
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:376
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Definition: nodes.h:529
static StringInfoData output_message
Definition: walsender.c:157
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
int errcode(int sqlerrcode)
Definition: elog.c:610
#define LOG
Definition: elog.h:26
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc.c:8936
Node * replication_parse_result
bool IsTransactionBlock(void)
Definition: xact.c:4656
ReplicationKind kind
Definition: replnodes.h:82
#define ERROR
Definition: elog.h:43
void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
Definition: dest.c:166
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:922
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
WalSndState state
NodeTag type
Definition: nodes.h:531
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:571
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3352
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
static XLogReaderState * xlogreader
Definition: walsender.c:137
int replication_yyparse(void)
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
static StringInfoData reply_message
Definition: walsender.c:158
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1138
Oid MyDatabaseId
Definition: globals.c:85
WalSnd * MyWalSnd
Definition: walsender.c:112
#define InvalidOid
Definition: postgres_ext.h:36
#define ereport(elevel,...)
Definition: elog.h:144
int errmsg_internal(const char *fmt,...)
Definition: elog.c:911
#define Assert(condition)
Definition: c.h:738
void WalSndSetState(WalSndState state)
Definition: walsender.c:3203
static void SetQueryCompletion(QueryCompletion *qc, CommandTag commandTag, uint64 nprocessed)
Definition: cmdtag.h:36
void StartTransactionCommand(void)
Definition: xact.c:2818
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:922
static StringInfoData tmpbuf
Definition: walsender.c:159
bool log_replication_commands
Definition: walsender.c:125
int errmsg(const char *fmt,...)
Definition: elog.c:824
#define elog(elevel,...)
Definition: elog.h:214
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
static void IdentifySystem(void)
Definition: walsender.c:376
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:692
void replication_scanner_init(const char *query_string)

◆ GetStandbyFlushRecPtr()

static XLogRecPtr GetStandbyFlushRecPtr ( void  )
static

Definition at line 2960 of file walsender.c.

References GetWalRcvFlushRecPtr(), GetXLogReplayRecPtr(), receiveTLI, and ThisTimeLineID.

Referenced by IdentifySystem(), StartReplication(), and XLogSendPhysical().

2961 {
2962  XLogRecPtr replayPtr;
2963  TimeLineID replayTLI;
2964  XLogRecPtr receivePtr;
2965  TimeLineID receiveTLI;
2966  XLogRecPtr result;
2967 
2968  /*
2969  * We can safely send what's already been replayed. Also, if walreceiver
2970  * is streaming WAL from the same timeline, we can send anything that it
2971  * has streamed, but hasn't been replayed yet.
2972  */
2973 
2974  receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
2975  replayPtr = GetXLogReplayRecPtr(&replayTLI);
2976 
2977  ThisTimeLineID = replayTLI;
2978 
2979  result = replayPtr;
2980  if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
2981  result = receivePtr;
2982 
2983  return result;
2984 }
uint32 TimeLineID
Definition: xlogdefs.h:52
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11467
static TimeLineID receiveTLI
Definition: xlog.c:213
TimeLineID ThisTimeLineID
Definition: xlog.c:191
uint64 XLogRecPtr
Definition: xlogdefs.h:21
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 3013 of file walsender.c.

References am_walsender, Assert, got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

3014 {
3015  Assert(am_walsender);
3016 
3017  /*
3018  * If replication has not yet started, die like with SIGTERM. If
3019  * replication is active, only set a flag and wake up the main loop. It
3020  * will send any outstanding WAL, wait for it to be replicated to the
3021  * standby, and then exit gracefully.
3022  */
3023  if (!replication_active)
3024  kill(MyProcPid, SIGTERM);
3025  else
3026  got_STOPPING = true;
3027 }
int MyProcPid
Definition: globals.c:40
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
#define kill(pid, sig)
Definition: win32_port.h:426
bool am_walsender
Definition: walsender.c:115
static volatile sig_atomic_t replication_active
Definition: walsender.c:195
#define Assert(condition)
Definition: c.h:738

◆ IdentifySystem()

static void IdentifySystem ( void  )
static

Definition at line 376 of file walsender.c.

References am_cascading_walsender, begin_tup_output_tupdesc(), CommitTransactionCommand(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, cur, CurrentMemoryContext, dbname, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), get_database_name(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetSystemIdentifier(), Int32GetDatum, InvalidOid, MAXFNAMELEN, MemoryContextSwitchTo(), MemSet, MyDatabaseId, RecoveryInProgress(), snprintf, StartTransactionCommand(), ThisTimeLineID, TTSOpsVirtual, TupleDescInitBuiltinEntry(), UINT64_FORMAT, and values.

Referenced by exec_replication_command().

377 {
378  char sysid[32];
379  char xloc[MAXFNAMELEN];
380  XLogRecPtr logptr;
381  char *dbname = NULL;
382  DestReceiver *dest;
383  TupOutputState *tstate;
384  TupleDesc tupdesc;
385  Datum values[4];
386  bool nulls[4];
387 
388  /*
389  * Reply with a result set with one row, four columns. First col is system
390  * ID, second is timeline ID, third is current xlog location and the
391  * fourth contains the database name if we are connected to one.
392  */
393 
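394  snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
395  GetSystemIdentifier());
396 
397  am_cascading_walsender = RecoveryInProgress();
398  if (am_cascading_walsender)
399  {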
400  /* this also updates ThisTimeLineID */
401  logptr = GetStandbyFlushRecPtr();
402  }
403  else
404  logptr = GetFlushRecPtr();
405 
406  snprintf(xloc, sizeof(xloc), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
407 
408  if (MyDatabaseId != InvalidOid)
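409  {
410  MemoryContext cur = CurrentMemoryContext;
411 
412  /* syscache access needs a transaction env. */
413  StartTransactionCommand();
414  /* make dbname live outside TX context */
415  MemoryContextSwitchTo(cur);
416  dbname = get_database_name(MyDatabaseId);
417  CommitTransactionCommand();
418  /* CommitTransactionCommand switches to TopMemoryContext */
419  MemoryContextSwitchTo(cur);
420  }
421 
422  dest = CreateDestReceiver(DestRemoteSimple);
423  MemSet(nulls, false, sizeof(nulls));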
424 
425  /* need a tuple descriptor representing four columns */
426  tupdesc = CreateTemplateTupleDesc(4);
427  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
428  TEXTOID, -1, 0);
429  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
430  INT4OID, -1, 0);
431  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
432  TEXTOID, -1, 0);
433  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
434  TEXTOID, -1, 0);
435 
436  /* prepare for projection of tuples */
437  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
438 
439  /* column 1: system identifier */
440  values[0] = CStringGetTextDatum(sysid);
441 
442  /* column 2: timeline */
443  values[1] = Int32GetDatum(ThisTimeLineID);
444 
445  /* column 3: wal location */
446  values[2] = CStringGetTextDatum(xloc);
447 
448  /* column 4: database name, or NULL if none */
449  if (dbname)
450  values[3] = CStringGetTextDatum(dbname);
451  else
452  nulls[3] = true;
453 
454  /* send it to dest */
455  do_tup_output(tstate, values, nulls);
456 
457  end_tup_output(tstate);
458 }
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:44
void CommitTransactionCommand(void)
Definition: xact.c:2919
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2256
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
struct cursor * cur
Definition: ecpg.c:28
#define MemSet(start, val, len)
Definition: c.h:971
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8422
bool RecoveryInProgress(void)
Definition: xlog.c:8071
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2314
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:676
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2236
char * get_database_name(Oid dbid)
Definition: dbcommands.c:2155
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
unsigned int uint32
Definition: c.h:367
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
#define MAXFNAMELEN
uintptr_t Datum
Definition: postgres.h:367
Oid MyDatabaseId
Definition: globals.c:85
#define InvalidOid
Definition: postgres_ext.h:36
TimeLineID ThisTimeLineID
Definition: xlog.c:191
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void StartTransactionCommand(void)
Definition: xact.c:2818
char * dbname
Definition: streamutil.c:50
static Datum values[MAXATTR]
Definition: bootstrap.c:167
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4912
#define Int32GetDatum(X)
Definition: postgres.h:479
#define CStringGetTextDatum(s)
Definition: builtins.h:87
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2960
#define snprintf
Definition: port.h:193
int16 AttrNumber
Definition: attnum.h:21
#define UINT64_FORMAT
Definition: c.h:410
bool am_cascading_walsender
Definition: walsender.c:116

◆ InitWalSenderSlot()

static void InitWalSenderSlot ( void  )
static

Definition at line 2377 of file walsender.c.

References WalSnd::apply, WalSnd::applyLag, Assert, WalSnd::flush, WalSnd::flushLag, i, InvalidXLogRecPtr, WalSnd::latch, max_wal_senders, WalSnd::mutex, MyProc, MyProcPid, WalSnd::needreload, on_shmem_exit(), WalSnd::pid, PGPROC::procLatch, WalSnd::replyTime, WalSnd::sentPtr, WalSnd::spillBytes, WalSnd::spillCount, WalSnd::spillTxns, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSnd::sync_standby_priority, WalSndKill(), WalSndCtlData::walsnds, WALSNDSTATE_STARTUP, WalSnd::write, and WalSnd::writeLag.

2378 {
2379  int i;
2380 
2381  /*
2382  * WalSndCtl should be set up already (we inherit this by fork() or
2383  * EXEC_BACKEND mechanism from the postmaster).
2384  */
2385  Assert(WalSndCtl != NULL);
2386  Assert(MyWalSnd == NULL);
2387 
2388  /*
2389  * Find a free walsender slot and reserve it. This must not fail due to
2390  * the prior check for free WAL senders in InitProcess().
2391  */
2392  for (i = 0; i < max_wal_senders; i++)
2393  {
2394  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2395 
2396  SpinLockAcquire(&walsnd->mutex);
2397 
2398  if (walsnd->pid != 0)
2399  {
2400  SpinLockRelease(&walsnd->mutex);
2401  continue;
2402  }
2403  else
2404  {
2405  /*
2406  * Found a free slot. Reserve it for us.
2407  */
2408  walsnd->pid = MyProcPid;
2409  walsnd->state = WALSNDSTATE_STARTUP;
2410  walsnd->sentPtr = InvalidXLogRecPtr;
2411  walsnd->needreload = false;
2412  walsnd->write = InvalidXLogRecPtr;
2413  walsnd->flush = InvalidXLogRecPtr;
2414  walsnd->apply = InvalidXLogRecPtr;
2415  walsnd->writeLag = -1;
2416  walsnd->flushLag = -1;
2417  walsnd->applyLag = -1;
2418  walsnd->sync_standby_priority = 0;
2419  walsnd->latch = &MyProc->procLatch;
2420  walsnd->replyTime = 0;
2421  walsnd->spillTxns = 0;
2422  walsnd->spillCount = 0;
2423  walsnd->spillBytes = 0;
2424  SpinLockRelease(&walsnd->mutex);
2425  /* don't need the lock anymore */
2426  MyWalSnd = (WalSnd *) walsnd;
2427 
2428  break;
2429  }
2430  }
2431 
2432  Assert(MyWalSnd != NULL);
2433 
2434  /* Arrange to clean up at walsender exit */
2435  on_shmem_exit(WalSndKill, 0);
2436 }
XLogRecPtr write
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
int MyProcPid
Definition: globals.c:40
PGPROC * MyProc
Definition: proc.c:67
TimeOffset flushLag
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
static void WalSndKill(int code, Datum arg)
Definition: walsender.c:2440
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
TimeOffset writeLag
slock_t mutex
Latch procLatch
Definition: proc.h:111
int64 spillCount
int64 spillBytes
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
Latch * latch
int64 spillTxns
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
WalSndState state
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
XLogRecPtr sentPtr
WalSnd * MyWalSnd
Definition: walsender.c:112
TimeOffset applyLag
#define Assert(condition)
Definition: c.h:738
int sync_standby_priority
bool needreload
int i
TimestampTz replyTime
XLogRecPtr apply

◆ LagTrackerRead()

static TimeOffset LagTrackerRead ( int  head,
XLogRecPtr  lsn,
TimestampTz  now 
)
static

Definition at line 3584 of file walsender.c.

References Assert, LagTracker::buffer, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_read, WalTimeSample::lsn, next, LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by ProcessStandbyReplyMessage().

3585 {
3586  TimestampTz time = 0;
3587 
3588  /* Read all unread samples up to this LSN or end of buffer. */
3589  while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
3590  lag_tracker->buffer[lag_tracker->read_heads[head]].lsn <= lsn)
3591  {
3592  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
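3593  lag_tracker->last_read[head] =
3594  lag_tracker->buffer[lag_tracker->read_heads[head]];
3595  lag_tracker->read_heads[head] =
3596  (lag_tracker->read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE;
3597  }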
3598 
3599  /*
3600  * If the lag tracker is empty, that means the standby has processed
3601  * everything we've ever sent so we should now clear 'last_read'. If we
3602  * didn't do that, we'd risk using a stale and irrelevant sample for
3603  * interpolation at the beginning of the next burst of WAL after a period
3604  * of idleness.
3605  */
3606  if (lag_tracker->read_heads[head] == lag_tracker->write_head)
3607  lag_tracker->last_read[head].time = 0;
3608 
3609  if (time > now)
3610  {
3611  /* If the clock somehow went backwards, treat as not found. */
3612  return -1;
3613  }
3614  else if (time == 0)
3615  {
3616  /*
3617  * We didn't cross a time. If there is a future sample that we
3618  * haven't reached yet, and we've already reached at least one sample,
3619  * let's interpolate the local flushed time. This is mainly useful
3620  * for reporting a completely stuck apply position as having
3621  * increasing lag, since otherwise we'd have to wait for it to
3622  * eventually start moving again and cross one of our samples before
3623  * we can show the lag increasing.
3624  */
3625  if (lag_tracker->read_heads[head] == lag_tracker->write_head)
3626  {
3627  /* There are no future samples, so we can't interpolate. */
3628  return -1;
3629  }
3630  else if (lag_tracker->last_read[head].time != 0)
3631  {
3632  /* We can interpolate between last_read and the next sample. */
3633  double fraction;
3634  WalTimeSample prev = lag_tracker->last_read[head];
3635  WalTimeSample next = lag_tracker->buffer[lag_tracker->read_heads[head]];
3636 
3637  if (lsn < prev.lsn)
3638  {
3639  /*
3640  * Reported LSNs shouldn't normally go backwards, but it's
3641  * possible when there is a timeline change. Treat as not
3642  * found.
3643  */
3644  return -1;
3645  }
3646 
3647  Assert(prev.lsn < next.lsn);
3648 
3649  if (prev.time > next.time)
3650  {
3651  /* If the clock somehow went backwards, treat as not found. */
3652  return -1;
3653  }
3654 
3655  /* See how far we are between the previous and next samples. */
3656  fraction =
3657  (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
3658 
3659  /* Scale the local flush time proportionally. */
3660  time = (TimestampTz)
3661  ((double) prev.time + (next.time - prev.time) * fraction);
3662  }
3663  else
3664  {
3665  /*
3666  * We have only a future sample, implying that we were entirely
3667  * caught up but now there is a new burst of WAL and the
3668  * standby hasn't processed the first sample yet. Until the
3669  * standby reaches the future sample the best we can do is report
3670  * the hypothetical lag if that sample were to be replayed now.
3671  */
3672  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3673  }
3674  }
3675 
3676  /* Return the elapsed time since local flush time in microseconds. */
3677  Assert(time != 0);
3678  return now - time;
3679 }
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:213
static int32 next
Definition: blutils.c:218
int write_head
Definition: walsender.c:214
int64 TimestampTz
Definition: timestamp.h:39
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:216
TimestampTz time
Definition: walsender.c:203
static LagTracker * lag_tracker
Definition: walsender.c:219
XLogRecPtr lsn
Definition: walsender.c:202
#define Assert(condition)
Definition: c.h:738
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:215
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:207
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1538

◆ LagTrackerWrite()

static void LagTrackerWrite ( XLogRecPtr  lsn,
TimestampTz  local_flush_time 
)
static

Definition at line 3519 of file walsender.c.

References am_walsender, LagTracker::buffer, i, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_lsn, WalTimeSample::lsn, NUM_SYNC_REP_WAIT_MODE, LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by WalSndUpdateProgress(), and XLogSendPhysical().

3520 {
3521  bool buffer_full;
3522  int new_write_head;
3523  int i;
3524 
3525  if (!am_walsender)
3526  return;
3527 
3528  /*
3529  * If the lsn hasn't advanced since last time, then do nothing. This way
3530  * we only record a new sample when new WAL has been written.
3531  */
3532  if (lag_tracker->last_lsn == lsn)
3533  return;
3534  lag_tracker->last_lsn = lsn;
3535 
3536  /*
3537  * If advancing the write head of the circular buffer would crash into any
3538  * of the read heads, then the buffer is full. In other words, the
3539  * slowest reader (presumably apply) is the one that controls the release
3540  * of space.
3541  */
3542  new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
3543  buffer_full = false;
3544  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
3545  {
3546  if (new_write_head == lag_tracker->read_heads[i])
3547  buffer_full = true;
3548  }
3549 
3550  /*
3551  * If the buffer is full, for now we just rewind by one slot and overwrite
3552  * the last sample, as a simple (if somewhat uneven) way to lower the
3553  * sampling rate. There may be better adaptive compaction algorithms.
3554  */
3555  if (buffer_full)
3556  {
3557  new_write_head = lag_tracker->write_head;
3558  if (lag_tracker->write_head > 0)
3559  lag_tracker->write_head--;
3560  else
3561  lag_tracker->write_head = LAG_TRACKER_BUFFER_SIZE - 1;
3562  }
3563 
3564  /* Store a sample at the current write head position. */
3565  lag_tracker->buffer[lag_tracker->write_head].lsn = lsn;
3566  lag_tracker->buffer[lag_tracker->write_head].time = local_flush_time;
3567  lag_tracker->write_head = new_write_head;
3568 }
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:213
int write_head
Definition: walsender.c:214
TimestampTz time
Definition: walsender.c:203
static LagTracker * lag_tracker
Definition: walsender.c:219
bool am_walsender
Definition: walsender.c:115
XLogRecPtr lsn
Definition: walsender.c:202
XLogRecPtr last_lsn
Definition: walsender.c:212
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:215
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:207
int i
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28

◆ logical_read_xlog_page()

static int logical_read_xlog_page ( XLogReaderState state,
XLogRecPtr  targetPagePtr,
int  reqLen,
XLogRecPtr  targetRecPtr,
char *  cur_page 
)
static

Definition at line 815 of file walsender.c.

References CheckXLogRemoved(), XLogReaderState::currTLI, XLogReaderState::currTLIValidUntil, XLogReaderState::nextTLI, XLogReaderState::seg, XLogReaderState::segcxt, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, ThisTimeLineID, WALRead(), WALReadRaiseError(), WalSndWaitForWal(), WALSegmentContext::ws_segsize, WALOpenSegment::ws_tli, XLByteToSeg, and XLogReadDetermineTimeline().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

817 {
818  XLogRecPtr flushptr;
819  int count;
820  WALReadError errinfo;
821  XLogSegNo segno;
822 
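823  XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
824  sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID);
825  sendTimeLine = state->currTLI;
826  sendTimeLineValidUpto = state->currTLIValidUntil;
827  sendTimeLineNextTLI = state->nextTLI;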
828 
829  /* make sure we have enough WAL available */
830  flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
831 
832  /* fail if not (implies we are going to shut down) */
833  if (flushptr < targetPagePtr + reqLen)
834  return -1;
835 
836  if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
837  count = XLOG_BLCKSZ; /* more than one block available */
838  else
839  count = flushptr - targetPagePtr; /* part of the page available */
840 
841  /* now actually read the data, we know it's there */
842  if (!WALRead(state,
843  cur_page,
844  targetPagePtr,
845  XLOG_BLCKSZ,
846  state->seg.ws_tli, /* Pass the current TLI because only
847  * WalSndSegmentOpen controls whether new
848  * TLI is needed. */
849  &errinfo))
850  WALReadRaiseError(&errinfo);
851 
852  /*
853  * After reading into the buffer, check that what we read was valid. We do
854  * this after reading, because even though the segment was present when we
855  * opened it, it might get recycled or removed while we read it. The
856  * read() succeeds in that case, but the data we tried to read might
857  * already have been overwritten with new WAL records.
858  */
859  XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize);
860  CheckXLogRemoved(segno, state->seg.ws_tli);
861 
862  return count;
863 }
void WALReadRaiseError(WALReadError *errinfo)
Definition: xlogutils.c:959
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
Definition: xlogutils.c:690
WALOpenSegment seg
Definition: xlogreader.h:213
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:3927
uint64 XLogSegNo
Definition: xlogdefs.h:41
XLogRecPtr currTLIValidUntil
Definition: xlogreader.h:236
TimeLineID nextTLI
Definition: xlogreader.h:242
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition: walsender.c:1386
TimeLineID ThisTimeLineID
Definition: xlog.c:191
TimeLineID currTLI
Definition: xlogreader.h:226
static TimeLineID sendTimeLine
Definition: walsender.c:145
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:146
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:148
TimeLineID ws_tli
Definition: xlogreader.h:48
bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALReadError *errinfo)
Definition: xlogreader.c:1062
WALSegmentContext segcxt
Definition: xlogreader.h:212
static bool sendTimeLineIsHistoric
Definition: walsender.c:147
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

◆ offset_to_interval()

static Interval* offset_to_interval ( TimeOffset  offset)
static

Definition at line 3241 of file walsender.c.

References Interval::day, Interval::month, palloc(), and Interval::time.

Referenced by pg_stat_get_wal_senders().

3242 {
3243  Interval *result = palloc(sizeof(Interval));
3244 
3245  result->month = 0;
3246  result->day = 0;
3247  result->time = offset;
3248 
3249  return result;
3250 }
int32 day
Definition: timestamp.h:47
TimeOffset time
Definition: timestamp.h:45
int32 month
Definition: timestamp.h:48
void * palloc(Size size)
Definition: mcxt.c:949

◆ parseCreateReplSlotOptions()

static void parseCreateReplSlotOptions ( CreateReplicationSlotCmd cmd,
bool reserve_wal,
CRSSnapshotAction snapshot_action 
)
static

Definition at line 869 of file walsender.c.

References CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, defGetBoolean(), DefElem::defname, elog, ereport, errcode(), errmsg(), ERROR, CreateReplicationSlotCmd::kind, lfirst, CreateReplicationSlotCmd::options, REPLICATION_KIND_LOGICAL, and REPLICATION_KIND_PHYSICAL.

Referenced by CreateReplicationSlot().

872 {
873  ListCell *lc;
874  bool snapshot_action_given = false;
875  bool reserve_wal_given = false;
876 
877  /* Parse options */
878  foreach(lc, cmd->options)
879  {
880  DefElem *defel = (DefElem *) lfirst(lc);
881 
882  if (strcmp(defel->defname, "export_snapshot") == 0)
883  {
884  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
885  ereport(ERROR,
886  (errcode(ERRCODE_SYNTAX_ERROR),
887  errmsg("conflicting or redundant options")));
888 
889  snapshot_action_given = true;
890  *snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT :
891  CRS_NOEXPORT_SNAPSHOT;
892  }
893  else if (strcmp(defel->defname, "use_snapshot") == 0)
894  {
895  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
896  ereport(ERROR,
897  (errcode(ERRCODE_SYNTAX_ERROR),
898  errmsg("conflicting or redundant options")));
899 
900  snapshot_action_given = true;
901  *snapshot_action = CRS_USE_SNAPSHOT;
902  }
903  else if (strcmp(defel->defname, "reserve_wal") == 0)
904  {
905  if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
906  ereport(ERROR,
907  (errcode(ERRCODE_SYNTAX_ERROR),
908  errmsg("conflicting or redundant options")));
909 
910  reserve_wal_given = true;
911  *reserve_wal = true;
912  }
913  else
914  elog(ERROR, "unrecognized option: %s", defel->defname);
915  }
916 }
int errcode(int sqlerrcode)
Definition: elog.c:610
ReplicationKind kind
Definition: replnodes.h:56
bool defGetBoolean(DefElem *def)
Definition: define.c:111
#define ERROR
Definition: elog.h:43
#define ereport(elevel,...)
Definition: elog.h:144
#define lfirst(lc)
Definition: pg_list.h:190
int errmsg(const char *fmt,...)
Definition: elog.c:824
#define elog(elevel,...)
Definition: elog.h:214
char * defname
Definition: parsenodes.h:732

◆ pg_stat_get_wal_senders()

Datum pg_stat_get_wal_senders ( PG_FUNCTION_ARGS  )

Definition at line 3257 of file walsender.c.

References ReturnSetInfo::allowedModes, WalSnd::apply, WalSnd::applyLag, CStringGetTextDatum, ReturnSetInfo::econtext, ExprContext::ecxt_per_query_memory, elog, ereport, errcode(), errmsg(), ERROR, WalSnd::flush, WalSnd::flushLag, get_call_result_type(), GetUserId(), i, Int32GetDatum, Int64GetDatum(), IntervalPGetDatum, is_member_of_role(), IsA, LSNGetDatum, max_wal_senders, MemoryContextSwitchTo(), MemSet, WalSnd::mutex, offset_to_interval(), PG_STAT_GET_WAL_SENDERS_COLS, WalSnd::pid, WalSnd::replyTime, ReturnSetInfo::returnMode, WalSnd::sentPtr, sentPtr, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SFRM_Materialize, WalSnd::spillBytes, WalSnd::spillCount, WalSnd::spillTxns, SpinLockAcquire, SpinLockRelease, WalSnd::state, SYNC_REP_PRIORITY, WalSnd::sync_standby_priority, SyncRepConfigData::syncrep_method, SyncRepConfig, SyncRepGetCandidateStandbys(), TimestampTzGetDatum, tuplestore_begin_heap(), tuplestore_donestoring, tuplestore_putvalues(), TYPEFUNC_COMPOSITE, values, WalSndGetStateString(), WalSndCtlData::walsnds, work_mem, write, WalSnd::write, WalSnd::writeLag, and XLogRecPtrIsInvalid.

3258 {
3259 #define PG_STAT_GET_WAL_SENDERS_COLS 15
3260  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3261  TupleDesc tupdesc;
3262  Tuplestorestate *tupstore;
3263  MemoryContext per_query_ctx;
3264  MemoryContext oldcontext;
3265  SyncRepStandbyData *sync_standbys;
3266  int num_standbys;
3267  int i;
3268 
3269  /* check to see if caller supports us returning a tuplestore */
3270  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
3271  ereport(ERROR,
3272  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3273  errmsg("set-valued function called in context that cannot accept a set")));
3274  if (!(rsinfo->allowedModes & SFRM_Materialize))
3275  ereport(ERROR,
3276  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3277  errmsg("materialize mode required, but it is not allowed in this context")));
3278 
3279  /* Build a tuple descriptor for our result type */
3280  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
3281  elog(ERROR, "return type must be a row type");
3282 
3283  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
3284  oldcontext = MemoryContextSwitchTo(per_query_ctx);
3285 
3286  tupstore = tuplestore_begin_heap(true, false, work_mem);
3287  rsinfo->returnMode = SFRM_Materialize;
3288  rsinfo->setResult = tupstore;
3289  rsinfo->setDesc = tupdesc;
3290 
3291  MemoryContextSwitchTo(oldcontext);
3292 
3293  /*
3294  * Get the currently active synchronous standbys. This could be out of
3295  * date before we're done, but we'll use the data anyway.
3296  */
3297  num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
3298 
3299  for (i = 0; i < max_wal_senders; i++)
3300  {
3301  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3302  XLogRecPtr sentPtr;
3303  XLogRecPtr write;
3304  XLogRecPtr flush;
3305  XLogRecPtr apply;
3306  TimeOffset writeLag;
3307  TimeOffset flushLag;
3308  TimeOffset applyLag;
3309  int priority;
3310  int pid;
3311  WalSndState state;
3312  TimestampTz replyTime;
3313  int64 spillTxns;
3314  int64 spillCount;
3315  int64 spillBytes;
3316  bool is_sync_standby;
3317  Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
3318  bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
3319  int j;
3320 
3321  /* Collect data from shared memory */
3322  SpinLockAcquire(&walsnd->mutex);
3323  if (walsnd->pid == 0)
3324  {
3325  SpinLockRelease(&walsnd->mutex);
3326  continue;
3327  }
3328  pid = walsnd->pid;
3329  sentPtr = walsnd->sentPtr;
3330  state = walsnd->state;
3331  write = walsnd->write;
3332  flush = walsnd->flush;
3333  apply = walsnd->apply;
3334  writeLag = walsnd->writeLag;
3335  flushLag = walsnd->flushLag;
3336  applyLag = walsnd->applyLag;
3337  priority = walsnd->sync_standby_priority;
3338  replyTime = walsnd->replyTime;
3339  spillTxns = walsnd->spillTxns;
3340  spillCount = walsnd->spillCount;
3341  spillBytes = walsnd->spillBytes;
3342  SpinLockRelease(&walsnd->mutex);
3343 
3344  /*
3345  * Detect whether walsender is/was considered synchronous. We can
3346  * provide some protection against stale data by checking the PID
3347  * along with walsnd_index.
3348  */
3349  is_sync_standby = false;
3350  for (j = 0; j < num_standbys; j++)
3351  {
3352  if (sync_standbys[j].walsnd_index == i &&
3353  sync_standbys[j].pid == pid)
3354  {
3355  is_sync_standby = true;
3356  break;
3357  }
3358  }
3359 
3360  memset(nulls, 0, sizeof(nulls));
3361  values[0] = Int32GetDatum(pid);
3362 
3363  if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS))
3364  {
3365  /*
3366  * Only superusers and members of pg_read_all_stats can see
3367  * details. Other users only get the pid value to know it's a
3368  * walsender, but no details.
3369  */
3370  MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3371  }
3372  else
3373  {
3374  values[1] = CStringGetTextDatum(WalSndGetStateString(state));
3375 
3376  if (XLogRecPtrIsInvalid(sentPtr))
3377  nulls[2] = true;
3378  values[2] = LSNGetDatum(sentPtr);
3379 
3380  if (XLogRecPtrIsInvalid(write))
3381  nulls[3] = true;
3382  values[3] = LSNGetDatum(write);
3383 
3384  if (XLogRecPtrIsInvalid(flush))
3385  nulls[4] = true;
3386  values[4] = LSNGetDatum(flush);
3387 
3388  if (XLogRecPtrIsInvalid(apply))
3389  nulls[5] = true;
3390  values[5] = LSNGetDatum(apply);
3391 
3392  /*
3393  * Treat a standby such as a pg_basebackup background process
3394  * which always returns an invalid flush location, as an
3395  * asynchronous standby.
3396  */
3397  priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
3398 
3399  if (writeLag < 0)
3400  nulls[6] = true;
3401  else
3402  values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
3403 
3404  if (flushLag < 0)
3405  nulls[7] = true;
3406  else
3407  values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
3408 
3409  if (applyLag < 0)
3410  nulls[8] = true;
3411  else
3412  values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
3413 
3414  values[9] = Int32GetDatum(priority);
3415 
3416  /*
3417  * More easily understood version of standby state. This is purely
3418  * informational.
3419  *
3420  * In quorum-based sync replication, the role of each standby
3421  * listed in synchronous_standby_names can be changing very
3422  * frequently. Any standbys considered as "sync" at one moment can
3423  * be switched to "potential" ones at the next moment. So, it's
3424  * basically useless to report "sync" or "potential" as their sync
3425  * states. We report just "quorum" for them.
3426  */
3427  if (priority == 0)
3428  values[10] = CStringGetTextDatum("async");
3429  else if (is_sync_standby)
3430  values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
3431  CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3432  else
3433  values[10] = CStringGetTextDatum("potential");
3434 
3435  if (replyTime == 0)
3436  nulls[11] = true;
3437  else
3438  values[11] = TimestampTzGetDatum(replyTime);
3439 
3440  /* spill to disk */
3441  values[12] = Int64GetDatum(spillTxns);
3442  values[13] = Int64GetDatum(spillCount);
3443  values[14] = Int64GetDatum(spillBytes);
3444  }
3445 
3446  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
3447  }
3448 
3449  /* clean up and return the tuplestore */
3450  tuplestore_donestoring(tupstore);
3451 
3452  return (Datum) 0;
3453 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
XLogRecPtr write
#define IsA(nodeptr, _type_)
Definition: nodes.h:580
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:205
#define SYNC_REP_PRIORITY
Definition: syncrep.h:36
Oid GetUserId(void)
Definition: miscinit.c:448
#define write(a, b, c)
Definition: win32.h:14
int64 TimestampTz
Definition: timestamp.h:39
uint8 syncrep_method
Definition: syncrep.h:69
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
TimeOffset flushLag
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
int errcode(int sqlerrcode)
Definition: elog.c:610
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
#define MemSet(start, val, len)
Definition: c.h:971
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
TimeOffset writeLag
SyncRepConfigData * SyncRepConfig
Definition: syncrep.c:97
slock_t mutex
int64 spillCount
int64 spillBytes
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
#define ERROR
Definition: elog.h:43
#define IntervalPGetDatum(X)
Definition: timestamp.h:33
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
int64 spillTxns
WalSndState state
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1701
int max_wal_senders
Definition: walsender.c:121
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
int64 TimeOffset
Definition: timestamp.h:40
uintptr_t Datum
Definition: postgres.h:367
static const char * WalSndGetStateString(WalSndState state)
Definition: walsender.c:3222
XLogRecPtr sentPtr
int work_mem
Definition: globals.c:121
static XLogRecPtr sentPtr
Definition: walsender.c:154
#define ereport(elevel,...)
Definition: elog.h:144
int allowedModes
Definition: execnodes.h:305
bool is_member_of_role(Oid member, Oid role)
Definition: acl.c:4916
TimeOffset applyLag
SetFunctionReturnMode returnMode
Definition: execnodes.h:307
uint64 XLogRecPtr
Definition: xlogdefs.h:21
int sync_standby_priority
Definition: regguts.h:298
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:233
WalSndState
Tuplestorestate * setResult
Definition: execnodes.h:310
int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
Definition: syncrep.c:714
static Datum values[MAXATTR]
Definition: bootstrap.c:167
#define PG_STAT_GET_WAL_SENDERS_COLS
ExprContext * econtext
Definition: execnodes.h:303
#define Int32GetDatum(X)
Definition: postgres.h:479
TupleDesc setDesc
Definition: execnodes.h:311
int errmsg(const char *fmt,...)
Definition: elog.c:824
#define elog(elevel,...)
Definition: elog.h:214
int i
#define CStringGetTextDatum(s)
Definition: builtins.h:87
TimestampTz replyTime
static Interval * offset_to_interval(TimeOffset offset)
Definition: walsender.c:3241
XLogRecPtr apply

◆ PhysicalConfirmReceivedLocation()

static void PhysicalConfirmReceivedLocation ( XLogRecPtr  lsn)
static

Definition at line 1834 of file walsender.c.

References Assert, ReplicationSlot::data, InvalidXLogRecPtr, ReplicationSlot::mutex, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, SpinLockAcquire, and SpinLockRelease.

Referenced by ProcessStandbyReplyMessage().

1835 {
1836  bool changed = false;
1837  ReplicationSlot *slot = MyReplicationSlot;
1838 
1839  Assert(lsn != InvalidXLogRecPtr);
1840  SpinLockAcquire(&slot->mutex);
1841  if (slot->data.restart_lsn != lsn)
1842  {
1843  changed = true;
1844  slot->data.restart_lsn = lsn;
1845  }
1846  SpinLockRelease(&slot->mutex);
1847 
1848  if (changed)
1849  {
1850  ReplicationSlotMarkDirty();
1851  ReplicationSlotsComputeRequiredLSN();
1852  }
1853 
1854  /*
1855  * One could argue that the slot should be saved to disk now, but that'd
1856  * be energy wasted - the worst lost information can do here is give us
1857  * wrong information in a statistics view - we'll just potentially be more
1858  * conservative in removing files.
1859  */
1860 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
ReplicationSlotPersistentData data
Definition: slot.h:143
#define SpinLockAcquire(lock)
Definition: spin.h:62
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:804
#define SpinLockRelease(lock)
Definition: spin.h:64
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:738
XLogRecPtr restart_lsn
Definition: slot.h:80
slock_t mutex
Definition: slot.h:116
void ReplicationSlotMarkDirty(void)
Definition: slot.c:715

◆ PhysicalReplicationSlotNewXmin()

static void PhysicalReplicationSlotNewXmin ( TransactionId  feedbackXmin,
TransactionId  feedbackCatalogXmin 
)
static

Definition at line 1971 of file walsender.c.

References ReplicationSlotPersistentData::catalog_xmin, ReplicationSlot::data, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, InvalidTransactionId, ReplicationSlot::mutex, MyPgXact, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredXmin(), SpinLockAcquire, SpinLockRelease, TransactionIdIsNormal, TransactionIdPrecedes(), ReplicationSlotPersistentData::xmin, and PGXACT::xmin.

Referenced by ProcessStandbyHSFeedbackMessage().

1972 {
1973  bool changed = false;
1974  ReplicationSlot *slot = MyReplicationSlot;
1975 
1976  SpinLockAcquire(&slot->mutex);
1977  MyPgXact->xmin = InvalidTransactionId;
1978 
1979  /*
1980  * For physical replication we don't need the interlock provided by xmin
1981  * and effective_xmin since the consequences of a missed increase are
1982  * limited to query cancellations, so set both at once.
1983  */
1984  if (!TransactionIdIsNormal(slot->data.xmin) ||
1985  !TransactionIdIsNormal(feedbackXmin) ||
1986  TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
1987  {
1988  changed = true;
1989  slot->data.xmin = feedbackXmin;
1990  slot->effective_xmin = feedbackXmin;
1991  }
1992  if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
1993  !TransactionIdIsNormal(feedbackCatalogXmin) ||
1994  TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
1995  {
1996  changed = true;
1997  slot->data.catalog_xmin = feedbackCatalogXmin;
1998  slot->effective_catalog_xmin = feedbackCatalogXmin;
1999  }
2000  SpinLockRelease(&slot->mutex);
2001 
2002  if (changed)
2003  {
2004  ReplicationSlotMarkDirty();
2005  ReplicationSlotsComputeRequiredXmin(false);
2006  }
2007 }
TransactionId xmin
Definition: proc.h:235
ReplicationSlotPersistentData data
Definition: slot.h:143
PGXACT * MyPgXact
Definition: proc.c:68
#define SpinLockAcquire(lock)
Definition: spin.h:62
TransactionId effective_xmin
Definition: slot.h:139
TransactionId catalog_xmin
Definition: slot.h:77
#define InvalidTransactionId
Definition: transam.h:31
TransactionId xmin
Definition: slot.h:69
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
#define SpinLockRelease(lock)
Definition: spin.h:64
TransactionId effective_catalog_xmin
Definition: slot.h:140
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
slock_t mutex
Definition: slot.h:116
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:754
void ReplicationSlotMarkDirty(void)
Definition: slot.c:715

◆ ProcessRepliesIfAny()

static void ProcessRepliesIfAny ( void  )
static

Definition at line 1699 of file walsender.c.

References COMMERROR, ereport, errcode(), errmsg(), FATAL, GetCurrentTimestamp(), last_processing, last_reply_timestamp, pq_endmsgread(), pq_getbyte_if_available(), pq_getmessage(), pq_putmessage_noblock, pq_startmsgread(), proc_exit(), ProcessStandbyMessage(), resetStringInfo(), streamingDoneReceiving, streamingDoneSending, and waiting_for_ping_response.

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

1700 {
1701  unsigned char firstchar;
1702  int r;
1703  bool received = false;
1704 
1705  last_processing = GetCurrentTimestamp();
1706 
1707  for (;;)
1708  {
1709  pq_startmsgread();
1710  r = pq_getbyte_if_available(&firstchar);
1711  if (r < 0)
1712  {
1713  /* unexpected error or EOF */
1714  ereport(COMMERROR,
1715  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1716  errmsg("unexpected EOF on standby connection")));
1717  proc_exit(0);
1718  }
1719  if (r == 0)
1720  {
1721  /* no data available without blocking */
1722  pq_endmsgread();
1723  break;
1724  }
1725 
1726  /* Read the message contents */
1727  resetStringInfo(&reply_message);
1728  if (pq_getmessage(&reply_message, 0))
1729  {
1730  ereport(COMMERROR,
1731  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1732  errmsg("unexpected EOF on standby connection")));
1733  proc_exit(0);
1734  }
1735 
1736  /*
1737  * If we already received a CopyDone from the frontend, the frontend
1738  * should not send us anything until we've closed our end of the COPY.
1739  * XXX: In theory, the frontend could already send the next command
1740  * before receiving the CopyDone, but libpq doesn't currently allow
1741  * that.
1742  */
1743  if (streamingDoneReceiving && firstchar != 'X')
1744  ereport(FATAL,
1745  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1746  errmsg("unexpected standby message type \"%c\", after receiving CopyDone",
1747  firstchar)));
1748 
1749  /* Handle the very limited subset of commands expected in this phase */
1750  switch (firstchar)
1751  {
1752  /*
1753  * 'd' means a standby reply wrapped in a CopyData packet.
1754  */
1755  case 'd':
1756  ProcessStandbyMessage();
1757  received = true;
1758  break;
1759 
1760  /*
1761  * CopyDone means the standby requested to finish streaming.
1762  * Reply with CopyDone, if we had not sent that already.
1763  */
1764  case 'c':
1765  if (!streamingDoneSending)
1766  {
1767  pq_putmessage_noblock('c', NULL, 0);
1768  streamingDoneSending = true;
1769  }
1770 
1771  streamingDoneReceiving = true;
1772  received = true;
1773  break;
1774 
1775  /*
1776  * 'X' means that the standby is closing down the socket.
1777  */
1778  case 'X':
1779  proc_exit(0);
1780 
1781  default:
1782  ereport(FATAL,
1783  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1784  errmsg("invalid standby message type \"%c\"",
1785  firstchar)));
1786  }
1787  }
1788 
1789  /*
1790  * Save the last reply timestamp if we've received at least one reply.
1791  */
1792  if (received)
1793  {
1794  last_reply_timestamp = last_processing;
1795  waiting_for_ping_response = false;
1796  }
1797 }
static void ProcessStandbyMessage(void)
Definition: walsender.c:1803
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1574
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:610
static TimestampTz last_processing
Definition: walsender.c:162
void pq_startmsgread(void)
Definition: pqcomm.c:1197
#define FATAL
Definition: elog.h:52
int pq_getbyte_if_available(unsigned char *c)
Definition: pqcomm.c:1027
static bool streamingDoneSending
Definition: walsender.c:179
#define COMMERROR
Definition: elog.h:30
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1259
static StringInfoData reply_message
Definition: walsender.c:158
void pq_endmsgread(void)
Definition: pqcomm.c:1221
#define ereport(elevel,...)
Definition: elog.h:144
static bool streamingDoneReceiving
Definition: walsender.c:180
int errmsg(const char *fmt,...)
Definition: elog.c:824
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
static bool waiting_for_ping_response
Definition: walsender.c:171
static TimestampTz last_reply_timestamp
Definition: walsender.c:168

◆ ProcessStandbyHSFeedbackMessage()

static void ProcessStandbyHSFeedbackMessage ( void  )
static

Definition at line 2051 of file walsender.c.

References DEBUG2, elog, InvalidTransactionId, log_min_messages, WalSnd::mutex, MyPgXact, MyReplicationSlot, MyWalSnd, pfree(), PhysicalReplicationSlotNewXmin(), pq_getmsgint(), pq_getmsgint64(), pstrdup(), WalSnd::replyTime, SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), TransactionIdInRecentPast(), TransactionIdIsNormal, TransactionIdPrecedes(), and PGXACT::xmin.

Referenced by ProcessStandbyMessage().

2052 {
2053  TransactionId feedbackXmin;
2054  uint32 feedbackEpoch;
2055  TransactionId feedbackCatalogXmin;
2056  uint32 feedbackCatalogEpoch;
2057  TimestampTz replyTime;
2058 
2059  /*
2060  * Decipher the reply message. The caller already consumed the msgtype
2061  * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
2062  * of this message.
2063  */
2064  replyTime = pq_getmsgint64(&reply_message);
2065  feedbackXmin = pq_getmsgint(&reply_message, 4);
2066  feedbackEpoch = pq_getmsgint(&reply_message, 4);
2067  feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
2068  feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
2069 
2070  if (log_min_messages <= DEBUG2)
2071  {
2072  char *replyTimeStr;
2073 
2074  /* Copy because timestamptz_to_str returns a static buffer */
2075  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2076 
2077  elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
2078  feedbackXmin,
2079  feedbackEpoch,
2080  feedbackCatalogXmin,
2081  feedbackCatalogEpoch,
2082  replyTimeStr);
2083 
2084  pfree(replyTimeStr);
2085  }
2086 
2087  /*
2088  * Update shared state for this WalSender process based on reply data from
2089  * standby.
2090  */
2091  {
2092  WalSnd *walsnd = MyWalSnd;
2093 
2094  SpinLockAcquire(&walsnd->mutex);
2095  walsnd->replyTime = replyTime;
2096  SpinLockRelease(&walsnd->mutex);
2097  }
2098 
2099  /*
2100  * Unset WalSender's xmins if the feedback message values are invalid.
2101  * This happens when the downstream turned hot_standby_feedback off.
2102  */
2103  if (!TransactionIdIsNormal(feedbackXmin)
2104  && !TransactionIdIsNormal(feedbackCatalogXmin))
2105  {
2106  MyPgXact->xmin = InvalidTransactionId;
2107  if (MyReplicationSlot != NULL)
2108  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2109  return;
2110  }
2111 
2112  /*
2113  * Check that the provided xmin/epoch are sane, that is, not in the future
2114  * and not so far back as to be already wrapped around. Ignore if not.
2115  */
2116  if (TransactionIdIsNormal(feedbackXmin) &&
2117  !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
2118  return;
2119 
2120  if (TransactionIdIsNormal(feedbackCatalogXmin) &&
2121  !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
2122  return;
2123 
2124  /*
2125  * Set the WalSender's xmin equal to the standby's requested xmin, so that
2126  * the xmin will be taken into account by GetOldestXmin. This will hold
2127  * back the removal of dead rows and thereby prevent the generation of
2128  * cleanup conflicts on the standby server.
2129  *
2130  * There is a small window for a race condition here: although we just
2131  * checked that feedbackXmin precedes nextXid, the nextXid could have
2132  * gotten advanced between our fetching it and applying the xmin below,
2133  * perhaps far enough to make feedbackXmin wrap around. In that case the
2134  * xmin we set here would be "in the future" and have no effect. No point
2135  * in worrying about this since it's too late to save the desired data
2136  * anyway. Assuming that the standby sends us an increasing sequence of
2137  * xmins, this could only happen during the first reply cycle, else our
2138  * own xmin would prevent nextXid from advancing so far.
2139  *
2140  * We don't bother taking the ProcArrayLock here. Setting the xmin field
2141  * is assumed atomic, and there's no real need to prevent a concurrent
2142  * GetOldestXmin. (If we're moving our xmin forward, this is obviously
2143  * safe, and if we're moving it backwards, well, the data is at risk
2144  * already since a VACUUM could have just finished calling GetOldestXmin.)
2145  *
2146  * If we're using a replication slot we reserve the xmin via that,
2147  * otherwise via the walsender's PGXACT entry. We can only track the
2148  * catalog xmin separately when using a slot, so we store the least of the
2149  * two provided when not using a slot.
2150  *
2151  * XXX: It might make sense to generalize the ephemeral slot concept and
2152  * always use the slot mechanism to handle the feedback xmin.
2153  */
2154  if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
2155  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2156  else
2157  {
2158  if (TransactionIdIsNormal(feedbackCatalogXmin)
2159  && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
2160  MyPgXact->xmin = feedbackCatalogXmin;
2161  else
2162  MyPgXact->xmin = feedbackXmin;
2163  }
2164 }
uint32 TransactionId
Definition: c.h:513
TransactionId xmin
Definition: proc.h:235
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1186
slock_t mutex
PGXACT * MyPgXact
Definition: proc.c:68
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition: walsender.c:2020
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pfree(void *pointer)
Definition: mcxt.c:1056
#define DEBUG2
Definition: elog.h:24
#define InvalidTransactionId
Definition: transam.h:31
unsigned int uint32
Definition: c.h:367
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static StringInfoData reply_message
Definition: walsender.c:158
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:112
int log_min_messages
Definition: guc.c:534
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define elog(elevel,...)
Definition: elog.h:214
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition: walsender.c:1971
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
TimestampTz replyTime
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1736

◆ ProcessStandbyMessage()

static void ProcessStandbyMessage ( void  )
static

Definition at line 1803 of file walsender.c.

References COMMERROR, ereport, errcode(), errmsg(), pq_getmsgbyte(), proc_exit(), ProcessStandbyHSFeedbackMessage(), and ProcessStandbyReplyMessage().

Referenced by ProcessRepliesIfAny().

1804 {
1805  char msgtype;
1806 
1807  /*
1808  * Check message type from the first byte.
1809  */
1810  msgtype = pq_getmsgbyte(&reply_message);
1811 
1812  switch (msgtype)
1813  {
1814  case 'r':
1815  ProcessStandbyReplyMessage();
1816  break;
1817 
1818  case 'h':
1819  ProcessStandbyHSFeedbackMessage();
1820  break;
1821 
1822  default:
1823  ereport(COMMERROR,
1824  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1825  errmsg("unexpected message type \"%c\"", msgtype)));
1826  proc_exit(0);
1827  }
1828 }
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:610
#define COMMERROR
Definition: elog.h:30
static StringInfoData reply_message
Definition: walsender.c:158
#define ereport(elevel,...)
Definition: elog.h:144
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
static void ProcessStandbyReplyMessage(void)
Definition: walsender.c:1866
int errmsg(const char *fmt,...)
Definition: elog.c:824
static void ProcessStandbyHSFeedbackMessage(void)
Definition: walsender.c:2051

◆ ProcessStandbyReplyMessage()

static void ProcessStandbyReplyMessage ( void  )
static

Definition at line 1866 of file walsender.c.

References am_cascading_walsender, WalSnd::apply, WalSnd::applyLag, DEBUG2, elog, WalSnd::flush, WalSnd::flushLag, GetCurrentTimestamp(), InvalidXLogRecPtr, LagTrackerRead(), log_min_messages, LogicalConfirmReceivedLocation(), WalSnd::mutex, MyReplicationSlot, MyWalSnd, now(), pfree(), PhysicalConfirmReceivedLocation(), pq_getmsgbyte(), pq_getmsgint64(), pstrdup(), WalSnd::replyTime, sentPtr, SlotIsLogical, SpinLockAcquire, SpinLockRelease, SYNC_REP_WAIT_APPLY, SYNC_REP_WAIT_FLUSH, SYNC_REP_WAIT_WRITE, SyncRepReleaseWaiters(), timestamptz_to_str(), WalSndKeepalive(), WalSnd::write, and WalSnd::writeLag.

Referenced by ProcessStandbyMessage().

1867 {
1868  XLogRecPtr writePtr,
1869  flushPtr,
1870  applyPtr;
1871  bool replyRequested;
1872  TimeOffset writeLag,
1873  flushLag,
1874  applyLag;
1875  bool clearLagTimes;
1876  TimestampTz now;
1877  TimestampTz replyTime;
1878 
1879  static bool fullyAppliedLastTime = false;
1880 
1881  /* the caller already consumed the msgtype byte */
1882  writePtr = pq_getmsgint64(&reply_message);
1883  flushPtr = pq_getmsgint64(&reply_message);
1884  applyPtr = pq_getmsgint64(&reply_message);
1885  replyTime = pq_getmsgint64(&reply_message);
1886  replyRequested = pq_getmsgbyte(&reply_message);
1887 
1888  if (log_min_messages <= DEBUG2)
1889  {
1890  char *replyTimeStr;
1891 
1892  /* Copy because timestamptz_to_str returns a static buffer */
1893  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
1894 
1895  elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
1896  (uint32) (writePtr >> 32), (uint32) writePtr,
1897  (uint32) (flushPtr >> 32), (uint32) flushPtr,
1898  (uint32) (applyPtr >> 32), (uint32) applyPtr,
1899  replyRequested ? " (reply requested)" : "",
1900  replyTimeStr);
1901 
1902  pfree(replyTimeStr);
1903  }
1904 
1905  /* See if we can compute the round-trip lag for these positions. */
1906  now = GetCurrentTimestamp();
1907  writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
1908  flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
1909  applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
1910 
1911  /*
1912  * If the standby reports that it has fully replayed the WAL in two
1913  * consecutive reply messages, then the second such message must result
1914  * from wal_receiver_status_interval expiring on the standby. This is a
1915  * convenient time to forget the lag times measured when it last
1916  * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
1917  * until more WAL traffic arrives.
1918  */
1919  clearLagTimes = false;
1920  if (applyPtr == sentPtr)
1921  {
1922  if (fullyAppliedLastTime)
1923  clearLagTimes = true;
1924  fullyAppliedLastTime = true;
1925  }
1926  else
1927  fullyAppliedLastTime = false;
1928 
1929  /* Send a reply if the standby requested one. */
1930  if (replyRequested)
1931  WalSndKeepalive(false);
1932 
1933  /*
1934  * Update shared state for this WalSender process based on reply data from
1935  * standby.
1936  */
1937  {
1938  WalSnd *walsnd = MyWalSnd;
1939 
1940  SpinLockAcquire(&walsnd->mutex);
1941  walsnd->write = writePtr;
1942  walsnd->flush = flushPtr;
1943  walsnd->apply = applyPtr;
1944  if (writeLag != -1 || clearLagTimes)
1945  walsnd->writeLag = writeLag;
1946  if (flushLag != -1 || clearLagTimes)
1947  walsnd->flushLag = flushLag;
1948  if (applyLag != -1 || clearLagTimes)
1949  walsnd->applyLag = applyLag;
1950  walsnd->replyTime = replyTime;
1951  SpinLockRelease(&walsnd->mutex);
1952  }
1953 
1954  if (!am_cascading_walsender)
1955  SyncRepReleaseWaiters();
1956 
1957  /*
1958  * Advance our local xmin horizon when the client confirmed a flush.
1959  */
1960  if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
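1961  {
1962  if (SlotIsLogical(MyReplicationSlot))
1963  LogicalConfirmReceivedLocation(flushPtr);
1964  else
1965  PhysicalConfirmReceivedLocation(flushPtr);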
1966  }
1967 }
XLogRecPtr write
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1574
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1186
TimeOffset flushLag
TimeOffset writeLag
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
void pfree(void *pointer)
Definition: mcxt.c:1056
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3461
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: walsender.c:1834
#define SYNC_REP_WAIT_APPLY
Definition: syncrep.h:26
#define DEBUG2
Definition: elog.h:24
unsigned int uint32
Definition: c.h:367
#define SlotIsLogical(slot)
Definition: slot.h:165
#define SYNC_REP_WAIT_FLUSH
Definition: syncrep.h:25
static StringInfoData reply_message
Definition: walsender.c:158
#define SpinLockRelease(lock)
Definition: spin.h:64
int64 TimeOffset
Definition: timestamp.h:40
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition: walsender.c:3584
WalSnd * MyWalSnd
Definition: walsender.c:112
static XLogRecPtr sentPtr
Definition: walsender.c:154
int log_min_messages
Definition: guc.c:534
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
TimeOffset applyLag
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:1012
#define elog(elevel,...)
Definition: elog.h:214
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
TimestampTz replyTime
XLogRecPtr apply
void SyncRepReleaseWaiters(void)
Definition: syncrep.c:429
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1538
bool am_cascading_walsender
Definition: walsender.c:116
#define SYNC_REP_WAIT_WRITE
Definition: syncrep.h:24
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1736

◆ SendTimeLineHistory()

static void SendTimeLineHistory ( TimeLineHistoryCmd cmd)
static

Definition at line 465 of file walsender.c.

References buf, CloseTransientFile(), PGAlignedBlock::data, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg(), ERROR, fd(), MAXFNAMELEN, MAXPGPATH, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), pq_beginmessage(), pq_endmessage(), pq_sendbytes(), pq_sendint16(), pq_sendint32(), pq_sendstring(), read, TimeLineHistoryCmd::timeline, TLHistoryFileName, TLHistoryFilePath, and WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ.

Referenced by exec_replication_command().

466 {
467  StringInfoData buf;
468  char histfname[MAXFNAMELEN];
469  char path[MAXPGPATH];
470  int fd;
471  off_t histfilelen;
472  off_t bytesleft;
473  Size len;
474 
475  /*
476  * Reply with a result set with one row, and two columns. The first col is
477  * the name of the history file, 2nd is the contents.
478  */
479 
480  TLHistoryFileName(histfname, cmd->timeline);
481  TLHistoryFilePath(path, cmd->timeline);
482 
483  /* Send a RowDescription message */
484  pq_beginmessage(&buf, 'T');
485  pq_sendint16(&buf, 2); /* 2 fields */
486 
487  /* first field */
488  pq_sendstring(&buf, "filename"); /* col name */
489  pq_sendint32(&buf, 0); /* table oid */
490  pq_sendint16(&buf, 0); /* attnum */
491  pq_sendint32(&buf, TEXTOID); /* type oid */
492  pq_sendint16(&buf, -1); /* typlen */
493  pq_sendint32(&buf, 0); /* typmod */
494  pq_sendint16(&buf, 0); /* format code */
495 
496  /* second field */
497  pq_sendstring(&buf, "content"); /* col name */
498  pq_sendint32(&buf, 0); /* table oid */
499  pq_sendint16(&buf, 0); /* attnum */
500  pq_sendint32(&buf, BYTEAOID); /* type oid */
501  pq_sendint16(&buf, -1); /* typlen */
502  pq_sendint32(&buf, 0); /* typmod */
503  pq_sendint16(&buf, 0); /* format code */
504  pq_endmessage(&buf);
505 
506  /* Send a DataRow message */
507  pq_beginmessage(&buf, 'D');
508  pq_sendint16(&buf, 2); /* # of columns */
509  len = strlen(histfname);
510  pq_sendint32(&buf, len); /* col1 len */
511  pq_sendbytes(&buf, histfname, len);
512 
513  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
514  if (fd < 0)
515  ereport(ERROR,
516  (errcode_for_file_access(),
517  errmsg("could not open file \"%s\": %m", path)));
518 
519  /* Determine file length and send it to client */
520  histfilelen = lseek(fd, 0, SEEK_END);
521  if (histfilelen < 0)
522  ereport(ERROR,
523  (errcode_for_file_access(),
524  errmsg("could not seek to end of file \"%s\": %m", path)));
525  if (lseek(fd, 0, SEEK_SET) != 0)
526  ereport(ERROR,
527  (errcode_for_file_access(),
528  errmsg("could not seek to beginning of file \"%s\": %m", path)));
529 
530  pq_sendint32(&buf, histfilelen); /* col2 len */
531 
532  bytesleft = histfilelen;
533  while (bytesleft > 0)
534  {
535  PGAlignedBlock rbuf;
536  int nread;
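537 
538  pgstat_report_wait_start(WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ);
539  nread = read(fd, rbuf.data, sizeof(rbuf));
540  pgstat_report_wait_end();
541  if (nread < 0)
542  ereport(ERROR,
543  (errcode_for_file_access(),
544  errmsg("could not read file \"%s\": %m",
545  path)));
546  else if (nread == 0)
547  ereport(ERROR,
548  (errcode(ERRCODE_DATA_CORRUPTED),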
549  errmsg("could not read file \"%s\": read %d of %zu",
550  path, nread, (Size) bytesleft)));
551 
552  pq_sendbytes(&buf, rbuf.data, nread);
553  bytesleft -= nread;
554  }
555 
556  if (CloseTransientFile(fd) != 0)
557  ereport(ERROR,
558  (errcode_for_file_access(),
559  errmsg("could not close file \"%s\": %m", path)));
560 
561  pq_endmessage(&buf);
562 }
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
int errcode(int sqlerrcode)
Definition: elog.c:610
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1233
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
TimeLineID timeline
Definition: replnodes.h:97
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
char data[BLCKSZ]
Definition: c.h:1104
#define ERROR
Definition: elog.h:43
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2372
#define MAXPGPATH
#define TLHistoryFileName(fname, tli)
static char * buf
Definition: pg_test_fsync.c:67
int errcode_for_file_access(void)
Definition: elog.c:633
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1381
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:45
int CloseTransientFile(int fd)
Definition: fd.c:2549
#define MAXFNAMELEN
#define ereport(elevel,...)
Definition: elog.h:144
size_t Size
Definition: c.h:466
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1357
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:125
int errmsg(const char *fmt,...)
Definition: elog.c:824
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
#define read(a, b, c)
Definition: win32.h:13
#define TLHistoryFilePath(path, tli)

◆ StartLogicalReplication()

static void StartLogicalReplication ( StartReplicationCmd cmd)
static

Definition at line 1138 of file walsender.c.

References am_cascading_walsender, Assert, buf, CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDecodingContext(), ReplicationSlot::data, DestRemote, EndCommand(), ereport, errcode(), errdetail(), errmsg(), ERROR, FreeDecodingContext(), got_STOPPING, LOG, logical_read_xlog_page(), WalSnd::mutex, MyReplicationSlot, StartReplicationCmd::options, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), proc_exit(), LogicalDecodingContext::reader, RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), ReplicationSlotPersistentData::restart_lsn, SAB_Error, WalSnd::sentPtr, sentPtr, SetQueryCompletion(), StartReplicationCmd::slotname, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, SyncRepInitConfig(), wal_segment_close(), WalSndLoop(), WalSndPrepareWrite(), WalSndSegmentOpen(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WalSndUpdateProgress(), WalSndWriteData(), XL_ROUTINE, XLogBeginRead(), XLogRecPtrIsInvalid, and XLogSendLogical().

Referenced by exec_replication_command().

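1139 {
1140  StringInfoData buf;
1141  QueryCompletion qc;
1142 
1143  /* make sure that our requirements are still fulfilled */
1144  CheckLogicalDecodingRequirements();
1145 
1146  Assert(!MyReplicationSlot);
1147 
1148  (void) ReplicationSlotAcquire(cmd->slotname, SAB_Error);
1149 
1150  if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))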
1151  ereport(ERROR,
1152  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1153  errmsg("cannot read from logical replication slot \"%s\"",
1154  cmd->slotname),
1155  errdetail("This slot has been invalidated because it exceeded the maximum reserved size.")));
1156 
1157  /*
1158  * Force a disconnect, so that the decoding code doesn't need to care
1159  * about an eventual switch from running in recovery, to running in a
1160  * normal environment. Client code is expected to handle reconnects.
1161  */
1162  if (am_cascading_walsender && !RecoveryInProgress())
1163  {
1164  ereport(LOG,
1165  (errmsg("terminating walsender process after promotion")));
1166  got_STOPPING = true;
1167  }
1168 
1169  /*
1170  * Create our decoding context, making it start at the previously ack'ed
1171  * position.
1172  *
1173  * Do this before sending a CopyBothResponse message, so that any errors
1174  * are reported early.
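1175  */
1176  logical_decoding_ctx =
1177  CreateDecodingContext(cmd->startpoint, cmd->options, false,
1178  XL_ROUTINE(.page_read = logical_read_xlog_page,
1179  .segment_open = WalSndSegmentOpen,
1180  .segment_close = wal_segment_close),
1181  WalSndPrepareWrite, WalSndWriteData,
1182  WalSndUpdateProgress);
1183  xlogreader = logical_decoding_ctx->reader;
1184 
1185  WalSndSetState(WALSNDSTATE_CATCHUP);
1186 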
1187  /* Send a CopyBothResponse message, and start streaming */
1188  pq_beginmessage(&buf, 'W');
1189  pq_sendbyte(&buf, 0);
1190  pq_sendint16(&buf, 0);
1191  pq_endmessage(&buf);
1192  pq_flush();
1193 
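1194  /* Start reading WAL from the oldest required WAL. */
1195  XLogBeginRead(logical_decoding_ctx->reader,
1196  MyReplicationSlot->data.restart_lsn);
1197 
1198  /*
1199  * Report the location after which we'll send out further commits as the
1200  * current sentPtr.
1201  */
1202  sentPtr = MyReplicationSlot->data.confirmed_flush;
1203 
1204  /* Also update the sent position status in shared memory */
1205  SpinLockAcquire(&MyWalSnd->mutex);
1206  MyWalSnd->sentPtr = MyReplicationSlot->data.restart_lsn;
1207  SpinLockRelease(&MyWalSnd->mutex);
1208 
1209  replication_active = true;
1210 
1211  SyncRepInitConfig();
1212 
1213  /* Main loop of walsender */
1214  WalSndLoop(XLogSendLogical);
1215 
1216  FreeDecodingContext(logical_decoding_ctx);
1217  ReplicationSlotRelease();
1218 
1219  replication_active = false;
1220  if (got_STOPPING)
1221  proc_exit(0);
1222  WalSndSetState(WALSNDSTATE_STARTUP);
1223 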
1224  /* Get out of COPY mode (CommandComplete). */
1225  SetQueryCompletion(&qc, CMDTAG_COPY, 0);
1226  EndCommand(&qc, DestRemote, false);
1227 }
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
XLogRecPtr startpoint
Definition: replnodes.h:85
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:813
#define pq_flush()
Definition: libpq.h:39
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:610
ReplicationSlotPersistentData data
Definition: slot.h:143
#define LOG
Definition: elog.h:26
bool RecoveryInProgress(void)
Definition: xlog.c:8071
slock_t mutex
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
XLogRecPtr confirmed_flush
Definition: slot.h:91
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define ERROR
Definition: elog.h:43
void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
Definition: dest.c:166
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:197
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:376
static char * buf
Definition: pg_test_fsync.c:67
int errdetail(const char *fmt,...)
Definition: elog.c:957
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
Definition: xlogreader.c:240
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1265
void ReplicationSlotRelease(void)
Definition: slot.c:476
static XLogReaderState * xlogreader
Definition: walsender.c:137
void SyncRepInitConfig(void)
Definition: syncrep.c:400
#define XL_ROUTINE(...)
Definition: xlogreader.h:116
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2251
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1238
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
static volatile sig_atomic_t replication_active
Definition: walsender.c:195
XLogRecPtr sentPtr
WalSnd * MyWalSnd
Definition: walsender.c:112
static XLogRecPtr sentPtr
Definition: walsender.c:154
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
Definition: walsender.c:2458
#define ereport(elevel,...)
Definition: elog.h:144
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:738
void WalSndSetState(WalSndState state)
Definition: walsender.c:3203
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:511
static void XLogSendLogical(void)
Definition: walsender.c:2837
XLogRecPtr restart_lsn
Definition: slot.h:80
static void SetQueryCompletion(QueryCompletion *qc, CommandTag commandTag, uint64 nprocessed)
Definition: cmdtag.h:36
int errmsg(const char *fmt,...)
Definition: elog.c:824
XLogReaderState * reader
Definition: logical.h:41
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
int ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior)
Definition: slot.c:367
Definition: slot.h:42
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition: walsender.c:815
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:75
bool am_cascading_walsender
Definition: walsender.c:116
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
Definition: walsender.c:1355

◆ StartReplication()

static void StartReplication ( StartReplicationCmd cmd)
static

Definition at line 571 of file walsender.c.

References am_cascading_walsender, Assert, begin_tup_output_tupdesc(), buf, CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errcode(), errdetail(), errmsg(), ERROR, GetFlushRecPtr(), GetStandbyFlushRecPtr(), got_STOPPING, Int64GetDatum(), InvalidXLogRecPtr, list_free_deep(), MemSet, WalSnd::mutex, MyReplicationSlot, pq_beginmessage(), pq_endmessage(), pq_flush, pq_puttextmessage(), pq_sendbyte(), pq_sendint16(), proc_exit(), readTimeLineHistory(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), SAB_Error, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, WalSnd::sentPtr, sentPtr, SlotIsLogical, StartReplicationCmd::slotname, snprintf, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), ThisTimeLineID, StartReplicationCmd::timeline, tliSwitchPoint(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), values, wal_segment_close(), wal_segment_size, WalSndLoop(), WalSndSegmentOpen(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, XL_ROUTINE, XLogReaderAllocate(), XLogRecPtrIsInvalid, and XLogSendPhysical().

Referenced by exec_replication_command().

572 {
574  XLogRecPtr FlushPtr;
575 
576  if (ThisTimeLineID == 0)
577  ereport(ERROR,
578  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
579  errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
580 
581  /* create xlogreader for physical replication */
582  xlogreader =
584  XL_ROUTINE(.segment_open = WalSndSegmentOpen,
585  .segment_close = wal_segment_close),
586  NULL);
587 
588  if (!xlogreader)
589  ereport(ERROR,
590  (errcode(ERRCODE_OUT_OF_MEMORY),
591  errmsg("out of memory")));
592 
593  /*
594  * We assume here that we're logging enough information in the WAL for
595  * log-shipping, since this is checked in PostmasterMain().
596  *
597  * NOTE: wal_level can only change at shutdown, so in most cases it is
598  * difficult for there to be WAL data that we can still see that was
599  * written at wal_level='minimal'.
600  */
601 
602  if (cmd->slotname)
603  {
606  ereport(ERROR,
607  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
608  errmsg("cannot use a logical replication slot for physical replication")));
609 
610  /*
611  * We don't need to verify the slot's restart_lsn here; instead we
612  * rely on the caller requesting the starting point to use. If the
613  * WAL segment doesn't exist, we'll fail later.
614  */
615  }
616 
617  /*
618  * Select the timeline. If it was given explicitly by the client, use
619  * that. Otherwise use the timeline of the last replayed record, which is
620  * kept in ThisTimeLineID.
621  */
623  {
624  /* this also updates ThisTimeLineID */
625  FlushPtr = GetStandbyFlushRecPtr();
626  }
627  else
628  FlushPtr = GetFlushRecPtr();
629 
630  if (cmd->timeline != 0)
631  {
632  XLogRecPtr switchpoint;
633 
634  sendTimeLine = cmd->timeline;
636  {
637  sendTimeLineIsHistoric = false;
639  }
640  else
641  {
642  List *timeLineHistory;
643 
644  sendTimeLineIsHistoric = true;
645 
646  /*
647  * Check that the timeline the client requested exists, and the
648  * requested start location is on that timeline.
649  */
650  timeLineHistory = readTimeLineHistory(ThisTimeLineID);
651  switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
653  list_free_deep(timeLineHistory);
654 
655  /*
656  * Found the requested timeline in the history. Check that
657  * requested startpoint is on that timeline in our history.
658  *
659  * This is quite loose on purpose. We only check that we didn't
660  * fork off the requested timeline before the switchpoint. We
661  * don't check that we switched *to* it before the requested
662  * starting point. This is because the client can legitimately
663  * request to start replication from the beginning of the WAL
664  * segment that contains switchpoint, but on the new timeline, so
665  * that it doesn't end up with a partial segment. If you ask for
666  * too old a starting point, you'll get an error later when we
667  * fail to find the requested WAL segment in pg_wal.
668  *
669  * XXX: we could be more strict here and only allow a startpoint
670  * that's older than the switchpoint, if it's still in the same
671  * WAL segment.
672  */
673  if (!XLogRecPtrIsInvalid(switchpoint) &&
674  switchpoint < cmd->startpoint)
675  {
676  ereport(ERROR,
677  (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
678  (uint32) (cmd->startpoint >> 32),
679  (uint32) (cmd->startpoint),
680  cmd->timeline),
681  errdetail("This server's history forked from timeline %u at %X/%X.",
682  cmd->timeline,
683  (uint32) (switchpoint >> 32),
684  (uint32) (switchpoint))));
685  }
686  sendTimeLineValidUpto = switchpoint;
687  }
688  }
689  else
690  {
693  sendTimeLineIsHistoric = false;
694  }
695 
697 
698  /* If there is nothing to stream, don't even enter COPY mode */
700  {
701  /*
702  * When we first start replication the standby will be behind the
703  * primary. For some applications, for example synchronous
704  * replication, it is important to have a clear state for this initial
705  * catchup mode, so we can trigger actions when we change streaming
706  * state later. We may stay in this state for a long time, which is
707  * exactly why we want to be able to monitor whether or not we are
708  * still here.
709  */
711 
712  /* Send a CopyBothResponse message, and start streaming */
713  pq_beginmessage(&buf, 'W');
714  pq_sendbyte(&buf, 0);
715  pq_sendint16(&buf, 0);
716  pq_endmessage(&buf);
717  pq_flush();
718 
719  /*
720  * Don't allow a request to stream from a future point in WAL that
721  * hasn't been flushed to disk in this server yet.
722  */
723  if (FlushPtr < cmd->startpoint)
724  {
725  ereport(ERROR,
726  (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
727  (uint32) (cmd->startpoint >> 32),
728  (uint32) (cmd->startpoint),
729  (uint32) (FlushPtr >> 32),
730  (uint32) (FlushPtr))));
731  }
732 
733  /* Start streaming from the requested point */
734  sentPtr = cmd->startpoint;
735 
736  /* Initialize shared memory status, too */
740 
742 
743  /* Main loop of walsender */
744  replication_active = true;
745 
747 
748  replication_active = false;
749  if (got_STOPPING)
750  proc_exit(0);
752 
754  }
755 
756  if (cmd->slotname)
758 
759  /*
760  * Copy is finished now. Send a single-row result set indicating the next
761  * timeline.
762  */
764  {
765  char startpos_str[8 + 1 + 8 + 1];
767  TupOutputState *tstate;
768  TupleDesc tupdesc;
769  Datum values[2];
770  bool nulls[2];
771 
772  snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
773  (uint32) (sendTimeLineValidUpto >> 32),
775 
777  MemSet(nulls, false, sizeof(nulls));
778 
779  /*
780  * Need a tuple descriptor representing two columns. int8 may seem
781  * like a surprising data type for this, but in theory int4 would not
782  * be wide enough for this, as TimeLineID is unsigned.
783  */
784  tupdesc = CreateTemplateTupleDesc(2);
785  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
786  INT8OID, -1, 0);
787  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
788  TEXTOID, -1, 0);
789 
790  /* prepare for projection of tuple */
791  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
792 
793  values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
794  values[1] = CStringGetTextDatum(startpos_str);
795 
796  /* send it to dest */
797  do_tup_output(tstate, values, nulls);
798 
799  end_tup_output(tstate);
800  }
801 
802  /* Send CommandComplete message */
803  pq_puttextmessage('C', "START_STREAMING");
804 }
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr startpoint
Definition: replnodes.h:85
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:813
#define pq_flush()
Definition: libpq.h:39
int wal_segment_size
Definition: xlog.c:116
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:44
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:76
static void XLogSendPhysical(void)
Definition: walsender.c:2536
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2256
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:610
#define MemSet(start, val, len)
Definition: c.h:971
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8422
void list_free_deep(List *list)
Definition: list.c:1390
slock_t mutex
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
#define SpinLockAcquire(lock)
Definition: spin.h:62
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2314
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:676
#define ERROR
Definition: elog.h:43
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2236
static char * buf
Definition: pg_test_fsync.c:67
static bool streamingDoneSending
Definition: walsender.c:179
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
int errdetail(const char *fmt,...)
Definition: elog.c:957
unsigned int uint32
Definition: c.h:367
void ReplicationSlotRelease(void)
Definition: slot.c:476
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
Definition: xlogreader.c:75
#define SlotIsLogical(slot)
Definition: slot.h:165
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1701
static XLogReaderState * xlogreader
Definition: walsender.c:137
void SyncRepInitConfig(void)
Definition: syncrep.c:400
#define XL_ROUTINE(...)
Definition: xlogreader.h:116
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2251
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
static volatile sig_atomic_t replication_active
Definition: walsender.c:195
uintptr_t Datum
Definition: postgres.h:367
XLogRecPtr sentPtr
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:580
WalSnd * MyWalSnd
Definition: walsender.c:112
static XLogRecPtr sentPtr
Definition: walsender.c:154
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
Definition: walsender.c:2458
TimeLineID ThisTimeLineID
Definition: xlog.c:191
#define ereport(elevel,...)
Definition: elog.h:144
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
static TimeLineID sendTimeLine
Definition: walsender.c:145
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:738
void WalSndSetState(WalSndState state)
Definition: walsender.c:3203
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:146
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:148
static bool streamingDoneReceiving
Definition: walsender.c:180
static Datum values[MAXATTR]
Definition: bootstrap.c:167
int errmsg(const char *fmt,...)
Definition: elog.c:824
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
int ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior)
Definition: slot.c:367
Definition: slot.h:42
TimeLineID timeline
Definition: replnodes.h:84
#define CStringGetTextDatum(s)
Definition: builtins.h:87
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2960
void pq_puttextmessage(char msgtype, const char *str)
Definition: pqformat.c:369
Definition: pg_list.h:50
#define snprintf
Definition: port.h:193
int16 AttrNumber
Definition: attnum.h:21
static bool sendTimeLineIsHistoric
Definition: walsender.c:147
bool am_cascading_walsender
Definition: walsender.c:116

◆ TransactionIdInRecentPast()

static bool TransactionIdInRecentPast ( TransactionId  xid,
uint32  epoch 
)
static

Definition at line 2020 of file walsender.c.

References EpochFromFullTransactionId, ReadNextFullTransactionId(), TransactionIdPrecedesOrEquals(), and XidFromFullTransactionId.

Referenced by ProcessStandbyHSFeedbackMessage().

2021 {
2022  FullTransactionId nextFullXid;
2023  TransactionId nextXid;
2024  uint32 nextEpoch;
2025 
2026  nextFullXid = ReadNextFullTransactionId();
2027  nextXid = XidFromFullTransactionId(nextFullXid);
2028  nextEpoch = EpochFromFullTransactionId(nextFullXid);
2029 
2030  if (xid <= nextXid)
2031  {
2032  if (epoch != nextEpoch)
2033  return false;
2034  }
2035  else
2036  {
2037  if (epoch + 1 != nextEpoch)
2038  return false;
2039  }
2040 
2041  if (!TransactionIdPrecedesOrEquals(xid, nextXid))
2042  return false; /* epoch OK, but it's wrapped around */
2043 
2044  return true;
2045 }
uint32 TransactionId
Definition: c.h:513
#define XidFromFullTransactionId(x)
Definition: transam.h:48
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:319
unsigned int uint32
Definition: c.h:367
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:246
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
static const unsigned __int64 epoch
Definition: gettimeofday.c:34

◆ UpdateSpillStats()

static void UpdateSpillStats ( LogicalDecodingContext ctx)
static

Definition at line 3682 of file walsender.c.

References DEBUG2, elog, WalSnd::mutex, LogicalDecodingContext::reorder, WalSnd::spillBytes, ReorderBuffer::spillBytes, WalSnd::spillCount, ReorderBuffer::spillCount, WalSnd::spillTxns, ReorderBuffer::spillTxns, SpinLockAcquire, and SpinLockRelease.

Referenced by WalSndUpdateProgress().

3683 {
3684  ReorderBuffer *rb = ctx->reorder;
3685 
3686  elog(DEBUG2, "UpdateSpillStats: updating stats %p %lld %lld %lld",
3687  rb,
3688  (long long) rb->spillTxns,
3689  (long long) rb->spillCount,
3690  (long long) rb->spillBytes);
3691 
3693  MyWalSnd->spillTxns = rb->spillTxns;
3697 }
struct ReorderBuffer * reorder
Definition: logical.h:42
slock_t mutex
int64 spillCount
int64 spillBytes
#define SpinLockAcquire(lock)
Definition: spin.h:62
int64 spillTxns
#define DEBUG2
Definition: elog.h:24
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:112
#define elog(elevel,...)
Definition: elog.h:214

◆ WalSndCheckTimeOut()

static void WalSndCheckTimeOut ( void  )
static

Definition at line 2224 of file walsender.c.

References COMMERROR, ereport, errmsg(), last_processing, last_reply_timestamp, TimestampTzPlusMilliseconds, wal_sender_timeout, and WalSndShutdown().

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

2225 {
2226  TimestampTz timeout;
2227 
2228  /* don't bail out if we're doing something that doesn't require timeouts */
2229  if (last_reply_timestamp <= 0)
2230  return;
2231 
2234 
2235  if (wal_sender_timeout > 0 && last_processing >= timeout)
2236  {
2237  /*
2238  * Since typically expiration of replication timeout means
2239  * communication problem, we don't send the error message to the
2240  * standby.
2241  */
2243  (errmsg("terminating walsender process due to replication timeout")));
2244 
2245  WalSndShutdown();
2246  }
2247 }
int wal_sender_timeout
Definition: walsender.c:123
int64 TimestampTz
Definition: timestamp.h:39
static TimestampTz last_processing
Definition: walsender.c:162
#define COMMERROR
Definition: elog.h:30
static void WalSndShutdown(void)
Definition: walsender.c:229
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
#define ereport(elevel,...)
Definition: elog.h:144
int errmsg(const char *fmt,...)
Definition: elog.c:824
static TimestampTz last_reply_timestamp
Definition: walsender.c:168

◆ WalSndComputeSleeptime()

static long WalSndComputeSleeptime ( TimestampTz  now)
static

Definition at line 2174 of file walsender.c.

References last_reply_timestamp, sleeptime, TimestampDifference(), TimestampTzPlusMilliseconds, waiting_for_ping_response, and wal_sender_timeout.

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

2175 {
2176  long sleeptime = 10000; /* 10 s */
2177 
2179  {
2180  TimestampTz wakeup_time;
2181  long sec_to_timeout;
2182  int microsec_to_timeout;
2183 
2184  /*
2185  * At the latest stop sleeping once wal_sender_timeout has been
2186  * reached.
2187  */
2190 
2191  /*
2192  * If no ping has been sent yet, wakeup when it's time to do so.
2193  * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2194  * the timeout passed without a response.
2195  */
2198  wal_sender_timeout / 2);
2199 
2200  /* Compute relative time until wakeup. */
2201  TimestampDifference(now, wakeup_time,
2202  &sec_to_timeout, &microsec_to_timeout);
2203 
2204  sleeptime = sec_to_timeout * 1000 +
2205  microsec_to_timeout / 1000;
2206  }
2207 
2208  return sleeptime;
2209 }
int wal_sender_timeout
Definition: walsender.c:123
int64 TimestampTz
Definition: timestamp.h:39
int sleeptime
Definition: pg_standby.c:41
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: timestamp.c:1648
static bool waiting_for_ping_response
Definition: walsender.c:171
static TimestampTz last_reply_timestamp
Definition: walsender.c:168
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1538

◆ WalSndDone()

static void WalSndDone ( WalSndSendDataCallback  send_data)
static

Definition at line 2917 of file walsender.c.

References DestRemote, EndCommand(), WalSnd::flush, pq_flush, pq_is_send_pending, proc_exit(), sentPtr, SetQueryCompletion(), waiting_for_ping_response, WalSndCaughtUp, WalSndKeepalive(), WalSnd::write, and XLogRecPtrIsInvalid.

Referenced by WalSndLoop().

2918 {
2919  XLogRecPtr replicatedPtr;
2920 
2921  /* ... let's just be real sure we're caught up ... */
2922  send_data();
2923 
2924  /*
2925  * To figure out whether all WAL has successfully been replicated, check
2926  * flush location if valid, write otherwise. Tools like pg_receivewal will
2927  * usually (unless in synchronous mode) return an invalid flush location.
2928  */
2929  replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
2931 
2932  if (WalSndCaughtUp && sentPtr == replicatedPtr &&
2933  !pq_is_send_pending())
2934  {
2935  QueryCompletion qc;
2936 
2937  /* Inform the standby that XLOG streaming is done */
2938  SetQueryCompletion(&qc, CMDTAG_COPY, 0);
2939  EndCommand(&qc, DestRemote, false);
2940  pq_flush();
2941 
2942  proc_exit(0);
2943  }
2945  {
2946  WalSndKeepalive(true);
2948  }
2949 }
#define pq_is_send_pending()
Definition: libpq.h:41
XLogRecPtr write
#define pq_flush()
Definition: libpq.h:39
void proc_exit(int code)
Definition: ipc.c:104
XLogRecPtr flush
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3461
void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
Definition: dest.c:166
static bool WalSndCaughtUp
Definition: walsender.c:183
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
WalSnd * MyWalSnd
Definition: walsender.c:112
static XLogRecPtr sentPtr
Definition: walsender.c:154
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static void SetQueryCompletion(QueryCompletion *qc, CommandTag commandTag, uint64 nprocessed)
Definition: cmdtag.h:36
static bool waiting_for_ping_response
Definition: walsender.c:171

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 296 of file walsender.c.

References ConditionVariableCancelSleep(), got_SIGUSR2, got_STOPPING, IsTransactionOrTransactionBlock(), LWLockReleaseAll(), MyReplicationSlot, pgstat_report_wait_end(), proc_exit(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), XLogReaderState::seg, wal_segment_close(), WalSndResourceCleanup(), WalSndSetState(), WALSNDSTATE_STARTUP, and WALOpenSegment::ws_file.

Referenced by PostgresMain().

297 {
301 
302  if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
304 
305  if (MyReplicationSlot != NULL)
307 
309 
310  replication_active = false;
311 
312  /*
313  * If there is a transaction in progress, it will clean up our
314  * ResourceOwner, but if a replication command set up a resource owner
315  * without a transaction, we've got to clean that up now.
316  */
318  WalSndResourceCleanup(false);
319 
320  if (got_STOPPING || got_SIGUSR2)
321  proc_exit(0);
322 
323  /* Revert back to startup state */
325 }
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:813
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
void proc_exit(int code)
Definition: ipc.c:104
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4674
WALOpenSegment seg
Definition: xlogreader.h:213
void ConditionVariableCancelSleep(void)
void ReplicationSlotRelease(void)
Definition: slot.c:476
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1381
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:186
static XLogReaderState * xlogreader
Definition: walsender.c:137
static volatile sig_atomic_t replication_active
Definition: walsender.c:195
void WalSndResourceCleanup(bool isCommit)
Definition: walsender.c:331
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
void WalSndSetState(WalSndState state)
Definition: walsender.c:3203
void ReplicationSlotCleanup(void)
Definition: slot.c:531
void LWLockReleaseAll(void)
Definition: lwlock.c:1911

◆ WalSndGetStateString()

static const char* WalSndGetStateString ( WalSndState  state)
static

Definition at line 3222 of file walsender.c.

References WALSNDSTATE_BACKUP, WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WALSNDSTATE_STOPPING, and WALSNDSTATE_STREAMING.

Referenced by pg_stat_get_wal_senders().

3223 {
3224  switch (state)
3225  {
3226  case WALSNDSTATE_STARTUP:
3227  return "startup";
3228  case WALSNDSTATE_BACKUP:
3229  return "backup";
3230  case WALSNDSTATE_CATCHUP:
3231  return "catchup";
3232  case WALSNDSTATE_STREAMING:
3233  return "streaming";
3234  case WALSNDSTATE_STOPPING:
3235  return "stopping";
3236  }
3237  return "UNKNOWN";
3238 }
Definition: regguts.h:298

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 3139 of file walsender.c.

References i, InvalidBackendId, max_wal_senders, WalSnd::mutex, WalSnd::pid, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

3140 {
3141  int i;
3142 
3143  for (i = 0; i < max_wal_senders; i++)
3144  {
3145  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3146  pid_t pid;
3147 
3148  SpinLockAcquire(&walsnd->mutex);
3149  pid = walsnd->pid;
3150  SpinLockRelease(&walsnd->mutex);
3151 
3152  if (pid == 0)
3153  continue;
3154 
3156  }
3157 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:250
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
#define InvalidBackendId
Definition: backendid.h:23
int i

◆ WalSndKeepalive()

static void WalSndKeepalive ( bool  requestReply)
static

Definition at line 3461 of file walsender.c.

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), StringInfoData::len, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), resetStringInfo(), and sentPtr.

Referenced by ProcessStandbyReplyMessage(), WalSndDone(), WalSndKeepaliveIfNecessary(), and WalSndWaitForWal().

3462 {
3463  elog(DEBUG2, "sending replication keepalive");
3464 
3465  /* construct the message... */
3467  pq_sendbyte(&output_message, 'k');
3470  pq_sendbyte(&output_message, requestReply ? 1 : 0);
3471 
3472  /* ... and send it wrapped in CopyData */
3474 }
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1574
static StringInfoData output_message
Definition: walsender.c:157
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
#define DEBUG2
Definition: elog.h:24
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
static XLogRecPtr sentPtr
Definition: walsender.c:154
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
#define elog(elevel,...)
Definition: elog.h:214

◆ WalSndKeepaliveIfNecessary()

static void WalSndKeepaliveIfNecessary ( void  )
static

Definition at line 3480 of file walsender.c.

References last_processing, last_reply_timestamp, pq_flush_if_writable, TimestampTzPlusMilliseconds, waiting_for_ping_response, wal_sender_timeout, WalSndKeepalive(), and WalSndShutdown().

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

3481 {
3482  TimestampTz ping_time;
3483 
3484  /*
3485  * Don't send keepalive messages if timeouts are globally disabled or
3486  * we're doing something not partaking in timeouts.
3487  */
3488  if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3489  return;
3490 
3492  return;
3493 
3494  /*
3495  * If half of wal_sender_timeout has elapsed without receiving any reply
3496  * from the standby, send a keepalive message to the standby requesting
3497  * an immediate reply.
3498  */
3500  wal_sender_timeout / 2);
3501  if (last_processing >= ping_time)
3502  {
3503  WalSndKeepalive(true);
3505 
3506  /* Try to flush pending output to the client */
3507  if (pq_flush_if_writable() != 0)
3508  WalSndShutdown();
3509  }
3510 }
int wal_sender_timeout
Definition: walsender.c:123
int64 TimestampTz
Definition: timestamp.h:39
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3461
static TimestampTz last_processing
Definition: walsender.c:162
#define pq_flush_if_writable()
Definition: libpq.h:40
static void WalSndShutdown(void)
Definition: walsender.c:229
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
static bool waiting_for_ping_response
Definition: walsender.c:171
static TimestampTz last_reply_timestamp
Definition: walsender.c:168

◆ WalSndKill()

static void WalSndKill ( int  code,
Datum  arg 
)
static

Definition at line 2440 of file walsender.c.

References Assert, WalSnd::latch, WalSnd::mutex, MyWalSnd, WalSnd::pid, SpinLockAcquire, and SpinLockRelease.

Referenced by InitWalSenderSlot().

2441 {
2442  WalSnd *walsnd = MyWalSnd;
2443 
2444  Assert(walsnd != NULL);
2445 
2446  MyWalSnd = NULL;
2447 
2448  SpinLockAcquire(&walsnd->mutex);
2449  /* clear latch while holding the spinlock, so it can safely be read */
2450  walsnd->latch = NULL;
2451  /* Mark WalSnd struct as no longer being in use. */
2452  walsnd->pid = 0;
2453  SpinLockRelease(&walsnd->mutex);
2454 }
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch * latch
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:112
#define Assert(condition)
Definition: c.h:738

◆ WalSndLastCycleHandler()

static void WalSndLastCycleHandler ( SIGNAL_ARGS  )
static

Definition at line 3035 of file walsender.c.

References got_SIGUSR2, MyLatch, and SetLatch().

Referenced by WalSndSignals().

3036 {
3037  int save_errno = errno;
3038 
3039  got_SIGUSR2 = true;
3040  SetLatch(MyLatch);
3041 
3042  errno = save_errno;
3043 }
void SetLatch(Latch *latch)
Definition: latch.c:457
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:186
struct Latch * MyLatch
Definition: globals.c:54

◆ WalSndLoop()

static void WalSndLoop ( WalSndSendDataCallback  send_data)
static

Definition at line 2251 of file walsender.c.

References application_name, CHECK_FOR_INTERRUPTS, ConfigReloadPending, DEBUG1, ereport, errmsg(), GetCurrentTimestamp(), got_SIGUSR2, last_reply_timestamp, MyLatch, MyProcPort, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), sleeptime, Port::sock, WalSnd::state, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_MAIN, waiting_for_ping_response, WaitLatchOrSocket(), WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndDone(), WalSndKeepaliveIfNecessary(), WalSndSetState(), WalSndShutdown(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STREAMING, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, WL_TIMEOUT, and XLogSendLogical().

Referenced by StartLogicalReplication(), and StartReplication().

2252 {
2253  /*
2254  * Initialize the last reply timestamp. That enables timeout processing
2255  * from hereon.
2256  */
2257  last_reply_timestamp = GetCurrentTimestamp();
2258  waiting_for_ping_response = false;
2259 
2260  /*
2261  * Loop until we reach the end of this timeline or the client requests to
2262  * stop streaming.
2263  */
2264  for (;;)
2265  {
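2266  /* Clear any already-pending wakeups */
2267  ResetLatch(MyLatch);
2268 
2269  CHECK_FOR_INTERRUPTS();
2270 
2271  /* Process any requests or signals received recently */
2272  if (ConfigReloadPending)
2273  {
2274  ConfigReloadPending = false;
2275  ProcessConfigFile(PGC_SIGHUP);
2276  SyncRepInitConfig();
2277  }
2278 
2279  /* Check for input from the client */
2280  ProcessRepliesIfAny();
2281 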
2282  /*
2283  * If we have received CopyDone from the client, sent CopyDone
2284  * ourselves, and the output buffer is empty, it's time to exit
2285  * streaming.
2286  */
2287  if (streamingDoneReceiving && streamingDoneSending &&
2288  !pq_is_send_pending())
2289  break;
2290 
2291  /*
2292  * If we don't have any pending data in the output buffer, try to send
2293  * some more. If there is some, we don't bother to call send_data
2294  * again until we've flushed it ... but we'd better assume we are not
2295  * caught up.
2296  */
2297  if (!pq_is_send_pending())
2298  send_data();
2299  else
2300  WalSndCaughtUp = false;
2301 
2302  /* Try to flush pending output to the client */
2303  if (pq_flush_if_writable() != 0)
2304  WalSndShutdown();
2305 
2306  /* If nothing remains to be sent right now ... */
2307  if (WalSndCaughtUp && !pq_is_send_pending())
2308  {
2309  /*
2310  * If we're in catchup state, move to streaming. This is an
2311  * important state change for users to know about, since before
2312  * this point data loss might occur if the primary dies and we
2313  * need to failover to the standby. The state change is also
2314  * important for synchronous replication, since commits that
2315  * started to wait at that point might wait for some time.
2316  */
2317  if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
2318  {
2319  ereport(DEBUG1,
2320  (errmsg("\"%s\" has now caught up with upstream server",
2321  application_name)));
2322  WalSndSetState(WALSNDSTATE_STREAMING);
2323  }
2324 
2325  /*
2326  * When SIGUSR2 arrives, we send any outstanding logs up to the
2327  * shutdown checkpoint record (i.e., the latest record), wait for
2328  * them to be replicated to the standby, and exit. This may be a
2329  * normal termination at shutdown, or a promotion, the walsender
2330  * is not sure which.
2331  */
2332  if (got_SIGUSR2)
2333  WalSndDone(send_data);
2334  }
2335 
2336  /* Check for replication timeout. */
2337  WalSndCheckTimeOut();
2338 
2339  /* Send keepalive if the time has come */
2340  WalSndKeepaliveIfNecessary();
2341 
2342  /*
2343  * Block if we have unsent data. XXX For logical replication, let
2344  * WalSndWaitForWal() handle any other blocking; idle receivers need
2345  * its additional actions. For physical replication, also block if
2346  * caught up; its send_data does not block.
2347  */
2348  if ((WalSndCaughtUp && send_data != XLogSendLogical &&
2349  !streamingDoneSending) ||
2350  pq_is_send_pending())
2351  {
2352  long sleeptime;
2353  int wakeEvents;
2354 
2355  wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT |
2356  WL_SOCKET_READABLE;
2357 
2358  /*
2359  * Use fresh timestamp, not last_processing, to reduce the chance
2360  * of reaching wal_sender_timeout before sending a keepalive.
2361  */
2362  sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
2363 
2364  if (pq_is_send_pending())
2365  wakeEvents |= WL_SOCKET_WRITEABLE;
2366 
2367  /* Sleep until something happens or we time out */
2368  (void) WaitLatchOrSocket(MyLatch, wakeEvents,
2369  MyProcPort->sock, sleeptime,
2370  WAIT_EVENT_WAL_SENDER_MAIN);
2371  }
2372  }
2373 }
#define pq_is_send_pending()
Definition: libpq.h:41
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
#define DEBUG1
Definition: elog.h:25
struct Port * MyProcPort
Definition: globals.c:43
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1574
static void WalSndDone(WalSndSendDataCallback send_data)
Definition: walsender.c:2917
#define WL_SOCKET_READABLE
Definition: latch.h:125
int sleeptime
Definition: pg_standby.c:41
pgsocket sock
Definition: libpq-be.h:122
void ResetLatch(Latch *latch)
Definition: latch.c:540
WalSndState state
static bool streamingDoneSending
Definition: walsender.c:179
#define pq_flush_if_writable()
Definition: libpq.h:40
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:390
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:186
void SyncRepInitConfig(void)
Definition: syncrep.c:400
static bool WalSndCaughtUp
Definition: walsender.c:183
Definition: guc.h:72
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3480
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2224
static void WalSndShutdown(void)
Definition: walsender.c:229
WalSnd * MyWalSnd
Definition: walsender.c:112
#define ereport(elevel,...)
Definition: elog.h:144
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2174
void WalSndSetState(WalSndState state)
Definition: walsender.c:3203
static void XLogSendLogical(void)
Definition: walsender.c:2837
static bool streamingDoneReceiving
Definition: walsender.c:180
char * application_name
Definition: guc.c:559
int errmsg(const char *fmt,...)
Definition: elog.c:824
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:26
struct Latch * MyLatch
Definition: globals.c:54
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
static bool waiting_for_ping_response
Definition: walsender.c:171
#define WL_LATCH_SET
Definition: latch.h:124
static TimestampTz last_reply_timestamp
Definition: walsender.c:168
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1699
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129

◆ WalSndPrepareWrite()

static void WalSndPrepareWrite ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1238 of file walsender.c.

References InvalidXLogRecPtr, LogicalDecodingContext::out, pq_sendbyte(), pq_sendint64(), and resetStringInfo().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

1239 {
1240  /* can't have sync rep confused by sending the same LSN several times */
1241  if (!last_write)
1242  lsn = InvalidXLogRecPtr;
1243 
1244  resetStringInfo(ctx->out);
1245 
1246  pq_sendbyte(ctx->out, 'w');
1247  pq_sendint64(ctx->out, lsn); /* dataStart */
1248  pq_sendint64(ctx->out, lsn); /* walEnd */
1249 
1250  /*
1251  * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1252  * reserve space here.
1253  */
1254  pq_sendint64(ctx->out, 0); /* sendtime */
1255 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
StringInfo out
Definition: logical.h:70

◆ WalSndResourceCleanup()

void WalSndResourceCleanup ( bool  isCommit)

Definition at line 331 of file walsender.c.

References CurrentResourceOwner, DestNone, DestRemote, proc_exit(), RESOURCE_RELEASE_AFTER_LOCKS, RESOURCE_RELEASE_BEFORE_LOCKS, RESOURCE_RELEASE_LOCKS, ResourceOwnerDelete(), ResourceOwnerRelease(), WalSndShutdown(), and whereToSendOutput.

Referenced by perform_base_backup(), and WalSndErrorCleanup().

332 {
333  ResourceOwner resowner;
334 
335  if (CurrentResourceOwner == NULL)
336  return;
337 
338  /*
339  * Deleting CurrentResourceOwner is not allowed, so we must save a pointer
340  * in a local variable and clear it first.
341  */
342  resowner = CurrentResourceOwner;
343  CurrentResourceOwner = NULL;
344 
345  /* Now we can release resources and delete it. */
346  ResourceOwnerRelease(resowner,
347  RESOURCE_RELEASE_BEFORE_LOCKS, isCommit, true);
348  ResourceOwnerRelease(resowner,
349  RESOURCE_RELEASE_LOCKS, isCommit, true);
350  ResourceOwnerRelease(resowner,
351  RESOURCE_RELEASE_AFTER_LOCKS, isCommit, true);
352  ResourceOwnerDelete(resowner);
353 }
ResourceOwner CurrentResourceOwner
Definition: resowner.c:142
void ResourceOwnerDelete(ResourceOwner owner)
Definition: resowner.c:712
void ResourceOwnerRelease(ResourceOwner owner, ResourceReleasePhase phase, bool isCommit, bool isTopLevel)
Definition: resowner.c:478

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 2990 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

2991 {
2992  int i;
2993 
2994  for (i = 0; i < max_wal_senders; i++)
2995  {
2996  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2997 
2998  SpinLockAcquire(&walsnd->mutex);
2999  if (walsnd->pid == 0)
3000  {
3001  SpinLockRelease(&walsnd->mutex);
3002  continue;
3003  }
3004  walsnd->needreload = true;
3005  SpinLockRelease(&walsnd->mutex);
3006  }
3007 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
bool needreload
int i

◆ WalSndSegmentOpen()

static void WalSndSegmentOpen ( XLogReaderState state,
XLogSegNo  nextSegNo,
TimeLineID tli_p 
)
static

Definition at line 2458 of file walsender.c.

References BasicOpenFile(), ereport, errcode_for_file_access(), errmsg(), ERROR, MAXFNAMELEN, MAXPGPATH, PG_BINARY, XLogReaderState::seg, XLogReaderState::segcxt, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, wal_segment_size, WALOpenSegment::ws_file, WALOpenSegment::ws_segno, WALSegmentContext::ws_segsize, XLByteToSeg, XLogFileName, and XLogFilePath.

Referenced by CreateReplicationSlot(), StartLogicalReplication(), and StartReplication().

2460 {
2461  char path[MAXPGPATH];
2462 
2463  /*-------
2464  * When reading from a historic timeline, and there is a timeline switch
2465  * within this segment, read from the WAL segment belonging to the new
2466  * timeline.
2467  *
2468  * For example, imagine that this server is currently on timeline 5, and
2469  * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
2470  * 0/13002088. In pg_wal, we have these files:
2471  *
2472  * ...
2473  * 000000040000000000000012
2474  * 000000040000000000000013
2475  * 000000050000000000000013
2476  * 000000050000000000000014
2477  * ...
2478  *
2479  * In this situation, when requested to send the WAL from segment 0x13, on
2480  * timeline 4, we read the WAL from file 000000050000000000000013. Archive
2481  * recovery prefers files from newer timelines, so if the segment was
2482  * restored from the archive on this server, the file belonging to the old
2483  * timeline, 000000040000000000000013, might not exist. Their contents are
2484  * equal up to the switchpoint, because at a timeline switch, the used
2485  * portion of the old segment is copied to the new file. -------
2486  */
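2487  *tli_p = sendTimeLine;
2488  if (sendTimeLineIsHistoric)
2489  {
2490  XLogSegNo endSegNo;
2491 
2492  XLByteToSeg(sendTimeLineValidUpto, endSegNo, state->segcxt.ws_segsize);
2493  if (state->seg.ws_segno == endSegNo)
2494  *tli_p = sendTimeLineNextTLI;
2495  }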
2496 
2497  XLogFilePath(path, *tli_p, nextSegNo, state->segcxt.ws_segsize);
2498  state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
2499  if (state->seg.ws_file >= 0)
2500  return;
2501 
2502  /*
2503  * If the file is not found, assume it's because the standby asked for a
2504  * too old WAL segment that has already been removed or recycled.
2505  */
2506  if (errno == ENOENT)
2507  {
2508  char xlogfname[MAXFNAMELEN];
2509  int save_errno = errno;
2510 
2511  XLogFileName(xlogfname, *tli_p, nextSegNo, wal_segment_size);
2512  errno = save_errno;
2513  ereport(ERROR,
2514  (errcode_for_file_access(),
2515  errmsg("requested WAL segment %s has already been removed",
2516  xlogfname)));
2517  }
2518  else
2519  ereport(ERROR,
2520  (errcode_for_file_access(),
2521  errmsg("could not open file \"%s\": %m",
2522  path)));
2523 }
int wal_segment_size
Definition: xlog.c:116
#define PG_BINARY
Definition: c.h:1233
WALOpenSegment seg
Definition: xlogreader.h:213
#define ERROR
Definition: elog.h:43
#define MAXPGPATH
uint64 XLogSegNo
Definition: xlogdefs.h:41
XLogSegNo ws_segno
Definition: xlogreader.h:47
int errcode_for_file_access(void)
Definition: elog.c:633
#define MAXFNAMELEN
#define ereport(elevel,...)
Definition: elog.h:144
static TimeLineID sendTimeLine
Definition: walsender.c:145
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes)
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:146
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:148
int BasicOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:985
#define XLogFilePath(path, tli, logSegNo, wal_segsz_bytes)
int errmsg(const char *fmt,...)
Definition: elog.c:824
WALSegmentContext segcxt
Definition: xlogreader.h:212
static bool sendTimeLineIsHistoric
Definition: walsender.c:147
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

◆ WalSndSetState()

void WalSndSetState ( WalSndState  state)

Definition at line 3203 of file walsender.c.

References am_walsender, Assert, WalSnd::mutex, MyWalSnd, SpinLockAcquire, SpinLockRelease, and WalSnd::state.

Referenced by exec_replication_command(), SendBaseBackup(), StartLogicalReplication(), StartReplication(), WalSndErrorCleanup(), WalSndLoop(), and XLogSendPhysical().

3204 {
3205  WalSnd *walsnd = MyWalSnd;
3206 
3207  Assert(am_walsender);
3208 
3209  if (walsnd->state == state)
3210  return;
3211 
3212  SpinLockAcquire(&walsnd->mutex);
3213  walsnd->state = state;
3214  SpinLockRelease(&walsnd->mutex);
3215 }
slock_t mutex
bool am_walsender
Definition: walsender.c:115
#define SpinLockAcquire(lock)
Definition: spin.h:62
WalSndState state
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:112
#define Assert(condition)
Definition: c.h:738
Definition: regguts.h:298

◆ WalSndShmemInit()

void WalSndShmemInit ( void  )

Definition at line 3078 of file walsender.c.

References i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SHMQueueInit(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

3079 {
3080  bool found;
3081  int i;
3082 
3083  WalSndCtl = (WalSndCtlData *)
3084  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3085 
3086  if (!found)
3087  {
3088  /* First time through, so initialize */
3089  MemSet(WalSndCtl, 0, WalSndShmemSize());
3090 
3091  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3092  SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
3093 
3094  for (i = 0; i < max_wal_senders; i++)
3095  {
3096  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3097 
3098  SpinLockInit(&walsnd->mutex);
3099  }
3100  }
3101 }
Size WalSndShmemSize(void)
Definition: walsender.c:3066
#define SpinLockInit(lock)
Definition: spin.h:60
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
#define MemSet(start, val, len)
Definition: c.h:971
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:392
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
int max_wal_senders
Definition: walsender.c:121
void SHMQueueInit(SHM_QUEUE *queue)
Definition: shmqueue.c:36
int i
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28

◆ WalSndShmemSize()

Size WalSndShmemSize ( void  )

Definition at line 3066 of file walsender.c.

References add_size(), max_wal_senders, mul_size(), and offsetof.

Referenced by CreateSharedMemoryAndSemaphores(), and WalSndShmemInit().

3067 {
3068  Size size = 0;
3069 
3070  size = offsetof(WalSndCtlData, walsnds);
3071  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3072 
3073  return size;
3074 }
int max_wal_senders
Definition: walsender.c:121
Size mul_size(Size s1, Size s2)
Definition: shmem.c:515
Size add_size(Size s1, Size s2)
Definition: shmem.c:498
size_t Size
Definition: c.h:466
#define offsetof(type, field)
Definition: c.h:661

◆ WalSndShutdown()

static void WalSndShutdown ( void  )
static

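Definition at line 229 of file walsender.c. Note: the excerpt shown below (source lines 263–286) creates the per-walsender slot, notifies the postmaster and initializes the lag tracker; it appears to be initialization code rather than the body of WalSndShutdown(), so the generated listing seems to be attached to the wrong function.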

Referenced by WalSndCheckTimeOut(), WalSndKeepaliveIfNecessary(), WalSndLoop(), WalSndResourceCleanup(), WalSndWaitForWal(), and WalSndWriteData().

263 {
265 
266  /* Create a per-walsender data structure in shared memory */
268 
269  /*
270  * We don't currently need any ResourceOwner in a walsender process, but
271  * if we did, we could call CreateAuxProcessResourceOwner here.
272  */
273 
274  /*
275  * Let postmaster know that we're a WAL sender. Once we've declared us as
276  * a WAL sender process, postmaster will let us outlive the bgwriter and
277  * kill us last in the shutdown sequence, so we get a chance to stream all
278  * remaining WAL at shutdown, including the shutdown checkpoint. Note that
279  * there's no going back, and we mustn't write any WAL records after this.
280  */
283 
284  /* Initialize empty timestamp buffer for lag tracking. */
286 }
static void InitWalSenderSlot(void)
Definition: walsender.c:2377
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:272
bool RecoveryInProgress(void)
Definition: xlog.c:8071
static LagTracker * lag_tracker
Definition: walsender.c:219
MemoryContext TopMemoryContext
Definition: mcxt.c:44
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:839
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:146
bool am_cascading_walsender
Definition: walsender.c:116

◆ WalSndSignals()

void WalSndSignals ( void  )

Definition at line 3047 of file walsender.c.

References die, InitializeTimeouts(), pqsignal(), procsignal_sigusr1_handler(), quickdie(), SIG_DFL, SIG_IGN, SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGQUIT, SIGUSR1, SIGUSR2, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

3048 {
3049  /* Set up signal handlers */
3050  pqsignal(SIGHUP, SignalHandlerForConfigReload);
3051  pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3052  pqsignal(SIGTERM, die); /* request shutdown */
3053  pqsignal(SIGQUIT, quickdie); /* hard crash time */
3054  InitializeTimeouts(); /* establishes SIGALRM handler */
3055  pqsignal(SIGPIPE, SIG_IGN);
3056  pqsignal(SIGUSR1, procsignal_sigusr1_handler);
3057  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3058  * shutdown */
3059 
3060  /* Reset some signals that are accepted by postmaster but not here */
3061  pqsignal(SIGCHLD, SIG_DFL);
3062 }
void InitializeTimeouts(void)
Definition: timeout.c:346
#define SIGQUIT
Definition: win32_port.h:154
#define SIGUSR1
Definition: win32_port.h:165
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:3035
#define SIGCHLD
Definition: win32_port.h:163
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:56
#define SIGPIPE
Definition: win32_port.h:158
#define SIGUSR2
Definition: win32_port.h:166
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:2857
#define SIGHUP
Definition: win32_port.h:153
#define SIG_IGN
Definition: win32_port.h:150
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
#define SIG_DFL
Definition: win32_port.h:148
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:551
void quickdie(SIGNAL_ARGS)
Definition: postgres.c:2759
#define die(msg)
Definition: pg_test_fsync.c:96

◆ WalSndUpdateProgress()

static void WalSndUpdateProgress ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid 
)
static

Definition at line 1355 of file walsender.c.

References GetCurrentTimestamp(), LagTrackerWrite(), now(), TimestampDifferenceExceeds(), UpdateSpillStats(), and WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS.

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

1356 {
1357  static TimestampTz sendTime = 0;
1358  TimestampTz now = GetCurrentTimestamp();
1359 
1360  /*
1361  * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1362  * avoid flooding the lag tracker when we commit frequently.
1363  */
1364 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1365  if (!TimestampDifferenceExceeds(sendTime, now,
1366  WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
1367  return;
1368 
1369  LagTrackerWrite(lsn, now);
1370  sendTime = now;
1371 
1372  /*
1373  * Update statistics about transactions that spilled to disk.
1374  */
1375  UpdateSpillStats(ctx);
1376 }
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1574
int64 TimestampTz
Definition: timestamp.h:39
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1673
static void UpdateSpillStats(LogicalDecodingContext *ctx)
Definition: walsender.c:3682
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition: walsender.c:3519
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1538

◆ WalSndWaitForWal()

static XLogRecPtr WalSndWaitForWal ( XLogRecPtr  loc)
static

Definition at line 1386 of file walsender.c.

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, WalSnd::flush, GetCurrentTimestamp(), GetFlushRecPtr(), GetXLogReplayRecPtr(), got_STOPPING, InvalidXLogRecPtr, MyLatch, MyProcPort, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), RecoveryInProgress(), ResetLatch(), sentPtr, SetLatch(), sleeptime, Port::sock, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_WAIT_WAL, waiting_for_ping_response, WaitLatchOrSocket(), WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepalive(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, WL_TIMEOUT, WalSnd::write, and XLogBackgroundFlush().

Referenced by logical_read_xlog_page().

1387 {
1388  int wakeEvents;
1389  static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1390 
1391  /*
1392  * Fast path to avoid acquiring the spinlock in case we already know we
1393  * have enough WAL available. This is particularly interesting if we're
1394  * far behind.
1395  */
1396  if (RecentFlushPtr != InvalidXLogRecPtr &&
1397  loc <= RecentFlushPtr)
1398  return RecentFlushPtr;
1399 
1400  /* Get a more recent flush pointer. */
1401  if (!RecoveryInProgress())
1402  RecentFlushPtr = GetFlushRecPtr();
1403  else
1404  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1405 
1406  for (;;)
1407  {
1408  long sleeptime;
1409 
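1410  /* Clear any already-pending wakeups */
1411  ResetLatch(MyLatch);
1412 
1413  CHECK_FOR_INTERRUPTS();
1414 
1415  /* Process any requests or signals received recently */
1416  if (ConfigReloadPending)
1417  {
1418  ConfigReloadPending = false;
1419  ProcessConfigFile(PGC_SIGHUP);
1420  SyncRepInitConfig();
1421  }
1422 
1423  /* Check for input from the client */
1424  ProcessRepliesIfAny();
1425 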
1426  /*
1427  * If we're shutting down, trigger pending WAL to be written out,
1428  * otherwise we'd possibly end up waiting for WAL that never gets
1429  * written, because walwriter has shut down already.
1430  */
1431  if (got_STOPPING)
1432  XLogBackgroundFlush();
1433 
1434  /* Update our idea of the currently flushed position. */
1435  if (!RecoveryInProgress())
1436  RecentFlushPtr = GetFlushRecPtr();
1437  else
1438  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1439 
1440  /*
1441  * If postmaster asked us to stop, don't wait anymore.
1442  *
1443  * It's important to do this check after the recomputation of
1444  * RecentFlushPtr, so we can send all remaining data before shutting
1445  * down.
1446  */
1447  if (got_STOPPING)
1448  break;
1449 
1450  /*
1451  * We only send regular messages to the client for full decoded
1452  * transactions, but a synchronous replication and walsender shutdown
1453  * possibly are waiting for a later location. So, before sleeping, we
1454  * send a ping containing the flush location. If the receiver is
1455  * otherwise idle, this keepalive will trigger a reply. Processing the
1456  * reply will update these MyWalSnd locations.
1457  */
1458  if (MyWalSnd->flush < sentPtr &&
1459  MyWalSnd->write < sentPtr &&
1460  !waiting_for_ping_response)
1461  {
1462  WalSndKeepalive(false);
1463  waiting_for_ping_response = true;
1464  }
1465 
1466  /* check whether we're done */
1467  if (loc <= RecentFlushPtr)
1468  break;
1469 
1470  /* Waiting for new WAL. Since we need to wait, we're now caught up. */
1471  WalSndCaughtUp = true;
1472 
1473  /*
1474  * Try to flush any pending output to the client.
1475  */
1476  if (pq_flush_if_writable() != 0)
1477  WalSndShutdown();
1478 
1479  /*
1480  * If we have received CopyDone from the client, sent CopyDone
1481  * ourselves, and the output buffer is empty, it's time to exit
1482  * streaming, so fail the current WAL fetch request.
1483  */
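1484  if (streamingDoneReceiving && streamingDoneSending &&
1485  !pq_is_send_pending())
1486  break;
1487 
1488  /* die if timeout was reached */
1489  WalSndCheckTimeOut();
1490 
1491  /* Send keepalive if the time has come */
1492  WalSndKeepaliveIfNecessary();
1493 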
1494  /*
1495  * Sleep until something happens or we time out. Also wait for the
1496  * socket becoming writable, if there's still pending output.
1497  * Otherwise we might sit on sendable output data while waiting for
1498  * new WAL to be generated. (But if we have nothing to send, we don't
1499  * want to wake on socket-writable.)
1500  */
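1501  sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
1502 
1503  wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH |
1504  WL_SOCKET_READABLE | WL_TIMEOUT;
1505 
1506  if (pq_is_send_pending())
1507  wakeEvents |= WL_SOCKET_WRITEABLE;
1508 
1509  (void) WaitLatchOrSocket(MyLatch, wakeEvents,
1510  MyProcPort->sock, sleeptime,
1511  WAIT_EVENT_WAL_SENDER_WAIT_WAL);
1512  }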
1513 
1514  /* reactivate latch so WalSndLoop knows to continue */
1515  SetLatch(MyLatch);
1516  return RecentFlushPtr;
1517 }
#define pq_is_send_pending()
Definition: libpq.h:41
XLogRecPtr write
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
struct Port * MyProcPort
Definition: globals.c:43
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1574
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
#define WL_SOCKET_READABLE
Definition: latch.h:125
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8422
int sleeptime
Definition: pg_standby.c:41
bool RecoveryInProgress(void)
Definition: xlog.c:8071
void SetLatch(Latch *latch)
Definition: latch.c:457
pgsocket sock
Definition: libpq-be.h:122
void ResetLatch(Latch *latch)
Definition: latch.c:540
XLogRecPtr flush
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3461
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11467
bool XLogBackgroundFlush(void)
Definition: xlog.c:3035
static bool streamingDoneSending
Definition: walsender.c:179
#define pq_flush_if_writable()
Definition: libpq.h:40
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:390
void SyncRepInitConfig(void)
Definition: syncrep.c:400
static bool WalSndCaughtUp
Definition: walsender.c:183
Definition: guc.h:72
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3480
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2224
static void WalSndShutdown(void)
Definition: walsender.c:229
WalSnd * MyWalSnd
Definition: walsender.c:112
static XLogRecPtr sentPtr
Definition: walsender.c:154
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2174
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool streamingDoneReceiving
Definition: walsender.c:180
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:26
struct Latch * MyLatch
Definition: globals.c:54
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
static bool waiting_for_ping_response
Definition: walsender.c:171
#define WL_LATCH_SET
Definition: latch.h:124
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1699
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 3165 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, pg_usleep(), WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

3166 {
3167  for (;;)
3168  {
3169  int i;
3170  bool all_stopped = true;
3171 
3172  for (i = 0; i < max_wal_senders; i++)
3173  {
3174  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3175 
3176  SpinLockAcquire(&walsnd->mutex);
3177 
3178  if (walsnd->pid == 0)
3179  {
3180  SpinLockRelease(&walsnd->mutex);
3181  continue;
3182  }
3183 
3184  if (walsnd->state != WALSNDSTATE_STOPPING)
3185  {
3186  all_stopped = false;
3187  SpinLockRelease(&walsnd->mutex);
3188  break;
3189  }
3190  SpinLockRelease(&walsnd->mutex);
3191  }
3192 
3193  /* safe to leave if confirmation is done for all WAL senders */
3194  if (all_stopped)
3195  return;
3196 
3197  pg_usleep(10000L); /* wait for 10 msec */
3198  }
3199 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pg_usleep(long microsec)
Definition: signal.c:53
WalSndState state
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
int i

◆ WalSndWakeup()

void WalSndWakeup ( void  )

Definition at line 3110 of file walsender.c.

References i, WalSnd::latch, max_wal_senders, WalSnd::mutex, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive(), StartupXLOG(), and XLogWalRcvFlush().

3111 {
3112  int i;
3113 
3114  for (i = 0; i < max_wal_senders; i++)
3115  {
3116  Latch *latch;
3117  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3118 
3119  /*
3120  * Get latch pointer with spinlock held, for the unlikely case that
3121  * pointer reads aren't atomic (as they're 8 bytes).
3122  */
3123  SpinLockAcquire(&walsnd->mutex);
3124  latch = walsnd->latch;
3125  SpinLockRelease(&walsnd->mutex);
3126 
3127  if (latch != NULL)
3128  SetLatch(latch);
3129  }
3130 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
void SetLatch(Latch *latch)
Definition: latch.c:457
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch * latch
Definition: latch.h:110
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
int i

◆ WalSndWriteData()

static void WalSndWriteData ( LogicalDecodingContext *  ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1265 of file walsender.c.

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, StringInfoData::data, GetCurrentTimestamp(), last_reply_timestamp, StringInfoData::len, MyLatch, MyProcPort, now(), LogicalDecodingContext::out, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, pq_putmessage_noblock, pq_sendint64(), ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), resetStringInfo(), SetLatch(), sleeptime, Port::sock, SyncRepInitConfig(), TimestampTzPlusMilliseconds, WAIT_EVENT_WAL_SENDER_WRITE_DATA, WaitLatchOrSocket(), wal_sender_timeout, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, and WL_TIMEOUT.

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

/*
 * WalSndWriteData (body only; the signature appears in the Doxygen header
 * above).
 *
 * Sends the message already assembled in ctx->out to the client:
 *   1. Takes the current timestamp and copies its wire form (built in the
 *      file-level scratch buffer tmpbuf) into ctx->out->data at byte offset
 *      1 + 2 * sizeof(int64).  NOTE(review): the layout of the bytes before
 *      that offset is set up by the caller -- confirm there.
 *   2. Queues ctx->out as a CopyData ('d') message without blocking and
 *      attempts a non-blocking flush; a flush failure ends the walsender
 *      via WalSndShutdown().
 *   3. Fast path: returns immediately when no output is pending and less
 *      than half of wal_sender_timeout has elapsed since
 *      last_reply_timestamp.
 *   4. Slow path: loops -- processing client replies, checking the timeout,
 *      sending keepalives, sleeping on the latch/socket, handling config
 *      reloads and retrying the flush -- until no output is pending.
 *   5. Sets MyLatch before returning from the slow path.
 *
 * Parameters: ctx supplies the output buffer (ctx->out).  lsn, xid and
 * last_write are not referenced anywhere in this body.
 * Returns nothing.
 */
1267 {
1268  TimestampTz now;
1269 
1270  /*
1271  * Fill the send timestamp last, so that it is taken as late as possible.
1272  * This is somewhat ugly, but the protocol is set as it's already used for
1273  * several releases by streaming physical replication.
1274  */
1275  resetStringInfo(&tmpbuf);
1276  now = GetCurrentTimestamp();
1277  pq_sendint64(&tmpbuf, now);
1278  memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1279  tmpbuf.data, sizeof(int64));
1280 
1281  /* output previously gathered data in a CopyData packet */
1282  pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1283 
1284  CHECK_FOR_INTERRUPTS();
1285 
1286  /* Try to flush pending output to the client */
1287  if (pq_flush_if_writable() != 0)
1288  WalSndShutdown();
1289 
1290  /* Try taking fast path unless we get too close to walsender timeout. */
1291  if (now < TimestampTzPlusMilliseconds(last_reply_timestamp,
1292  wal_sender_timeout / 2) &&
1293  !pq_is_send_pending())
1294  {
1295  return;
1296  }
1297 
1298  /* If we have pending write here, go to slow path */
1299  for (;;)
1300  {
1301  int wakeEvents;
1302  long sleeptime;
1303 
1304  /* Check for input from the client */
1305  ProcessRepliesIfAny();
1306 
1307  /* die if timeout was reached */
1308  WalSndCheckTimeOut();
1309 
1310  /* Send keepalive if the time has come */
1311  WalSndKeepaliveIfNecessary();
1312 
1313  if (!pq_is_send_pending())
1314  break;
1315 
1316  sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
1317 
1318  wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH |
1319  WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT;
1320 
1321  /* Sleep until something happens or we time out */
1322  (void) WaitLatchOrSocket(MyLatch, wakeEvents,
1323  MyProcPort->sock, sleeptime,
1324  WAIT_EVENT_WAL_SENDER_WRITE_DATA);
1325 
1326  /* Clear any already-pending wakeups */
1327  ResetLatch(MyLatch);
1328 
1329  CHECK_FOR_INTERRUPTS();
1330 
1331  /* Process any requests or signals received recently */
1332  if (ConfigReloadPending)
1333  {
1334  ConfigReloadPending = false;
1335  ProcessConfigFile(PGC_SIGHUP);
1336  SyncRepInitConfig();
1337  }
1338 
1339  /* Try to flush pending output to the client */
1340  if (pq_flush_if_writable() != 0)
1341  WalSndShutdown();
1342  }
1343 
1344  /* reactivate latch so WalSndLoop knows to continue */
1345  SetLatch(MyLatch);
1346 }
#define pq_is_send_pending()
Definition: libpq.h:41
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
int wal_sender_timeout
Definition: walsender.c:123
struct Port * MyProcPort
Definition: globals.c:43
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1574
int64 TimestampTz
Definition: timestamp.h:39
#define WL_SOCKET_READABLE
Definition: latch.h:125
int sleeptime
Definition: pg_standby.c:41
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
void SetLatch(Latch *latch)
Definition: latch.c:457
pgsocket sock
Definition: libpq-be.h:122
void ResetLatch(Latch *latch)
Definition: latch.c:540
#define pq_flush_if_writable()
Definition: libpq.h:40
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:390
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
void SyncRepInitConfig(void)
Definition: syncrep.c:400
PGC_SIGHUP
Definition: guc.h:72
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3480
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2224
static void WalSndShutdown(void)
Definition: walsender.c:229
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2174
static StringInfoData tmpbuf
Definition: walsender.c:159
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
StringInfo out
Definition: logical.h:70
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:26
struct Latch * MyLatch
Definition: globals.c:54
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
#define WL_LATCH_SET
Definition: latch.h:124
static TimestampTz last_reply_timestamp
Definition: walsender.c:168
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1699