PostgreSQL Source Code  git master
walsender.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/printtup.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "access/xlogutils.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
#include "commands/defrem.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "pgstat.h"
#include "replication/basebackup.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/logicalfuncs.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/dest.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/portal.h"
#include "utils/ps_status.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
Include dependency graph for walsender.c:

Go to the source code of this file.

Data Structures

struct  WalTimeSample
 
struct  LagTracker
 

Macros

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)
 
#define LAG_TRACKER_BUFFER_SIZE   8192
 
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000
 
#define PG_STAT_GET_WAL_SENDERS_COLS   12
 

Typedefs

typedef void(* WalSndSendDataCallback) (void)
 

Functions

static void WalSndLastCycleHandler (SIGNAL_ARGS)
 
static void WalSndLoop (WalSndSendDataCallback send_data)
 
static void InitWalSenderSlot (void)
 
static void WalSndKill (int code, Datum arg)
 
static void WalSndShutdown (void)
 
void WalSndErrorCleanup (void)
 
static void IdentifySystem (void)
 
static void SendTimeLineHistory (TimeLineHistoryCmd *cmd)
 
static void StartReplication (StartReplicationCmd *cmd)
 
static int logical_read_xlog_page (XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
 
static void parseCreateReplSlotOptions (CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action)
 
static void CreateReplicationSlot (CreateReplicationSlotCmd *cmd)
 
static void DropReplicationSlot (DropReplicationSlotCmd *cmd)
 
static void StartLogicalReplication (StartReplicationCmd *cmd)
 
static void WalSndPrepareWrite (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndWriteData (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndUpdateProgress (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
 
static XLogRecPtr WalSndWaitForWal (XLogRecPtr loc)
 
bool exec_replication_command (const char *cmd_string)
 
static void ProcessRepliesIfAny (void)
 
static void ProcessStandbyMessage (void)
 
static void PhysicalConfirmReceivedLocation (XLogRecPtr lsn)
 
static void ProcessStandbyReplyMessage (void)
 
static void PhysicalReplicationSlotNewXmin (TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
 
static bool TransactionIdInRecentPast (TransactionId xid, uint32 epoch)
 
static void ProcessStandbyHSFeedbackMessage (void)
 
static long WalSndComputeSleeptime (TimestampTz now)
 
static void WalSndCheckTimeOut (void)
 
static void XLogRead (char *buf, XLogRecPtr startptr, Size count)
 
static void XLogSendPhysical (void)
 
static void XLogSendLogical (void)
 
static void WalSndDone (WalSndSendDataCallback send_data)
 
static XLogRecPtr GetStandbyFlushRecPtr (void)
 
void WalSndRqstFileReload (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (void)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void WalSndSetState (WalSndState state)
 
static const char * WalSndGetStateString (WalSndState state)
 
static Interval * offset_to_interval (TimeOffset offset)
 
Datum pg_stat_get_wal_senders (PG_FUNCTION_ARGS)
 
static void WalSndKeepalive (bool requestReply)
 
static void WalSndKeepaliveIfNecessary (void)
 
static void LagTrackerWrite (XLogRecPtr lsn, TimestampTz local_flush_time)
 
static TimeOffset LagTrackerRead (int head, XLogRecPtr lsn, TimestampTz now)
 

Variables

WalSndCtlData * WalSndCtl = NULL
 
WalSnd * MyWalSnd = NULL
 
bool am_walsender = false
 
bool am_cascading_walsender = false
 
bool am_db_walsender = false
 
int max_wal_senders = 0
 
int wal_sender_timeout = 60 * 1000
 
bool log_replication_commands = false
 
bool wake_wal_senders = false
 
static int sendFile = -1
 
static XLogSegNo sendSegNo = 0
 
static uint32 sendOff = 0
 
static TimeLineID curFileTimeLine = 0
 
static TimeLineID sendTimeLine = 0
 
static TimeLineID sendTimeLineNextTLI = 0
 
static bool sendTimeLineIsHistoric = false
 
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
 
static XLogRecPtr sentPtr = 0
 
static StringInfoData output_message
 
static StringInfoData reply_message
 
static StringInfoData tmpbuf
 
static TimestampTz last_processing = 0
 
static TimestampTz last_reply_timestamp = 0
 
static bool waiting_for_ping_response = false
 
static bool streamingDoneSending
 
static bool streamingDoneReceiving
 
static bool WalSndCaughtUp = false
 
static volatile sig_atomic_t got_SIGUSR2 = false
 
static volatile sig_atomic_t got_STOPPING = false
 
static volatile sig_atomic_t replication_active = false
 
static LogicalDecodingContext * logical_decoding_ctx = NULL
 
static XLogRecPtr logical_startptr = InvalidXLogRecPtr
 
static LagTracker * lag_tracker
 

Macro Definition Documentation

◆ LAG_TRACKER_BUFFER_SIZE

#define LAG_TRACKER_BUFFER_SIZE   8192

Definition at line 211 of file walsender.c.

Referenced by LagTrackerRead(), and LagTrackerWrite().

◆ MAX_SEND_SIZE

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)

Definition at line 105 of file walsender.c.

Referenced by XLogSendPhysical().

◆ PG_STAT_GET_WAL_SENDERS_COLS

#define PG_STAT_GET_WAL_SENDERS_COLS   12

Referenced by pg_stat_get_wal_senders().

◆ WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS

#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000

Referenced by WalSndUpdateProgress().

Typedef Documentation

◆ WalSndSendDataCallback

typedef void(* WalSndSendDataCallback) (void)

Definition at line 229 of file walsender.c.

Function Documentation

◆ CreateReplicationSlot()

static void CreateReplicationSlot ( CreateReplicationSlotCmd cmd)
static

Definition at line 848 of file walsender.c.

References Assert, begin_tup_output_tupdesc(), CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDestReceiver(), CreateInitDecodingContext(), CreateTemplateTupleDesc(), CRS_EXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, CStringGetTextDatum, ReplicationSlot::data, DecodingContextFindStartpoint(), generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errmsg(), ERROR, FirstSnapshotSet, FreeDecodingContext(), IsSubTransaction(), IsTransactionBlock(), CreateReplicationSlotCmd::kind, last_reply_timestamp, logical_read_xlog_page(), MAXFNAMELEN, MemSet, MyProc, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, NIL, parseCreateReplSlotOptions(), CreateReplicationSlotCmd::plugin, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, ReplicationSlotCreate(), ReplicationSlotMarkDirty(), ReplicationSlotPersist(), ReplicationSlotRelease(), ReplicationSlotReserveWal(), ReplicationSlotSave(), RestoreTransactionSnapshot(), RS_EPHEMERAL, RS_PERSISTENT, RS_TEMPORARY, sendTimeLine, sendTimeLineIsHistoric, CreateReplicationSlotCmd::slotname, SnapBuildExportSnapshot(), SnapBuildInitialSnapshot(), LogicalDecodingContext::snapshot_builder, snprintf, CreateReplicationSlotCmd::temporary, ThisTimeLineID, TTSOpsVirtual, TupleDescInitBuiltinEntry(), values, WalSndPrepareWrite(), WalSndUpdateProgress(), WalSndWriteData(), XACT_REPEATABLE_READ, and XactIsoLevel.

Referenced by exec_replication_command(), main(), and StartLogStreamer().

849 {
850  const char *snapshot_name = NULL;
851  char xloc[MAXFNAMELEN];
852  char *slot_name;
853  bool reserve_wal = false;
854  CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
856  TupOutputState *tstate;
857  TupleDesc tupdesc;
858  Datum values[4];
859  bool nulls[4];
860 
862 
863  parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action);
864 
865  /* setup state for XLogReadPage */
866  sendTimeLineIsHistoric = false;
868 
869  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
870  {
871  ReplicationSlotCreate(cmd->slotname, false,
873  }
874  else
875  {
877 
878  /*
879  * Initially create persistent slot as ephemeral - that allows us to
880  * nicely handle errors during initialization because it'll get
881  * dropped if this transaction fails. We'll make it persistent at the
882  * end. Temporary slots can be created as temporary from beginning as
883  * they get dropped on error as well.
884  */
885  ReplicationSlotCreate(cmd->slotname, true,
887  }
888 
889  if (cmd->kind == REPLICATION_KIND_LOGICAL)
890  {
892  bool need_full_snapshot = false;
893 
894  /*
895  * Do options check early so that we can bail before calling the
896  * DecodingContextFindStartpoint which can take long time.
897  */
898  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
899  {
900  if (IsTransactionBlock())
901  ereport(ERROR,
902  (errmsg("CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT "
903  "must not be called inside a transaction")));
904 
905  need_full_snapshot = true;
906  }
907  else if (snapshot_action == CRS_USE_SNAPSHOT)
908  {
909  if (!IsTransactionBlock())
910  ereport(ERROR,
911  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
912  "must be called inside a transaction")));
913 
915  ereport(ERROR,
916  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
917  "must be called in REPEATABLE READ isolation mode transaction")));
918 
919  if (FirstSnapshotSet)
920  ereport(ERROR,
921  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
922  "must be called before any query")));
923 
924  if (IsSubTransaction())
925  ereport(ERROR,
926  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
927  "must not be called in a subtransaction")));
928 
929  need_full_snapshot = true;
930  }
931 
932  ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
936 
937  /*
938  * Signal that we don't need the timeout mechanism. We're just
939  * creating the replication slot and don't yet accept feedback
940  * messages or send keepalives. As we possibly need to wait for
941  * further WAL the walsender would otherwise possibly be killed too
942  * soon.
943  */
945 
946  /* build initial snapshot, might take a while */
948 
949  /*
950  * Export or use the snapshot if we've been asked to do so.
951  *
952  * NB. We will convert the snapbuild.c kind of snapshot to normal
953  * snapshot when doing this.
954  */
955  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
956  {
957  snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
958  }
959  else if (snapshot_action == CRS_USE_SNAPSHOT)
960  {
961  Snapshot snap;
962 
965  }
966 
967  /* don't need the decoding context anymore */
968  FreeDecodingContext(ctx);
969 
970  if (!cmd->temporary)
972  }
973  else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
974  {
976 
978 
979  /* Write this slot to disk if it's a permanent one. */
980  if (!cmd->temporary)
982  }
983 
984  snprintf(xloc, sizeof(xloc), "%X/%X",
987 
989  MemSet(nulls, false, sizeof(nulls));
990 
991  /*----------
992  * Need a tuple descriptor representing four columns:
993  * - first field: the slot name
994  * - second field: LSN at which we became consistent
995  * - third field: exported snapshot's name
996  * - fourth field: output plugin
997  *----------
998  */
999  tupdesc = CreateTemplateTupleDesc(4);
1000  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
1001  TEXTOID, -1, 0);
1002  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
1003  TEXTOID, -1, 0);
1004  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
1005  TEXTOID, -1, 0);
1006  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1007  TEXTOID, -1, 0);
1008 
1009  /* prepare for projection of tuples */
1010  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
1011 
1012  /* slot_name */
1013  slot_name = NameStr(MyReplicationSlot->data.name);
1014  values[0] = CStringGetTextDatum(slot_name);
1015 
1016  /* consistent wal location */
1017  values[1] = CStringGetTextDatum(xloc);
1018 
1019  /* snapshot name, or NULL if none */
1020  if (snapshot_name != NULL)
1021  values[2] = CStringGetTextDatum(snapshot_name);
1022  else
1023  nulls[2] = true;
1024 
1025  /* plugin, or NULL if none */
1026  if (cmd->plugin != NULL)
1027  values[3] = CStringGetTextDatum(cmd->plugin);
1028  else
1029  nulls[3] = true;
1030 
1031  /* send it to dest */
1032  do_tup_output(tstate, values, nulls);
1033  end_tup_output(tstate);
1034 
1036 }
#define NIL
Definition: pg_list.h:69
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:45
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action)
Definition: walsender.c:795
PGPROC * MyProc
Definition: proc.c:67
#define XACT_REPEATABLE_READ
Definition: xact.h:37
void RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
Definition: snapmgr.c:2191
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:80
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2136
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency)
Definition: slot.c:221
#define MemSet(start, val, len)
Definition: c.h:941
void ReplicationSlotSave(void)
Definition: slot.c:645
LogicalDecodingContext * CreateInitDecodingContext(char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:228
ReplicationSlotPersistentData data
Definition: slot.h:133
XLogRecPtr confirmed_flush
Definition: slot.h:81
ReplicationKind kind
Definition: replnodes.h:56
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
Definition: walsender.c:761
bool IsTransactionBlock(void)
Definition: xact.c:4457
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:635
void ReplicationSlotReserveWal(void)
Definition: slot.c:997
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:455
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2194
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:668
#define ERROR
Definition: elog.h:43
bool FirstSnapshotSet
Definition: snapmgr.c:203
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2116
void ReplicationSlotPersist(void)
Definition: slot.c:680
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1168
unsigned int uint32
Definition: c.h:358
void ReplicationSlotRelease(void)
Definition: slot.c:424
#define ereport(elevel, rest)
Definition: elog.h:141
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1141
#define MAXFNAMELEN
uintptr_t Datum
Definition: postgres.h:367
TimeLineID ThisTimeLineID
Definition: xlog.c:183
struct SnapBuild * snapshot_builder
Definition: logical.h:46
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
static TimeLineID sendTimeLine
Definition: walsender.c:148
#define Assert(condition)
Definition: c.h:732
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:501
int XactIsoLevel
Definition: xact.c:74
bool IsSubTransaction(void)
Definition: xact.c:4530
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition: snapbuild.c:545
static Datum values[MAXATTR]
Definition: bootstrap.c:167
int errmsg(const char *fmt,...)
Definition: elog.c:784
CRSSnapshotAction
Definition: walsender.h:22
#define NameStr(name)
Definition: c.h:609
#define CStringGetTextDatum(s)
Definition: builtins.h:83
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:78
#define snprintf
Definition: port.h:192
int16 AttrNumber
Definition: attnum.h:21
static TimestampTz last_reply_timestamp
Definition: walsender.c:171
static bool sendTimeLineIsHistoric
Definition: walsender.c:150
void ReplicationSlotMarkDirty(void)
Definition: slot.c:663
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
Definition: walsender.c:1257

◆ DropReplicationSlot()

static void DropReplicationSlot ( DropReplicationSlotCmd cmd)
static

Definition at line 1042 of file walsender.c.

References DestRemote, EndCommand(), ReplicationSlotDrop(), DropReplicationSlotCmd::slotname, and DropReplicationSlotCmd::wait.

Referenced by exec_replication_command(), and main().

1043 {
1044  ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1045  EndCommand("DROP_REPLICATION_SLOT", DestRemote);
1046 }
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:166
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:517

◆ exec_replication_command()

bool exec_replication_command ( const char *  cmd_string)

Definition at line 1422 of file walsender.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, CHECK_FOR_INTERRUPTS, CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, generate_unaccent_rules::dest, DestRemote, DestRemoteSimple, DropReplicationSlot(), elog, EndCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsA, IsAbortedTransactionBlockState(), IsTransactionBlock(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextDelete(), MemoryContextSwitchTo(), MyDatabaseId, VariableShowStmt::name, pgstat_report_activity(), PreventInTransactionBlock(), REPLICATION_KIND_PHYSICAL, replication_parse_result, replication_scanner_init(), replication_yyparse(), SendBaseBackup(), SendTimeLineHistory(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), WalSnd::state, STATE_IDLE, STATE_RUNNING, T_BaseBackupCmd, T_CreateReplicationSlotCmd, T_DropReplicationSlotCmd, T_IdentifySystemCmd, T_SQLCmd, T_StartReplicationCmd, T_TimeLineHistoryCmd, T_VariableShowStmt, TimeLineHistoryCmd::type, Node::type, WalSndSetState(), and WALSNDSTATE_STOPPING.

Referenced by PostgresMain().

1423 {
1424  int parse_rc;
1425  Node *cmd_node;
1426  MemoryContext cmd_context;
1427  MemoryContext old_context;
1428 
1429  /*
1430  * If WAL sender has been told that shutdown is getting close, switch its
1431  * status accordingly to handle the next replication commands correctly.
1432  */
1433  if (got_STOPPING)
1435 
1436  /*
1437  * Throw error if in stopping mode. We need prevent commands that could
1438  * generate WAL while the shutdown checkpoint is being written. To be
1439  * safe, we just prohibit all new commands.
1440  */
1442  ereport(ERROR,
1443  (errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1444 
1445  /*
1446  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1447  * command arrives. Clean up the old stuff if there's anything.
1448  */
1450 
1452 
1454  "Replication command context",
1456  old_context = MemoryContextSwitchTo(cmd_context);
1457 
1458  replication_scanner_init(cmd_string);
1459  parse_rc = replication_yyparse();
1460  if (parse_rc != 0)
1461  ereport(ERROR,
1462  (errcode(ERRCODE_SYNTAX_ERROR),
1463  (errmsg_internal("replication command parser returned %d",
1464  parse_rc))));
1465 
1466  cmd_node = replication_parse_result;
1467 
1468  /*
1469  * Log replication command if log_replication_commands is enabled. Even
1470  * when it's disabled, log the command with DEBUG1 level for backward
1471  * compatibility. Note that SQL commands are not logged here, and will be
1472  * logged later if log_statement is enabled.
1473  */
1474  if (cmd_node->type != T_SQLCmd)
1476  (errmsg("received replication command: %s", cmd_string)));
1477 
1478  /*
1479  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was
1480  * called outside of transaction the snapshot should be cleared here.
1481  */
1482  if (!IsTransactionBlock())
1484 
1485  /*
1486  * For aborted transactions, don't allow anything except pure SQL, the
1487  * exec_simple_query() will handle it correctly.
1488  */
1489  if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
1490  ereport(ERROR,
1491  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1492  errmsg("current transaction is aborted, "
1493  "commands ignored until end of transaction block")));
1494 
1496 
1497  /*
1498  * Allocate buffers that will be used for each outgoing and incoming
1499  * message. We do this just once per command to reduce palloc overhead.
1500  */
1504 
1505  /* Report to pgstat that this process is running */
1507 
1508  switch (cmd_node->type)
1509  {
1510  case T_IdentifySystemCmd:
1511  IdentifySystem();
1512  break;
1513 
1514  case T_BaseBackupCmd:
1515  PreventInTransactionBlock(true, "BASE_BACKUP");
1516  SendBaseBackup((BaseBackupCmd *) cmd_node);
1517  break;
1518 
1521  break;
1522 
1525  break;
1526 
1527  case T_StartReplicationCmd:
1528  {
1529  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1530 
1531  PreventInTransactionBlock(true, "START_REPLICATION");
1532 
1533  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1534  StartReplication(cmd);
1535  else
1537  break;
1538  }
1539 
1540  case T_TimeLineHistoryCmd:
1541  PreventInTransactionBlock(true, "TIMELINE_HISTORY");
1543  break;
1544 
1545  case T_VariableShowStmt:
1546  {
1548  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1549 
1550  GetPGVariable(n->name, dest);
1551  }
1552  break;
1553 
1554  case T_SQLCmd:
1555  if (MyDatabaseId == InvalidOid)
1556  ereport(ERROR,
1557  (errmsg("cannot execute SQL commands in WAL sender for physical replication")));
1558 
1559  /* Report to pgstat that this process is now idle */
1561 
1562  /* Tell the caller that this wasn't a WalSender command. */
1563  return false;
1564 
1565  default:
1566  elog(ERROR, "unrecognized replication command node tag: %u",
1567  cmd_node->type);
1568  }
1569 
1570  /* done */
1571  MemoryContextSwitchTo(old_context);
1572  MemoryContextDelete(cmd_context);
1573 
1574  /* Send CommandComplete message */
1575  EndCommand("SELECT", DestRemote);
1576 
1577  /* Report to pgstat that this process is now idle */
1579 
1580  return true;
1581 }
#define IsA(nodeptr, _type_)
Definition: nodes.h:568
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:211
#define AllocSetContextCreate
Definition: memutils.h:169
#define DEBUG1
Definition: elog.h:25
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:2990
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:434
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1042
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:352
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Definition: nodes.h:517
static StringInfoData output_message
Definition: walsender.c:160
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:190
int errcode(int sqlerrcode)
Definition: elog.c:570
#define LOG
Definition: elog.h:26
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc.c:8446
Node * replication_parse_result
bool IsTransactionBlock(void)
Definition: xact.c:4457
ReplicationKind kind
Definition: replnodes.h:82
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:166
#define ERROR
Definition: elog.h:43
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:758
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:191
WalSndState state
NodeTag type
Definition: nodes.h:519
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:536
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3188
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
#define ereport(elevel, rest)
Definition: elog.h:141
int replication_yyparse(void)
void initStringInfo(StringInfo str)
Definition: stringinfo.c:46
static StringInfoData reply_message
Definition: walsender.c:161
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1053
Oid MyDatabaseId
Definition: globals.c:85
WalSnd * MyWalSnd
Definition: walsender.c:111
#define InvalidOid
Definition: postgres_ext.h:36
int errmsg_internal(const char *fmt,...)
Definition: elog.c:814
void WalSndSetState(WalSndState state)
Definition: walsender.c:3164
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:848
static StringInfoData tmpbuf
Definition: walsender.c:162
bool log_replication_commands
Definition: walsender.c:124
int errmsg(const char *fmt,...)
Definition: elog.c:784
#define elog(elevel,...)
Definition: elog.h:226
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
static void IdentifySystem(void)
Definition: walsender.c:345
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:696
void replication_scanner_init(const char *query_string)

◆ GetStandbyFlushRecPtr()

static XLogRecPtr GetStandbyFlushRecPtr ( void  )
static

Definition at line 2920 of file walsender.c.

References GetWalRcvWriteRecPtr(), GetXLogReplayRecPtr(), receiveTLI, and ThisTimeLineID.

Referenced by IdentifySystem(), StartReplication(), and XLogSendPhysical().

2921 {
2922  XLogRecPtr replayPtr;
2923  TimeLineID replayTLI;
2924  XLogRecPtr receivePtr;
2926  XLogRecPtr result;
2927 
2928  /*
2929  * We can safely send what's already been replayed. Also, if walreceiver
2930  * is streaming WAL from the same timeline, we can send anything that it
2931  * has streamed, but hasn't been replayed yet.
2932  */
2933 
2934  receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI);
2935  replayPtr = GetXLogReplayRecPtr(&replayTLI);
2936 
2937  ThisTimeLineID = replayTLI;
2938 
2939  result = replayPtr;
2940  if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
2941  result = receivePtr;
2942 
2943  return result;
2944 }
uint32 TimeLineID
Definition: xlogdefs.h:45
XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11163
static TimeLineID receiveTLI
Definition: xlog.c:205
TimeLineID ThisTimeLineID
Definition: xlog.c:183
uint64 XLogRecPtr
Definition: xlogdefs.h:21

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 2973 of file walsender.c.

References am_walsender, Assert, got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

2974 {
2976 
2977  /*
2978  * If replication has not yet started, die like with SIGTERM. If
2979  * replication is active, only set a flag and wake up the main loop. It
2980  * will send any outstanding WAL, wait for it to be replicated to the
2981  * standby, and then exit gracefully.
2982  */
2983  if (!replication_active)
2984  kill(MyProcPid, SIGTERM);
2985  else
2986  got_STOPPING = true;
2987 }
int MyProcPid
Definition: globals.c:40
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:190
#define kill(pid, sig)
Definition: win32_port.h:435
bool am_walsender
Definition: walsender.c:114
static volatile sig_atomic_t replication_active
Definition: walsender.c:198
#define Assert(condition)
Definition: c.h:732

◆ IdentifySystem()

static void IdentifySystem ( void  )
static

Definition at line 345 of file walsender.c.

References am_cascading_walsender, begin_tup_output_tupdesc(), CommitTransactionCommand(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, cur, CurrentMemoryContext, dbname, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), get_database_name(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetSystemIdentifier(), Int32GetDatum, InvalidOid, MAXFNAMELEN, MemoryContextSwitchTo(), MemSet, MyDatabaseId, RecoveryInProgress(), snprintf, StartTransactionCommand(), ThisTimeLineID, TTSOpsVirtual, TupleDescInitBuiltinEntry(), UINT64_FORMAT, and values.

Referenced by exec_replication_command().

346 {
347  char sysid[32];
348  char xloc[MAXFNAMELEN];
349  XLogRecPtr logptr;
350  char *dbname = NULL;
352  TupOutputState *tstate;
353  TupleDesc tupdesc;
354  Datum values[4];
355  bool nulls[4];
356 
357  /*
358  * Reply with a result set with one row, four columns. First col is system
359  * ID, second is timeline ID, third is current xlog location and the
360  * fourth contains the database name if we are connected to one.
361  */
362 
363  snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
365 
368  {
369  /* this also updates ThisTimeLineID */
370  logptr = GetStandbyFlushRecPtr();
371  }
372  else
373  logptr = GetFlushRecPtr();
374 
375  snprintf(xloc, sizeof(xloc), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
376 
377  if (MyDatabaseId != InvalidOid)
378  {
380 
381  /* syscache access needs a transaction env. */
383  /* make dbname live outside TX context */
387  /* CommitTransactionCommand switches to TopMemoryContext */
389  }
390 
392  MemSet(nulls, false, sizeof(nulls));
393 
394  /* need a tuple descriptor representing four columns */
395  tupdesc = CreateTemplateTupleDesc(4);
396  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
397  TEXTOID, -1, 0);
398  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
399  INT4OID, -1, 0);
400  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
401  TEXTOID, -1, 0);
402  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
403  TEXTOID, -1, 0);
404 
405  /* prepare for projection of tuples */
406  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
407 
408  /* column 1: system identifier */
409  values[0] = CStringGetTextDatum(sysid);
410 
411  /* column 2: timeline */
412  values[1] = Int32GetDatum(ThisTimeLineID);
413 
414  /* column 3: wal location */
415  values[2] = CStringGetTextDatum(xloc);
416 
417  /* column 4: database name, or NULL if none */
418  if (dbname)
419  values[3] = CStringGetTextDatum(dbname);
420  else
421  nulls[3] = true;
422 
423  /* send it to dest */
424  do_tup_output(tstate, values, nulls);
425 
426  end_tup_output(tstate);
427 }
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:45
void CommitTransactionCommand(void)
Definition: xact.c:2779
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:80
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2136
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
struct cursor * cur
Definition: ecpg.c:28
#define MemSet(start, val, len)
Definition: c.h:941
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8226
bool RecoveryInProgress(void)
Definition: xlog.c:7894
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2194
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:668
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2116
char * get_database_name(Oid dbid)
Definition: dbcommands.c:2058
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
unsigned int uint32
Definition: c.h:358
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
#define MAXFNAMELEN
uintptr_t Datum
Definition: postgres.h:367
Oid MyDatabaseId
Definition: globals.c:85
#define InvalidOid
Definition: postgres_ext.h:36
TimeLineID ThisTimeLineID
Definition: xlog.c:183
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void StartTransactionCommand(void)
Definition: xact.c:2708
char * dbname
Definition: streamutil.c:51
static Datum values[MAXATTR]
Definition: bootstrap.c:167
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4812
#define Int32GetDatum(X)
Definition: postgres.h:464
#define CStringGetTextDatum(s)
Definition: builtins.h:83
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2920
#define snprintf
Definition: port.h:192
int16 AttrNumber
Definition: attnum.h:21
#define UINT64_FORMAT
Definition: c.h:401
bool am_cascading_walsender
Definition: walsender.c:115

◆ InitWalSenderSlot()

static void InitWalSenderSlot ( void  )
static

Definition at line 2264 of file walsender.c.

References WalSnd::apply, WalSnd::applyLag, Assert, ereport, errcode(), errmsg(), FATAL, WalSnd::flush, WalSnd::flushLag, i, InvalidXLogRecPtr, WalSnd::latch, max_wal_senders, WalSnd::mutex, MyProc, MyProcPid, on_shmem_exit(), WalSnd::pid, PGPROC::procLatch, WalSnd::replyTime, WalSnd::sentPtr, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndKill(), WalSndCtlData::walsnds, WALSNDSTATE_STARTUP, WalSnd::write, and WalSnd::writeLag.

2265 {
2266  int i;
2267 
2268  /*
2269  * WalSndCtl should be set up already (we inherit this by fork() or
2270  * EXEC_BACKEND mechanism from the postmaster).
2271  */
2272  Assert(WalSndCtl != NULL);
2273  Assert(MyWalSnd == NULL);
2274 
2275  /*
2276  * Find a free walsender slot and reserve it. If this fails, we must be
2277  * out of WalSnd structures.
2278  */
2279  for (i = 0; i < max_wal_senders; i++)
2280  {
2281  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2282 
2283  SpinLockAcquire(&walsnd->mutex);
2284 
2285  if (walsnd->pid != 0)
2286  {
2287  SpinLockRelease(&walsnd->mutex);
2288  continue;
2289  }
2290  else
2291  {
2292  /*
2293  * Found a free slot. Reserve it for us.
2294  */
2295  walsnd->pid = MyProcPid;
2296  walsnd->sentPtr = InvalidXLogRecPtr;
2297  walsnd->write = InvalidXLogRecPtr;
2298  walsnd->flush = InvalidXLogRecPtr;
2299  walsnd->apply = InvalidXLogRecPtr;
2300  walsnd->writeLag = -1;
2301  walsnd->flushLag = -1;
2302  walsnd->applyLag = -1;
2303  walsnd->state = WALSNDSTATE_STARTUP;
2304  walsnd->latch = &MyProc->procLatch;
2305  walsnd->replyTime = 0;
2306  SpinLockRelease(&walsnd->mutex);
2307  /* don't need the lock anymore */
2308  MyWalSnd = (WalSnd *) walsnd;
2309 
2310  break;
2311  }
2312  }
2313  if (MyWalSnd == NULL)
2314  ereport(FATAL,
2315  (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
2316  errmsg("number of requested standby connections "
2317  "exceeds max_wal_senders (currently %d)",
2318  max_wal_senders)));
2319 
2320  /* Arrange to clean up at walsender exit */
2322 }
XLogRecPtr write
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
int MyProcPid
Definition: globals.c:40
PGPROC * MyProc
Definition: proc.c:67
TimeOffset flushLag
WalSndCtlData * WalSndCtl
Definition: walsender.c:108
int errcode(int sqlerrcode)
Definition: elog.c:570
static void WalSndKill(int code, Datum arg)
Definition: walsender.c:2326
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
TimeOffset writeLag
slock_t mutex
Latch procLatch
Definition: proc.h:104
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
#define FATAL
Definition: elog.h:52
Latch * latch
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
WalSndState state
#define ereport(elevel, rest)
Definition: elog.h:141
int max_wal_senders
Definition: walsender.c:120
#define SpinLockRelease(lock)
Definition: spin.h:64
XLogRecPtr sentPtr
WalSnd * MyWalSnd
Definition: walsender.c:111
TimeOffset applyLag
#define Assert(condition)
Definition: c.h:732
int errmsg(const char *fmt,...)
Definition: elog.c:784
int i
TimestampTz replyTime
XLogRecPtr apply

◆ LagTrackerRead()

static TimeOffset LagTrackerRead ( int  head,
XLogRecPtr  lsn,
TimestampTz  now 
)
static

Definition at line 3516 of file walsender.c.

References Assert, LagTracker::buffer, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_read, WalTimeSample::lsn, next, LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by ProcessStandbyReplyMessage().

3517 {
3518  TimestampTz time = 0;
3519 
3520  /* Read all unread samples up to this LSN or end of buffer. */
3521  while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
3522  lag_tracker->buffer[lag_tracker->read_heads[head]].lsn <= lsn)
3523  {
3524  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3525  lag_tracker->last_read[head] =
3527  lag_tracker->read_heads[head] =
3529  }
3530 
3531  /*
3532  * If the lag tracker is empty, that means the standby has processed
3533  * everything we've ever sent so we should now clear 'last_read'. If we
3534  * didn't do that, we'd risk using a stale and irrelevant sample for
3535  * interpolation at the beginning of the next burst of WAL after a period
3536  * of idleness.
3537  */
3539  lag_tracker->last_read[head].time = 0;
3540 
3541  if (time > now)
3542  {
3543  /* If the clock somehow went backwards, treat as not found. */
3544  return -1;
3545  }
3546  else if (time == 0)
3547  {
3548  /*
3549  * We didn't cross a time. If there is a future sample that we
3550  * haven't reached yet, and we've already reached at least one sample,
3551  * let's interpolate the local flushed time. This is mainly useful
3552  * for reporting a completely stuck apply position as having
3553  * increasing lag, since otherwise we'd have to wait for it to
3554  * eventually start moving again and cross one of our samples before
3555  * we can show the lag increasing.
3556  */
3558  {
3559  /* There are no future samples, so we can't interpolate. */
3560  return -1;
3561  }
3562  else if (lag_tracker->last_read[head].time != 0)
3563  {
3564  /* We can interpolate between last_read and the next sample. */
3565  double fraction;
3566  WalTimeSample prev = lag_tracker->last_read[head];
3568 
3569  if (lsn < prev.lsn)
3570  {
3571  /*
3572  * Reported LSNs shouldn't normally go backwards, but it's
3573  * possible when there is a timeline change. Treat as not
3574  * found.
3575  */
3576  return -1;
3577  }
3578 
3579  Assert(prev.lsn < next.lsn);
3580 
3581  if (prev.time > next.time)
3582  {
3583  /* If the clock somehow went backwards, treat as not found. */
3584  return -1;
3585  }
3586 
3587  /* See how far we are between the previous and next samples. */
3588  fraction =
3589  (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
3590 
3591  /* Scale the local flush time proportionally. */
3592  time = (TimestampTz)
3593  ((double) prev.time + (next.time - prev.time) * fraction);
3594  }
3595  else
3596  {
3597  /*
3598  * We have only a future sample, implying that we were entirely
3599  * caught up but and now there is a new burst of WAL and the
3600  * standby hasn't processed the first sample yet. Until the
3601  * standby reaches the future sample the best we can do is report
3602  * the hypothetical lag if that sample were to be replayed now.
3603  */
3604  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3605  }
3606  }
3607 
3608  /* Return the elapsed time since local flush time in microseconds. */
3609  Assert(time != 0);
3610  return now - time;
3611 }
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:217
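WalTimeSample next
Local sample variable in LagTrackerRead() (used there as next.lsn and next.time, so presumably a WalTimeSample); the Doxygen link to the unrelated "static int32 next" at blutils.c:211 is a mislink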
int write_head
Definition: walsender.c:218
int64 TimestampTz
Definition: timestamp.h:39
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:220
TimestampTz time
Definition: walsender.c:207
static LagTracker * lag_tracker
Definition: walsender.c:223
XLogRecPtr lsn
Definition: walsender.c:206
#define Assert(condition)
Definition: c.h:732
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:219
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:211
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534

◆ LagTrackerWrite()

static void LagTrackerWrite ( XLogRecPtr  lsn,
TimestampTz  local_flush_time 
)
static

Definition at line 3451 of file walsender.c.

References am_walsender, LagTracker::buffer, i, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_lsn, WalTimeSample::lsn, NUM_SYNC_REP_WAIT_MODE, LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by WalSndUpdateProgress(), and XLogSendPhysical().

3452 {
3453  bool buffer_full;
3454  int new_write_head;
3455  int i;
3456 
3457  if (!am_walsender)
3458  return;
3459 
3460  /*
3461  * If the lsn hasn't advanced since last time, then do nothing. This way
3462  * we only record a new sample when new WAL has been written.
3463  */
3464  if (lag_tracker->last_lsn == lsn)
3465  return;
3466  lag_tracker->last_lsn = lsn;
3467 
3468  /*
3469  * If advancing the write head of the circular buffer would crash into any
3470  * of the read heads, then the buffer is full. In other words, the
3471  * slowest reader (presumably apply) is the one that controls the release
3472  * of space.
3473  */
3474  new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
3475  buffer_full = false;
3476  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
3477  {
3478  if (new_write_head == lag_tracker->read_heads[i])
3479  buffer_full = true;
3480  }
3481 
3482  /*
3483  * If the buffer is full, for now we just rewind by one slot and overwrite
3484  * the last sample, as a simple (if somewhat uneven) way to lower the
3485  * sampling rate. There may be better adaptive compaction algorithms.
3486  */
3487  if (buffer_full)
3488  {
3489  new_write_head = lag_tracker->write_head;
3490  if (lag_tracker->write_head > 0)
3492  else
3494  }
3495 
3496  /* Store a sample at the current write head position. */
3498  lag_tracker->buffer[lag_tracker->write_head].time = local_flush_time;
3499  lag_tracker->write_head = new_write_head;
3500 }
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:217
int write_head
Definition: walsender.c:218
TimestampTz time
Definition: walsender.c:207
static LagTracker * lag_tracker
Definition: walsender.c:223
bool am_walsender
Definition: walsender.c:114
XLogRecPtr lsn
Definition: walsender.c:206
XLogRecPtr last_lsn
Definition: walsender.c:216
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:219
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:211
int i
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28

◆ logical_read_xlog_page()

static int logical_read_xlog_page ( XLogReaderState state,
XLogRecPtr  targetPagePtr,
int  reqLen,
XLogRecPtr  targetRecPtr,
char *  cur_page,
TimeLineID pageTLI 
)
static

Definition at line 761 of file walsender.c.

References XLogReaderState::currTLI, XLogReaderState::currTLIValidUntil, XLogReaderState::nextTLI, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, ThisTimeLineID, WalSndWaitForWal(), XLogRead(), and XLogReadDetermineTimeline().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

763 {
764  XLogRecPtr flushptr;
765  int count;
766 
767  XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
769  sendTimeLine = state->currTLI;
771  sendTimeLineNextTLI = state->nextTLI;
772 
773  /* make sure we have enough WAL available */
774  flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
775 
776  /* fail if not (implies we are going to shut down) */
777  if (flushptr < targetPagePtr + reqLen)
778  return -1;
779 
780  if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
781  count = XLOG_BLCKSZ; /* more than one block available */
782  else
783  count = flushptr - targetPagePtr; /* part of the page available */
784 
785  /* now actually read the data, we know it's there */
786  XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
787 
788  return count;
789 }
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
Definition: xlogutils.c:803
XLogRecPtr currTLIValidUntil
Definition: xlogreader.h:180
static void XLogRead(char *buf, XLogRecPtr startptr, Size count)
Definition: walsender.c:2354
TimeLineID nextTLI
Definition: xlogreader.h:186
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition: walsender.c:1283
TimeLineID ThisTimeLineID
Definition: xlog.c:183
TimeLineID currTLI
Definition: xlogreader.h:170
static TimeLineID sendTimeLine
Definition: walsender.c:148
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:149
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:151
static bool sendTimeLineIsHistoric
Definition: walsender.c:150

◆ offset_to_interval()

static Interval* offset_to_interval ( TimeOffset  offset)
static

Definition at line 3202 of file walsender.c.

References Interval::day, Interval::month, palloc(), and Interval::time.

Referenced by pg_stat_get_wal_senders().

3203 {
3204  Interval *result = palloc(sizeof(Interval));
3205 
3206  result->month = 0;
3207  result->day = 0;
3208  result->time = offset;
3209 
3210  return result;
3211 }
int32 day
Definition: timestamp.h:47
TimeOffset time
Definition: timestamp.h:45
int32 month
Definition: timestamp.h:48
void * palloc(Size size)
Definition: mcxt.c:924

◆ parseCreateReplSlotOptions()

static void parseCreateReplSlotOptions ( CreateReplicationSlotCmd cmd,
bool reserve_wal,
CRSSnapshotAction snapshot_action 
)
static

Definition at line 795 of file walsender.c.

References CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, defGetBoolean(), DefElem::defname, elog, ereport, errcode(), errmsg(), ERROR, CreateReplicationSlotCmd::kind, lfirst, CreateReplicationSlotCmd::options, REPLICATION_KIND_LOGICAL, and REPLICATION_KIND_PHYSICAL.

Referenced by CreateReplicationSlot().

798 {
799  ListCell *lc;
800  bool snapshot_action_given = false;
801  bool reserve_wal_given = false;
802 
803  /* Parse options */
804  foreach(lc, cmd->options)
805  {
806  DefElem *defel = (DefElem *) lfirst(lc);
807 
808  if (strcmp(defel->defname, "export_snapshot") == 0)
809  {
810  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
811  ereport(ERROR,
812  (errcode(ERRCODE_SYNTAX_ERROR),
813  errmsg("conflicting or redundant options")));
814 
815  snapshot_action_given = true;
816  *snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT :
818  }
819  else if (strcmp(defel->defname, "use_snapshot") == 0)
820  {
821  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
822  ereport(ERROR,
823  (errcode(ERRCODE_SYNTAX_ERROR),
824  errmsg("conflicting or redundant options")));
825 
826  snapshot_action_given = true;
827  *snapshot_action = CRS_USE_SNAPSHOT;
828  }
829  else if (strcmp(defel->defname, "reserve_wal") == 0)
830  {
831  if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
832  ereport(ERROR,
833  (errcode(ERRCODE_SYNTAX_ERROR),
834  errmsg("conflicting or redundant options")));
835 
836  reserve_wal_given = true;
837  *reserve_wal = true;
838  }
839  else
840  elog(ERROR, "unrecognized option: %s", defel->defname);
841  }
842 }
int errcode(int sqlerrcode)
Definition: elog.c:570
ReplicationKind kind
Definition: replnodes.h:56
bool defGetBoolean(DefElem *def)
Definition: define.c:111
#define ERROR
Definition: elog.h:43
#define ereport(elevel, rest)
Definition: elog.h:141
#define lfirst(lc)
Definition: pg_list.h:106
int errmsg(const char *fmt,...)
Definition: elog.c:784
#define elog(elevel,...)
Definition: elog.h:226
char * defname
Definition: parsenodes.h:728

◆ pg_stat_get_wal_senders()

Datum pg_stat_get_wal_senders ( PG_FUNCTION_ARGS  )

Definition at line 3218 of file walsender.c.

References ReturnSetInfo::allowedModes, WalSnd::apply, WalSnd::applyLag, CStringGetTextDatum, ReturnSetInfo::econtext, ExprContext::ecxt_per_query_memory, elog, ereport, errcode(), errmsg(), ERROR, WalSnd::flush, WalSnd::flushLag, get_call_result_type(), GetUserId(), i, Int32GetDatum, IntervalPGetDatum, is_member_of_role(), IsA, list_member_int(), LSNGetDatum, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_wal_senders, MemoryContextSwitchTo(), MemSet, WalSnd::mutex, offset_to_interval(), PG_STAT_GET_WAL_SENDERS_COLS, WalSnd::pid, WalSnd::replyTime, ReturnSetInfo::returnMode, WalSnd::sentPtr, sentPtr, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SFRM_Materialize, SpinLockAcquire, SpinLockRelease, WalSnd::state, SYNC_REP_PRIORITY, WalSnd::sync_standby_priority, SyncRepConfigData::syncrep_method, SyncRepConfig, SyncRepGetSyncStandbys(), TimestampTzGetDatum, tuplestore_begin_heap(), tuplestore_donestoring, tuplestore_putvalues(), TYPEFUNC_COMPOSITE, values, WalSndGetStateString(), WalSndCtlData::walsnds, work_mem, write, WalSnd::write, WalSnd::writeLag, and XLogRecPtrIsInvalid.

3219 {
3220 #define PG_STAT_GET_WAL_SENDERS_COLS 12
3221  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3222  TupleDesc tupdesc;
3223  Tuplestorestate *tupstore;
3224  MemoryContext per_query_ctx;
3225  MemoryContext oldcontext;
3226  List *sync_standbys;
3227  int i;
3228 
3229  /* check to see if caller supports us returning a tuplestore */
3230  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
3231  ereport(ERROR,
3232  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3233  errmsg("set-valued function called in context that cannot accept a set")));
3234  if (!(rsinfo->allowedModes & SFRM_Materialize))
3235  ereport(ERROR,
3236  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3237  errmsg("materialize mode required, but it is not " \
3238  "allowed in this context")));
3239 
3240  /* Build a tuple descriptor for our result type */
3241  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
3242  elog(ERROR, "return type must be a row type");
3243 
3244  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
3245  oldcontext = MemoryContextSwitchTo(per_query_ctx);
3246 
3247  tupstore = tuplestore_begin_heap(true, false, work_mem);
3248  rsinfo->returnMode = SFRM_Materialize;
3249  rsinfo->setResult = tupstore;
3250  rsinfo->setDesc = tupdesc;
3251 
3252  MemoryContextSwitchTo(oldcontext);
3253 
3254  /*
3255  * Get the currently active synchronous standbys.
3256  */
3257  LWLockAcquire(SyncRepLock, LW_SHARED);
3258  sync_standbys = SyncRepGetSyncStandbys(NULL);
3259  LWLockRelease(SyncRepLock);
3260 
3261  for (i = 0; i < max_wal_senders; i++)
3262  {
3263  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3265  XLogRecPtr write;
3266  XLogRecPtr flush;
3267  XLogRecPtr apply;
3268  TimeOffset writeLag;
3269  TimeOffset flushLag;
3270  TimeOffset applyLag;
3271  int priority;
3272  int pid;
3274  TimestampTz replyTime;
3276  bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
3277 
3278  SpinLockAcquire(&walsnd->mutex);
3279  if (walsnd->pid == 0)
3280  {
3281  SpinLockRelease(&walsnd->mutex);
3282  continue;
3283  }
3284  pid = walsnd->pid;
3285  sentPtr = walsnd->sentPtr;
3286  state = walsnd->state;
3287  write = walsnd->write;
3288  flush = walsnd->flush;
3289  apply = walsnd->apply;
3290  writeLag = walsnd->writeLag;
3291  flushLag = walsnd->flushLag;
3292  applyLag = walsnd->applyLag;
3293  priority = walsnd->sync_standby_priority;
3294  replyTime = walsnd->replyTime;
3295  SpinLockRelease(&walsnd->mutex);
3296 
3297  memset(nulls, 0, sizeof(nulls));
3298  values[0] = Int32GetDatum(pid);
3299 
3300  if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS))
3301  {
3302  /*
3303  * Only superusers and members of pg_read_all_stats can see
3304  * details. Other users only get the pid value to know it's a
3305  * walsender, but no details.
3306  */
3307  MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3308  }
3309  else
3310  {
3311  values[1] = CStringGetTextDatum(WalSndGetStateString(state));
3312 
3313  if (XLogRecPtrIsInvalid(sentPtr))
3314  nulls[2] = true;
3315  values[2] = LSNGetDatum(sentPtr);
3316 
3317  if (XLogRecPtrIsInvalid(write))
3318  nulls[3] = true;
3319  values[3] = LSNGetDatum(write);
3320 
3321  if (XLogRecPtrIsInvalid(flush))
3322  nulls[4] = true;
3323  values[4] = LSNGetDatum(flush);
3324 
3325  if (XLogRecPtrIsInvalid(apply))
3326  nulls[5] = true;
3327  values[5] = LSNGetDatum(apply);
3328 
3329  /*
3330  * Treat a standby such as a pg_basebackup background process
3331  * which always returns an invalid flush location, as an
3332  * asynchronous standby.
3333  */
3334  priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
3335 
3336  if (writeLag < 0)
3337  nulls[6] = true;
3338  else
3339  values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
3340 
3341  if (flushLag < 0)
3342  nulls[7] = true;
3343  else
3344  values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
3345 
3346  if (applyLag < 0)
3347  nulls[8] = true;
3348  else
3349  values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
3350 
3351  values[9] = Int32GetDatum(priority);
3352 
3353  /*
3354  * More easily understood version of standby state. This is purely
3355  * informational.
3356  *
3357  * In quorum-based sync replication, the role of each standby
3358  * listed in synchronous_standby_names can be changing very
3359  * frequently. Any standbys considered as "sync" at one moment can
3360  * be switched to "potential" ones at the next moment. So, it's
3361  * basically useless to report "sync" or "potential" as their sync
3362  * states. We report just "quorum" for them.
3363  */
3364  if (priority == 0)
3365  values[10] = CStringGetTextDatum("async");
3366  else if (list_member_int(sync_standbys, i))
3368  CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3369  else
3370  values[10] = CStringGetTextDatum("potential");
3371 
3372  if (replyTime == 0)
3373  nulls[11] = true;
3374  else
3375  values[11] = TimestampTzGetDatum(replyTime);
3376  }
3377 
3378  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
3379  }
3380 
3381  /* clean up and return the tuplestore */
3382  tuplestore_donestoring(tupstore);
3383 
3384  return (Datum) 0;
3385 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
XLogRecPtr write
#define IsA(nodeptr, _type_)
Definition: nodes.h:568
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:196
#define SYNC_REP_PRIORITY
Definition: syncrep.h:36
Oid GetUserId(void)
Definition: miscinit.c:380
#define write(a, b, c)
Definition: win32.h:14
int64 TimestampTz
Definition: timestamp.h:39
uint8 syncrep_method
Definition: syncrep.h:51
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
TimeOffset flushLag
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
WalSndCtlData * WalSndCtl
Definition: walsender.c:108
int errcode(int sqlerrcode)
Definition: elog.c:570
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
#define MemSet(start, val, len)
Definition: c.h:941
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
List * SyncRepGetSyncStandbys(bool *am_sync)
Definition: syncrep.c:682
TimeOffset writeLag
SyncRepConfigData * SyncRepConfig
Definition: syncrep.c:97
slock_t mutex
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1725
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
#define ERROR
Definition: elog.h:43
#define IntervalPGetDatum(X)
Definition: timestamp.h:33
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
bool list_member_int(const List *list, int datum)
Definition: list.c:485
WalSndState state
#define ereport(elevel, rest)
Definition: elog.h:141
int max_wal_senders
Definition: walsender.c:120
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
int64 TimeOffset
Definition: timestamp.h:40
uintptr_t Datum
Definition: postgres.h:367
static const char * WalSndGetStateString(WalSndState state)
Definition: walsender.c:3183
XLogRecPtr sentPtr
int work_mem
Definition: globals.c:121
static XLogRecPtr sentPtr
Definition: walsender.c:157
int allowedModes
Definition: execnodes.h:300
bool is_member_of_role(Oid member, Oid role)
Definition: acl.c:4931
TimeOffset applyLag
SetFunctionReturnMode returnMode
Definition: execnodes.h:302
uint64 XLogRecPtr
Definition: xlogdefs.h:21
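int sync_standby_priority
WalSnd member (read as walsnd->sync_standby_priority; presumably declared in replication/walsender_private.h); the Doxygen link to regguts.h:298 is a mislink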
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1121
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:228
WalSndState
Tuplestorestate * setResult
Definition: execnodes.h:305
static Datum values[MAXATTR]
Definition: bootstrap.c:167
#define PG_STAT_GET_WAL_SENDERS_COLS
ExprContext * econtext
Definition: execnodes.h:298
#define Int32GetDatum(X)
Definition: postgres.h:464
TupleDesc setDesc
Definition: execnodes.h:306
int errmsg(const char *fmt,...)
Definition: elog.c:784
#define elog(elevel,...)
Definition: elog.h:226
int i
#define CStringGetTextDatum(s)
Definition: builtins.h:83
TimestampTz replyTime
static Interval * offset_to_interval(TimeOffset offset)
Definition: walsender.c:3202
XLogRecPtr apply
Definition: pg_list.h:45

◆ PhysicalConfirmReceivedLocation()

static void PhysicalConfirmReceivedLocation ( XLogRecPtr  lsn)
static

Definition at line 1723 of file walsender.c.

References Assert, ReplicationSlot::data, InvalidXLogRecPtr, ReplicationSlot::mutex, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, SpinLockAcquire, and SpinLockRelease.

Referenced by ProcessStandbyReplyMessage().

1724 {
1725  bool changed = false;
1727 
1728  Assert(lsn != InvalidXLogRecPtr);
1729  SpinLockAcquire(&slot->mutex);
1730  if (slot->data.restart_lsn != lsn)
1731  {
1732  changed = true;
1733  slot->data.restart_lsn = lsn;
1734  }
1735  SpinLockRelease(&slot->mutex);
1736 
1737  if (changed)
1738  {
1741  }
1742 
1743  /*
1744  * One could argue that the slot should be saved to disk now, but that'd
1745  * be energy wasted - the worst lost information can do here is give us
1746  * wrong information in a statistics view - we'll just potentially be more
1747  * conservative in removing files.
1748  */
1749 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
ReplicationSlotPersistentData data
Definition: slot.h:133
#define SpinLockAcquire(lock)
Definition: spin.h:62
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:748
#define SpinLockRelease(lock)
Definition: spin.h:64
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:732
XLogRecPtr restart_lsn
Definition: slot.h:73
slock_t mutex
Definition: slot.h:106
void ReplicationSlotMarkDirty(void)
Definition: slot.c:663

◆ PhysicalReplicationSlotNewXmin()

static void PhysicalReplicationSlotNewXmin ( TransactionId  feedbackXmin,
TransactionId  feedbackCatalogXmin 
)
static

Definition at line 1860 of file walsender.c.

References ReplicationSlotPersistentData::catalog_xmin, ReplicationSlot::data, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, InvalidTransactionId, ReplicationSlot::mutex, MyPgXact, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredXmin(), SpinLockAcquire, SpinLockRelease, TransactionIdIsNormal, TransactionIdPrecedes(), ReplicationSlotPersistentData::xmin, and PGXACT::xmin.

Referenced by ProcessStandbyHSFeedbackMessage().

1861 {
1862  bool changed = false;
1864 
1865  SpinLockAcquire(&slot->mutex);
1867 
1868  /*
1869  * For physical replication we don't need the interlock provided by xmin
1870  * and effective_xmin since the consequences of a missed increase are
1871  * limited to query cancellations, so set both at once.
1872  */
1873  if (!TransactionIdIsNormal(slot->data.xmin) ||
1874  !TransactionIdIsNormal(feedbackXmin) ||
1875  TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
1876  {
1877  changed = true;
1878  slot->data.xmin = feedbackXmin;
1879  slot->effective_xmin = feedbackXmin;
1880  }
1881  if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
1882  !TransactionIdIsNormal(feedbackCatalogXmin) ||
1883  TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
1884  {
1885  changed = true;
1886  slot->data.catalog_xmin = feedbackCatalogXmin;
1887  slot->effective_catalog_xmin = feedbackCatalogXmin;
1888  }
1889  SpinLockRelease(&slot->mutex);
1890 
1891  if (changed)
1892  {
1895  }
1896 }
TransactionId xmin
Definition: proc.h:228
ReplicationSlotPersistentData data
Definition: slot.h:133
PGXACT * MyPgXact
Definition: proc.c:68
#define SpinLockAcquire(lock)
Definition: spin.h:62
TransactionId effective_xmin
Definition: slot.h:129
TransactionId catalog_xmin
Definition: slot.h:70
#define InvalidTransactionId
Definition: transam.h:31
TransactionId xmin
Definition: slot.h:62
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
#define SpinLockRelease(lock)
Definition: spin.h:64
TransactionId effective_catalog_xmin
Definition: slot.h:130
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
slock_t mutex
Definition: slot.h:106
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:702
void ReplicationSlotMarkDirty(void)
Definition: slot.c:663

◆ ProcessRepliesIfAny()

static void ProcessRepliesIfAny ( void  )
static

Definition at line 1588 of file walsender.c.

References COMMERROR, ereport, errcode(), errmsg(), FATAL, GetCurrentTimestamp(), last_processing, last_reply_timestamp, pq_endmsgread(), pq_getbyte_if_available(), pq_getmessage(), pq_putmessage_noblock, pq_startmsgread(), proc_exit(), ProcessStandbyMessage(), resetStringInfo(), streamingDoneReceiving, streamingDoneSending, and waiting_for_ping_response.

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

1589 {
1590  unsigned char firstchar;
1591  int r;
1592  bool received = false;
1593 
1595 
1596  for (;;)
1597  {
1598  pq_startmsgread();
1599  r = pq_getbyte_if_available(&firstchar);
1600  if (r < 0)
1601  {
1602  /* unexpected error or EOF */
1604  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1605  errmsg("unexpected EOF on standby connection")));
1606  proc_exit(0);
1607  }
1608  if (r == 0)
1609  {
1610  /* no data available without blocking */
1611  pq_endmsgread();
1612  break;
1613  }
1614 
1615  /* Read the message contents */
1617  if (pq_getmessage(&reply_message, 0))
1618  {
1620  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1621  errmsg("unexpected EOF on standby connection")));
1622  proc_exit(0);
1623  }
1624 
1625  /*
1626  * If we already received a CopyDone from the frontend, the frontend
1627  * should not send us anything until we've closed our end of the COPY.
1628  * XXX: In theory, the frontend could already send the next command
1629  * before receiving the CopyDone, but libpq doesn't currently allow
1630  * that.
1631  */
1632  if (streamingDoneReceiving && firstchar != 'X')
1633  ereport(FATAL,
1634  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1635  errmsg("unexpected standby message type \"%c\", after receiving CopyDone",
1636  firstchar)));
1637 
1638  /* Handle the very limited subset of commands expected in this phase */
1639  switch (firstchar)
1640  {
1641  /*
1642  * 'd' means a standby reply wrapped in a CopyData packet.
1643  */
1644  case 'd':
1646  received = true;
1647  break;
1648 
1649  /*
1650  * CopyDone means the standby requested to finish streaming.
1651  * Reply with CopyDone, if we had not sent that already.
1652  */
1653  case 'c':
1654  if (!streamingDoneSending)
1655  {
1656  pq_putmessage_noblock('c', NULL, 0);
1657  streamingDoneSending = true;
1658  }
1659 
1660  streamingDoneReceiving = true;
1661  received = true;
1662  break;
1663 
1664  /*
1665  * 'X' means that the standby is closing down the socket.
1666  */
1667  case 'X':
1668  proc_exit(0);
1669 
1670  default:
1671  ereport(FATAL,
1672  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1673  errmsg("invalid standby message type \"%c\"",
1674  firstchar)));
1675  }
1676  }
1677 
1678  /*
1679  * Save the last reply timestamp if we've received at least one reply.
1680  */
1681  if (received)
1682  {
1684  waiting_for_ping_response = false;
1685  }
1686 }
static void ProcessStandbyMessage(void)
Definition: walsender.c:1692
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:570
static TimestampTz last_processing
Definition: walsender.c:165
void pq_startmsgread(void)
Definition: pqcomm.c:1210
#define FATAL
Definition: elog.h:52
int pq_getbyte_if_available(unsigned char *c)
Definition: pqcomm.c:1040
static bool streamingDoneSending
Definition: walsender.c:182
#define COMMERROR
Definition: elog.h:30
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
#define ereport(elevel, rest)
Definition: elog.h:141
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1272
static StringInfoData reply_message
Definition: walsender.c:161
void pq_endmsgread(void)
Definition: pqcomm.c:1234
static bool streamingDoneReceiving
Definition: walsender.c:183
int errmsg(const char *fmt,...)
Definition: elog.c:784
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
static bool waiting_for_ping_response
Definition: walsender.c:174
static TimestampTz last_reply_timestamp
Definition: walsender.c:171

◆ ProcessStandbyHSFeedbackMessage()

static void ProcessStandbyHSFeedbackMessage ( void  )
static

Definition at line 1937 of file walsender.c.

References DEBUG2, elog, InvalidTransactionId, log_min_messages, WalSnd::mutex, MyPgXact, MyReplicationSlot, MyWalSnd, pfree(), PhysicalReplicationSlotNewXmin(), pq_getmsgint(), pq_getmsgint64(), pstrdup(), WalSnd::replyTime, SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), TransactionIdInRecentPast(), TransactionIdIsNormal, TransactionIdPrecedes(), and PGXACT::xmin.

Referenced by ProcessStandbyMessage().

1938 {
1939  TransactionId feedbackXmin;
1940  uint32 feedbackEpoch;
1941  TransactionId feedbackCatalogXmin;
1942  uint32 feedbackCatalogEpoch;
1943  TimestampTz replyTime;
1944 
1945  /*
1946  * Decipher the reply message. The caller already consumed the msgtype
1947  * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
1948  * of this message.
1949  */
1950  replyTime = pq_getmsgint64(&reply_message);
1951  feedbackXmin = pq_getmsgint(&reply_message, 4);
1952  feedbackEpoch = pq_getmsgint(&reply_message, 4);
1953  feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
1954  feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
1955 
1956  if (log_min_messages <= DEBUG2)
1957  {
1958  char *replyTimeStr;
1959 
1960  /* Copy because timestamptz_to_str returns a static buffer */
1961  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
1962 
1963  elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
1964  feedbackXmin,
1965  feedbackEpoch,
1966  feedbackCatalogXmin,
1967  feedbackCatalogEpoch,
1968  replyTimeStr);
1969 
1970  pfree(replyTimeStr);
1971  }
1972 
1973  /*
1974  * Update shared state for this WalSender process based on reply data from
1975  * standby.
1976  */
1977  {
1978  WalSnd *walsnd = MyWalSnd;
1979 
1980  SpinLockAcquire(&walsnd->mutex);
1981  walsnd->replyTime = replyTime;
1982  SpinLockRelease(&walsnd->mutex);
1983  }
1984 
1985  /*
1986  * Unset WalSender's xmins if the feedback message values are invalid.
1987  * This happens when the downstream turned hot_standby_feedback off.
1988  */
1989  if (!TransactionIdIsNormal(feedbackXmin)
1990  && !TransactionIdIsNormal(feedbackCatalogXmin))
1991  {
1993  if (MyReplicationSlot != NULL)
1994  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
1995  return;
1996  }
1997 
1998  /*
1999  * Check that the provided xmin/epoch are sane, that is, not in the future
2000  * and not so far back as to be already wrapped around. Ignore if not.
2001  */
2002  if (TransactionIdIsNormal(feedbackXmin) &&
2003  !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
2004  return;
2005 
2006  if (TransactionIdIsNormal(feedbackCatalogXmin) &&
2007  !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
2008  return;
2009 
2010  /*
2011  * Set the WalSender's xmin equal to the standby's requested xmin, so that
2012  * the xmin will be taken into account by GetOldestXmin. This will hold
2013  * back the removal of dead rows and thereby prevent the generation of
2014  * cleanup conflicts on the standby server.
2015  *
2016  * There is a small window for a race condition here: although we just
2017  * checked that feedbackXmin precedes nextXid, the nextXid could have
2018  * gotten advanced between our fetching it and applying the xmin below,
2019  * perhaps far enough to make feedbackXmin wrap around. In that case the
2020  * xmin we set here would be "in the future" and have no effect. No point
2021  * in worrying about this since it's too late to save the desired data
2022  * anyway. Assuming that the standby sends us an increasing sequence of
2023  * xmins, this could only happen during the first reply cycle, else our
2024  * own xmin would prevent nextXid from advancing so far.
2025  *
2026  * We don't bother taking the ProcArrayLock here. Setting the xmin field
2027  * is assumed atomic, and there's no real need to prevent a concurrent
2028  * GetOldestXmin. (If we're moving our xmin forward, this is obviously
2029  * safe, and if we're moving it backwards, well, the data is at risk
2030  * already since a VACUUM could have just finished calling GetOldestXmin.)
2031  *
2032  * If we're using a replication slot we reserve the xmin via that,
2033  * otherwise via the walsender's PGXACT entry. We can only track the
2034  * catalog xmin separately when using a slot, so we store the least of the
2035  * two provided when not using a slot.
2036  *
2037  * XXX: It might make sense to generalize the ephemeral slot concept and
2038  * always use the slot mechanism to handle the feedback xmin.
2039  */
2040  if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
2041  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2042  else
2043  {
2044  if (TransactionIdIsNormal(feedbackCatalogXmin)
2045  && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
2046  MyPgXact->xmin = feedbackCatalogXmin;
2047  else
2048  MyPgXact->xmin = feedbackXmin;
2049  }
2050 }
uint32 TransactionId
Definition: c.h:507
TransactionId xmin
Definition: proc.h:228
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1161
slock_t mutex
PGXACT * MyPgXact
Definition: proc.c:68
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition: walsender.c:1909
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pfree(void *pointer)
Definition: mcxt.c:1031
#define DEBUG2
Definition: elog.h:24
#define InvalidTransactionId
Definition: transam.h:31
unsigned int uint32
Definition: c.h:358
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static StringInfoData reply_message
Definition: walsender.c:161
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:111
int log_min_messages
Definition: guc.c:492
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define elog(elevel,...)
Definition: elog.h:226
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition: walsender.c:1860
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
TimestampTz replyTime
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1730

◆ ProcessStandbyMessage()

static void ProcessStandbyMessage ( void  )
static

Definition at line 1692 of file walsender.c.

References COMMERROR, ereport, errcode(), errmsg(), pq_getmsgbyte(), proc_exit(), ProcessStandbyHSFeedbackMessage(), and ProcessStandbyReplyMessage().

Referenced by ProcessRepliesIfAny().

1693 {
1694  char msgtype;
1695 
1696  /*
1697  * Check message type from the first byte.
1698  */
1699  msgtype = pq_getmsgbyte(&reply_message);
1700 
1701  switch (msgtype)
1702  {
1703  case 'r':
1705  break;
1706 
1707  case 'h':
1709  break;
1710 
1711  default:
1713  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1714  errmsg("unexpected message type \"%c\"", msgtype)));
1715  proc_exit(0);
1716  }
1717 }
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:570
#define COMMERROR
Definition: elog.h:30
#define ereport(elevel, rest)
Definition: elog.h:141
static StringInfoData reply_message
Definition: walsender.c:161
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
static void ProcessStandbyReplyMessage(void)
Definition: walsender.c:1755
int errmsg(const char *fmt,...)
Definition: elog.c:784
static void ProcessStandbyHSFeedbackMessage(void)
Definition: walsender.c:1937

◆ ProcessStandbyReplyMessage()

static void ProcessStandbyReplyMessage ( void  )
static

Definition at line 1755 of file walsender.c.

References am_cascading_walsender, WalSnd::apply, WalSnd::applyLag, DEBUG2, elog, WalSnd::flush, WalSnd::flushLag, GetCurrentTimestamp(), InvalidXLogRecPtr, LagTrackerRead(), log_min_messages, LogicalConfirmReceivedLocation(), WalSnd::mutex, MyReplicationSlot, MyWalSnd, now(), pfree(), PhysicalConfirmReceivedLocation(), pq_getmsgbyte(), pq_getmsgint64(), pstrdup(), WalSnd::replyTime, sentPtr, SlotIsLogical, SpinLockAcquire, SpinLockRelease, SYNC_REP_WAIT_APPLY, SYNC_REP_WAIT_FLUSH, SYNC_REP_WAIT_WRITE, SyncRepReleaseWaiters(), timestamptz_to_str(), WalSndKeepalive(), WalSnd::write, and WalSnd::writeLag.

Referenced by ProcessStandbyMessage().

1756 {
1757  XLogRecPtr writePtr,
1758  flushPtr,
1759  applyPtr;
1760  bool replyRequested;
1761  TimeOffset writeLag,
1762  flushLag,
1763  applyLag;
1764  bool clearLagTimes;
1765  TimestampTz now;
1766  TimestampTz replyTime;
1767 
1768  static bool fullyAppliedLastTime = false;
1769 
1770  /* the caller already consumed the msgtype byte */
1771  writePtr = pq_getmsgint64(&reply_message);
1772  flushPtr = pq_getmsgint64(&reply_message);
1773  applyPtr = pq_getmsgint64(&reply_message);
1774  replyTime = pq_getmsgint64(&reply_message);
1775  replyRequested = pq_getmsgbyte(&reply_message);
1776 
1777  if (log_min_messages <= DEBUG2)
1778  {
1779  char *replyTimeStr;
1780 
1781  /* Copy because timestamptz_to_str returns a static buffer */
1782  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
1783 
1784  elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
1785  (uint32) (writePtr >> 32), (uint32) writePtr,
1786  (uint32) (flushPtr >> 32), (uint32) flushPtr,
1787  (uint32) (applyPtr >> 32), (uint32) applyPtr,
1788  replyRequested ? " (reply requested)" : "",
1789  replyTimeStr);
1790 
1791  pfree(replyTimeStr);
1792  }
1793 
1794  /* See if we can compute the round-trip lag for these positions. */
1795  now = GetCurrentTimestamp();
1796  writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
1797  flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
1798  applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
1799 
1800  /*
1801  * If the standby reports that it has fully replayed the WAL in two
1802  * consecutive reply messages, then the second such message must result
1803  * from wal_receiver_status_interval expiring on the standby. This is a
1804  * convenient time to forget the lag times measured when it last
1805  * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
1806  * until more WAL traffic arrives.
1807  */
1808  clearLagTimes = false;
1809  if (applyPtr == sentPtr)
1810  {
1811  if (fullyAppliedLastTime)
1812  clearLagTimes = true;
1813  fullyAppliedLastTime = true;
1814  }
1815  else
1816  fullyAppliedLastTime = false;
1817 
1818  /* Send a reply if the standby requested one. */
1819  if (replyRequested)
1820  WalSndKeepalive(false);
1821 
1822  /*
1823  * Update shared state for this WalSender process based on reply data from
1824  * standby.
1825  */
1826  {
1827  WalSnd *walsnd = MyWalSnd;
1828 
1829  SpinLockAcquire(&walsnd->mutex);
1830  walsnd->write = writePtr;
1831  walsnd->flush = flushPtr;
1832  walsnd->apply = applyPtr;
1833  if (writeLag != -1 || clearLagTimes)
1834  walsnd->writeLag = writeLag;
1835  if (flushLag != -1 || clearLagTimes)
1836  walsnd->flushLag = flushLag;
1837  if (applyLag != -1 || clearLagTimes)
1838  walsnd->applyLag = applyLag;
1839  walsnd->replyTime = replyTime;
1840  SpinLockRelease(&walsnd->mutex);
1841  }
1842 
1845 
1846  /*
1847  * Advance our local xmin horizon when the client confirmed a flush.
1848  */
1849  if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
1850  {
1853  else
1855  }
1856 }
XLogRecPtr write
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1161
TimeOffset flushLag
TimeOffset writeLag
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
void pfree(void *pointer)
Definition: mcxt.c:1031
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3393
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: walsender.c:1723
#define SYNC_REP_WAIT_APPLY
Definition: syncrep.h:26
#define DEBUG2
Definition: elog.h:24
unsigned int uint32
Definition: c.h:358
#define SlotIsLogical(slot)
Definition: slot.h:155
#define SYNC_REP_WAIT_FLUSH
Definition: syncrep.h:25
static StringInfoData reply_message
Definition: walsender.c:161
#define SpinLockRelease(lock)
Definition: spin.h:64
int64 TimeOffset
Definition: timestamp.h:40
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition: walsender.c:3516
WalSnd * MyWalSnd
Definition: walsender.c:111
static XLogRecPtr sentPtr
Definition: walsender.c:157
int log_min_messages
Definition: guc.c:492
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
TimeOffset applyLag
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:994
#define elog(elevel,...)
Definition: elog.h:226
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
TimestampTz replyTime
XLogRecPtr apply
void SyncRepReleaseWaiters(void)
Definition: syncrep.c:412
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
bool am_cascading_walsender
Definition: walsender.c:115
#define SYNC_REP_WAIT_WRITE
Definition: syncrep.h:24
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1730

◆ SendTimeLineHistory()

static void SendTimeLineHistory ( TimeLineHistoryCmd cmd)
static

Definition at line 434 of file walsender.c.

References buf, CloseTransientFile(), PGAlignedBlock::data, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg(), ERROR, fd(), MAXFNAMELEN, MAXPGPATH, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), pq_beginmessage(), pq_endmessage(), pq_sendbytes(), pq_sendint16(), pq_sendint32(), pq_sendstring(), read, TimeLineHistoryCmd::timeline, TLHistoryFileName, TLHistoryFilePath, and WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ.

Referenced by exec_replication_command().

435 {
437  char histfname[MAXFNAMELEN];
438  char path[MAXPGPATH];
439  int fd;
440  off_t histfilelen;
441  off_t bytesleft;
442  Size len;
443 
444  /*
445  * Reply with a result set with one row, and two columns. The first col is
446  * the name of the history file, 2nd is the contents.
447  */
448 
449  TLHistoryFileName(histfname, cmd->timeline);
450  TLHistoryFilePath(path, cmd->timeline);
451 
452  /* Send a RowDescription message */
453  pq_beginmessage(&buf, 'T');
454  pq_sendint16(&buf, 2); /* 2 fields */
455 
456  /* first field */
457  pq_sendstring(&buf, "filename"); /* col name */
458  pq_sendint32(&buf, 0); /* table oid */
459  pq_sendint16(&buf, 0); /* attnum */
460  pq_sendint32(&buf, TEXTOID); /* type oid */
461  pq_sendint16(&buf, -1); /* typlen */
462  pq_sendint32(&buf, 0); /* typmod */
463  pq_sendint16(&buf, 0); /* format code */
464 
465  /* second field */
466  pq_sendstring(&buf, "content"); /* col name */
467  pq_sendint32(&buf, 0); /* table oid */
468  pq_sendint16(&buf, 0); /* attnum */
469  pq_sendint32(&buf, BYTEAOID); /* type oid */
470  pq_sendint16(&buf, -1); /* typlen */
471  pq_sendint32(&buf, 0); /* typmod */
472  pq_sendint16(&buf, 0); /* format code */
473  pq_endmessage(&buf);
474 
475  /* Send a DataRow message */
476  pq_beginmessage(&buf, 'D');
477  pq_sendint16(&buf, 2); /* # of columns */
478  len = strlen(histfname);
479  pq_sendint32(&buf, len); /* col1 len */
480  pq_sendbytes(&buf, histfname, len);
481 
482  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
483  if (fd < 0)
484  ereport(ERROR,
486  errmsg("could not open file \"%s\": %m", path)));
487 
488  /* Determine file length and send it to client */
489  histfilelen = lseek(fd, 0, SEEK_END);
490  if (histfilelen < 0)
491  ereport(ERROR,
493  errmsg("could not seek to end of file \"%s\": %m", path)));
494  if (lseek(fd, 0, SEEK_SET) != 0)
495  ereport(ERROR,
497  errmsg("could not seek to beginning of file \"%s\": %m", path)));
498 
499  pq_sendint32(&buf, histfilelen); /* col2 len */
500 
501  bytesleft = histfilelen;
502  while (bytesleft > 0)
503  {
504  PGAlignedBlock rbuf;
505  int nread;
506 
508  nread = read(fd, rbuf.data, sizeof(rbuf));
510  if (nread < 0)
511  ereport(ERROR,
513  errmsg("could not read file \"%s\": %m",
514  path)));
515  else if (nread == 0)
516  ereport(ERROR,
518  errmsg("could not read file \"%s\": read %d of %zu",
519  path, nread, (Size) bytesleft)));
520 
521  pq_sendbytes(&buf, rbuf.data, nread);
522  bytesleft -= nread;
523  }
524  CloseTransientFile(fd);
525 
526  pq_endmessage(&buf);
527 }
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
int errcode(int sqlerrcode)
Definition: elog.c:570
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
int fd
Definition: walsender.c:439 (local variable of SendTimeLineHistory; not the ecpg test function fd() in preproc-init.c:105)
#define PG_BINARY
Definition: c.h:1171
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
TimeLineID timeline
Definition: replnodes.h:97
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
char data[BLCKSZ]
Definition: c.h:1046
#define ERROR
Definition: elog.h:43
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2235
#define MAXPGPATH
#define TLHistoryFileName(fname, tli)
static char * buf
Definition: pg_test_fsync.c:67
int errcode_for_file_access(void)
Definition: elog.c:593
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1262
#define ereport(elevel, rest)
Definition: elog.h:141
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:43
int CloseTransientFile(int fd)
Definition: fd.c:2412
#define MAXFNAMELEN
size_t Size
Definition: c.h:466
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1238
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:125
int errmsg(const char *fmt,...)
Definition: elog.c:784
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
#define read(a, b, c)
Definition: win32.h:13
#define TLHistoryFilePath(path, tli)

◆ StartLogicalReplication()

static void StartLogicalReplication ( StartReplicationCmd cmd)
static

Definition at line 1053 of file walsender.c.

References am_cascading_walsender, Assert, buf, CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDecodingContext(), ReplicationSlot::data, DestRemote, EndCommand(), ereport, errmsg(), FreeDecodingContext(), got_STOPPING, LOG, logical_read_xlog_page(), logical_startptr, WalSnd::mutex, MyReplicationSlot, StartReplicationCmd::options, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), proc_exit(), RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), ReplicationSlotPersistentData::restart_lsn, WalSnd::sentPtr, sentPtr, StartReplicationCmd::slotname, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, SyncRepInitConfig(), WalSndLoop(), WalSndPrepareWrite(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WalSndUpdateProgress(), WalSndWriteData(), and XLogSendLogical().

Referenced by exec_replication_command().

1054 {
1056 
1057  /* make sure that our requirements are still fulfilled */
1059 
1061 
1062  ReplicationSlotAcquire(cmd->slotname, true);
1063 
1064  /*
1065  * Force a disconnect, so that the decoding code doesn't need to care
1066  * about an eventual switch from running in recovery, to running in a
1067  * normal environment. Client code is expected to handle reconnects.
1068  */
1070  {
1071  ereport(LOG,
1072  (errmsg("terminating walsender process after promotion")));
1073  got_STOPPING = true;
1074  }
1075 
1076  /*
1077  * Create our decoding context, making it start at the previously ack'ed
1078  * position.
1079  *
1080  * Do this before sending CopyBoth, so that any errors are reported early.
1081  */
1083  CreateDecodingContext(cmd->startpoint, cmd->options, false,
1087 
1088 
1090 
1091  /* Send a CopyBothResponse message, and start streaming */
1092  pq_beginmessage(&buf, 'W');
1093  pq_sendbyte(&buf, 0);
1094  pq_sendint16(&buf, 0);
1095  pq_endmessage(&buf);
1096  pq_flush();
1097 
1098 
1099  /* Start reading WAL from the oldest required WAL. */
1101 
1102  /*
1103  * Report the location after which we'll send out further commits as the
1104  * current sentPtr.
1105  */
1107 
1108  /* Also update the sent position status in shared memory */
1112 
1113  replication_active = true;
1114 
1116 
1117  /* Main loop of walsender */
1119 
1122 
1123  replication_active = false;
1124  if (got_STOPPING)
1125  proc_exit(0);
1127 
1128  /* Get out of COPY mode (CommandComplete). */
1129  EndCommand("COPY 0", DestRemote);
1130 }
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
XLogRecPtr startpoint
Definition: replnodes.h:85
#define pq_flush()
Definition: libpq.h:39
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:330
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:190
void proc_exit(int code)
Definition: ipc.c:104
ReplicationSlotPersistentData data
Definition: slot.h:133
#define LOG
Definition: elog.h:26
bool RecoveryInProgress(void)
Definition: xlog.c:7894
slock_t mutex
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
XLogRecPtr confirmed_flush
Definition: slot.h:81
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
Definition: walsender.c:761
#define SpinLockAcquire(lock)
Definition: spin.h:62
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:166
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:200
static char * buf
Definition: pg_test_fsync.c:67
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1168
static XLogRecPtr logical_startptr
Definition: walsender.c:201
void ReplicationSlotRelease(void)
Definition: slot.c:424
void SyncRepInitConfig(void)
Definition: syncrep.c:384
#define ereport(elevel, rest)
Definition: elog.h:141
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2137
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1141
#define SpinLockRelease(lock)
Definition: spin.h:64
static volatile sig_atomic_t replication_active
Definition: walsender.c:198
XLogRecPtr sentPtr
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:363
WalSnd * MyWalSnd
Definition: walsender.c:111
static XLogRecPtr sentPtr
Definition: walsender.c:157
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:732
void WalSndSetState(WalSndState state)
Definition: walsender.c:3164
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:501
static void XLogSendLogical(void)
Definition: walsender.c:2800
XLogRecPtr restart_lsn
Definition: slot.h:73
int errmsg(const char *fmt,...)
Definition: elog.c:784
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:78
bool am_cascading_walsender
Definition: walsender.c:115
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
Definition: walsender.c:1257

◆ StartReplication()

static void StartReplication ( StartReplicationCmd cmd)
static

Definition at line 536 of file walsender.c.

References am_cascading_walsender, Assert, begin_tup_output_tupdesc(), buf, CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, dest (local DestReceiver *), DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errcode(), errdetail(), errmsg(), ERROR, GetFlushRecPtr(), GetStandbyFlushRecPtr(), got_STOPPING, Int64GetDatum(), InvalidXLogRecPtr, list_free_deep(), MemSet, WalSnd::mutex, MyReplicationSlot, pq_beginmessage(), pq_endmessage(), pq_flush, pq_puttextmessage(), pq_sendbyte(), pq_sendint16(), proc_exit(), readTimeLineHistory(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, WalSnd::sentPtr, sentPtr, SlotIsLogical, StartReplicationCmd::slotname, snprintf, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), ThisTimeLineID, StartReplicationCmd::timeline, tliSwitchPoint(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), values, WalSndLoop(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, XLogRecPtrIsInvalid, and XLogSendPhysical().

Referenced by exec_replication_command().

537 {
539  XLogRecPtr FlushPtr;
540 
541  if (ThisTimeLineID == 0)
542  ereport(ERROR,
543  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
544  errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
545 
546  /*
547  * We assume here that we're logging enough information in the WAL for
548  * log-shipping, since this is checked in PostmasterMain().
549  *
550  * NOTE: wal_level can only change at shutdown, so in most cases it is
551  * difficult for there to be WAL data that we can still see that was
552  * written at wal_level='minimal'.
553  */
554 
555  if (cmd->slotname)
556  {
557  ReplicationSlotAcquire(cmd->slotname, true);
559  ereport(ERROR,
560  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
561  (errmsg("cannot use a logical replication slot for physical replication"))));
562  }
563 
564  /*
565  * Select the timeline. If it was given explicitly by the client, use
566  * that. Otherwise use the timeline of the last replayed record, which is
567  * kept in ThisTimeLineID.
568  */
570  {
571  /* this also updates ThisTimeLineID */
572  FlushPtr = GetStandbyFlushRecPtr();
573  }
574  else
575  FlushPtr = GetFlushRecPtr();
576 
577  if (cmd->timeline != 0)
578  {
579  XLogRecPtr switchpoint;
580 
581  sendTimeLine = cmd->timeline;
583  {
584  sendTimeLineIsHistoric = false;
586  }
587  else
588  {
589  List *timeLineHistory;
590 
591  sendTimeLineIsHistoric = true;
592 
593  /*
594  * Check that the timeline the client requested exists, and the
595  * requested start location is on that timeline.
596  */
597  timeLineHistory = readTimeLineHistory(ThisTimeLineID);
598  switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
600  list_free_deep(timeLineHistory);
601 
602  /*
603  * Found the requested timeline in the history. Check that
604  * requested startpoint is on that timeline in our history.
605  *
606  * This is quite loose on purpose. We only check that we didn't
607  * fork off the requested timeline before the switchpoint. We
608  * don't check that we switched *to* it before the requested
609  * starting point. This is because the client can legitimately
610  * request to start replication from the beginning of the WAL
611  * segment that contains switchpoint, but on the new timeline, so
612  * that it doesn't end up with a partial segment. If you ask for
613  * too old a starting point, you'll get an error later when we
614  * fail to find the requested WAL segment in pg_wal.
615  *
616  * XXX: we could be more strict here and only allow a startpoint
617  * that's older than the switchpoint, if it's still in the same
618  * WAL segment.
619  */
620  if (!XLogRecPtrIsInvalid(switchpoint) &&
621  switchpoint < cmd->startpoint)
622  {
623  ereport(ERROR,
624  (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
625  (uint32) (cmd->startpoint >> 32),
626  (uint32) (cmd->startpoint),
627  cmd->timeline),
628  errdetail("This server's history forked from timeline %u at %X/%X.",
629  cmd->timeline,
630  (uint32) (switchpoint >> 32),
631  (uint32) (switchpoint))));
632  }
633  sendTimeLineValidUpto = switchpoint;
634  }
635  }
636  else
637  {
640  sendTimeLineIsHistoric = false;
641  }
642 
644 
645  /* If there is nothing to stream, don't even enter COPY mode */
647  {
648  /*
649  * When we first start replication the standby will be behind the
650  * primary. For some applications, for example synchronous
651  * replication, it is important to have a clear state for this initial
652  * catchup mode, so we can trigger actions when we change streaming
653  * state later. We may stay in this state for a long time, which is
654  * exactly why we want to be able to monitor whether or not we are
655  * still here.
656  */
658 
659  /* Send a CopyBothResponse message, and start streaming */
660  pq_beginmessage(&buf, 'W');
661  pq_sendbyte(&buf, 0);
662  pq_sendint16(&buf, 0);
663  pq_endmessage(&buf);
664  pq_flush();
665 
666  /*
667  * Don't allow a request to stream from a future point in WAL that
668  * hasn't been flushed to disk in this server yet.
669  */
670  if (FlushPtr < cmd->startpoint)
671  {
672  ereport(ERROR,
673  (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
674  (uint32) (cmd->startpoint >> 32),
675  (uint32) (cmd->startpoint),
676  (uint32) (FlushPtr >> 32),
677  (uint32) (FlushPtr))));
678  }
679 
680  /* Start streaming from the requested point */
681  sentPtr = cmd->startpoint;
682 
683  /* Initialize shared memory status, too */
687 
689 
690  /* Main loop of walsender */
691  replication_active = true;
692 
694 
695  replication_active = false;
696  if (got_STOPPING)
697  proc_exit(0);
699 
701  }
702 
703  if (cmd->slotname)
705 
706  /*
707  * Copy is finished now. Send a single-row result set indicating the next
708  * timeline.
709  */
711  {
712  char startpos_str[8 + 1 + 8 + 1];
714  TupOutputState *tstate;
715  TupleDesc tupdesc;
716  Datum values[2];
717  bool nulls[2];
718 
719  snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
720  (uint32) (sendTimeLineValidUpto >> 32),
722 
724  MemSet(nulls, false, sizeof(nulls));
725 
726  /*
727  * Need a tuple descriptor representing two columns. int8 may seem
728  * like a surprising data type for this, but in theory int4 would not
729  * be wide enough for this, as TimeLineID is unsigned.
730  */
731  tupdesc = CreateTemplateTupleDesc(2);
732  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
733  INT8OID, -1, 0);
734  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
735  TEXTOID, -1, 0);
736 
737  /* prepare for projection of tuple */
738  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
739 
740  values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
741  values[1] = CStringGetTextDatum(startpos_str);
742 
743  /* send it to dest */
744  do_tup_output(tstate, values, nulls);
745 
746  end_tup_output(tstate);
747  }
748 
749  /* Send CommandComplete message */
750  pq_puttextmessage('C', "START_STREAMING");
751 }
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr startpoint
Definition: replnodes.h:85
#define pq_flush()
Definition: libpq.h:39
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:45
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:330
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:80
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:75
static void XLogSendPhysical(void)
Definition: walsender.c:2537
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2136
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:190
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:570
#define MemSet(start, val, len)
Definition: c.h:941
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8226
void list_free_deep(List *list)
Definition: list.c:1150
slock_t mutex
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
#define SpinLockAcquire(lock)
Definition: spin.h:62
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2194
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:668
#define ERROR
Definition: elog.h:43
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2116
static char * buf
Definition: pg_test_fsync.c:67
static bool streamingDoneSending
Definition: walsender.c:182
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
int errdetail(const char *fmt,...)
Definition: elog.c:860
unsigned int uint32
Definition: c.h:358
void ReplicationSlotRelease(void)
Definition: slot.c:424
#define SlotIsLogical(slot)
Definition: slot.h:155
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1877
void SyncRepInitConfig(void)
Definition: syncrep.c:384
#define ereport(elevel, rest)
Definition: elog.h:141
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2137
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
static volatile sig_atomic_t replication_active
Definition: walsender.c:198
uintptr_t Datum
Definition: postgres.h:367
XLogRecPtr sentPtr
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:561
WalSnd * MyWalSnd
Definition: walsender.c:111
static XLogRecPtr sentPtr
Definition: walsender.c:157
TimeLineID ThisTimeLineID
Definition: xlog.c:183
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
static TimeLineID sendTimeLine
Definition: walsender.c:148
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:732
void WalSndSetState(WalSndState state)
Definition: walsender.c:3164
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:149
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:151
static bool streamingDoneReceiving
Definition: walsender.c:183
static Datum values[MAXATTR]
Definition: bootstrap.c:167
int errmsg(const char *fmt,...)
Definition: elog.c:784
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
TimeLineID timeline
Definition: replnodes.h:84
#define CStringGetTextDatum(s)
Definition: builtins.h:83
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2920
void pq_puttextmessage(char msgtype, const char *str)
Definition: pqformat.c:369
Definition: pg_list.h:45
#define snprintf
Definition: port.h:192
int16 AttrNumber
Definition: attnum.h:21
static bool sendTimeLineIsHistoric
Definition: walsender.c:150
bool am_cascading_walsender
Definition: walsender.c:115

◆ TransactionIdInRecentPast()

static bool TransactionIdInRecentPast ( TransactionId  xid,
uint32  epoch 
)
static

Definition at line 1909 of file walsender.c.

References GetNextXidAndEpoch(), and TransactionIdPrecedesOrEquals().

Referenced by ProcessStandbyHSFeedbackMessage().

/*
 * TransactionIdInRecentPast(xid, epoch)
 *
 * Returns true when the (epoch, xid) pair names an XID that does not lie
 * after the server's next XID (xid == nextXid with a matching epoch is
 * accepted) and has not wrapped around relative to it; false otherwise.
 * Used by ProcessStandbyHSFeedbackMessage().
 *
 * The server-side next XID and epoch are sampled once via
 * GetNextXidAndEpoch().
 */
1910 {
1911  TransactionId nextXid;
1912  uint32 nextEpoch;
1913 
1914  GetNextXidAndEpoch(&nextXid, &nextEpoch);
1915 
/*
 * A numerically smaller-or-equal xid is acceptable only in the current
 * epoch; a numerically larger xid can only be valid if it comes from the
 * epoch immediately preceding the current one.
 */
1916  if (xid <= nextXid)
1917  {
1918  if (epoch != nextEpoch)
1919  return false;
1920  }
1921  else
1922  {
1923  if (epoch + 1 != nextEpoch)
1924  return false;
1925  }
1926 
/*
 * Even with a consistent epoch, the wraparound-aware comparison in
 * TransactionIdPrecedesOrEquals() (transam.c) must still place xid at or
 * before nextXid; otherwise the xid has wrapped around (per the inline
 * comment below).
 */
1927  if (!TransactionIdPrecedesOrEquals(xid, nextXid))
1928  return false; /* epoch OK, but it's wrapped around */
1929 
1930  return true;
1931 }
uint32 TransactionId
Definition: c.h:507
void GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch)
Definition: xlog.c:8295
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:319
unsigned int uint32
Definition: c.h:358
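static const unsigned __int64 epoch
Definition: gettimeofday.c:34
(Cross-reference artifact: the `epoch` used in TransactionIdInRecentPast() is that function's own `uint32 epoch` parameter, not this gettimeofday.c constant.)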

◆ WalSndCheckTimeOut()

static void WalSndCheckTimeOut ( void  )
static

Definition at line 2110 of file walsender.c.

References COMMERROR, ereport, errmsg(), last_processing, last_reply_timestamp, TimestampTzPlusMilliseconds, wal_sender_timeout, and WalSndShutdown().

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

2111 {
2112  TimestampTz timeout;
2113 
2114  /* don't bail out if we're doing something that doesn't require timeouts */
2115  if (last_reply_timestamp <= 0)
2116  return;
2117 
2120 
2121  if (wal_sender_timeout > 0 && last_processing >= timeout)
2122  {
2123  /*
2124  * Since typically expiration of replication timeout means
2125  * communication problem, we don't send the error message to the
2126  * standby.
2127  */
2129  (errmsg("terminating walsender process due to replication timeout")));
2130 
2131  WalSndShutdown();
2132  }
2133 }
int wal_sender_timeout
Definition: walsender.c:122
int64 TimestampTz
Definition: timestamp.h:39
static TimestampTz last_processing
Definition: walsender.c:165
#define COMMERROR
Definition: elog.h:30
#define ereport(elevel, rest)
Definition: elog.h:141
static void WalSndShutdown(void)
Definition: walsender.c:233
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
int errmsg(const char *fmt,...)
Definition: elog.c:784
static TimestampTz last_reply_timestamp
Definition: walsender.c:171

◆ WalSndComputeSleeptime()

static long WalSndComputeSleeptime ( TimestampTz  now)
static

Definition at line 2060 of file walsender.c.

References last_reply_timestamp, sleeptime, TimestampDifference(), TimestampTzPlusMilliseconds, waiting_for_ping_response, and wal_sender_timeout.

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

2061 {
2062  long sleeptime = 10000; /* 10 s */
2063 
2065  {
2066  TimestampTz wakeup_time;
2067  long sec_to_timeout;
2068  int microsec_to_timeout;
2069 
2070  /*
2071  * At the latest stop sleeping once wal_sender_timeout has been
2072  * reached.
2073  */
2076 
2077  /*
2078  * If no ping has been sent yet, wakeup when it's time to do so.
2079  * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2080  * the timeout passed without a response.
2081  */
2084  wal_sender_timeout / 2);
2085 
2086  /* Compute relative time until wakeup. */
2087  TimestampDifference(now, wakeup_time,
2088  &sec_to_timeout, &microsec_to_timeout);
2089 
2090  sleeptime = sec_to_timeout * 1000 +
2091  microsec_to_timeout / 1000;
2092  }
2093 
2094  return sleeptime;
2095 }
int wal_sender_timeout
Definition: walsender.c:122
int64 TimestampTz
Definition: timestamp.h:39
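int sleeptime
Definition: pg_standby.c:42
(Cross-reference artifact: the `sleeptime` in WalSndComputeSleeptime() is the function's local `long sleeptime`, not this pg_standby.c variable.)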
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: timestamp.c:1644
static bool waiting_for_ping_response
Definition: walsender.c:174
static TimestampTz last_reply_timestamp
Definition: walsender.c:171
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534

◆ WalSndDone()

static void WalSndDone ( WalSndSendDataCallback  send_data)
static

Definition at line 2880 of file walsender.c.

References DestRemote, EndCommand(), WalSnd::flush, pq_flush, pq_is_send_pending, proc_exit(), sentPtr, waiting_for_ping_response, WalSndCaughtUp, WalSndKeepalive(), WalSnd::write, and XLogRecPtrIsInvalid.

Referenced by WalSndLoop().

2881 {
2882  XLogRecPtr replicatedPtr;
2883 
2884  /* ... let's just be real sure we're caught up ... */
2885  send_data();
2886 
2887  /*
2888  * To figure out whether all WAL has successfully been replicated, check
2889  * flush location if valid, write otherwise. Tools like pg_receivewal will
2890  * usually (unless in synchronous mode) return an invalid flush location.
2891  */
2892  replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
2894 
2895  if (WalSndCaughtUp && sentPtr == replicatedPtr &&
2896  !pq_is_send_pending())
2897  {
2898  /* Inform the standby that XLOG streaming is done */
2899  EndCommand("COPY 0", DestRemote);
2900  pq_flush();
2901 
2902  proc_exit(0);
2903  }
2905  {
2906  WalSndKeepalive(true);
2908  }
2909 }
#define pq_is_send_pending()
Definition: libpq.h:41
XLogRecPtr write
#define pq_flush()
Definition: libpq.h:39
void proc_exit(int code)
Definition: ipc.c:104
XLogRecPtr flush
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3393
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:166
static bool WalSndCaughtUp
Definition: walsender.c:186
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
WalSnd * MyWalSnd
Definition: walsender.c:111
static XLogRecPtr sentPtr
Definition: walsender.c:157
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool waiting_for_ping_response
Definition: walsender.c:174

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 298 of file walsender.c.

References close, ConditionVariableCancelSleep(), DestNone, DestRemote, got_SIGUSR2, got_STOPPING, LWLockReleaseAll(), MyReplicationSlot, pgstat_report_wait_end(), proc_exit(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), sendFile, WalSndSetState(), WalSndShutdown(), WALSNDSTATE_STARTUP, and whereToSendOutput.

Referenced by PostgresMain().

299 {
303 
304  if (sendFile >= 0)
305  {
306  close(sendFile);
307  sendFile = -1;
308  }
309 
310  if (MyReplicationSlot != NULL)
312 
314 
315  replication_active = false;
316 
317  if (got_STOPPING || got_SIGUSR2)
318  proc_exit(0);
319 
320  /* Revert back to startup state */
322 }
static int sendFile
Definition: walsender.c:135
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:190
void proc_exit(int code)
Definition: ipc.c:104
void ConditionVariableCancelSleep(void)
void ReplicationSlotRelease(void)
Definition: slot.c:424
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1262
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:189
static volatile sig_atomic_t replication_active
Definition: walsender.c:198
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
void WalSndSetState(WalSndState state)
Definition: walsender.c:3164
void ReplicationSlotCleanup(void)
Definition: slot.c:479
void LWLockReleaseAll(void)
Definition: lwlock.c:1824
#define close(a)
Definition: win32.h:12

◆ WalSndGetStateString()

static const char* WalSndGetStateString ( WalSndState  state)
static

Definition at line 3183 of file walsender.c.

References WALSNDSTATE_BACKUP, WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WALSNDSTATE_STOPPING, and WALSNDSTATE_STREAMING.

Referenced by pg_stat_get_wal_senders().

/*
 * WalSndGetStateString -- map a WalSndState to the short text label that
 * pg_stat_get_wal_senders() reports.
 *
 * The switch has no "default:" label (presumably so the compiler can warn
 * about unhandled enum members); any value not matched by the five cases
 * falls out of the switch and yields "UNKNOWN".
 */
3184 {
3185  switch (state)
3186  {
3187  case WALSNDSTATE_STARTUP:
3188  return "startup";
3189  case WALSNDSTATE_BACKUP:
3190  return "backup";
3191  case WALSNDSTATE_CATCHUP:
3192  return "catchup";
3193  case WALSNDSTATE_STREAMING:
3194  return "streaming";
3195  case WALSNDSTATE_STOPPING:
3196  return "stopping";
3197  }
/* Fallback for any value outside the cases above. */
3198  return "UNKNOWN";
3199 }
Definition: regguts.h:298

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 3100 of file walsender.c.

References i, InvalidBackendId, max_wal_senders, WalSnd::mutex, WalSnd::pid, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

3101 {
3102  int i;
3103 
3104  for (i = 0; i < max_wal_senders; i++)
3105  {
3106  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3107  pid_t pid;
3108 
3109  SpinLockAcquire(&walsnd->mutex);
3110  pid = walsnd->pid;
3111  SpinLockRelease(&walsnd->mutex);
3112 
3113  if (pid == 0)
3114  continue;
3115 
3117  }
3118 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:108
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:180
int max_wal_senders
Definition: walsender.c:120
#define SpinLockRelease(lock)
Definition: spin.h:64
#define InvalidBackendId
Definition: backendid.h:23
int i

◆ WalSndKeepalive()

static void WalSndKeepalive ( bool  requestReply)
static

Definition at line 3393 of file walsender.c.

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), StringInfoData::len, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), resetStringInfo(), and sentPtr.

Referenced by ProcessStandbyReplyMessage(), WalSndDone(), WalSndKeepaliveIfNecessary(), and WalSndWaitForWal().

3394 {
3395  elog(DEBUG2, "sending replication keepalive");
3396 
3397  /* construct the message... */
3399  pq_sendbyte(&output_message, 'k');
3402  pq_sendbyte(&output_message, requestReply ? 1 : 0);
3403 
3404  /* ... and send it wrapped in CopyData */
3406 }
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
static StringInfoData output_message
Definition: walsender.c:160
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
#define DEBUG2
Definition: elog.h:24
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
static XLogRecPtr sentPtr
Definition: walsender.c:157
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
#define elog(elevel,...)
Definition: elog.h:226

◆ WalSndKeepaliveIfNecessary()

static void WalSndKeepaliveIfNecessary ( void  )
static

Definition at line 3412 of file walsender.c.

References last_processing, last_reply_timestamp, pq_flush_if_writable, TimestampTzPlusMilliseconds, waiting_for_ping_response, wal_sender_timeout, WalSndKeepalive(), and WalSndShutdown().

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

3413 {
3414  TimestampTz ping_time;
3415 
3416  /*
3417  * Don't send keepalive messages if timeouts are globally disabled or
3418  * we're doing something not partaking in timeouts.
3419  */
3420  if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3421  return;
3422 
3424  return;
3425 
3426  /*
3427  * If half of wal_sender_timeout has lapsed without receiving any reply
3428  * from the standby, send a keep-alive message to the standby requesting
3429  * an immediate reply.
3430  */
3432  wal_sender_timeout / 2);
3433  if (last_processing >= ping_time)
3434  {
3435  WalSndKeepalive(true);
3437 
3438  /* Try to flush pending output to the client */
3439  if (pq_flush_if_writable() != 0)
3440  WalSndShutdown();
3441  }
3442 }
int wal_sender_timeout
Definition: walsender.c:122
int64 TimestampTz
Definition: timestamp.h:39
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3393
static TimestampTz last_processing
Definition: walsender.c:165
#define pq_flush_if_writable()
Definition: libpq.h:40
static void WalSndShutdown(void)
Definition: walsender.c:233
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
static bool waiting_for_ping_response
Definition: walsender.c:174
static TimestampTz last_reply_timestamp
Definition: walsender.c:171

◆ WalSndKill()

static void WalSndKill ( int  code,
Datum  arg 
)
static

Definition at line 2326 of file walsender.c.

References Assert, WalSnd::latch, WalSnd::mutex, MyWalSnd, WalSnd::pid, SpinLockAcquire, and SpinLockRelease.

Referenced by InitWalSenderSlot().

/*
 * WalSndKill -- release this process's WalSnd slot.
 *
 * The code and arg parameters are not used. The (int, Datum) signature
 * suggests this is registered as a process-exit callback by
 * InitWalSenderSlot(), its only referencing function -- TODO confirm.
 *
 * MyWalSnd is cleared first. Then, under the slot's spinlock, latch is
 * reset to NULL and pid to 0, which marks the slot as no longer in use.
 */
2327 {
2328  WalSnd *walsnd = MyWalSnd;
2329 
2330  Assert(walsnd != NULL);
2331 
2332  MyWalSnd = NULL;
2333 
2334  SpinLockAcquire(&walsnd->mutex);
2335  /* clear latch while holding the spinlock, so it can safely be read */
2336  walsnd->latch = NULL;
2337  /* Mark WalSnd struct as no longer being in use. */
2338  walsnd->pid = 0;
2339  SpinLockRelease(&walsnd->mutex);
2340 }
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch * latch
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:111
#define Assert(condition)
Definition: c.h:732

◆ WalSndLastCycleHandler()

static void WalSndLastCycleHandler ( SIGNAL_ARGS  )
static

Definition at line 2995 of file walsender.c.

References got_SIGUSR2, MyLatch, and SetLatch().

Referenced by WalSndSignals().

/*
 * WalSndLastCycleHandler -- SIGUSR2 handler, installed by WalSndSignals().
 *
 * Only sets the got_SIGUSR2 flag and wakes the process via
 * SetLatch(MyLatch). WalSndLoop() later sees the flag and calls
 * WalSndDone(). errno is saved and restored in case SetLatch() changes it,
 * so the interrupted code observes its original value.
 */
2996 {
2997  int save_errno = errno;
2998 
2999  got_SIGUSR2 = true;
3000  SetLatch(MyLatch);
3001 
3002  errno = save_errno;
3003 }
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:189
void SetLatch(volatile Latch *latch)
Definition: latch.c:436
struct Latch * MyLatch
Definition: globals.c:54

◆ WalSndLoop()

static void WalSndLoop ( WalSndSendDataCallback  send_data)
static

Definition at line 2137 of file walsender.c.

References application_name, CHECK_FOR_INTERRUPTS, ConfigReloadPending, DEBUG1, ereport, errmsg(), GetCurrentTimestamp(), got_SIGUSR2, last_reply_timestamp, MyLatch, MyProcPort, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), sleeptime, Port::sock, WalSnd::state, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_MAIN, waiting_for_ping_response, WaitLatchOrSocket(), WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndDone(), WalSndKeepaliveIfNecessary(), WalSndSetState(), WalSndShutdown(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STREAMING, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, and WL_TIMEOUT.

Referenced by StartLogicalReplication(), and StartReplication().

2138 {
2139  /*
2140  * Initialize the last reply timestamp. That enables timeout processing
2141  * from hereon.
2142  */
2144  waiting_for_ping_response = false;
2145 
2146  /*
2147  * Loop until we reach the end of this timeline or the client requests to
2148  * stop streaming.
2149  */
2150  for (;;)
2151  {
2152  /* Clear any already-pending wakeups */
2154 
2156 
2157  /* Process any requests or signals received recently */
2158  if (ConfigReloadPending)
2159  {
2160  ConfigReloadPending = false;
2163  }
2164 
2165  /* Check for input from the client */
2167 
2168  /*
2169  * If we have received CopyDone from the client, sent CopyDone
2170  * ourselves, and the output buffer is empty, it's time to exit
2171  * streaming.
2172  */
2174  !pq_is_send_pending())
2175  break;
2176 
2177  /*
2178  * If we don't have any pending data in the output buffer, try to send
2179  * some more. If there is some, we don't bother to call send_data
2180  * again until we've flushed it ... but we'd better assume we are not
2181  * caught up.
2182  */
2183  if (!pq_is_send_pending())
2184  send_data();
2185  else
2186  WalSndCaughtUp = false;
2187 
2188  /* Try to flush pending output to the client */
2189  if (pq_flush_if_writable() != 0)
2190  WalSndShutdown();
2191 
2192  /* If nothing remains to be sent right now ... */
2194  {
2195  /*
2196  * If we're in catchup state, move to streaming. This is an
2197  * important state change for users to know about, since before
2198  * this point data loss might occur if the primary dies and we
2199  * need to failover to the standby. The state change is also
2200  * important for synchronous replication, since commits that
2201  * started to wait at that point might wait for some time.
2202  */
2204  {
2205  ereport(DEBUG1,
2206  (errmsg("\"%s\" has now caught up with upstream server",
2207  application_name)));
2209  }
2210 
2211  /*
2212  * When SIGUSR2 arrives, we send any outstanding logs up to the
2213  * shutdown checkpoint record (i.e., the latest record), wait for
2214  * them to be replicated to the standby, and exit. This may be a
2215  * normal termination at shutdown, or a promotion, the walsender
2216  * is not sure which.
2217  */
2218  if (got_SIGUSR2)
2219  WalSndDone(send_data);
2220  }
2221 
2222  /* Check for replication timeout. */
2224 
2225  /* Send keepalive if the time has come */
2227 
2228  /*
2229  * We don't block if not caught up, unless there is unsent data
2230  * pending in which case we'd better block until the socket is
2231  * write-ready. This test is only needed for the case where the
2232  * send_data callback handled a subset of the available data but then
2233  * pq_flush_if_writable flushed it all --- we should immediately try
2234  * to send more.
2235  */
2237  {
2238  long sleeptime;
2239  int wakeEvents;
2240 
2241  wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT |
2243 
2244  /*
2245  * Use fresh timestamp, not last_processed, to reduce the chance
2246  * of reaching wal_sender_timeout before sending a keepalive.
2247  */
2249 
2250  if (pq_is_send_pending())
2251  wakeEvents |= WL_SOCKET_WRITEABLE;
2252 
2253  /* Sleep until something happens or we time out */
2254  (void) WaitLatchOrSocket(MyLatch, wakeEvents,
2255  MyProcPort->sock, sleeptime,
2257  }
2258  }
2259  return;
2260 }
#define pq_is_send_pending()
Definition: libpq.h:41
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
#define DEBUG1
Definition: elog.h:25
struct Port * MyProcPort
Definition: globals.c:43
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
static void WalSndDone(WalSndSendDataCallback send_data)
Definition: walsender.c:2880
#define WL_SOCKET_READABLE
Definition: latch.h:125
int sleeptime
Definition: pg_standby.c:42
void ResetLatch(volatile Latch *latch)
Definition: latch.c:519
pgsocket sock
Definition: libpq-be.h:118
WalSndState state
static bool streamingDoneSending
Definition: walsender.c:182
#define pq_flush_if_writable()
Definition: libpq.h:40
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:189
void SyncRepInitConfig(void)
Definition: syncrep.c:384
#define ereport(elevel, rest)
Definition: elog.h:141
static bool WalSndCaughtUp
Definition: walsender.c:186
Definition: guc.h:72
volatile sig_atomic_t ConfigReloadPending
Definition: globals.c:35
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3412
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2110
static void WalSndShutdown(void)
Definition: walsender.c:233
WalSnd * MyWalSnd
Definition: walsender.c:111
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2060
void WalSndSetState(WalSndState state)
Definition: walsender.c:3164
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:369
static bool streamingDoneReceiving
Definition: walsender.c:183
char * application_name
Definition: guc.c:511
int errmsg(const char *fmt,...)
Definition: elog.c:784
struct Latch * MyLatch
Definition: globals.c:54
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
static bool waiting_for_ping_response
Definition: walsender.c:174
#define WL_LATCH_SET
Definition: latch.h:124
static TimestampTz last_reply_timestamp
Definition: walsender.c:171
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1588
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129

◆ WalSndPrepareWrite()

static void WalSndPrepareWrite ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1141 of file walsender.c.

References InvalidXLogRecPtr, LogicalDecodingContext::out, pq_sendbyte(), pq_sendint64(), and resetStringInfo().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

/*
 * WalSndPrepareWrite -- begin a new output message for logical decoding.
 *
 * Resets ctx->out and writes the message header in this order:
 *   - byte 'w'
 *   - int64 dataStart (= lsn)
 *   - int64 walEnd (= lsn)
 *   - int64 sendtime placeholder (0), filled in later
 * When last_write is false, both LSN fields are sent as InvalidXLogRecPtr.
 * The xid parameter is not used here.
 *
 * Referenced by CreateReplicationSlot() and StartLogicalReplication(),
 * presumably as the decoding context's prepare-write callback.
 */
1142 {
1143  /* can't have sync rep confused by sending the same LSN several times */
1144  if (!last_write)
1145  lsn = InvalidXLogRecPtr;
1146 
1147  resetStringInfo(ctx->out);
1148 
1149  pq_sendbyte(ctx->out, 'w');
1150  pq_sendint64(ctx->out, lsn); /* dataStart */
1151  pq_sendint64(ctx->out, lsn); /* walEnd */
1152 
1153  /*
1154  * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1155  * reserve space here.
1156  */
1157  pq_sendint64(ctx->out, 0); /* sendtime */
1158 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
StringInfo out
Definition: logical.h:73

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 2950 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

/*
 * WalSndRqstFileReload -- set needreload on every in-use walsender slot.
 *
 * Walks all max_wal_senders entries of WalSndCtl->walsnds. Each slot's
 * spinlock is taken; free slots (pid == 0) are skipped, and in-use slots
 * get needreload = true. Called from KeepFileRestoredFromArchive(). How
 * the walsenders consume needreload is not shown in this listing.
 */
2951 {
2952  int i;
2953 
2954  for (i = 0; i < max_wal_senders; i++)
2955  {
2956  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2957 
2958  SpinLockAcquire(&walsnd->mutex);
/* Slot not in use: drop the lock and move on. */
2959  if (walsnd->pid == 0)
2960  {
2961  SpinLockRelease(&walsnd->mutex);
2962  continue;
2963  }
2964  walsnd->needreload = true;
2965  SpinLockRelease(&walsnd->mutex);
2966  }
2967 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:108
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
int max_wal_senders
Definition: walsender.c:120
#define SpinLockRelease(lock)
Definition: spin.h:64
bool needreload
int i

◆ WalSndSetState()

void WalSndSetState ( WalSndState  state)

Definition at line 3164 of file walsender.c.

References am_walsender, Assert, WalSnd::mutex, MyWalSnd, SpinLockAcquire, SpinLockRelease, and WalSnd::state.

Referenced by exec_replication_command(), SendBaseBackup(), StartLogicalReplication(), StartReplication(), WalSndErrorCleanup(), WalSndLoop(), and XLogSendPhysical().

3165 {
3166  WalSnd *walsnd = MyWalSnd;
3167 
3169 
3170  if (walsnd->state == state)
3171  return;
3172 
3173  SpinLockAcquire(&walsnd->mutex);
3174  walsnd->state = state;
3175  SpinLockRelease(&walsnd->mutex);
3176 }
slock_t mutex
bool am_walsender
Definition: walsender.c:114
#define SpinLockAcquire(lock)
Definition: spin.h:62
WalSndState state
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:111
#define Assert(condition)
Definition: c.h:732
Definition: regguts.h:298

◆ WalSndShmemInit()

void WalSndShmemInit ( void  )

Definition at line 3039 of file walsender.c.

References i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SHMQueueInit(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

3040 {
3041  bool found;
3042  int i;
3043 
3044  WalSndCtl = (WalSndCtlData *)
3045  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3046 
3047  if (!found)
3048  {
3049  /* First time through, so initialize */
3051 
3052  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3054 
3055  for (i = 0; i < max_wal_senders; i++)
3056  {
3057  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3058 
3059  SpinLockInit(&walsnd->mutex);
3060  }
3061  }
3062 }
Size WalSndShmemSize(void)
Definition: walsender.c:3027
#define SpinLockInit(lock)
Definition: spin.h:60
WalSndCtlData * WalSndCtl
Definition: walsender.c:108
#define MemSet(start, val, len)
Definition: c.h:941
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
int max_wal_senders
Definition: walsender.c:120
void SHMQueueInit(SHM_QUEUE *queue)
Definition: shmqueue.c:36
int i
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28

◆ WalSndShmemSize()

Size WalSndShmemSize ( void  )

Definition at line 3027 of file walsender.c.

References add_size(), max_wal_senders, mul_size(), and offsetof.

Referenced by CreateSharedMemoryAndSemaphores(), and WalSndShmemInit().

/*
 * WalSndShmemSize -- shared memory needed for WalSndCtlData.
 *
 * Size is the fixed header up to the walsnds flexible array member, plus
 * max_wal_senders WalSnd entries. The two parts are combined with
 * add_size()/mul_size() (shmem.c helpers, presumably overflow-checked).
 * WalSndShmemInit() uses this size for the "Wal Sender Ctl" structure.
 */
3028 {
3029  Size size = 0;
/* NOTE(review): the "= 0" initializer is overwritten by the next statement. */
3030 
3031  size = offsetof(WalSndCtlData, walsnds);
3032  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3033 
3034  return size;
3035 }
int max_wal_senders
Definition: walsender.c:120
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
size_t Size
Definition: c.h:466
#define offsetof(type, field)
Definition: c.h:655

◆ WalSndShutdown()

static void WalSndShutdown ( void  )
static

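Definition at line 233 of file walsender.c.

Note: the listing reproduced in this section begins at source line 265, not at line 233, so it is not the body of WalSndShutdown(). Its comments mention creating the per-walsender structure in shared memory, telling the postmaster this process is a WAL sender, and initializing the lag-tracking buffer. Its cross-references (InitWalSenderSlot(), MarkPostmasterChildWalSender(), lag_tracker) point the same way. Both indicate the listing most likely belongs to InitWalSender(), which suggests an extraction error in this page.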

Referenced by WalSndCheckTimeOut(), WalSndErrorCleanup(), WalSndKeepaliveIfNecessary(), WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

265 {
267 
268  /* Create a per-walsender data structure in shared memory */
270 
271  /*
272  * We don't currently need any ResourceOwner in a walsender process, but
273  * if we did, we could call CreateAuxProcessResourceOwner here.
274  */
275 
276  /*
277  * Let postmaster know that we're a WAL sender. Once we've declared us as
278  * a WAL sender process, postmaster will let us outlive the bgwriter and
279  * kill us last in the shutdown sequence, so we get a chance to stream all
280  * remaining WAL at shutdown, including the shutdown checkpoint. Note that
281  * there's no going back, and we mustn't write any WAL records after this.
282  */
285 
286  /* Initialize empty timestamp buffer for lag tracking. */
288 }
static void InitWalSenderSlot(void)
Definition: walsender.c:2264
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:272
bool RecoveryInProgress(void)
Definition: xlog.c:7894
static LagTracker * lag_tracker
Definition: walsender.c:223
MemoryContext TopMemoryContext
Definition: mcxt.c:44
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:814
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:146
bool am_cascading_walsender
Definition: walsender.c:115

◆ WalSndSignals()

void WalSndSignals ( void  )

Definition at line 3007 of file walsender.c.

References die(), InitializeTimeouts(), PostgresSigHupHandler(), pqsignal(), procsignal_sigusr1_handler(), quickdie(), SIG_DFL, SIG_IGN, SIGCHLD, SIGHUP, SIGPIPE, SIGQUIT, SIGUSR1, SIGUSR2, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

3008 {
3009  /* Set up signal handlers */
3010  pqsignal(SIGHUP, PostgresSigHupHandler); /* set flag to read config
3011  * file */
3012  pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3013  pqsignal(SIGTERM, die); /* request shutdown */
3014  pqsignal(SIGQUIT, quickdie); /* hard crash time */
3015  InitializeTimeouts(); /* establishes SIGALRM handler */
3018  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3019  * shutdown */
3020 
3021  /* Reset some signals that are accepted by postmaster but not here */
3023 }
void InitializeTimeouts(void)
Definition: timeout.c:340
#define SIGQUIT
Definition: win32_port.h:164
#define SIGUSR1
Definition: win32_port.h:175
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:2995
#define SIGCHLD
Definition: win32_port.h:173
#define SIGPIPE
Definition: win32_port.h:168
#define SIGUSR2
Definition: win32_port.h:176
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:2792
#define SIGHUP
Definition: win32_port.h:163
void PostgresSigHupHandler(SIGNAL_ARGS)
Definition: postgres.c:2832
#define SIG_IGN
Definition: win32_port.h:160
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:168
#define SIG_DFL
Definition: win32_port.h:158
void die(SIGNAL_ARGS)
Definition: postgres.c:2761
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:261
void quickdie(SIGNAL_ARGS)
Definition: postgres.c:2694

◆ WalSndUpdateProgress()

static void WalSndUpdateProgress ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid 
)
static

Definition at line 1257 of file walsender.c.

References GetCurrentTimestamp(), LagTrackerWrite(), now(), TimestampDifferenceExceeds(), and WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS.

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

1258 {
1259  static TimestampTz sendTime = 0;
1260  TimestampTz now = GetCurrentTimestamp();
1261 
1262  /*
1263  * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1264  * avoid flooding the lag tracker when we commit frequently.
1265  */
1266 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1267  if (!TimestampDifferenceExceeds(sendTime, now,
1268  WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
1269  return;
1270 
1271  LagTrackerWrite(lsn, now);
1272  sendTime = now;
1273 }
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
int64 TimestampTz
Definition: timestamp.h:39
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1669
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition: walsender.c:3451
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534

◆ WalSndWaitForWal()

static XLogRecPtr WalSndWaitForWal ( XLogRecPtr  loc)
static

Definition at line 1283 of file walsender.c.

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, WalSnd::flush, GetCurrentTimestamp(), GetFlushRecPtr(), GetXLogReplayRecPtr(), got_STOPPING, InvalidXLogRecPtr, MyLatch, MyProcPort, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), RecoveryInProgress(), ResetLatch(), sentPtr, SetLatch(), sleeptime, Port::sock, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_WAIT_WAL, waiting_for_ping_response, WaitLatchOrSocket(), WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepalive(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, WL_TIMEOUT, WalSnd::write, and XLogBackgroundFlush().

Referenced by logical_read_xlog_page().

1284 {
1285  int wakeEvents;
1286  static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1287 
1288 
1289  /*
1290  * Fast path to avoid acquiring the spinlock in case we already know we
1291  * have enough WAL available. This is particularly interesting if we're
1292  * far behind.
1293  */
1294  if (RecentFlushPtr != InvalidXLogRecPtr &&
1295  loc <= RecentFlushPtr)
1296  return RecentFlushPtr;
1297 
1298  /* Get a more recent flush pointer. */
1299  if (!RecoveryInProgress())
1300  RecentFlushPtr = GetFlushRecPtr();
1301  else
1302  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1303 
1304  for (;;)
1305  {
1306  long sleeptime;
1307 
1308  /* Clear any already-pending wakeups */
1310 
1312 
1313  /* Process any requests or signals received recently */
1314  if (ConfigReloadPending)
1315  {
1316  ConfigReloadPending = false;
1319  }
1320 
1321  /* Check for input from the client */
1323 
1324  /*
1325  * If we're shutting down, trigger pending WAL to be written out,
1326  * otherwise we'd possibly end up waiting for WAL that never gets
1327  * written, because walwriter has shut down already.
1328  */
1329  if (got_STOPPING)
1331 
1332  /* Update our idea of the currently flushed position. */
1333  if (!RecoveryInProgress())
1334  RecentFlushPtr = GetFlushRecPtr();
1335  else
1336  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1337 
1338  /*
1339  * If postmaster asked us to stop, don't wait anymore.
1340  *
1341  * It's important to do this check after the recomputation of
1342  * RecentFlushPtr, so we can send all remaining data before shutting
1343  * down.
1344  */
1345  if (got_STOPPING)
1346  break;
1347 
1348  /*
1349  * We only send regular messages to the client for full decoded
1350  * transactions, but a synchronous replication and walsender shutdown
1351  * possibly are waiting for a later location. So we send pings
1352  * containing the flush location every now and then.
1353  */
1354  if (MyWalSnd->flush < sentPtr &&
1355  MyWalSnd->write < sentPtr &&
1357  {
1358  WalSndKeepalive(false);
1360  }
1361 
1362  /* check whether we're done */
1363  if (loc <= RecentFlushPtr)
1364  break;
1365 
1366  /* Waiting for new WAL. Since we need to wait, we're now caught up. */
1367  WalSndCaughtUp = true;
1368 
1369  /*
1370  * Try to flush any pending output to the client.
1371  */
1372  if (pq_flush_if_writable() != 0)
1373  WalSndShutdown();
1374 
1375  /*
1376  * If we have received CopyDone from the client, sent CopyDone
1377  * ourselves, and the output buffer is empty, it's time to exit
1378  * streaming, so fail the current WAL fetch request.
1379  */
1381  !pq_is_send_pending())
1382  break;
1383 
1384  /* die if timeout was reached */
1386 
1387  /* Send keepalive if the time has come */
1389 
1390  /*
1391  * Sleep until something happens or we time out. Also wait for the
1392  * socket becoming writable, if there's still pending output.
1393  * Otherwise we might sit on sendable output data while waiting for
1394  * new WAL to be generated. (But if we have nothing to send, we don't
1395  * want to wake on socket-writable.)
1396  */
1398 
1399  wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH |
1401 
1402  if (pq_is_send_pending())
1403  wakeEvents |= WL_SOCKET_WRITEABLE;
1404 
1405  (void) WaitLatchOrSocket(MyLatch, wakeEvents,
1406  MyProcPort->sock, sleeptime,
1408  }
1409 
1410  /* reactivate latch so WalSndLoop knows to continue */
1411  SetLatch(MyLatch);
1412  return RecentFlushPtr;
1413 }
#define pq_is_send_pending()
Definition: libpq.h:41
XLogRecPtr write
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
struct Port * MyProcPort
Definition: globals.c:43
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:190
#define WL_SOCKET_READABLE
Definition: latch.h:125
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8226
int sleeptime
Definition: pg_standby.c:42
void ResetLatch(volatile Latch *latch)
Definition: latch.c:519
bool RecoveryInProgress(void)
Definition: xlog.c:7894
pgsocket sock
Definition: libpq-be.h:118
XLogRecPtr flush
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3393
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11163
bool XLogBackgroundFlush(void)
Definition: xlog.c:2992
static bool streamingDoneSending
Definition: walsender.c:182
#define pq_flush_if_writable()
Definition: libpq.h:40
void SyncRepInitConfig(void)
Definition: syncrep.c:384
static bool WalSndCaughtUp
Definition: walsender.c:186
Definition: guc.h:72
volatile sig_atomic_t ConfigReloadPending
Definition: globals.c:35
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3412
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2110
static void WalSndShutdown(void)
Definition: walsender.c:233
WalSnd * MyWalSnd
Definition: walsender.c:111
static XLogRecPtr sentPtr
Definition: walsender.c:157
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2060
void SetLatch(volatile Latch *latch)
Definition: latch.c:436
uint64 XLogRecPtr
Definition: xlogdefs.h:21
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:369
static bool streamingDoneReceiving
Definition: walsender.c:183
struct Latch * MyLatch
Definition: globals.c:54
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
static bool waiting_for_ping_response
Definition: walsender.c:174
#define WL_LATCH_SET
Definition: latch.h:124
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1588
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 3126 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, pg_usleep(), WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

3127 {
3128  for (;;)
3129  {
3130  int i;
3131  bool all_stopped = true;
3132 
3133  for (i = 0; i < max_wal_senders; i++)
3134  {
3135  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3136 
3137  SpinLockAcquire(&walsnd->mutex);
3138 
3139  if (walsnd->pid == 0)
3140  {
3141  SpinLockRelease(&walsnd->mutex);
3142  continue;
3143  }
3144 
3145  if (walsnd->state != WALSNDSTATE_STOPPING)
3146  {
3147  all_stopped = false;
3148  SpinLockRelease(&walsnd->mutex);
3149  break;
3150  }
3151  SpinLockRelease(&walsnd->mutex);
3152  }
3153 
3154  /* safe to leave if confirmation is done for all WAL senders */
3155  if (all_stopped)
3156  return;
3157 
3158  pg_usleep(10000L); /* wait for 10 msec */
3159  }
3160 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:108
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pg_usleep(long microsec)
Definition: signal.c:53
WalSndState state
int max_wal_senders
Definition: walsender.c:120
#define SpinLockRelease(lock)
Definition: spin.h:64
int i

◆ WalSndWakeup()

void WalSndWakeup ( void  )

Definition at line 3071 of file walsender.c.

References i, WalSnd::latch, max_wal_senders, WalSnd::mutex, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive(), StartupXLOG(), and XLogWalRcvFlush().

3072 {
3073  int i;
3074 
3075  for (i = 0; i < max_wal_senders; i++)
3076  {
3077  Latch *latch;
3078  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3079 
3080  /*
3081  * Get latch pointer with spinlock held, for the unlikely case that
3082  * pointer reads aren't atomic (as they're 8 bytes).
3083  */
3084  SpinLockAcquire(&walsnd->mutex);
3085  latch = walsnd->latch;
3086  SpinLockRelease(&walsnd->mutex);
3087 
3088  if (latch != NULL)
3089  SetLatch(latch);
3090  }
3091 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:108
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch * latch
Definition: latch.h:110
int max_wal_senders
Definition: walsender.c:120
#define SpinLockRelease(lock)
Definition: spin.h:64
void SetLatch(volatile Latch *latch)
Definition: latch.c:436
int i

◆ WalSndWriteData()

static void WalSndWriteData ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1168 of file walsender.c.

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, StringInfoData::data, GetCurrentTimestamp(), last_reply_timestamp, StringInfoData::len, MyLatch, MyProcPort, now(), LogicalDecodingContext::out, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, pq_putmessage_noblock, pq_sendint64(), ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), resetStringInfo(), SetLatch(), sleeptime, Port::sock, SyncRepInitConfig(), TimestampTzPlusMilliseconds, WAIT_EVENT_WAL_SENDER_WRITE_DATA, WaitLatchOrSocket(), wal_sender_timeout, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, and WL_TIMEOUT.

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

1170 {
1171  TimestampTz now;
1172 
1173  /* output previously gathered data in a CopyData packet */
1174  pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1175 
1176  /*
1177  * Fill the send timestamp last, so that it is taken as late as possible.
1178  * This is somewhat ugly, but the protocol is set as it's already used for
1179  * several releases by streaming physical replication.
1180  */
1182  now = GetCurrentTimestamp();
1183  pq_sendint64(&tmpbuf, now);
1184  memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1185  tmpbuf.data, sizeof(int64));
1186 
1188 
1189  /* Try to flush pending output to the client */
1190  if (pq_flush_if_writable() != 0)
1191  WalSndShutdown();
1192 
1193  /* Try taking fast path unless we get too close to walsender timeout. */
1195  wal_sender_timeout / 2) &&
1196  !pq_is_send_pending())
1197  {
1198  return;
1199  }
1200 
1201  /* If we have pending write here, go to slow path */
1202  for (;;)
1203  {
1204  int wakeEvents;
1205  long sleeptime;
1206 
1207  /* Check for input from the client */
1209 
1210  /* die if timeout was reached */
1212 
1213  /* Send keepalive if the time has come */
1215 
1216  if (!pq_is_send_pending())
1217  break;
1218 
1220 
1221  wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH |
1223 
1224  /* Sleep until something happens or we time out */
1225  (void) WaitLatchOrSocket(MyLatch, wakeEvents,
1226  MyProcPort->sock, sleeptime,
1228 
1229  /* Clear any already-pending wakeups */
1231 
1233 
1234  /* Process any requests or signals received recently */
1235  if (ConfigReloadPending)
1236  {
1237  ConfigReloadPending = false;
1240  }
1241 
1242  /* Try to flush pending output to the client */
1243  if (pq_flush_if_writable() != 0)
1244  WalSndShutdown();
1245  }
1246 
1247  /* reactivate latch so WalSndLoop knows to continue */
1248  SetLatch(MyLatch);
1249 }
#define pq_is_send_pending()
Definition: libpq.h:41
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
int wal_sender_timeout
Definition: walsender.c:122
struct Port * MyProcPort
Definition: globals.c:43
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
int64 TimestampTz
Definition: timestamp.h:39
#define WL_SOCKET_READABLE
Definition: latch.h:125
int sleeptime
Definition: pg_standby.c:42
void ResetLatch(volatile Latch *latch)
Definition: latch.c:519
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
pgsocket sock
Definition: libpq-be.h:118
#define pq_flush_if_writable()
Definition: libpq.h:40
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
void SyncRepInitConfig(void)
Definition: syncrep.c:384
Definition: guc.h:72
volatile sig_atomic_t ConfigReloadPending
Definition: globals.c:35
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3412
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2110
static void WalSndShutdown(void)
Definition: walsender.c:233
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2060
void SetLatch(volatile Latch *latch)
Definition: latch.c:436
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:369
static StringInfoData tmpbuf
Definition: walsender.c:162
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
StringInfo out
Definition: logical.h:73
struct Latch * MyLatch
Definition: globals.c:54
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
#define WL_LATCH_SET
Definition: latch.h:124
static TimestampTz last_reply_timestamp
Definition: walsender.c:171
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1588
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129

◆ XLogRead()

static void XLogRead ( char *  buf,
XLogRecPtr  startptr,
Size  count 
)
static

Definition at line 2354 of file walsender.c.

References am_cascading_walsender, BasicOpenFile(), buf, CheckXLogRemoved(), close, curFileTimeLine, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg(), ERROR, MAXPGPATH, WalSnd::mutex, MyWalSnd, WalSnd::needreload, PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), read, sendFile, sendOff, sendSegNo, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, SpinLockAcquire, SpinLockRelease, startptr, ThisTimeLineID, WAIT_EVENT_WAL_READ, wal_segment_size, XLByteInSeg, XLByteToSeg, XLogFileNameP(), XLogFilePath, and XLogSegmentOffset.

Referenced by logical_read_xlog_page(), and XLogSendPhysical().

2355 {
2356  char *p;
2357  XLogRecPtr recptr;
2358  Size nbytes;
2359  XLogSegNo segno;
2360 
2361 retry:
2362  p = buf;
2363  recptr = startptr;
2364  nbytes = count;
2365 
2366  while (nbytes > 0)
2367  {
2368  uint32 startoff;
2369  int segbytes;
2370  int readbytes;
2371 
2372  startoff = XLogSegmentOffset(recptr, wal_segment_size);
2373 
2374  if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, wal_segment_size))
2375  {
2376  char path[MAXPGPATH];
2377 
2378  /* Switch to another logfile segment */
2379  if (sendFile >= 0)
2380  close(sendFile);
2381 
2383 
2384  /*-------
2385  * When reading from a historic timeline, and there is a timeline
2386  * switch within this segment, read from the WAL segment belonging
2387  * to the new timeline.
2388  *
2389  * For example, imagine that this server is currently on timeline
2390  * 5, and we're streaming timeline 4. The switch from timeline 4
2391  * to 5 happened at 0/13002088. In pg_wal, we have these files:
2392  *
2393  * ...
2394  * 000000040000000000000012
2395  * 000000040000000000000013
2396  * 000000050000000000000013
2397  * 000000050000000000000014
2398  * ...
2399  *
2400  * In this situation, when requested to send the WAL from
2401  * segment 0x13, on timeline 4, we read the WAL from file
2402  * 000000050000000000000013. Archive recovery prefers files from
2403  * newer timelines, so if the segment was restored from the
2404  * archive on this server, the file belonging to the old timeline,
2405  * 000000040000000000000013, might not exist. Their contents are
2406  * equal up to the switchpoint, because at a timeline switch, the
2407  * used portion of the old segment is copied to the new file.
2408  *-------
2409  */
2412  {
2413  XLogSegNo endSegNo;
2414 
2416  if (sendSegNo == endSegNo)
2418  }
2419 
2421 
2422  sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY);
2423  if (sendFile < 0)
2424  {
2425  /*
2426  * If the file is not found, assume it's because the standby
2427  * asked for a too old WAL segment that has already been
2428  * removed or recycled.
2429  */
2430  if (errno == ENOENT)
2431  ereport(ERROR,
2433  errmsg("requested WAL segment %s has already been removed",
2435  else
2436  ereport(ERROR,
2438  errmsg("could not open file \"%s\": %m",
2439  path)));
2440  }
2441  sendOff = 0;
2442  }
2443 
2444  /* Need to seek in the file? */
2445  if (sendOff != startoff)
2446  {
2447  if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
2448  ereport(ERROR,
2450  errmsg("could not seek in log segment %s to offset %u: %m",
2452  startoff)));
2453  sendOff = startoff;
2454  }
2455 
2456  /* How many bytes are within this segment? */
2457  if (nbytes > (wal_segment_size - startoff))
2458  segbytes = wal_segment_size - startoff;
2459  else
2460  segbytes = nbytes;
2461 
2463  readbytes = read(sendFile, p, segbytes);
2465  if (readbytes < 0)
2466  {
2467  ereport(ERROR,
2469  errmsg("could not read from log segment %s, offset %u, length %zu: %m",
2471  sendOff, (Size) segbytes)));
2472  }
2473  else if (readbytes == 0)
2474  {
2475  ereport(ERROR,
2477  errmsg("could not read from log segment %s, offset %u: read %d of %zu",
2479  sendOff, readbytes, (Size) segbytes)));
2480  }
2481 
2482  /* Update state for read */
2483  recptr += readbytes;
2484 
2485  sendOff += readbytes;
2486  nbytes -= readbytes;
2487  p += readbytes;
2488  }
2489 
2490  /*
2491  * After reading into the buffer, check that what we read was valid. We do
2492  * this after reading, because even though the segment was present when we
2493  * opened it, it might get recycled or removed while we read it. The
2494  * read() succeeds in that case, but the data we tried to read might
2495  * already have been overwritten with new WAL records.
2496  */
2499 
2500  /*
2501  * During recovery, the currently-open WAL file might be replaced with the
2502  * file of the same name retrieved from archive. So we always need to
2503  * check what we read was valid after reading into the buffer. If it's
2504  * invalid, we try to open and read the file again.
2505  */
2507  {
2508  WalSnd *walsnd = MyWalSnd;
2509  bool reload;
2510 
2511  SpinLockAcquire(&walsnd->mutex);
2512  reload = walsnd->needreload;
2513  walsnd->needreload = false;
2514  SpinLockRelease(&walsnd->mutex);
2515 
2516  if (reload && sendFile >= 0)
2517  {
2518  close(sendFile);
2519  sendFile = -1;
2520 
2521  goto retry;
2522  }
2523  }
2524 }
int wal_segment_size
Definition: xlog.c:108
static int sendFile
Definition: walsender.c:135
int errcode(int sqlerrcode)
Definition: elog.c:570
#define PG_BINARY
Definition: c.h:1171
slock_t mutex
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define ERROR
Definition: elog.h:43
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:3825
#define MAXPGPATH
static TimeLineID curFileTimeLine
Definition: walsender.c:140
char * XLogFileNameP(TimeLineID tli, XLogSegNo segno)
Definition: xlog.c:10163
static char * buf
Definition: pg_test_fsync.c:67
uint64 XLogSegNo
Definition: xlogdefs.h:34
int errcode_for_file_access(void)
Definition: elog.c:593
unsigned int uint32
Definition: c.h:358
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1262
#define ereport(elevel, rest)
Definition: elog.h:141
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:43
#define SpinLockRelease(lock)
Definition: spin.h:64
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
WalSnd * MyWalSnd
Definition: walsender.c:111
TimeLineID ThisTimeLineID
Definition: xlog.c:183
static TimeLineID sendTimeLine
Definition: walsender.c:148
uint64 XLogRecPtr
Definition: xlogdefs.h:21
bool needreload
size_t Size
Definition: c.h:466
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1238
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:149
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:151
static XLogSegNo sendSegNo
Definition: walsender.c:136
int BasicOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:920
#define XLogFilePath(path, tli, logSegNo, wal_segsz_bytes)
int errmsg(const char *fmt,...)
Definition: elog.c:784
static uint32 sendOff
Definition: walsender.c:137
#define close(a)
Definition: win32.h:12
#define read(a, b, c)
Definition: win32.h:13
static bool sendTimeLineIsHistoric
Definition: walsender.c:150
bool am_cascading_walsender
Definition: walsender.c:115
static XLogRecPtr startptr
Definition: basebackup.c:106
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

◆ XLogSendLogical()

static void XLogSendLogical ( void  )
static

Definition at line 2800 of file walsender.c.

References elog, XLogReaderState::EndRecPtr, ERROR, GetFlushRecPtr(), got_SIGUSR2, got_STOPPING, InvalidXLogRecPtr, logical_startptr, LogicalDecodingProcessRecord(), WalSnd::mutex, MyWalSnd, LogicalDecodingContext::reader, WalSnd::sentPtr, sentPtr, SpinLockAcquire, SpinLockRelease, WalSndCaughtUp, and XLogReadRecord().

Referenced by StartLogicalReplication().

2801 {
2802  XLogRecord *record;
2803  char *errm;
2804 
2805  /*
2806  * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
2807  * true in WalSndWaitForWal, if we're actually waiting. We also set to
2808  * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
2809  * didn't wait - i.e. when we're shutting down.
2810  */
2811  WalSndCaughtUp = false;
2812 
2815 
2816  /* xlog record was invalid */
2817  if (errm != NULL)
2818  elog(ERROR, "%s", errm);
2819 
2820  if (record != NULL)
2821  {
2822  /* XXX: Note that logical decoding cannot be used while in recovery */
2823  XLogRecPtr flushPtr = GetFlushRecPtr();
2824 
2825  /*
2826  * Note the lack of any call to LagTrackerWrite() which is handled by
2827  * WalSndUpdateProgress which is called by output plugin through
2828  * logical decoding write api.
2829  */
2831 
2833 
2834  /*
2835  * If we have sent a record that is at or beyond the flushed point, we
2836  * have caught up.
2837  */
2838  if (sentPtr >= flushPtr)
2839  WalSndCaughtUp = true;
2840  }
2841  else
2842  {
2843  /*
2844  * If the record we just wanted read is at or beyond the flushed
2845  * point, then we're caught up.
2846  */
2848  {
2849  WalSndCaughtUp = true;
2850 
2851  /*
2852  * Have WalSndLoop() terminate the connection in an orderly
2853  * manner, after writing out all the pending data.
2854  */
2855  if (got_STOPPING)
2856  got_SIGUSR2 = true;
2857  }
2858  }
2859 
2860  /* Update shared memory status */
2861  {
2862  WalSnd *walsnd = MyWalSnd;
2863 
2864  SpinLockAcquire(&walsnd->mutex);
2865  walsnd->sentPtr = sentPtr;
2866  SpinLockRelease(&walsnd->mutex);
2867  }
2868 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:190
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8226
slock_t mutex
XLogRecord * XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
Definition: xlogreader.c:216
XLogRecPtr EndRecPtr
Definition: xlogreader.h:120
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define ERROR
Definition: elog.h:43
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:97
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:200
static XLogRecPtr logical_startptr
Definition: walsender.c:201
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:189
static bool WalSndCaughtUp
Definition: walsender.c:186
#define SpinLockRelease(lock)
Definition: spin.h:64
XLogRecPtr sentPtr
WalSnd * MyWalSnd
Definition: walsender.c:111
static XLogRecPtr sentPtr
Definition: walsender.c:157
uint64 XLogRecPtr
Definition: xlogdefs.h:21
XLogReaderState * reader
Definition: logical.h:44
#define elog(elevel,...)
Definition: elog.h:226

◆ XLogSendPhysical()

static void XLogSendPhysical ( void  )
static

Definition at line 2537 of file walsender.c.

References am_cascading_walsender, Assert, close, StringInfoData::data, DEBUG1, elog, enlargeStringInfo(), GetCurrentTimestamp(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), got_STOPPING, LagTrackerWrite(), StringInfoData::len, list_free_deep(), MAX_SEND_SIZE, WalSnd::mutex, MyWalSnd, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), readTimeLineHistory(), RecoveryInProgress(), resetStringInfo(), sendFile, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, WalSnd::sentPtr, sentPtr, set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, startptr, streamingDoneSending, ThisTimeLineID, tliSwitchPoint(), update_process_title, WalSndCaughtUp, WalSndSetState(), WALSNDSTATE_STOPPING, and XLogRead().

Referenced by StartReplication().

2538 {
2539  XLogRecPtr SendRqstPtr;
2541  XLogRecPtr endptr;
2542  Size nbytes;
2543 
2544  /* If requested switch the WAL sender to the stopping state. */
2545  if (got_STOPPING)
2547 
2549  {
2550  WalSndCaughtUp = true;
2551  return;
2552  }
2553 
2554  /* Figure out how far we can safely send the WAL. */
2556  {
2557  /*
2558  * Streaming an old timeline that's in this server's history, but is
2559  * not the one we're currently inserting or replaying. It can be
2560  * streamed up to the point where we switched off that timeline.
2561  */
2562  SendRqstPtr = sendTimeLineValidUpto;
2563  }
2564  else if (am_cascading_walsender)
2565  {
2566  /*
2567  * Streaming the latest timeline on a standby.
2568  *
2569  * Attempt to send all WAL that has already been replayed, so that we
2570  * know it's valid. If we're receiving WAL through streaming
2571  * replication, it's also OK to send any WAL that has been received
2572  * but not replayed.
2573  *
2574  * The timeline we're recovering from can change, or we can be
2575  * promoted. In either case, the current timeline becomes historic. We
2576  * need to detect that so that we don't try to stream past the point
2577  * where we switched to another timeline. We check for promotion or
2578  * timeline switch after calculating FlushPtr, to avoid a race
2579  * condition: if the timeline becomes historic just after we checked
2580  * that it was still current, it's still be OK to stream it up to the
2581  * FlushPtr that was calculated befor