PostgreSQL Source Code  git master
walsender.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/printtup.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "access/xlogutils.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
#include "commands/defrem.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "pgstat.h"
#include "replication/basebackup.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/logicalfuncs.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/dest.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/portal.h"
#include "utils/ps_status.h"
#include "utils/resowner.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
Include dependency graph for walsender.c:

Go to the source code of this file.

Data Structures

struct  WalTimeSample
 

Macros

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)
 
#define LAG_TRACKER_BUFFER_SIZE   8192
 
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000
 
#define PG_STAT_GET_WAL_SENDERS_COLS   11
 

Typedefs

typedef void(* WalSndSendDataCallback) (void)
 

Functions

static void WalSndLastCycleHandler (SIGNAL_ARGS)
 
static void WalSndLoop (WalSndSendDataCallback send_data)
 
static void InitWalSenderSlot (void)
 
static void WalSndKill (int code, Datum arg)
 
static void WalSndShutdown (void)
 
void WalSndErrorCleanup (void)
 
static void IdentifySystem (void)
 
static void SendTimeLineHistory (TimeLineHistoryCmd *cmd)
 
static void StartReplication (StartReplicationCmd *cmd)
 
static int logical_read_xlog_page (XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
 
static void parseCreateReplSlotOptions (CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action)
 
static void CreateReplicationSlot (CreateReplicationSlotCmd *cmd)
 
static void DropReplicationSlot (DropReplicationSlotCmd *cmd)
 
static void StartLogicalReplication (StartReplicationCmd *cmd)
 
static void WalSndPrepareWrite (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndWriteData (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndUpdateProgress (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
 
static XLogRecPtr WalSndWaitForWal (XLogRecPtr loc)
 
bool exec_replication_command (const char *cmd_string)
 
static void ProcessRepliesIfAny (void)
 
static void ProcessStandbyMessage (void)
 
static void PhysicalConfirmReceivedLocation (XLogRecPtr lsn)
 
static void ProcessStandbyReplyMessage (void)
 
static void PhysicalReplicationSlotNewXmin (TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
 
static bool TransactionIdInRecentPast (TransactionId xid, uint32 epoch)
 
static void ProcessStandbyHSFeedbackMessage (void)
 
static long WalSndComputeSleeptime (TimestampTz now)
 
static void WalSndCheckTimeOut (TimestampTz now)
 
static void XLogRead (char *buf, XLogRecPtr startptr, Size count)
 
static void XLogSendPhysical (void)
 
static void XLogSendLogical (void)
 
static void WalSndDone (WalSndSendDataCallback send_data)
 
static XLogRecPtr GetStandbyFlushRecPtr (void)
 
void WalSndRqstFileReload (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (void)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void WalSndSetState (WalSndState state)
 
static const char * WalSndGetStateString (WalSndState state)
 
static Interval * offset_to_interval (TimeOffset offset)
 
Datum pg_stat_get_wal_senders (PG_FUNCTION_ARGS)
 
static void WalSndKeepalive (bool requestReply)
 
static void WalSndKeepaliveIfNecessary (TimestampTz now)
 
static void LagTrackerWrite (XLogRecPtr lsn, TimestampTz local_flush_time)
 
static TimeOffset LagTrackerRead (int head, XLogRecPtr lsn, TimestampTz now)
 

Variables

WalSndCtlData * WalSndCtl = NULL
 
WalSnd * MyWalSnd = NULL
 
bool am_walsender = false
 
bool am_cascading_walsender = false
 
bool am_db_walsender = false
 
int max_wal_senders = 0
 
int wal_sender_timeout = 60 * 1000
 
bool log_replication_commands = false
 
bool wake_wal_senders = false
 
static int sendFile = -1
 
static XLogSegNo sendSegNo = 0
 
static uint32 sendOff = 0
 
static TimeLineID curFileTimeLine = 0
 
static TimeLineID sendTimeLine = 0
 
static TimeLineID sendTimeLineNextTLI = 0
 
static bool sendTimeLineIsHistoric = false
 
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
 
static XLogRecPtr sentPtr = 0
 
static StringInfoData output_message
 
static StringInfoData reply_message
 
static StringInfoData tmpbuf
 
static TimestampTz last_reply_timestamp = 0
 
static bool waiting_for_ping_response = false
 
static bool streamingDoneSending
 
static bool streamingDoneReceiving
 
static bool WalSndCaughtUp = false
 
static volatile sig_atomic_t got_SIGUSR2 = false
 
static volatile sig_atomic_t got_STOPPING = false
 
static volatile sig_atomic_t replication_active = false
 
static LogicalDecodingContext * logical_decoding_ctx = NULL
 
static XLogRecPtr logical_startptr = InvalidXLogRecPtr
 
struct {
   XLogRecPtr   last_lsn
 
   WalTimeSample   buffer [LAG_TRACKER_BUFFER_SIZE]
 
   int   write_head
 
   int   read_heads [NUM_SYNC_REP_WAIT_MODE]
 
   WalTimeSample   last_read [NUM_SYNC_REP_WAIT_MODE]
 
} LagTracker
 

Macro Definition Documentation

◆ LAG_TRACKER_BUFFER_SIZE

#define LAG_TRACKER_BUFFER_SIZE   8192

Definition at line 208 of file walsender.c.

Referenced by LagTrackerRead(), and LagTrackerWrite().

◆ MAX_SEND_SIZE

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)

Definition at line 105 of file walsender.c.

Referenced by XLogSendPhysical().

◆ PG_STAT_GET_WAL_SENDERS_COLS

#define PG_STAT_GET_WAL_SENDERS_COLS   11

Referenced by pg_stat_get_wal_senders().

◆ WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS

#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000

Referenced by WalSndUpdateProgress().

Typedef Documentation

◆ WalSndSendDataCallback

typedef void(* WalSndSendDataCallback) (void)

Definition at line 224 of file walsender.c.

Function Documentation

◆ CreateReplicationSlot()

static void CreateReplicationSlot ( CreateReplicationSlotCmd *  cmd)
static

Definition at line 835 of file walsender.c.

References Assert, begin_tup_output_tupdesc(), CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDestReceiver(), CreateInitDecodingContext(), CreateTemplateTupleDesc(), CRS_EXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, CStringGetTextDatum, ReplicationSlot::data, DecodingContextFindStartpoint(), generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errmsg(), ERROR, FirstSnapshotSet, FreeDecodingContext(), IsSubTransaction(), IsTransactionBlock(), CreateReplicationSlotCmd::kind, last_reply_timestamp, logical_read_xlog_page(), MAXFNAMELEN, MemSet, MyProc, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, NIL, parseCreateReplSlotOptions(), CreateReplicationSlotCmd::plugin, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, ReplicationSlotCreate(), ReplicationSlotMarkDirty(), ReplicationSlotPersist(), ReplicationSlotRelease(), ReplicationSlotReserveWal(), ReplicationSlotSave(), RestoreTransactionSnapshot(), RS_EPHEMERAL, RS_PERSISTENT, RS_TEMPORARY, sendTimeLine, sendTimeLineIsHistoric, CreateReplicationSlotCmd::slotname, SnapBuildExportSnapshot(), SnapBuildInitialSnapshot(), LogicalDecodingContext::snapshot_builder, snprintf(), CreateReplicationSlotCmd::temporary, TEXTOID, ThisTimeLineID, TupleDescInitBuiltinEntry(), values, WalSndPrepareWrite(), WalSndUpdateProgress(), WalSndWriteData(), XACT_REPEATABLE_READ, and XactIsoLevel.

Referenced by exec_replication_command(), main(), and StartLogStreamer().

836 {
837  const char *snapshot_name = NULL;
838  char xloc[MAXFNAMELEN];
839  char *slot_name;
840  bool reserve_wal = false;
841  CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
843  TupOutputState *tstate;
844  TupleDesc tupdesc;
845  Datum values[4];
846  bool nulls[4];
847 
849 
850  parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action);
851 
852  /* setup state for XLogReadPage */
853  sendTimeLineIsHistoric = false;
855 
856  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
857  {
858  ReplicationSlotCreate(cmd->slotname, false,
860  }
861  else
862  {
864 
865  /*
866  * Initially create persistent slot as ephemeral - that allows us to
867  * nicely handle errors during initialization because it'll get
868  * dropped if this transaction fails. We'll make it persistent at the
869  * end. Temporary slots can be created as temporary from beginning as
870  * they get dropped on error as well.
871  */
872  ReplicationSlotCreate(cmd->slotname, true,
874  }
875 
876  if (cmd->kind == REPLICATION_KIND_LOGICAL)
877  {
879  bool need_full_snapshot = false;
880 
881  /*
882  * Do options check early so that we can bail before calling the
883  * DecodingContextFindStartpoint which can take a long time.
884  */
885  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
886  {
887  if (IsTransactionBlock())
888  ereport(ERROR,
889  (errmsg("CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT "
890  "must not be called inside a transaction")));
891 
892  need_full_snapshot = true;
893  }
894  else if (snapshot_action == CRS_USE_SNAPSHOT)
895  {
896  if (!IsTransactionBlock())
897  ereport(ERROR,
898  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
899  "must be called inside a transaction")));
900 
901  if (XactIsoLevel != XACT_REPEATABLE_READ)
902  ereport(ERROR,
903  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
904  "must be called in REPEATABLE READ isolation mode transaction")));
905 
906  if (FirstSnapshotSet)
907  ereport(ERROR,
908  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
909  "must be called before any query")));
910 
911  if (IsSubTransaction())
912  ereport(ERROR,
913  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
914  "must not be called in a subtransaction")));
915 
916  need_full_snapshot = true;
917  }
918 
919  ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
923 
924  /*
925  * Signal that we don't need the timeout mechanism. We're just
926  * creating the replication slot and don't yet accept feedback
927  * messages or send keepalives. As we possibly need to wait for
928  * further WAL the walsender would otherwise possibly be killed too
929  * soon.
930  */
932 
933  /* build initial snapshot, might take a while */
935 
936  /*
937  * Export or use the snapshot if we've been asked to do so.
938  *
939  * NB. We will convert the snapbuild.c kind of snapshot to normal
940  * snapshot when doing this.
941  */
942  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
943  {
944  snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
945  }
946  else if (snapshot_action == CRS_USE_SNAPSHOT)
947  {
948  Snapshot snap;
949 
952  }
953 
954  /* don't need the decoding context anymore */
955  FreeDecodingContext(ctx);
956 
957  if (!cmd->temporary)
959  }
960  else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
961  {
963 
965 
966  /* Write this slot to disk if it's a permanent one. */
967  if (!cmd->temporary)
969  }
970 
971  snprintf(xloc, sizeof(xloc), "%X/%X",
974 
976  MemSet(nulls, false, sizeof(nulls));
977 
978  /*----------
979  * Need a tuple descriptor representing four columns:
980  * - first field: the slot name
981  * - second field: LSN at which we became consistent
982  * - third field: exported snapshot's name
983  * - fourth field: output plugin
984  *----------
985  */
986  tupdesc = CreateTemplateTupleDesc(4, false);
987  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
988  TEXTOID, -1, 0);
989  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
990  TEXTOID, -1, 0);
991  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
992  TEXTOID, -1, 0);
993  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
994  TEXTOID, -1, 0);
995 
996  /* prepare for projection of tuples */
997  tstate = begin_tup_output_tupdesc(dest, tupdesc);
998 
999  /* slot_name */
1000  slot_name = NameStr(MyReplicationSlot->data.name);
1001  values[0] = CStringGetTextDatum(slot_name);
1002 
1003  /* consistent wal location */
1004  values[1] = CStringGetTextDatum(xloc);
1005 
1006  /* snapshot name, or NULL if none */
1007  if (snapshot_name != NULL)
1008  values[2] = CStringGetTextDatum(snapshot_name);
1009  else
1010  nulls[2] = true;
1011 
1012  /* plugin, or NULL if none */
1013  if (cmd->plugin != NULL)
1014  values[3] = CStringGetTextDatum(cmd->plugin);
1015  else
1016  nulls[3] = true;
1017 
1018  /* send it to dest */
1019  do_tup_output(tstate, values, nulls);
1020  end_tup_output(tstate);
1021 
1023 }
#define NIL
Definition: pg_list.h:69
#define TEXTOID
Definition: pg_type.h:324
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action)
Definition: walsender.c:782
PGPROC * MyProc
Definition: proc.c:67
#define XACT_REPEATABLE_READ
Definition: xact.h:30
void RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
Definition: snapmgr.c:2191
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:1253
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency)
Definition: slot.c:221
#define MemSet(start, val, len)
Definition: c.h:853
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3, 4)
void ReplicationSlotSave(void)
Definition: slot.c:638
LogicalDecodingContext * CreateInitDecodingContext(char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:216
ReplicationSlotPersistentData data
Definition: slot.h:120
XLogRecPtr confirmed_flush
Definition: slot.h:81
ReplicationKind kind
Definition: replnodes.h:56
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
Definition: walsender.c:748
bool IsTransactionBlock(void)
Definition: xact.c:4447
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:635
void ReplicationSlotReserveWal(void)
Definition: slot.c:985
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:432
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:1311
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:576
#define ERROR
Definition: elog.h:43
bool FirstSnapshotSet
Definition: snapmgr.c:203
void ReplicationSlotPersist(void)
Definition: slot.c:673
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:109
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1151
unsigned int uint32
Definition: c.h:296
void ReplicationSlotRelease(void)
Definition: slot.c:416
#define ereport(elevel, rest)
Definition: elog.h:122
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc)
Definition: execTuples.c:1235
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1124
#define MAXFNAMELEN
uintptr_t Datum
Definition: postgres.h:372
TimeLineID ThisTimeLineID
Definition: xlog.c:181
struct SnapBuild * snapshot_builder
Definition: logical.h:46
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
static TimeLineID sendTimeLine
Definition: walsender.c:148
#define Assert(condition)
Definition: c.h:670
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:475
int XactIsoLevel
Definition: xact.c:74
TupleDesc CreateTemplateTupleDesc(int natts, bool hasoid)
Definition: tupdesc.c:43
bool IsSubTransaction(void)
Definition: xact.c:4520
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition: snapbuild.c:545
static Datum values[MAXATTR]
Definition: bootstrap.c:164
int errmsg(const char *fmt,...)
Definition: elog.c:797
CRSSnapshotAction
Definition: walsender.h:22
#define NameStr(name)
Definition: c.h:547
#define CStringGetTextDatum(s)
Definition: builtins.h:91
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:76
int16 AttrNumber
Definition: attnum.h:21
static TimestampTz last_reply_timestamp
Definition: walsender.c:168
static bool sendTimeLineIsHistoric
Definition: walsender.c:150
void ReplicationSlotMarkDirty(void)
Definition: slot.c:656
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
Definition: walsender.c:1241

◆ DropReplicationSlot()

static void DropReplicationSlot ( DropReplicationSlotCmd *  cmd)
static

Definition at line 1029 of file walsender.c.

References DestRemote, EndCommand(), ReplicationSlotDrop(), DropReplicationSlotCmd::slotname, and DropReplicationSlotCmd::wait.

Referenced by exec_replication_command(), and main().

1030 {
1031  ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1032  EndCommand("DROP_REPLICATION_SLOT", DestRemote);
1033 }
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:157
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:509

◆ exec_replication_command()

bool exec_replication_command ( const char *  cmd_string)

Definition at line 1416 of file walsender.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate(), CHECK_FOR_INTERRUPTS, CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, generate_unaccent_rules::dest, DestRemote, DestRemoteSimple, DropReplicationSlot(), elog, EndCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsA, IsAbortedTransactionBlockState(), IsTransactionBlock(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextDelete(), MemoryContextSwitchTo(), MyDatabaseId, VariableShowStmt::name, PreventTransactionChain(), REPLICATION_KIND_PHYSICAL, replication_parse_result, replication_scanner_init(), replication_yyparse(), SendBaseBackup(), SendTimeLineHistory(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), WalSnd::state, T_BaseBackupCmd, T_CreateReplicationSlotCmd, T_DropReplicationSlotCmd, T_IdentifySystemCmd, T_SQLCmd, T_StartReplicationCmd, T_TimeLineHistoryCmd, T_VariableShowStmt, TimeLineHistoryCmd::type, Node::type, WalSndSetState(), and WALSNDSTATE_STOPPING.

Referenced by PostgresMain().

1417 {
1418  int parse_rc;
1419  Node *cmd_node;
1420  MemoryContext cmd_context;
1421  MemoryContext old_context;
1422 
1423  /*
1424  * If WAL sender has been told that shutdown is getting close, switch its
1425  * status accordingly to handle the next replication commands correctly.
1426  */
1427  if (got_STOPPING)
1428  WalSndSetState(WALSNDSTATE_STOPPING);
1429 
1430  /*
1431  * Throw error if in stopping mode. We need to prevent commands that could
1432  * generate WAL while the shutdown checkpoint is being written. To be
1433  * safe, we just prohibit all new commands.
1434  */
1435  if (MyWalSnd->state == WALSNDSTATE_STOPPING)
1436  ereport(ERROR,
1437  (errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1438 
1439  /*
1440  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1441  * command arrives. Clean up the old stuff if there's anything.
1442  */
1444 
1446 
1448  "Replication command context",
1450  old_context = MemoryContextSwitchTo(cmd_context);
1451 
1452  replication_scanner_init(cmd_string);
1453  parse_rc = replication_yyparse();
1454  if (parse_rc != 0)
1455  ereport(ERROR,
1456  (errcode(ERRCODE_SYNTAX_ERROR),
1457  (errmsg_internal("replication command parser returned %d",
1458  parse_rc))));
1459 
1460  cmd_node = replication_parse_result;
1461 
1462  /*
1463  * Log replication command if log_replication_commands is enabled. Even
1464  * when it's disabled, log the command with DEBUG1 level for backward
1465  * compatibility. Note that SQL commands are not logged here, and will be
1466  * logged later if log_statement is enabled.
1467  */
1468  if (cmd_node->type != T_SQLCmd)
1469  ereport(log_replication_commands ? LOG : DEBUG1,
1470  (errmsg("received replication command: %s", cmd_string)));
1471 
1472  /*
1473  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was
1474  * called outside of transaction the snapshot should be cleared here.
1475  */
1476  if (!IsTransactionBlock())
1477  SnapBuildClearExportedSnapshot();
1478 
1479  /*
1480  * For aborted transactions, don't allow anything except pure SQL, the
1481  * exec_simple_query() will handle it correctly.
1482  */
1483  if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
1484  ereport(ERROR,
1485  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1486  errmsg("current transaction is aborted, "
1487  "commands ignored until end of transaction block")));
1488 
1490 
1491  /*
1492  * Allocate buffers that will be used for each outgoing and incoming
1493  * message. We do this just once per command to reduce palloc overhead.
1494  */
1498 
1499  switch (cmd_node->type)
1500  {
1501  case T_IdentifySystemCmd:
1502  IdentifySystem();
1503  break;
1504 
1505  case T_BaseBackupCmd:
1506  PreventTransactionChain(true, "BASE_BACKUP");
1507  SendBaseBackup((BaseBackupCmd *) cmd_node);
1508  break;
1509 
1512  break;
1513 
1516  break;
1517 
1518  case T_StartReplicationCmd:
1519  {
1520  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1521 
1522  PreventTransactionChain(true, "START_REPLICATION");
1523 
1524  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1525  StartReplication(cmd);
1526  else
1528  break;
1529  }
1530 
1531  case T_TimeLineHistoryCmd:
1532  PreventTransactionChain(true, "TIMELINE_HISTORY");
1534  break;
1535 
1536  case T_VariableShowStmt:
1537  {
1539  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1540 
1541  GetPGVariable(n->name, dest);
1542  }
1543  break;
1544 
1545  case T_SQLCmd:
1546  if (MyDatabaseId == InvalidOid)
1547  ereport(ERROR,
1548  (errmsg("cannot execute SQL commands in WAL sender for physical replication")));
1549 
1550  /* Tell the caller that this wasn't a WalSender command. */
1551  return false;
1552 
1553  default:
1554  elog(ERROR, "unrecognized replication command node tag: %u",
1555  cmd_node->type);
1556  }
1557 
1558  /* done */
1559  MemoryContextSwitchTo(old_context);
1560  MemoryContextDelete(cmd_context);
1561 
1562  /* Send CommandComplete message */
1563  EndCommand("SELECT", DestRemote);
1564 
1565  return true;
1566 }
#define IsA(nodeptr, _type_)
Definition: nodes.h:563
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:200
#define DEBUG1
Definition: elog.h:25
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:427
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1029
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:371
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Definition: nodes.h:512
static StringInfoData output_message
Definition: walsender.c:160
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LOG
Definition: elog.h:26
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc.c:7921
Node * replication_parse_result
bool IsTransactionBlock(void)
Definition: xact.c:4447
ReplicationKind kind
Definition: replnodes.h:82
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:157
#define ERROR
Definition: elog.h:43
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:691
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:170
WalSndState state
NodeTag type
Definition: nodes.h:514
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:109
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:523
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
#define ereport(elevel, rest)
Definition: elog.h:122
int replication_yyparse(void)
void initStringInfo(StringInfo str)
Definition: stringinfo.c:46
static StringInfoData reply_message
Definition: walsender.c:161
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:342
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1040
Oid MyDatabaseId
Definition: globals.c:77
WalSnd * MyWalSnd
Definition: walsender.c:111
#define InvalidOid
Definition: postgres_ext.h:36
int errmsg_internal(const char *fmt,...)
Definition: elog.c:827
void WalSndSetState(WalSndState state)
Definition: walsender.c:3097
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:835
static StringInfoData tmpbuf
Definition: walsender.c:162
bool log_replication_commands
Definition: walsender.c:124
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:98
#define elog
Definition: elog.h:219
static void IdentifySystem(void)
Definition: walsender.c:338
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:696
void PreventTransactionChain(bool isTopLevel, const char *stmtType)
Definition: xact.c:3153
void replication_scanner_init(const char *query_string)

◆ GetStandbyFlushRecPtr()

static XLogRecPtr GetStandbyFlushRecPtr ( void  )
static

Definition at line 2849 of file walsender.c.

References GetWalRcvWriteRecPtr(), GetXLogReplayRecPtr(), receiveTLI, and ThisTimeLineID.

Referenced by IdentifySystem(), StartReplication(), and XLogSendPhysical().

2850 {
2851  XLogRecPtr replayPtr;
2852  TimeLineID replayTLI;
2853  XLogRecPtr receivePtr;
2854  TimeLineID receiveTLI;
2855  XLogRecPtr result;
2856 
2857  /*
2858  * We can safely send what's already been replayed. Also, if walreceiver
2859  * is streaming WAL from the same timeline, we can send anything that it
2860  * has streamed, but hasn't been replayed yet.
2861  */
2862 
2863  receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI);
2864  replayPtr = GetXLogReplayRecPtr(&replayTLI);
2865 
2866  ThisTimeLineID = replayTLI;
2867 
2868  result = replayPtr;
2869  if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
2870  result = receivePtr;
2871 
2872  return result;
2873 }
uint32 TimeLineID
Definition: xlogdefs.h:45
XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11125
static TimeLineID receiveTLI
Definition: xlog.c:203
TimeLineID ThisTimeLineID
Definition: xlog.c:181
uint64 XLogRecPtr
Definition: xlogdefs.h:21

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 2902 of file walsender.c.

References am_walsender, Assert, got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

2903 {
2904  Assert(am_walsender);
2905 
2906  /*
2907  * If replication has not yet started, die like with SIGTERM. If
2908  * replication is active, only set a flag and wake up the main loop. It
2909  * will send any outstanding WAL, wait for it to be replicated to the
2910  * standby, and then exit gracefully.
2911  */
2912  if (!replication_active)
2913  kill(MyProcPid, SIGTERM);
2914  else
2915  got_STOPPING = true;
2916 }
int MyProcPid
Definition: globals.c:39
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
#define kill(pid, sig)
Definition: win32_port.h:437
bool am_walsender
Definition: walsender.c:114
static volatile sig_atomic_t replication_active
Definition: walsender.c:195
#define Assert(condition)
Definition: c.h:670

◆ IdentifySystem()

static void IdentifySystem ( void  )
static

Definition at line 338 of file walsender.c.

References am_cascading_walsender, begin_tup_output_tupdesc(), CommitTransactionCommand(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, cur, CurrentMemoryContext, dbname, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), get_database_name(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetSystemIdentifier(), Int32GetDatum, INT4OID, InvalidOid, MAXFNAMELEN, MemoryContextSwitchTo(), MemSet, MyDatabaseId, RecoveryInProgress(), snprintf(), StartTransactionCommand(), TEXTOID, ThisTimeLineID, TupleDescInitBuiltinEntry(), UINT64_FORMAT, and values.

Referenced by exec_replication_command().

339 {
340  char sysid[32];
341  char xloc[MAXFNAMELEN];
342  XLogRecPtr logptr;
343  char *dbname = NULL;
345  TupOutputState *tstate;
346  TupleDesc tupdesc;
347  Datum values[4];
348  bool nulls[4];
349 
350  /*
351  * Reply with a result set with one row, four columns. First col is system
352  * ID, second is timeline ID, third is current xlog location and the
353  * fourth contains the database name if we are connected to one.
354  */
355 
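356  snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
357  GetSystemIdentifier());
358 
359  am_cascading_walsender = RecoveryInProgress();
360  if (am_cascading_walsender)
361  {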
362  /* this also updates ThisTimeLineID */
363  logptr = GetStandbyFlushRecPtr();
364  }
365  else
366  logptr = GetFlushRecPtr();
367 
368  snprintf(xloc, sizeof(xloc), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
369 
370  if (MyDatabaseId != InvalidOid)
371  {
373 
374  /* syscache access needs a transaction env. */
376  /* make dbname live outside TX context */
380  /* CommitTransactionCommand switches to TopMemoryContext */
382  }
383 
385  MemSet(nulls, false, sizeof(nulls));
386 
387  /* need a tuple descriptor representing four columns */
388  tupdesc = CreateTemplateTupleDesc(4, false);
389  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
390  TEXTOID, -1, 0);
391  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
392  INT4OID, -1, 0);
393  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
394  TEXTOID, -1, 0);
395  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
396  TEXTOID, -1, 0);
397 
398  /* prepare for projection of tuples */
399  tstate = begin_tup_output_tupdesc(dest, tupdesc);
400 
401  /* column 1: system identifier */
402  values[0] = CStringGetTextDatum(sysid);
403 
404  /* column 2: timeline */
405  values[1] = Int32GetDatum(ThisTimeLineID);
406 
407  /* column 3: wal location */
408  values[2] = CStringGetTextDatum(xloc);
409 
410  /* column 4: database name, or NULL if none */
411  if (dbname)
412  values[3] = CStringGetTextDatum(dbname);
413  else
414  nulls[3] = true;
415 
416  /* send it to dest */
417  do_tup_output(tstate, values, nulls);
418 
419  end_tup_output(tstate);
420 }
#define TEXTOID
Definition: pg_type.h:324
void CommitTransactionCommand(void)
Definition: xact.c:2744
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:1253
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define INT4OID
Definition: pg_type.h:316
struct cursor * cur
Definition: ecpg.c:28
#define MemSet(start, val, len)
Definition: c.h:853
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3, 4)
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8254
bool RecoveryInProgress(void)
Definition: xlog.c:7922
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:1311
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:576
char * get_database_name(Oid dbid)
Definition: dbcommands.c:2056
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:109
unsigned int uint32
Definition: c.h:296
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc)
Definition: execTuples.c:1235
#define MAXFNAMELEN
uintptr_t Datum
Definition: postgres.h:372
Oid MyDatabaseId
Definition: globals.c:77
#define InvalidOid
Definition: postgres_ext.h:36
TimeLineID ThisTimeLineID
Definition: xlog.c:181
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void StartTransactionCommand(void)
Definition: xact.c:2673
char * dbname
Definition: streamutil.c:42
TupleDesc CreateTemplateTupleDesc(int natts, bool hasoid)
Definition: tupdesc.c:43
static Datum values[MAXATTR]
Definition: bootstrap.c:164
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4711
#define Int32GetDatum(X)
Definition: postgres.h:485
#define CStringGetTextDatum(s)
Definition: builtins.h:91
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2849
int16 AttrNumber
Definition: attnum.h:21
#define UINT64_FORMAT
Definition: c.h:339
bool am_cascading_walsender
Definition: walsender.c:115

◆ InitWalSenderSlot()

static void InitWalSenderSlot ( void  )
static

Definition at line 2212 of file walsender.c.

References WalSnd::apply, WalSnd::applyLag, Assert, ereport, errcode(), errmsg(), FATAL, WalSnd::flush, WalSnd::flushLag, i, InvalidXLogRecPtr, WalSnd::latch, max_wal_senders, WalSnd::mutex, MyProc, MyProcPid, on_shmem_exit(), WalSnd::pid, PGPROC::procLatch, WalSnd::sentPtr, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndKill(), WalSndCtlData::walsnds, WALSNDSTATE_STARTUP, WalSnd::write, and WalSnd::writeLag.

/*
 * InitWalSenderSlot -- claim a free entry of WalSndCtl->walsnds[] for this
 * process.
 *
 * Walks the max_wal_senders entries, taking each entry's spinlock in turn.
 * The first entry whose pid is 0 is claimed: pid is set to MyProcPid, the
 * sent/write/flush/apply positions are reset to InvalidXLogRecPtr, the three
 * lag fields to -1, the state to WALSNDSTATE_STARTUP and the latch to this
 * process's procLatch.  MyWalSnd is then pointed at that entry.
 *
 * Takes no arguments and returns nothing.  If every entry is in use, the
 * function reports FATAL with ERRCODE_TOO_MANY_CONNECTIONS.
 */
2213 {
2214  int i;
2215 
2216  /*
2217  * WalSndCtl should be set up already (we inherit this by fork() or
2218  * EXEC_BACKEND mechanism from the postmaster).
2219  */
2220  Assert(WalSndCtl != NULL);
2221  Assert(MyWalSnd == NULL);
2222 
2223  /*
2224  * Find a free walsender slot and reserve it. If this fails, we must be
2225  * out of WalSnd structures.
2226  */
2227  for (i = 0; i < max_wal_senders; i++)
2228  {
2229  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2230 
2231  SpinLockAcquire(&walsnd->mutex);
2232 
 /* A non-zero pid marks the entry as taken; release the lock and move on. */
2233  if (walsnd->pid != 0)
2234  {
2235  SpinLockRelease(&walsnd->mutex);
2236  continue;
2237  }
2238  else
2239  {
2240  /*
2241  * Found a free slot. Reserve it for us.
2242  */
2243  walsnd->pid = MyProcPid;
2244  walsnd->sentPtr = InvalidXLogRecPtr;
2245  walsnd->write = InvalidXLogRecPtr;
2246  walsnd->flush = InvalidXLogRecPtr;
2247  walsnd->apply = InvalidXLogRecPtr;
2248  walsnd->writeLag = -1;
2249  walsnd->flushLag = -1;
2250  walsnd->applyLag = -1;
2251  walsnd->state = WALSNDSTATE_STARTUP;
2252  walsnd->latch = &MyProc->procLatch;
2253  SpinLockRelease(&walsnd->mutex);
2254  /* don't need the lock anymore */
2255  MyWalSnd = (WalSnd *) walsnd;
2256 
2257  break;
2258  }
2259  }
 /* MyWalSnd is still NULL only if the loop found no entry with pid == 0. */
2260  if (MyWalSnd == NULL)
2261  ereport(FATAL,
2262  (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
2263  errmsg("number of requested standby connections "
2264  "exceeds max_wal_senders (currently %d)",
2265  max_wal_senders)));
2266 
2267  /* Arrange to clean up at walsender exit */
 /*
  * NOTE(review): listing line 2268 is absent from this extract.  The
  * References list for this function names on_shmem_exit() and WalSndKill(),
  * so presumably the exit callback is registered here -- confirm against
  * walsender.c.
  */
2269 }
XLogRecPtr write
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
int MyProcPid
Definition: globals.c:39
PGPROC * MyProc
Definition: proc.c:67
TimeOffset flushLag
WalSndCtlData * WalSndCtl
Definition: walsender.c:108
int errcode(int sqlerrcode)
Definition: elog.c:575
static void WalSndKill(int code, Datum arg)
Definition: walsender.c:2273
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
TimeOffset writeLag
slock_t mutex
Latch procLatch
Definition: proc.h:104
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
#define FATAL
Definition: elog.h:52
Latch * latch
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:348
WalSndState state
#define ereport(elevel, rest)
Definition: elog.h:122
int max_wal_senders
Definition: walsender.c:120
#define SpinLockRelease(lock)
Definition: spin.h:64
XLogRecPtr sentPtr
WalSnd * MyWalSnd
Definition: walsender.c:111
TimeOffset applyLag
#define Assert(condition)
Definition: c.h:670
int errmsg(const char *fmt,...)
Definition: elog.c:797
int i
XLogRecPtr apply

◆ LagTrackerRead()

static TimeOffset LagTrackerRead ( int  head,
XLogRecPtr  lsn,
TimestampTz  now 
)
static

Definition at line 3441 of file walsender.c.

References Assert, LAG_TRACKER_BUFFER_SIZE, LagTracker, WalTimeSample::lsn, next, and WalTimeSample::time.

Referenced by ProcessStandbyReplyMessage().

/*
 * LagTrackerRead -- compute the lag for one read head of the lag tracker.
 *
 * head: index into LagTracker.read_heads[] and LagTracker.last_read[].
 * lsn:  position being reported; every unread sample whose lsn is <= this
 *       value is consumed.
 * now:  current time, the end point of the lag measurement.
 *
 * Returns 'now' minus the local flush time attributed to 'lsn'.  That time
 * comes from the last consumed sample, from interpolation between last_read
 * and the next unread sample, or from the next unread sample alone.
 * Returns -1 when no usable time exists: nothing was consumed and no future
 * sample is buffered, the reported lsn precedes last_read, or the recorded
 * times are not monotonic.
 */
3442 {
3443  TimestampTz time = 0;
3444 
3445  /* Read all unread samples up to this LSN or end of buffer. */
3446  while (LagTracker.read_heads[head] != LagTracker.write_head &&
3447  LagTracker.buffer[LagTracker.read_heads[head]].lsn <= lsn)
3448  {
3449  time = LagTracker.buffer[LagTracker.read_heads[head]].time;
3450  LagTracker.last_read[head] =
3451  LagTracker.buffer[LagTracker.read_heads[head]];
3452  LagTracker.read_heads[head] =
3453  (LagTracker.read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE;
3454  }
3455 
3456  /*
3457  * If the lag tracker is empty, that means the standby has processed
3458  * everything we've ever sent so we should now clear 'last_read'. If we
3459  * didn't do that, we'd risk using a stale and irrelevant sample for
3460  * interpolation at the beginning of the next burst of WAL after a period
3461  * of idleness.
3462  */
3463  if (LagTracker.read_heads[head] == LagTracker.write_head)
3464  LagTracker.last_read[head].time = 0;
3465 
3466  if (time > now)
3467  {
3468  /* If the clock somehow went backwards, treat as not found. */
3469  return -1;
3470  }
 /* time is still 0 only if the loop above consumed no sample. */
3471  else if (time == 0)
3472  {
3473  /*
3474  * We didn't cross a time. If there is a future sample that we
3475  * haven't reached yet, and we've already reached at least one sample,
3476  * let's interpolate the local flushed time. This is mainly useful
3477  * for reporting a completely stuck apply position as having
3478  * increasing lag, since otherwise we'd have to wait for it to
3479  * eventually start moving again and cross one of our samples before
3480  * we can show the lag increasing.
3481  */
3482  if (LagTracker.read_heads[head] == LagTracker.write_head)
3483  {
3484  /* There are no future samples, so we can't interpolate. */
3485  return -1;
3486  }
3487  else if (LagTracker.last_read[head].time != 0)
3488  {
3489  /* We can interpolate between last_read and the next sample. */
3490  double fraction;
3491  WalTimeSample prev = LagTracker.last_read[head];
3492  WalTimeSample next = LagTracker.buffer[LagTracker.read_heads[head]];
3493 
3494  if (lsn < prev.lsn)
3495  {
3496  /*
3497  * Reported LSNs shouldn't normally go backwards, but it's
3498  * possible when there is a timeline change. Treat as not
3499  * found.
3500  */
3501  return -1;
3502  }
3503 
 /* Guards the division below: the denominator is next.lsn - prev.lsn. */
3504  Assert(prev.lsn < next.lsn);
3505 
3506  if (prev.time > next.time)
3507  {
3508  /* If the clock somehow went backwards, treat as not found. */
3509  return -1;
3510  }
3511 
3512  /* See how far we are between the previous and next samples. */
3513  fraction =
3514  (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
3515 
3516  /* Scale the local flush time proportionally. */
3517  time = (TimestampTz)
3518  ((double) prev.time + (next.time - prev.time) * fraction);
3519  }
3520  else
3521  {
3522  /*
3523  * We have only a future sample, implying that we were entirely
3524  * caught up, and now there is a new burst of WAL and the
3525  * standby hasn't processed the first sample yet. Until the
3526  * standby reaches the future sample the best we can do is report
3527  * the hypothetical lag if that sample were to be replayed now.
3528  */
3529  time = LagTracker.buffer[LagTracker.read_heads[head]].time;
3530  }
3531  }
3532 
3533  /* Return the elapsed time since local flush time in microseconds. */
3534  Assert(time != 0);
3535  return now - time;
3536 }
static int32 next
Definition: blutils.c:210
int64 TimestampTz
Definition: timestamp.h:39
TimestampTz time
Definition: walsender.c:204
static struct @26 LagTracker
XLogRecPtr lsn
Definition: walsender.c:203
#define Assert(condition)
Definition: c.h:670
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:208
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534

◆ LagTrackerWrite()

static void LagTrackerWrite ( XLogRecPtr  lsn,
TimestampTz  local_flush_time 
)
static

Definition at line 3376 of file walsender.c.

References am_walsender, i, LAG_TRACKER_BUFFER_SIZE, LagTracker, and NUM_SYNC_REP_WAIT_MODE.

Referenced by WalSndUpdateProgress(), and XLogSendPhysical().

/*
 * LagTrackerWrite -- record an (lsn, local_flush_time) sample in the lag
 * tracker's circular buffer.
 *
 * Does nothing when am_walsender is false or when 'lsn' equals the
 * previously recorded LagTracker.last_lsn.
 *
 * The buffer counts as full when advancing write_head by one would make it
 * equal to any of the NUM_SYNC_REP_WAIT_MODE read heads.  In that case
 * write_head is stepped back one slot, the new sample replaces the sample
 * stored there, and write_head ends up where it started.
 */
3377 {
3378  bool buffer_full;
3379  int new_write_head;
3380  int i;
3381 
3382  if (!am_walsender)
3383  return;
3384 
3385  /*
3386  * If the lsn hasn't advanced since last time, then do nothing. This way
3387  * we only record a new sample when new WAL has been written.
3388  */
3389  if (LagTracker.last_lsn == lsn)
3390  return;
3391  LagTracker.last_lsn = lsn;
3392 
3393  /*
3394  * If advancing the write head of the circular buffer would crash into any
3395  * of the read heads, then the buffer is full. In other words, the
3396  * slowest reader (presumably apply) is the one that controls the release
3397  * of space.
3398  */
3399  new_write_head = (LagTracker.write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
3400  buffer_full = false;
3401  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
3402  {
3403  if (new_write_head == LagTracker.read_heads[i])
3404  buffer_full = true;
3405  }
3406 
3407  /*
3408  * If the buffer is full, for now we just rewind by one slot and overwrite
3409  * the last sample, as a simple (if somewhat uneven) way to lower the
3410  * sampling rate. There may be better adaptive compaction algorithms.
3411  */
3412  if (buffer_full)
3413  {
 /* Remember the current head so it is restored after the store below. */
3414  new_write_head = LagTracker.write_head;
 /* Step back one slot, wrapping from 0 to the last index. */
3415  if (LagTracker.write_head > 0)
3416  LagTracker.write_head--;
3417  else
3418  LagTracker.write_head = LAG_TRACKER_BUFFER_SIZE - 1;
3419  }
3420 
3421  /* Store a sample at the current write head position. */
3422  LagTracker.buffer[LagTracker.write_head].lsn = lsn;
3423  LagTracker.buffer[LagTracker.write_head].time = local_flush_time;
3424  LagTracker.write_head = new_write_head;
3425 }
static struct @26 LagTracker
bool am_walsender
Definition: walsender.c:114
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:208
int i
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28

◆ logical_read_xlog_page()

static int logical_read_xlog_page ( XLogReaderState state,
XLogRecPtr  targetPagePtr,
int  reqLen,
XLogRecPtr  targetRecPtr,
char *  cur_page,
TimeLineID pageTLI 
)
static

Definition at line 748 of file walsender.c.

References XLogReaderState::currTLI, XLogReaderState::currTLIValidUntil, XLogReaderState::nextTLI, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, ThisTimeLineID, WalSndWaitForWal(), XLogRead(), and XLogReadDetermineTimeline().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

/*
 * logical_read_xlog_page -- read one WAL page into cur_page on behalf of an
 * XLogReaderState.
 *
 * Determines the timeline for the requested page, waits until WAL up to
 * targetPagePtr + reqLen is available, then reads a block of XLOG_BLCKSZ
 * bytes starting at targetPagePtr.
 *
 * Returns the number of valid bytes on the page: XLOG_BLCKSZ when the whole
 * block lies at or below the flush position, otherwise the distance from
 * targetPagePtr to the flush position.  Returns -1 if the wait ended before
 * the requested range became available.
 *
 * targetRecPtr and pageTLI are not used by the lines visible in this
 * listing.
 */
750 {
751  XLogRecPtr flushptr;
752  int count;
753 
754  XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
 /*
  * Copy the reader's timeline decision into the walsender's file-level
  * sendTimeLine* variables.
  *
  * NOTE(review): listing lines 755 and 757 are absent from this extract.
  * The References list names sendTimeLineIsHistoric, sendTimeLineValidUpto,
  * ThisTimeLineID and currTLIValidUntil, so presumably those two variables
  * are assigned there -- confirm against walsender.c.
  */
756  sendTimeLine = state->currTLI;
758  sendTimeLineNextTLI = state->nextTLI;
759 
760  /* make sure we have enough WAL available */
761  flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
762 
763  /* fail if not (implies we are going to shut down) */
764  if (flushptr < targetPagePtr + reqLen)
765  return -1;
766 
767  if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
768  count = XLOG_BLCKSZ; /* more than one block available */
769  else
770  count = flushptr - targetPagePtr; /* part of the page available */
771 
772  /* now actually read the data, we know it's there */
 /* A whole block is read even when count is smaller; only count is reported. */
773  XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
774 
775  return count;
776 }
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
Definition: xlogutils.c:802
XLogRecPtr currTLIValidUntil
Definition: xlogreader.h:180
static void XLogRead(char *buf, XLogRecPtr startptr, Size count)
Definition: walsender.c:2301
TimeLineID nextTLI
Definition: xlogreader.h:186
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition: walsender.c:1267
TimeLineID ThisTimeLineID
Definition: xlog.c:181
TimeLineID currTLI
Definition: xlogreader.h:170
static TimeLineID sendTimeLine
Definition: walsender.c:148
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:149
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:151
static bool sendTimeLineIsHistoric
Definition: walsender.c:150

◆ offset_to_interval()

static Interval* offset_to_interval ( TimeOffset  offset)
static

Definition at line 3135 of file walsender.c.

References Interval::day, Interval::month, palloc(), and Interval::time.

Referenced by pg_stat_get_wal_senders().

/*
 * offset_to_interval -- wrap a TimeOffset in a newly palloc'd Interval.
 *
 * The month and day fields are set to zero and the whole offset is stored in
 * the time field.  Returns a pointer to the palloc'd Interval.
 */
3136 {
3137  Interval *result = palloc(sizeof(Interval));
3138 
3139  result->month = 0;
3140  result->day = 0;
3141  result->time = offset;
3142 
3143  return result;
3144 }
int32 day
Definition: timestamp.h:47
TimeOffset time
Definition: timestamp.h:45
int32 month
Definition: timestamp.h:48
void * palloc(Size size)
Definition: mcxt.c:848

◆ parseCreateReplSlotOptions()

static void parseCreateReplSlotOptions ( CreateReplicationSlotCmd cmd,
bool reserve_wal,
CRSSnapshotAction snapshot_action 
)
static

Definition at line 782 of file walsender.c.

References CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, defGetBoolean(), DefElem::defname, elog, ereport, errcode(), errmsg(), ERROR, CreateReplicationSlotCmd::kind, lfirst, CreateReplicationSlotCmd::options, REPLICATION_KIND_LOGICAL, and REPLICATION_KIND_PHYSICAL.

Referenced by CreateReplicationSlot().

/*
 * parseCreateReplSlotOptions -- walk cmd->options and fill in the caller's
 * output variables.
 *
 * Recognized option names:
 *   "export_snapshot"  boolean; sets *snapshot_action; only accepted when
 *                      cmd->kind is REPLICATION_KIND_LOGICAL
 *   "use_snapshot"     sets *snapshot_action to CRS_USE_SNAPSHOT; only
 *                      accepted when cmd->kind is REPLICATION_KIND_LOGICAL
 *   "reserve_wal"      sets *reserve_wal to true; only accepted when
 *                      cmd->kind is REPLICATION_KIND_PHYSICAL
 *
 * *reserve_wal and *snapshot_action are written only when a matching option
 * is present; otherwise they are left untouched.
 *
 * Raises ERROR (ERRCODE_SYNTAX_ERROR) when an option is repeated, when both
 * snapshot options are given, or when an option does not fit cmd->kind.
 * An unknown option name is reported through elog(ERROR).
 */
785 {
786  ListCell *lc;
787  bool snapshot_action_given = false;
788  bool reserve_wal_given = false;
789 
790  /* Parse options */
791  foreach(lc, cmd->options)
792  {
793  DefElem *defel = (DefElem *) lfirst(lc);
794 
795  if (strcmp(defel->defname, "export_snapshot") == 0)
796  {
797  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
798  ereport(ERROR,
799  (errcode(ERRCODE_SYNTAX_ERROR),
800  errmsg("conflicting or redundant options")));
801 
802  snapshot_action_given = true;
803  *snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT :
 /*
  * NOTE(review): listing line 804, the false arm of the conditional above,
  * is absent from this extract.  The References list names
  * CRS_NOEXPORT_SNAPSHOT, so presumably that is the value used -- confirm
  * against walsender.c.
  */
805  }
806  else if (strcmp(defel->defname, "use_snapshot") == 0)
807  {
 /* Shares snapshot_action_given with "export_snapshot": only one may appear. */
808  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
809  ereport(ERROR,
810  (errcode(ERRCODE_SYNTAX_ERROR),
811  errmsg("conflicting or redundant options")));
812 
813  snapshot_action_given = true;
814  *snapshot_action = CRS_USE_SNAPSHOT;
815  }
816  else if (strcmp(defel->defname, "reserve_wal") == 0)
817  {
818  if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
819  ereport(ERROR,
820  (errcode(ERRCODE_SYNTAX_ERROR),
821  errmsg("conflicting or redundant options")));
822 
823  reserve_wal_given = true;
824  *reserve_wal = true;
825  }
826  else
827  elog(ERROR, "unrecognized option: %s", defel->defname);
828  }
829 }
int errcode(int sqlerrcode)
Definition: elog.c:575
ReplicationKind kind
Definition: replnodes.h:56
bool defGetBoolean(DefElem *def)
Definition: define.c:111
#define ERROR
Definition: elog.h:43
#define ereport(elevel, rest)
Definition: elog.h:122
#define lfirst(lc)
Definition: pg_list.h:106
int errmsg(const char *fmt,...)
Definition: elog.c:797
char * defname
Definition: parsenodes.h:719
#define elog
Definition: elog.h:219

◆ pg_stat_get_wal_senders()

Datum pg_stat_get_wal_senders ( PG_FUNCTION_ARGS  )

Definition at line 3151 of file walsender.c.

References ReturnSetInfo::allowedModes, WalSnd::apply, WalSnd::applyLag, CStringGetTextDatum, ReturnSetInfo::econtext, ExprContext::ecxt_per_query_memory, elog, ereport, errcode(), errmsg(), ERROR, WalSnd::flush, WalSnd::flushLag, get_call_result_type(), i, Int32GetDatum, IntervalPGetDatum, IsA, list_member_int(), LSNGetDatum, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_wal_senders, MemoryContextSwitchTo(), MemSet, WalSnd::mutex, offset_to_interval(), PG_STAT_GET_WAL_SENDERS_COLS, WalSnd::pid, ReturnSetInfo::returnMode, WalSnd::sentPtr, sentPtr, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SFRM_Materialize, SpinLockAcquire, SpinLockRelease, WalSnd::state, superuser(), SYNC_REP_PRIORITY, WalSnd::sync_standby_priority, SyncRepConfigData::syncrep_method, SyncRepConfig, SyncRepGetSyncStandbys(), tuplestore_begin_heap(), tuplestore_donestoring, tuplestore_putvalues(), TYPEFUNC_COMPOSITE, values, WalSndGetStateString(), WalSndCtlData::walsnds, work_mem, write, WalSnd::write, WalSnd::writeLag, and XLogRecPtrIsInvalid.

/*
 * pg_stat_get_wal_senders -- set-returning function that emits one row per
 * in-use walsender entry, using materialize mode.
 *
 * Creates a tuplestore in the per-query memory context and hands it back
 * through rsinfo.  For each of the max_wal_senders entries of
 * WalSndCtl->walsnds[] whose pid is non-zero, the entry's fields are copied
 * under its spinlock and a row of PG_STAT_GET_WAL_SENDERS_COLS columns is
 * stored.  Callers that are not superusers get only the pid column; every
 * other column is NULL for them.
 *
 * Always returns (Datum) 0; the rows travel in rsinfo->setResult.
 * Raises ERROR if the caller cannot accept a materialized set, or if the
 * function's declared result type is not a row type.
 */
3152 {
3153 #define PG_STAT_GET_WAL_SENDERS_COLS 11
3154  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3155  TupleDesc tupdesc;
3156  Tuplestorestate *tupstore;
3157  MemoryContext per_query_ctx;
3158  MemoryContext oldcontext;
3159  List *sync_standbys;
3160  int i;
3161 
3162  /* check to see if caller supports us returning a tuplestore */
3163  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
3164  ereport(ERROR,
3165  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3166  errmsg("set-valued function called in context that cannot accept a set")));
3167  if (!(rsinfo->allowedModes & SFRM_Materialize))
3168  ereport(ERROR,
3169  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3170  errmsg("materialize mode required, but it is not " \
3171  "allowed in this context")));
3172 
3173  /* Build a tuple descriptor for our result type */
3174  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
3175  elog(ERROR, "return type must be a row type");
3176 
 /* The tuplestore is created while the per-query context is current. */
3177  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
3178  oldcontext = MemoryContextSwitchTo(per_query_ctx);
3179 
3180  tupstore = tuplestore_begin_heap(true, false, work_mem);
3181  rsinfo->returnMode = SFRM_Materialize;
3182  rsinfo->setResult = tupstore;
3183  rsinfo->setDesc = tupdesc;
3184 
3185  MemoryContextSwitchTo(oldcontext);
3186 
3187  /*
3188  * Get the currently active synchronous standbys.
3189  */
3190  LWLockAcquire(SyncRepLock, LW_SHARED);
3191  sync_standbys = SyncRepGetSyncStandbys(NULL);
3192  LWLockRelease(SyncRepLock);
3193 
3194  for (i = 0; i < max_wal_senders; i++)
3195  {
3196  WalSnd *walsnd = &WalSndCtl->walsnds[i];
 /*
  * NOTE(review): listing lines 3197, 3206 and 3207 are absent from this
  * extract.  The code below uses 'sentPtr', 'state' and 'values' without a
  * visible declaration, so presumably they are declared on those lines --
  * confirm against walsender.c.
  */
3198  XLogRecPtr write;
3199  XLogRecPtr flush;
3200  XLogRecPtr apply;
3201  TimeOffset writeLag;
3202  TimeOffset flushLag;
3203  TimeOffset applyLag;
3204  int priority;
3205  int pid;
3208  bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
3209 
 /* Copy the shared fields into locals so the spinlock is held only briefly. */
3210  SpinLockAcquire(&walsnd->mutex);
3211  if (walsnd->pid == 0)
3212  {
3213  SpinLockRelease(&walsnd->mutex);
3214  continue;
3215  }
3216  pid = walsnd->pid;
3217  sentPtr = walsnd->sentPtr;
3218  state = walsnd->state;
3219  write = walsnd->write;
3220  flush = walsnd->flush;
3221  apply = walsnd->apply;
3222  writeLag = walsnd->writeLag;
3223  flushLag = walsnd->flushLag;
3224  applyLag = walsnd->applyLag;
3225  priority = walsnd->sync_standby_priority;
3226  SpinLockRelease(&walsnd->mutex);
3227 
3228  memset(nulls, 0, sizeof(nulls));
3229  values[0] = Int32GetDatum(pid);
3230 
3231  if (!superuser())
3232  {
3233  /*
3234  * Only superusers can see details. Other users only get the pid
3235  * value to know it's a walsender, but no details.
3236  */
 /*
  * NOTE(review): the length passed here is an element count; it matches the
  * byte count only if sizeof(bool) is 1 -- confirm.
  */
3237  MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3238  }
3239  else
3240  {
3241  values[1] = CStringGetTextDatum(WalSndGetStateString(state));
3242 
 /* An invalid position is reported as NULL in each of columns 2..5. */
3243  if (XLogRecPtrIsInvalid(sentPtr))
3244  nulls[2] = true;
3245  values[2] = LSNGetDatum(sentPtr);
3246 
3247  if (XLogRecPtrIsInvalid(write))
3248  nulls[3] = true;
3249  values[3] = LSNGetDatum(write);
3250 
3251  if (XLogRecPtrIsInvalid(flush))
3252  nulls[4] = true;
3253  values[4] = LSNGetDatum(flush);
3254 
3255  if (XLogRecPtrIsInvalid(apply))
3256  nulls[5] = true;
3257  values[5] = LSNGetDatum(apply);
3258 
3259  /*
3260  * Treat a standby such as a pg_basebackup background process
3261  * which always returns an invalid flush location, as an
3262  * asynchronous standby.
3263  */
3264  priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
3265 
 /* A negative lag value is reported as NULL in columns 6..8. */
3266  if (writeLag < 0)
3267  nulls[6] = true;
3268  else
3269  values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
3270 
3271  if (flushLag < 0)
3272  nulls[7] = true;
3273  else
3274  values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
3275 
3276  if (applyLag < 0)
3277  nulls[8] = true;
3278  else
3279  values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
3280 
3281  values[9] = Int32GetDatum(priority);
3282 
3283  /*
3284  * More easily understood version of standby state. This is purely
3285  * informational.
3286  *
3287  * In quorum-based sync replication, the role of each standby
3288  * listed in synchronous_standby_names can be changing very
3289  * frequently. Any standbys considered as "sync" at one moment can
3290  * be switched to "potential" ones at the next moment. So, it's
3291  * basically useless to report "sync" or "potential" as their sync
3292  * states. We report just "quorum" for them.
3293  */
3294  if (priority == 0)
3295  values[10] = CStringGetTextDatum("async");
3296  else if (list_member_int(sync_standbys, i))
 /*
  * NOTE(review): listing line 3297, the start of the statement continued on
  * the next line, is absent from this extract.  The References list names
  * SyncRepConfig, syncrep_method and SYNC_REP_PRIORITY, so presumably
  * values[10] is chosen by comparing the configured method with
  * SYNC_REP_PRIORITY -- confirm against walsender.c.
  */
3298  CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3299  else
3300  values[10] = CStringGetTextDatum("potential");
3301  }
3302 
3303  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
3304  }
3305 
3306  /* clean up and return the tuplestore */
3307  tuplestore_donestoring(tupstore);
3308 
3309  return (Datum) 0;
3310 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
XLogRecPtr write
#define IsA(nodeptr, _type_)
Definition: nodes.h:563
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:211
#define SYNC_REP_PRIORITY
Definition: syncrep.h:36
#define write(a, b, c)
Definition: win32.h:14
uint8 syncrep_method
Definition: syncrep.h:51
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
TimeOffset flushLag
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
WalSndCtlData * WalSndCtl
Definition: walsender.c:108
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
bool superuser(void)
Definition: superuser.c:47
#define MemSet(start, val, len)
Definition: c.h:853
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
List * SyncRepGetSyncStandbys(bool *am_sync)
Definition: syncrep.c:680
TimeOffset writeLag
SyncRepConfigData * SyncRepConfig
Definition: syncrep.c:97
slock_t mutex
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1722
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
#define ERROR
Definition: elog.h:43
#define IntervalPGetDatum(X)
Definition: timestamp.h:33
bool list_member_int(const List *list, int datum)
Definition: list.c:485
WalSndState state
#define ereport(elevel, rest)
Definition: elog.h:122
int max_wal_senders
Definition: walsender.c:120
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
int64 TimeOffset
Definition: timestamp.h:40
uintptr_t Datum
Definition: postgres.h:372
static const char * WalSndGetStateString(WalSndState state)
Definition: walsender.c:3116
XLogRecPtr sentPtr
int work_mem
Definition: globals.c:113
static XLogRecPtr sentPtr
Definition: walsender.c:157
int allowedModes
Definition: execnodes.h:269
TimeOffset applyLag
SetFunctionReturnMode returnMode
Definition: execnodes.h:271
uint64 XLogRecPtr
Definition: xlogdefs.h:21
int sync_standby_priority
Definition: regguts.h:298
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1118
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:203
WalSndState
Tuplestorestate * setResult
Definition: execnodes.h:274
static Datum values[MAXATTR]
Definition: bootstrap.c:164
#define PG_STAT_GET_WAL_SENDERS_COLS
ExprContext * econtext
Definition: execnodes.h:267
#define Int32GetDatum(X)
Definition: postgres.h:485
TupleDesc setDesc
Definition: execnodes.h:275
int errmsg(const char *fmt,...)
Definition: elog.c:797
int i
#define CStringGetTextDatum(s)
Definition: builtins.h:91
#define elog
Definition: elog.h:219
static Interval * offset_to_interval(TimeOffset offset)
Definition: walsender.c:3135
XLogRecPtr apply
Definition: pg_list.h:45

◆ PhysicalConfirmReceivedLocation()

static void PhysicalConfirmReceivedLocation ( XLogRecPtr  lsn)
static

Definition at line 1706 of file walsender.c.

References Assert, ReplicationSlot::data, InvalidXLogRecPtr, ReplicationSlot::mutex, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, SpinLockAcquire, and SpinLockRelease.

Referenced by ProcessStandbyReplyMessage().

/*
 * PhysicalConfirmReceivedLocation -- record 'lsn' as the restart_lsn of the
 * slot.
 *
 * 'lsn' must not be InvalidXLogRecPtr (asserted).  The comparison and the
 * store both happen while holding the slot's spinlock, and restart_lsn is
 * written only when it differs from 'lsn'.  Returns nothing.
 */
1707 {
1708  bool changed = false;
 /*
  * NOTE(review): listing line 1709 is absent from this extract.  'slot' is
  * used below without a visible declaration and the References list names
  * MyReplicationSlot, so presumably 'slot' is initialized from it there --
  * confirm against walsender.c.
  */
1710 
1711  Assert(lsn != InvalidXLogRecPtr);
1712  SpinLockAcquire(&slot->mutex);
1713  if (slot->data.restart_lsn != lsn)
1714  {
1715  changed = true;
1716  slot->data.restart_lsn = lsn;
1717  }
1718  SpinLockRelease(&slot->mutex);
1719 
1720  if (changed)
1721  {
 /*
  * NOTE(review): listing lines 1722 and 1723 are absent from this extract.
  * The References list names ReplicationSlotMarkDirty() and
  * ReplicationSlotsComputeRequiredLSN(), so presumably both are called
  * here, after the spinlock has been released -- confirm.
  */
1724  }
1725 
1726  /*
1727  * One could argue that the slot should be saved to disk now, but that'd
1728  * be energy wasted - the worst lost information can do here is give us
1729  * wrong information in a statistics view - we'll just potentially be more
1730  * conservative in removing files.
1731  */
1732 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
ReplicationSlotPersistentData data
Definition: slot.h:120
#define SpinLockAcquire(lock)
Definition: spin.h:62
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:741
#define SpinLockRelease(lock)
Definition: spin.h:64
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:670
XLogRecPtr restart_lsn
Definition: slot.h:73
slock_t mutex
Definition: slot.h:93
void ReplicationSlotMarkDirty(void)
Definition: slot.c:656

◆ PhysicalReplicationSlotNewXmin()

static void PhysicalReplicationSlotNewXmin ( TransactionId  feedbackXmin,
TransactionId  feedbackCatalogXmin 
)
static

Definition at line 1830 of file walsender.c.

References ReplicationSlotPersistentData::catalog_xmin, ReplicationSlot::data, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, InvalidTransactionId, ReplicationSlot::mutex, MyPgXact, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredXmin(), SpinLockAcquire, SpinLockRelease, TransactionIdIsNormal, TransactionIdPrecedes(), ReplicationSlotPersistentData::xmin, and PGXACT::xmin.

Referenced by ProcessStandbyHSFeedbackMessage().

/*
 * PhysicalReplicationSlotNewXmin -- apply feedback xmin values to the slot.
 *
 * feedbackXmin:        new value for data.xmin / effective_xmin
 * feedbackCatalogXmin: new value for data.catalog_xmin /
 *                      effective_catalog_xmin
 *
 * Each pair is overwritten when the stored value is not a normal
 * transaction id, when the feedback value is not a normal transaction id,
 * or when the stored value precedes the feedback value.  Both checks run
 * while holding the slot's spinlock.  Returns nothing.
 */
1831 {
1832  bool changed = false;
 /*
  * NOTE(review): listing line 1833 is absent from this extract.  'slot' is
  * used below without a visible declaration and the References list names
  * MyReplicationSlot, so presumably 'slot' is initialized from it there --
  * confirm against walsender.c.
  */
1834 
1835  SpinLockAcquire(&slot->mutex);
 /*
  * NOTE(review): listing line 1836 is absent from this extract.  The
  * References list names MyPgXact, PGXACT::xmin and InvalidTransactionId,
  * so presumably this process's own xmin is reset here -- confirm.
  */
1837 
1838  /*
1839  * For physical replication we don't need the interlock provided by xmin
1840  * and effective_xmin since the consequences of a missed increase are
1841  * limited to query cancellations, so set both at once.
1842  */
1843  if (!TransactionIdIsNormal(slot->data.xmin) ||
1844  !TransactionIdIsNormal(feedbackXmin) ||
1845  TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
1846  {
1847  changed = true;
1848  slot->data.xmin = feedbackXmin;
1849  slot->effective_xmin = feedbackXmin;
1850  }
 /* The same three-way rule is applied to the catalog xmin pair. */
1851  if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
1852  !TransactionIdIsNormal(feedbackCatalogXmin) ||
1853  TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
1854  {
1855  changed = true;
1856  slot->data.catalog_xmin = feedbackCatalogXmin;
1857  slot->effective_catalog_xmin = feedbackCatalogXmin;
1858  }
1859  SpinLockRelease(&slot->mutex);
1860 
1861  if (changed)
1862  {
 /*
  * NOTE(review): listing lines 1863 and 1864 are absent from this extract.
  * The References list names ReplicationSlotMarkDirty() and
  * ReplicationSlotsComputeRequiredXmin(), so presumably both are called
  * here, after the spinlock has been released -- confirm.
  */
1865  }
1866 }
TransactionId xmin
Definition: proc.h:225
ReplicationSlotPersistentData data
Definition: slot.h:120
PGXACT * MyPgXact
Definition: proc.c:68
#define SpinLockAcquire(lock)
Definition: spin.h:62
TransactionId effective_xmin
Definition: slot.h:116
TransactionId catalog_xmin
Definition: slot.h:70
#define InvalidTransactionId
Definition: transam.h:31
TransactionId xmin
Definition: slot.h:62
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
#define SpinLockRelease(lock)
Definition: spin.h:64
TransactionId effective_catalog_xmin
Definition: slot.h:117
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
slock_t mutex
Definition: slot.h:93
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:695
void ReplicationSlotMarkDirty(void)
Definition: slot.c:656

◆ ProcessRepliesIfAny()

static void ProcessRepliesIfAny ( void  )
static

Definition at line 1573 of file walsender.c.

References COMMERROR, ereport, errcode(), errmsg(), FATAL, GetCurrentTimestamp(), last_reply_timestamp, pq_endmsgread(), pq_getbyte_if_available(), pq_getmessage(), pq_putmessage_noblock, pq_startmsgread(), proc_exit(), ProcessStandbyMessage(), resetStringInfo(), streamingDoneReceiving, streamingDoneSending, and waiting_for_ping_response.

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

/*
 * ProcessRepliesIfAny -- handle whatever messages the standby has already
 * sent, without blocking for more.
 *
 * Loops reading one message at a time until pq_getbyte_if_available()
 * reports that nothing is pending.  Message types handled:
 *   'd'  standby reply wrapped in CopyData
 *   'c'  CopyDone: answered with CopyDone unless one was already sent, and
 *        streamingDoneReceiving is set
 *   'X'  standby is closing the socket: proc_exit(0)
 * Any other type is reported as FATAL, as is any type other than 'X' once
 * CopyDone has been received.  EOF while reading the type byte or the
 * message body ends the process through proc_exit(0).
 *
 * If at least one 'd' or 'c' message was handled,
 * waiting_for_ping_response is cleared.  Takes no arguments and returns
 * nothing.
 */
1574 {
1575  unsigned char firstchar;
1576  int r;
1577  bool received = false;
1578 
1579  for (;;)
1580  {
1581  pq_startmsgread();
1582  r = pq_getbyte_if_available(&firstchar);
1583  if (r < 0)
1584  {
1585  /* unexpected error or EOF */
 /*
  * NOTE(review): listing line 1586, the start of the call continued on the
  * next two lines, is absent from this extract.  The References list names
  * ereport and COMMERROR, so presumably it opens an ereport at that level
  * -- confirm against walsender.c.
  */
1587  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1588  errmsg("unexpected EOF on standby connection")));
1589  proc_exit(0);
1590  }
1591  if (r == 0)
1592  {
1593  /* no data available without blocking */
1594  pq_endmsgread();
1595  break;
1596  }
1597 
1598  /* Read the message contents */
 /*
  * NOTE(review): listing line 1599 is absent from this extract.  The
  * References list names resetStringInfo(), so presumably reply_message is
  * cleared before being refilled -- confirm.
  */
1600  if (pq_getmessage(&reply_message, 0))
1601  {
 /*
  * NOTE(review): listing line 1602 is absent as well; it presumably opens
  * the same kind of ereport as in the EOF branch above -- confirm.
  */
1603  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1604  errmsg("unexpected EOF on standby connection")));
1605  proc_exit(0);
1606  }
1607 
1608  /*
1609  * If we already received a CopyDone from the frontend, the frontend
1610  * should not send us anything until we've closed our end of the COPY.
1611  * XXX: In theory, the frontend could already send the next command
1612  * before receiving the CopyDone, but libpq doesn't currently allow
1613  * that.
1614  */
1615  if (streamingDoneReceiving && firstchar != 'X')
1616  ereport(FATAL,
1617  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1618  errmsg("unexpected standby message type \"%c\", after receiving CopyDone",
1619  firstchar)));
1620 
1621  /* Handle the very limited subset of commands expected in this phase */
1622  switch (firstchar)
1623  {
1624  /*
1625  * 'd' means a standby reply wrapped in a CopyData packet.
1626  */
1627  case 'd':
 /*
  * NOTE(review): listing line 1628 is absent from this extract.  The
  * References list names ProcessStandbyMessage(), so presumably the reply
  * is dispatched there -- confirm.
  */
1629  received = true;
1630  break;
1631 
1632  /*
1633  * CopyDone means the standby requested to finish streaming.
1634  * Reply with CopyDone, if we had not sent that already.
1635  */
1636  case 'c':
1637  if (!streamingDoneSending)
1638  {
1639  pq_putmessage_noblock('c', NULL, 0);
1640  streamingDoneSending = true;
1641  }
1642 
1643  streamingDoneReceiving = true;
1644  received = true;
1645  break;
1646 
1647  /*
1648  * 'X' means that the standby is closing down the socket.
1649  */
1650  case 'X':
1651  proc_exit(0);
1652 
1653  default:
1654  ereport(FATAL,
1655  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1656  errmsg("invalid standby message type \"%c\"",
1657  firstchar)));
1658  }
1659  }
1660 
1661  /*
1662  * Save the last reply timestamp if we've received at least one reply.
1663  */
1664  if (received)
1665  {
 /*
  * NOTE(review): listing line 1666 is absent from this extract.  The
  * References list names GetCurrentTimestamp() and last_reply_timestamp,
  * so presumably the timestamp is stored here -- confirm.
  */
1667  waiting_for_ping_response = false;
1668  }
1669 }
static void ProcessStandbyMessage(void)
Definition: walsender.c:1675
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
void proc_exit(int code)
Definition: ipc.c:99
int errcode(int sqlerrcode)
Definition: elog.c:575
void pq_startmsgread(void)
Definition: pqcomm.c:1210
#define FATAL
Definition: elog.h:52
int pq_getbyte_if_available(unsigned char *c)
Definition: pqcomm.c:1040
static bool streamingDoneSending
Definition: walsender.c:179
#define COMMERROR
Definition: elog.h:30
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
#define ereport(elevel, rest)
Definition: elog.h:122
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1272
static StringInfoData reply_message
Definition: walsender.c:161
void pq_endmsgread(void)
Definition: pqcomm.c:1234
static bool streamingDoneReceiving
Definition: walsender.c:180
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
static bool waiting_for_ping_response
Definition: walsender.c:171
static TimestampTz last_reply_timestamp
Definition: walsender.c:168

◆ ProcessStandbyHSFeedbackMessage()

static void ProcessStandbyHSFeedbackMessage ( void  )
static

Definition at line 1907 of file walsender.c.

References DEBUG2, elog, InvalidTransactionId, MyPgXact, MyReplicationSlot, PhysicalReplicationSlotNewXmin(), pq_getmsgint(), pq_getmsgint64(), TransactionIdInRecentPast(), TransactionIdIsNormal, TransactionIdPrecedes(), and PGXACT::xmin.

Referenced by ProcessStandbyMessage().

1908 {
1909  TransactionId feedbackXmin;
1910  uint32 feedbackEpoch;
1911  TransactionId feedbackCatalogXmin;
1912  uint32 feedbackCatalogEpoch;
1913 
1914  /*
1915  * Decipher the reply message. The caller already consumed the msgtype
1916  * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
1917  * of this message.
1918  */
1919  (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
1920  feedbackXmin = pq_getmsgint(&reply_message, 4);
1921  feedbackEpoch = pq_getmsgint(&reply_message, 4);
1922  feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
1923  feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
1924 
1925  elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u",
1926  feedbackXmin,
1927  feedbackEpoch,
1928  feedbackCatalogXmin,
1929  feedbackCatalogEpoch);
1930 
1931  /*
1932  * Unset WalSender's xmins if the feedback message values are invalid.
1933  * This happens when the downstream turned hot_standby_feedback off.
1934  */
1935  if (!TransactionIdIsNormal(feedbackXmin)
1936  && !TransactionIdIsNormal(feedbackCatalogXmin))
1937  {
1939  if (MyReplicationSlot != NULL)
1940  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
1941  return;
1942  }
1943 
1944  /*
1945  * Check that the provided xmin/epoch are sane, that is, not in the future
1946  * and not so far back as to be already wrapped around. Ignore if not.
1947  */
1948  if (TransactionIdIsNormal(feedbackXmin) &&
1949  !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
1950  return;
1951 
1952  if (TransactionIdIsNormal(feedbackCatalogXmin) &&
1953  !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
1954  return;
1955 
1956  /*
1957  * Set the WalSender's xmin equal to the standby's requested xmin, so that
1958  * the xmin will be taken into account by GetOldestXmin. This will hold
1959  * back the removal of dead rows and thereby prevent the generation of
1960  * cleanup conflicts on the standby server.
1961  *
1962  * There is a small window for a race condition here: although we just
1963  * checked that feedbackXmin precedes nextXid, the nextXid could have
1964  * gotten advanced between our fetching it and applying the xmin below,
1965  * perhaps far enough to make feedbackXmin wrap around. In that case the
1966  * xmin we set here would be "in the future" and have no effect. No point
1967  * in worrying about this since it's too late to save the desired data
1968  * anyway. Assuming that the standby sends us an increasing sequence of
1969  * xmins, this could only happen during the first reply cycle, else our
1970  * own xmin would prevent nextXid from advancing so far.
1971  *
1972  * We don't bother taking the ProcArrayLock here. Setting the xmin field
1973  * is assumed atomic, and there's no real need to prevent a concurrent
1974  * GetOldestXmin. (If we're moving our xmin forward, this is obviously
1975  * safe, and if we're moving it backwards, well, the data is at risk
1976  * already since a VACUUM could have just finished calling GetOldestXmin.)
1977  *
1978  * If we're using a replication slot we reserve the xmin via that,
1979  * otherwise via the walsender's PGXACT entry. We can only track the
1980  * catalog xmin separately when using a slot, so we store the least of the
1981  * two provided when not using a slot.
1982  *
1983  * XXX: It might make sense to generalize the ephemeral slot concept and
1984  * always use the slot mechanism to handle the feedback xmin.
1985  */
1986  if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
1987  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
1988  else
1989  {
1990  if (TransactionIdIsNormal(feedbackCatalogXmin)
1991  && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
1992  MyPgXact->xmin = feedbackCatalogXmin;
1993  else
1994  MyPgXact->xmin = feedbackXmin;
1995  }
1996 }
uint32 TransactionId
Definition: c.h:445
TransactionId xmin
Definition: proc.h:225
PGXACT * MyPgXact
Definition: proc.c:68
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition: walsender.c:1879
#define DEBUG2
Definition: elog.h:24
#define InvalidTransactionId
Definition: transam.h:31
unsigned int uint32
Definition: c.h:296
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static StringInfoData reply_message
Definition: walsender.c:161
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition: walsender.c:1830
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
#define elog
Definition: elog.h:219
#define TransactionIdIsNormal(xid)
Definition: transam.h:42

◆ ProcessStandbyMessage()

static void ProcessStandbyMessage ( void  )
static

Definition at line 1675 of file walsender.c.

References COMMERROR, ereport, errcode(), errmsg(), pq_getmsgbyte(), proc_exit(), ProcessStandbyHSFeedbackMessage(), and ProcessStandbyReplyMessage().

Referenced by ProcessRepliesIfAny().

1676 {
1677  char msgtype;
1678 
1679  /*
1680  * Check message type from the first byte.
1681  */
1682  msgtype = pq_getmsgbyte(&reply_message);
1683 
1684  switch (msgtype)
1685  {
1686  case 'r':
1688  break;
1689 
1690  case 'h':
1692  break;
1693 
1694  default:
1696  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1697  errmsg("unexpected message type \"%c\"", msgtype)));
1698  proc_exit(0);
1699  }
1700 }
void proc_exit(int code)
Definition: ipc.c:99
int errcode(int sqlerrcode)
Definition: elog.c:575
#define COMMERROR
Definition: elog.h:30
#define ereport(elevel, rest)
Definition: elog.h:122
static StringInfoData reply_message
Definition: walsender.c:161
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
static void ProcessStandbyReplyMessage(void)
Definition: walsender.c:1738
int errmsg(const char *fmt,...)
Definition: elog.c:797
static void ProcessStandbyHSFeedbackMessage(void)
Definition: walsender.c:1907

◆ ProcessStandbyReplyMessage()

static void ProcessStandbyReplyMessage ( void  )
static

Definition at line 1738 of file walsender.c.

References am_cascading_walsender, WalSnd::apply, WalSnd::applyLag, DEBUG2, elog, WalSnd::flush, WalSnd::flushLag, GetCurrentTimestamp(), InvalidXLogRecPtr, LagTrackerRead(), LogicalConfirmReceivedLocation(), WalSnd::mutex, MyReplicationSlot, MyWalSnd, now(), PhysicalConfirmReceivedLocation(), pq_getmsgbyte(), pq_getmsgint64(), sentPtr, SlotIsLogical, SpinLockAcquire, SpinLockRelease, SYNC_REP_WAIT_APPLY, SYNC_REP_WAIT_FLUSH, SYNC_REP_WAIT_WRITE, SyncRepReleaseWaiters(), WalSndKeepalive(), WalSnd::write, and WalSnd::writeLag.

Referenced by ProcessStandbyMessage().

1739 {
1740  XLogRecPtr writePtr,
1741  flushPtr,
1742  applyPtr;
1743  bool replyRequested;
1744  TimeOffset writeLag,
1745  flushLag,
1746  applyLag;
1747  bool clearLagTimes;
1748  TimestampTz now;
1749 
1750  static bool fullyAppliedLastTime = false;
1751 
1752  /* the caller already consumed the msgtype byte */
1753  writePtr = pq_getmsgint64(&reply_message);
1754  flushPtr = pq_getmsgint64(&reply_message);
1755  applyPtr = pq_getmsgint64(&reply_message);
1756  (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
1757  replyRequested = pq_getmsgbyte(&reply_message);
1758 
1759  elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s",
1760  (uint32) (writePtr >> 32), (uint32) writePtr,
1761  (uint32) (flushPtr >> 32), (uint32) flushPtr,
1762  (uint32) (applyPtr >> 32), (uint32) applyPtr,
1763  replyRequested ? " (reply requested)" : "");
1764 
1765  /* See if we can compute the round-trip lag for these positions. */
1766  now = GetCurrentTimestamp();
1767  writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
1768  flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
1769  applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
1770 
1771  /*
1772  * If the standby reports that it has fully replayed the WAL in two
1773  * consecutive reply messages, then the second such message must result
1774  * from wal_receiver_status_interval expiring on the standby. This is a
1775  * convenient time to forget the lag times measured when it last
1776  * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
1777  * until more WAL traffic arrives.
1778  */
1779  clearLagTimes = false;
1780  if (applyPtr == sentPtr)
1781  {
1782  if (fullyAppliedLastTime)
1783  clearLagTimes = true;
1784  fullyAppliedLastTime = true;
1785  }
1786  else
1787  fullyAppliedLastTime = false;
1788 
1789  /* Send a reply if the standby requested one. */
1790  if (replyRequested)
1791  WalSndKeepalive(false);
1792 
1793  /*
1794  * Update shared state for this WalSender process based on reply data from
1795  * standby.
1796  */
1797  {
1798  WalSnd *walsnd = MyWalSnd;
1799 
1800  SpinLockAcquire(&walsnd->mutex);
1801  walsnd->write = writePtr;
1802  walsnd->flush = flushPtr;
1803  walsnd->apply = applyPtr;
1804  if (writeLag != -1 || clearLagTimes)
1805  walsnd->writeLag = writeLag;
1806  if (flushLag != -1 || clearLagTimes)
1807  walsnd->flushLag = flushLag;
1808  if (applyLag != -1 || clearLagTimes)
1809  walsnd->applyLag = applyLag;
1810  SpinLockRelease(&walsnd->mutex);
1811  }
1812 
1815 
1816  /*
1817  * Advance our local xmin horizon when the client confirmed a flush.
1818  */
1819  if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
1820  {
1823  else
1825  }
1826 }
XLogRecPtr write
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
int64 TimestampTz
Definition: timestamp.h:39
TimeOffset flushLag
TimeOffset writeLag
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3318
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: walsender.c:1706
#define SYNC_REP_WAIT_APPLY
Definition: syncrep.h:26
#define DEBUG2
Definition: elog.h:24
unsigned int uint32
Definition: c.h:296
#define SlotIsLogical(slot)
Definition: slot.h:142
#define SYNC_REP_WAIT_FLUSH
Definition: syncrep.h:25
static StringInfoData reply_message
Definition: walsender.c:161
#define SpinLockRelease(lock)
Definition: spin.h:64
int64 TimeOffset
Definition: timestamp.h:40
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition: walsender.c:3441
WalSnd * MyWalSnd
Definition: walsender.c:111
static XLogRecPtr sentPtr
Definition: walsender.c:157
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
TimeOffset applyLag
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:914
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
#define elog
Definition: elog.h:219
XLogRecPtr apply
void SyncRepReleaseWaiters(void)
Definition: syncrep.c:412
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
bool am_cascading_walsender
Definition: walsender.c:115
#define SYNC_REP_WAIT_WRITE
Definition: syncrep.h:24

◆ SendTimeLineHistory()

static void SendTimeLineHistory ( TimeLineHistoryCmd * cmd)
static

Definition at line 427 of file walsender.c.

References buf, BYTEAOID, CloseTransientFile(), ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), MAXFNAMELEN, MAXPGPATH, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), pq_beginmessage(), pq_endmessage(), pq_sendbytes(), pq_sendint16(), pq_sendint32(), pq_sendstring(), read, TEXTOID, TimeLineHistoryCmd::timeline, TLHistoryFileName, TLHistoryFilePath, and WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ.

Referenced by exec_replication_command().

428 {
430  char histfname[MAXFNAMELEN];
431  char path[MAXPGPATH];
432  int fd;
433  off_t histfilelen;
434  off_t bytesleft;
435  Size len;
436 
437  /*
438  * Reply with a result set with one row, and two columns. The first col is
439  * the name of the history file, 2nd is the contents.
440  */
441 
442  TLHistoryFileName(histfname, cmd->timeline);
443  TLHistoryFilePath(path, cmd->timeline);
444 
445  /* Send a RowDescription message */
446  pq_beginmessage(&buf, 'T');
447  pq_sendint16(&buf, 2); /* 2 fields */
448 
449  /* first field */
450  pq_sendstring(&buf, "filename"); /* col name */
451  pq_sendint32(&buf, 0); /* table oid */
452  pq_sendint16(&buf, 0); /* attnum */
453  pq_sendint32(&buf, TEXTOID); /* type oid */
454  pq_sendint16(&buf, -1); /* typlen */
455  pq_sendint32(&buf, 0); /* typmod */
456  pq_sendint16(&buf, 0); /* format code */
457 
458  /* second field */
459  pq_sendstring(&buf, "content"); /* col name */
460  pq_sendint32(&buf, 0); /* table oid */
461  pq_sendint16(&buf, 0); /* attnum */
462  pq_sendint32(&buf, BYTEAOID); /* type oid */
463  pq_sendint16(&buf, -1); /* typlen */
464  pq_sendint32(&buf, 0); /* typmod */
465  pq_sendint16(&buf, 0); /* format code */
466  pq_endmessage(&buf);
467 
468  /* Send a DataRow message */
469  pq_beginmessage(&buf, 'D');
470  pq_sendint16(&buf, 2); /* # of columns */
471  len = strlen(histfname);
472  pq_sendint32(&buf, len); /* col1 len */
473  pq_sendbytes(&buf, histfname, len);
474 
475  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
476  if (fd < 0)
477  ereport(ERROR,
479  errmsg("could not open file \"%s\": %m", path)));
480 
481  /* Determine file length and send it to client */
482  histfilelen = lseek(fd, 0, SEEK_END);
483  if (histfilelen < 0)
484  ereport(ERROR,
486  errmsg("could not seek to end of file \"%s\": %m", path)));
487  if (lseek(fd, 0, SEEK_SET) != 0)
488  ereport(ERROR,
490  errmsg("could not seek to beginning of file \"%s\": %m", path)));
491 
492  pq_sendint32(&buf, histfilelen); /* col2 len */
493 
494  bytesleft = histfilelen;
495  while (bytesleft > 0)
496  {
497  char rbuf[BLCKSZ];
498  int nread;
499 
501  nread = read(fd, rbuf, sizeof(rbuf));
503  if (nread <= 0)
504  ereport(ERROR,
506  errmsg("could not read file \"%s\": %m",
507  path)));
508  pq_sendbytes(&buf, rbuf, nread);
509  bytesleft -= nread;
510  }
511  CloseTransientFile(fd);
512 
513  pq_endmessage(&buf);
514 }
#define TEXTOID
Definition: pg_type.h:324
static void pq_sendint32(StringInfo buf, int32 i)
Definition: pqformat.h:148
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1025
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
TimeLineID timeline
Definition: replnodes.h:97
#define ERROR
Definition: elog.h:43
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2392
#define MAXPGPATH
#define TLHistoryFileName(fname, tli)
static char * buf
Definition: pg_test_fsync.c:67
int errcode_for_file_access(void)
Definition: elog.c:598
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1244
#define ereport(elevel, rest)
Definition: elog.h:122
int CloseTransientFile(int fd)
Definition: fd.c:2562
#define MAXFNAMELEN
size_t Size
Definition: c.h:404
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1220
#define BYTEAOID
Definition: pg_type.h:292
static void pq_sendint16(StringInfo buf, int16 i)
Definition: pqformat.h:140
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:125
int errmsg(const char *fmt,...)
Definition: elog.c:797
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
#define read(a, b, c)
Definition: win32.h:13
#define TLHistoryFilePath(path, tli)

◆ StartLogicalReplication()

static void StartLogicalReplication ( StartReplicationCmd * cmd)
static

Definition at line 1040 of file walsender.c.

References am_cascading_walsender, Assert, buf, CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDecodingContext(), ReplicationSlot::data, DestRemote, EndCommand(), ereport, errmsg(), FreeDecodingContext(), got_STOPPING, LOG, logical_read_xlog_page(), logical_startptr, WalSnd::mutex, MyReplicationSlot, StartReplicationCmd::options, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), proc_exit(), RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), ReplicationSlotPersistentData::restart_lsn, WalSnd::sentPtr, sentPtr, StartReplicationCmd::slotname, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, SyncRepInitConfig(), WalSndLoop(), WalSndPrepareWrite(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WalSndUpdateProgress(), WalSndWriteData(), and XLogSendLogical().

Referenced by exec_replication_command().

1041 {
1043 
1044  /* make sure that our requirements are still fulfilled */
1046 
1048 
1049  ReplicationSlotAcquire(cmd->slotname, true);
1050 
1051  /*
1052  * Force a disconnect, so that the decoding code doesn't need to care
1053  * about an eventual switch from running in recovery, to running in a
1054  * normal environment. Client code is expected to handle reconnects.
1055  */
1057  {
1058  ereport(LOG,
1059  (errmsg("terminating walsender process after promotion")));
1060  got_STOPPING = true;
1061  }
1062 
1064 
1065  /* Send a CopyBothResponse message, and start streaming */
1066  pq_beginmessage(&buf, 'W');
1067  pq_sendbyte(&buf, 0);
1068  pq_sendint16(&buf, 0);
1069  pq_endmessage(&buf);
1070  pq_flush();
1071 
1072  /*
1073  * Initialize position to the last ack'ed one, then the xlog records begin
1074  * to be shipped from that position.
1075  */
1081 
1082  /* Start reading WAL from the oldest required WAL. */
1084 
1085  /*
1086  * Report the location after which we'll send out further commits as the
1087  * current sentPtr.
1088  */
1090 
1091  /* Also update the sent position status in shared memory */
1095 
1096  replication_active = true;
1097 
1099 
1100  /* Main loop of walsender */
1102 
1105 
1106  replication_active = false;
1107  if (got_STOPPING)
1108  proc_exit(0);
1110 
1111  /* Get out of COPY mode (CommandComplete). */
1112  EndCommand("COPY 0", DestRemote);
1113 }
XLogRecPtr startpoint
Definition: replnodes.h:85
#define pq_flush()
Definition: libpq.h:39
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:330
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
void proc_exit(int code)
Definition: ipc.c:99
static void pq_sendbyte(StringInfo buf, int8 byt)
Definition: pqformat.h:164
ReplicationSlotPersistentData data
Definition: slot.h:120
#define LOG
Definition: elog.h:26
bool RecoveryInProgress(void)
Definition: xlog.c:7922
slock_t mutex
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
XLogRecPtr confirmed_flush
Definition: slot.h:81
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
Definition: walsender.c:748
#define SpinLockAcquire(lock)
Definition: spin.h:62
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:157
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:197
static char * buf
Definition: pg_test_fsync.c:67
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1151
static XLogRecPtr logical_startptr
Definition: walsender.c:198
void ReplicationSlotRelease(void)
Definition: slot.c:416
void SyncRepInitConfig(void)
Definition: syncrep.c:384
#define ereport(elevel, rest)
Definition: elog.h:122
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2075
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1124
#define SpinLockRelease(lock)
Definition: spin.h:64
static volatile sig_atomic_t replication_active
Definition: walsender.c:195
XLogRecPtr sentPtr
WalSnd * MyWalSnd
Definition: walsender.c:111
static XLogRecPtr sentPtr
Definition: walsender.c:157
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:343
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:670
void WalSndSetState(WalSndState state)
Definition: walsender.c:3097
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:475
static void XLogSendLogical(void)
Definition: walsender.c:2739
XLogRecPtr restart_lsn
Definition: slot.h:73
static void pq_sendint16(StringInfo buf, int16 i)
Definition: pqformat.h:140
int errmsg(const char *fmt,...)
Definition: elog.c:797
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:76
bool am_cascading_walsender
Definition: walsender.c:115
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
Definition: walsender.c:1241

◆ StartReplication()

static void StartReplication ( StartReplicationCmd * cmd)
static

Definition at line 523 of file walsender.c.

References am_cascading_walsender, Assert, begin_tup_output_tupdesc(), buf, CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errcode(), errdetail(), errmsg(), ERROR, GetFlushRecPtr(), GetStandbyFlushRecPtr(), got_STOPPING, Int64GetDatum(), INT8OID, InvalidXLogRecPtr, list_free_deep(), MemSet, WalSnd::mutex, MyReplicationSlot, pq_beginmessage(), pq_endmessage(), pq_flush, pq_puttextmessage(), pq_sendbyte(), pq_sendint16(), proc_exit(), readTimeLineHistory(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, WalSnd::sentPtr, sentPtr, SlotIsLogical, StartReplicationCmd::slotname, snprintf(), SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), TEXTOID, ThisTimeLineID, StartReplicationCmd::timeline, tliSwitchPoint(), TupleDescInitBuiltinEntry(), values, WalSndLoop(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, XLogRecPtrIsInvalid, and XLogSendPhysical().

Referenced by exec_replication_command().

524 {
526  XLogRecPtr FlushPtr;
527 
528  if (ThisTimeLineID == 0)
529  ereport(ERROR,
530  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
531  errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
532 
533  /*
534  * We assume here that we're logging enough information in the WAL for
535  * log-shipping, since this is checked in PostmasterMain().
536  *
537  * NOTE: wal_level can only change at shutdown, so in most cases it is
538  * difficult for there to be WAL data that we can still see that was
539  * written at wal_level='minimal'.
540  */
541 
542  if (cmd->slotname)
543  {
544  ReplicationSlotAcquire(cmd->slotname, true);
546  ereport(ERROR,
547  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
548  (errmsg("cannot use a logical replication slot for physical replication"))));
549  }
550 
551  /*
552  * Select the timeline. If it was given explicitly by the client, use
553  * that. Otherwise use the timeline of the last replayed record, which is
554  * kept in ThisTimeLineID.
555  */
557  {
558  /* this also updates ThisTimeLineID */
559  FlushPtr = GetStandbyFlushRecPtr();
560  }
561  else
562  FlushPtr = GetFlushRecPtr();
563 
564  if (cmd->timeline != 0)
565  {
566  XLogRecPtr switchpoint;
567 
568  sendTimeLine = cmd->timeline;
570  {
571  sendTimeLineIsHistoric = false;
573  }
574  else
575  {
576  List *timeLineHistory;
577 
578  sendTimeLineIsHistoric = true;
579 
580  /*
581  * Check that the timeline the client requested exists, and the
582  * requested start location is on that timeline.
583  */
584  timeLineHistory = readTimeLineHistory(ThisTimeLineID);
585  switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
587  list_free_deep(timeLineHistory);
588 
589  /*
590  * Found the requested timeline in the history. Check that
591  * requested startpoint is on that timeline in our history.
592  *
593  * This is quite loose on purpose. We only check that we didn't
594  * fork off the requested timeline before the switchpoint. We
595  * don't check that we switched *to* it before the requested
596  * starting point. This is because the client can legitimately
597  * request to start replication from the beginning of the WAL
598  * segment that contains switchpoint, but on the new timeline, so
599  * that it doesn't end up with a partial segment. If you ask for
600  * too old a starting point, you'll get an error later when we
601  * fail to find the requested WAL segment in pg_wal.
602  *
603  * XXX: we could be more strict here and only allow a startpoint
604  * that's older than the switchpoint, if it's still in the same
605  * WAL segment.
606  */
607  if (!XLogRecPtrIsInvalid(switchpoint) &&
608  switchpoint < cmd->startpoint)
609  {
610  ereport(ERROR,
611  (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
612  (uint32) (cmd->startpoint >> 32),
613  (uint32) (cmd->startpoint),
614  cmd->timeline),
615  errdetail("This server's history forked from timeline %u at %X/%X.",
616  cmd->timeline,
617  (uint32) (switchpoint >> 32),
618  (uint32) (switchpoint))));
619  }
620  sendTimeLineValidUpto = switchpoint;
621  }
622  }
623  else
624  {
627  sendTimeLineIsHistoric = false;
628  }
629 
631 
632  /* If there is nothing to stream, don't even enter COPY mode */
634  {
635  /*
636  * When we first start replication the standby will be behind the
637  * primary. For some applications, for example synchronous
638  * replication, it is important to have a clear state for this initial
639  * catchup mode, so we can trigger actions when we change streaming
640  * state later. We may stay in this state for a long time, which is
641  * exactly why we want to be able to monitor whether or not we are
642  * still here.
643  */
645 
646  /* Send a CopyBothResponse message, and start streaming */
647  pq_beginmessage(&buf, 'W');
648  pq_sendbyte(&buf, 0);
649  pq_sendint16(&buf, 0);
650  pq_endmessage(&buf);
651  pq_flush();
652 
653  /*
654  * Don't allow a request to stream from a future point in WAL that
655  * hasn't been flushed to disk in this server yet.
656  */
657  if (FlushPtr < cmd->startpoint)
658  {
659  ereport(ERROR,
660  (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
661  (uint32) (cmd->startpoint >> 32),
662  (uint32) (cmd->startpoint),
663  (uint32) (FlushPtr >> 32),
664  (uint32) (FlushPtr))));
665  }
666 
667  /* Start streaming from the requested point */
668  sentPtr = cmd->startpoint;
669 
670  /* Initialize shared memory status, too */
674 
676 
677  /* Main loop of walsender */
678  replication_active = true;
679 
681 
682  replication_active = false;
683  if (got_STOPPING)
684  proc_exit(0);
686 
688  }
689 
690  if (cmd->slotname)
692 
693  /*
694  * Copy is finished now. Send a single-row result set indicating the next
695  * timeline.
696  */
698  {
699  char startpos_str[8 + 1 + 8 + 1];
701  TupOutputState *tstate;
702  TupleDesc tupdesc;
703  Datum values[2];
704  bool nulls[2];
705 
706  snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
707  (uint32) (sendTimeLineValidUpto >> 32),
709 
711  MemSet(nulls, false, sizeof(nulls));
712 
713  /*
714  * Need a tuple descriptor representing two columns. int8 may seem
715  * like a surprising data type for this, but in theory int4 would not
716  * be wide enough for this, as TimeLineID is unsigned.
717  */
718  tupdesc = CreateTemplateTupleDesc(2, false);
719  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
720  INT8OID, -1, 0);
721  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
722  TEXTOID, -1, 0);
723 
724  /* prepare for projection of tuple */
725  tstate = begin_tup_output_tupdesc(dest, tupdesc);
726 
727  values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
728  values[1] = CStringGetTextDatum(startpos_str);
729 
730  /* send it to dest */
731  do_tup_output(tstate, values, nulls);
732 
733  end_tup_output(tstate);
734  }
735 
736  /* Send CommandComplete message */
737  pq_puttextmessage('C', "START_STREAMING");
738 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr startpoint
Definition: replnodes.h:85
#define pq_flush()
Definition: libpq.h:39
#define TEXTOID
Definition: pg_type.h:324
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:330
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:75
static void XLogSendPhysical(void)
Definition: walsender.c:2476
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:1253
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
void proc_exit(int code)
Definition: ipc.c:99
int errcode(int sqlerrcode)
Definition: elog.c:575
#define MemSet(start, val, len)
Definition: c.h:853
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8254
static void pq_sendbyte(StringInfo buf, int8 byt)
Definition: pqformat.h:164
void list_free_deep(List *list)
Definition: list.c:1147
slock_t mutex
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
#define SpinLockAcquire(lock)
Definition: spin.h:62
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:1311
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:576
#define ERROR
Definition: elog.h:43
static char * buf
Definition: pg_test_fsync.c:67
static bool streamingDoneSending
Definition: walsender.c:179
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:109
int errdetail(const char *fmt,...)
Definition: elog.c:873
unsigned int uint32
Definition: c.h:296
void ReplicationSlotRelease(void)
Definition: slot.c:416
#define SlotIsLogical(slot)
Definition: slot.h:142
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1786
void SyncRepInitConfig(void)
Definition: syncrep.c:384
#define ereport(elevel, rest)
Definition: elog.h:122
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc)
Definition: execTuples.c:1235
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2075
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
static volatile sig_atomic_t replication_active
Definition: walsender.c:195
uintptr_t Datum
Definition: postgres.h:372
XLogRecPtr sentPtr
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:561
WalSnd * MyWalSnd
Definition: walsender.c:111
static XLogRecPtr sentPtr
Definition: walsender.c:157
TimeLineID ThisTimeLineID
Definition: xlog.c:181
#define INT8OID
Definition: pg_type.h:304
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
static TimeLineID sendTimeLine
Definition: walsender.c:148
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:670
void WalSndSetState(WalSndState state)
Definition: walsender.c:3097
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:149
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:151
TupleDesc CreateTemplateTupleDesc(int natts, bool hasoid)
Definition: tupdesc.c:43
static bool streamingDoneReceiving
Definition: walsender.c:180
static void pq_sendint16(StringInfo buf, int16 i)
Definition: pqformat.h:140
static Datum values[MAXATTR]
Definition: bootstrap.c:164
int errmsg(const char *fmt,...)
Definition: elog.c:797
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
TimeLineID timeline
Definition: replnodes.h:84
#define CStringGetTextDatum(s)
Definition: builtins.h:91
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2849
void pq_puttextmessage(char msgtype, const char *str)
Definition: pqformat.c:369
Definition: pg_list.h:45
int16 AttrNumber
Definition: attnum.h:21
static bool sendTimeLineIsHistoric
Definition: walsender.c:150
bool am_cascading_walsender
Definition: walsender.c:115

◆ TransactionIdInRecentPast()

static bool TransactionIdInRecentPast ( TransactionId  xid,
uint32  epoch 
)
static

Definition at line 1879 of file walsender.c.

References GetNextXidAndEpoch(), and TransactionIdPrecedesOrEquals().

Referenced by ProcessStandbyHSFeedbackMessage().

1880 {
1881  TransactionId nextXid;
1882  uint32 nextEpoch;
1883 
1884  GetNextXidAndEpoch(&nextXid, &nextEpoch);
1885 
1886  if (xid <= nextXid)
1887  {
1888  if (epoch != nextEpoch)
1889  return false;
1890  }
1891  else
1892  {
1893  if (epoch + 1 != nextEpoch)
1894  return false;
1895  }
1896 
1897  if (!TransactionIdPrecedesOrEquals(xid, nextXid))
1898  return false; /* epoch OK, but it's wrapped around */
1899 
1900  return true;
1901 }
uint32 TransactionId
Definition: c.h:445
void GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch)
Definition: xlog.c:8323
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:319
unsigned int uint32
Definition: c.h:296
static const unsigned __int64 epoch
Definition: gettimeofday.c:34

◆ WalSndCheckTimeOut()

static void WalSndCheckTimeOut ( TimestampTz  now)
static

Definition at line 2048 of file walsender.c.

References COMMERROR, ereport, errmsg(), last_reply_timestamp, TimestampTzPlusMilliseconds, wal_sender_timeout, and WalSndShutdown().

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

2049 {
2050  TimestampTz timeout;
2051 
2052  /* don't bail out if we're doing something that doesn't require timeouts */
2053  if (last_reply_timestamp <= 0)
2054  return;
2055 
2056  timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
2057  wal_sender_timeout);
2058 
2059  if (wal_sender_timeout > 0 && now >= timeout)
2060  {
2061  /*
2062  * Since typically expiration of replication timeout means
2063  * communication problem, we don't send the error message to the
2064  * standby.
2065  */
2066  ereport(COMMERROR,
2067  (errmsg("terminating walsender process due to replication timeout")));
2068 
2069  WalSndShutdown();
2070  }
2071 }
int wal_sender_timeout
Definition: walsender.c:122
int64 TimestampTz
Definition: timestamp.h:39
#define COMMERROR
Definition: elog.h:30
#define ereport(elevel, rest)
Definition: elog.h:122
static void WalSndShutdown(void)
Definition: walsender.c:228
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
int errmsg(const char *fmt,...)
Definition: elog.c:797
static TimestampTz last_reply_timestamp
Definition: walsender.c:168
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534

◆ WalSndComputeSleeptime()

static long WalSndComputeSleeptime ( TimestampTz  now)
static

Definition at line 2006 of file walsender.c.

References last_reply_timestamp, sleeptime, TimestampDifference(), TimestampTzPlusMilliseconds, waiting_for_ping_response, and wal_sender_timeout.

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

2007 {
2008  long sleeptime = 10000; /* 10 s */
2009 
2010  if (wal_sender_timeout > 0 && last_reply_timestamp > 0)
2011  {
2012  TimestampTz wakeup_time;
2013  long sec_to_timeout;
2014  int microsec_to_timeout;
2015 
2016  /*
2017  * At the latest stop sleeping once wal_sender_timeout has been
2018  * reached.
2019  */
2020  wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
2021  wal_sender_timeout);
2022 
2023  /*
2024  * If no ping has been sent yet, wakeup when it's time to do so.
2025  * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2026  * the timeout passed without a response.
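2027  */
2028  if (!waiting_for_ping_response)
2029  wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
2030  wal_sender_timeout / 2);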
2031 
2032  /* Compute relative time until wakeup. */
2033  TimestampDifference(now, wakeup_time,
2034  &sec_to_timeout, &microsec_to_timeout);
2035 
2036  sleeptime = sec_to_timeout * 1000 +
2037  microsec_to_timeout / 1000;
2038  }
2039 
2040  return sleeptime;
2041 }
int wal_sender_timeout
Definition: walsender.c:122
int64 TimestampTz
Definition: timestamp.h:39
int sleeptime
Definition: pg_standby.c:42
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: timestamp.c:1624
static bool waiting_for_ping_response
Definition: walsender.c:171
static TimestampTz last_reply_timestamp
Definition: walsender.c:168
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534

◆ WalSndDone()

static void WalSndDone ( WalSndSendDataCallback  send_data)
static

Definition at line 2809 of file walsender.c.

References DestRemote, EndCommand(), WalSnd::flush, pq_flush, pq_is_send_pending, proc_exit(), sentPtr, waiting_for_ping_response, WalSndCaughtUp, WalSndKeepalive(), WalSnd::write, and XLogRecPtrIsInvalid.

Referenced by WalSndLoop().

2810 {
2811  XLogRecPtr replicatedPtr;
2812 
2813  /* ... let's just be real sure we're caught up ... */
2814  send_data();
2815 
2816  /*
2817  * To figure out whether all WAL has successfully been replicated, check
2818  * flush location if valid, write otherwise. Tools like pg_receivewal will
2819  * usually (unless in synchronous mode) return an invalid flush location.
2820  */
2821  replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
2822  MyWalSnd->write : MyWalSnd->flush;
2823 
2824  if (WalSndCaughtUp && sentPtr == replicatedPtr &&
2825  !pq_is_send_pending())
2826  {
2827  /* Inform the standby that XLOG streaming is done */
2828  EndCommand("COPY 0", DestRemote);
2829  pq_flush();
2830 
2831  proc_exit(0);
2832  }
2833  if (!waiting_for_ping_response)
2834  {
2835  WalSndKeepalive(true);
2836  waiting_for_ping_response = true;
2837  }
2838 }
#define pq_is_send_pending()
Definition: libpq.h:41
XLogRecPtr write
#define pq_flush()
Definition: libpq.h:39
void proc_exit(int code)
Definition: ipc.c:99
XLogRecPtr flush
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3318
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:157
static bool WalSndCaughtUp
Definition: walsender.c:183
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
WalSnd * MyWalSnd
Definition: walsender.c:111
static XLogRecPtr sentPtr
Definition: walsender.c:157
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool waiting_for_ping_response
Definition: walsender.c:171

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 291 of file walsender.c.

References close, ConditionVariableCancelSleep(), DestNone, DestRemote, got_SIGUSR2, got_STOPPING, LWLockReleaseAll(), MyReplicationSlot, pgstat_report_wait_end(), proc_exit(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), sendFile, WalSndSetState(), WalSndShutdown(), WALSNDSTATE_STARTUP, and whereToSendOutput.

Referenced by PostgresMain().

292 {
293  LWLockReleaseAll();
294  ConditionVariableCancelSleep();
295  pgstat_report_wait_end();
296 
297  if (sendFile >= 0)
298  {
299  close(sendFile);
300  sendFile = -1;
301  }
302 
303  if (MyReplicationSlot != NULL)
304  ReplicationSlotRelease();
305 
306  ReplicationSlotCleanup();
307 
308  replication_active = false;
309 
310  if (got_STOPPING || got_SIGUSR2)
311  proc_exit(0);
312 
313  /* Revert back to startup state */
314  WalSndSetState(WALSNDSTATE_STARTUP);
315 }
static int sendFile
Definition: walsender.c:135
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
void proc_exit(int code)
Definition: ipc.c:99
void ConditionVariableCancelSleep(void)
void ReplicationSlotRelease(void)
Definition: slot.c:416
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1244
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:186
static volatile sig_atomic_t replication_active
Definition: walsender.c:195
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
void WalSndSetState(WalSndState state)
Definition: walsender.c:3097
void ReplicationSlotCleanup(void)
Definition: slot.c:471
void LWLockReleaseAll(void)
Definition: lwlock.c:1821
#define close(a)
Definition: win32.h:12

◆ WalSndGetStateString()

static const char* WalSndGetStateString ( WalSndState  state)
static

Definition at line 3116 of file walsender.c.

References WALSNDSTATE_BACKUP, WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WALSNDSTATE_STOPPING, and WALSNDSTATE_STREAMING.

Referenced by pg_stat_get_wal_senders().

3117 {
3118  switch (state)
3119  {
3120  case WALSNDSTATE_STARTUP:
3121  return "startup";
3122  case WALSNDSTATE_BACKUP:
3123  return "backup";
3124  case WALSNDSTATE_CATCHUP:
3125  return "catchup";
3126  case WALSNDSTATE_STREAMING:
3127  return "streaming";
3128  case WALSNDSTATE_STOPPING:
3129  return "stopping";
3130  }
3131  return "UNKNOWN";
3132 }
Definition: regguts.h:298

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 3033 of file walsender.c.

References i, InvalidBackendId, max_wal_senders, WalSnd::mutex, WalSnd::pid, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

3034 {
3035  int i;
3036 
3037  for (i = 0; i < max_wal_senders; i++)
3038  {
3039  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3040  pid_t pid;
3041 
3042  SpinLockAcquire(&walsnd->mutex);
3043  pid = walsnd->pid;
3044  SpinLockRelease(&walsnd->mutex);
3045 
3046  if (pid == 0)
3047  continue;
3048 
3049  SendProcSignal(pid, PROCSIG_WALSND_INIT_STOPPING, InvalidBackendId);
3050  }
3051 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:108
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:180
int max_wal_senders
Definition: walsender.c:120
#define SpinLockRelease(lock)
Definition: spin.h:64
#define InvalidBackendId
Definition: backendid.h:23
int i

◆ WalSndKeepalive()

static void WalSndKeepalive ( bool  requestReply)
static

Definition at line 3318 of file walsender.c.

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), StringInfoData::len, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), resetStringInfo(), and sentPtr.

Referenced by ProcessStandbyReplyMessage(), WalSndDone(), WalSndKeepaliveIfNecessary(), and WalSndWaitForWal().

3319 {
3320  elog(DEBUG2, "sending replication keepalive");
3321 
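3322  /* construct the message... */
3323  resetStringInfo(&output_message);
3324  pq_sendbyte(&output_message, 'k');
3325  pq_sendint64(&output_message, sentPtr);
3326  pq_sendint64(&output_message, GetCurrentTimestamp());
3327  pq_sendbyte(&output_message, requestReply ? 1 : 0);
3328 
3329  /* ... and send it wrapped in CopyData */
3330  pq_putmessage_noblock('d', output_message.data, output_message.len);
3331 }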
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
static StringInfoData output_message
Definition: walsender.c:160
static void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.h:156
static void pq_sendbyte(StringInfo buf, int8 byt)
Definition: pqformat.h:164
#define DEBUG2
Definition: elog.h:24
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
static XLogRecPtr sentPtr
Definition: walsender.c:157
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
#define elog
Definition: elog.h:219

◆ WalSndKeepaliveIfNecessary()

static void WalSndKeepaliveIfNecessary ( TimestampTz  now)
static

Definition at line 3337 of file walsender.c.

References last_reply_timestamp, pq_flush_if_writable, TimestampTzPlusMilliseconds, waiting_for_ping_response, wal_sender_timeout, WalSndKeepalive(), and WalSndShutdown().

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

3338 {
3339  TimestampTz ping_time;
3340 
3341  /*
3342  * Don't send keepalive messages if timeouts are globally disabled or
3343  * we're doing something not partaking in timeouts.
3344  */
3345  if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3346  return;
3347 
3348  if (waiting_for_ping_response)
3349  return;
3350 
3351  /*
3352  * If half of wal_sender_timeout has lapsed without receiving any reply
3353  * from the standby, send a keep-alive message to the standby requesting
3354  * an immediate reply.
3355  */
3356  ping_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
3357  wal_sender_timeout / 2);
3358  if (now >= ping_time)
3359  {
3360  WalSndKeepalive(true);
3361  waiting_for_ping_response = true;
3362 
3363  /* Try to flush pending output to the client */
3364  if (pq_flush_if_writable() != 0)
3365  WalSndShutdown();
3366  }
3367 }
int wal_sender_timeout
Definition: walsender.c:122
int64 TimestampTz
Definition: timestamp.h:39
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3318
#define pq_flush_if_writable()
Definition: libpq.h:40
static void WalSndShutdown(void)
Definition: walsender.c:228
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
static bool waiting_for_ping_response
Definition: walsender.c:171
static TimestampTz last_reply_timestamp
Definition: walsender.c:168
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534

◆ WalSndKill()

static void WalSndKill ( int  code,
Datum  arg 
)
static

Definition at line 2273 of file walsender.c.

References Assert, WalSnd::latch, WalSnd::mutex, MyWalSnd, WalSnd::pid, SpinLockAcquire, and SpinLockRelease.

Referenced by InitWalSenderSlot().

2274 {
2275  WalSnd *walsnd = MyWalSnd;
2276 
2277  Assert(walsnd != NULL);
2278 
2279  MyWalSnd = NULL;
2280 
2281  SpinLockAcquire(&walsnd->mutex);
2282  /* clear latch while holding the spinlock, so it can safely be read */
2283  walsnd->latch = NULL;
2284  /* Mark WalSnd struct as no longer being in use. */
2285  walsnd->pid = 0;
2286  SpinLockRelease(&walsnd->mutex);
2287 }
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch * latch
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:111
#define Assert(condition)
Definition: c.h:670

◆ WalSndLastCycleHandler()

static void WalSndLastCycleHandler ( SIGNAL_ARGS  )
static

Definition at line 2924 of file walsender.c.

References got_SIGUSR2, MyLatch, and SetLatch().

Referenced by WalSndSignals().

2925 {
2926  int save_errno = errno;
2927 
2928  got_SIGUSR2 = true;
2929  SetLatch(MyLatch);
2930 
2931  errno = save_errno;
2932 }
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:186
void SetLatch(volatile Latch *latch)
Definition: latch.c:414
struct Latch * MyLatch
Definition: globals.c:52

◆ WalSndLoop()

static void WalSndLoop ( WalSndSendDataCallback  send_data)
static

Definition at line 2075 of file walsender.c.

References application_name, CHECK_FOR_INTERRUPTS, ConfigReloadPending, DEBUG1, ereport, errmsg(), GetCurrentTimestamp(), got_SIGUSR2, last_reply_timestamp, MyLatch, MyProcPort, now(), PGC_SIGHUP, pgstat_report_activity(), PostmasterIsAlive(), pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), sleeptime, Port::sock, WalSnd::state, STATE_RUNNING, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_MAIN, waiting_for_ping_response, WaitLatchOrSocket(), WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndDone(), WalSndKeepaliveIfNecessary(), WalSndSetState(), WalSndShutdown(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STREAMING, WL_LATCH_SET, WL_POSTMASTER_DEATH, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, and WL_TIMEOUT.

Referenced by StartLogicalReplication(), and StartReplication().

2076 {
2077  /*
2078  * Initialize the last reply timestamp. That enables timeout processing
2079  * from hereon.
2080  */
2081  last_reply_timestamp = GetCurrentTimestamp();
2082  waiting_for_ping_response = false;
2083 
2084  /* Report to pgstat that this process is running */
2085  pgstat_report_activity(STATE_RUNNING, NULL);
2086 
2087  /*
2088  * Loop until we reach the end of this timeline or the client requests to
2089  * stop streaming.
2090  */
2091  for (;;)
2092  {
2093  TimestampTz now;
2094 
2095  /*
2096  * Emergency bailout if postmaster has died. This is to avoid the
2097  * necessity for manual cleanup of all postmaster children.
2098  */
2099  if (!PostmasterIsAlive())
2100  exit(1);
2101 
2102  /* Clear any already-pending wakeups */
2103  ResetLatch(MyLatch);
2104 
2105  CHECK_FOR_INTERRUPTS();
2106 
2107  /* Process any requests or signals received recently */
2108  if (ConfigReloadPending)
2109  {
2110  ConfigReloadPending = false;
2111  ProcessConfigFile(PGC_SIGHUP);
2112  SyncRepInitConfig();
2113  }
2114 
2115  /* Check for input from the client */
2116  ProcessRepliesIfAny();
2117 
2118  /*
2119  * If we have received CopyDone from the client, sent CopyDone
2120  * ourselves, and the output buffer is empty, it's time to exit
2121  * streaming.
2122  */
2123  if (streamingDoneReceiving && streamingDoneSending &&
2124  !pq_is_send_pending())
2125  break;
2126 
2127  /*
2128  * If we don't have any pending data in the output buffer, try to send
2129  * some more. If there is some, we don't bother to call send_data
2130  * again until we've flushed it ... but we'd better assume we are not
2131  * caught up.
2132  */
2133  if (!pq_is_send_pending())
2134  send_data();
2135  else
2136  WalSndCaughtUp = false;
2137 
2138  /* Try to flush pending output to the client */
2139  if (pq_flush_if_writable() != 0)
2140  WalSndShutdown();
2141 
2142  /* If nothing remains to be sent right now ... */
2143  if (WalSndCaughtUp && !pq_is_send_pending())
2144  {
2145  /*
2146  * If we're in catchup state, move to streaming. This is an
2147  * important state change for users to know about, since before
2148  * this point data loss might occur if the primary dies and we
2149  * need to failover to the standby. The state change is also
2150  * important for synchronous replication, since commits that
2151  * started to wait at that point might wait for some time.
2152  */
2153  if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
2154  {
2155  ereport(DEBUG1,
2156  (errmsg("standby \"%s\" has now caught up with primary",
2157  application_name)));
2158  WalSndSetState(WALSNDSTATE_STREAMING);
2159  }
2160 
2161  /*
2162  * When SIGUSR2 arrives, we send any outstanding logs up to the
2163  * shutdown checkpoint record (i.e., the latest record), wait for
2164  * them to be replicated to the standby, and exit. This may be a
2165  * normal termination at shutdown, or a promotion, the walsender
2166  * is not sure which.
2167  */
2168  if (got_SIGUSR2)
2169  WalSndDone(send_data);
2170  }
2171 
2172  now = GetCurrentTimestamp();
2173 
2174  /* Check for replication timeout. */
2175  WalSndCheckTimeOut(now);
2176 
2177  /* Send keepalive if the time has come */
2178  WalSndKeepaliveIfNecessary(now);
2179 
2180  /*
2181  * We don't block if not caught up, unless there is unsent data
2182  * pending in which case we'd better block until the socket is
2183  * write-ready. This test is only needed for the case where the
2184  * send_data callback handled a subset of the available data but then
2185  * pq_flush_if_writable flushed it all --- we should immediately try
2186  * to send more.
2187  */
2188  if ((WalSndCaughtUp && !streamingDoneSending) || pq_is_send_pending())
2189  {
2190  long sleeptime;
2191  int wakeEvents;
2192 
2193  wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT |
2194  WL_SOCKET_READABLE;
2195 
2196  sleeptime = WalSndComputeSleeptime(now);
2197 
2198  if (pq_is_send_pending())
2199  wakeEvents |= WL_SOCKET_WRITEABLE;
2200 
2201  /* Sleep until something happens or we time out */
2202  WaitLatchOrSocket(MyLatch, wakeEvents,
2203  MyProcPort->sock, sleeptime,
2204  WAIT_EVENT_WAL_SENDER_MAIN);
2205  }
2206  }
2207  return;
2208 }
#define pq_is_send_pending()
Definition: libpq.h:41
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
#define DEBUG1
Definition: elog.h:25
struct Port * MyProcPort
Definition: globals.c:41
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:2994
int64 TimestampTz
Definition: timestamp.h:39
static void WalSndDone(WalSndSendDataCallback send_data)
Definition: walsender.c:2809
#define WL_SOCKET_READABLE
Definition: latch.h:125
int sleeptime
Definition: pg_standby.c:42
void ResetLatch(volatile Latch *latch)
Definition: latch.c:497
pgsocket sock
Definition: libpq-be.h:118
WalSndState state
static bool streamingDoneSending
Definition: walsender.c:179
bool PostmasterIsAlive(void)
Definition: pmsignal.c:272
#define pq_flush_if_writable()
Definition: libpq.h:40
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:186
void SyncRepInitConfig(void)
Definition: syncrep.c:384
#define ereport(elevel, rest)
Definition: elog.h:122
static bool WalSndCaughtUp
Definition: walsender.c:183
Definition: guc.h:72
volatile sig_atomic_t ConfigReloadPending
Definition: globals.c:34
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
static void WalSndShutdown(void)
Definition: walsender.c:228
WalSnd * MyWalSnd
Definition: walsender.c:111
static void WalSndKeepaliveIfNecessary(TimestampTz now)
Definition: walsender.c:3337
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2006
static void WalSndCheckTimeOut(TimestampTz now)
Definition: walsender.c:2048
void WalSndSetState(WalSndState state)
Definition: walsender.c:3097
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:356
static bool streamingDoneReceiving
Definition: walsender.c:180
char * application_name
Definition: guc.c:470
int errmsg(const char *fmt,...)
Definition: elog.c:797
struct Latch * MyLatch
Definition: globals.c:52
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:98
static bool waiting_for_ping_response
Definition: walsender.c:171
#define WL_LATCH_SET
Definition: latch.h:124
static TimestampTz last_reply_timestamp
Definition: walsender.c:168
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1573
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534

◆ WalSndPrepareWrite()

static void WalSndPrepareWrite ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1124 of file walsender.c.

References InvalidXLogRecPtr, LogicalDecodingContext::out, pq_sendbyte(), pq_sendint64(), and resetStringInfo().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

1125 {
1126  /* can't have sync rep confused by sending the same LSN several times */
1127  if (!last_write)
1128  lsn = InvalidXLogRecPtr;
1129 
1130  resetStringInfo(ctx->out);
1131 
1132  pq_sendbyte(ctx->out, 'w');
1133  pq_sendint64(ctx->out, lsn); /* dataStart */
1134  pq_sendint64(ctx->out, lsn); /* walEnd */
1135 
1136  /*
1137  * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1138  * reserve space here.
1139  */
1140  pq_sendint64(ctx->out, 0); /* sendtime */
1141 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.h:156
static void pq_sendbyte(StringInfo buf, int8 byt)
Definition: pqformat.h:164
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
StringInfo out
Definition: logical.h:66

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 2879 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

2880 {
2881  int i;
2882 
2883  for (i = 0; i < max_wal_senders; i++)
2884  {
2885  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2886 
2887  SpinLockAcquire(&walsnd->mutex);
2888  if (walsnd->pid == 0)
2889  {
2890  SpinLockRelease(&walsnd->mutex);
2891  continue;
2892  }
2893  walsnd->needreload = true;
2894  SpinLockRelease(&walsnd->mutex);
2895  }
2896 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:108
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
int max_wal_senders
Definition: walsender.c:120
#define SpinLockRelease(lock)
Definition: spin.h:64
bool needreload
int i

◆ WalSndSetState()

void WalSndSetState ( WalSndState  state)

Definition at line 3097 of file walsender.c.

References am_walsender, Assert, WalSnd::mutex, MyWalSnd, SpinLockAcquire, SpinLockRelease, and WalSnd::state.

Referenced by exec_replication_command(), SendBaseBackup(), StartLogicalReplication(), StartReplication(), WalSndErrorCleanup(), WalSndLoop(), and XLogSendPhysical().

3098 {
3099  WalSnd *walsnd = MyWalSnd;
3100 
3101  Assert(am_walsender);
3102 
3103  if (walsnd->state == state)
3104  return;
3105 
3106  SpinLockAcquire(&walsnd->mutex);
3107  walsnd->state = state;
3108  SpinLockRelease(&walsnd->mutex);
3109 }
slock_t mutex
bool am_walsender
Definition: walsender.c:114
#define SpinLockAcquire(lock)
Definition: spin.h:62
WalSndState state
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:111
#define Assert(condition)
Definition: c.h:670
Definition: regguts.h:298

◆ WalSndShmemInit()

void WalSndShmemInit ( void  )

Definition at line 2972 of file walsender.c.

References i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SHMQueueInit(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

2973 {
2974  bool found;
2975  int i;
2976 
2977  WalSndCtl = (WalSndCtlData *)
2978  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
2979 
2980  if (!found)
2981  {
2982  /* First time through, so initialize */
2983  MemSet(WalSndCtl, 0, WalSndShmemSize());
2984 
2985  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
2986  SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
2987 
2988  for (i = 0; i < max_wal_senders; i++)
2989  {
2990  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2991 
2992  SpinLockInit(&walsnd->mutex);
2993  }
2994  }
2995 }
Size WalSndShmemSize(void)
Definition: walsender.c:2960
#define SpinLockInit(lock)
Definition: spin.h:60
WalSndCtlData * WalSndCtl
Definition: walsender.c:108
#define MemSet(start, val, len)
Definition: c.h:853
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
int max_wal_senders
Definition: walsender.c:120
void SHMQueueInit(SHM_QUEUE *queue)
Definition: shmqueue.c:36
int i
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28

◆ WalSndShmemSize()

Size WalSndShmemSize ( void  )

Definition at line 2960 of file walsender.c.

References add_size(), max_wal_senders, mul_size(), and offsetof.

Referenced by CreateSharedMemoryAndSemaphores(), and WalSndShmemInit().

2961 {
2962  Size size = 0;
2963 
2964  size = offsetof(WalSndCtlData, walsnds);
2965  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
2966 
2967  return size;
2968 }
int max_wal_senders
Definition: walsender.c:120
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
size_t Size
Definition: c.h:404
#define offsetof(type, field)
Definition: c.h:593

◆ WalSndShutdown()

static void WalSndShutdown ( void  )
static

Definition at line 228 of file walsender.c.

Referenced by WalSndCheckTimeOut(), WalSndErrorCleanup(), WalSndKeepaliveIfNecessary(), WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

260 {
261  am_cascading_walsender = RecoveryInProgress();
262 
263  /* Create a per-walsender data structure in shared memory */
264  InitWalSenderSlot();
265 
266  /* Set up resource owner */
267  CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner");
268 
269  /*
270  * Let postmaster know that we're a WAL sender. Once we've declared us as
271  * a WAL sender process, postmaster will let us outlive the bgwriter and
272  * kill us last in the shutdown sequence, so we get a chance to stream all
273  * remaining WAL at shutdown, including the shutdown checkpoint. Note that
274  * there's no going back, and we mustn't write any WAL records after this.
275  */
276  MarkPostmasterChildWalSender();
277  SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
278 
279  /* Initialize empty timestamp buffer for lag tracking. */
280  memset(&LagTracker, 0, sizeof(LagTracker));
281 }
static void InitWalSenderSlot(void)
Definition: walsender.c:2212
ResourceOwner CurrentResourceOwner
Definition: resowner.c:138
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:239
bool RecoveryInProgress(void)
Definition: xlog.c:7922
static struct @26 LagTracker
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:113
bool am_cascading_walsender
Definition: walsender.c:115
ResourceOwner ResourceOwnerCreate(ResourceOwner parent, const char *name)
Definition: resowner.c:416

◆ WalSndSignals()

void WalSndSignals ( void  )

Definition at line 2936 of file walsender.c.

References die(), InitializeTimeouts(), PostgresSigHupHandler(), pqsignal(), procsignal_sigusr1_handler(), quickdie(), SIG_DFL, SIG_IGN, SIGCHLD, SIGCONT, SIGHUP, SIGPIPE, SIGQUIT, SIGTTIN, SIGTTOU, SIGUSR1, SIGUSR2, SIGWINCH, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

2937 {
2938  /* Set up signal handlers */
2939  pqsignal(SIGHUP, PostgresSigHupHandler); /* set flag to read config
2940  * file */
2941  pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
2942  pqsignal(SIGTERM, die); /* request shutdown */
2943  pqsignal(SIGQUIT, quickdie); /* hard crash time */
2944  InitializeTimeouts(); /* establishes SIGALRM handler */
2945  pqsignal(SIGPIPE, SIG_IGN);
2946  pqsignal(SIGUSR1, procsignal_sigusr1_handler);
2947  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
2948  * shutdown */
2949 
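2950  /* Reset some signals that are accepted by postmaster but not here */
2951  pqsignal(SIGCHLD, SIG_DFL);
2952  pqsignal(SIGTTIN, SIG_DFL);
2953  pqsignal(SIGTTOU, SIG_DFL);
2954  pqsignal(SIGCONT, SIG_DFL);
2955  pqsignal(SIGWINCH, SIG_DFL);
2956 }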
void InitializeTimeouts(void)
Definition: timeout.c:340
#define SIGQUIT
Definition: win32_port.h:164
#define SIGTTOU
Definition: win32_port.h:175
#define SIGTTIN
Definition: win32_port.h:174
#define SIGUSR1
Definition: win32_port.h:177
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:2924
#define SIGCHLD
Definition: win32_port.h:173
#define SIGWINCH
Definition: win32_port.h:176
#define SIGCONT
Definition: win32_port.h:172
#define SIGPIPE
Definition: win32_port.h:168
#define SIGUSR2
Definition: win32_port.h:178
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:2683
#define SIGHUP
Definition: win32_port.h:163
void PostgresSigHupHandler(SIGNAL_ARGS)
Definition: postgres.c:2723
#define SIG_IGN
Definition: win32_port.h:160
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:168
#define SIG_DFL
Definition: win32_port.h:158
void die(SIGNAL_ARGS)
Definition: postgres.c:2652
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:261
void quickdie(SIGNAL_ARGS)
Definition: postgres.c:2591

◆ WalSndUpdateProgress()

static void WalSndUpdateProgress ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid 
)
static

Definition at line 1241 of file walsender.c.

References GetCurrentTimestamp(), LagTrackerWrite(), now(), TimestampDifferenceExceeds(), and WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS.

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

1242 {
1243  static TimestampTz sendTime = 0;
1244  TimestampTz now = GetCurrentTimestamp();
1245 
1246  /*
1247  * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1248  * avoid flooding the lag tracker when we commit frequently.
1249  */
1250 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1251  if (!TimestampDifferenceExceeds(sendTime, now,
1252  WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
1253  return;
1254 
1255  LagTrackerWrite(lsn, now);
1256  sendTime = now;
1257 }
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
int64 TimestampTz
Definition: timestamp.h:39
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1649
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition: walsender.c:3376
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534

◆ WalSndWaitForWal()

static XLogRecPtr WalSndWaitForWal ( XLogRecPtr  loc)
static

Definition at line 1267 of file walsender.c.

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, WalSnd::flush, GetCurrentTimestamp(), GetFlushRecPtr(), GetXLogReplayRecPtr(), got_STOPPING, InvalidXLogRecPtr, MyLatch, MyProcPort, now(), PGC_SIGHUP, PostmasterIsAlive(), pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), RecoveryInProgress(), ResetLatch(), sentPtr, SetLatch(), sleeptime, Port::sock, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_WAIT_WAL, waiting_for_ping_response, WaitLatchOrSocket(), WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepalive(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WL_LATCH_SET, WL_POSTMASTER_DEATH, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, WL_TIMEOUT, WalSnd::write, and XLogBackgroundFlush().

Referenced by logical_read_xlog_page().

1268 {
1269  int wakeEvents;
1270  static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1271 
1272 
1273  /*
1274  * Fast path to avoid acquiring the spinlock in case we already know we
1275  * have enough WAL available. This is particularly interesting if we're
1276  * far behind.
1277  */
1278  if (RecentFlushPtr != InvalidXLogRecPtr &&
1279  loc <= RecentFlushPtr)
1280  return RecentFlushPtr;
1281 
1282  /* Get a more recent flush pointer. */
1283  if (!RecoveryInProgress())
1284  RecentFlushPtr = GetFlushRecPtr();
1285  else
1286  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1287 
1288  for (;;)
1289  {
1290  long sleeptime;
1291  TimestampTz now;
1292 
1293  /*
1294  * Emergency bailout if postmaster has died. This is to avoid the
1295  * necessity for manual cleanup of all postmaster children.
1296  */
1297  if (!PostmasterIsAlive())
1298  exit(1);
1299 
1300  /* Clear any already-pending wakeups */
1302 
1304 
1305  /* Process any requests or signals received recently */
1306  if (ConfigReloadPending)
1307  {
1308  ConfigReloadPending = false;
1311  }
1312 
1313  /* Check for input from the client */
1315 
1316  /*
1317  * If we're shutting down, trigger pending WAL to be written out,
1318  * otherwise we'd possibly end up waiting for WAL that never gets
1319  * written, because walwriter has shut down already.
1320  */
1321  if (got_STOPPING)
1323 
1324  /* Update our idea of the currently flushed position. */
1325  if (!RecoveryInProgress())
1326  RecentFlushPtr = GetFlushRecPtr();
1327  else
1328  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1329 
1330  /*
1331  * If postmaster asked us to stop, don't wait anymore.
1332  *
1333  * It's important to do this check after the recomputation of
1334  * RecentFlushPtr, so we can send all remaining data before shutting
1335  * down.
1336  */
1337  if (got_STOPPING)
1338  break;
1339 
1340  /*
1341  * We only send regular messages to the client for fully decoded
1342  * transactions, but synchronous replication and walsender shutdown
1343  * may be waiting for a later location. So we send pings containing
1344  * the flush location every now and then.
1345  */
1346  if (MyWalSnd->flush < sentPtr &&
1347  MyWalSnd->write < sentPtr &&
1349  {
1350  WalSndKeepalive(false);
1352  }
1353 
1354  /* check whether we're done */
1355  if (loc <= RecentFlushPtr)
1356  break;
1357 
1358  /* Waiting for new WAL. Since we need to wait, we're now caught up. */
1359  WalSndCaughtUp = true;
1360 
1361  /*
1362  * Try to flush any pending output to the client.
1363  */
1364  if (pq_flush_if_writable() != 0)
1365  WalSndShutdown();
1366 
1367  /*
1368  * If we have received CopyDone from the client, sent CopyDone
1369  * ourselves, and the output buffer is empty, it's time to exit
1370  * streaming, so fail the current WAL fetch request.
1371  */
1373  !pq_is_send_pending())
1374  break;
1375 
1376  now = GetCurrentTimestamp();
1377 
1378  /* die if timeout was reached */
1379  WalSndCheckTimeOut(now);
1380 
1381  /* Send keepalive if the time has come */
1383 
1384  /*
1385  * Sleep until something happens or we time out. Also wait for the
1386  * socket becoming writable, if there's still pending output.
1387  * Otherwise we might sit on sendable output data while waiting for
1388  * new WAL to be generated. (But if we have nothing to send, we don't
1389  * want to wake on socket-writable.)
1390  */
1391  sleeptime = WalSndComputeSleeptime(now);
1392 
1393  wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
1395 
1396  if (pq_is_send_pending())
1397  wakeEvents |= WL_SOCKET_WRITEABLE;
1398 
1399  WaitLatchOrSocket(MyLatch, wakeEvents,
1400  MyProcPort->sock, sleeptime,
1402  }
1403 
1404  /* reactivate latch so WalSndLoop knows to continue */
1405  SetLatch(MyLatch);
1406  return RecentFlushPtr;
1407 }
#define pq_is_send_pending()
Definition: libpq.h:41
XLogRecPtr write
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
struct Port * MyProcPort
Definition: globals.c:41
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
int64 TimestampTz
Definition: timestamp.h:39
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
#define WL_SOCKET_READABLE
Definition: latch.h:125
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8254
int sleeptime
Definition: pg_standby.c:42
void ResetLatch(volatile Latch *latch)
Definition: latch.c:497
bool RecoveryInProgress(void)
Definition: xlog.c:7922
pgsocket sock
Definition: libpq-be.h:118
XLogRecPtr flush
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3318
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11125
bool XLogBackgroundFlush(void)
Definition: xlog.c:2952
static bool streamingDoneSending
Definition: walsender.c:179
bool PostmasterIsAlive(void)
Definition: pmsignal.c:272
#define pq_flush_if_writable()
Definition: libpq.h:40
void SyncRepInitConfig(void)
Definition: syncrep.c:384
static bool WalSndCaughtUp
Definition: walsender.c:183
Definition: guc.h:72
volatile sig_atomic_t ConfigReloadPending
Definition: globals.c:34
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
static void WalSndShutdown(void)
Definition: walsender.c:228
WalSnd * MyWalSnd
Definition: walsender.c:111
static XLogRecPtr sentPtr
Definition: walsender.c:157
static void WalSndKeepaliveIfNecessary(TimestampTz now)
Definition: walsender.c:3337
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2006
void SetLatch(volatile Latch *latch)
Definition: latch.c:414
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static void WalSndCheckTimeOut(TimestampTz now)
Definition: walsender.c:2048
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:356
static bool streamingDoneReceiving
Definition: walsender.c:180
struct Latch * MyLatch
Definition: globals.c:52
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:98
static bool waiting_for_ping_response
Definition: walsender.c:171
#define WL_LATCH_SET
Definition: latch.h:124
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1573
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 3059 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, pg_usleep(), WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

3060 {
3061  for (;;)
3062  {
3063  int i;
3064  bool all_stopped = true;
3065 
3066  for (i = 0; i < max_wal_senders; i++)
3067  {
3068  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3069 
3070  SpinLockAcquire(&walsnd->mutex);
3071 
3072  if (walsnd->pid == 0)
3073  {
3074  SpinLockRelease(&walsnd->mutex);
3075  continue;
3076  }
3077 
3078  if (walsnd->state != WALSNDSTATE_STOPPING)
3079  {
3080  all_stopped = false;
3081  SpinLockRelease(&walsnd->mutex);
3082  break;
3083  }
3084  SpinLockRelease(&walsnd->mutex);
3085  }
3086 
3087  /* safe to leave if confirmation is done for all WAL senders */
3088  if (all_stopped)
3089  return;
3090 
3091  pg_usleep(10000L); /* wait for 10 msec */
3092  }
3093 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:108
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pg_usleep(long microsec)
Definition: signal.c:53
WalSndState state
int max_wal_senders
Definition: walsender.c:120
#define SpinLockRelease(lock)
Definition: spin.h:64
int i

◆ WalSndWakeup()

void WalSndWakeup ( void  )

Definition at line 3004 of file walsender.c.

References i, WalSnd::latch, max_wal_senders, WalSnd::mutex, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive(), StartupXLOG(), and XLogWalRcvFlush().

3005 {
3006  int i;
3007 
3008  for (i = 0; i < max_wal_senders; i++)
3009  {
3010  Latch *latch;
3011  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3012 
3013  /*
3014  * Get latch pointer with spinlock held, for the unlikely case that
3015  * pointer reads aren't atomic (as they're 8 bytes).
3016  */
3017  SpinLockAcquire(&walsnd->mutex);
3018  latch = walsnd->latch;
3019  SpinLockRelease(&walsnd->mutex);
3020 
3021  if (latch != NULL)
3022  SetLatch(latch);
3023  }
3024 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:108
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch * latch
Definition: latch.h:110
int max_wal_senders
Definition: walsender.c:120
#define SpinLockRelease(lock)
Definition: spin.h:64
void SetLatch(volatile Latch *latch)
Definition: latch.c:414
int i

◆ WalSndWriteData()

static void WalSndWriteData ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1151 of file walsender.c.

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, StringInfoData::data, GetCurrentTimestamp(), StringInfoData::len, MyLatch, MyProcPort, now(), LogicalDecodingContext::out, PGC_SIGHUP, PostmasterIsAlive(), pq_flush_if_writable, pq_is_send_pending, pq_putmessage_noblock, pq_sendint64(), ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), resetStringInfo(), SetLatch(), sleeptime, Port::sock, SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_WRITE_DATA, WaitLatchOrSocket(), WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WL_LATCH_SET, WL_POSTMASTER_DEATH, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, and WL_TIMEOUT.

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

1153 {
1154  /* output previously gathered data in a CopyData packet */
1155  pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1156 
1157  /*
1158  * Fill the send timestamp last, so that it is taken as late as possible.
1159  * This is somewhat ugly, but the protocol is fixed, as it has already been
1160  * used for several releases by streaming physical replication.
1161  */
1164  memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1165  tmpbuf.data, sizeof(int64));
1166 
1167  /* fast path */
1168  /* Try to flush pending output to the client */
1169  if (pq_flush_if_writable() != 0)
1170  WalSndShutdown();
1171 
1172  if (!pq_is_send_pending())
1173  return;
1174 
1175  for (;;)
1176  {
1177  int wakeEvents;
1178  long sleeptime;
1179  TimestampTz now;
1180 
1181  /*
1182  * Emergency bailout if postmaster has died. This is to avoid the
1183  * necessity for manual cleanup of all postmaster children.
1184  */
1185  if (!PostmasterIsAlive())
1186  exit(1);
1187 
1188  /* Clear any already-pending wakeups */
1190 
1192 
1193  /* Process any requests or signals received recently */
1194  if (ConfigReloadPending)
1195  {
1196  ConfigReloadPending = false;
1199  }
1200 
1201  /* Check for input from the client */
1203 
1204  /* Try to flush pending output to the client */
1205  if (pq_flush_if_writable() != 0)
1206  WalSndShutdown();
1207 
1208  /* If we finished clearing the buffered data, we're done here. */
1209  if (!pq_is_send_pending())
1210  break;
1211 
1212  now = GetCurrentTimestamp();
1213 
1214  /* die if timeout was reached */
1215  WalSndCheckTimeOut(now);
1216 
1217  /* Send keepalive if the time has come */
1219 
1220  sleeptime = WalSndComputeSleeptime(now);
1221 
1222  wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
1224 
1225  /* Sleep until something happens or we time out */
1226  WaitLatchOrSocket(MyLatch, wakeEvents,
1227  MyProcPort->sock, sleeptime,
1229  }
1230 
1231  /* reactivate latch so WalSndLoop knows to continue */
1232  SetLatch(MyLatch);
1233 }
#define pq_is_send_pending()
Definition: libpq.h:41
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
struct Port * MyProcPort
Definition: globals.c:41
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
int64 TimestampTz
Definition: timestamp.h:39
static void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.h:156
#define WL_SOCKET_READABLE
Definition: latch.h:125
int sleeptime
Definition: pg_standby.c:42
void ResetLatch(volatile Latch *latch)
Definition: latch.c:497
pgsocket sock
Definition: libpq-be.h:118
bool PostmasterIsAlive(void)
Definition: pmsignal.c:272
#define pq_flush_if_writable()
Definition: libpq.h:40
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
void SyncRepInitConfig(void)
Definition: syncrep.c:384
Definition: guc.h:72
volatile sig_atomic_t ConfigReloadPending
Definition: globals.c:34
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
static void WalSndShutdown(void)
Definition: walsender.c:228
static void WalSndKeepaliveIfNecessary(TimestampTz now)
Definition: walsender.c:3337
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2006
void SetLatch(volatile Latch *latch)
Definition: latch.c:414
static void WalSndCheckTimeOut(TimestampTz now)
Definition: walsender.c:2048
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:356
static StringInfoData tmpbuf
Definition: walsender.c:162
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
StringInfo out
Definition: logical.h:66
struct Latch * MyLatch
Definition: globals.c:52
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:98
#define WL_LATCH_SET
Definition: latch.h:124
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1573
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534

◆ XLogRead()

static void XLogRead ( char *  buf,
XLogRecPtr  startptr,
Size  count 
)
static

Definition at line 2301 of file walsender.c.

References am_cascading_walsender, BasicOpenFile(), buf, CheckXLogRemoved(), close, curFileTimeLine, ereport, errcode_for_file_access(), errmsg(), ERROR, MAXPGPATH, WalSnd::mutex, MyWalSnd, WalSnd::needreload, PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), read, sendFile, sendOff, sendSegNo, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, SpinLockAcquire, SpinLockRelease, ThisTimeLineID, WAIT_EVENT_WAL_READ, wal_segment_size, XLByteInSeg, XLByteToSeg, XLogFileNameP(), XLogFilePath, and XLogSegmentOffset.

Referenced by logical_read_xlog_page(), and XLogSendPhysical().

2302 {
2303  char *p;
2304  XLogRecPtr recptr;
2305  Size nbytes;
2306  XLogSegNo segno;
2307 
2308 retry:
2309  p = buf;
2310  recptr = startptr;
2311  nbytes = count;
2312 
2313  while (nbytes > 0)
2314  {
2315  uint32 startoff;
2316  int segbytes;
2317  int readbytes;
2318 
2319  startoff = XLogSegmentOffset(recptr, wal_segment_size);
2320 
2321  if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, wal_segment_size))
2322  {
2323  char path[MAXPGPATH];
2324 
2325  /* Switch to another logfile segment */
2326  if (sendFile >= 0)
2327  close(sendFile);
2328 
2330 
2331  /*-------
2332  * When reading from a historic timeline, and there is a timeline
2333  * switch within this segment, read from the WAL segment belonging
2334  * to the new timeline.
2335  *
2336  * For example, imagine that this server is currently on timeline
2337  * 5, and we're streaming timeline 4. The switch from timeline 4
2338  * to 5 happened at 0/13002088. In pg_wal, we have these files:
2339  *
2340  * ...
2341  * 000000040000000000000012
2342  * 000000040000000000000013
2343  * 000000050000000000000013
2344  * 000000050000000000000014
2345  * ...
2346  *
2347  * In this situation, when requested to send the WAL from
2348  * segment 0x13, on timeline 4, we read the WAL from file
2349  * 000000050000000000000013. Archive recovery prefers files from
2350  * newer timelines, so if the segment was restored from the
2351  * archive on this server, the file belonging to the old timeline,
2352  * 000000040000000000000013, might not exist. Their contents are
2353  * equal up to the switchpoint, because at a timeline switch, the
2354  * used portion of the old segment is copied to the new file.
2355  *-------
2356  */
2359  {
2360  XLogSegNo endSegNo;
2361 
2363  if (sendSegNo == endSegNo)
2365  }
2366 
2368 
2369  sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY);
2370  if (sendFile < 0)
2371  {
2372  /*
2373  * If the file is not found, assume it's because the standby
2374  * asked for a WAL segment that is too old and has already been
2375  * removed or recycled.
2376  */
2377  if (errno == ENOENT)
2378  ereport(ERROR,
2380  errmsg("requested WAL segment %s has already been removed",
2382  else
2383  ereport(ERROR,
2385  errmsg("could not open file \"%s\": %m",
2386  path)));
2387  }
2388  sendOff = 0;
2389  }
2390 
2391  /* Need to seek in the file? */
2392  if (sendOff != startoff)
2393  {
2394  if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
2395  ereport(ERROR,
2397  errmsg("could not seek in log segment %s to offset %u: %m",
2399  startoff)));
2400  sendOff = startoff;
2401  }
2402 
2403  /* How many bytes are within this segment? */
2404  if (nbytes > (wal_segment_size - startoff))
2405  segbytes = wal_segment_size - startoff;
2406  else
2407  segbytes = nbytes;
2408 
2410  readbytes = read(sendFile, p, segbytes);
2412  if (readbytes <= 0)
2413  {
2414  ereport(ERROR,
2416  errmsg("could not read from log segment %s, offset %u, length %lu: %m",
2418  sendOff, (unsigned long) segbytes)));
2419  }
2420 
2421  /* Update state for read */
2422  recptr += readbytes;
2423 
2424  sendOff += readbytes;
2425  nbytes -= readbytes;
2426  p += readbytes;
2427  }
2428 
2429  /*
2430  * After reading into the buffer, check that what we read was valid. We do
2431  * this after reading, because even though the segment was present when we
2432  * opened it, it might get recycled or removed while we read it. The
2433  * read() succeeds in that case, but the data we tried to read might
2434  * already have been overwritten with new WAL records.
2435  */
2436  XLByteToSeg(startptr, segno, wal_segment_size);
2438 
2439  /*
2440  * During recovery, the currently-open WAL file might be replaced with the
2441  * file of the same name retrieved from archive. So we always need to
2442  * check that what we read was valid after reading into the buffer. If it's
2443  * invalid, we try to open and read the file again.
2444  */
2446  {
2447  WalSnd *walsnd = MyWalSnd;
2448  bool reload;
2449 
2450  SpinLockAcquire(&walsnd->mutex);
2451  reload = walsnd->needreload;
2452  walsnd->needreload = false;
2453  SpinLockRelease(&walsnd->mutex);
2454 
2455  if (reload && sendFile >= 0)
2456  {
2457  close(sendFile);
2458  sendFile = -1;
2459 
2460  goto retry;
2461  }
2462  }
2463 }
int wal_segment_size
Definition: xlog.c:113
static int sendFile
Definition: walsender.c:135
#define PG_BINARY
Definition: c.h:1025
slock_t mutex
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define ERROR
Definition: elog.h:43
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:3774
#define MAXPGPATH
static TimeLineID curFileTimeLine
Definition: walsender.c:140
char * XLogFileNameP(TimeLineID tli, XLogSegNo segno)
Definition: xlog.c:10161
static char * buf
Definition: pg_test_fsync.c:67
uint64 XLogSegNo
Definition: xlogdefs.h:34
int errcode_for_file_access(void)
Definition: elog.c:598
unsigned int uint32
Definition: c.h:296
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1244
#define ereport(elevel, rest)
Definition: elog.h:122
#define SpinLockRelease(lock)
Definition: spin.h:64
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
WalSnd * MyWalSnd
Definition: walsender.c:111
TimeLineID ThisTimeLineID
Definition: xlog.c:181
static TimeLineID sendTimeLine
Definition: walsender.c:148
uint64 XLogRecPtr
Definition: xlogdefs.h:21
bool needreload
size_t Size
Definition: c.h:404
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1220
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:149
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:151
static XLogSegNo sendSegNo
Definition: walsender.c:136
int BasicOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:938
#define XLogFilePath(path, tli, logSegNo, wal_segsz_bytes)
int errmsg(const char *fmt,...)
Definition: elog.c:797
static uint32 sendOff
Definition: walsender.c:137
#define close(a)
Definition: win32.h:12
#define read(a, b, c)
Definition: win32.h:13
static bool sendTimeLineIsHistoric
Definition: walsender.c:150
bool am_cascading_walsender
Definition: walsender.c:115
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

◆ XLogSendLogical()

static void XLogSendLogical ( void  )
static

Definition at line 2739 of file walsender.c.

References elog, XLogReaderState::EndRecPtr, ERROR, GetFlushRecPtr(), got_SIGUSR2, got_STOPPING, InvalidXLogRecPtr, logical_startptr, LogicalDecodingProcessRecord(), WalSnd::mutex, MyWalSnd, LogicalDecodingContext::reader, WalSnd::sentPtr, sentPtr, SpinLockAcquire, SpinLockRelease, WalSndCaughtUp, and XLogReadRecord().

Referenced by StartLogicalReplication().

2740 {
2741  XLogRecord *record;
2742  char *errm;
2743 
2744  /*
2745  * Don't know whether we've caught up yet. We'll set it to true in
2746  * WalSndWaitForWal, if we're actually waiting. We also set it to true if
2747  * XLogReadRecord() had to stop reading but WalSndWaitForWal didn't wait -
2748  * i.e. when we're shutting down.
2749  */
2750  WalSndCaughtUp = false;
2751 
2754 
2755  /* xlog record was invalid */
2756  if (errm != NULL)
2757  elog(ERROR, "%s", errm);
2758 
2759  if (record != NULL)
2760  {
2761  /*
2762  * Note the lack of any call to LagTrackerWrite() which is handled by
2763  * WalSndUpdateProgress which is called by output plugin through
2764  * logical decoding write api.
2765  */
2767 
2769  }
2770  else
2771  {
2772  /*
2773  * If the record we just wanted to read is at or beyond the flushed
2774  * point, then we're caught up.
2775  */
2777  {
2778  WalSndCaughtUp = true;
2779 
2780  /*
2781  * Have WalSndLoop() terminate the connection in an orderly
2782  * manner, after writing out all the pending data.
2783  */
2784  if (got_STOPPING)
2785  got_SIGUSR2 = true;
2786  }
2787  }
2788 
2789  /* Update shared memory status */
2790  {
2791  WalSnd *walsnd = MyWalSnd;
2792 
2793  SpinLockAcquire(&walsnd->mutex);
2794  walsnd->sentPtr = sentPtr;
2795  SpinLockRelease(&walsnd->mutex);
2796  }
2797 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8254
slock_t mutex
XLogRecord * XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
Definition: xlogreader.c:195
XLogRecPtr EndRecPtr
Definition: xlogreader.h:120
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define ERROR
Definition: elog.h:43
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:93
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:197
static XLogRecPtr logical_startptr
Definition: walsender.c:198
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:186
static bool WalSndCaughtUp
Definition: walsender.c:183
#define SpinLockRelease(lock)
Definition: spin.h:64
XLogRecPtr sentPtr
WalSnd * MyWalSnd
Definition: walsender.c:111
static XLogRecPtr sentPtr
Definition: walsender.c:157
XLogReaderState * reader
Definition: logical.h:44
#define elog
Definition: elog.h:219

◆ XLogSendPhysical()

static void XLogSendPhysical ( void  )
static

Definition at line 2476 of file walsender.c.

References am_cascading_walsender, Assert, close, StringInfoData::data, DEBUG1, elog, enlargeStringInfo(), GetCurrentTimestamp(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), got_STOPPING, LagTrackerWrite(), StringInfoData::len, list_free_deep(), MAX_SEND_SIZE, WalSnd::mutex, MyWalSnd, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), readTimeLineHistory(), RecoveryInProgress(), resetStringInfo(), sendFile, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, WalSnd::sentPtr, sentPtr, set_ps_display(), snprintf(), SpinLockAcquire, SpinLockRelease, streamingDoneSending, ThisTimeLineID, tliSwitchPoint(), update_process_title, WalSndCaughtUp, WalSndSetState(), WALSNDSTATE_STOPPING, and XLogRead().

Referenced by StartReplication().

2477 {
2478  XLogRecPtr SendRqstPtr;
2479  XLogRecPtr startptr;
2480  XLogRecPtr endptr;
2481  Size nbytes;
2482 
2483  /* If requested switch the WAL sender to the stopping state. */
2484  if (got_STOPPING)
2486 
2488  {
2489  WalSndCaughtUp = true;
2490  return;
2491  }
2492 
2493  /* Figure out how far we can safely send the WAL. */
2495  {
2496  /*
2497  * Streaming an old timeline that's in this server's history, but is
2498  * not the one we're currently inserting or replaying. It can be
2499  * streamed up to the point where we switched off that timeline.
2500  */
2501  SendRqstPtr = sendTimeLineValidUpto;
2502  }
2503  else if (am_cascading_walsender)
2504  {
2505  /*
2506  * Streaming the latest timeline on a standby.
2507  *
2508  * Attempt to send all WAL that has already been replayed, so that we
2509  * know it's valid. If we're receiving WAL through streaming
2510  * replication, it's also OK to send any WAL that has been received
2511  * but not replayed.
2512  *
2513  * The timeline we're recovering from can change, or we can be
2514  * promoted. In either case, the current timeline becomes historic. We
2515  * need to detect that so that we don't try to stream past the point
2516  * where we switched to another timeline. We check for promotion or
2517  * timeline switch after calculating FlushPtr, to avoid a race
2518  * condition: if the timeline becomes historic just after we checked
2519  * that it was still current, it's still OK to stream it up to the
2520  * FlushPtr that was calculated before it became historic.
2521  */
2522  bool becameHistoric = false;
2523 
2524  SendRqstPtr = GetStandbyFlushRecPtr();
2525 
2526  if (!RecoveryInProgress())
2527  {
2528  /*
2529  * We have been promoted. RecoveryInProgress() updated
2530  * ThisTimeLineID to the new current timeline.
2531  */
2532  am_cascading_walsender = false;
2533  becameHistoric = true;
2534  }
2535  else
2536  {
2537  /*
2538  * Still a cascading standby. But is the timeline we're sending
2539  * still the one recovery is recovering from? ThisTimeLineID was
2540  * updated by the GetStandbyFlushRecPtr() call above.
2541  */
2543  becameHistoric = true;
2544  }
2545 
2546  if (becameHistoric)
2547  {
2548  /*
2549  * The timeline we were sending has become historic. Read the
2550  * timeline history file of the new timeline to see where exactly
2551  * we forked off from the timeline we were sending.
2552  */
2553  List *history;
2554 
2557 
2559  list_free_deep(history);
2560 
2561  sendTimeLineIsHistoric = true;
2562 
2563  SendRqstPtr = sendTimeLineValidUpto;
2564  }
2565  }
2566  else
2567  {
2568  /*
2569  * Streaming the current timeline on a master.
2570  *
2571  * Attempt to send all data that's already been written out and
2572  * fsync'd to disk. We cannot go further than what's been written out
2573  * given the current implementation of XLogRead(). And in any case
2574  * it's unsafe to send WAL that is not securely down to disk on the
2575  * master: if the master subsequently crashes and restarts, standbys
2576  * must not have applied any WAL that got lost on the master.
2577  */
2578  SendRqstPtr = GetFlushRecPtr();
2579  }
2580 
2581  /*
2582  * Record the current system time as an approximation of the time at which
2583  * this WAL location was written for the purposes of lag tracking.
2584  *
2585  * In theory we could make XLogFlush() record a time in shmem whenever WAL
2586  * is flushed and we could get that time as well as the LSN when we call
2587  * GetFlushRecPtr() above (and likewise for the cascading standby
2588  * equivalent), but rather than putting any new code into the hot WAL path
2589  * it seems good enough to capture the time here. We should reach this
2590  * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
2591  * may take some time, we read the WAL flush pointer and take the time
2592  * very close to together here so that we'll get a later position if it is
2593  * still moving.
2594  *
2595  * Because LagTrackerWrite ignores samples when the LSN hasn't advanced,
2596  * this gives us a cheap approximation for the WAL flush time for this
2597  * LSN.
2598  *
2599  * Note that the LSN is not necessarily the LSN for the data contained in
2600  * the present message; it's the end of the WAL, which might be further
2601  * ahead. All the lag tracking machinery cares about is finding out when
2602  * that arbitrary LSN is eventually reported as written, flushed and
2603  * applied, so that it can measure the elapsed time.
2604  */
2605  LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());
2606 
2607  /*
2608  * If this is a historic timeline and we've reached the point where we
2609  * forked to the next timeline, stop streaming.
2610  *
2611  * Note: We might already have sent WAL > sendTimeLineValidUpto. The
2612  * startup process will normally replay all WAL that has been received
2613  * from the master, before promoting, but if the WAL streaming is
2614  * terminated at a WAL page boundary, the valid portion of the timeline
2615  * might end in the middle of a WAL record. We might've already sent the
2616  * first half of that partial WAL record to the cascading standby, so that
2617  * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
2618  * replay the partial WAL record either, so it can still follow our
2619  * timeline switch.
2620  */
2622  {
2623  /* close the current file. */
2624  if (sendFile >= 0)
2625  close(sendFile);
2626  sendFile = -1;
2627 
2628  /* Send CopyDone */
2629  pq_putmessage_noblock('c', NULL, 0);
2630  streamingDoneSending = true;
2631 
2632  WalSndCaughtUp = true;
2633 
2634  elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
2636  (uint32) (sentPtr >> 32), (uint32) sentPtr);
2637  return;
2638  }
2639 
2640  /* Do we have any work to do? */
2641  Assert(sentPtr <= SendRqstPtr);
2642  if (SendRqstPtr <= sentPtr)
2643  {
2644  WalSndCaughtUp = true;
2645  return;
2646  }
2647 
2648  /*
2649  * Figure out how much to send in one message. If there's no more than
2650  * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
2651  * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
2652  *
2653  * The rounding is not only for performance reasons. Walreceiver relies on
2654  * the fact that we never split a WAL record across two messages. Since a
2655  * long WAL record is split at page boundary into continuation records,
2656  * page boundary is always a safe cut-off point. We also assume that
2657  * SendRqstPtr never points to the middle of a WAL record.
2658  */
2659  startptr = sentPtr;
2660  endptr = startptr;
2661  endptr += MAX_SEND_SIZE;
2662 
2663  /* if we went beyond SendRqstPtr, back off */
2664  if (SendRqstPtr <= endptr)
2665  {