PostgreSQL Source Code  git master
walsender.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/printtup.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "access/xlogutils.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
#include "commands/defrem.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "pgstat.h"
#include "replication/basebackup.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/logicalfuncs.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/dest.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/portal.h"
#include "utils/ps_status.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
Include dependency graph for walsender.c:

Go to the source code of this file.

Data Structures

struct  WalTimeSample
 
struct  LagTracker
 

Macros

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)
 
#define LAG_TRACKER_BUFFER_SIZE   8192
 
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000
 
#define PG_STAT_GET_WAL_SENDERS_COLS   12
 

Typedefs

typedef void(* WalSndSendDataCallback) (void)
 

Functions

static void WalSndLastCycleHandler (SIGNAL_ARGS)
 
static void WalSndLoop (WalSndSendDataCallback send_data)
 
static void InitWalSenderSlot (void)
 
static void WalSndKill (int code, Datum arg)
 
static void WalSndShutdown (void)
 
void WalSndErrorCleanup (void)
 
static void IdentifySystem (void)
 
static void SendTimeLineHistory (TimeLineHistoryCmd *cmd)
 
static void StartReplication (StartReplicationCmd *cmd)
 
static int logical_read_xlog_page (XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
 
static void parseCreateReplSlotOptions (CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action)
 
static void CreateReplicationSlot (CreateReplicationSlotCmd *cmd)
 
static void DropReplicationSlot (DropReplicationSlotCmd *cmd)
 
static void StartLogicalReplication (StartReplicationCmd *cmd)
 
static void WalSndPrepareWrite (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndWriteData (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndUpdateProgress (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
 
static XLogRecPtr WalSndWaitForWal (XLogRecPtr loc)
 
bool exec_replication_command (const char *cmd_string)
 
static void ProcessRepliesIfAny (void)
 
static void ProcessStandbyMessage (void)
 
static void PhysicalConfirmReceivedLocation (XLogRecPtr lsn)
 
static void ProcessStandbyReplyMessage (void)
 
static void PhysicalReplicationSlotNewXmin (TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
 
static bool TransactionIdInRecentPast (TransactionId xid, uint32 epoch)
 
static void ProcessStandbyHSFeedbackMessage (void)
 
static long WalSndComputeSleeptime (TimestampTz now)
 
static void WalSndCheckTimeOut (void)
 
static void XLogRead (char *buf, XLogRecPtr startptr, Size count)
 
static void XLogSendPhysical (void)
 
static void XLogSendLogical (void)
 
static void WalSndDone (WalSndSendDataCallback send_data)
 
static XLogRecPtr GetStandbyFlushRecPtr (void)
 
void WalSndRqstFileReload (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (void)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void WalSndSetState (WalSndState state)
 
static const char * WalSndGetStateString (WalSndState state)
 
static Interval * offset_to_interval (TimeOffset offset)
 
Datum pg_stat_get_wal_senders (PG_FUNCTION_ARGS)
 
static void WalSndKeepalive (bool requestReply)
 
static void WalSndKeepaliveIfNecessary (void)
 
static void LagTrackerWrite (XLogRecPtr lsn, TimestampTz local_flush_time)
 
static TimeOffset LagTrackerRead (int head, XLogRecPtr lsn, TimestampTz now)
 

Variables

WalSndCtlData * WalSndCtl = NULL
 
WalSnd * MyWalSnd = NULL
 
bool am_walsender = false
 
bool am_cascading_walsender = false
 
bool am_db_walsender = false
 
int max_wal_senders = 0
 
int wal_sender_timeout = 60 * 1000
 
bool log_replication_commands = false
 
bool wake_wal_senders = false
 
static int sendFile = -1
 
static XLogSegNo sendSegNo = 0
 
static uint32 sendOff = 0
 
static TimeLineID curFileTimeLine = 0
 
static TimeLineID sendTimeLine = 0
 
static TimeLineID sendTimeLineNextTLI = 0
 
static bool sendTimeLineIsHistoric = false
 
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
 
static XLogRecPtr sentPtr = 0
 
static StringInfoData output_message
 
static StringInfoData reply_message
 
static StringInfoData tmpbuf
 
static TimestampTz last_processing = 0
 
static TimestampTz last_reply_timestamp = 0
 
static bool waiting_for_ping_response = false
 
static bool streamingDoneSending
 
static bool streamingDoneReceiving
 
static bool WalSndCaughtUp = false
 
static volatile sig_atomic_t got_SIGUSR2 = false
 
static volatile sig_atomic_t got_STOPPING = false
 
static volatile sig_atomic_t replication_active = false
 
static LogicalDecodingContext * logical_decoding_ctx = NULL
 
static XLogRecPtr logical_startptr = InvalidXLogRecPtr
 
static LagTracker * lag_tracker
 

Macro Definition Documentation

◆ LAG_TRACKER_BUFFER_SIZE

#define LAG_TRACKER_BUFFER_SIZE   8192

Definition at line 211 of file walsender.c.

Referenced by LagTrackerRead(), and LagTrackerWrite().

◆ MAX_SEND_SIZE

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)

Definition at line 105 of file walsender.c.

Referenced by XLogSendPhysical().

◆ PG_STAT_GET_WAL_SENDERS_COLS

#define PG_STAT_GET_WAL_SENDERS_COLS   12

Referenced by pg_stat_get_wal_senders().

◆ WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS

#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000

Referenced by WalSndUpdateProgress().

Typedef Documentation

◆ WalSndSendDataCallback

typedef void(* WalSndSendDataCallback) (void)

Definition at line 229 of file walsender.c.

Function Documentation

◆ CreateReplicationSlot()

static void CreateReplicationSlot ( CreateReplicationSlotCmd *  cmd)
static

Definition at line 852 of file walsender.c.

References Assert, begin_tup_output_tupdesc(), CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDestReceiver(), CreateInitDecodingContext(), CreateTemplateTupleDesc(), CRS_EXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, CStringGetTextDatum, ReplicationSlot::data, DecodingContextFindStartpoint(), generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errmsg(), ERROR, FirstSnapshotSet, FreeDecodingContext(), InvalidXLogRecPtr, IsSubTransaction(), IsTransactionBlock(), CreateReplicationSlotCmd::kind, last_reply_timestamp, logical_read_xlog_page(), MAXFNAMELEN, MemSet, MyProc, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, NIL, parseCreateReplSlotOptions(), CreateReplicationSlotCmd::plugin, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, ReplicationSlotCreate(), ReplicationSlotMarkDirty(), ReplicationSlotPersist(), ReplicationSlotRelease(), ReplicationSlotReserveWal(), ReplicationSlotSave(), RestoreTransactionSnapshot(), RS_EPHEMERAL, RS_PERSISTENT, RS_TEMPORARY, sendTimeLine, sendTimeLineIsHistoric, CreateReplicationSlotCmd::slotname, SnapBuildExportSnapshot(), SnapBuildInitialSnapshot(), LogicalDecodingContext::snapshot_builder, snprintf, CreateReplicationSlotCmd::temporary, ThisTimeLineID, TTSOpsVirtual, TupleDescInitBuiltinEntry(), values, WalSndPrepareWrite(), WalSndUpdateProgress(), WalSndWriteData(), XACT_REPEATABLE_READ, and XactIsoLevel.

Referenced by exec_replication_command(), main(), and StartLogStreamer().

853 {
854  const char *snapshot_name = NULL;
855  char xloc[MAXFNAMELEN];
856  char *slot_name;
857  bool reserve_wal = false;
858  CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
860  TupOutputState *tstate;
861  TupleDesc tupdesc;
862  Datum values[4];
863  bool nulls[4];
864 
866 
867  parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action);
868 
869  /* setup state for XLogReadPage */
870  sendTimeLineIsHistoric = false;
872 
873  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
874  {
875  ReplicationSlotCreate(cmd->slotname, false,
877  }
878  else
879  {
881 
882  /*
883  * Initially create persistent slot as ephemeral - that allows us to
884  * nicely handle errors during initialization because it'll get
885  * dropped if this transaction fails. We'll make it persistent at the
886  * end. Temporary slots can be created as temporary from beginning as
887  * they get dropped on error as well.
888  */
889  ReplicationSlotCreate(cmd->slotname, true,
891  }
892 
893  if (cmd->kind == REPLICATION_KIND_LOGICAL)
894  {
896  bool need_full_snapshot = false;
897 
898  /*
899  * Do options check early so that we can bail before calling the
900  * DecodingContextFindStartpoint which can take long time.
901  */
902  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
903  {
904  if (IsTransactionBlock())
905  ereport(ERROR,
906  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
907  (errmsg("%s must not be called inside a transaction",
908  "CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT")));
909 
910  need_full_snapshot = true;
911  }
912  else if (snapshot_action == CRS_USE_SNAPSHOT)
913  {
914  if (!IsTransactionBlock())
915  ereport(ERROR,
916  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
917  (errmsg("%s must be called inside a transaction",
918  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
919 
921  ereport(ERROR,
922  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
923  (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
924  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
925 
926  if (FirstSnapshotSet)
927  ereport(ERROR,
928  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
929  (errmsg("%s must be called before any query",
930  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
931 
932  if (IsSubTransaction())
933  ereport(ERROR,
934  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
935  (errmsg("%s must not be called in a subtransaction",
936  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
937 
938  need_full_snapshot = true;
939  }
940 
941  ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
946 
947  /*
948  * Signal that we don't need the timeout mechanism. We're just
949  * creating the replication slot and don't yet accept feedback
950  * messages or send keepalives. As we possibly need to wait for
951  * further WAL the walsender would otherwise possibly be killed too
952  * soon.
953  */
955 
956  /* build initial snapshot, might take a while */
958 
959  /*
960  * Export or use the snapshot if we've been asked to do so.
961  *
962  * NB. We will convert the snapbuild.c kind of snapshot to normal
963  * snapshot when doing this.
964  */
965  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
966  {
967  snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
968  }
969  else if (snapshot_action == CRS_USE_SNAPSHOT)
970  {
971  Snapshot snap;
972 
975  }
976 
977  /* don't need the decoding context anymore */
978  FreeDecodingContext(ctx);
979 
980  if (!cmd->temporary)
982  }
983  else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
984  {
986 
988 
989  /* Write this slot to disk if it's a permanent one. */
990  if (!cmd->temporary)
992  }
993 
994  snprintf(xloc, sizeof(xloc), "%X/%X",
997 
999  MemSet(nulls, false, sizeof(nulls));
1000 
1001  /*----------
1002  * Need a tuple descriptor representing four columns:
1003  * - first field: the slot name
1004  * - second field: LSN at which we became consistent
1005  * - third field: exported snapshot's name
1006  * - fourth field: output plugin
1007  *----------
1008  */
1009  tupdesc = CreateTemplateTupleDesc(4);
1010  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
1011  TEXTOID, -1, 0);
1012  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
1013  TEXTOID, -1, 0);
1014  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
1015  TEXTOID, -1, 0);
1016  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1017  TEXTOID, -1, 0);
1018 
1019  /* prepare for projection of tuples */
1020  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
1021 
1022  /* slot_name */
1023  slot_name = NameStr(MyReplicationSlot->data.name);
1024  values[0] = CStringGetTextDatum(slot_name);
1025 
1026  /* consistent wal location */
1027  values[1] = CStringGetTextDatum(xloc);
1028 
1029  /* snapshot name, or NULL if none */
1030  if (snapshot_name != NULL)
1031  values[2] = CStringGetTextDatum(snapshot_name);
1032  else
1033  nulls[2] = true;
1034 
1035  /* plugin, or NULL if none */
1036  if (cmd->plugin != NULL)
1037  values[3] = CStringGetTextDatum(cmd->plugin);
1038  else
1039  nulls[3] = true;
1040 
1041  /* send it to dest */
1042  do_tup_output(tstate, values, nulls);
1043  end_tup_output(tstate);
1044 
1046 }
#define NIL
Definition: pg_list.h:65
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:44
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action)
Definition: walsender.c:799
PGPROC * MyProc
Definition: proc.c:68
#define XACT_REPEATABLE_READ
Definition: xact.h:38
void RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
Definition: snapmgr.c:2225
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:84
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2252
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency)
Definition: slot.c:221
#define MemSet(start, val, len)
Definition: c.h:955
void ReplicationSlotSave(void)
Definition: slot.c:645
ReplicationSlotPersistentData data
Definition: slot.h:132
XLogRecPtr confirmed_flush
Definition: slot.h:80
ReplicationKind kind
Definition: replnodes.h:56
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
Definition: walsender.c:765
bool IsTransactionBlock(void)
Definition: xact.c:4609
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:636
void ReplicationSlotReserveWal(void)
Definition: slot.c:997
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:466
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2310
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:676
#define ERROR
Definition: elog.h:43
bool FirstSnapshotSet
Definition: snapmgr.c:205
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2232
void ReplicationSlotPersist(void)
Definition: slot.c:680
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1179
unsigned int uint32
Definition: c.h:358
void ReplicationSlotRelease(void)
Definition: slot.c:424
#define ereport(elevel, rest)
Definition: elog.h:141
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1152
#define MAXFNAMELEN
LogicalDecodingContext * CreateInitDecodingContext(char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:231
uintptr_t Datum
Definition: postgres.h:367
TimeLineID ThisTimeLineID
Definition: xlog.c:187
struct SnapBuild * snapshot_builder
Definition: logical.h:44
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
static TimeLineID sendTimeLine
Definition: walsender.c:148
#define Assert(condition)
Definition: c.h:732
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:512
int XactIsoLevel
Definition: xact.c:75
bool IsSubTransaction(void)
Definition: xact.c:4682
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition: snapbuild.c:544
static Datum values[MAXATTR]
Definition: bootstrap.c:167
int errmsg(const char *fmt,...)
Definition: elog.c:784
CRSSnapshotAction
Definition: walsender.h:20
#define NameStr(name)
Definition: c.h:609
#define CStringGetTextDatum(s)
Definition: builtins.h:83
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:79
#define snprintf
Definition: port.h:192
int16 AttrNumber
Definition: attnum.h:21
static TimestampTz last_reply_timestamp
Definition: walsender.c:171
static bool sendTimeLineIsHistoric
Definition: walsender.c:150
void ReplicationSlotMarkDirty(void)
Definition: slot.c:663
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
Definition: walsender.c:1268

◆ DropReplicationSlot()

static void DropReplicationSlot ( DropReplicationSlotCmd *  cmd)
static

Definition at line 1052 of file walsender.c.

References DestRemote, EndCommand(), ReplicationSlotDrop(), DropReplicationSlotCmd::slotname, and DropReplicationSlotCmd::wait.

Referenced by exec_replication_command(), and main().

1053 {
1054  ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1055  EndCommand("DROP_REPLICATION_SLOT", DestRemote);
1056 }
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:166
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:517

◆ exec_replication_command()

bool exec_replication_command ( const char *  cmd_string)

Definition at line 1433 of file walsender.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, generate_unaccent_rules::dest, DestRemote, DestRemoteSimple, DropReplicationSlot(), elog, EndCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsA, IsAbortedTransactionBlockState(), IsTransactionBlock(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextDelete(), MemoryContextSwitchTo(), MyDatabaseId, VariableShowStmt::name, pgstat_report_activity(), PreventInTransactionBlock(), REPLICATION_KIND_PHYSICAL, replication_parse_result, replication_scanner_init(), replication_yyparse(), SendBaseBackup(), SendTimeLineHistory(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), StartTransactionCommand(), WalSnd::state, STATE_IDLE, STATE_RUNNING, T_BaseBackupCmd, T_CreateReplicationSlotCmd, T_DropReplicationSlotCmd, T_IdentifySystemCmd, T_SQLCmd, T_StartReplicationCmd, T_TimeLineHistoryCmd, T_VariableShowStmt, TimeLineHistoryCmd::type, Node::type, WalSndSetState(), and WALSNDSTATE_STOPPING.

Referenced by PostgresMain().

1434 {
1435  int parse_rc;
1436  Node *cmd_node;
1437  MemoryContext cmd_context;
1438  MemoryContext old_context;
1439 
1440  /*
1441  * If WAL sender has been told that shutdown is getting close, switch its
1442  * status accordingly to handle the next replication commands correctly.
1443  */
1444  if (got_STOPPING)
1446 
1447  /*
1448  * Throw error if in stopping mode. We need prevent commands that could
1449  * generate WAL while the shutdown checkpoint is being written. To be
1450  * safe, we just prohibit all new commands.
1451  */
1453  ereport(ERROR,
1454  (errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1455 
1456  /*
1457  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1458  * command arrives. Clean up the old stuff if there's anything.
1459  */
1461 
1463 
1465  "Replication command context",
1467  old_context = MemoryContextSwitchTo(cmd_context);
1468 
1469  replication_scanner_init(cmd_string);
1470  parse_rc = replication_yyparse();
1471  if (parse_rc != 0)
1472  ereport(ERROR,
1473  (errcode(ERRCODE_SYNTAX_ERROR),
1474  (errmsg_internal("replication command parser returned %d",
1475  parse_rc))));
1476 
1477  cmd_node = replication_parse_result;
1478 
1479  /*
1480  * Log replication command if log_replication_commands is enabled. Even
1481  * when it's disabled, log the command with DEBUG1 level for backward
1482  * compatibility. Note that SQL commands are not logged here, and will be
1483  * logged later if log_statement is enabled.
1484  */
1485  if (cmd_node->type != T_SQLCmd)
1487  (errmsg("received replication command: %s", cmd_string)));
1488 
1489  /*
1490  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was
1491  * called outside of transaction the snapshot should be cleared here.
1492  */
1493  if (!IsTransactionBlock())
1495 
1496  /*
1497  * For aborted transactions, don't allow anything except pure SQL, the
1498  * exec_simple_query() will handle it correctly.
1499  */
1500  if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
1501  ereport(ERROR,
1502  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1503  errmsg("current transaction is aborted, "
1504  "commands ignored until end of transaction block")));
1505 
1507 
1508  /*
1509  * Allocate buffers that will be used for each outgoing and incoming
1510  * message. We do this just once per command to reduce palloc overhead.
1511  */
1515 
1516  /* Report to pgstat that this process is running */
1518 
1519  switch (cmd_node->type)
1520  {
1521  case T_IdentifySystemCmd:
1522  IdentifySystem();
1523  break;
1524 
1525  case T_BaseBackupCmd:
1526  PreventInTransactionBlock(true, "BASE_BACKUP");
1527  SendBaseBackup((BaseBackupCmd *) cmd_node);
1528  break;
1529 
1532  break;
1533 
1536  break;
1537 
1538  case T_StartReplicationCmd:
1539  {
1540  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1541 
1542  PreventInTransactionBlock(true, "START_REPLICATION");
1543 
1544  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1545  StartReplication(cmd);
1546  else
1548  break;
1549  }
1550 
1551  case T_TimeLineHistoryCmd:
1552  PreventInTransactionBlock(true, "TIMELINE_HISTORY");
1554  break;
1555 
1556  case T_VariableShowStmt:
1557  {
1559  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1560 
1561  /* syscache access needs a transaction environment */
1563  GetPGVariable(n->name, dest);
1565  }
1566  break;
1567 
1568  case T_SQLCmd:
1569  if (MyDatabaseId == InvalidOid)
1570  ereport(ERROR,
1571  (errmsg("cannot execute SQL commands in WAL sender for physical replication")));
1572 
1573  /* Report to pgstat that this process is now idle */
1575 
1576  /* Tell the caller that this wasn't a WalSender command. */
1577  return false;
1578 
1579  default:
1580  elog(ERROR, "unrecognized replication command node tag: %u",
1581  cmd_node->type);
1582  }
1583 
1584  /* done */
1585  MemoryContextSwitchTo(old_context);
1586  MemoryContextDelete(cmd_context);
1587 
1588  /* Send CommandComplete message */
1589  EndCommand("SELECT", DestRemote);
1590 
1591  /* Report to pgstat that this process is now idle */
1593 
1594  return true;
1595 }
#define IsA(nodeptr, _type_)
Definition: nodes.h:575
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:211
#define AllocSetContextCreate
Definition: memutils.h:169
#define DEBUG1
Definition: elog.h:25
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:3121
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:434
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1052
void CommitTransactionCommand(void)
Definition: xact.c:2895
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:376
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Node
Definition: nodes.h:524
static StringInfoData output_message
Definition: walsender.c:160
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:190
int errcode(int sqlerrcode)
Definition: elog.c:570
#define LOG
Definition: elog.h:26
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc.c:8738
Node * replication_parse_result
bool IsTransactionBlock(void)
Definition: xact.c:4609
ReplicationKind kind
Definition: replnodes.h:82
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:166
#define ERROR
Definition: elog.h:43
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:746
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:191
WalSndState state
NodeTag type
Definition: nodes.h:526
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:540
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3328
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
#define ereport(elevel, rest)
Definition: elog.h:141
int replication_yyparse(void)
void initStringInfo(StringInfo str)
Definition: stringinfo.c:46
static StringInfoData reply_message
Definition: walsender.c:161
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1063
Oid MyDatabaseId
Definition: globals.c:85
WalSnd * MyWalSnd
Definition: walsender.c:111
#define InvalidOid
Definition: postgres_ext.h:36
int errmsg_internal(const char *fmt,...)
Definition: elog.c:814
void WalSndSetState(WalSndState state)
Definition: walsender.c:3177
void StartTransactionCommand(void)
Definition: xact.c:2794
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:852
static StringInfoData tmpbuf
Definition: walsender.c:162
bool log_replication_commands
Definition: walsender.c:124
int errmsg(const char *fmt,...)
Definition: elog.c:784
#define elog(elevel,...)
Definition: elog.h:226
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
static void IdentifySystem(void)
Definition: walsender.c:345
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:697
void replication_scanner_init(const char *query_string)

◆ GetStandbyFlushRecPtr()

static XLogRecPtr GetStandbyFlushRecPtr ( void  )
static

Definition at line 2933 of file walsender.c.

References GetWalRcvWriteRecPtr(), GetXLogReplayRecPtr(), receiveTLI, and ThisTimeLineID.

Referenced by IdentifySystem(), StartReplication(), and XLogSendPhysical().

2934 {
2935  XLogRecPtr replayPtr;
2936  TimeLineID replayTLI;
2937  XLogRecPtr receivePtr;
2939  XLogRecPtr result;
2940 
2941  /*
2942  * We can safely send what's already been replayed. Also, if walreceiver
2943  * is streaming WAL from the same timeline, we can send anything that it
2944  * has streamed, but hasn't been replayed yet.
2945  */
2946 
2947  receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI);
2948  replayPtr = GetXLogReplayRecPtr(&replayTLI);
2949 
2950  ThisTimeLineID = replayTLI;
2951 
2952  result = replayPtr;
2953  if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
2954  result = receivePtr;
2955 
2956  return result;
2957 }
uint32 TimeLineID
Definition: xlogdefs.h:52
XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11130
static TimeLineID receiveTLI
Definition: xlog.c:209
TimeLineID ThisTimeLineID
Definition: xlog.c:187
uint64 XLogRecPtr
Definition: xlogdefs.h:21

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 2986 of file walsender.c.

References am_walsender, Assert, got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

2987 {
2989 
2990  /*
2991  * If replication has not yet started, die like with SIGTERM. If
2992  * replication is active, only set a flag and wake up the main loop. It
2993  * will send any outstanding WAL, wait for it to be replicated to the
2994  * standby, and then exit gracefully.
2995  */
2996  if (!replication_active)
2997  kill(MyProcPid, SIGTERM);
2998  else
2999  got_STOPPING = true;
3000 }
int MyProcPid
Definition: globals.c:40
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:190
#define kill(pid, sig)
Definition: win32_port.h:435
bool am_walsender
Definition: walsender.c:114
static volatile sig_atomic_t replication_active
Definition: walsender.c:198
#define Assert(condition)
Definition: c.h:732

◆ IdentifySystem()

static void IdentifySystem ( void  )
static

Definition at line 345 of file walsender.c.

References am_cascading_walsender, begin_tup_output_tupdesc(), CommitTransactionCommand(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, cur, CurrentMemoryContext, dbname, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), get_database_name(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetSystemIdentifier(), Int32GetDatum, InvalidOid, MAXFNAMELEN, MemoryContextSwitchTo(), MemSet, MyDatabaseId, RecoveryInProgress(), snprintf, StartTransactionCommand(), ThisTimeLineID, TTSOpsVirtual, TupleDescInitBuiltinEntry(), UINT64_FORMAT, and values.

Referenced by exec_replication_command().

346 {
347  char sysid[32];
348  char xloc[MAXFNAMELEN];
349  XLogRecPtr logptr;
350  char *dbname = NULL;
352  TupOutputState *tstate;
353  TupleDesc tupdesc;
354  Datum values[4];
355  bool nulls[4];
356 
357  /*
358  * Reply with a result set with one row, four columns. First col is system
359  * ID, second is timeline ID, third is current xlog location and the
360  * fourth contains the database name if we are connected to one.
361  */
362 
363  snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
365 
368  {
369  /* this also updates ThisTimeLineID */
370  logptr = GetStandbyFlushRecPtr();
371  }
372  else
373  logptr = GetFlushRecPtr();
374 
375  snprintf(xloc, sizeof(xloc), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
376 
377  if (MyDatabaseId != InvalidOid)
378  {
380 
381  /* syscache access needs a transaction env. */
383  /* make dbname live outside TX context */
387  /* CommitTransactionCommand switches to TopMemoryContext */
389  }
390 
392  MemSet(nulls, false, sizeof(nulls));
393 
394  /* need a tuple descriptor representing four columns */
395  tupdesc = CreateTemplateTupleDesc(4);
396  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
397  TEXTOID, -1, 0);
398  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
399  INT4OID, -1, 0);
400  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
401  TEXTOID, -1, 0);
402  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
403  TEXTOID, -1, 0);
404 
405  /* prepare for projection of tuples */
406  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
407 
408  /* column 1: system identifier */
409  values[0] = CStringGetTextDatum(sysid);
410 
411  /* column 2: timeline */
412  values[1] = Int32GetDatum(ThisTimeLineID);
413 
414  /* column 3: wal location */
415  values[2] = CStringGetTextDatum(xloc);
416 
417  /* column 4: database name, or NULL if none */
418  if (dbname)
419  values[3] = CStringGetTextDatum(dbname);
420  else
421  nulls[3] = true;
422 
423  /* send it to dest */
424  do_tup_output(tstate, values, nulls);
425 
426  end_tup_output(tstate);
427 }
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:44
void CommitTransactionCommand(void)
Definition: xact.c:2895
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:84
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2252
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
struct cursor * cur
Definition: ecpg.c:28
#define MemSet(start, val, len)
Definition: c.h:955
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8230
bool RecoveryInProgress(void)
Definition: xlog.c:7898
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2310
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:676
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2232
char * get_database_name(Oid dbid)
Definition: dbcommands.c:2099
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
unsigned int uint32
Definition: c.h:358
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
#define MAXFNAMELEN
uintptr_t Datum
Definition: postgres.h:367
Oid MyDatabaseId
Definition: globals.c:85
#define InvalidOid
Definition: postgres_ext.h:36
TimeLineID ThisTimeLineID
Definition: xlog.c:187
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void StartTransactionCommand(void)
Definition: xact.c:2794
char * dbname
Definition: streamutil.c:52
static Datum values[MAXATTR]
Definition: bootstrap.c:167
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4802
#define Int32GetDatum(X)
Definition: postgres.h:479
#define CStringGetTextDatum(s)
Definition: builtins.h:83
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2933
#define snprintf
Definition: port.h:192
int16 AttrNumber
Definition: attnum.h:21
#define UINT64_FORMAT
Definition: c.h:401
bool am_cascading_walsender
Definition: walsender.c:115

◆ InitWalSenderSlot()

static void InitWalSenderSlot ( void  )
static

Definition at line 2281 of file walsender.c.

References WalSnd::apply, WalSnd::applyLag, Assert, WalSnd::flush, WalSnd::flushLag, i, InvalidXLogRecPtr, WalSnd::latch, max_wal_senders, WalSnd::mutex, MyProc, MyProcPid, on_shmem_exit(), WalSnd::pid, PGPROC::procLatch, WalSnd::replyTime, WalSnd::sentPtr, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndKill(), WalSndCtlData::walsnds, WALSNDSTATE_STARTUP, WalSnd::write, and WalSnd::writeLag.

/*
 * InitWalSenderSlot -- reserve an entry of WalSndCtl->walsnds[] for this
 * process.
 *
 * Scans the array from index 0 and takes the first entry whose pid is 0.
 * Each candidate is examined, and the chosen one initialized, while holding
 * that entry's spinlock. On success MyWalSnd points at the reserved entry.
 */
2282 {
2283  int i;
2284 
2285  /*
2286  * WalSndCtl should be set up already (we inherit this by fork() or
2287  * EXEC_BACKEND mechanism from the postmaster).
2288  */
2289  Assert(WalSndCtl != NULL);
2290  Assert(MyWalSnd == NULL);
2291 
2292  /*
2293  * Find a free walsender slot and reserve it. This cannot fail, because
2294  * of the prior check for free WAL senders in InitProcess().
2295  */
2296  for (i = 0; i < max_wal_senders; i++)
2297  {
2298  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2299 
2300  SpinLockAcquire(&walsnd->mutex);
2301 
2302  if (walsnd->pid != 0)
2303  {
  /* Entry already belongs to another process; try the next one. */
2304  SpinLockRelease(&walsnd->mutex);
2305  continue;
2306  }
2307  else
2308  {
2309  /*
2310  * Found a free slot. Reserve it for us.
2311  */
2312  walsnd->pid = MyProcPid;
2313  walsnd->sentPtr = InvalidXLogRecPtr;
2314  walsnd->write = InvalidXLogRecPtr;
2315  walsnd->flush = InvalidXLogRecPtr;
2316  walsnd->apply = InvalidXLogRecPtr;
  /* Negative lag values are reported as NULL by pg_stat_get_wal_senders(). */
2317  walsnd->writeLag = -1;
2318  walsnd->flushLag = -1;
2319  walsnd->applyLag = -1;
2320  walsnd->state = WALSNDSTATE_STARTUP;
2321  walsnd->latch = &MyProc->procLatch;
2322  walsnd->replyTime = 0;
2323  SpinLockRelease(&walsnd->mutex);
2324  /* don't need the lock anymore */
2325  MyWalSnd = (WalSnd *) walsnd;
2326 
2327  break;
2328  }
2329  }
2330 
2331  Assert(MyWalSnd != NULL);
2332 
2333  /* Arrange to clean up at walsender exit */
  /*
   * NOTE(review): listing line 2334 is absent from this rendering. The
   * reference list for this function names on_shmem_exit() and WalSndKill(),
   * so it presumably registers WalSndKill as the exit callback -- confirm
   * against walsender.c.
   */
2335 }
XLogRecPtr write
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
int MyProcPid
Definition: globals.c:40
PGPROC * MyProc
Definition: proc.c:68
TimeOffset flushLag
WalSndCtlData * WalSndCtl
Definition: walsender.c:108
static void WalSndKill(int code, Datum arg)
Definition: walsender.c:2339
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
TimeOffset writeLag
slock_t mutex
Latch procLatch
Definition: proc.h:104
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
Latch * latch
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
WalSndState state
int max_wal_senders
Definition: walsender.c:120
#define SpinLockRelease(lock)
Definition: spin.h:64
XLogRecPtr sentPtr
WalSnd * MyWalSnd
Definition: walsender.c:111
TimeOffset applyLag
#define Assert(condition)
Definition: c.h:732
int i
TimestampTz replyTime
XLogRecPtr apply

◆ LagTrackerRead()

static TimeOffset LagTrackerRead ( int  head,
XLogRecPtr  lsn,
TimestampTz  now 
)
static

Definition at line 3529 of file walsender.c.

References Assert, LagTracker::buffer, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_read, WalTimeSample::lsn, next, LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by ProcessStandbyReplyMessage().

/*
 * LagTrackerRead -- compute the lag for read head 'head' given that the
 * standby has reported reaching 'lsn' at local time 'now'.
 *
 * Consumes every buffered sample whose LSN is <= lsn, remembers the time of
 * the last one consumed, and returns now minus that time. If no sample was
 * consumed, the time is either interpolated between last_read[head] and the
 * next unread sample, or taken from the next unread sample. Returns -1 when
 * no usable time can be determined.
 *
 * NOTE(review): this rendering omits listing lines 3539, 3541, 3551, 3570
 * and 3580 (the loop-body right-hand sides, two conditions and the
 * declaration of 'next'); consult walsender.c for the exact statements.
 */
3530 {
3531  TimestampTz time = 0;
3532 
3533  /* Read all unread samples up to this LSN or end of buffer. */
3534  while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
3535  lag_tracker->buffer[lag_tracker->read_heads[head]].lsn <= lsn)
3536  {
3537  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3538  lag_tracker->last_read[head] =
3540  lag_tracker->read_heads[head] =
3542  }
3543 
3544  /*
3545  * If the lag tracker is empty, that means the standby has processed
3546  * everything we've ever sent so we should now clear 'last_read'. If we
3547  * didn't do that, we'd risk using a stale and irrelevant sample for
3548  * interpolation at the beginning of the next burst of WAL after a period
3549  * of idleness.
3550  */
3552  lag_tracker->last_read[head].time = 0;
3553 
3554  if (time > now)
3555  {
3556  /* If the clock somehow went backwards, treat as not found. */
3557  return -1;
3558  }
3559  else if (time == 0)
3560  {
3561  /*
3562  * We didn't cross a time. If there is a future sample that we
3563  * haven't reached yet, and we've already reached at least one sample,
3564  * let's interpolate the local flushed time. This is mainly useful
3565  * for reporting a completely stuck apply position as having
3566  * increasing lag, since otherwise we'd have to wait for it to
3567  * eventually start moving again and cross one of our samples before
3568  * we can show the lag increasing.
3569  */
3571  {
3572  /* There are no future samples, so we can't interpolate. */
3573  return -1;
3574  }
3575  else if (lag_tracker->last_read[head].time != 0)
3576  {
3577  /* We can interpolate between last_read and the next sample. */
3578  double fraction;
3579  WalTimeSample prev = lag_tracker->last_read[head];
3581 
3582  if (lsn < prev.lsn)
3583  {
3584  /*
3585  * Reported LSNs shouldn't normally go backwards, but it's
3586  * possible when there is a timeline change. Treat as not
3587  * found.
3588  */
3589  return -1;
3590  }
3591 
  /* A strictly larger next.lsn also keeps the division below well defined. */
3592  Assert(prev.lsn < next.lsn);
3593 
3594  if (prev.time > next.time)
3595  {
3596  /* If the clock somehow went backwards, treat as not found. */
3597  return -1;
3598  }
3599 
3600  /* See how far we are between the previous and next samples. */
3601  fraction =
3602  (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
3603 
3604  /* Scale the local flush time proportionally. */
3605  time = (TimestampTz)
3606  ((double) prev.time + (next.time - prev.time) * fraction);
3607  }
3608  else
3609  {
3610  /*
3611  * We have only a future sample, implying that we were entirely
3612  * caught up, but now there is a new burst of WAL and the
3613  * standby hasn't processed the first sample yet. Until the
3614  * standby reaches the future sample the best we can do is report
3615  * the hypothetical lag if that sample were to be replayed now.
3616  */
3617  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3618  }
3619  }
3620 
3621  /* Return the elapsed time since local flush time in microseconds. */
3622  Assert(time != 0);
3623  return now - time;
3624 }
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:217
static int32 next
Definition: blutils.c:213
int write_head
Definition: walsender.c:218
int64 TimestampTz
Definition: timestamp.h:39
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:220
TimestampTz time
Definition: walsender.c:207
static LagTracker * lag_tracker
Definition: walsender.c:223
XLogRecPtr lsn
Definition: walsender.c:206
#define Assert(condition)
Definition: c.h:732
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:219
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:211
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1533

◆ LagTrackerWrite()

static void LagTrackerWrite ( XLogRecPtr  lsn,
TimestampTz  local_flush_time 
)
static

Definition at line 3464 of file walsender.c.

References am_walsender, LagTracker::buffer, i, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_lsn, WalTimeSample::lsn, NUM_SYNC_REP_WAIT_MODE, LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by WalSndUpdateProgress(), and XLogSendPhysical().

/*
 * LagTrackerWrite -- record a (lsn, local_flush_time) sample in the lag
 * tracker's circular buffer.
 *
 * Does nothing when the process is not a walsender or when 'lsn' equals the
 * previously recorded LSN. When advancing the write head would reach any
 * read head, the write head is not advanced past the readers; see the
 * buffer-full comment below.
 *
 * NOTE(review): this rendering omits listing lines 3504, 3506 and 3510 (the
 * two write_head rewind assignments and the store of the sample's lsn);
 * consult walsender.c for the exact statements.
 */
3465 {
3466  bool buffer_full;
3467  int new_write_head;
3468  int i;
3469 
3470  if (!am_walsender)
3471  return;
3472 
3473  /*
3474  * If the lsn hasn't advanced since last time, then do nothing. This way
3475  * we only record a new sample when new WAL has been written.
3476  */
3477  if (lag_tracker->last_lsn == lsn)
3478  return;
3479  lag_tracker->last_lsn = lsn;
3480 
3481  /*
3482  * If advancing the write head of the circular buffer would crash into any
3483  * of the read heads, then the buffer is full. In other words, the
3484  * slowest reader (presumably apply) is the one that controls the release
3485  * of space.
3486  */
3487  new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
3488  buffer_full = false;
3489  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
3490  {
3491  if (new_write_head == lag_tracker->read_heads[i])
3492  buffer_full = true;
3493  }
3494 
3495  /*
3496  * If the buffer is full, for now we just rewind by one slot and overwrite
3497  * the last sample, as a simple (if somewhat uneven) way to lower the
3498  * sampling rate. There may be better adaptive compaction algorithms.
3499  */
3500  if (buffer_full)
3501  {
  /* Keep the head where it was so the slot after it is not handed out. */
3502  new_write_head = lag_tracker->write_head;
3503  if (lag_tracker->write_head > 0)
3505  else
3507  }
3508 
3509  /* Store a sample at the current write head position. */
3511  lag_tracker->buffer[lag_tracker->write_head].time = local_flush_time;
3512  lag_tracker->write_head = new_write_head;
3513 }
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:217
int write_head
Definition: walsender.c:218
TimestampTz time
Definition: walsender.c:207
static LagTracker * lag_tracker
Definition: walsender.c:223
bool am_walsender
Definition: walsender.c:114
XLogRecPtr lsn
Definition: walsender.c:206
XLogRecPtr last_lsn
Definition: walsender.c:216
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:219
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:211
int i
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28

◆ logical_read_xlog_page()

static int logical_read_xlog_page ( XLogReaderState state,
XLogRecPtr  targetPagePtr,
int  reqLen,
XLogRecPtr  targetRecPtr,
char *  cur_page,
TimeLineID pageTLI 
)
static

Definition at line 765 of file walsender.c.

References XLogReaderState::currTLI, XLogReaderState::currTLIValidUntil, XLogReaderState::nextTLI, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, ThisTimeLineID, WalSndWaitForWal(), XLogRead(), and XLogReadDetermineTimeline().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

/*
 * logical_read_xlog_page -- page-read callback used for logical decoding in
 * the walsender.
 *
 * Waits until WAL up to targetPagePtr + reqLen is available, then reads the
 * page starting at targetPagePtr into cur_page.
 *
 * Returns the number of valid bytes in cur_page (at most XLOG_BLCKSZ), or -1
 * if the wait returned before enough WAL was available.
 *
 * NOTE(review): this rendering omits listing lines 772 and 774. The
 * reference list names sendTimeLineIsHistoric and sendTimeLineValidUpto, so
 * those lines presumably set them from 'state' -- confirm against
 * walsender.c.
 */
767 {
768  XLogRecPtr flushptr;
769  int count;
770 
771  XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
773  sendTimeLine = state->currTLI;
775  sendTimeLineNextTLI = state->nextTLI;
776 
777  /* make sure we have enough WAL available */
778  flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
779 
780  /* fail if not (implies we are going to shut down) */
781  if (flushptr < targetPagePtr + reqLen)
782  return -1;
783 
784  if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
785  count = XLOG_BLCKSZ; /* the whole block is available */
786  else
787  count = flushptr - targetPagePtr; /* part of the page available */
788 
789  /* now actually read the data, we know it's there */
  /* The read always requests a full block; 'count' is what the caller may use. */
790  XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
791 
792  return count;
793 }
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
Definition: xlogutils.c:803
XLogRecPtr currTLIValidUntil
Definition: xlogreader.h:184
static void XLogRead(char *buf, XLogRecPtr startptr, Size count)
Definition: walsender.c:2367
TimeLineID nextTLI
Definition: xlogreader.h:190
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition: walsender.c:1294
TimeLineID ThisTimeLineID
Definition: xlog.c:187
TimeLineID currTLI
Definition: xlogreader.h:174
static TimeLineID sendTimeLine
Definition: walsender.c:148
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:149
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:151
static bool sendTimeLineIsHistoric
Definition: walsender.c:150

◆ offset_to_interval()

static Interval* offset_to_interval ( TimeOffset  offset)
static

Definition at line 3215 of file walsender.c.

References Interval::day, Interval::month, palloc(), and Interval::time.

Referenced by pg_stat_get_wal_senders().

/*
 * offset_to_interval -- wrap a TimeOffset in a newly palloc'd Interval.
 *
 * The month and day fields are zero; the whole offset is stored in the time
 * field. The result is allocated with palloc() in whatever memory context is
 * current at the call.
 */
3216 {
3217  Interval *result = palloc(sizeof(Interval));
3218 
3219  result->month = 0;
3220  result->day = 0;
3221  result->time = offset;
3222 
3223  return result;
3224 }
int32 day
Definition: timestamp.h:47
TimeOffset time
Definition: timestamp.h:45
int32 month
Definition: timestamp.h:48
void * palloc(Size size)
Definition: mcxt.c:924

◆ parseCreateReplSlotOptions()

static void parseCreateReplSlotOptions ( CreateReplicationSlotCmd cmd,
bool reserve_wal,
CRSSnapshotAction snapshot_action 
)
static

Definition at line 799 of file walsender.c.

References CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, defGetBoolean(), DefElem::defname, elog, ereport, errcode(), errmsg(), ERROR, CreateReplicationSlotCmd::kind, lfirst, CreateReplicationSlotCmd::options, REPLICATION_KIND_LOGICAL, and REPLICATION_KIND_PHYSICAL.

Referenced by CreateReplicationSlot().

/*
 * parseCreateReplSlotOptions -- walk cmd->options and fill in *reserve_wal
 * and *snapshot_action.
 *
 * "export_snapshot" and "use_snapshot" are accepted only for logical slots
 * and are mutually exclusive (either may appear at most once in total);
 * "reserve_wal" is accepted only for physical slots and at most once.
 * Violations raise ERRCODE_SYNTAX_ERROR; any other option name raises an
 * internal error. Outputs that are not mentioned in the option list are left
 * untouched, so the caller must pre-set its defaults.
 *
 * NOTE(review): this rendering omits listing line 821, the second arm of the
 * conditional expression. The reference list names CRS_NOEXPORT_SNAPSHOT, so
 * that is presumably the value used -- confirm against walsender.c.
 */
802 {
803  ListCell *lc;
804  bool snapshot_action_given = false;
805  bool reserve_wal_given = false;
806 
807  /* Parse options */
808  foreach(lc, cmd->options)
809  {
810  DefElem *defel = (DefElem *) lfirst(lc);
811 
812  if (strcmp(defel->defname, "export_snapshot") == 0)
813  {
814  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
815  ereport(ERROR,
816  (errcode(ERRCODE_SYNTAX_ERROR),
817  errmsg("conflicting or redundant options")));
818 
819  snapshot_action_given = true;
820  *snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT :
822  }
823  else if (strcmp(defel->defname, "use_snapshot") == 0)
824  {
825  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
826  ereport(ERROR,
827  (errcode(ERRCODE_SYNTAX_ERROR),
828  errmsg("conflicting or redundant options")));
829 
830  snapshot_action_given = true;
  /* The option's value is not consulted; its presence selects the action. */
831  *snapshot_action = CRS_USE_SNAPSHOT;
832  }
833  else if (strcmp(defel->defname, "reserve_wal") == 0)
834  {
835  if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
836  ereport(ERROR,
837  (errcode(ERRCODE_SYNTAX_ERROR),
838  errmsg("conflicting or redundant options")));
839 
840  reserve_wal_given = true;
841  *reserve_wal = true;
842  }
843  else
844  elog(ERROR, "unrecognized option: %s", defel->defname);
845  }
846 }
int errcode(int sqlerrcode)
Definition: elog.c:570
ReplicationKind kind
Definition: replnodes.h:56
bool defGetBoolean(DefElem *def)
Definition: define.c:111
#define ERROR
Definition: elog.h:43
#define ereport(elevel, rest)
Definition: elog.h:141
#define lfirst(lc)
Definition: pg_list.h:190
int errmsg(const char *fmt,...)
Definition: elog.c:784
#define elog(elevel,...)
Definition: elog.h:226
char * defname
Definition: parsenodes.h:730

◆ pg_stat_get_wal_senders()

Datum pg_stat_get_wal_senders ( PG_FUNCTION_ARGS  )

Definition at line 3231 of file walsender.c.

References ReturnSetInfo::allowedModes, WalSnd::apply, WalSnd::applyLag, CStringGetTextDatum, ReturnSetInfo::econtext, ExprContext::ecxt_per_query_memory, elog, ereport, errcode(), errmsg(), ERROR, WalSnd::flush, WalSnd::flushLag, get_call_result_type(), GetUserId(), i, Int32GetDatum, IntervalPGetDatum, is_member_of_role(), IsA, list_member_int(), LSNGetDatum, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_wal_senders, MemoryContextSwitchTo(), MemSet, WalSnd::mutex, offset_to_interval(), PG_STAT_GET_WAL_SENDERS_COLS, WalSnd::pid, WalSnd::replyTime, ReturnSetInfo::returnMode, WalSnd::sentPtr, sentPtr, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SFRM_Materialize, SpinLockAcquire, SpinLockRelease, WalSnd::state, SYNC_REP_PRIORITY, WalSnd::sync_standby_priority, SyncRepConfigData::syncrep_method, SyncRepConfig, SyncRepGetSyncStandbys(), TimestampTzGetDatum, tuplestore_begin_heap(), tuplestore_donestoring, tuplestore_putvalues(), TYPEFUNC_COMPOSITE, values, WalSndGetStateString(), WalSndCtlData::walsnds, work_mem, write, WalSnd::write, WalSnd::writeLag, and XLogRecPtrIsInvalid.

/*
 * pg_stat_get_wal_senders -- set-returning function that emits one row per
 * in-use walsender entry (entries whose pid is 0 are skipped).
 *
 * The result is materialized into a tuplestore created in the per-query
 * memory context. Callers that are not members of
 * DEFAULT_ROLE_READ_ALL_STATS see only the pid column; every other column is
 * NULL for them.
 *
 * NOTE(review): this rendering omits listing lines 3277, 3286, 3288 and 3380
 * (the declarations of sentPtr, state and values, and the start of the
 * "sync"/"quorum" assignment); consult walsender.c for the exact statements.
 */
3232 {
3233 #define PG_STAT_GET_WAL_SENDERS_COLS 12
3234  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3235  TupleDesc tupdesc;
3236  Tuplestorestate *tupstore;
3237  MemoryContext per_query_ctx;
3238  MemoryContext oldcontext;
3239  List *sync_standbys;
3240  int i;
3241 
3242  /* check to see if caller supports us returning a tuplestore */
3243  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
3244  ereport(ERROR,
3245  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3246  errmsg("set-valued function called in context that cannot accept a set")));
3247  if (!(rsinfo->allowedModes & SFRM_Materialize))
3248  ereport(ERROR,
3249  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3250  errmsg("materialize mode required, but it is not " \
3251  "allowed in this context")));
3252 
3253  /* Build a tuple descriptor for our result type */
3254  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
3255  elog(ERROR, "return type must be a row type");
3256 
  /* Create the tuplestore in the per-query context, then switch back. */
3257  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
3258  oldcontext = MemoryContextSwitchTo(per_query_ctx);
3259 
3260  tupstore = tuplestore_begin_heap(true, false, work_mem);
3261  rsinfo->returnMode = SFRM_Materialize;
3262  rsinfo->setResult = tupstore;
3263  rsinfo->setDesc = tupdesc;
3264 
3265  MemoryContextSwitchTo(oldcontext);
3266 
3267  /*
3268  * Get the currently active synchronous standbys.
3269  */
3270  LWLockAcquire(SyncRepLock, LW_SHARED);
3271  sync_standbys = SyncRepGetSyncStandbys(NULL);
3272  LWLockRelease(SyncRepLock);
3273 
3274  for (i = 0; i < max_wal_senders; i++)
3275  {
3276  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3278  XLogRecPtr write;
3279  XLogRecPtr flush;
3280  XLogRecPtr apply;
3281  TimeOffset writeLag;
3282  TimeOffset flushLag;
3283  TimeOffset applyLag;
3284  int priority;
3285  int pid;
3287  TimestampTz replyTime;
3289  bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
3290 
  /*
   * Copy the shared fields into locals while holding the spinlock; all
   * later work on this entry uses only the local copies.
   */
3291  SpinLockAcquire(&walsnd->mutex);
3292  if (walsnd->pid == 0)
3293  {
3294  SpinLockRelease(&walsnd->mutex);
3295  continue;
3296  }
3297  pid = walsnd->pid;
3298  sentPtr = walsnd->sentPtr;
3299  state = walsnd->state;
3300  write = walsnd->write;
3301  flush = walsnd->flush;
3302  apply = walsnd->apply;
3303  writeLag = walsnd->writeLag;
3304  flushLag = walsnd->flushLag;
3305  applyLag = walsnd->applyLag;
3306  priority = walsnd->sync_standby_priority;
3307  replyTime = walsnd->replyTime;
3308  SpinLockRelease(&walsnd->mutex);
3309 
3310  memset(nulls, 0, sizeof(nulls));
3311  values[0] = Int32GetDatum(pid);
3312 
3313  if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS))
3314  {
3315  /*
3316  * Only superusers and members of pg_read_all_stats can see
3317  * details. Other users only get the pid value to know it's a
3318  * walsender, but no details.
3319  */
  /*
   * NOTE(review): the length passed here is an element count; it equals
   * the byte count only if sizeof(bool) is 1 -- confirm.
   */
3320  MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3321  }
3322  else
3323  {
3324  values[1] = CStringGetTextDatum(WalSndGetStateString(state));
3325 
  /*
   * For the four LSN columns the value is stored unconditionally; an
   * invalid LSN is reported as NULL through the nulls[] flag.
   */
3326  if (XLogRecPtrIsInvalid(sentPtr))
3327  nulls[2] = true;
3328  values[2] = LSNGetDatum(sentPtr);
3329 
3330  if (XLogRecPtrIsInvalid(write))
3331  nulls[3] = true;
3332  values[3] = LSNGetDatum(write);
3333 
3334  if (XLogRecPtrIsInvalid(flush))
3335  nulls[4] = true;
3336  values[4] = LSNGetDatum(flush);
3337 
3338  if (XLogRecPtrIsInvalid(apply))
3339  nulls[5] = true;
3340  values[5] = LSNGetDatum(apply);
3341 
3342  /*
3343  * Treat a standby such as a pg_basebackup background process
3344  * which always returns an invalid flush location, as an
3345  * asynchronous standby.
3346  */
3347  priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
3348 
  /* A negative lag means "not available" and is reported as NULL. */
3349  if (writeLag < 0)
3350  nulls[6] = true;
3351  else
3352  values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
3353 
3354  if (flushLag < 0)
3355  nulls[7] = true;
3356  else
3357  values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
3358 
3359  if (applyLag < 0)
3360  nulls[8] = true;
3361  else
3362  values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
3363 
3364  values[9] = Int32GetDatum(priority);
3365 
3366  /*
3367  * More easily understood version of standby state. This is purely
3368  * informational.
3369  *
3370  * In quorum-based sync replication, the role of each standby
3371  * listed in synchronous_standby_names can be changing very
3372  * frequently. Any standbys considered as "sync" at one moment can
3373  * be switched to "potential" ones at the next moment. So, it's
3374  * basically useless to report "sync" or "potential" as their sync
3375  * states. We report just "quorum" for them.
3376  */
3377  if (priority == 0)
3378  values[10] = CStringGetTextDatum("async");
3379  else if (list_member_int(sync_standbys, i))
3381  CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3382  else
3383  values[10] = CStringGetTextDatum("potential");
3384 
  /* A zero reply time means no reply has been recorded; report NULL. */
3385  if (replyTime == 0)
3386  nulls[11] = true;
3387  else
3388  values[11] = TimestampTzGetDatum(replyTime);
3389  }
3390 
3391  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
3392  }
3393 
3394  /* clean up and return the tuplestore */
3395  tuplestore_donestoring(tupstore);
3396 
3397  return (Datum) 0;
3398 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
XLogRecPtr write
#define IsA(nodeptr, _type_)
Definition: nodes.h:575
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:196
#define SYNC_REP_PRIORITY
Definition: syncrep.h:36
Oid GetUserId(void)
Definition: miscinit.c:380
#define write(a, b, c)
Definition: win32.h:14
int64 TimestampTz
Definition: timestamp.h:39
uint8 syncrep_method
Definition: syncrep.h:51
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
TimeOffset flushLag
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
WalSndCtlData * WalSndCtl
Definition: walsender.c:108
int errcode(int sqlerrcode)
Definition: elog.c:570
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
#define MemSet(start, val, len)
Definition: c.h:955
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
List * SyncRepGetSyncStandbys(bool *am_sync)
Definition: syncrep.c:681
TimeOffset writeLag
SyncRepConfigData * SyncRepConfig
Definition: syncrep.c:97
slock_t mutex
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
#define ERROR
Definition: elog.h:43
#define IntervalPGetDatum(X)
Definition: timestamp.h:33
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
bool list_member_int(const List *list, int datum)
Definition: list.c:654
WalSndState state
#define ereport(elevel, rest)
Definition: elog.h:141
int max_wal_senders
Definition: walsender.c:120
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
int64 TimeOffset
Definition: timestamp.h:40
uintptr_t Datum
Definition: postgres.h:367
static const char * WalSndGetStateString(WalSndState state)
Definition: walsender.c:3196
XLogRecPtr sentPtr
int work_mem
Definition: globals.c:121
static XLogRecPtr sentPtr
Definition: walsender.c:157
int allowedModes
Definition: execnodes.h:303
bool is_member_of_role(Oid member, Oid role)
Definition: acl.c:4932
TimeOffset applyLag
SetFunctionReturnMode returnMode
Definition: execnodes.h:305
uint64 XLogRecPtr
Definition: xlogdefs.h:21
int sync_standby_priority
Definition: regguts.h:298
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:231
WalSndState
Tuplestorestate * setResult
Definition: execnodes.h:308
static Datum values[MAXATTR]
Definition: bootstrap.c:167
#define PG_STAT_GET_WAL_SENDERS_COLS
ExprContext * econtext
Definition: execnodes.h:301
#define Int32GetDatum(X)
Definition: postgres.h:479
TupleDesc setDesc
Definition: execnodes.h:309
int errmsg(const char *fmt,...)
Definition: elog.c:784
#define elog(elevel,...)
Definition: elog.h:226
int i
#define CStringGetTextDatum(s)
Definition: builtins.h:83
TimestampTz replyTime
static Interval * offset_to_interval(TimeOffset offset)
Definition: walsender.c:3215
XLogRecPtr apply
Definition: pg_list.h:50

◆ PhysicalConfirmReceivedLocation()

static void PhysicalConfirmReceivedLocation ( XLogRecPtr  lsn)
static

Definition at line 1737 of file walsender.c.

References Assert, ReplicationSlot::data, InvalidXLogRecPtr, ReplicationSlot::mutex, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, SpinLockAcquire, and SpinLockRelease.

Referenced by ProcessStandbyReplyMessage().

/*
 * PhysicalConfirmReceivedLocation -- set the slot's restart_lsn to 'lsn'.
 *
 * The comparison and the assignment happen under the slot's spinlock.
 * Follow-up work is done only when the value actually changed, and only
 * after the spinlock has been released.
 *
 * NOTE(review): this rendering omits listing lines 1740, 1753 and 1754. The
 * reference list names MyReplicationSlot, ReplicationSlotMarkDirty() and
 * ReplicationSlotsComputeRequiredLSN(), so 'slot' is presumably
 * MyReplicationSlot and the 'changed' branch presumably calls those two
 * functions -- confirm against walsender.c.
 */
1738 {
1739  bool changed = false;
1741 
1742  Assert(lsn != InvalidXLogRecPtr);
1743  SpinLockAcquire(&slot->mutex);
1744  if (slot->data.restart_lsn != lsn)
1745  {
1746  changed = true;
1747  slot->data.restart_lsn = lsn;
1748  }
1749  SpinLockRelease(&slot->mutex);
1750 
1751  if (changed)
1752  {
1755  }
1756 
1757  /*
1758  * One could argue that the slot should be saved to disk now, but that'd
1759  * be energy wasted - the worst that lost information can do here is give
1760  * us wrong information in a statistics view - we'll just potentially be
1761  * more conservative in removing files.
1762  */
1763 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
ReplicationSlotPersistentData data
Definition: slot.h:132
#define SpinLockAcquire(lock)
Definition: spin.h:62
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:748
#define SpinLockRelease(lock)
Definition: spin.h:64
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:732
XLogRecPtr restart_lsn
Definition: slot.h:72
slock_t mutex
Definition: slot.h:105
void ReplicationSlotMarkDirty(void)
Definition: slot.c:663

◆ PhysicalReplicationSlotNewXmin()

static void PhysicalReplicationSlotNewXmin ( TransactionId  feedbackXmin,
TransactionId  feedbackCatalogXmin 
)
static

Definition at line 1874 of file walsender.c.

References ReplicationSlotPersistentData::catalog_xmin, ReplicationSlot::data, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, InvalidTransactionId, ReplicationSlot::mutex, MyPgXact, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredXmin(), SpinLockAcquire, SpinLockRelease, TransactionIdIsNormal, TransactionIdPrecedes(), ReplicationSlotPersistentData::xmin, and PGXACT::xmin.

Referenced by ProcessStandbyHSFeedbackMessage().

/*
 * PhysicalReplicationSlotNewXmin -- apply hot-standby feedback xmins to the
 * slot.
 *
 * For each of xmin and catalog_xmin, the slot value is replaced when the
 * current slot value is not a normal XID, when the feedback value is not a
 * normal XID, or when the current value precedes the feedback value. The
 * persistent and effective copies are set together, under the slot's
 * spinlock.
 *
 * NOTE(review): this rendering omits listing lines 1877, 1880, 1907 and
 * 1908. The reference list names MyReplicationSlot, MyPgXact,
 * InvalidTransactionId, ReplicationSlotMarkDirty() and
 * ReplicationSlotsComputeRequiredXmin(), so those are presumably what the
 * missing lines use -- confirm against walsender.c.
 */
1875 {
1876  bool changed = false;
1878 
1879  SpinLockAcquire(&slot->mutex);
1881 
1882  /*
1883  * For physical replication we don't need the interlock provided by xmin
1884  * and effective_xmin since the consequences of a missed increase are
1885  * limited to query cancellations, so set both at once.
1886  */
1887  if (!TransactionIdIsNormal(slot->data.xmin) ||
1888  !TransactionIdIsNormal(feedbackXmin) ||
1889  TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
1890  {
1891  changed = true;
1892  slot->data.xmin = feedbackXmin;
1893  slot->effective_xmin = feedbackXmin;
1894  }
  /* Same rule, applied independently to the catalog xmin. */
1895  if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
1896  !TransactionIdIsNormal(feedbackCatalogXmin) ||
1897  TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
1898  {
1899  changed = true;
1900  slot->data.catalog_xmin = feedbackCatalogXmin;
1901  slot->effective_catalog_xmin = feedbackCatalogXmin;
1902  }
1903  SpinLockRelease(&slot->mutex);
1904 
1905  if (changed)
1906  {
1909  }
1910 }
TransactionId xmin
Definition: proc.h:228
ReplicationSlotPersistentData data
Definition: slot.h:132
PGXACT * MyPgXact
Definition: proc.c:69
#define SpinLockAcquire(lock)
Definition: spin.h:62
TransactionId effective_xmin
Definition: slot.h:128
TransactionId catalog_xmin
Definition: slot.h:69
#define InvalidTransactionId
Definition: transam.h:31
TransactionId xmin
Definition: slot.h:61
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
#define SpinLockRelease(lock)
Definition: spin.h:64
TransactionId effective_catalog_xmin
Definition: slot.h:129
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
slock_t mutex
Definition: slot.h:105
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:702
void ReplicationSlotMarkDirty(void)
Definition: slot.c:663

◆ ProcessRepliesIfAny()

static void ProcessRepliesIfAny ( void  )
static

Definition at line 1602 of file walsender.c.

References COMMERROR, ereport, errcode(), errmsg(), FATAL, GetCurrentTimestamp(), last_processing, last_reply_timestamp, pq_endmsgread(), pq_getbyte_if_available(), pq_getmessage(), pq_putmessage_noblock, pq_startmsgread(), proc_exit(), ProcessStandbyMessage(), resetStringInfo(), streamingDoneReceiving, streamingDoneSending, and waiting_for_ping_response.

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

/*
 * ProcessRepliesIfAny -- consume every message from the standby that can be
 * read without blocking.
 *
 * Message types handled: 'd' (standby reply wrapped in CopyData), 'c'
 * (CopyDone) and 'X' (standby closing the socket). Any other type is a FATAL
 * protocol violation, as is anything other than 'X' once CopyDone has been
 * received. EOF or a read error ends the process via proc_exit(0).
 *
 * NOTE(review): this rendering omits listing lines 1608, 1617, 1630, 1633,
 * 1659 and 1697. The reference list names last_processing,
 * GetCurrentTimestamp(), COMMERROR, resetStringInfo(),
 * ProcessStandbyMessage() and last_reply_timestamp, so the missing lines
 * presumably use those -- confirm against walsender.c.
 */
1603 {
1604  unsigned char firstchar;
1605  int r;
1606  bool received = false;
1607 
1609 
1610  for (;;)
1611  {
1612  pq_startmsgread();
1613  r = pq_getbyte_if_available(&firstchar);
1614  if (r < 0)
1615  {
1616  /* unexpected error or EOF */
1618  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1619  errmsg("unexpected EOF on standby connection")));
1620  proc_exit(0);
1621  }
1622  if (r == 0)
1623  {
1624  /* no data available without blocking */
1625  pq_endmsgread();
1626  break;
1627  }
1628 
1629  /* Read the message contents */
1631  if (pq_getmessage(&reply_message, 0))
1632  {
1634  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1635  errmsg("unexpected EOF on standby connection")));
1636  proc_exit(0);
1637  }
1638 
1639  /*
1640  * If we already received a CopyDone from the frontend, the frontend
1641  * should not send us anything until we've closed our end of the COPY.
1642  * XXX: In theory, the frontend could already send the next command
1643  * before receiving the CopyDone, but libpq doesn't currently allow
1644  * that.
1645  */
1646  if (streamingDoneReceiving && firstchar != 'X')
1647  ereport(FATAL,
1648  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1649  errmsg("unexpected standby message type \"%c\", after receiving CopyDone",
1650  firstchar)));
1651 
1652  /* Handle the very limited subset of commands expected in this phase */
1653  switch (firstchar)
1654  {
1655  /*
1656  * 'd' means a standby reply wrapped in a CopyData packet.
1657  */
1658  case 'd':
1660  received = true;
1661  break;
1662 
1663  /*
1664  * CopyDone means the standby requested to finish streaming.
1665  * Reply with CopyDone, if we had not sent that already.
1666  */
1667  case 'c':
1668  if (!streamingDoneSending)
1669  {
1670  pq_putmessage_noblock('c', NULL, 0);
1671  streamingDoneSending = true;
1672  }
1673 
1674  streamingDoneReceiving = true;
1675  received = true;
1676  break;
1677 
1678  /*
1679  * 'X' means that the standby is closing down the socket.
1680  */
1681  case 'X':
  /*
   * No break follows; presumably proc_exit() does not return, so
   * control never reaches the default arm -- confirm.
   */
1682  proc_exit(0);
1683 
1684  default:
1685  ereport(FATAL,
1686  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1687  errmsg("invalid standby message type \"%c\"",
1688  firstchar)));
1689  }
1690  }
1691 
1692  /*
1693  * Save the last reply timestamp if we've received at least one reply.
1694  */
1695  if (received)
1696  {
1698  waiting_for_ping_response = false;
1699  }
1700 }
static void ProcessStandbyMessage(void)
Definition: walsender.c:1706
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1569
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:570
static TimestampTz last_processing
Definition: walsender.c:165
void pq_startmsgread(void)
Definition: pqcomm.c:1211
#define FATAL
Definition: elog.h:52
int pq_getbyte_if_available(unsigned char *c)
Definition: pqcomm.c:1041
static bool streamingDoneSending
Definition: walsender.c:182
#define COMMERROR
Definition: elog.h:30
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
#define ereport(elevel, rest)
Definition: elog.h:141
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1273
static StringInfoData reply_message
Definition: walsender.c:161
void pq_endmsgread(void)
Definition: pqcomm.c:1235
static bool streamingDoneReceiving
Definition: walsender.c:183
int errmsg(const char *fmt,...)
Definition: elog.c:784
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
static bool waiting_for_ping_response
Definition: walsender.c:174
static TimestampTz last_reply_timestamp
Definition: walsender.c:171

◆ ProcessStandbyHSFeedbackMessage()

static void ProcessStandbyHSFeedbackMessage ( void  )
static

Definition at line 1954 of file walsender.c.

References DEBUG2, elog, InvalidTransactionId, log_min_messages, WalSnd::mutex, MyPgXact, MyReplicationSlot, MyWalSnd, pfree(), PhysicalReplicationSlotNewXmin(), pq_getmsgint(), pq_getmsgint64(), pstrdup(), WalSnd::replyTime, SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), TransactionIdInRecentPast(), TransactionIdIsNormal, TransactionIdPrecedes(), and PGXACT::xmin.

Referenced by ProcessStandbyMessage().

1955 {
1956  TransactionId feedbackXmin;
1957  uint32 feedbackEpoch;
1958  TransactionId feedbackCatalogXmin;
1959  uint32 feedbackCatalogEpoch;
1960  TimestampTz replyTime;
1961 
1962  /*
1963  * Decipher the reply message. The caller already consumed the msgtype
1964  * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
1965  * of this message.
1966  */
1967  replyTime = pq_getmsgint64(&reply_message);
1968  feedbackXmin = pq_getmsgint(&reply_message, 4);
1969  feedbackEpoch = pq_getmsgint(&reply_message, 4);
1970  feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
1971  feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
1972 
1973  if (log_min_messages <= DEBUG2)
1974  {
1975  char *replyTimeStr;
1976 
1977  /* Copy because timestamptz_to_str returns a static buffer */
1978  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
1979 
1980  elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
1981  feedbackXmin,
1982  feedbackEpoch,
1983  feedbackCatalogXmin,
1984  feedbackCatalogEpoch,
1985  replyTimeStr);
1986 
1987  pfree(replyTimeStr);
1988  }
1989 
1990  /*
1991  * Update shared state for this WalSender process based on reply data from
1992  * standby.
1993  */
1994  {
1995  WalSnd *walsnd = MyWalSnd;
1996 
1997  SpinLockAcquire(&walsnd->mutex);
1998  walsnd->replyTime = replyTime;
1999  SpinLockRelease(&walsnd->mutex);
2000  }
2001 
2002  /*
2003  * Unset WalSender's xmins if the feedback message values are invalid.
2004  * This happens when the downstream turned hot_standby_feedback off.
2005  */
2006  if (!TransactionIdIsNormal(feedbackXmin)
2007  && !TransactionIdIsNormal(feedbackCatalogXmin))
2008  {
2009  MyPgXact->xmin = InvalidTransactionId;
2010  if (MyReplicationSlot != NULL)
2011  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2012  return;
2013  }
2014 
2015  /*
2016  * Check that the provided xmin/epoch are sane, that is, not in the future
2017  * and not so far back as to be already wrapped around. Ignore if not.
2018  */
2019  if (TransactionIdIsNormal(feedbackXmin) &&
2020  !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
2021  return;
2022 
2023  if (TransactionIdIsNormal(feedbackCatalogXmin) &&
2024  !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
2025  return;
2026 
2027  /*
2028  * Set the WalSender's xmin equal to the standby's requested xmin, so that
2029  * the xmin will be taken into account by GetOldestXmin. This will hold
2030  * back the removal of dead rows and thereby prevent the generation of
2031  * cleanup conflicts on the standby server.
2032  *
2033  * There is a small window for a race condition here: although we just
2034  * checked that feedbackXmin precedes nextXid, the nextXid could have
2035  * gotten advanced between our fetching it and applying the xmin below,
2036  * perhaps far enough to make feedbackXmin wrap around. In that case the
2037  * xmin we set here would be "in the future" and have no effect. No point
2038  * in worrying about this since it's too late to save the desired data
2039  * anyway. Assuming that the standby sends us an increasing sequence of
2040  * xmins, this could only happen during the first reply cycle, else our
2041  * own xmin would prevent nextXid from advancing so far.
2042  *
2043  * We don't bother taking the ProcArrayLock here. Setting the xmin field
2044  * is assumed atomic, and there's no real need to prevent a concurrent
2045  * GetOldestXmin. (If we're moving our xmin forward, this is obviously
2046  * safe, and if we're moving it backwards, well, the data is at risk
2047  * already since a VACUUM could have just finished calling GetOldestXmin.)
2048  *
2049  * If we're using a replication slot we reserve the xmin via that,
2050  * otherwise via the walsender's PGXACT entry. We can only track the
2051  * catalog xmin separately when using a slot, so we store the least of the
2052  * two provided when not using a slot.
2053  *
2054  * XXX: It might make sense to generalize the ephemeral slot concept and
2055  * always use the slot mechanism to handle the feedback xmin.
2056  */
2057  if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
2058  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2059  else
2060  {
2061  if (TransactionIdIsNormal(feedbackCatalogXmin)
2062  && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
2063  MyPgXact->xmin = feedbackCatalogXmin;
2064  else
2065  MyPgXact->xmin = feedbackXmin;
2066  }
2067 }
uint32 TransactionId
Definition: c.h:507
TransactionId xmin
Definition: proc.h:228
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1161
slock_t mutex
PGXACT * MyPgXact
Definition: proc.c:69
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition: walsender.c:1923
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pfree(void *pointer)
Definition: mcxt.c:1031
#define DEBUG2
Definition: elog.h:24
#define InvalidTransactionId
Definition: transam.h:31
unsigned int uint32
Definition: c.h:358
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static StringInfoData reply_message
Definition: walsender.c:161
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:111
int log_min_messages
Definition: guc.c:510
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define elog(elevel,...)
Definition: elog.h:226
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition: walsender.c:1874
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
TimestampTz replyTime
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1729

◆ ProcessStandbyMessage()

static void ProcessStandbyMessage ( void  )
static

Definition at line 1706 of file walsender.c.

References COMMERROR, ereport, errcode(), errmsg(), pq_getmsgbyte(), proc_exit(), ProcessStandbyHSFeedbackMessage(), and ProcessStandbyReplyMessage().

Referenced by ProcessRepliesIfAny().

1707 {
1708  char msgtype;
1709 
1710  /*
1711  * Check message type from the first byte.
1712  */
1713  msgtype = pq_getmsgbyte(&reply_message);
1714 
1715  switch (msgtype)
1716  {
1717  case 'r':
1718  ProcessStandbyReplyMessage();
1719  break;
1720 
1721  case 'h':
1722  ProcessStandbyHSFeedbackMessage();
1723  break;
1724 
1725  default:
1726  ereport(COMMERROR,
1727  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1728  errmsg("unexpected message type \"%c\"", msgtype)));
1729  proc_exit(0);
1730  }
1731 }
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:570
#define COMMERROR
Definition: elog.h:30
#define ereport(elevel, rest)
Definition: elog.h:141
static StringInfoData reply_message
Definition: walsender.c:161
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
static void ProcessStandbyReplyMessage(void)
Definition: walsender.c:1769
int errmsg(const char *fmt,...)
Definition: elog.c:784
static void ProcessStandbyHSFeedbackMessage(void)
Definition: walsender.c:1954

◆ ProcessStandbyReplyMessage()

static void ProcessStandbyReplyMessage ( void  )
static

Definition at line 1769 of file walsender.c.

References am_cascading_walsender, WalSnd::apply, WalSnd::applyLag, DEBUG2, elog, WalSnd::flush, WalSnd::flushLag, GetCurrentTimestamp(), InvalidXLogRecPtr, LagTrackerRead(), log_min_messages, LogicalConfirmReceivedLocation(), WalSnd::mutex, MyReplicationSlot, MyWalSnd, now(), pfree(), PhysicalConfirmReceivedLocation(), pq_getmsgbyte(), pq_getmsgint64(), pstrdup(), WalSnd::replyTime, sentPtr, SlotIsLogical, SpinLockAcquire, SpinLockRelease, SYNC_REP_WAIT_APPLY, SYNC_REP_WAIT_FLUSH, SYNC_REP_WAIT_WRITE, SyncRepReleaseWaiters(), timestamptz_to_str(), WalSndKeepalive(), WalSnd::write, and WalSnd::writeLag.

Referenced by ProcessStandbyMessage().

1770 {
1771  XLogRecPtr writePtr,
1772  flushPtr,
1773  applyPtr;
1774  bool replyRequested;
1775  TimeOffset writeLag,
1776  flushLag,
1777  applyLag;
1778  bool clearLagTimes;
1779  TimestampTz now;
1780  TimestampTz replyTime;
1781 
1782  static bool fullyAppliedLastTime = false;
1783 
1784  /* the caller already consumed the msgtype byte */
1785  writePtr = pq_getmsgint64(&reply_message);
1786  flushPtr = pq_getmsgint64(&reply_message);
1787  applyPtr = pq_getmsgint64(&reply_message);
1788  replyTime = pq_getmsgint64(&reply_message);
1789  replyRequested = pq_getmsgbyte(&reply_message);
1790 
1791  if (log_min_messages <= DEBUG2)
1792  {
1793  char *replyTimeStr;
1794 
1795  /* Copy because timestamptz_to_str returns a static buffer */
1796  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
1797 
1798  elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
1799  (uint32) (writePtr >> 32), (uint32) writePtr,
1800  (uint32) (flushPtr >> 32), (uint32) flushPtr,
1801  (uint32) (applyPtr >> 32), (uint32) applyPtr,
1802  replyRequested ? " (reply requested)" : "",
1803  replyTimeStr);
1804 
1805  pfree(replyTimeStr);
1806  }
1807 
1808  /* See if we can compute the round-trip lag for these positions. */
1809  now = GetCurrentTimestamp();
1810  writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
1811  flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
1812  applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
1813 
1814  /*
1815  * If the standby reports that it has fully replayed the WAL in two
1816  * consecutive reply messages, then the second such message must result
1817  * from wal_receiver_status_interval expiring on the standby. This is a
1818  * convenient time to forget the lag times measured when it last
1819  * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
1820  * until more WAL traffic arrives.
1821  */
1822  clearLagTimes = false;
1823  if (applyPtr == sentPtr)
1824  {
1825  if (fullyAppliedLastTime)
1826  clearLagTimes = true;
1827  fullyAppliedLastTime = true;
1828  }
1829  else
1830  fullyAppliedLastTime = false;
1831 
1832  /* Send a reply if the standby requested one. */
1833  if (replyRequested)
1834  WalSndKeepalive(false);
1835 
1836  /*
1837  * Update shared state for this WalSender process based on reply data from
1838  * standby.
1839  */
1840  {
1841  WalSnd *walsnd = MyWalSnd;
1842 
1843  SpinLockAcquire(&walsnd->mutex);
1844  walsnd->write = writePtr;
1845  walsnd->flush = flushPtr;
1846  walsnd->apply = applyPtr;
1847  if (writeLag != -1 || clearLagTimes)
1848  walsnd->writeLag = writeLag;
1849  if (flushLag != -1 || clearLagTimes)
1850  walsnd->flushLag = flushLag;
1851  if (applyLag != -1 || clearLagTimes)
1852  walsnd->applyLag = applyLag;
1853  walsnd->replyTime = replyTime;
1854  SpinLockRelease(&walsnd->mutex);
1855  }
1856 
1857  if (!am_cascading_walsender)
1858  SyncRepReleaseWaiters();
1859 
1860  /*
1861  * Advance our local xmin horizon when the client confirmed a flush.
1862  */
1863  if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
1864  {
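1865  if (SlotIsLogical(MyReplicationSlot))
1866  LogicalConfirmReceivedLocation(flushPtr);
1867  else
1868  PhysicalConfirmReceivedLocation(flushPtr);
1869  }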
1870 }
XLogRecPtr write
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1569
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1161
TimeOffset flushLag
TimeOffset writeLag
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
void pfree(void *pointer)
Definition: mcxt.c:1031
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3406
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: walsender.c:1737
#define SYNC_REP_WAIT_APPLY
Definition: syncrep.h:26
#define DEBUG2
Definition: elog.h:24
unsigned int uint32
Definition: c.h:358
#define SlotIsLogical(slot)
Definition: slot.h:154
#define SYNC_REP_WAIT_FLUSH
Definition: syncrep.h:25
static StringInfoData reply_message
Definition: walsender.c:161
#define SpinLockRelease(lock)
Definition: spin.h:64
int64 TimeOffset
Definition: timestamp.h:40
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition: walsender.c:3529
WalSnd * MyWalSnd
Definition: walsender.c:111
static XLogRecPtr sentPtr
Definition: walsender.c:157
int log_min_messages
Definition: guc.c:510
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
TimeOffset applyLag
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:1005
#define elog(elevel,...)
Definition: elog.h:226
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
TimestampTz replyTime
XLogRecPtr apply
void SyncRepReleaseWaiters(void)
Definition: syncrep.c:411
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1533
bool am_cascading_walsender
Definition: walsender.c:115
#define SYNC_REP_WAIT_WRITE
Definition: syncrep.h:24
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1729

◆ SendTimeLineHistory()

static void SendTimeLineHistory ( TimeLineHistoryCmd cmd)
static

Definition at line 434 of file walsender.c.

References buf, CloseTransientFile(), PGAlignedBlock::data, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg(), ERROR, fd(), MAXFNAMELEN, MAXPGPATH, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), pq_beginmessage(), pq_endmessage(), pq_sendbytes(), pq_sendint16(), pq_sendint32(), pq_sendstring(), read, TimeLineHistoryCmd::timeline, TLHistoryFileName, TLHistoryFilePath, and WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ.

Referenced by exec_replication_command().

435 {
436  StringInfoData buf;
437  char histfname[MAXFNAMELEN];
438  char path[MAXPGPATH];
439  int fd;
440  off_t histfilelen;
441  off_t bytesleft;
442  Size len;
443 
444  /*
445  * Reply with a result set with one row, and two columns. The first col is
446  * the name of the history file, 2nd is the contents.
447  */
448 
449  TLHistoryFileName(histfname, cmd->timeline);
450  TLHistoryFilePath(path, cmd->timeline);
451 
452  /* Send a RowDescription message */
453  pq_beginmessage(&buf, 'T');
454  pq_sendint16(&buf, 2); /* 2 fields */
455 
456  /* first field */
457  pq_sendstring(&buf, "filename"); /* col name */
458  pq_sendint32(&buf, 0); /* table oid */
459  pq_sendint16(&buf, 0); /* attnum */
460  pq_sendint32(&buf, TEXTOID); /* type oid */
461  pq_sendint16(&buf, -1); /* typlen */
462  pq_sendint32(&buf, 0); /* typmod */
463  pq_sendint16(&buf, 0); /* format code */
464 
465  /* second field */
466  pq_sendstring(&buf, "content"); /* col name */
467  pq_sendint32(&buf, 0); /* table oid */
468  pq_sendint16(&buf, 0); /* attnum */
469  pq_sendint32(&buf, BYTEAOID); /* type oid */
470  pq_sendint16(&buf, -1); /* typlen */
471  pq_sendint32(&buf, 0); /* typmod */
472  pq_sendint16(&buf, 0); /* format code */
473  pq_endmessage(&buf);
474 
475  /* Send a DataRow message */
476  pq_beginmessage(&buf, 'D');
477  pq_sendint16(&buf, 2); /* # of columns */
478  len = strlen(histfname);
479  pq_sendint32(&buf, len); /* col1 len */
480  pq_sendbytes(&buf, histfname, len);
481 
482  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
483  if (fd < 0)
484  ereport(ERROR,
485  (errcode_for_file_access(),
486  errmsg("could not open file \"%s\": %m", path)));
487 
488  /* Determine file length and send it to client */
489  histfilelen = lseek(fd, 0, SEEK_END);
490  if (histfilelen < 0)
491  ereport(ERROR,
492  (errcode_for_file_access(),
493  errmsg("could not seek to end of file \"%s\": %m", path)));
494  if (lseek(fd, 0, SEEK_SET) != 0)
495  ereport(ERROR,
496  (errcode_for_file_access(),
497  errmsg("could not seek to beginning of file \"%s\": %m", path)));
498 
499  pq_sendint32(&buf, histfilelen); /* col2 len */
500 
501  bytesleft = histfilelen;
502  while (bytesleft > 0)
503  {
504  PGAlignedBlock rbuf;
505  int nread;
506 
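507  pgstat_report_wait_start(WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ);
508  nread = read(fd, rbuf.data, sizeof(rbuf));
509  pgstat_report_wait_end();
510  if (nread < 0)
511  ereport(ERROR,
512  (errcode_for_file_access(),
513  errmsg("could not read file \"%s\": %m",
514  path)));
515  else if (nread == 0)
516  ereport(ERROR,
517  (errcode(ERRCODE_DATA_CORRUPTED),
518  errmsg("could not read file \"%s\": read %d of %zu",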
519  path, nread, (Size) bytesleft)));
520 
521  pq_sendbytes(&buf, rbuf.data, nread);
522  bytesleft -= nread;
523  }
524 
525  if (CloseTransientFile(fd) != 0)
526  ereport(ERROR,
527  (errcode_for_file_access(),
528  errmsg("could not close file \"%s\": %m", path)));
529 
530  pq_endmessage(&buf);
531 }
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
int errcode(int sqlerrcode)
Definition: elog.c:570
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1191
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
TimeLineID timeline
Definition: replnodes.h:97
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
char data[BLCKSZ]
Definition: c.h:1060
#define ERROR
Definition: elog.h:43
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2257
#define MAXPGPATH
#define TLHistoryFileName(fname, tli)
static char * buf
Definition: pg_test_fsync.c:68
int errcode_for_file_access(void)
Definition: elog.c:593
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1342
#define ereport(elevel, rest)
Definition: elog.h:141
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:44
int CloseTransientFile(int fd)
Definition: fd.c:2434
#define MAXFNAMELEN
size_t Size
Definition: c.h:466
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1318
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:125
int errmsg(const char *fmt,...)
Definition: elog.c:784
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
#define read(a, b, c)
Definition: win32.h:13
#define TLHistoryFilePath(path, tli)

◆ StartLogicalReplication()

static void StartLogicalReplication ( StartReplicationCmd cmd)
static

Definition at line 1063 of file walsender.c.

References am_cascading_walsender, Assert, buf, CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDecodingContext(), ReplicationSlot::data, DestRemote, EndCommand(), ereport, errmsg(), FreeDecodingContext(), got_STOPPING, LOG, logical_read_xlog_page(), logical_startptr, WalSnd::mutex, MyReplicationSlot, StartReplicationCmd::options, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), proc_exit(), RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), ReplicationSlotPersistentData::restart_lsn, WalSnd::sentPtr, sentPtr, StartReplicationCmd::slotname, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, SyncRepInitConfig(), WalSndLoop(), WalSndPrepareWrite(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WalSndUpdateProgress(), WalSndWriteData(), and XLogSendLogical().

Referenced by exec_replication_command().

1064 {
1065  StringInfoData buf;
1066 
1067  /* make sure that our requirements are still fulfilled */
1068  CheckLogicalDecodingRequirements();
1069 
1070  Assert(!MyReplicationSlot);
1071 
1072  ReplicationSlotAcquire(cmd->slotname, true);
1073 
1074  /*
1075  * Force a disconnect, so that the decoding code doesn't need to care
1076  * about an eventual switch from running in recovery, to running in a
1077  * normal environment. Client code is expected to handle reconnects.
1078  */
1079  if (am_cascading_walsender && !RecoveryInProgress())
1080  {
1081  ereport(LOG,
1082  (errmsg("terminating walsender process after promotion")));
1083  got_STOPPING = true;
1084  }
1085 
1086  /*
1087  * Create our decoding context, making it start at the previously ack'ed
1088  * position.
1089  *
1090  * Do this before sending a CopyBothResponse message, so that any errors
1091  * are reported early.
1092  */
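1093  logical_decoding_ctx =
1094  CreateDecodingContext(cmd->startpoint, cmd->options, false,
1095  logical_read_xlog_page,
1096  WalSndPrepareWrite, WalSndWriteData,
1097  WalSndUpdateProgress);
1098 
1099 
1100  WalSndSetState(WALSNDSTATE_CATCHUP);
1101 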
1102  /* Send a CopyBothResponse message, and start streaming */
1103  pq_beginmessage(&buf, 'W');
1104  pq_sendbyte(&buf, 0);
1105  pq_sendint16(&buf, 0);
1106  pq_endmessage(&buf);
1107  pq_flush();
1108 
1109 
1110  /* Start reading WAL from the oldest required WAL. */
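1111  logical_startptr = MyReplicationSlot->data.restart_lsn;
1112 
1113  /*
1114  * Report the location after which we'll send out further commits as the
1115  * current sentPtr.
1116  */
1117  sentPtr = MyReplicationSlot->data.confirmed_flush;
1118 
1119  /* Also update the sent position status in shared memory */
1120  SpinLockAcquire(&MyWalSnd->mutex);
1121  MyWalSnd->sentPtr = MyReplicationSlot->data.restart_lsn;
1122  SpinLockRelease(&MyWalSnd->mutex);
1123 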
1124  replication_active = true;
1125 
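1126  SyncRepInitConfig();
1127 
1128  /* Main loop of walsender */
1129  WalSndLoop(XLogSendLogical);
1130 
1131  FreeDecodingContext(logical_decoding_ctx);
1132  ReplicationSlotRelease();
1133 
1134  replication_active = false;
1135  if (got_STOPPING)
1136  proc_exit(0);
1137  WalSndSetState(WALSNDSTATE_STARTUP);
1138 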
1139  /* Get out of COPY mode (CommandComplete). */
1140  EndCommand("COPY 0", DestRemote);
1141 }
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
XLogRecPtr startpoint
Definition: replnodes.h:85
#define pq_flush()
Definition: libpq.h:39
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:330
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:190
void proc_exit(int code)
Definition: ipc.c:104
ReplicationSlotPersistentData data
Definition: slot.h:132
#define LOG
Definition: elog.h:26
bool RecoveryInProgress(void)
Definition: xlog.c:7898
slock_t mutex
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
XLogRecPtr confirmed_flush
Definition: slot.h:80
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
Definition: walsender.c:765
#define SpinLockAcquire(lock)
Definition: spin.h:62
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:166
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:200
static char * buf
Definition: pg_test_fsync.c:68
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1179
static XLogRecPtr logical_startptr
Definition: walsender.c:201
void ReplicationSlotRelease(void)
Definition: slot.c:424
void SyncRepInitConfig(void)
Definition: syncrep.c:383
#define ereport(elevel, rest)
Definition: elog.h:141
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2154
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1152
#define SpinLockRelease(lock)
Definition: spin.h:64
static volatile sig_atomic_t replication_active
Definition: walsender.c:198
XLogRecPtr sentPtr
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:374
WalSnd * MyWalSnd
Definition: walsender.c:111
static XLogRecPtr sentPtr
Definition: walsender.c:157
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:732
void WalSndSetState(WalSndState state)
Definition: walsender.c:3177
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:512
static void XLogSendLogical(void)
Definition: walsender.c:2813
XLogRecPtr restart_lsn
Definition: slot.h:72
int errmsg(const char *fmt,...)
Definition: elog.c:784
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:79
bool am_cascading_walsender
Definition: walsender.c:115
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
Definition: walsender.c:1268

◆ StartReplication()

static void StartReplication ( StartReplicationCmd cmd)
static

Definition at line 540 of file walsender.c.

References am_cascading_walsender, Assert, begin_tup_output_tupdesc(), buf, CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errcode(), errdetail(), errmsg(), ERROR, GetFlushRecPtr(), GetStandbyFlushRecPtr(), got_STOPPING, Int64GetDatum(), InvalidXLogRecPtr, list_free_deep(), MemSet, WalSnd::mutex, MyReplicationSlot, pq_beginmessage(), pq_endmessage(), pq_flush, pq_puttextmessage(), pq_sendbyte(), pq_sendint16(), proc_exit(), readTimeLineHistory(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, WalSnd::sentPtr, sentPtr, SlotIsLogical, StartReplicationCmd::slotname, snprintf, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), ThisTimeLineID, StartReplicationCmd::timeline, tliSwitchPoint(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), values, WalSndLoop(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, XLogRecPtrIsInvalid, and XLogSendPhysical().

Referenced by exec_replication_command().

541 {
542  StringInfoData buf;
543  XLogRecPtr FlushPtr;
544 
545  if (ThisTimeLineID == 0)
546  ereport(ERROR,
547  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
548  errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
549 
550  /*
551  * We assume here that we're logging enough information in the WAL for
552  * log-shipping, since this is checked in PostmasterMain().
553  *
554  * NOTE: wal_level can only change at shutdown, so in most cases it is
555  * difficult for there to be WAL data that we can still see that was
556  * written at wal_level='minimal'.
557  */
558 
559  if (cmd->slotname)
560  {
561  ReplicationSlotAcquire(cmd->slotname, true);
562  if (SlotIsLogical(MyReplicationSlot))
563  ereport(ERROR,
564  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
565  (errmsg("cannot use a logical replication slot for physical replication"))));
566  }
567 
568  /*
569  * Select the timeline. If it was given explicitly by the client, use
570  * that. Otherwise use the timeline of the last replayed record, which is
571  * kept in ThisTimeLineID.
572  */
573  if (am_cascading_walsender)
574  {
575  /* this also updates ThisTimeLineID */
576  FlushPtr = GetStandbyFlushRecPtr();
577  }
578  else
579  FlushPtr = GetFlushRecPtr();
580 
581  if (cmd->timeline != 0)
582  {
583  XLogRecPtr switchpoint;
584 
585  sendTimeLine = cmd->timeline;
586  if (sendTimeLine == ThisTimeLineID)
587  {
588  sendTimeLineIsHistoric = false;
589  sendTimeLineValidUpto = InvalidXLogRecPtr;
590  }
591  else
592  {
593  List *timeLineHistory;
594 
595  sendTimeLineIsHistoric = true;
596 
597  /*
598  * Check that the timeline the client requested exists, and the
599  * requested start location is on that timeline.
600  */
601  timeLineHistory = readTimeLineHistory(ThisTimeLineID);
602  switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
603  &sendTimeLineNextTLI);
604  list_free_deep(timeLineHistory);
605 
606  /*
607  * Found the requested timeline in the history. Check that
608  * requested startpoint is on that timeline in our history.
609  *
610  * This is quite loose on purpose. We only check that we didn't
611  * fork off the requested timeline before the switchpoint. We
612  * don't check that we switched *to* it before the requested
613  * starting point. This is because the client can legitimately
614  * request to start replication from the beginning of the WAL
615  * segment that contains switchpoint, but on the new timeline, so
616  * that it doesn't end up with a partial segment. If you ask for
617  * too old a starting point, you'll get an error later when we
618  * fail to find the requested WAL segment in pg_wal.
619  *
620  * XXX: we could be more strict here and only allow a startpoint
621  * that's older than the switchpoint, if it's still in the same
622  * WAL segment.
623  */
624  if (!XLogRecPtrIsInvalid(switchpoint) &&
625  switchpoint < cmd->startpoint)
626  {
627  ereport(ERROR,
628  (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
629  (uint32) (cmd->startpoint >> 32),
630  (uint32) (cmd->startpoint),
631  cmd->timeline),
632  errdetail("This server's history forked from timeline %u at %X/%X.",
633  cmd->timeline,
634  (uint32) (switchpoint >> 32),
635  (uint32) (switchpoint))));
636  }
637  sendTimeLineValidUpto = switchpoint;
638  }
639  }
640  else
641  {
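642  sendTimeLine = ThisTimeLineID;
643  sendTimeLineValidUpto = InvalidXLogRecPtr;
644  sendTimeLineIsHistoric = false;
645  }
646 
647  streamingDoneSending = streamingDoneReceiving = false;
648 
649  /* If there is nothing to stream, don't even enter COPY mode */
650  if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
651  {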
652  /*
653  * When we first start replication the standby will be behind the
654  * primary. For some applications, for example synchronous
655  * replication, it is important to have a clear state for this initial
656  * catchup mode, so we can trigger actions when we change streaming
657  * state later. We may stay in this state for a long time, which is
658  * exactly why we want to be able to monitor whether or not we are
659  * still here.
660  */
661  WalSndSetState(WALSNDSTATE_CATCHUP);
662 
663  /* Send a CopyBothResponse message, and start streaming */
664  pq_beginmessage(&buf, 'W');
665  pq_sendbyte(&buf, 0);
666  pq_sendint16(&buf, 0);
667  pq_endmessage(&buf);
668  pq_flush();
669 
670  /*
671  * Don't allow a request to stream from a future point in WAL that
672  * hasn't been flushed to disk in this server yet.
673  */
674  if (FlushPtr < cmd->startpoint)
675  {
676  ereport(ERROR,
677  (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
678  (uint32) (cmd->startpoint >> 32),
679  (uint32) (cmd->startpoint),
680  (uint32) (FlushPtr >> 32),
681  (uint32) (FlushPtr))));
682  }
683 
684  /* Start streaming from the requested point */
685  sentPtr = cmd->startpoint;
686 
687  /* Initialize shared memory status, too */
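688  SpinLockAcquire(&MyWalSnd->mutex);
689  MyWalSnd->sentPtr = sentPtr;
690  SpinLockRelease(&MyWalSnd->mutex);
691 
692  SyncRepInitConfig();
693 
694  /* Main loop of walsender */
695  replication_active = true;
696 
697  WalSndLoop(XLogSendPhysical);
698 
699  replication_active = false;
700  if (got_STOPPING)
701  proc_exit(0);
702  WalSndSetState(WALSNDSTATE_STARTUP);
703 
704  Assert(streamingDoneSending && streamingDoneReceiving);
705  }
706 
707  if (cmd->slotname)
708  ReplicationSlotRelease();
709 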
710  /*
711  * Copy is finished now. Send a single-row result set indicating the next
712  * timeline.
713  */
714  if (sendTimeLineIsHistoric)
715  {
716  char startpos_str[8 + 1 + 8 + 1];
717  DestReceiver *dest;
718  TupOutputState *tstate;
719  TupleDesc tupdesc;
720  Datum values[2];
721  bool nulls[2];
722 
723  snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
724  (uint32) (sendTimeLineValidUpto >> 32),
725  (uint32) sendTimeLineValidUpto);
726 
727  dest = CreateDestReceiver(DestRemoteSimple);
728  MemSet(nulls, false, sizeof(nulls));
729 
730  /*
731  * Need a tuple descriptor representing two columns. int8 may seem
732  * like a surprising data type for this, but in theory int4 would not
733  * be wide enough for this, as TimeLineID is unsigned.
734  */
735  tupdesc = CreateTemplateTupleDesc(2);
736  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
737  INT8OID, -1, 0);
738  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
739  TEXTOID, -1, 0);
740 
741  /* prepare for projection of tuple */
742  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
743 
744  values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
745  values[1] = CStringGetTextDatum(startpos_str);
746 
747  /* send it to dest */
748  do_tup_output(tstate, values, nulls);
749 
750  end_tup_output(tstate);
751  }
752 
753  /* Send CommandComplete message */
754  pq_puttextmessage('C', "START_STREAMING");
755 }
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr startpoint
Definition: replnodes.h:85
#define pq_flush()
Definition: libpq.h:39
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:44
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:330
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:84
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:75
static void XLogSendPhysical(void)
Definition: walsender.c:2550
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2252
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:190
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:570
#define MemSet(start, val, len)
Definition: c.h:955
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8230
void list_free_deep(List *list)
Definition: list.c:1387
slock_t mutex
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
#define SpinLockAcquire(lock)
Definition: spin.h:62
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2310
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:676
#define ERROR
Definition: elog.h:43
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2232
static char * buf
Definition: pg_test_fsync.c:68
static bool streamingDoneSending
Definition: walsender.c:182
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
int errdetail(const char *fmt,...)
Definition: elog.c:860
unsigned int uint32
Definition: c.h:358
void ReplicationSlotRelease(void)
Definition: slot.c:424
#define SlotIsLogical(slot)
Definition: slot.h:154
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1699
void SyncRepInitConfig(void)
Definition: syncrep.c:383
#define ereport(elevel, rest)
Definition: elog.h:141
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2154
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
static volatile sig_atomic_t replication_active
Definition: walsender.c:198
uintptr_t Datum
Definition: postgres.h:367
XLogRecPtr sentPtr
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:563
WalSnd * MyWalSnd
Definition: walsender.c:111
static XLogRecPtr sentPtr
Definition: walsender.c:157
TimeLineID ThisTimeLineID
Definition: xlog.c:187
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
static TimeLineID sendTimeLine
Definition: walsender.c:148
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:732
void WalSndSetState(WalSndState state)
Definition: walsender.c:3177
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:149
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:151
static bool streamingDoneReceiving
Definition: walsender.c:183
static Datum values[MAXATTR]
Definition: bootstrap.c:167
int errmsg(const char *fmt,...)
Definition: elog.c:784
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
TimeLineID timeline
Definition: replnodes.h:84
#define CStringGetTextDatum(s)
Definition: builtins.h:83
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2933
void pq_puttextmessage(char msgtype, const char *str)
Definition: pqformat.c:369
Definition: pg_list.h:50
#define snprintf
Definition: port.h:192
int16 AttrNumber
Definition: attnum.h:21
static bool sendTimeLineIsHistoric
Definition: walsender.c:150
bool am_cascading_walsender
Definition: walsender.c:115

◆ TransactionIdInRecentPast()

static bool TransactionIdInRecentPast ( TransactionId  xid,
uint32  epoch 
)
static

Definition at line 1923 of file walsender.c.

References EpochFromFullTransactionId, ReadNextFullTransactionId(), TransactionIdPrecedesOrEquals(), and XidFromFullTransactionId.

Referenced by ProcessStandbyHSFeedbackMessage().

/*
 * Decide whether the (epoch, xid) pair passed in is plausible relative to
 * this server's next full transaction ID.
 *
 * xid   - 32-bit transaction ID to check.
 * epoch - epoch the caller claims xid belongs to.
 *
 * Returns false if epoch does not match the epoch implied by comparing xid
 * with the next XID, or if TransactionIdPrecedesOrEquals() rejects xid
 * against the next XID; returns true otherwise.
 */
1924 {
1925  FullTransactionId nextFullXid;
1926  TransactionId nextXid;
1927  uint32 nextEpoch;
1928  /* Split the next full XID into its XID and epoch components. */
1929  nextFullXid = ReadNextFullTransactionId();
1930  nextXid = XidFromFullTransactionId(nextFullXid);
1931  nextEpoch = EpochFromFullTransactionId(nextFullXid);
1932  /* Validate the claimed epoch first. */
1933  if (xid <= nextXid) /* numerically not past nextXid: must share its epoch */
1934  {
1935  if (epoch != nextEpoch)
1936  return false;
1937  }
1938  else /* numerically past nextXid: only valid in the epoch just before */
1939  {
1940  if (epoch + 1 != nextEpoch)
1941  return false;
1942  }
1943 
1944  if (!TransactionIdPrecedesOrEquals(xid, nextXid))
1945  return false; /* epoch OK, but it's wrapped around */
1946 
1947  return true;
1948 }
uint32 TransactionId
Definition: c.h:507
#define XidFromFullTransactionId(x)
Definition: transam.h:48
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:319
unsigned int uint32
Definition: c.h:358
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:246
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
static const unsigned __int64 epoch
Definition: gettimeofday.c:34

◆ WalSndCheckTimeOut()

static void WalSndCheckTimeOut ( void  )
static

Definition at line 2127 of file walsender.c.

References COMMERROR, ereport, errmsg(), last_processing, last_reply_timestamp, TimestampTzPlusMilliseconds, wal_sender_timeout, and WalSndShutdown().

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

2128 {
2129  TimestampTz timeout;
2130 
2131  /* don't bail out if we're doing something that doesn't require timeouts */
2132  if (last_reply_timestamp <= 0)
2133  return;
2134 
2137 
2138  if (wal_sender_timeout > 0 && last_processing >= timeout)
2139  {
2140  /*
2141  * Since typically expiration of replication timeout means
2142  * communication problem, we don't send the error message to the
2143  * standby.
2144  */
2146  (errmsg("terminating walsender process due to replication timeout")));
2147 
2148  WalSndShutdown();
2149  }
2150 }
int wal_sender_timeout
Definition: walsender.c:122
int64 TimestampTz
Definition: timestamp.h:39
static TimestampTz last_processing
Definition: walsender.c:165
#define COMMERROR
Definition: elog.h:30
#define ereport(elevel, rest)
Definition: elog.h:141
static void WalSndShutdown(void)
Definition: walsender.c:233
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
int errmsg(const char *fmt,...)
Definition: elog.c:784
static TimestampTz last_reply_timestamp
Definition: walsender.c:171

◆ WalSndComputeSleeptime()

static long WalSndComputeSleeptime ( TimestampTz  now)
static

Definition at line 2077 of file walsender.c.

References last_reply_timestamp, sleeptime, TimestampDifference(), TimestampTzPlusMilliseconds, waiting_for_ping_response, and wal_sender_timeout.

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

2078 {
2079  long sleeptime = 10000; /* 10 s */
2080 
2082  {
2083  TimestampTz wakeup_time;
2084  long sec_to_timeout;
2085  int microsec_to_timeout;
2086 
2087  /*
2088  * At the latest stop sleeping once wal_sender_timeout has been
2089  * reached.
2090  */
2093 
2094  /*
2095  * If no ping has been sent yet, wakeup when it's time to do so.
2096  * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2097  * the timeout passed without a response.
2098  */
2101  wal_sender_timeout / 2);
2102 
2103  /* Compute relative time until wakeup. */
2104  TimestampDifference(now, wakeup_time,
2105  &sec_to_timeout, &microsec_to_timeout);
2106 
2107  sleeptime = sec_to_timeout * 1000 +
2108  microsec_to_timeout / 1000;
2109  }
2110 
2111  return sleeptime;
2112 }
int wal_sender_timeout
Definition: walsender.c:122
int64 TimestampTz
Definition: timestamp.h:39
int sleeptime
Definition: pg_standby.c:42
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: timestamp.c:1643
static bool waiting_for_ping_response
Definition: walsender.c:174
static TimestampTz last_reply_timestamp
Definition: walsender.c:171
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1533

◆ WalSndDone()

static void WalSndDone ( WalSndSendDataCallback  send_data)
static

Definition at line 2893 of file walsender.c.

References DestRemote, EndCommand(), WalSnd::flush, pq_flush, pq_is_send_pending, proc_exit(), sentPtr, waiting_for_ping_response, WalSndCaughtUp, WalSndKeepalive(), WalSnd::write, and XLogRecPtrIsInvalid.

Referenced by WalSndLoop().

2894 {
2895  XLogRecPtr replicatedPtr;
2896 
2897  /* ... let's just be real sure we're caught up ... */
2898  send_data();
2899 
2900  /*
2901  * To figure out whether all WAL has successfully been replicated, check
2902  * flush location if valid, write otherwise. Tools like pg_receivewal will
2903  * usually (unless in synchronous mode) return an invalid flush location.
2904  */
2905  replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
2907 
2908  if (WalSndCaughtUp && sentPtr == replicatedPtr &&
2909  !pq_is_send_pending())
2910  {
2911  /* Inform the standby that XLOG streaming is done */
2912  EndCommand("COPY 0", DestRemote);
2913  pq_flush();
2914 
2915  proc_exit(0);
2916  }
2918  {
2919  WalSndKeepalive(true);
2921  }
2922 }
#define pq_is_send_pending()
Definition: libpq.h:41
XLogRecPtr write
#define pq_flush()
Definition: libpq.h:39
void proc_exit(int code)
Definition: ipc.c:104
XLogRecPtr flush
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3406
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:166
static bool WalSndCaughtUp
Definition: walsender.c:186
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
WalSnd * MyWalSnd
Definition: walsender.c:111
static XLogRecPtr sentPtr
Definition: walsender.c:157
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool waiting_for_ping_response
Definition: walsender.c:174

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 298 of file walsender.c.

References close, ConditionVariableCancelSleep(), DestNone, DestRemote, got_SIGUSR2, got_STOPPING, LWLockReleaseAll(), MyReplicationSlot, pgstat_report_wait_end(), proc_exit(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), sendFile, WalSndSetState(), WalSndShutdown(), WALSNDSTATE_STARTUP, and whereToSendOutput.

Referenced by PostgresMain().

299 {
303 
304  if (sendFile >= 0)
305  {
306  close(sendFile);
307  sendFile = -1;
308  }
309 
310  if (MyReplicationSlot != NULL)
312 
314 
315  replication_active = false;
316 
317  if (got_STOPPING || got_SIGUSR2)
318  proc_exit(0);
319 
320  /* Revert back to startup state */
322 }
static int sendFile
Definition: walsender.c:135
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:190
void proc_exit(int code)
Definition: ipc.c:104
void ConditionVariableCancelSleep(void)
void ReplicationSlotRelease(void)
Definition: slot.c:424
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1342
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:189
static volatile sig_atomic_t replication_active
Definition: walsender.c:198
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
void WalSndSetState(WalSndState state)
Definition: walsender.c:3177
void ReplicationSlotCleanup(void)
Definition: slot.c:479
void LWLockReleaseAll(void)
Definition: lwlock.c:1825
#define close(a)
Definition: win32.h:12

◆ WalSndGetStateString()

static const char* WalSndGetStateString ( WalSndState  state)
static

Definition at line 3196 of file walsender.c.

References WALSNDSTATE_BACKUP, WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WALSNDSTATE_STOPPING, and WALSNDSTATE_STREAMING.

Referenced by pg_stat_get_wal_senders().

/*
 * Map a WalSndState value to a constant, lower-case string naming it.
 *
 * state - the walsender state to describe.
 *
 * Returns a pointer to a string literal; "UNKNOWN" is returned for any value
 * not covered by the switch below.
 */
3197 {
3198  switch (state)
3199  {
3200  case WALSNDSTATE_STARTUP:
3201  return "startup";
3202  case WALSNDSTATE_BACKUP:
3203  return "backup";
3204  case WALSNDSTATE_CATCHUP:
3205  return "catchup";
3206  case WALSNDSTATE_STREAMING:
3207  return "streaming";
3208  case WALSNDSTATE_STOPPING:
3209  return "stopping";
3210  }
3211  return "UNKNOWN"; /* reached only when no case above matched */
3212 }
Definition: regguts.h:298

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 3113 of file walsender.c.

References i, InvalidBackendId, max_wal_senders, WalSnd::mutex, WalSnd::pid, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

3114 {
3115  int i;
3116 
3117  for (i = 0; i < max_wal_senders; i++)
3118  {
3119  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3120  pid_t pid;
3121 
3122  SpinLockAcquire(&walsnd->mutex);
3123  pid = walsnd->pid;
3124  SpinLockRelease(&walsnd->mutex);
3125 
3126  if (pid == 0)
3127  continue;
3128 
3130  }
3131 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:108
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:180
int max_wal_senders
Definition: walsender.c:120
#define SpinLockRelease(lock)
Definition: spin.h:64
#define InvalidBackendId
Definition: backendid.h:23
int i

◆ WalSndKeepalive()

static void WalSndKeepalive ( bool  requestReply)
static

Definition at line 3406 of file walsender.c.

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), StringInfoData::len, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), resetStringInfo(), and sentPtr.

Referenced by ProcessStandbyReplyMessage(), WalSndDone(), WalSndKeepaliveIfNecessary(), and WalSndWaitForWal().

3407 {
3408  elog(DEBUG2, "sending replication keepalive");
3409 
3410  /* construct the message... */
3412  pq_sendbyte(&output_message, 'k');
3415  pq_sendbyte(&output_message, requestReply ? 1 : 0);
3416 
3417  /* ... and send it wrapped in CopyData */
3419 }
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1569
static StringInfoData output_message
Definition: walsender.c:160
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
#define DEBUG2
Definition: elog.h:24
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
static XLogRecPtr sentPtr
Definition: walsender.c:157
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
#define elog(elevel,...)
Definition: elog.h:226

◆ WalSndKeepaliveIfNecessary()

static void WalSndKeepaliveIfNecessary ( void  )
static

Definition at line 3425 of file walsender.c.

References last_processing, last_reply_timestamp, pq_flush_if_writable, TimestampTzPlusMilliseconds, waiting_for_ping_response, wal_sender_timeout, WalSndKeepalive(), and WalSndShutdown().

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

3426 {
3427  TimestampTz ping_time;
3428 
3429  /*
3430  * Don't send keepalive messages if timeouts are globally disabled or
3431  * we're doing something not partaking in timeouts.
3432  */
3433  if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3434  return;
3435 
3437  return;
3438 
3439  /*
3440  * If half of wal_sender_timeout has lapsed without receiving any reply
3441  * from the standby, send a keep-alive message to the standby requesting
3442  * an immediate reply.
3443  */
3445  wal_sender_timeout / 2);
3446  if (last_processing >= ping_time)
3447  {
3448  WalSndKeepalive(true);
3450 
3451  /* Try to flush pending output to the client */
3452  if (pq_flush_if_writable() != 0)
3453  WalSndShutdown();
3454  }
3455 }
int wal_sender_timeout
Definition: walsender.c:122
int64 TimestampTz
Definition: timestamp.h:39
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3406
static TimestampTz last_processing
Definition: walsender.c:165
#define pq_flush_if_writable()
Definition: libpq.h:40
static void WalSndShutdown(void)
Definition: walsender.c:233
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
static bool waiting_for_ping_response
Definition: walsender.c:174
static TimestampTz last_reply_timestamp
Definition: walsender.c:171

◆ WalSndKill()

static void WalSndKill ( int  code,
Datum  arg 
)
static

Definition at line 2339 of file walsender.c.

References Assert, WalSnd::latch, WalSnd::mutex, MyWalSnd, WalSnd::pid, SpinLockAcquire, and SpinLockRelease.

Referenced by InitWalSenderSlot().

/*
 * Detach this process from its WalSnd entry and mark the entry unused.
 *
 * code, arg - not used by this body.
 * NOTE(review): the (int, Datum) signature and the reference from
 * InitWalSenderSlot() suggest this is registered as an exit callback --
 * confirm at the registration site.
 *
 * MyWalSnd must be non-NULL on entry (asserted); it is set to NULL here.
 */
2340 {
2341  WalSnd *walsnd = MyWalSnd;
2342 
2343  Assert(walsnd != NULL);
2344  /* Drop the process-local pointer before touching the shared entry. */
2345  MyWalSnd = NULL;
2346 
2347  SpinLockAcquire(&walsnd->mutex);
2348  /* clear latch while holding the spinlock, so it can safely be read */
2349  walsnd->latch = NULL;
2350  /* Mark WalSnd struct as no longer being in use. */
2351  walsnd->pid = 0;
2352  SpinLockRelease(&walsnd->mutex);
2353 }
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch * latch
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:111
#define Assert(condition)
Definition: c.h:732

◆ WalSndLastCycleHandler()

static void WalSndLastCycleHandler ( SIGNAL_ARGS  )
static

Definition at line 3008 of file walsender.c.

References got_SIGUSR2, MyLatch, and SetLatch().

Referenced by WalSndSignals().

/*
 * Signal handler: record that SIGUSR2 arrived and wake the process.
 *
 * Sets got_SIGUSR2 and sets MyLatch. errno is saved on entry and restored
 * on exit, so the interrupted code sees the errno value it had before the
 * handler ran.
 */
3009 {
3010  int save_errno = errno;
3011 
3012  got_SIGUSR2 = true;
3013  SetLatch(MyLatch);
3014 
3015  errno = save_errno;
3016 }
void SetLatch(Latch *latch)
Definition: latch.c:436
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:189
struct Latch * MyLatch
Definition: globals.c:54

◆ WalSndLoop()

static void WalSndLoop ( WalSndSendDataCallback  send_data)
static

Definition at line 2154 of file walsender.c.

References application_name, CHECK_FOR_INTERRUPTS, ConfigReloadPending, DEBUG1, ereport, errmsg(), GetCurrentTimestamp(), got_SIGUSR2, last_reply_timestamp, MyLatch, MyProcPort, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), sleeptime, Port::sock, WalSnd::state, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_MAIN, waiting_for_ping_response, WaitLatchOrSocket(), WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndDone(), WalSndKeepaliveIfNecessary(), WalSndSetState(), WalSndShutdown(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STREAMING, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, and WL_TIMEOUT.

Referenced by StartLogicalReplication(), and StartReplication().

2155 {
2156  /*
2157  * Initialize the last reply timestamp. That enables timeout processing
2158  * from hereon.
2159  */
2161  waiting_for_ping_response = false;
2162 
2163  /*
2164  * Loop until we reach the end of this timeline or the client requests to
2165  * stop streaming.
2166  */
2167  for (;;)
2168  {
2169  /* Clear any already-pending wakeups */
2171 
2173 
2174  /* Process any requests or signals received recently */
2175  if (ConfigReloadPending)
2176  {
2177  ConfigReloadPending = false;
2180  }
2181 
2182  /* Check for input from the client */
2184 
2185  /*
2186  * If we have received CopyDone from the client, sent CopyDone
2187  * ourselves, and the output buffer is empty, it's time to exit
2188  * streaming.
2189  */
2191  !pq_is_send_pending())
2192  break;
2193 
2194  /*
2195  * If we don't have any pending data in the output buffer, try to send
2196  * some more. If there is some, we don't bother to call send_data
2197  * again until we've flushed it ... but we'd better assume we are not
2198  * caught up.
2199  */
2200  if (!pq_is_send_pending())
2201  send_data();
2202  else
2203  WalSndCaughtUp = false;
2204 
2205  /* Try to flush pending output to the client */
2206  if (pq_flush_if_writable() != 0)
2207  WalSndShutdown();
2208 
2209  /* If nothing remains to be sent right now ... */
2211  {
2212  /*
2213  * If we're in catchup state, move to streaming. This is an
2214  * important state change for users to know about, since before
2215  * this point data loss might occur if the primary dies and we
2216  * need to failover to the standby. The state change is also
2217  * important for synchronous replication, since commits that
2218  * started to wait at that point might wait for some time.
2219  */
2221  {
2222  ereport(DEBUG1,
2223  (errmsg("\"%s\" has now caught up with upstream server",
2224  application_name)));
2226  }
2227 
2228  /*
2229  * When SIGUSR2 arrives, we send any outstanding logs up to the
2230  * shutdown checkpoint record (i.e., the latest record), wait for
2231  * them to be replicated to the standby, and exit. This may be a
2232  * normal termination at shutdown, or a promotion, the walsender
2233  * is not sure which.
2234  */
2235  if (got_SIGUSR2)
2236  WalSndDone(send_data);
2237  }
2238 
2239  /* Check for replication timeout. */
2241 
2242  /* Send keepalive if the time has come */
2244 
2245  /*
2246  * We don't block if not caught up, unless there is unsent data
2247  * pending in which case we'd better block until the socket is
2248  * write-ready. This test is only needed for the case where the
2249  * send_data callback handled a subset of the available data but then
2250  * pq_flush_if_writable flushed it all --- we should immediately try
2251  * to send more.
2252  */
2254  {
2255  long sleeptime;
2256  int wakeEvents;
2257 
2258  wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT |
2260 
2261  /*
2262  * Use fresh timestamp, not last_processing, to reduce the chance
2263  * of reaching wal_sender_timeout before sending a keepalive.
2264  */
2266 
2267  if (pq_is_send_pending())
2268  wakeEvents |= WL_SOCKET_WRITEABLE;
2269 
2270  /* Sleep until something happens or we time out */
2271  (void) WaitLatchOrSocket(MyLatch, wakeEvents,
2272  MyProcPort->sock, sleeptime,
2274  }
2275  }
2276  return;
2277 }
#define pq_is_send_pending()
Definition: libpq.h:41
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
#define DEBUG1
Definition: elog.h:25
struct Port * MyProcPort
Definition: globals.c:43
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1569
static void WalSndDone(WalSndSendDataCallback send_data)
Definition: walsender.c:2893
#define WL_SOCKET_READABLE
Definition: latch.h:125
int sleeptime
Definition: pg_standby.c:42
pgsocket sock
Definition: libpq-be.h:122
void ResetLatch(Latch *latch)
Definition: latch.c:519
WalSndState state
static bool streamingDoneSending
Definition: walsender.c:182
#define pq_flush_if_writable()
Definition: libpq.h:40
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:369
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:189
void SyncRepInitConfig(void)
Definition: syncrep.c:383
#define ereport(elevel, rest)
Definition: elog.h:141
static bool WalSndCaughtUp
Definition: walsender.c:186
Definition: guc.h:72
volatile sig_atomic_t ConfigReloadPending
Definition: globals.c:35
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3425
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2127
static void WalSndShutdown(void)
Definition: walsender.c:233
WalSnd * MyWalSnd
Definition: walsender.c:111
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2077
void WalSndSetState(WalSndState state)
Definition: walsender.c:3177
static bool streamingDoneReceiving
Definition: walsender.c:183
char * application_name
Definition: guc.c:529
int errmsg(const char *fmt,...)
Definition: elog.c:784
struct Latch * MyLatch
Definition: globals.c:54
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
static bool waiting_for_ping_response
Definition: walsender.c:174
#define WL_LATCH_SET
Definition: latch.h:124
static TimestampTz last_reply_timestamp
Definition: walsender.c:171
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1602
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129

◆ WalSndPrepareWrite()

static void WalSndPrepareWrite ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1152 of file walsender.c.

References InvalidXLogRecPtr, LogicalDecodingContext::out, pq_sendbyte(), pq_sendint64(), and resetStringInfo().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

/*
 * Start a new 'w' message in ctx->out by writing its fixed header.
 *
 * ctx        - logical decoding context whose output buffer (ctx->out) is
 *              reset and then filled.
 * lsn        - position written as both dataStart and walEnd; replaced by
 *              InvalidXLogRecPtr unless last_write is true.
 * xid        - not used by this body.
 * last_write - whether the caller considers this the last write for lsn.
 *
 * The header is: message byte 'w', dataStart, walEnd, and a zero
 * placeholder for the send time.
 */
1153 {
1154  /* can't have sync rep confused by sending the same LSN several times */
1155  if (!last_write)
1156  lsn = InvalidXLogRecPtr;
1157 
1158  resetStringInfo(ctx->out);
1159 
1160  pq_sendbyte(ctx->out, 'w');
1161  pq_sendint64(ctx->out, lsn); /* dataStart */
1162  pq_sendint64(ctx->out, lsn); /* walEnd */
1163 
1164  /*
1165  * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1166  * reserve space here.
1167  */
1168  pq_sendint64(ctx->out, 0); /* sendtime */
1169 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
StringInfo out
Definition: logical.h:71

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 2963 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

/*
 * Set the needreload flag on every WalSnd entry that is currently in use.
 *
 * Walks all max_wal_senders entries of WalSndCtl->walsnds. An entry whose
 * pid is 0 is skipped; the pid check and the flag update are both done
 * while holding that entry's spinlock.
 */
2964 {
2965  int i;
2966 
2967  for (i = 0; i < max_wal_senders; i++)
2968  {
2969  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2970 
2971  SpinLockAcquire(&walsnd->mutex);
2972  if (walsnd->pid == 0) /* unused entry: nothing to flag */
2973  {
2974  SpinLockRelease(&walsnd->mutex);
2975  continue;
2976  }
2977  walsnd->needreload = true;
2978  SpinLockRelease(&walsnd->mutex);
2979  }
2980 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:108
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
int max_wal_senders
Definition: walsender.c:120
#define SpinLockRelease(lock)
Definition: spin.h:64
bool needreload
int i

◆ WalSndSetState()

void WalSndSetState ( WalSndState  state)

Definition at line 3177 of file walsender.c.

References am_walsender, Assert, WalSnd::mutex, MyWalSnd, SpinLockAcquire, SpinLockRelease, and WalSnd::state.

Referenced by exec_replication_command(), SendBaseBackup(), StartLogicalReplication(), StartReplication(), WalSndErrorCleanup(), WalSndLoop(), and XLogSendPhysical().

3178 {
3179  WalSnd *walsnd = MyWalSnd;
3180 
3182 
3183  if (walsnd->state == state)
3184  return;
3185 
3186  SpinLockAcquire(&walsnd->mutex);
3187  walsnd->state = state;
3188  SpinLockRelease(&walsnd->mutex);
3189 }
slock_t mutex
bool am_walsender
Definition: walsender.c:114
#define SpinLockAcquire(lock)
Definition: spin.h:62
WalSndState state
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:111
#define Assert(condition)
Definition: c.h:732
Definition: regguts.h:298

◆ WalSndShmemInit()

void WalSndShmemInit ( void  )

Definition at line 3052 of file walsender.c.

References i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SHMQueueInit(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

3053 {
3054  bool found;
3055  int i;
3056 
3057  WalSndCtl = (WalSndCtlData *)
3058  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3059 
3060  if (!found)
3061  {
3062  /* First time through, so initialize */
3064 
3065  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3067 
3068  for (i = 0; i < max_wal_senders; i++)
3069  {
3070  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3071 
3072  SpinLockInit(&walsnd->mutex);
3073  }
3074  }
3075 }
Size WalSndShmemSize(void)
Definition: walsender.c:3040
#define SpinLockInit(lock)
Definition: spin.h:60
WalSndCtlData * WalSndCtl
Definition: walsender.c:108
#define MemSet(start, val, len)
Definition: c.h:955
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
int max_wal_senders
Definition: walsender.c:120
void SHMQueueInit(SHM_QUEUE *queue)
Definition: shmqueue.c:36
int i
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28

◆ WalSndShmemSize()

Size WalSndShmemSize ( void  )

Definition at line 3040 of file walsender.c.

References add_size(), max_wal_senders, mul_size(), and offsetof.

Referenced by CreateSharedMemoryAndSemaphores(), and WalSndShmemInit().

/*
 * Compute the number of bytes needed for the WalSndCtlData structure.
 *
 * Returns the offset of the trailing walsnds array within WalSndCtlData
 * plus room for max_wal_senders WalSnd elements, combined through
 * mul_size() and add_size().
 */
3041 {
3042  Size size = 0;
3043  /* Fixed-size header: everything before the walsnds array. */
3044  size = offsetof(WalSndCtlData, walsnds);
3045  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3046 
3047  return size;
3048 }
int max_wal_senders
Definition: walsender.c:120
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
size_t Size
Definition: c.h:466
#define offsetof(type, field)
Definition: c.h:655

◆ WalSndShutdown()

static void WalSndShutdown ( void  )
static

Definition at line 233 of file walsender.c.

Referenced by WalSndCheckTimeOut(), WalSndErrorCleanup(), WalSndKeepaliveIfNecessary(), WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

265 {
267 
268  /* Create a per-walsender data structure in shared memory */
270 
271  /*
272  * We don't currently need any ResourceOwner in a walsender process, but
273  * if we did, we could call CreateAuxProcessResourceOwner here.
274  */
275 
276  /*
277  * Let postmaster know that we're a WAL sender. Once we've declared us as
278  * a WAL sender process, postmaster will let us outlive the bgwriter and
279  * kill us last in the shutdown sequence, so we get a chance to stream all
280  * remaining WAL at shutdown, including the shutdown checkpoint. Note that
281  * there's no going back, and we mustn't write any WAL records after this.
282  */
285 
286  /* Initialize empty timestamp buffer for lag tracking. */
288 }
static void InitWalSenderSlot(void)
Definition: walsender.c:2281
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:272
bool RecoveryInProgress(void)
Definition: xlog.c:7898
static LagTracker * lag_tracker
Definition: walsender.c:223
MemoryContext TopMemoryContext
Definition: mcxt.c:44
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:814
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:146
bool am_cascading_walsender
Definition: walsender.c:115

◆ WalSndSignals()

void WalSndSignals ( void  )

Definition at line 3020 of file walsender.c.

References die, InitializeTimeouts(), PostgresSigHupHandler(), pqsignal(), procsignal_sigusr1_handler(), quickdie(), SIG_DFL, SIG_IGN, SIGCHLD, SIGHUP, SIGPIPE, SIGQUIT, SIGUSR1, SIGUSR2, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

3021 {
3022  /* Set up signal handlers */
3023  pqsignal(SIGHUP, PostgresSigHupHandler); /* set flag to read config
3024  * file */
3025  pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3026  pqsignal(SIGTERM, die); /* request shutdown */
3027  pqsignal(SIGQUIT, quickdie); /* hard crash time */
3028  InitializeTimeouts(); /* establishes SIGALRM handler */
3031  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3032  * shutdown */
3033 
3034  /* Reset some signals that are accepted by postmaster but not here */
3036 }
void InitializeTimeouts(void)
Definition: timeout.c:340
#define SIGQUIT
Definition: win32_port.h:164
#define SIGUSR1
Definition: win32_port.h:175
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:3008
#define SIGCHLD
Definition: win32_port.h:173
#define SIGPIPE
Definition: win32_port.h:168
#define SIGUSR2
Definition: win32_port.h:176
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:2792
#define SIGHUP
Definition: win32_port.h:163
void PostgresSigHupHandler(SIGNAL_ARGS)
Definition: postgres.c:2832
#define SIG_IGN
Definition: win32_port.h:160
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
#define SIG_DFL
Definition: win32_port.h:158
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:261
void quickdie(SIGNAL_ARGS)
Definition: postgres.c:2694
#define die(msg)
Definition: pg_test_fsync.c:97

◆ WalSndUpdateProgress()

static void WalSndUpdateProgress ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid 
)
static

Definition at line 1268 of file walsender.c.

References GetCurrentTimestamp(), LagTrackerWrite(), now(), TimestampDifferenceExceeds(), and WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS.

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

1269 {
1270  static TimestampTz sendTime = 0;
1271  TimestampTz now = GetCurrentTimestamp();
1272 
1273  /*
1274  * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1275  * avoid flooding the lag tracker when we commit frequently.
1276  */
1277 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1278  if (!TimestampDifferenceExceeds(sendTime, now,
1279  WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
1280  return;
1281 
1282  LagTrackerWrite(lsn, now);
1283  sendTime = now;
1284 }
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1569
int64 TimestampTz
Definition: timestamp.h:39
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1668
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition: walsender.c:3464
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1533

◆ WalSndWaitForWal()

static XLogRecPtr WalSndWaitForWal ( XLogRecPtr  loc)
static

Definition at line 1294 of file walsender.c.

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, WalSnd::flush, GetCurrentTimestamp(), GetFlushRecPtr(), GetXLogReplayRecPtr(), got_STOPPING, InvalidXLogRecPtr, MyLatch, MyProcPort, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), RecoveryInProgress(), ResetLatch(), sentPtr, SetLatch(), sleeptime, Port::sock, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_WAIT_WAL, waiting_for_ping_response, WaitLatchOrSocket(), WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepalive(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, WL_TIMEOUT, WalSnd::write, and XLogBackgroundFlush().

Referenced by logical_read_xlog_page().

1295 {
1296  int wakeEvents;
1297  static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1298 
1299 
1300  /*
1301  * Fast path to avoid acquiring the spinlock in case we already know we
1302  * have enough WAL available. This is particularly interesting if we're
1303  * far behind.
1304  */
1305  if (RecentFlushPtr != InvalidXLogRecPtr &&
1306  loc <= RecentFlushPtr)
1307  return RecentFlushPtr;
1308 
1309  /* Get a more recent flush pointer. */
1310  if (!RecoveryInProgress())
1311  RecentFlushPtr = GetFlushRecPtr();
1312  else
1313  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1314 
1315  for (;;)
1316  {
1317  long sleeptime;
1318 
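1319  /* Clear any already-pending wakeups */
1320  ResetLatch(MyLatch);
1321 
1322  CHECK_FOR_INTERRUPTS();
1323 
1324  /* Process any requests or signals received recently */
1325  if (ConfigReloadPending)
1326  {
1327  ConfigReloadPending = false;
1328  ProcessConfigFile(PGC_SIGHUP);
1329  SyncRepInitConfig();
1330  }
1331 
1332  /* Check for input from the client */
1333  ProcessRepliesIfAny();
1334 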
1335  /*
1336  * If we're shutting down, trigger pending WAL to be written out,
1337  * otherwise we'd possibly end up waiting for WAL that never gets
1338  * written, because walwriter has shut down already.
1339  */
1340  if (got_STOPPING)
1341  XLogBackgroundFlush();
1342 
1343  /* Update our idea of the currently flushed position. */
1344  if (!RecoveryInProgress())
1345  RecentFlushPtr = GetFlushRecPtr();
1346  else
1347  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1348 
1349  /*
1350  * If postmaster asked us to stop, don't wait anymore.
1351  *
1352  * It's important to do this check after the recomputation of
1353  * RecentFlushPtr, so we can send all remaining data before shutting
1354  * down.
1355  */
1356  if (got_STOPPING)
1357  break;
1358 
1359  /*
1360  * We only send regular messages to the client for full decoded
1361  * transactions, but a synchronous replication and walsender shutdown
1362  * possibly are waiting for a later location. So we send pings
1363  * containing the flush location every now and then.
1364  */
1365  if (MyWalSnd->flush < sentPtr &&
1366  MyWalSnd->write < sentPtr &&
1367  !waiting_for_ping_response)
1368  {
1369  WalSndKeepalive(false);
1370  waiting_for_ping_response = true;
1371  }
1372 
1373  /* check whether we're done */
1374  if (loc <= RecentFlushPtr)
1375  break;
1376 
1377  /* Waiting for new WAL. Since we need to wait, we're now caught up. */
1378  WalSndCaughtUp = true;
1379 
1380  /*
1381  * Try to flush any pending output to the client.
1382  */
1383  if (pq_flush_if_writable() != 0)
1384  WalSndShutdown();
1385 
1386  /*
1387  * If we have received CopyDone from the client, sent CopyDone
1388  * ourselves, and the output buffer is empty, it's time to exit
1389  * streaming, so fail the current WAL fetch request.
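1390  */
1391  if (streamingDoneReceiving && streamingDoneSending &&
1392  !pq_is_send_pending())
1393  break;
1394 
1395  /* die if timeout was reached */
1396  WalSndCheckTimeOut();
1397 
1398  /* Send keepalive if the time has come */
1399  WalSndKeepaliveIfNecessary();
1400 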
1401  /*
1402  * Sleep until something happens or we time out. Also wait for the
1403  * socket becoming writable, if there's still pending output.
1404  * Otherwise we might sit on sendable output data while waiting for
1405  * new WAL to be generated. (But if we have nothing to send, we don't
1406  * want to wake on socket-writable.)
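1407  */
1408  sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
1409 
1410  wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH |
1411  WL_SOCKET_READABLE | WL_TIMEOUT;
1412 
1413  if (pq_is_send_pending())
1414  wakeEvents |= WL_SOCKET_WRITEABLE;
1415 
1416  (void) WaitLatchOrSocket(MyLatch, wakeEvents,
1417  MyProcPort->sock, sleeptime,
1418  WAIT_EVENT_WAL_SENDER_WAIT_WAL);
1419  }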
1420 
1421  /* reactivate latch so WalSndLoop knows to continue */
1422  SetLatch(MyLatch);
1423  return RecentFlushPtr;
1424 }
#define pq_is_send_pending()
Definition: libpq.h:41
XLogRecPtr write
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
struct Port * MyProcPort
Definition: globals.c:43
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1569
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:190
#define WL_SOCKET_READABLE
Definition: latch.h:125
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8230
int sleeptime
Definition: pg_standby.c:42
bool RecoveryInProgress(void)
Definition: xlog.c:7898
void SetLatch(Latch *latch)
Definition: latch.c:436
pgsocket sock
Definition: libpq-be.h:122
void ResetLatch(Latch *latch)
Definition: latch.c:519
XLogRecPtr flush
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3406
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11130
bool XLogBackgroundFlush(void)
Definition: xlog.c:2987
static bool streamingDoneSending
Definition: walsender.c:182
#define pq_flush_if_writable()
Definition: libpq.h:40
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:369
void SyncRepInitConfig(void)
Definition: syncrep.c:383
static bool WalSndCaughtUp
Definition: walsender.c:186
Definition: guc.h:72
volatile sig_atomic_t ConfigReloadPending
Definition: globals.c:35
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3425
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2127
static void WalSndShutdown(void)
Definition: walsender.c:233
WalSnd * MyWalSnd
Definition: walsender.c:111
static XLogRecPtr sentPtr
Definition: walsender.c:157
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2077
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool streamingDoneReceiving
Definition: walsender.c:183
struct Latch * MyLatch
Definition: globals.c:54
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
static bool waiting_for_ping_response
Definition: walsender.c:174
#define WL_LATCH_SET
Definition: latch.h:124
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1602
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 3139 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, pg_usleep(), WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

3140 {
3141  for (;;)
3142  {
3143  int i;
3144  bool all_stopped = true;
3145 
3146  for (i = 0; i < max_wal_senders; i++)
3147  {
3148  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3149 
3150  SpinLockAcquire(&walsnd->mutex);
3151 
3152  if (walsnd->pid == 0)
3153  {
3154  SpinLockRelease(&walsnd->mutex);
3155  continue;
3156  }
3157 
3158  if (walsnd->state != WALSNDSTATE_STOPPING)
3159  {
3160  all_stopped = false;
3161  SpinLockRelease(&walsnd->mutex);
3162  break;
3163  }
3164  SpinLockRelease(&walsnd->mutex);
3165  }
3166 
3167  /* safe to leave if confirmation is done for all WAL senders */
3168  if (all_stopped)
3169  return;
3170 
3171  pg_usleep(10000L); /* wait for 10 msec */
3172  }
3173 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:108
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pg_usleep(long microsec)
Definition: signal.c:53
WalSndState state
int max_wal_senders
Definition: walsender.c:120
#define SpinLockRelease(lock)
Definition: spin.h:64
int i

◆ WalSndWakeup()

void WalSndWakeup ( void  )

Definition at line 3084 of file walsender.c.

References i, WalSnd::latch, max_wal_senders, WalSnd::mutex, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive(), StartupXLOG(), and XLogWalRcvFlush().

3085 {
3086  int i;
3087 
3088  for (i = 0; i < max_wal_senders; i++)
3089  {
3090  Latch *latch;
3091  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3092 
3093  /*
3094  * Get latch pointer with spinlock held, for the unlikely case that
3095  * pointer reads aren't atomic (as they're 8 bytes).
3096  */
3097  SpinLockAcquire(&walsnd->mutex);
3098  latch = walsnd->latch;
3099  SpinLockRelease(&walsnd->mutex);
3100 
3101  if (latch != NULL)
3102  SetLatch(latch);
3103  }
3104 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:108
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
void SetLatch(Latch *latch)
Definition: latch.c:436
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch * latch
Definition: latch.h:110
int max_wal_senders
Definition: walsender.c:120
#define SpinLockRelease(lock)
Definition: spin.h:64
int i

◆ WalSndWriteData()

static void WalSndWriteData ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1179 of file walsender.c.

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, StringInfoData::data, GetCurrentTimestamp(), last_reply_timestamp, StringInfoData::len, MyLatch, MyProcPort, now(), LogicalDecodingContext::out, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, pq_putmessage_noblock, pq_sendint64(), ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), resetStringInfo(), SetLatch(), sleeptime, Port::sock, SyncRepInitConfig(), TimestampTzPlusMilliseconds, WAIT_EVENT_WAL_SENDER_WRITE_DATA, WaitLatchOrSocket(), wal_sender_timeout, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, and WL_TIMEOUT.

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

1181 {
1182  TimestampTz now;
1183 
1184  /* output previously gathered data in a CopyData packet */
1185  pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1186 
1187  /*
1188  * Fill the send timestamp last, so that it is taken as late as possible.
1189  * This is somewhat ugly, but the protocol is set as it's already used for
1190  * several releases by streaming physical replication.
1191  */
1192  resetStringInfo(&tmpbuf);
1193  now = GetCurrentTimestamp();
1194  pq_sendint64(&tmpbuf, now);
1195  memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1196  tmpbuf.data, sizeof(int64));
1197 
1198  CHECK_FOR_INTERRUPTS();
1199 
1200  /* Try to flush pending output to the client */
1201  if (pq_flush_if_writable() != 0)
1202  WalSndShutdown();
1203 
1204  /* Try taking fast path unless we get too close to walsender timeout. */
1205  if (now < TimestampTzPlusMilliseconds(last_reply_timestamp,
1206  wal_sender_timeout / 2) &&
1207  !pq_is_send_pending())
1208  {
1209  return;
1210  }
1211 
1212  /* If we have pending write here, go to slow path */
1213  for (;;)
1214  {
1215  int wakeEvents;
1216  long sleeptime;
1217 
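1218  /* Check for input from the client */
1219  ProcessRepliesIfAny();
1220 
1221  /* die if timeout was reached */
1222  WalSndCheckTimeOut();
1223 
1224  /* Send keepalive if the time has come */
1225  WalSndKeepaliveIfNecessary();
1226 
1227  if (!pq_is_send_pending())
1228  break;
1229 
1230  sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
1231 
1232  wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH |
1233  WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT;
1234 
1235  /* Sleep until something happens or we time out */
1236  (void) WaitLatchOrSocket(MyLatch, wakeEvents,
1237  MyProcPort->sock, sleeptime,
1238  WAIT_EVENT_WAL_SENDER_WRITE_DATA);
1239 
1240  /* Clear any already-pending wakeups */
1241  ResetLatch(MyLatch);
1242 
1243  CHECK_FOR_INTERRUPTS();
1244 
1245  /* Process any requests or signals received recently */
1246  if (ConfigReloadPending)
1247  {
1248  ConfigReloadPending = false;
1249  ProcessConfigFile(PGC_SIGHUP);
1250  SyncRepInitConfig();
1251  }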
1252 
1253  /* Try to flush pending output to the client */
1254  if (pq_flush_if_writable() != 0)
1255  WalSndShutdown();
1256  }
1257 
1258  /* reactivate latch so WalSndLoop knows to continue */
1259  SetLatch(MyLatch);
1260 }
#define pq_is_send_pending()
Definition: libpq.h:41
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
int wal_sender_timeout
Definition: walsender.c:122
struct Port * MyProcPort
Definition: globals.c:43
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1569
int64 TimestampTz
Definition: timestamp.h:39
#define WL_SOCKET_READABLE
Definition: latch.h:125
int sleeptime
Definition: pg_standby.c:42
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
void SetLatch(Latch *latch)
Definition: latch.c:436
pgsocket sock
Definition: libpq-be.h:122
void ResetLatch(Latch *latch)
Definition: latch.c:519
#define pq_flush_if_writable()
Definition: libpq.h:40
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:369
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
void SyncRepInitConfig(void)
Definition: syncrep.c:383
Definition: guc.h:72
volatile sig_atomic_t ConfigReloadPending
Definition: globals.c:35
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3425
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2127
static void WalSndShutdown(void)
Definition: walsender.c:233
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2077
static StringInfoData tmpbuf
Definition: walsender.c:162
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
StringInfo out
Definition: logical.h:71
struct Latch * MyLatch
Definition: globals.c:54
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
#define WL_LATCH_SET
Definition: latch.h:124
static TimestampTz last_reply_timestamp
Definition: walsender.c:171
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1602
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1533
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129

◆ XLogRead()

static void XLogRead ( char *  buf,
XLogRecPtr  startptr,
Size  count 
)
static

Definition at line 2367 of file walsender.c.

References am_cascading_walsender, BasicOpenFile(), buf, CheckXLogRemoved(), close, curFileTimeLine, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg(), ERROR, MAXPGPATH, WalSnd::mutex, MyWalSnd, WalSnd::needreload, PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), read, sendFile, sendOff, sendSegNo, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, SpinLockAcquire, SpinLockRelease, startptr, ThisTimeLineID, WAIT_EVENT_WAL_READ, wal_segment_size, XLByteInSeg, XLByteToSeg, XLogFileNameP(), XLogFilePath, and XLogSegmentOffset.

Referenced by logical_read_xlog_page(), and XLogSendPhysical().

2368 {
2369  char *p;
2370  XLogRecPtr recptr;
2371  Size nbytes;
2372  XLogSegNo segno;
2373 
2374 retry:
2375  p = buf;
2376  recptr = startptr;
2377  nbytes = count;
2378 
2379  while (nbytes > 0)
2380  {
2381  uint32 startoff;
2382  int segbytes;
2383  int readbytes;
2384 
2385  startoff = XLogSegmentOffset(recptr, wal_segment_size);
2386 
2387  if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, wal_segment_size))
2388  {
2389  char path[MAXPGPATH];
2390 
2391  /* Switch to another logfile segment */
2392  if (sendFile >= 0)
2393  close(sendFile);
2394 
2395  XLByteToSeg(recptr, sendSegNo, wal_segment_size);
2396 
2397  /*-------
2398  * When reading from a historic timeline, and there is a timeline
2399  * switch within this segment, read from the WAL segment belonging
2400  * to the new timeline.
2401  *
2402  * For example, imagine that this server is currently on timeline
2403  * 5, and we're streaming timeline 4. The switch from timeline 4
2404  * to 5 happened at 0/13002088. In pg_wal, we have these files:
2405  *
2406  * ...
2407  * 000000040000000000000012
2408  * 000000040000000000000013
2409  * 000000050000000000000013
2410  * 000000050000000000000014
2411  * ...
2412  *
2413  * In this situation, when requested to send the WAL from
2414  * segment 0x13, on timeline 4, we read the WAL from file
2415  * 000000050000000000000013. Archive recovery prefers files from
2416  * newer timelines, so if the segment was restored from the
2417  * archive on this server, the file belonging to the old timeline,
2418  * 000000040000000000000013, might not exist. Their contents are
2419  * equal up to the switchpoint, because at a timeline switch, the
2420  * used portion of the old segment is copied to the new file.
2421  *-------
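2422  */
2423  curFileTimeLine = sendTimeLine;
2424  if (sendTimeLineIsHistoric)
2425  {
2426  XLogSegNo endSegNo;
2427 
2428  XLByteToSeg(sendTimeLineValidUpto, endSegNo, wal_segment_size);
2429  if (sendSegNo == endSegNo)
2430  curFileTimeLine = sendTimeLineNextTLI;
2431  }
2432 
2433  XLogFilePath(path, curFileTimeLine, sendSegNo, wal_segment_size);
2434 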
2435  sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY);
2436  if (sendFile < 0)
2437  {
2438  /*
2439  * If the file is not found, assume it's because the standby
2440  * asked for a too old WAL segment that has already been
2441  * removed or recycled.
2442  */
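2443  if (errno == ENOENT)
2444  ereport(ERROR,
2445  (errcode_for_file_access(),
2446  errmsg("requested WAL segment %s has already been removed",
2447  XLogFileNameP(curFileTimeLine, sendSegNo))));
2448  else
2449  ereport(ERROR,
2450  (errcode_for_file_access(),
2451  errmsg("could not open file \"%s\": %m",
2452  path)));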
2453  }
2454  sendOff = 0;
2455  }
2456 
2457  /* Need to seek in the file? */
2458  if (sendOff != startoff)
2459  {
2460  if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
2461  ereport(ERROR,
2462  (errcode_for_file_access(),
2463  errmsg("could not seek in log segment %s to offset %u: %m",
2464  XLogFileNameP(curFileTimeLine, sendSegNo),
2465  startoff)));
2466  sendOff = startoff;
2467  }
2468 
2469  /* How many bytes are within this segment? */
2470  if (nbytes > (wal_segment_size - startoff))
2471  segbytes = wal_segment_size - startoff;
2472  else
2473  segbytes = nbytes;
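2474 
2475  pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
2476  readbytes = read(sendFile, p, segbytes);
2477  pgstat_report_wait_end();
2478  if (readbytes < 0)
2479  {
2480  ereport(ERROR,
2481  (errcode_for_file_access(),
2482  errmsg("could not read from log segment %s, offset %u, length %zu: %m",
2483  XLogFileNameP(curFileTimeLine, sendSegNo),
2484  sendOff, (Size) segbytes)));
2485  }
2486  else if (readbytes == 0)
2487  {
2488  ereport(ERROR,
2489  (errcode(ERRCODE_DATA_CORRUPTED),
2490  errmsg("could not read from log segment %s, offset %u: read %d of %zu",
2491  XLogFileNameP(curFileTimeLine, sendSegNo),
2492  sendOff, readbytes, (Size) segbytes)));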
2493  }
2494 
2495  /* Update state for read */
2496  recptr += readbytes;
2497 
2498  sendOff += readbytes;
2499  nbytes -= readbytes;
2500  p += readbytes;
2501  }
2502 
2503  /*
2504  * After reading into the buffer, check that what we read was valid. We do
2505  * this after reading, because even though the segment was present when we
2506  * opened it, it might get recycled or removed while we read it. The
2507  * read() succeeds in that case, but the data we tried to read might
2508  * already have been overwritten with new WAL records.
2509  */
2510  XLByteToSeg(startptr, segno, wal_segment_size);
2511  CheckXLogRemoved(segno, ThisTimeLineID);
2512 
2513  /*
2514  * During recovery, the currently-open WAL file might be replaced with the
2515  * file of the same name retrieved from archive. So we always need to
2516  * check what we read was valid after reading into the buffer. If it's
2517  * invalid, we try to open and read the file again.
2518  */
2519  if (am_cascading_walsender)
2520  {
2521  WalSnd *walsnd = MyWalSnd;
2522  bool reload;
2523 
2524  SpinLockAcquire(&walsnd->mutex);
2525  reload = walsnd->needreload;
2526  walsnd->needreload = false;
2527  SpinLockRelease(&walsnd->mutex);
2528 
2529  if (reload && sendFile >= 0)
2530  {
2531  close(sendFile);
2532  sendFile = -1;
2533 
2534  goto retry;
2535  }
2536  }
2537 }
int wal_segment_size
Definition: xlog.c:112
static int sendFile
Definition: walsender.c:135
int errcode(int sqlerrcode)
Definition: elog.c:570
#define PG_BINARY
Definition: c.h:1191
slock_t mutex
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define ERROR
Definition: elog.h:43
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:3846
#define MAXPGPATH
static TimeLineID curFileTimeLine
Definition: walsender.c:140
char * XLogFileNameP(TimeLineID tli, XLogSegNo segno)
Definition: xlog.c:10130
static char * buf
Definition: pg_test_fsync.c:68
uint64 XLogSegNo
Definition: xlogdefs.h:41
int errcode_for_file_access(void)
Definition: elog.c:593
unsigned int uint32
Definition: c.h:358
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1342
#define ereport(elevel, rest)
Definition: elog.h:141
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:44
#define SpinLockRelease(lock)
Definition: spin.h:64
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
WalSnd * MyWalSnd
Definition: walsender.c:111
TimeLineID ThisTimeLineID
Definition: xlog.c:187
static TimeLineID sendTimeLine
Definition: walsender.c:148
uint64 XLogRecPtr
Definition: xlogdefs.h:21
bool needreload
size_t Size
Definition: c.h:466
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1318
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:149
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:151
static XLogSegNo sendSegNo
Definition: walsender.c:136
int BasicOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:946
#define XLogFilePath(path, tli, logSegNo, wal_segsz_bytes)
int errmsg(const char *fmt,...)
Definition: elog.c:784
static uint32 sendOff
Definition: walsender.c:137
#define close(a)
Definition: win32.h:12
#define read(a, b, c)
Definition: win32.h:13
static bool sendTimeLineIsHistoric
Definition: walsender.c:150
bool am_cascading_walsender
Definition: walsender.c:115
static XLogRecPtr startptr
Definition: basebackup.c:106
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

◆ XLogSendLogical()

static void XLogSendLogical ( void  )
static

Definition at line 2813 of file walsender.c.

References elog, XLogReaderState::EndRecPtr, ERROR, GetFlushRecPtr(), got_SIGUSR2, got_STOPPING, InvalidXLogRecPtr, logical_startptr, LogicalDecodingProcessRecord(), WalSnd::mutex, MyWalSnd, LogicalDecodingContext::reader, WalSnd::sentPtr, sentPtr, SpinLockAcquire, SpinLockRelease, WalSndCaughtUp, and XLogReadRecord().

Referenced by StartLogicalReplication().

2814 {
2815  XLogRecord *record;
2816  char *errm;
2817 
2818  /*
2819  * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
2820  * true in WalSndWaitForWal, if we're actually waiting. We also set it to
2821  * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
2822  * didn't wait - i.e. when we're shutting down.
2823  */
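2824  WalSndCaughtUp = false;
2825 
2826  record = XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &errm);
2827  logical_startptr = InvalidXLogRecPtr;
2828 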
2829  /* xlog record was invalid */
2830  if (errm != NULL)
2831  elog(ERROR, "%s", errm);
2832 
2833  if (record != NULL)
2834  {
2835  /* XXX: Note that logical decoding cannot be used while in recovery */
2836  XLogRecPtr flushPtr = GetFlushRecPtr();
2837 
2838  /*
2839  * Note the lack of any call to LagTrackerWrite() which is handled by
2840  * WalSndUpdateProgress which is called by output plugin through
2841  * logical decoding write api.
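2842  */
2843  LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
2844 
2845  sentPtr = logical_decoding_ctx->reader->EndRecPtr;
2846 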
2847  /*
2848  * If we have sent a record that is at or beyond the flushed point, we
2849  * have caught up.
2850  */
2851  if (sentPtr >= flushPtr)
2852  WalSndCaughtUp = true;
2853  }
2854  else
2855  {
2856  /*
2857  * If the record we just wanted to read is at or beyond the flushed
2858  * point, then we're caught up.
2859  */
2860  if (logical_decoding_ctx->reader->EndRecPtr >= GetFlushRecPtr())
2861  {
2862  WalSndCaughtUp = true;
2863 
2864  /*
2865  * Have WalSndLoop() terminate the connection in an orderly
2866  * manner, after writing out all the pending data.
2867  */
2868  if (got_STOPPING)
2869  got_SIGUSR2 = true;
2870  }
2871  }
2872 
2873  /* Update shared memory status */
2874  {
2875  WalSnd *walsnd = MyWalSnd;
2876 
2877  SpinLockAcquire(&walsnd->mutex);
2878  walsnd->sentPtr = sentPtr;
2879  SpinLockRelease(&walsnd->mutex);
2880  }
2881 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:190
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8230
slock_t mutex
XLogRecord * XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
Definition: xlogreader.c:217
XLogRecPtr EndRecPtr
Definition: xlogreader.h:124
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define ERROR
Definition: elog.h:43
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:97
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:200
static XLogRecPtr logical_startptr
Definition: walsender.c:201
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:189
static bool WalSndCaughtUp
Definition: walsender.c:186
#define SpinLockRelease(lock)
Definition: spin.h:64
XLogRecPtr sentPtr
WalSnd * MyWalSnd
Definition: walsender.c:111
static XLogRecPtr sentPtr
Definition: walsender.c:157
uint64 XLogRecPtr
Definition: xlogdefs.h:21
XLogReaderState * reader
Definition: logical.h:42
#define elog(elevel,...)
Definition: elog.h:226

◆ XLogSendPhysical()

static void XLogSendPhysical ( void  )
static

Definition at line 2550 of file walsender.c.

References am_cascading_walsender, Assert, close, StringInfoData::data, DEBUG1, elog, enlargeStringInfo(), GetCurrentTimestamp(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), got_STOPPING, LagTrackerWrite(), StringInfoData::len, list_free_deep(), MAX_SEND_SIZE, WalSnd::mutex, MyWalSnd, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), readTimeLineHistory(), RecoveryInProgress(), resetStringInfo(), sendFile, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, WalSnd::sentPtr, sentPtr, set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, startptr, streamingDoneSending, ThisTimeLineID, tliSwitchPoint(), update_process_title, WalSndCaughtUp, WalSndSetState(), WALSNDSTATE_STOPPING, and XLogRead().

Referenced by StartReplication().

2551 {
2552  XLogRecPtr SendRqstPtr;
2554  XLogRecPtr endptr;
2555  Size nbytes;
2556 
2557  /* If requested switch the WAL sender to the stopping state. */
2558  if (got_STOPPING)
2560 
2562  {
2563  WalSndCaughtUp = true;
2564  return;
2565  }
2566 
2567  /* Figure out how far we can safely send the WAL. */
2569  {
2570  /*
2571  * Streaming an old timeline that's in this server's history, but is
2572  * not the one we're currently inserting or replaying. It can be
2573  * streamed up to the point where we switched off that timeline.
2574  */
2575