PostgreSQL Source Code  git master
walsender.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/printtup.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "access/xlogutils.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
#include "commands/defrem.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "pgstat.h"
#include "replication/basebackup.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/logicalfuncs.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/dest.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/portal.h"
#include "utils/ps_status.h"
#include "utils/resowner.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
Include dependency graph for walsender.c:

Go to the source code of this file.

Data Structures

struct  WalTimeSample
 

Macros

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)
 
#define LAG_TRACKER_BUFFER_SIZE   8192
 
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000
 
#define PG_STAT_GET_WAL_SENDERS_COLS   11
 

Typedefs

typedef void(* WalSndSendDataCallback) (void)
 

Functions

static void WalSndLastCycleHandler (SIGNAL_ARGS)
 
static void WalSndLoop (WalSndSendDataCallback send_data)
 
static void InitWalSenderSlot (void)
 
static void WalSndKill (int code, Datum arg)
 
static void WalSndShutdown (void)
 
void WalSndErrorCleanup (void)
 
static void IdentifySystem (void)
 
static void SendTimeLineHistory (TimeLineHistoryCmd *cmd)
 
static void StartReplication (StartReplicationCmd *cmd)
 
static int logical_read_xlog_page (XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
 
static void parseCreateReplSlotOptions (CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action)
 
static void CreateReplicationSlot (CreateReplicationSlotCmd *cmd)
 
static void DropReplicationSlot (DropReplicationSlotCmd *cmd)
 
static void StartLogicalReplication (StartReplicationCmd *cmd)
 
static void WalSndPrepareWrite (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndWriteData (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndUpdateProgress (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
 
static XLogRecPtr WalSndWaitForWal (XLogRecPtr loc)
 
bool exec_replication_command (const char *cmd_string)
 
static void ProcessRepliesIfAny (void)
 
static void ProcessStandbyMessage (void)
 
static void PhysicalConfirmReceivedLocation (XLogRecPtr lsn)
 
static void ProcessStandbyReplyMessage (void)
 
static void PhysicalReplicationSlotNewXmin (TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
 
static bool TransactionIdInRecentPast (TransactionId xid, uint32 epoch)
 
static void ProcessStandbyHSFeedbackMessage (void)
 
static long WalSndComputeSleeptime (TimestampTz now)
 
static void WalSndCheckTimeOut (TimestampTz now)
 
static void XLogRead (char *buf, XLogRecPtr startptr, Size count)
 
static void XLogSendPhysical (void)
 
static void XLogSendLogical (void)
 
static void WalSndDone (WalSndSendDataCallback send_data)
 
static XLogRecPtr GetStandbyFlushRecPtr (void)
 
void WalSndRqstFileReload (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (void)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void WalSndSetState (WalSndState state)
 
static const char * WalSndGetStateString (WalSndState state)
 
static Interval * offset_to_interval (TimeOffset offset)
 
Datum pg_stat_get_wal_senders (PG_FUNCTION_ARGS)
 
static void WalSndKeepalive (bool requestReply)
 
static void WalSndKeepaliveIfNecessary (TimestampTz now)
 
static void LagTrackerWrite (XLogRecPtr lsn, TimestampTz local_flush_time)
 
static TimeOffset LagTrackerRead (int head, XLogRecPtr lsn, TimestampTz now)
 

Variables

WalSndCtlData * WalSndCtl = NULL
 
WalSnd * MyWalSnd = NULL
 
bool am_walsender = false
 
bool am_cascading_walsender = false
 
bool am_db_walsender = false
 
int max_wal_senders = 0
 
int wal_sender_timeout = 60 * 1000
 
bool log_replication_commands = false
 
bool wake_wal_senders = false
 
static int sendFile = -1
 
static XLogSegNo sendSegNo = 0
 
static uint32 sendOff = 0
 
static TimeLineID curFileTimeLine = 0
 
static TimeLineID sendTimeLine = 0
 
static TimeLineID sendTimeLineNextTLI = 0
 
static bool sendTimeLineIsHistoric = false
 
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
 
static XLogRecPtr sentPtr = 0
 
static StringInfoData output_message
 
static StringInfoData reply_message
 
static StringInfoData tmpbuf
 
static TimestampTz last_reply_timestamp = 0
 
static bool waiting_for_ping_response = false
 
static bool streamingDoneSending
 
static bool streamingDoneReceiving
 
static bool WalSndCaughtUp = false
 
static volatile sig_atomic_t got_SIGUSR2 = false
 
static volatile sig_atomic_t got_STOPPING = false
 
static volatile sig_atomic_t replication_active = false
 
static LogicalDecodingContext * logical_decoding_ctx = NULL
 
static XLogRecPtr logical_startptr = InvalidXLogRecPtr
 
struct {
   XLogRecPtr   last_lsn
 
   WalTimeSample   buffer [LAG_TRACKER_BUFFER_SIZE]
 
   int   write_head
 
   int   read_heads [NUM_SYNC_REP_WAIT_MODE]
 
   WalTimeSample   last_read [NUM_SYNC_REP_WAIT_MODE]
 
} LagTracker
 

Macro Definition Documentation

◆ LAG_TRACKER_BUFFER_SIZE

#define LAG_TRACKER_BUFFER_SIZE   8192

Definition at line 209 of file walsender.c.

Referenced by LagTrackerRead(), and LagTrackerWrite().

◆ MAX_SEND_SIZE

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)

Definition at line 106 of file walsender.c.

Referenced by XLogSendPhysical().

◆ PG_STAT_GET_WAL_SENDERS_COLS

#define PG_STAT_GET_WAL_SENDERS_COLS   11

Referenced by pg_stat_get_wal_senders().

◆ WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS

#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000

Referenced by WalSndUpdateProgress().

Typedef Documentation

◆ WalSndSendDataCallback

typedef void(* WalSndSendDataCallback) (void)

Definition at line 225 of file walsender.c.

Function Documentation

◆ CreateReplicationSlot()

static void CreateReplicationSlot ( CreateReplicationSlotCmd * cmd)
static

Definition at line 836 of file walsender.c.

References Assert, begin_tup_output_tupdesc(), CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDestReceiver(), CreateInitDecodingContext(), CreateTemplateTupleDesc(), CRS_EXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, CStringGetTextDatum, ReplicationSlot::data, DecodingContextFindStartpoint(), generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errmsg(), ERROR, FirstSnapshotSet, FreeDecodingContext(), IsSubTransaction(), IsTransactionBlock(), CreateReplicationSlotCmd::kind, last_reply_timestamp, logical_read_xlog_page(), MAXFNAMELEN, MemSet, MyProc, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, NIL, parseCreateReplSlotOptions(), CreateReplicationSlotCmd::plugin, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, ReplicationSlotCreate(), ReplicationSlotMarkDirty(), ReplicationSlotPersist(), ReplicationSlotRelease(), ReplicationSlotReserveWal(), ReplicationSlotSave(), RestoreTransactionSnapshot(), RS_EPHEMERAL, RS_PERSISTENT, RS_TEMPORARY, sendTimeLine, sendTimeLineIsHistoric, CreateReplicationSlotCmd::slotname, SnapBuildExportSnapshot(), SnapBuildInitialSnapshot(), LogicalDecodingContext::snapshot_builder, snprintf(), CreateReplicationSlotCmd::temporary, TEXTOID, ThisTimeLineID, TupleDescInitBuiltinEntry(), values, WalSndPrepareWrite(), WalSndUpdateProgress(), WalSndWriteData(), XACT_REPEATABLE_READ, and XactIsoLevel.

Referenced by exec_replication_command(), main(), and StartLogStreamer().

837 {
838  const char *snapshot_name = NULL;
839  char xloc[MAXFNAMELEN];
840  char *slot_name;
841  bool reserve_wal = false;
842  CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
843  DestReceiver *dest;
844  TupOutputState *tstate;
845  TupleDesc tupdesc;
846  Datum values[4];
847  bool nulls[4];
848
849  Assert(!MyReplicationSlot);
850
851  parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action);
852
853  /* setup state for XLogReadPage */
854  sendTimeLineIsHistoric = false;
855  sendTimeLine = ThisTimeLineID;
856
857  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
858  {
859  ReplicationSlotCreate(cmd->slotname, false,
860  cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT);
861  }
862  else
863  {
864  CheckLogicalDecodingRequirements();
865
866  /*
867  * Initially create persistent slot as ephemeral - that allows us to
868  * nicely handle errors during initialization because it'll get
869  * dropped if this transaction fails. We'll make it persistent at the
870  * end. Temporary slots can be created as temporary from beginning as
871  * they get dropped on error as well.
872  */
873  ReplicationSlotCreate(cmd->slotname, true,
874  cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL);
875  }
876
877  if (cmd->kind == REPLICATION_KIND_LOGICAL)
878  {
879  LogicalDecodingContext *ctx;
880  bool need_full_snapshot = false;
881 
882  /*
883  * Do options check early so that we can bail before calling the
884  * DecodingContextFindStartpoint which can take long time.
885  */
886  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
887  {
888  if (IsTransactionBlock())
889  ereport(ERROR,
890  (errmsg("CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT "
891  "must not be called inside a transaction")));
892 
893  need_full_snapshot = true;
894  }
895  else if (snapshot_action == CRS_USE_SNAPSHOT)
896  {
897  if (!IsTransactionBlock())
898  ereport(ERROR,
899  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
900  "must be called inside a transaction")));
901
902  if (XactIsoLevel != XACT_REPEATABLE_READ)
903  ereport(ERROR,
904  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
905  "must be called in REPEATABLE READ isolation mode transaction")));
906 
907  if (FirstSnapshotSet)
908  ereport(ERROR,
909  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
910  "must be called before any query")));
911 
912  if (IsSubTransaction())
913  ereport(ERROR,
914  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
915  "must not be called in a subtransaction")));
916 
917  need_full_snapshot = true;
918  }
919 
920  ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
921  logical_read_xlog_page,
922  WalSndPrepareWrite, WalSndWriteData,
923  WalSndUpdateProgress);
924
925  /*
926  * Signal that we don't need the timeout mechanism. We're just
927  * creating the replication slot and don't yet accept feedback
928  * messages or send keepalives. As we possibly need to wait for
929  * further WAL the walsender would otherwise possibly be killed too
930  * soon.
931  */
932  last_reply_timestamp = 0;
933
934  /* build initial snapshot, might take a while */
935  DecodingContextFindStartpoint(ctx);
936
937  /*
938  * Export or use the snapshot if we've been asked to do so.
939  *
940  * NB. We will convert the snapbuild.c kind of snapshot to normal
941  * snapshot when doing this.
942  */
943  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
944  {
945  snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
946  }
947  else if (snapshot_action == CRS_USE_SNAPSHOT)
948  {
949  Snapshot snap;
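950
951  snap = SnapBuildInitialSnapshot(ctx->snapshot_builder);
952  RestoreTransactionSnapshot(snap, MyProc);
953  }
954
955  /* don't need the decoding context anymore */
956  FreeDecodingContext(ctx);
957
958  if (!cmd->temporary)
959  ReplicationSlotPersist();
960  }
961  else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
962  {
963  ReplicationSlotReserveWal();
964
965  ReplicationSlotMarkDirty();
966
967  /* Write this slot to disk if it's a permanent one. */
968  if (!cmd->temporary)
969  ReplicationSlotSave();
970  }
971
972  snprintf(xloc, sizeof(xloc), "%X/%X",
973  (uint32) (MyReplicationSlot->data.confirmed_flush >> 32),
974  (uint32) MyReplicationSlot->data.confirmed_flush);
975
976  dest = CreateDestReceiver(DestRemoteSimple);
977  MemSet(nulls, false, sizeof(nulls));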
978 
979  /*----------
980  * Need a tuple descriptor representing four columns:
981  * - first field: the slot name
982  * - second field: LSN at which we became consistent
983  * - third field: exported snapshot's name
984  * - fourth field: output plugin
985  *----------
986  */
987  tupdesc = CreateTemplateTupleDesc(4, false);
988  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
989  TEXTOID, -1, 0);
990  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
991  TEXTOID, -1, 0);
992  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
993  TEXTOID, -1, 0);
994  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
995  TEXTOID, -1, 0);
996 
997  /* prepare for projection of tuples */
998  tstate = begin_tup_output_tupdesc(dest, tupdesc);
999 
1000  /* slot_name */
1001  slot_name = NameStr(MyReplicationSlot->data.name);
1002  values[0] = CStringGetTextDatum(slot_name);
1003 
1004  /* consistent wal location */
1005  values[1] = CStringGetTextDatum(xloc);
1006 
1007  /* snapshot name, or NULL if none */
1008  if (snapshot_name != NULL)
1009  values[2] = CStringGetTextDatum(snapshot_name);
1010  else
1011  nulls[2] = true;
1012 
1013  /* plugin, or NULL if none */
1014  if (cmd->plugin != NULL)
1015  values[3] = CStringGetTextDatum(cmd->plugin);
1016  else
1017  nulls[3] = true;
1018 
1019  /* send it to dest */
1020  do_tup_output(tstate, values, nulls);
1021  end_tup_output(tstate);
1022
1023  ReplicationSlotRelease();
1024 }
#define NIL
Definition: pg_list.h:69
#define TEXTOID
Definition: pg_type.h:324
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action)
Definition: walsender.c:783
PGPROC * MyProc
Definition: proc.c:67
#define XACT_REPEATABLE_READ
Definition: xact.h:30
void RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
Definition: snapmgr.c:2191
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:1309
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency)
Definition: slot.c:221
#define MemSet(start, val, len)
Definition: c.h:897
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
void ReplicationSlotSave(void)
Definition: slot.c:638
LogicalDecodingContext * CreateInitDecodingContext(char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:220
ReplicationSlotPersistentData data
Definition: slot.h:120
XLogRecPtr confirmed_flush
Definition: slot.h:81
ReplicationKind kind
Definition: replnodes.h:56
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
Definition: walsender.c:749
bool IsTransactionBlock(void)
Definition: xact.c:4448
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:635
void ReplicationSlotReserveWal(void)
Definition: slot.c:985
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:438
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:1367
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:614
#define ERROR
Definition: elog.h:43
bool FirstSnapshotSet
Definition: snapmgr.c:203
void ReplicationSlotPersist(void)
Definition: slot.c:673
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:109
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1153
unsigned int uint32
Definition: c.h:314
void ReplicationSlotRelease(void)
Definition: slot.c:416
#define ereport(elevel, rest)
Definition: elog.h:122
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc)
Definition: execTuples.c:1291
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1126
#define MAXFNAMELEN
uintptr_t Datum
Definition: postgres.h:365
TimeLineID ThisTimeLineID
Definition: xlog.c:181
struct SnapBuild * snapshot_builder
Definition: logical.h:46
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
static TimeLineID sendTimeLine
Definition: walsender.c:149
#define Assert(condition)
Definition: c.h:688
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:481
int XactIsoLevel
Definition: xact.c:74
TupleDesc CreateTemplateTupleDesc(int natts, bool hasoid)
Definition: tupdesc.c:43
bool IsSubTransaction(void)
Definition: xact.c:4521
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition: snapbuild.c:545
static Datum values[MAXATTR]
Definition: bootstrap.c:164
int errmsg(const char *fmt,...)
Definition: elog.c:797
CRSSnapshotAction
Definition: walsender.h:22
#define NameStr(name)
Definition: c.h:565
#define CStringGetTextDatum(s)
Definition: builtins.h:91
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:76
int16 AttrNumber
Definition: attnum.h:21
static TimestampTz last_reply_timestamp
Definition: walsender.c:169
static bool sendTimeLineIsHistoric
Definition: walsender.c:151
void ReplicationSlotMarkDirty(void)
Definition: slot.c:656
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
Definition: walsender.c:1251

◆ DropReplicationSlot()

static void DropReplicationSlot ( DropReplicationSlotCmd * cmd)
static

Definition at line 1030 of file walsender.c.

References DestRemote, EndCommand(), ReplicationSlotDrop(), DropReplicationSlotCmd::slotname, and DropReplicationSlotCmd::wait.

Referenced by exec_replication_command(), and main().

1031 {
1032  ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1033  EndCommand("DROP_REPLICATION_SLOT", DestRemote);
1034 }
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:157
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:509

◆ exec_replication_command()

bool exec_replication_command ( const char *  cmd_string)

Definition at line 1426 of file walsender.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, CHECK_FOR_INTERRUPTS, CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, generate_unaccent_rules::dest, DestRemote, DestRemoteSimple, DropReplicationSlot(), elog, EndCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsA, IsAbortedTransactionBlockState(), IsTransactionBlock(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextDelete(), MemoryContextSwitchTo(), MyDatabaseId, VariableShowStmt::name, pgstat_report_activity(), PreventTransactionChain(), REPLICATION_KIND_PHYSICAL, replication_parse_result, replication_scanner_init(), replication_yyparse(), SendBaseBackup(), SendTimeLineHistory(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), WalSnd::state, STATE_IDLE, STATE_RUNNING, T_BaseBackupCmd, T_CreateReplicationSlotCmd, T_DropReplicationSlotCmd, T_IdentifySystemCmd, T_SQLCmd, T_StartReplicationCmd, T_TimeLineHistoryCmd, T_VariableShowStmt, TimeLineHistoryCmd::type, Node::type, WalSndSetState(), and WALSNDSTATE_STOPPING.

Referenced by PostgresMain().

1427 {
1428  int parse_rc;
1429  Node *cmd_node;
1430  MemoryContext cmd_context;
1431  MemoryContext old_context;
1432 
1433  /*
1434  * If WAL sender has been told that shutdown is getting close, switch its
1435  * status accordingly to handle the next replication commands correctly.
1436  */
1437  if (got_STOPPING)
1438  WalSndSetState(WALSNDSTATE_STOPPING);
1439
1440  /*
1441  * Throw error if in stopping mode. We need prevent commands that could
1442  * generate WAL while the shutdown checkpoint is being written. To be
1443  * safe, we just prohibit all new commands.
1444  */
1445  if (MyWalSnd->state == WALSNDSTATE_STOPPING)
1446  ereport(ERROR,
1447  (errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1448 
1449  /*
1450  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1451  * command arrives. Clean up the old stuff if there's anything.
1452  */
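1453  SnapBuildClearExportedSnapshot();
1454
1455  CHECK_FOR_INTERRUPTS();
1456
1457  cmd_context = AllocSetContextCreate(CurrentMemoryContext,
1458  "Replication command context",
1459  ALLOCSET_DEFAULT_SIZES);
1460  old_context = MemoryContextSwitchTo(cmd_context);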
1461 
1462  replication_scanner_init(cmd_string);
1463  parse_rc = replication_yyparse();
1464  if (parse_rc != 0)
1465  ereport(ERROR,
1466  (errcode(ERRCODE_SYNTAX_ERROR),
1467  (errmsg_internal("replication command parser returned %d",
1468  parse_rc))));
1469 
1470  cmd_node = replication_parse_result;
1471 
1472  /*
1473  * Log replication command if log_replication_commands is enabled. Even
1474  * when it's disabled, log the command with DEBUG1 level for backward
1475  * compatibility. Note that SQL commands are not logged here, and will be
1476  * logged later if log_statement is enabled.
1477  */
1478  if (cmd_node->type != T_SQLCmd)
1479  ereport(log_replication_commands ? LOG : DEBUG1,
1480  (errmsg("received replication command: %s", cmd_string)));
1481 
1482  /*
1483  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was
1484  * called outside of transaction the snapshot should be cleared here.
1485  */
1486  if (!IsTransactionBlock())
1487  SnapBuildClearExportedSnapshot();
1488
1489  /*
1490  * For aborted transactions, don't allow anything except pure SQL, the
1491  * exec_simple_query() will handle it correctly.
1492  */
1493  if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
1494  ereport(ERROR,
1495  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1496  errmsg("current transaction is aborted, "
1497  "commands ignored until end of transaction block")));
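1498
1499  CHECK_FOR_INTERRUPTS();
1500
1501  /*
1502  * Allocate buffers that will be used for each outgoing and incoming
1503  * message. We do this just once per command to reduce palloc overhead.
1504  */
1505  initStringInfo(&output_message);
1506  initStringInfo(&reply_message);
1507  initStringInfo(&tmpbuf);
1508
1509  /* Report to pgstat that this process is running */
1510  pgstat_report_activity(STATE_RUNNING, NULL);
1511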
1512  switch (cmd_node->type)
1513  {
1514  case T_IdentifySystemCmd:
1515  IdentifySystem();
1516  break;
1517 
1518  case T_BaseBackupCmd:
1519  PreventTransactionChain(true, "BASE_BACKUP");
1520  SendBaseBackup((BaseBackupCmd *) cmd_node);
1521  break;
1522 
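1523  case T_CreateReplicationSlotCmd:
1524  CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
1525  break;
1526
1527  case T_DropReplicationSlotCmd:
1528  DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
1529  break;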
1530 
1531  case T_StartReplicationCmd:
1532  {
1533  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1534 
1535  PreventTransactionChain(true, "START_REPLICATION");
1536 
1537  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1538  StartReplication(cmd);
1539  else
1540  StartLogicalReplication(cmd);
1541  break;
1542  }
1543 
1544  case T_TimeLineHistoryCmd:
1545  PreventTransactionChain(true, "TIMELINE_HISTORY");
1546  SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
1547  break;
1548 
1549  case T_VariableShowStmt:
1550  {
1551  DestReceiver *dest = CreateDestReceiver(DestRemoteSimple);
1552  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1553 
1554  GetPGVariable(n->name, dest);
1555  }
1556  break;
1557 
1558  case T_SQLCmd:
1559  if (MyDatabaseId == InvalidOid)
1560  ereport(ERROR,
1561  (errmsg("cannot execute SQL commands in WAL sender for physical replication")));
1562 
1563  /* Report to pgstat that this process is now idle */
1564  pgstat_report_activity(STATE_IDLE, NULL);
1565
1566  /* Tell the caller that this wasn't a WalSender command. */
1567  return false;
1568 
1569  default:
1570  elog(ERROR, "unrecognized replication command node tag: %u",
1571  cmd_node->type);
1572  }
1573 
1574  /* done */
1575  MemoryContextSwitchTo(old_context);
1576  MemoryContextDelete(cmd_context);
1577 
1578  /* Send CommandComplete message */
1579  EndCommand("SELECT", DestRemote);
1580 
1581  /* Report to pgstat that this process is now idle */
1582  pgstat_report_activity(STATE_IDLE, NULL);
1583
1584  return true;
1585 }
#define IsA(nodeptr, _type_)
Definition: nodes.h:564
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:198
#define DEBUG1
Definition: elog.h:25
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:2994
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:428
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1030
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:371
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Definition: nodes.h:513
static StringInfoData output_message
Definition: walsender.c:161
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:188
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LOG
Definition: elog.h:26
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc.c:7941
Node * replication_parse_result
bool IsTransactionBlock(void)
Definition: xact.c:4448
ReplicationKind kind
Definition: replnodes.h:82
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:157
#define ERROR
Definition: elog.h:43
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:691
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:197
WalSndState state
NodeTag type
Definition: nodes.h:515
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:109
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:524
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
#define ereport(elevel, rest)
Definition: elog.h:122
int replication_yyparse(void)
#define AllocSetContextCreate(parent, name, allocparams)
Definition: memutils.h:165
void initStringInfo(StringInfo str)
Definition: stringinfo.c:46
static StringInfoData reply_message
Definition: walsender.c:162
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1041
Oid MyDatabaseId
Definition: globals.c:77
WalSnd * MyWalSnd
Definition: walsender.c:112
#define InvalidOid
Definition: postgres_ext.h:36
int errmsg_internal(const char *fmt,...)
Definition: elog.c:827
void WalSndSetState(WalSndState state)
Definition: walsender.c:3113
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:836
static StringInfoData tmpbuf
Definition: walsender.c:163
bool log_replication_commands
Definition: walsender.c:125
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:98
#define elog
Definition: elog.h:219
static void IdentifySystem(void)
Definition: walsender.c:339
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:696
void PreventTransactionChain(bool isTopLevel, const char *stmtType)
Definition: xact.c:3154
void replication_scanner_init(const char *query_string)

◆ GetStandbyFlushRecPtr()

static XLogRecPtr GetStandbyFlushRecPtr ( void  )
static

Definition at line 2865 of file walsender.c.

References GetWalRcvWriteRecPtr(), GetXLogReplayRecPtr(), receiveTLI, and ThisTimeLineID.

Referenced by IdentifySystem(), StartReplication(), and XLogSendPhysical().

2866 {
2867  XLogRecPtr replayPtr;
2868  TimeLineID replayTLI;
2869  XLogRecPtr receivePtr;
2870  TimeLineID receiveTLI;
2871  XLogRecPtr result;
2872 
2873  /*
2874  * We can safely send what's already been replayed. Also, if walreceiver
2875  * is streaming WAL from the same timeline, we can send anything that it
2876  * has streamed, but hasn't been replayed yet.
2877  */
2878 
2879  receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI);
2880  replayPtr = GetXLogReplayRecPtr(&replayTLI);
2881 
2882  ThisTimeLineID = replayTLI;
2883 
2884  result = replayPtr;
2885  if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
2886  result = receivePtr;
2887 
2888  return result;
2889 }
uint32 TimeLineID
Definition: xlogdefs.h:45
XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11155
static TimeLineID receiveTLI
Definition: xlog.c:203
TimeLineID ThisTimeLineID
Definition: xlog.c:181
uint64 XLogRecPtr
Definition: xlogdefs.h:21

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 2918 of file walsender.c.

References am_walsender, Assert, got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

2919 {
2920  Assert(am_walsender);
2921
2922  /*
2923  * If replication has not yet started, die like with SIGTERM. If
2924  * replication is active, only set a flag and wake up the main loop. It
2925  * will send any outstanding WAL, wait for it to be replicated to the
2926  * standby, and then exit gracefully.
2927  */
2928  if (!replication_active)
2929  kill(MyProcPid, SIGTERM);
2930  else
2931  got_STOPPING = true;
2932 }
int MyProcPid
Definition: globals.c:39
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:188
#define kill(pid, sig)
Definition: win32_port.h:437
bool am_walsender
Definition: walsender.c:115
static volatile sig_atomic_t replication_active
Definition: walsender.c:196
#define Assert(condition)
Definition: c.h:688

◆ IdentifySystem()

static void IdentifySystem ( void  )
static

Definition at line 339 of file walsender.c.

References am_cascading_walsender, begin_tup_output_tupdesc(), CommitTransactionCommand(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, cur, CurrentMemoryContext, dbname, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), get_database_name(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetSystemIdentifier(), Int32GetDatum, INT4OID, InvalidOid, MAXFNAMELEN, MemoryContextSwitchTo(), MemSet, MyDatabaseId, RecoveryInProgress(), snprintf(), StartTransactionCommand(), TEXTOID, ThisTimeLineID, TupleDescInitBuiltinEntry(), UINT64_FORMAT, and values.

Referenced by exec_replication_command().

340 {
341  char sysid[32];
342  char xloc[MAXFNAMELEN];
343  XLogRecPtr logptr;
344  char *dbname = NULL;
345  DestReceiver *dest;
346  TupOutputState *tstate;
347  TupleDesc tupdesc;
348  Datum values[4];
349  bool nulls[4];
350 
351  /*
352  * Reply with a result set with one row, four columns. First col is system
353  * ID, second is timeline ID, third is current xlog location and the
354  * fourth contains the database name if we are connected to one.
355  */
356 
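357  snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
358  GetSystemIdentifier());
359
360  am_cascading_walsender = RecoveryInProgress();
361  if (am_cascading_walsender)
362  {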
363  /* this also updates ThisTimeLineID */
364  logptr = GetStandbyFlushRecPtr();
365  }
366  else
367  logptr = GetFlushRecPtr();
368 
369  snprintf(xloc, sizeof(xloc), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
370 
371  if (MyDatabaseId != InvalidOid)
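372  {
373  MemoryContext cur = CurrentMemoryContext;
374
375  /* syscache access needs a transaction env. */
376  StartTransactionCommand();
377  /* make dbname live outside TX context */
378  MemoryContextSwitchTo(cur);
379  dbname = get_database_name(MyDatabaseId);
380  CommitTransactionCommand();
381  /* CommitTransactionCommand switches to TopMemoryContext */
382  MemoryContextSwitchTo(cur);
383  }
384
385  dest = CreateDestReceiver(DestRemoteSimple);
386  MemSet(nulls, false, sizeof(nulls));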
387 
388  /* need a tuple descriptor representing four columns */
389  tupdesc = CreateTemplateTupleDesc(4, false);
390  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
391  TEXTOID, -1, 0);
392  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
393  INT4OID, -1, 0);
394  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
395  TEXTOID, -1, 0);
396  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
397  TEXTOID, -1, 0);
398 
399  /* prepare for projection of tuples */
400  tstate = begin_tup_output_tupdesc(dest, tupdesc);
401 
402  /* column 1: system identifier */
403  values[0] = CStringGetTextDatum(sysid);
404 
405  /* column 2: timeline */
406  values[1] = Int32GetDatum(ThisTimeLineID);
407 
408  /* column 3: wal location */
409  values[2] = CStringGetTextDatum(xloc);
410 
411  /* column 4: database name, or NULL if none */
412  if (dbname)
413  values[3] = CStringGetTextDatum(dbname);
414  else
415  nulls[3] = true;
416 
417  /* send it to dest */
418  do_tup_output(tstate, values, nulls);
419 
420  end_tup_output(tstate);
421 }
#define TEXTOID
Definition: pg_type.h:324
void CommitTransactionCommand(void)
Definition: xact.c:2745
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:1309
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define INT4OID
Definition: pg_type.h:316
struct cursor * cur
Definition: ecpg.c:28
#define MemSet(start, val, len)
Definition: c.h:897
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8251
bool RecoveryInProgress(void)
Definition: xlog.c:7919
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:1367
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:614
char * get_database_name(Oid dbid)
Definition: dbcommands.c:2056
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:109
unsigned int uint32
Definition: c.h:314
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc)
Definition: execTuples.c:1291
#define MAXFNAMELEN
uintptr_t Datum
Definition: postgres.h:365
Oid MyDatabaseId
Definition: globals.c:77
#define InvalidOid
Definition: postgres_ext.h:36
TimeLineID ThisTimeLineID
Definition: xlog.c:181
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void StartTransactionCommand(void)
Definition: xact.c:2674
char * dbname
Definition: streamutil.c:42
TupleDesc CreateTemplateTupleDesc(int natts, bool hasoid)
Definition: tupdesc.c:43
static Datum values[MAXATTR]
Definition: bootstrap.c:164
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4711
#define Int32GetDatum(X)
Definition: postgres.h:462
#define CStringGetTextDatum(s)
Definition: builtins.h:91
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2865
int16 AttrNumber
Definition: attnum.h:21
#define UINT64_FORMAT
Definition: c.h:357
bool am_cascading_walsender
Definition: walsender.c:116

◆ InitWalSenderSlot()

static void InitWalSenderSlot ( void  )
static

Definition at line 2228 of file walsender.c.

References WalSnd::apply, WalSnd::applyLag, Assert, ereport, errcode(), errmsg(), FATAL, WalSnd::flush, WalSnd::flushLag, i, InvalidXLogRecPtr, WalSnd::latch, max_wal_senders, WalSnd::mutex, MyProc, MyProcPid, on_shmem_exit(), WalSnd::pid, PGPROC::procLatch, WalSnd::sentPtr, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndKill(), WalSndCtlData::walsnds, WALSNDSTATE_STARTUP, WalSnd::write, and WalSnd::writeLag.

2229 {
2230  int i;
2231 
2232  /*
2233  * WalSndCtl should be set up already (we inherit this by fork() or
2234  * EXEC_BACKEND mechanism from the postmaster).
2235  */
2236  Assert(WalSndCtl != NULL);
2237  Assert(MyWalSnd == NULL);
2238 
2239  /*
2240  * Find a free walsender slot and reserve it. If this fails, we must be
2241  * out of WalSnd structures.
2242  */
2243  for (i = 0; i < max_wal_senders; i++)
2244  {
2245  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2246 
2247  SpinLockAcquire(&walsnd->mutex);
2248 
2249  if (walsnd->pid != 0)
2250  {
2251  SpinLockRelease(&walsnd->mutex);
2252  continue;
2253  }
2254  else
2255  {
2256  /*
2257  * Found a free slot. Reserve it for us.
2258  */
2259  walsnd->pid = MyProcPid;
2260  walsnd->sentPtr = InvalidXLogRecPtr;
2261  walsnd->write = InvalidXLogRecPtr;
2262  walsnd->flush = InvalidXLogRecPtr;
2263  walsnd->apply = InvalidXLogRecPtr;
2264  walsnd->writeLag = -1;
2265  walsnd->flushLag = -1;
2266  walsnd->applyLag = -1;
2267  walsnd->state = WALSNDSTATE_STARTUP;
2268  walsnd->latch = &MyProc->procLatch;
2269  SpinLockRelease(&walsnd->mutex);
2270  /* don't need the lock anymore */
2271  MyWalSnd = (WalSnd *) walsnd;
2272 
2273  break;
2274  }
2275  }
2276  if (MyWalSnd == NULL)
2277  ereport(FATAL,
2278  (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
2279  errmsg("number of requested standby connections "
2280  "exceeds max_wal_senders (currently %d)",
2281  max_wal_senders)));
2282 
2283  /* Arrange to clean up at walsender exit */
2285 }
XLogRecPtr write
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
int MyProcPid
Definition: globals.c:39
PGPROC * MyProc
Definition: proc.c:67
TimeOffset flushLag
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
int errcode(int sqlerrcode)
Definition: elog.c:575
static void WalSndKill(int code, Datum arg)
Definition: walsender.c:2289
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
TimeOffset writeLag
slock_t mutex
Latch procLatch
Definition: proc.h:104
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
#define FATAL
Definition: elog.h:52
Latch * latch
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:355
WalSndState state
#define ereport(elevel, rest)
Definition: elog.h:122
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
XLogRecPtr sentPtr
WalSnd * MyWalSnd
Definition: walsender.c:112
TimeOffset applyLag
#define Assert(condition)
Definition: c.h:688
int errmsg(const char *fmt,...)
Definition: elog.c:797
int i
XLogRecPtr apply

◆ LagTrackerRead()

static TimeOffset LagTrackerRead ( int  head,
XLogRecPtr  lsn,
TimestampTz  now 
)
static

Definition at line 3458 of file walsender.c.

References Assert, LAG_TRACKER_BUFFER_SIZE, LagTracker, WalTimeSample::lsn, next, and WalTimeSample::time.

Referenced by ProcessStandbyReplyMessage().

3459 {
3460  TimestampTz time = 0;
3461 
3462  /* Read all unread samples up to this LSN or end of buffer. */
3463  while (LagTracker.read_heads[head] != LagTracker.write_head &&
3464  LagTracker.buffer[LagTracker.read_heads[head]].lsn <= lsn)
3465  {
3466  time = LagTracker.buffer[LagTracker.read_heads[head]].time;
3467  LagTracker.last_read[head] =
3468  LagTracker.buffer[LagTracker.read_heads[head]];
3469  LagTracker.read_heads[head] =
3470  (LagTracker.read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE;
3471  }
3472 
3473  /*
3474  * If the lag tracker is empty, that means the standby has processed
3475  * everything we've ever sent so we should now clear 'last_read'. If we
3476  * didn't do that, we'd risk using a stale and irrelevant sample for
3477  * interpolation at the beginning of the next burst of WAL after a period
3478  * of idleness.
3479  */
3480  if (LagTracker.read_heads[head] == LagTracker.write_head)
3481  LagTracker.last_read[head].time = 0;
3482 
3483  if (time > now)
3484  {
3485  /* If the clock somehow went backwards, treat as not found. */
3486  return -1;
3487  }
3488  else if (time == 0)
3489  {
3490  /*
3491  * We didn't cross a time. If there is a future sample that we
3492  * haven't reached yet, and we've already reached at least one sample,
3493  * let's interpolate the local flushed time. This is mainly useful
3494  * for reporting a completely stuck apply position as having
3495  * increasing lag, since otherwise we'd have to wait for it to
3496  * eventually start moving again and cross one of our samples before
3497  * we can show the lag increasing.
3498  */
3499  if (LagTracker.read_heads[head] == LagTracker.write_head)
3500  {
3501  /* There are no future samples, so we can't interpolate. */
3502  return -1;
3503  }
3504  else if (LagTracker.last_read[head].time != 0)
3505  {
3506  /* We can interpolate between last_read and the next sample. */
3507  double fraction;
3508  WalTimeSample prev = LagTracker.last_read[head];
3509  WalTimeSample next = LagTracker.buffer[LagTracker.read_heads[head]];
3510 
3511  if (lsn < prev.lsn)
3512  {
3513  /*
3514  * Reported LSNs shouldn't normally go backwards, but it's
3515  * possible when there is a timeline change. Treat as not
3516  * found.
3517  */
3518  return -1;
3519  }
3520 
3521  Assert(prev.lsn < next.lsn);
3522 
3523  if (prev.time > next.time)
3524  {
3525  /* If the clock somehow went backwards, treat as not found. */
3526  return -1;
3527  }
3528 
3529  /* See how far we are between the previous and next samples. */
3530  fraction =
3531  (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
3532 
3533  /* Scale the local flush time proportionally. */
3534  time = (TimestampTz)
3535  ((double) prev.time + (next.time - prev.time) * fraction);
3536  }
3537  else
3538  {
3539  /*
3540  * We have only a future sample, implying that we were entirely
3541  * caught up but and now there is a new burst of WAL and the
3542  * standby hasn't processed the first sample yet. Until the
3543  * standby reaches the future sample the best we can do is report
3544  * the hypothetical lag if that sample were to be replayed now.
3545  */
3546  time = LagTracker.buffer[LagTracker.read_heads[head]].time;
3547  }
3548  }
3549 
3550  /* Return the elapsed time since local flush time in microseconds. */
3551  Assert(time != 0);
3552  return now - time;
3553 }
static int32 next
Definition: blutils.c:210
int64 TimestampTz
Definition: timestamp.h:39
TimestampTz time
Definition: walsender.c:205
static struct @26 LagTracker
XLogRecPtr lsn
Definition: walsender.c:204
#define Assert(condition)
Definition: c.h:688
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:209
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534

◆ LagTrackerWrite()

static void LagTrackerWrite ( XLogRecPtr  lsn,
TimestampTz  local_flush_time 
)
static

Definition at line 3393 of file walsender.c.

References am_walsender, i, LAG_TRACKER_BUFFER_SIZE, LagTracker, and NUM_SYNC_REP_WAIT_MODE.

Referenced by WalSndUpdateProgress(), and XLogSendPhysical().

3394 {
3395  bool buffer_full;
3396  int new_write_head;
3397  int i;
3398 
3399  if (!am_walsender)
3400  return;
3401 
3402  /*
3403  * If the lsn hasn't advanced since last time, then do nothing. This way
3404  * we only record a new sample when new WAL has been written.
3405  */
3406  if (LagTracker.last_lsn == lsn)
3407  return;
3408  LagTracker.last_lsn = lsn;
3409 
3410  /*
3411  * If advancing the write head of the circular buffer would crash into any
3412  * of the read heads, then the buffer is full. In other words, the
3413  * slowest reader (presumably apply) is the one that controls the release
3414  * of space.
3415  */
3416  new_write_head = (LagTracker.write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
3417  buffer_full = false;
3418  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
3419  {
3420  if (new_write_head == LagTracker.read_heads[i])
3421  buffer_full = true;
3422  }
3423 
3424  /*
3425  * If the buffer is full, for now we just rewind by one slot and overwrite
3426  * the last sample, as a simple (if somewhat uneven) way to lower the
3427  * sampling rate. There may be better adaptive compaction algorithms.
3428  */
3429  if (buffer_full)
3430  {
3431  new_write_head = LagTracker.write_head;
3432  if (LagTracker.write_head > 0)
3433  LagTracker.write_head--;
3434  else
3435  LagTracker.write_head = LAG_TRACKER_BUFFER_SIZE - 1;
3436  }
3437 
3438  /* Store a sample at the current write head position. */
3439  LagTracker.buffer[LagTracker.write_head].lsn = lsn;
3440  LagTracker.buffer[LagTracker.write_head].time = local_flush_time;
3441  LagTracker.write_head = new_write_head;
3442 }
static struct @26 LagTracker
bool am_walsender
Definition: walsender.c:115
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:209
int i
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28

◆ logical_read_xlog_page()

static int logical_read_xlog_page ( XLogReaderState *  state,
XLogRecPtr  targetPagePtr,
int  reqLen,
XLogRecPtr  targetRecPtr,
char *  cur_page,
TimeLineID *  pageTLI 
)
static

Definition at line 749 of file walsender.c.

References XLogReaderState::currTLI, XLogReaderState::currTLIValidUntil, XLogReaderState::nextTLI, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, ThisTimeLineID, WalSndWaitForWal(), XLogRead(), and XLogReadDetermineTimeline().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

751 {
752  XLogRecPtr flushptr;
753  int count;
754 
755  XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
757  sendTimeLine = state->currTLI;
759  sendTimeLineNextTLI = state->nextTLI;
760 
761  /* make sure we have enough WAL available */
762  flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
763 
764  /* fail if not (implies we are going to shut down) */
765  if (flushptr < targetPagePtr + reqLen)
766  return -1;
767 
768  if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
769  count = XLOG_BLCKSZ; /* more than one block available */
770  else
771  count = flushptr - targetPagePtr; /* part of the page available */
772 
773  /* now actually read the data, we know it's there */
774  XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
775 
776  return count;
777 }
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
Definition: xlogutils.c:802
XLogRecPtr currTLIValidUntil
Definition: xlogreader.h:180
static void XLogRead(char *buf, XLogRecPtr startptr, Size count)
Definition: walsender.c:2317
TimeLineID nextTLI
Definition: xlogreader.h:186
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition: walsender.c:1277
TimeLineID ThisTimeLineID
Definition: xlog.c:181
TimeLineID currTLI
Definition: xlogreader.h:170
static TimeLineID sendTimeLine
Definition: walsender.c:149
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:150
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:152
static bool sendTimeLineIsHistoric
Definition: walsender.c:151

◆ offset_to_interval()

static Interval* offset_to_interval ( TimeOffset  offset)
static

Definition at line 3151 of file walsender.c.

References Interval::day, Interval::month, palloc(), and Interval::time.

Referenced by pg_stat_get_wal_senders().

3152 {
3153  Interval *result = palloc(sizeof(Interval));
3154 
3155  result->month = 0;
3156  result->day = 0;
3157  result->time = offset;
3158 
3159  return result;
3160 }
int32 day
Definition: timestamp.h:47
TimeOffset time
Definition: timestamp.h:45
int32 month
Definition: timestamp.h:48
void * palloc(Size size)
Definition: mcxt.c:835

◆ parseCreateReplSlotOptions()

static void parseCreateReplSlotOptions ( CreateReplicationSlotCmd *  cmd,
bool *  reserve_wal,
CRSSnapshotAction *  snapshot_action 
)
static

Definition at line 783 of file walsender.c.

References CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, defGetBoolean(), DefElem::defname, elog, ereport, errcode(), errmsg(), ERROR, CreateReplicationSlotCmd::kind, lfirst, CreateReplicationSlotCmd::options, REPLICATION_KIND_LOGICAL, and REPLICATION_KIND_PHYSICAL.

Referenced by CreateReplicationSlot().

786 {
787  ListCell *lc;
788  bool snapshot_action_given = false;
789  bool reserve_wal_given = false;
790 
791  /* Parse options */
792  foreach(lc, cmd->options)
793  {
794  DefElem *defel = (DefElem *) lfirst(lc);
795 
796  if (strcmp(defel->defname, "export_snapshot") == 0)
797  {
798  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
799  ereport(ERROR,
800  (errcode(ERRCODE_SYNTAX_ERROR),
801  errmsg("conflicting or redundant options")));
802 
803  snapshot_action_given = true;
804  *snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT :
806  }
807  else if (strcmp(defel->defname, "use_snapshot") == 0)
808  {
809  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
810  ereport(ERROR,
811  (errcode(ERRCODE_SYNTAX_ERROR),
812  errmsg("conflicting or redundant options")));
813 
814  snapshot_action_given = true;
815  *snapshot_action = CRS_USE_SNAPSHOT;
816  }
817  else if (strcmp(defel->defname, "reserve_wal") == 0)
818  {
819  if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
820  ereport(ERROR,
821  (errcode(ERRCODE_SYNTAX_ERROR),
822  errmsg("conflicting or redundant options")));
823 
824  reserve_wal_given = true;
825  *reserve_wal = true;
826  }
827  else
828  elog(ERROR, "unrecognized option: %s", defel->defname);
829  }
830 }
int errcode(int sqlerrcode)
Definition: elog.c:575
ReplicationKind kind
Definition: replnodes.h:56
bool defGetBoolean(DefElem *def)
Definition: define.c:111
#define ERROR
Definition: elog.h:43
#define ereport(elevel, rest)
Definition: elog.h:122
#define lfirst(lc)
Definition: pg_list.h:106
int errmsg(const char *fmt,...)
Definition: elog.c:797
char * defname
Definition: parsenodes.h:727
#define elog
Definition: elog.h:219

◆ pg_stat_get_wal_senders()

Datum pg_stat_get_wal_senders ( PG_FUNCTION_ARGS  )

Definition at line 3167 of file walsender.c.

References ReturnSetInfo::allowedModes, WalSnd::apply, WalSnd::applyLag, CStringGetTextDatum, DEFAULT_ROLE_READ_ALL_STATS, ReturnSetInfo::econtext, ExprContext::ecxt_per_query_memory, elog, ereport, errcode(), errmsg(), ERROR, WalSnd::flush, WalSnd::flushLag, get_call_result_type(), GetUserId(), i, Int32GetDatum, IntervalPGetDatum, is_member_of_role(), IsA, list_member_int(), LSNGetDatum, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_wal_senders, MemoryContextSwitchTo(), MemSet, WalSnd::mutex, offset_to_interval(), PG_STAT_GET_WAL_SENDERS_COLS, WalSnd::pid, ReturnSetInfo::returnMode, WalSnd::sentPtr, sentPtr, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SFRM_Materialize, SpinLockAcquire, SpinLockRelease, WalSnd::state, SYNC_REP_PRIORITY, WalSnd::sync_standby_priority, SyncRepConfigData::syncrep_method, SyncRepConfig, SyncRepGetSyncStandbys(), tuplestore_begin_heap(), tuplestore_donestoring, tuplestore_putvalues(), TYPEFUNC_COMPOSITE, values, WalSndGetStateString(), WalSndCtlData::walsnds, work_mem, write, WalSnd::write, WalSnd::writeLag, and XLogRecPtrIsInvalid.

3168 {
3169 #define PG_STAT_GET_WAL_SENDERS_COLS 11
3170  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3171  TupleDesc tupdesc;
3172  Tuplestorestate *tupstore;
3173  MemoryContext per_query_ctx;
3174  MemoryContext oldcontext;
3175  List *sync_standbys;
3176  int i;
3177 
3178  /* check to see if caller supports us returning a tuplestore */
3179  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
3180  ereport(ERROR,
3181  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3182  errmsg("set-valued function called in context that cannot accept a set")));
3183  if (!(rsinfo->allowedModes & SFRM_Materialize))
3184  ereport(ERROR,
3185  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3186  errmsg("materialize mode required, but it is not " \
3187  "allowed in this context")));
3188 
3189  /* Build a tuple descriptor for our result type */
3190  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
3191  elog(ERROR, "return type must be a row type");
3192 
3193  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
3194  oldcontext = MemoryContextSwitchTo(per_query_ctx);
3195 
3196  tupstore = tuplestore_begin_heap(true, false, work_mem);
3197  rsinfo->returnMode = SFRM_Materialize;
3198  rsinfo->setResult = tupstore;
3199  rsinfo->setDesc = tupdesc;
3200 
3201  MemoryContextSwitchTo(oldcontext);
3202 
3203  /*
3204  * Get the currently active synchronous standbys.
3205  */
3206  LWLockAcquire(SyncRepLock, LW_SHARED);
3207  sync_standbys = SyncRepGetSyncStandbys(NULL);
3208  LWLockRelease(SyncRepLock);
3209 
3210  for (i = 0; i < max_wal_senders; i++)
3211  {
3212  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3214  XLogRecPtr write;
3215  XLogRecPtr flush;
3216  XLogRecPtr apply;
3217  TimeOffset writeLag;
3218  TimeOffset flushLag;
3219  TimeOffset applyLag;
3220  int priority;
3221  int pid;
3224  bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
3225 
3226  SpinLockAcquire(&walsnd->mutex);
3227  if (walsnd->pid == 0)
3228  {
3229  SpinLockRelease(&walsnd->mutex);
3230  continue;
3231  }
3232  pid = walsnd->pid;
3233  sentPtr = walsnd->sentPtr;
3234  state = walsnd->state;
3235  write = walsnd->write;
3236  flush = walsnd->flush;
3237  apply = walsnd->apply;
3238  writeLag = walsnd->writeLag;
3239  flushLag = walsnd->flushLag;
3240  applyLag = walsnd->applyLag;
3241  priority = walsnd->sync_standby_priority;
3242  SpinLockRelease(&walsnd->mutex);
3243 
3244  memset(nulls, 0, sizeof(nulls));
3245  values[0] = Int32GetDatum(pid);
3246 
3248  {
3249  /*
3250  * Only superusers and members of pg_read_all_stats can see details.
3251  * Other users only get the pid value to know it's a walsender,
3252  * but no details.
3253  */
3254  MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3255  }
3256  else
3257  {
3258  values[1] = CStringGetTextDatum(WalSndGetStateString(state));
3259 
3260  if (XLogRecPtrIsInvalid(sentPtr))
3261  nulls[2] = true;
3262  values[2] = LSNGetDatum(sentPtr);
3263 
3264  if (XLogRecPtrIsInvalid(write))
3265  nulls[3] = true;
3266  values[3] = LSNGetDatum(write);
3267 
3268  if (XLogRecPtrIsInvalid(flush))
3269  nulls[4] = true;
3270  values[4] = LSNGetDatum(flush);
3271 
3272  if (XLogRecPtrIsInvalid(apply))
3273  nulls[5] = true;
3274  values[5] = LSNGetDatum(apply);
3275 
3276  /*
3277  * Treat a standby such as a pg_basebackup background process
3278  * which always returns an invalid flush location, as an
3279  * asynchronous standby.
3280  */
3281  priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
3282 
3283  if (writeLag < 0)
3284  nulls[6] = true;
3285  else
3286  values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
3287 
3288  if (flushLag < 0)
3289  nulls[7] = true;
3290  else
3291  values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
3292 
3293  if (applyLag < 0)
3294  nulls[8] = true;
3295  else
3296  values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
3297 
3298  values[9] = Int32GetDatum(priority);
3299 
3300  /*
3301  * More easily understood version of standby state. This is purely
3302  * informational.
3303  *
3304  * In quorum-based sync replication, the role of each standby
3305  * listed in synchronous_standby_names can be changing very
3306  * frequently. Any standbys considered as "sync" at one moment can
3307  * be switched to "potential" ones at the next moment. So, it's
3308  * basically useless to report "sync" or "potential" as their sync
3309  * states. We report just "quorum" for them.
3310  */
3311  if (priority == 0)
3312  values[10] = CStringGetTextDatum("async");
3313  else if (list_member_int(sync_standbys, i))
3315  CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3316  else
3317  values[10] = CStringGetTextDatum("potential");
3318  }
3319 
3320  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
3321  }
3322 
3323  /* clean up and return the tuplestore */
3324  tuplestore_donestoring(tupstore);
3325 
3326  return (Datum) 0;
3327 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
XLogRecPtr write
#define IsA(nodeptr, _type_)
Definition: nodes.h:564
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:211
#define SYNC_REP_PRIORITY
Definition: syncrep.h:36
Oid GetUserId(void)
Definition: miscinit.c:284
#define write(a, b, c)
Definition: win32.h:14
uint8 syncrep_method
Definition: syncrep.h:51
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
TimeOffset flushLag
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
#define MemSet(start, val, len)
Definition: c.h:897
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
List * SyncRepGetSyncStandbys(bool *am_sync)
Definition: syncrep.c:678
TimeOffset writeLag
SyncRepConfigData * SyncRepConfig
Definition: syncrep.c:97
slock_t mutex
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1724
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
#define ERROR
Definition: elog.h:43
#define IntervalPGetDatum(X)
Definition: timestamp.h:33
bool list_member_int(const List *list, int datum)
Definition: list.c:485
WalSndState state
#define ereport(elevel, rest)
Definition: elog.h:122
int max_wal_senders
Definition: walsender.c:121
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
int64 TimeOffset
Definition: timestamp.h:40
uintptr_t Datum
Definition: postgres.h:365
static const char * WalSndGetStateString(WalSndState state)
Definition: walsender.c:3132
XLogRecPtr sentPtr
int work_mem
Definition: globals.c:113
#define DEFAULT_ROLE_READ_ALL_STATS
Definition: pg_authid.h:108
static XLogRecPtr sentPtr
Definition: walsender.c:158
int allowedModes
Definition: execnodes.h:282
bool is_member_of_role(Oid member, Oid role)
Definition: acl.c:4857
TimeOffset applyLag
SetFunctionReturnMode returnMode
Definition: execnodes.h:284
uint64 XLogRecPtr
Definition: xlogdefs.h:21
int sync_standby_priority
Definition: regguts.h:298
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1120
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:216
WalSndState
Tuplestorestate * setResult
Definition: execnodes.h:287
static Datum values[MAXATTR]
Definition: bootstrap.c:164
#define PG_STAT_GET_WAL_SENDERS_COLS
ExprContext * econtext
Definition: execnodes.h:280
#define Int32GetDatum(X)
Definition: postgres.h:462
TupleDesc setDesc
Definition: execnodes.h:288
int errmsg(const char *fmt,...)
Definition: elog.c:797
int i
#define CStringGetTextDatum(s)
Definition: builtins.h:91
#define elog
Definition: elog.h:219
static Interval * offset_to_interval(TimeOffset offset)
Definition: walsender.c:3151
XLogRecPtr apply
Definition: pg_list.h:45

◆ PhysicalConfirmReceivedLocation()

static void PhysicalConfirmReceivedLocation ( XLogRecPtr  lsn)
static

Definition at line 1725 of file walsender.c.

References Assert, ReplicationSlot::data, InvalidXLogRecPtr, ReplicationSlot::mutex, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, SpinLockAcquire, and SpinLockRelease.

Referenced by ProcessStandbyReplyMessage().

1726 {
1727  bool changed = false;
1729 
1730  Assert(lsn != InvalidXLogRecPtr);
1731  SpinLockAcquire(&slot->mutex);
1732  if (slot->data.restart_lsn != lsn)
1733  {
1734  changed = true;
1735  slot->data.restart_lsn = lsn;
1736  }
1737  SpinLockRelease(&slot->mutex);
1738 
1739  if (changed)
1740  {
1743  }
1744 
1745  /*
1746  * One could argue that the slot should be saved to disk now, but that'd
1747  * be energy wasted - the worst lost information can do here is give us
1748  * wrong information in a statistics view - we'll just potentially be more
1749  * conservative in removing files.
1750  */
1751 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
ReplicationSlotPersistentData data
Definition: slot.h:120
#define SpinLockAcquire(lock)
Definition: spin.h:62
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:741
#define SpinLockRelease(lock)
Definition: spin.h:64
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:688
XLogRecPtr restart_lsn
Definition: slot.h:73
slock_t mutex
Definition: slot.h:93
void ReplicationSlotMarkDirty(void)
Definition: slot.c:656

◆ PhysicalReplicationSlotNewXmin()

static void PhysicalReplicationSlotNewXmin ( TransactionId  feedbackXmin,
TransactionId  feedbackCatalogXmin 
)
static

Definition at line 1849 of file walsender.c.

References ReplicationSlotPersistentData::catalog_xmin, ReplicationSlot::data, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, InvalidTransactionId, ReplicationSlot::mutex, MyPgXact, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredXmin(), SpinLockAcquire, SpinLockRelease, TransactionIdIsNormal, TransactionIdPrecedes(), ReplicationSlotPersistentData::xmin, and PGXACT::xmin.

Referenced by ProcessStandbyHSFeedbackMessage().

1850 {
1851  bool changed = false;
1853 
1854  SpinLockAcquire(&slot->mutex);
1856 
1857  /*
1858  * For physical replication we don't need the interlock provided by xmin
1859  * and effective_xmin since the consequences of a missed increase are
1860  * limited to query cancellations, so set both at once.
1861  */
1862  if (!TransactionIdIsNormal(slot->data.xmin) ||
1863  !TransactionIdIsNormal(feedbackXmin) ||
1864  TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
1865  {
1866  changed = true;
1867  slot->data.xmin = feedbackXmin;
1868  slot->effective_xmin = feedbackXmin;
1869  }
1870  if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
1871  !TransactionIdIsNormal(feedbackCatalogXmin) ||
1872  TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
1873  {
1874  changed = true;
1875  slot->data.catalog_xmin = feedbackCatalogXmin;
1876  slot->effective_catalog_xmin = feedbackCatalogXmin;
1877  }
1878  SpinLockRelease(&slot->mutex);
1879 
1880  if (changed)
1881  {
1884  }
1885 }
TransactionId xmin
Definition: proc.h:225
ReplicationSlotPersistentData data
Definition: slot.h:120
PGXACT * MyPgXact
Definition: proc.c:68
#define SpinLockAcquire(lock)
Definition: spin.h:62
TransactionId effective_xmin
Definition: slot.h:116
TransactionId catalog_xmin
Definition: slot.h:70
#define InvalidTransactionId
Definition: transam.h:31
TransactionId xmin
Definition: slot.h:62
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
#define SpinLockRelease(lock)
Definition: spin.h:64
TransactionId effective_catalog_xmin
Definition: slot.h:117
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
slock_t mutex
Definition: slot.h:93
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:695
void ReplicationSlotMarkDirty(void)
Definition: slot.c:656

◆ ProcessRepliesIfAny()

static void ProcessRepliesIfAny ( void  )
static

Definition at line 1592 of file walsender.c.

References COMMERROR, ereport, errcode(), errmsg(), FATAL, GetCurrentTimestamp(), last_reply_timestamp, pq_endmsgread(), pq_getbyte_if_available(), pq_getmessage(), pq_putmessage_noblock, pq_startmsgread(), proc_exit(), ProcessStandbyMessage(), resetStringInfo(), streamingDoneReceiving, streamingDoneSending, and waiting_for_ping_response.

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

1593 {
1594  unsigned char firstchar;
1595  int r;
1596  bool received = false;
1597 
1598  for (;;)
1599  {
1600  pq_startmsgread();
1601  r = pq_getbyte_if_available(&firstchar);
1602  if (r < 0)
1603  {
1604  /* unexpected error or EOF */
1606  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1607  errmsg("unexpected EOF on standby connection")));
1608  proc_exit(0);
1609  }
1610  if (r == 0)
1611  {
1612  /* no data available without blocking */
1613  pq_endmsgread();
1614  break;
1615  }
1616 
1617  /* Read the message contents */
1619  if (pq_getmessage(&reply_message, 0))
1620  {
1622  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1623  errmsg("unexpected EOF on standby connection")));
1624  proc_exit(0);
1625  }
1626 
1627  /*
1628  * If we already received a CopyDone from the frontend, the frontend
1629  * should not send us anything until we've closed our end of the COPY.
1630  * XXX: In theory, the frontend could already send the next command
1631  * before receiving the CopyDone, but libpq doesn't currently allow
1632  * that.
1633  */
1634  if (streamingDoneReceiving && firstchar != 'X')
1635  ereport(FATAL,
1636  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1637  errmsg("unexpected standby message type \"%c\", after receiving CopyDone",
1638  firstchar)));
1639 
1640  /* Handle the very limited subset of commands expected in this phase */
1641  switch (firstchar)
1642  {
1643  /*
1644  * 'd' means a standby reply wrapped in a CopyData packet.
1645  */
1646  case 'd':
1648  received = true;
1649  break;
1650 
1651  /*
1652  * CopyDone means the standby requested to finish streaming.
1653  * Reply with CopyDone, if we had not sent that already.
1654  */
1655  case 'c':
1656  if (!streamingDoneSending)
1657  {
1658  pq_putmessage_noblock('c', NULL, 0);
1659  streamingDoneSending = true;
1660  }
1661 
1662  streamingDoneReceiving = true;
1663  received = true;
1664  break;
1665 
1666  /*
1667  * 'X' means that the standby is closing down the socket.
1668  */
1669  case 'X':
1670  proc_exit(0);
1671 
1672  default:
1673  ereport(FATAL,
1674  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1675  errmsg("invalid standby message type \"%c\"",
1676  firstchar)));
1677  }
1678  }
1679 
1680  /*
1681  * Save the last reply timestamp if we've received at least one reply.
1682  */
1683  if (received)
1684  {
1686  waiting_for_ping_response = false;
1687  }
1688 }
static void ProcessStandbyMessage(void)
Definition: walsender.c:1694
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:575
void pq_startmsgread(void)
Definition: pqcomm.c:1210
#define FATAL
Definition: elog.h:52
int pq_getbyte_if_available(unsigned char *c)
Definition: pqcomm.c:1040
static bool streamingDoneSending
Definition: walsender.c:180
#define COMMERROR
Definition: elog.h:30
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
#define ereport(elevel, rest)
Definition: elog.h:122
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1272
static StringInfoData reply_message
Definition: walsender.c:162
void pq_endmsgread(void)
Definition: pqcomm.c:1234
static bool streamingDoneReceiving
Definition: walsender.c:181
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
static bool waiting_for_ping_response
Definition: walsender.c:172
static TimestampTz last_reply_timestamp
Definition: walsender.c:169

◆ ProcessStandbyHSFeedbackMessage()

static void ProcessStandbyHSFeedbackMessage ( void  )
static

Definition at line 1926 of file walsender.c.

References DEBUG2, elog, InvalidTransactionId, MyPgXact, MyReplicationSlot, PhysicalReplicationSlotNewXmin(), pq_getmsgint(), pq_getmsgint64(), TransactionIdInRecentPast(), TransactionIdIsNormal, TransactionIdPrecedes(), and PGXACT::xmin.

Referenced by ProcessStandbyMessage().

1927 {
1928  TransactionId feedbackXmin;
1929  uint32 feedbackEpoch;
1930  TransactionId feedbackCatalogXmin;
1931  uint32 feedbackCatalogEpoch;
1932 
1933  /*
1934  * Decipher the reply message. The caller already consumed the msgtype
1935  * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
1936  * of this message.
1937  */
1938  (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
1939  feedbackXmin = pq_getmsgint(&reply_message, 4);
1940  feedbackEpoch = pq_getmsgint(&reply_message, 4);
1941  feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
1942  feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
1943 
1944  elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u",
1945  feedbackXmin,
1946  feedbackEpoch,
1947  feedbackCatalogXmin,
1948  feedbackCatalogEpoch);
1949 
1950  /*
1951  * Unset WalSender's xmins if the feedback message values are invalid.
1952  * This happens when the downstream turned hot_standby_feedback off.
1953  */
1954  if (!TransactionIdIsNormal(feedbackXmin)
1955  && !TransactionIdIsNormal(feedbackCatalogXmin))
1956  {
1958  if (MyReplicationSlot != NULL)
1959  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
1960  return;
1961  }
1962 
1963  /*
1964  * Check that the provided xmin/epoch are sane, that is, not in the future
1965  * and not so far back as to be already wrapped around. Ignore if not.
1966  */
1967  if (TransactionIdIsNormal(feedbackXmin) &&
1968  !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
1969  return;
1970 
1971  if (TransactionIdIsNormal(feedbackCatalogXmin) &&
1972  !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
1973  return;
1974 
1975  /*
1976  * Set the WalSender's xmin equal to the standby's requested xmin, so that
1977  * the xmin will be taken into account by GetOldestXmin. This will hold
1978  * back the removal of dead rows and thereby prevent the generation of
1979  * cleanup conflicts on the standby server.
1980  *
1981  * There is a small window for a race condition here: although we just
1982  * checked that feedbackXmin precedes nextXid, the nextXid could have
1983  * gotten advanced between our fetching it and applying the xmin below,
1984  * perhaps far enough to make feedbackXmin wrap around. In that case the
1985  * xmin we set here would be "in the future" and have no effect. No point
1986  * in worrying about this since it's too late to save the desired data
1987  * anyway. Assuming that the standby sends us an increasing sequence of
1988  * xmins, this could only happen during the first reply cycle, else our
1989  * own xmin would prevent nextXid from advancing so far.
1990  *
1991  * We don't bother taking the ProcArrayLock here. Setting the xmin field
1992  * is assumed atomic, and there's no real need to prevent a concurrent
1993  * GetOldestXmin. (If we're moving our xmin forward, this is obviously
1994  * safe, and if we're moving it backwards, well, the data is at risk
1995  * already since a VACUUM could have just finished calling GetOldestXmin.)
1996  *
1997  * If we're using a replication slot we reserve the xmin via that,
1998  * otherwise via the walsender's PGXACT entry. We can only track the
1999  * catalog xmin separately when using a slot, so we store the least of the
2000  * two provided when not using a slot.
2001  *
2002  * XXX: It might make sense to generalize the ephemeral slot concept and
2003  * always use the slot mechanism to handle the feedback xmin.
2004  */
2005  if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
2006  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2007  else
2008  {
2009  if (TransactionIdIsNormal(feedbackCatalogXmin)
2010  && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
2011  MyPgXact->xmin = feedbackCatalogXmin;
2012  else
2013  MyPgXact->xmin = feedbackXmin;
2014  }
2015 }
uint32 TransactionId
Definition: c.h:463
TransactionId xmin
Definition: proc.h:225
PGXACT * MyPgXact
Definition: proc.c:68
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition: walsender.c:1898
#define DEBUG2
Definition: elog.h:24
#define InvalidTransactionId
Definition: transam.h:31
unsigned int uint32
Definition: c.h:314
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static StringInfoData reply_message
Definition: walsender.c:162
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition: walsender.c:1849
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
#define elog
Definition: elog.h:219
#define TransactionIdIsNormal(xid)
Definition: transam.h:42

◆ ProcessStandbyMessage()

static void ProcessStandbyMessage ( void  )
static

Definition at line 1694 of file walsender.c.

References COMMERROR, ereport, errcode(), errmsg(), pq_getmsgbyte(), proc_exit(), ProcessStandbyHSFeedbackMessage(), and ProcessStandbyReplyMessage().

Referenced by ProcessRepliesIfAny().

1695 {
1696  char msgtype;
1697 
1698  /*
1699  * Check message type from the first byte.
1700  */
1701  msgtype = pq_getmsgbyte(&reply_message);
1702 
1703  switch (msgtype)
1704  {
1705  case 'r':
1707  break;
1708 
1709  case 'h':
1711  break;
1712 
1713  default:
1715  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1716  errmsg("unexpected message type \"%c\"", msgtype)));
1717  proc_exit(0);
1718  }
1719 }
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:575
#define COMMERROR
Definition: elog.h:30
#define ereport(elevel, rest)
Definition: elog.h:122
static StringInfoData reply_message
Definition: walsender.c:162
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
static void ProcessStandbyReplyMessage(void)
Definition: walsender.c:1757
int errmsg(const char *fmt,...)
Definition: elog.c:797
static void ProcessStandbyHSFeedbackMessage(void)
Definition: walsender.c:1926

◆ ProcessStandbyReplyMessage()

static void ProcessStandbyReplyMessage ( void  )
static

Definition at line 1757 of file walsender.c.

References am_cascading_walsender, WalSnd::apply, WalSnd::applyLag, DEBUG2, elog, WalSnd::flush, WalSnd::flushLag, GetCurrentTimestamp(), InvalidXLogRecPtr, LagTrackerRead(), LogicalConfirmReceivedLocation(), WalSnd::mutex, MyReplicationSlot, MyWalSnd, now(), PhysicalConfirmReceivedLocation(), pq_getmsgbyte(), pq_getmsgint64(), sentPtr, SlotIsLogical, SpinLockAcquire, SpinLockRelease, SYNC_REP_WAIT_APPLY, SYNC_REP_WAIT_FLUSH, SYNC_REP_WAIT_WRITE, SyncRepReleaseWaiters(), WalSndKeepalive(), WalSnd::write, and WalSnd::writeLag.

Referenced by ProcessStandbyMessage().

1758 {
1759  XLogRecPtr writePtr,
1760  flushPtr,
1761  applyPtr;
1762  bool replyRequested;
1763  TimeOffset writeLag,
1764  flushLag,
1765  applyLag;
1766  bool clearLagTimes;
1767  TimestampTz now;
1768 
1769  static bool fullyAppliedLastTime = false;
1770 
1771  /* the caller already consumed the msgtype byte */
1772  writePtr = pq_getmsgint64(&reply_message);
1773  flushPtr = pq_getmsgint64(&reply_message);
1774  applyPtr = pq_getmsgint64(&reply_message);
1775  (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
1776  replyRequested = pq_getmsgbyte(&reply_message);
1777 
1778  elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s",
1779  (uint32) (writePtr >> 32), (uint32) writePtr,
1780  (uint32) (flushPtr >> 32), (uint32) flushPtr,
1781  (uint32) (applyPtr >> 32), (uint32) applyPtr,
1782  replyRequested ? " (reply requested)" : "");
1783 
1784  /* See if we can compute the round-trip lag for these positions. */
1785  now = GetCurrentTimestamp();
1786  writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
1787  flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
1788  applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
1789 
1790  /*
1791  * If the standby reports that it has fully replayed the WAL in two
1792  * consecutive reply messages, then the second such message must result
1793  * from wal_receiver_status_interval expiring on the standby. This is a
1794  * convenient time to forget the lag times measured when it last
1795  * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
1796  * until more WAL traffic arrives.
1797  */
1798  clearLagTimes = false;
1799  if (applyPtr == sentPtr)
1800  {
1801  if (fullyAppliedLastTime)
1802  clearLagTimes = true;
1803  fullyAppliedLastTime = true;
1804  }
1805  else
1806  fullyAppliedLastTime = false;
1807 
1808  /* Send a reply if the standby requested one. */
1809  if (replyRequested)
1810  WalSndKeepalive(false);
1811 
1812  /*
1813  * Update shared state for this WalSender process based on reply data from
1814  * standby.
1815  */
1816  {
1817  WalSnd *walsnd = MyWalSnd;
1818 
1819  SpinLockAcquire(&walsnd->mutex);
1820  walsnd->write = writePtr;
1821  walsnd->flush = flushPtr;
1822  walsnd->apply = applyPtr;
1823  if (writeLag != -1 || clearLagTimes)
1824  walsnd->writeLag = writeLag;
1825  if (flushLag != -1 || clearLagTimes)
1826  walsnd->flushLag = flushLag;
1827  if (applyLag != -1 || clearLagTimes)
1828  walsnd->applyLag = applyLag;
1829  SpinLockRelease(&walsnd->mutex);
1830  }
1831 
1834 
1835  /*
1836  * Advance our local xmin horizon when the client confirmed a flush.
1837  */
1838  if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
1839  {
1842  else
1844  }
1845 }
XLogRecPtr write
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
int64 TimestampTz
Definition: timestamp.h:39
TimeOffset flushLag
TimeOffset writeLag
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3335
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: walsender.c:1725
#define SYNC_REP_WAIT_APPLY
Definition: syncrep.h:26
#define DEBUG2
Definition: elog.h:24
unsigned int uint32
Definition: c.h:314
#define SlotIsLogical(slot)
Definition: slot.h:142
#define SYNC_REP_WAIT_FLUSH
Definition: syncrep.h:25
static StringInfoData reply_message
Definition: walsender.c:162
#define SpinLockRelease(lock)
Definition: spin.h:64
int64 TimeOffset
Definition: timestamp.h:40
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition: walsender.c:3458
WalSnd * MyWalSnd
Definition: walsender.c:112
static XLogRecPtr sentPtr
Definition: walsender.c:158
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
TimeOffset applyLag
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:934
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
#define elog
Definition: elog.h:219
XLogRecPtr apply
void SyncRepReleaseWaiters(void)
Definition: syncrep.c:410
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
bool am_cascading_walsender
Definition: walsender.c:116
#define SYNC_REP_WAIT_WRITE
Definition: syncrep.h:24

◆ SendTimeLineHistory()

static void SendTimeLineHistory ( TimeLineHistoryCmd cmd)
static

Definition at line 428 of file walsender.c.

References buf, BYTEAOID, CloseTransientFile(), ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), MAXFNAMELEN, MAXPGPATH, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), pq_beginmessage(), pq_endmessage(), pq_sendbytes(), pq_sendint16(), pq_sendint32(), pq_sendstring(), read, TEXTOID, TimeLineHistoryCmd::timeline, TLHistoryFileName, TLHistoryFilePath, and WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ.

Referenced by exec_replication_command().

429 {
431  char histfname[MAXFNAMELEN];
432  char path[MAXPGPATH];
433  int fd;
434  off_t histfilelen;
435  off_t bytesleft;
436  Size len;
437 
438  /*
439  * Reply with a result set with one row, and two columns. The first col is
440  * the name of the history file, 2nd is the contents.
441  */
442 
443  TLHistoryFileName(histfname, cmd->timeline);
444  TLHistoryFilePath(path, cmd->timeline);
445 
446  /* Send a RowDescription message */
447  pq_beginmessage(&buf, 'T');
448  pq_sendint16(&buf, 2); /* 2 fields */
449 
450  /* first field */
451  pq_sendstring(&buf, "filename"); /* col name */
452  pq_sendint32(&buf, 0); /* table oid */
453  pq_sendint16(&buf, 0); /* attnum */
454  pq_sendint32(&buf, TEXTOID); /* type oid */
455  pq_sendint16(&buf, -1); /* typlen */
456  pq_sendint32(&buf, 0); /* typmod */
457  pq_sendint16(&buf, 0); /* format code */
458 
459  /* second field */
460  pq_sendstring(&buf, "content"); /* col name */
461  pq_sendint32(&buf, 0); /* table oid */
462  pq_sendint16(&buf, 0); /* attnum */
463  pq_sendint32(&buf, BYTEAOID); /* type oid */
464  pq_sendint16(&buf, -1); /* typlen */
465  pq_sendint32(&buf, 0); /* typmod */
466  pq_sendint16(&buf, 0); /* format code */
467  pq_endmessage(&buf);
468 
469  /* Send a DataRow message */
470  pq_beginmessage(&buf, 'D');
471  pq_sendint16(&buf, 2); /* # of columns */
472  len = strlen(histfname);
473  pq_sendint32(&buf, len); /* col1 len */
474  pq_sendbytes(&buf, histfname, len);
475 
476  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
477  if (fd < 0)
478  ereport(ERROR,
480  errmsg("could not open file \"%s\": %m", path)));
481 
482  /* Determine file length and send it to client */
483  histfilelen = lseek(fd, 0, SEEK_END);
484  if (histfilelen < 0)
485  ereport(ERROR,
487  errmsg("could not seek to end of file \"%s\": %m", path)));
488  if (lseek(fd, 0, SEEK_SET) != 0)
489  ereport(ERROR,
491  errmsg("could not seek to beginning of file \"%s\": %m", path)));
492 
493  pq_sendint32(&buf, histfilelen); /* col2 len */
494 
495  bytesleft = histfilelen;
496  while (bytesleft > 0)
497  {
498  char rbuf[BLCKSZ];
499  int nread;
500 
502  nread = read(fd, rbuf, sizeof(rbuf));
504  if (nread <= 0)
505  ereport(ERROR,
507  errmsg("could not read file \"%s\": %m",
508  path)));
509  pq_sendbytes(&buf, rbuf, nread);
510  bytesleft -= nread;
511  }
512  CloseTransientFile(fd);
513 
514  pq_endmessage(&buf);
515 }
#define TEXTOID
Definition: pg_type.h:324
static void pq_sendint32(StringInfo buf, int32 i)
Definition: pqformat.h:148
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1069
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
TimeLineID timeline
Definition: replnodes.h:97
#define ERROR
Definition: elog.h:43
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2403
#define MAXPGPATH
#define TLHistoryFileName(fname, tli)
static char * buf
Definition: pg_test_fsync.c:67
int errcode_for_file_access(void)
Definition: elog.c:598
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1260
#define ereport(elevel, rest)
Definition: elog.h:122
int CloseTransientFile(int fd)
Definition: fd.c:2573
#define MAXFNAMELEN
size_t Size
Definition: c.h:422
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1236
#define BYTEAOID
Definition: pg_type.h:292
static void pq_sendint16(StringInfo buf, int16 i)
Definition: pqformat.h:140
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:125
int errmsg(const char *fmt,...)
Definition: elog.c:797
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
#define read(a, b, c)
Definition: win32.h:13
#define TLHistoryFilePath(path, tli)

◆ StartLogicalReplication()

static void StartLogicalReplication ( StartReplicationCmd cmd)
static

Definition at line 1041 of file walsender.c.

References am_cascading_walsender, Assert, buf, CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDecodingContext(), ReplicationSlot::data, DestRemote, EndCommand(), ereport, errmsg(), FreeDecodingContext(), got_STOPPING, LOG, logical_read_xlog_page(), logical_startptr, WalSnd::mutex, MyReplicationSlot, StartReplicationCmd::options, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), proc_exit(), RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), ReplicationSlotPersistentData::restart_lsn, WalSnd::sentPtr, sentPtr, StartReplicationCmd::slotname, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, SyncRepInitConfig(), WalSndLoop(), WalSndPrepareWrite(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WalSndUpdateProgress(), WalSndWriteData(), and XLogSendLogical().

Referenced by exec_replication_command().

1042 {
1044 
1045  /* make sure that our requirements are still fulfilled */
1047 
1049 
1050  ReplicationSlotAcquire(cmd->slotname, true);
1051 
1052  /*
1053  * Force a disconnect, so that the decoding code doesn't need to care
1054  * about an eventual switch from running in recovery, to running in a
1055  * normal environment. Client code is expected to handle reconnects.
1056  */
1058  {
1059  ereport(LOG,
1060  (errmsg("terminating walsender process after promotion")));
1061  got_STOPPING = true;
1062  }
1063 
1065 
1066  /* Send a CopyBothResponse message, and start streaming */
1067  pq_beginmessage(&buf, 'W');
1068  pq_sendbyte(&buf, 0);
1069  pq_sendint16(&buf, 0);
1070  pq_endmessage(&buf);
1071  pq_flush();
1072 
1073  /*
1074  * Initialize position to the last ack'ed one, then the xlog records begin
1075  * to be shipped from that position.
1076  */
1078  false,
1083 
1084  /* Start reading WAL from the oldest required WAL. */
1086 
1087  /*
1088  * Report the location after which we'll send out further commits as the
1089  * current sentPtr.
1090  */
1092 
1093  /* Also update the sent position status in shared memory */
1097 
1098  replication_active = true;
1099 
1101 
1102  /* Main loop of walsender */
1104 
1107 
1108  replication_active = false;
1109  if (got_STOPPING)
1110  proc_exit(0);
1112 
1113  /* Get out of COPY mode (CommandComplete). */
1114  EndCommand("COPY 0", DestRemote);
1115 }
XLogRecPtr startpoint
Definition: replnodes.h:85
#define pq_flush()
Definition: libpq.h:39
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:330
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:188
void proc_exit(int code)
Definition: ipc.c:104
static void pq_sendbyte(StringInfo buf, int8 byt)
Definition: pqformat.h:164
ReplicationSlotPersistentData data
Definition: slot.h:120
#define LOG
Definition: elog.h:26
bool RecoveryInProgress(void)
Definition: xlog.c:7919
slock_t mutex
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
XLogRecPtr confirmed_flush
Definition: slot.h:81
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
Definition: walsender.c:749
#define SpinLockAcquire(lock)
Definition: spin.h:62
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:157
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:198
static char * buf
Definition: pg_test_fsync.c:67
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1153
static XLogRecPtr logical_startptr
Definition: walsender.c:199
void ReplicationSlotRelease(void)
Definition: slot.c:416
void SyncRepInitConfig(void)
Definition: syncrep.c:382
#define ereport(elevel, rest)
Definition: elog.h:122
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2094
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1126
#define SpinLockRelease(lock)
Definition: spin.h:64
static volatile sig_atomic_t replication_active
Definition: walsender.c:196
XLogRecPtr sentPtr
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:348
WalSnd * MyWalSnd
Definition: walsender.c:112
static XLogRecPtr sentPtr
Definition: walsender.c:158
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:688
void WalSndSetState(WalSndState state)
Definition: walsender.c:3113
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:481
static void XLogSendLogical(void)
Definition: walsender.c:2755
XLogRecPtr restart_lsn
Definition: slot.h:73
static void pq_sendint16(StringInfo buf, int16 i)
Definition: pqformat.h:140
int errmsg(const char *fmt,...)
Definition: elog.c:797
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:76
bool am_cascading_walsender
Definition: walsender.c:116
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
Definition: walsender.c:1251

◆ StartReplication()

static void StartReplication ( StartReplicationCmd cmd)
static

Definition at line 524 of file walsender.c.

References am_cascading_walsender, Assert, begin_tup_output_tupdesc(), buf, CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errcode(), errdetail(), errmsg(), ERROR, GetFlushRecPtr(), GetStandbyFlushRecPtr(), got_STOPPING, Int64GetDatum(), INT8OID, InvalidXLogRecPtr, list_free_deep(), MemSet, WalSnd::mutex, MyReplicationSlot, pq_beginmessage(), pq_endmessage(), pq_flush, pq_puttextmessage(), pq_sendbyte(), pq_sendint16(), proc_exit(), readTimeLineHistory(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, WalSnd::sentPtr, sentPtr, SlotIsLogical, StartReplicationCmd::slotname, snprintf(), SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), TEXTOID, ThisTimeLineID, StartReplicationCmd::timeline, tliSwitchPoint(), TupleDescInitBuiltinEntry(), values, WalSndLoop(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, XLogRecPtrIsInvalid, and XLogSendPhysical().

Referenced by exec_replication_command().

525 {
527  XLogRecPtr FlushPtr;
528 
529  if (ThisTimeLineID == 0)
530  ereport(ERROR,
531  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
532  errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
533 
534  /*
535  * We assume here that we're logging enough information in the WAL for
536  * log-shipping, since this is checked in PostmasterMain().
537  *
538  * NOTE: wal_level can only change at shutdown, so in most cases it is
539  * difficult for there to be WAL data that we can still see that was
540  * written at wal_level='minimal'.
541  */
542 
543  if (cmd->slotname)
544  {
545  ReplicationSlotAcquire(cmd->slotname, true);
547  ereport(ERROR,
548  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
549  (errmsg("cannot use a logical replication slot for physical replication"))));
550  }
551 
552  /*
553  * Select the timeline. If it was given explicitly by the client, use
554  * that. Otherwise use the timeline of the last replayed record, which is
555  * kept in ThisTimeLineID.
556  */
558  {
559  /* this also updates ThisTimeLineID */
560  FlushPtr = GetStandbyFlushRecPtr();
561  }
562  else
563  FlushPtr = GetFlushRecPtr();
564 
565  if (cmd->timeline != 0)
566  {
567  XLogRecPtr switchpoint;
568 
569  sendTimeLine = cmd->timeline;
571  {
572  sendTimeLineIsHistoric = false;
574  }
575  else
576  {
577  List *timeLineHistory;
578 
579  sendTimeLineIsHistoric = true;
580 
581  /*
582  * Check that the timeline the client requested exists, and the
583  * requested start location is on that timeline.
584  */
585  timeLineHistory = readTimeLineHistory(ThisTimeLineID);
586  switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
588  list_free_deep(timeLineHistory);
589 
590  /*
591  * Found the requested timeline in the history. Check that
592  * requested startpoint is on that timeline in our history.
593  *
594  * This is quite loose on purpose. We only check that we didn't
595  * fork off the requested timeline before the switchpoint. We
596  * don't check that we switched *to* it before the requested
597  * starting point. This is because the client can legitimately
598  * request to start replication from the beginning of the WAL
599  * segment that contains switchpoint, but on the new timeline, so
600  * that it doesn't end up with a partial segment. If you ask for
601  * too old a starting point, you'll get an error later when we
602  * fail to find the requested WAL segment in pg_wal.
603  *
604  * XXX: we could be more strict here and only allow a startpoint
605  * that's older than the switchpoint, if it's still in the same
606  * WAL segment.
607  */
608  if (!XLogRecPtrIsInvalid(switchpoint) &&
609  switchpoint < cmd->startpoint)
610  {
611  ereport(ERROR,
612  (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
613  (uint32) (cmd->startpoint >> 32),
614  (uint32) (cmd->startpoint),
615  cmd->timeline),
616  errdetail("This server's history forked from timeline %u at %X/%X.",
617  cmd->timeline,
618  (uint32) (switchpoint >> 32),
619  (uint32) (switchpoint))));
620  }
621  sendTimeLineValidUpto = switchpoint;
622  }
623  }
624  else
625  {
628  sendTimeLineIsHistoric = false;
629  }
630 
632 
633  /* If there is nothing to stream, don't even enter COPY mode */
635  {
636  /*
637  * When we first start replication the standby will be behind the
638  * primary. For some applications, for example synchronous
639  * replication, it is important to have a clear state for this initial
640  * catchup mode, so we can trigger actions when we change streaming
641  * state later. We may stay in this state for a long time, which is
642  * exactly why we want to be able to monitor whether or not we are
643  * still here.
644  */
646 
647  /* Send a CopyBothResponse message, and start streaming */
648  pq_beginmessage(&buf, 'W');
649  pq_sendbyte(&buf, 0);
650  pq_sendint16(&buf, 0);
651  pq_endmessage(&buf);
652  pq_flush();
653 
654  /*
655  * Don't allow a request to stream from a future point in WAL that
656  * hasn't been flushed to disk in this server yet.
657  */
658  if (FlushPtr < cmd->startpoint)
659  {
660  ereport(ERROR,
661  (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
662  (uint32) (cmd->startpoint >> 32),
663  (uint32) (cmd->startpoint),
664  (uint32) (FlushPtr >> 32),
665  (uint32) (FlushPtr))));
666  }
667 
668  /* Start streaming from the requested point */
669  sentPtr = cmd->startpoint;
670 
671  /* Initialize shared memory status, too */
675 
677 
678  /* Main loop of walsender */
679  replication_active = true;
680 
682 
683  replication_active = false;
684  if (got_STOPPING)
685  proc_exit(0);
687 
689  }
690 
691  if (cmd->slotname)
693 
694  /*
695  * Copy is finished now. Send a single-row result set indicating the next
696  * timeline.
697  */
699  {
700  char startpos_str[8 + 1 + 8 + 1];
702  TupOutputState *tstate;
703  TupleDesc tupdesc;
704  Datum values[2];
705  bool nulls[2];
706 
707  snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
708  (uint32) (sendTimeLineValidUpto >> 32),
710 
712  MemSet(nulls, false, sizeof(nulls));
713 
714  /*
715  * Need a tuple descriptor representing two columns. int8 may seem
716  * like a surprising data type for this, but in theory int4 would not
717  * be wide enough for this, as TimeLineID is unsigned.
718  */
719  tupdesc = CreateTemplateTupleDesc(2, false);
720  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
721  INT8OID, -1, 0);
722  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
723  TEXTOID, -1, 0);
724 
725  /* prepare for projection of tuple */
726  tstate = begin_tup_output_tupdesc(dest, tupdesc);
727 
728  values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
729  values[1] = CStringGetTextDatum(startpos_str);
730 
731  /* send it to dest */
732  do_tup_output(tstate, values, nulls);
733 
734  end_tup_output(tstate);
735  }
736 
737  /* Send CommandComplete message */
738  pq_puttextmessage('C', "START_STREAMING");
739 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr startpoint
Definition: replnodes.h:85
#define pq_flush()
Definition: libpq.h:39
#define TEXTOID
Definition: pg_type.h:324
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:330
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:75
static void XLogSendPhysical(void)
Definition: walsender.c:2492
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:1309
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:188
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:575
#define MemSet(start, val, len)
Definition: c.h:897
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8251
static void pq_sendbyte(StringInfo buf, int8 byt)
Definition: pqformat.h:164
void list_free_deep(List *list)
Definition: list.c:1147
slock_t mutex
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
#define SpinLockAcquire(lock)
Definition: spin.h:62
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:1367
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:614
#define ERROR
Definition: elog.h:43
static char * buf
Definition: pg_test_fsync.c:67
static bool streamingDoneSending
Definition: walsender.c:180
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:109
int errdetail(const char *fmt,...)
Definition: elog.c:873
unsigned int uint32
Definition: c.h:314
void ReplicationSlotRelease(void)
Definition: slot.c:416
#define SlotIsLogical(slot)
Definition: slot.h:142
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1786
void SyncRepInitConfig(void)
Definition: syncrep.c:382
#define ereport(elevel, rest)
Definition: elog.h:122
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc)
Definition: execTuples.c:1291
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2094
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
static volatile sig_atomic_t replication_active
Definition: walsender.c:196
uintptr_t Datum
Definition: postgres.h:365
XLogRecPtr sentPtr
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:561
WalSnd * MyWalSnd
Definition: walsender.c:112
static XLogRecPtr sentPtr
Definition: walsender.c:158
TimeLineID ThisTimeLineID
Definition: xlog.c:181
#define INT8OID
Definition: pg_type.h:304
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
static TimeLineID sendTimeLine
Definition: walsender.c:149
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:688
void WalSndSetState(WalSndState state)
Definition: walsender.c:3113
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:150
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:152
TupleDesc CreateTemplateTupleDesc(int natts, bool hasoid)
Definition: tupdesc.c:43
static bool streamingDoneReceiving
Definition: walsender.c:181
static void pq_sendint16(StringInfo buf, int16 i)
Definition: pqformat.h:140
static Datum values[MAXATTR]
Definition: bootstrap.c:164
int errmsg(const char *fmt,...)
Definition: elog.c:797
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
TimeLineID timeline
Definition: replnodes.h:84
#define CStringGetTextDatum(s)
Definition: builtins.h:91
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2865
void pq_puttextmessage(char msgtype, const char *str)
Definition: pqformat.c:369
Definition: pg_list.h:45
int16 AttrNumber
Definition: attnum.h:21
static bool sendTimeLineIsHistoric
Definition: walsender.c:151
bool am_cascading_walsender
Definition: walsender.c:116

◆ TransactionIdInRecentPast()

static bool TransactionIdInRecentPast ( TransactionId  xid,
uint32  epoch 
)
static

Definition at line 1898 of file walsender.c.

References GetNextXidAndEpoch(), and TransactionIdPrecedesOrEquals().

Referenced by ProcessStandbyHSFeedbackMessage().

1899 {
1900  TransactionId nextXid;
1901  uint32 nextEpoch;
1902 
1903  GetNextXidAndEpoch(&nextXid, &nextEpoch);
1904 
1905  if (xid <= nextXid)
1906  {
1907  if (epoch != nextEpoch)
1908  return false;
1909  }
1910  else
1911  {
1912  if (epoch + 1 != nextEpoch)
1913  return false;
1914  }
1915 
1916  if (!TransactionIdPrecedesOrEquals(xid, nextXid))
1917  return false; /* epoch OK, but it's wrapped around */
1918 
1919  return true;
1920 }
uint32 TransactionId
Definition: c.h:463
void GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch)
Definition: xlog.c:8320
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:319
unsigned int uint32
Definition: c.h:314
static const unsigned __int64 epoch
Definition: gettimeofday.c:34

◆ WalSndCheckTimeOut()

static void WalSndCheckTimeOut ( TimestampTz  now)
static

Definition at line 2067 of file walsender.c.

References COMMERROR, ereport, errmsg(), last_reply_timestamp, TimestampTzPlusMilliseconds, wal_sender_timeout, and WalSndShutdown().

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

2068 {
2069  TimestampTz timeout;
2070 
2071  /* don't bail out if we're doing something that doesn't require timeouts */
2072  if (last_reply_timestamp <= 0)
2073  return;
2074 
2075  timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
2076  wal_sender_timeout);
2077 
2078  if (wal_sender_timeout > 0 && now >= timeout)
2079  {
2080  /*
2081  * Since typically expiration of replication timeout means
2082  * communication problem, we don't send the error message to the
2083  * standby.
2084  */
2085  ereport(COMMERROR,
2086  (errmsg("terminating walsender process due to replication timeout")));
2087 
2088  WalSndShutdown();
2089  }
2090 }
int wal_sender_timeout
Definition: walsender.c:123
int64 TimestampTz
Definition: timestamp.h:39
#define COMMERROR
Definition: elog.h:30
#define ereport(elevel, rest)
Definition: elog.h:122
static void WalSndShutdown(void)
Definition: walsender.c:229
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
int errmsg(const char *fmt,...)
Definition: elog.c:797
static TimestampTz last_reply_timestamp
Definition: walsender.c:169
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534

◆ WalSndComputeSleeptime()

static long WalSndComputeSleeptime ( TimestampTz  now)
static

Definition at line 2025 of file walsender.c.

References last_reply_timestamp, sleeptime, TimestampDifference(), TimestampTzPlusMilliseconds, waiting_for_ping_response, and wal_sender_timeout.

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

2026 {
2027  long sleeptime = 10000; /* 10 s */
2028 
2029  if (wal_sender_timeout > 0 && last_reply_timestamp > 0)
2030  {
2031  TimestampTz wakeup_time;
2032  long sec_to_timeout;
2033  int microsec_to_timeout;
2034 
2035  /*
2036  * At the latest stop sleeping once wal_sender_timeout has been
2037  * reached.
2038  */
2039  wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
2040  wal_sender_timeout);
2041 
2042  /*
2043  * If no ping has been sent yet, wakeup when it's time to do so.
2044  * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2045  * the timeout passed without a response.
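2046  */
2047  if (!waiting_for_ping_response)
2048  wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,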
2049  wal_sender_timeout / 2);
2050 
2051  /* Compute relative time until wakeup. */
2052  TimestampDifference(now, wakeup_time,
2053  &sec_to_timeout, &microsec_to_timeout);
2054 
2055  sleeptime = sec_to_timeout * 1000 +
2056  microsec_to_timeout / 1000;
2057  }
2058 
2059  return sleeptime;
2060 }
int wal_sender_timeout
Definition: walsender.c:123
int64 TimestampTz
Definition: timestamp.h:39
int sleeptime
Definition: pg_standby.c:42
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: timestamp.c:1624
static bool waiting_for_ping_response
Definition: walsender.c:172
static TimestampTz last_reply_timestamp
Definition: walsender.c:169
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534

◆ WalSndDone()

static void WalSndDone ( WalSndSendDataCallback  send_data)
static

Definition at line 2825 of file walsender.c.

References DestRemote, EndCommand(), WalSnd::flush, pq_flush, pq_is_send_pending, proc_exit(), sentPtr, waiting_for_ping_response, WalSndCaughtUp, WalSndKeepalive(), WalSnd::write, and XLogRecPtrIsInvalid.

Referenced by WalSndLoop().

2826 {
2827  XLogRecPtr replicatedPtr;
2828 
2829  /* ... let's just be real sure we're caught up ... */
2830  send_data();
2831 
2832  /*
2833  * To figure out whether all WAL has successfully been replicated, check
2834  * flush location if valid, write otherwise. Tools like pg_receivewal will
2835  * usually (unless in synchronous mode) return an invalid flush location.
2836  */
2837  replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
2838  MyWalSnd->write : MyWalSnd->flush;
2839 
2840  if (WalSndCaughtUp && sentPtr == replicatedPtr &&
2841  !pq_is_send_pending())
2842  {
2843  /* Inform the standby that XLOG streaming is done */
2844  EndCommand("COPY 0", DestRemote);
2845  pq_flush();
2846 
2847  proc_exit(0);
2848  }
2849  if (!waiting_for_ping_response)
2850  {
2851  WalSndKeepalive(true);
2852  waiting_for_ping_response = true;
2853  }
2854 }
#define pq_is_send_pending()
Definition: libpq.h:41
XLogRecPtr write
#define pq_flush()
Definition: libpq.h:39
void proc_exit(int code)
Definition: ipc.c:104
XLogRecPtr flush
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3335
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:157
static bool WalSndCaughtUp
Definition: walsender.c:184
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
WalSnd * MyWalSnd
Definition: walsender.c:112
static XLogRecPtr sentPtr
Definition: walsender.c:158
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool waiting_for_ping_response
Definition: walsender.c:172

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 292 of file walsender.c.

References close, ConditionVariableCancelSleep(), DestNone, DestRemote, got_SIGUSR2, got_STOPPING, LWLockReleaseAll(), MyReplicationSlot, pgstat_report_wait_end(), proc_exit(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), sendFile, WalSndSetState(), WalSndShutdown(), WALSNDSTATE_STARTUP, and whereToSendOutput.

Referenced by PostgresMain().

293 {
294  LWLockReleaseAll();
295  ConditionVariableCancelSleep();
296  pgstat_report_wait_end();
297 
298  if (sendFile >= 0)
299  {
300  close(sendFile);
301  sendFile = -1;
302  }
303 
304  if (MyReplicationSlot != NULL)
305  ReplicationSlotRelease();
306 
307  ReplicationSlotCleanup();
308 
309  replication_active = false;
310 
311  if (got_STOPPING || got_SIGUSR2)
312  proc_exit(0);
313 
314  /* Revert back to startup state */
315  WalSndSetState(WALSNDSTATE_STARTUP);
316 }
static int sendFile
Definition: walsender.c:136
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:188
void proc_exit(int code)
Definition: ipc.c:104
void ConditionVariableCancelSleep(void)
void ReplicationSlotRelease(void)
Definition: slot.c:416
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1260
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:187
static volatile sig_atomic_t replication_active
Definition: walsender.c:196
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
void WalSndSetState(WalSndState state)
Definition: walsender.c:3113
void ReplicationSlotCleanup(void)
Definition: slot.c:471
void LWLockReleaseAll(void)
Definition: lwlock.c:1823
#define close(a)
Definition: win32.h:12

◆ WalSndGetStateString()

static const char* WalSndGetStateString ( WalSndState  state)
static

Definition at line 3132 of file walsender.c.

References WALSNDSTATE_BACKUP, WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WALSNDSTATE_STOPPING, and WALSNDSTATE_STREAMING.

Referenced by pg_stat_get_wal_senders().

3133 {
3134  switch (state)
3135  {
3136  case WALSNDSTATE_STARTUP:
3137  return "startup";
3138  case WALSNDSTATE_BACKUP:
3139  return "backup";
3140  case WALSNDSTATE_CATCHUP:
3141  return "catchup";
3142  case WALSNDSTATE_STREAMING:
3143  return "streaming";
3144  case WALSNDSTATE_STOPPING:
3145  return "stopping";
3146  }
3147  return "UNKNOWN";
3148 }
Definition: regguts.h:298

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 3049 of file walsender.c.

References i, InvalidBackendId, max_wal_senders, WalSnd::mutex, WalSnd::pid, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

3050 {
3051  int i;
3052 
3053  for (i = 0; i < max_wal_senders; i++)
3054  {
3055  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3056  pid_t pid;
3057 
3058  SpinLockAcquire(&walsnd->mutex);
3059  pid = walsnd->pid;
3060  SpinLockRelease(&walsnd->mutex);
3061 
3062  if (pid == 0)
3063  continue;
3064 
3065  SendProcSignal(pid, PROCSIG_WALSND_INIT_STOPPING, InvalidBackendId);
3066  }
3067 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:180
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
#define InvalidBackendId
Definition: backendid.h:23
int i

◆ WalSndKeepalive()

static void WalSndKeepalive ( bool  requestReply)
static

Definition at line 3335 of file walsender.c.

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), StringInfoData::len, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), resetStringInfo(), and sentPtr.

Referenced by ProcessStandbyReplyMessage(), WalSndDone(), WalSndKeepaliveIfNecessary(), and WalSndWaitForWal().

3336 {
3337  elog(DEBUG2, "sending replication keepalive");
3338 
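3339  /* construct the message... */
3340  resetStringInfo(&output_message);
3341  pq_sendbyte(&output_message, 'k');
3342  pq_sendint64(&output_message, sentPtr);
3343  pq_sendint64(&output_message, GetCurrentTimestamp());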
3344  pq_sendbyte(&output_message, requestReply ? 1 : 0);
3345 
3346  /* ... and send it wrapped in CopyData */
3347  pq_putmessage_noblock('d', output_message.data, output_message.len);
3348 }
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
static StringInfoData output_message
Definition: walsender.c:161
static void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.h:156
static void pq_sendbyte(StringInfo buf, int8 byt)
Definition: pqformat.h:164
#define DEBUG2
Definition: elog.h:24
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
static XLogRecPtr sentPtr
Definition: walsender.c:158
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
#define elog
Definition: elog.h:219

◆ WalSndKeepaliveIfNecessary()

static void WalSndKeepaliveIfNecessary ( TimestampTz  now)
static

Definition at line 3354 of file walsender.c.

References last_reply_timestamp, pq_flush_if_writable, TimestampTzPlusMilliseconds, waiting_for_ping_response, wal_sender_timeout, WalSndKeepalive(), and WalSndShutdown().

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

3355 {
3356  TimestampTz ping_time;
3357 
3358  /*
3359  * Don't send keepalive messages if timeouts are globally disabled or
3360  * we're doing something not partaking in timeouts.
3361  */
3362  if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3363  return;
3364 
3365  if (waiting_for_ping_response)
3366  return;
3367 
3368  /*
3369  * If half of wal_sender_timeout has lapsed without receiving any reply
3370  * from the standby, send a keep-alive message to the standby requesting
3371  * an immediate reply.
3372  */
3373  ping_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
3374  wal_sender_timeout / 2);
3375  if (now >= ping_time)
3376  {
3377  WalSndKeepalive(true);
3378  waiting_for_ping_response = true;
3379 
3380  /* Try to flush pending output to the client */
3381  if (pq_flush_if_writable() != 0)
3382  WalSndShutdown();
3383  }
3384 }
int wal_sender_timeout
Definition: walsender.c:123
int64 TimestampTz
Definition: timestamp.h:39
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3335
#define pq_flush_if_writable()
Definition: libpq.h:40
static void WalSndShutdown(void)
Definition: walsender.c:229
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
static bool waiting_for_ping_response
Definition: walsender.c:172
static TimestampTz last_reply_timestamp
Definition: walsender.c:169
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534

◆ WalSndKill()

static void WalSndKill ( int  code,
Datum  arg 
)
static

Definition at line 2289 of file walsender.c.

References Assert, WalSnd::latch, WalSnd::mutex, MyWalSnd, WalSnd::pid, SpinLockAcquire, and SpinLockRelease.

Referenced by InitWalSenderSlot().

2290 {
2291  WalSnd *walsnd = MyWalSnd;
2292 
2293  Assert(walsnd != NULL);
2294 
2295  MyWalSnd = NULL;
2296 
2297  SpinLockAcquire(&walsnd->mutex);
2298  /* clear latch while holding the spinlock, so it can safely be read */
2299  walsnd->latch = NULL;
2300  /* Mark WalSnd struct as no longer being in use. */
2301  walsnd->pid = 0;
2302  SpinLockRelease(&walsnd->mutex);
2303 }
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch * latch
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:112
#define Assert(condition)
Definition: c.h:688

◆ WalSndLastCycleHandler()

static void WalSndLastCycleHandler ( SIGNAL_ARGS  )
static

Definition at line 2940 of file walsender.c.

References got_SIGUSR2, MyLatch, and SetLatch().

Referenced by WalSndSignals().

2941 {
2942  int save_errno = errno;
2943 
2944  got_SIGUSR2 = true;
2945  SetLatch(MyLatch);
2946 
2947  errno = save_errno;
2948 }
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:187
void SetLatch(volatile Latch *latch)
Definition: latch.c:414
struct Latch * MyLatch
Definition: globals.c:52

◆ WalSndLoop()

static void WalSndLoop ( WalSndSendDataCallback  send_data)
static

Definition at line 2094 of file walsender.c.

References application_name, CHECK_FOR_INTERRUPTS, ConfigReloadPending, DEBUG1, ereport, errmsg(), GetCurrentTimestamp(), got_SIGUSR2, last_reply_timestamp, MyLatch, MyProcPort, now(), PGC_SIGHUP, PostmasterIsAlive(), pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), sleeptime, Port::sock, WalSnd::state, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_MAIN, waiting_for_ping_response, WaitLatchOrSocket(), WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndDone(), WalSndKeepaliveIfNecessary(), WalSndSetState(), WalSndShutdown(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STREAMING, WL_LATCH_SET, WL_POSTMASTER_DEATH, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, and WL_TIMEOUT.

Referenced by StartLogicalReplication(), and StartReplication().

2095 {
2096  /*
2097  * Initialize the last reply timestamp. That enables timeout processing
2098  * from hereon.
2099  */
2100  last_reply_timestamp = GetCurrentTimestamp();
2101  waiting_for_ping_response = false;
2102 
2103  /*
2104  * Loop until we reach the end of this timeline or the client requests to
2105  * stop streaming.
2106  */
2107  for (;;)
2108  {
2109  TimestampTz now;
2110 
2111  /*
2112  * Emergency bailout if postmaster has died. This is to avoid the
2113  * necessity for manual cleanup of all postmaster children.
2114  */
2115  if (!PostmasterIsAlive())
2116  exit(1);
2117 
2118  /* Clear any already-pending wakeups */
2119  ResetLatch(MyLatch);
2120 
2121  CHECK_FOR_INTERRUPTS();
2122 
2123  /* Process any requests or signals received recently */
2124  if (ConfigReloadPending)
2125  {
2126  ConfigReloadPending = false;
2127  ProcessConfigFile(PGC_SIGHUP);
2128  SyncRepInitConfig();
2129  }
2130 
2131  /* Check for input from the client */
2132  ProcessRepliesIfAny();
2133 
2134  /*
2135  * If we have received CopyDone from the client, sent CopyDone
2136  * ourselves, and the output buffer is empty, it's time to exit
2137  * streaming.
2138  */
2139  if (streamingDoneReceiving && streamingDoneSending &&
2140  !pq_is_send_pending())
2141  break;
2142 
2143  /*
2144  * If we don't have any pending data in the output buffer, try to send
2145  * some more. If there is some, we don't bother to call send_data
2146  * again until we've flushed it ... but we'd better assume we are not
2147  * caught up.
2148  */
2149  if (!pq_is_send_pending())
2150  send_data();
2151  else
2152  WalSndCaughtUp = false;
2153 
2154  /* Try to flush pending output to the client */
2155  if (pq_flush_if_writable() != 0)
2156  WalSndShutdown();
2157 
2158  /* If nothing remains to be sent right now ... */
2159  if (WalSndCaughtUp && !pq_is_send_pending())
2160  {
2161  /*
2162  * If we're in catchup state, move to streaming. This is an
2163  * important state change for users to know about, since before
2164  * this point data loss might occur if the primary dies and we
2165  * need to failover to the standby. The state change is also
2166  * important for synchronous replication, since commits that
2167  * started to wait at that point might wait for some time.
2168  */
2169  if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
2170  {
2171  ereport(DEBUG1,
2172  (errmsg("standby \"%s\" has now caught up with primary",
2173  application_name)));
2174  WalSndSetState(WALSNDSTATE_STREAMING);
2175  }
2176 
2177  /*
2178  * When SIGUSR2 arrives, we send any outstanding logs up to the
2179  * shutdown checkpoint record (i.e., the latest record), wait for
2180  * them to be replicated to the standby, and exit. This may be a
2181  * normal termination at shutdown, or a promotion, the walsender
2182  * is not sure which.
2183  */
2184  if (got_SIGUSR2)
2185  WalSndDone(send_data);
2186  }
2187 
2188  now = GetCurrentTimestamp();
2189 
2190  /* Check for replication timeout. */
2191  WalSndCheckTimeOut(now);
2192 
2193  /* Send keepalive if the time has come */
2194  WalSndKeepaliveIfNecessary(now);
2195 
2196  /*
2197  * We don't block if not caught up, unless there is unsent data
2198  * pending in which case we'd better block until the socket is
2199  * write-ready. This test is only needed for the case where the
2200  * send_data callback handled a subset of the available data but then
2201  * pq_flush_if_writable flushed it all --- we should immediately try
2202  * to send more.
2203  */
2204  if ((WalSndCaughtUp && !streamingDoneSending) || pq_is_send_pending())
2205  {
2206  long sleeptime;
2207  int wakeEvents;
2208 
2209  wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT |
2210  WL_SOCKET_READABLE;
2211 
2212  sleeptime = WalSndComputeSleeptime(now);
2213 
2214  if (pq_is_send_pending())
2215  wakeEvents |= WL_SOCKET_WRITEABLE;
2216 
2217  /* Sleep until something happens or we time out */
2218  WaitLatchOrSocket(MyLatch, wakeEvents,
2219  MyProcPort->sock, sleeptime,
2220  WAIT_EVENT_WAL_SENDER_MAIN);
2221  }
2222  }
2223  return;
2224 }
#define pq_is_send_pending()
Definition: libpq.h:41
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
#define DEBUG1
Definition: elog.h:25
struct Port * MyProcPort
Definition: globals.c:41
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
int64 TimestampTz
Definition: timestamp.h:39
static void WalSndDone(WalSndSendDataCallback send_data)
Definition: walsender.c:2825
#define WL_SOCKET_READABLE
Definition: latch.h:125
int sleeptime
Definition: pg_standby.c:42
void ResetLatch(volatile Latch *latch)
Definition: latch.c:497
pgsocket sock
Definition: libpq-be.h:118
WalSndState state
static bool streamingDoneSending
Definition: walsender.c:180
bool PostmasterIsAlive(void)
Definition: pmsignal.c:272
#define pq_flush_if_writable()
Definition: libpq.h:40
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:187
void SyncRepInitConfig(void)
Definition: syncrep.c:382
#define ereport(elevel, rest)
Definition: elog.h:122
static bool WalSndCaughtUp
Definition: walsender.c:184
Definition: guc.h:72
volatile sig_atomic_t ConfigReloadPending
Definition: globals.c:34
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
static void WalSndShutdown(void)
Definition: walsender.c:229
WalSnd * MyWalSnd
Definition: walsender.c:112
static void WalSndKeepaliveIfNecessary(TimestampTz now)
Definition: walsender.c:3354
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2025
static void WalSndCheckTimeOut(TimestampTz now)
Definition: walsender.c:2067
void WalSndSetState(WalSndState state)
Definition: walsender.c:3113
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:356
static bool streamingDoneReceiving
Definition: walsender.c:181
char * application_name
Definition: guc.c:470
int errmsg(const char *fmt,...)
Definition: elog.c:797
struct Latch * MyLatch
Definition: globals.c:52
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:98
static bool waiting_for_ping_response
Definition: walsender.c:172
#define WL_LATCH_SET
Definition: latch.h:124
static TimestampTz last_reply_timestamp
Definition: walsender.c:169
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1592
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534

◆ WalSndPrepareWrite()

static void WalSndPrepareWrite ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1126 of file walsender.c.

References InvalidXLogRecPtr, LogicalDecodingContext::out, pq_sendbyte(), pq_sendint64(), and resetStringInfo().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

1127 {
1128  /* can't have sync rep confused by sending the same LSN several times */
1129  if (!last_write)
1130  lsn = InvalidXLogRecPtr;
1131 
1132  resetStringInfo(ctx->out);
1133 
1134  pq_sendbyte(ctx->out, 'w');
1135  pq_sendint64(ctx->out, lsn); /* dataStart */
1136  pq_sendint64(ctx->out, lsn); /* walEnd */
1137 
1138  /*
1139  * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1140  * reserve space here.
1141  */
1142  pq_sendint64(ctx->out, 0); /* sendtime */
1143 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.h:156
static void pq_sendbyte(StringInfo buf, int8 byt)
Definition: pqformat.h:164
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
StringInfo out
Definition: logical.h:73

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 2895 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

2896 {
2897  int i;
2898 
2899  for (i = 0; i < max_wal_senders; i++)
2900  {
2901  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2902 
2903  SpinLockAcquire(&walsnd->mutex);
2904  if (walsnd->pid == 0)
2905  {
2906  SpinLockRelease(&walsnd->mutex);
2907  continue;
2908  }
2909  walsnd->needreload = true;
2910  SpinLockRelease(&walsnd->mutex);
2911  }
2912 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
bool needreload
int i

◆ WalSndSetState()

void WalSndSetState ( WalSndState  state)

Definition at line 3113 of file walsender.c.

References am_walsender, Assert, WalSnd::mutex, MyWalSnd, SpinLockAcquire, SpinLockRelease, and WalSnd::state.

Referenced by exec_replication_command(), SendBaseBackup(), StartLogicalReplication(), StartReplication(), WalSndErrorCleanup(), WalSndLoop(), and XLogSendPhysical().

3114 {
3115  WalSnd *walsnd = MyWalSnd;
3116 
3117  Assert(am_walsender);
3118 
3119  if (walsnd->state == state)
3120  return;
3121 
3122  SpinLockAcquire(&walsnd->mutex);
3123  walsnd->state = state;
3124  SpinLockRelease(&walsnd->mutex);
3125 }
slock_t mutex
bool am_walsender
Definition: walsender.c:115
#define SpinLockAcquire(lock)
Definition: spin.h:62
WalSndState state
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:112
#define Assert(condition)
Definition: c.h:688
Definition: regguts.h:298

◆ WalSndShmemInit()

void WalSndShmemInit ( void  )

Definition at line 2988 of file walsender.c.

References i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SHMQueueInit(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

2989 {
2990  bool found;
2991  int i;
2992 
2993  WalSndCtl = (WalSndCtlData *)
2994  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
2995 
2996  if (!found)
2997  {
2998  /* First time through, so initialize */
2999  MemSet(WalSndCtl, 0, WalSndShmemSize());
3000 
3001  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3002  SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
3003 
3004  for (i = 0; i < max_wal_senders; i++)
3005  {
3006  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3007 
3008  SpinLockInit(&walsnd->mutex);
3009  }
3010  }
3011 }
Size WalSndShmemSize(void)
Definition: walsender.c:2976
#define SpinLockInit(lock)
Definition: spin.h:60
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
#define MemSet(start, val, len)
Definition: c.h:897
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
int max_wal_senders
Definition: walsender.c:121
void SHMQueueInit(SHM_QUEUE *queue)
Definition: shmqueue.c:36
int i
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28

◆ WalSndShmemSize()

Size WalSndShmemSize ( void  )

Definition at line 2976 of file walsender.c.

References add_size(), max_wal_senders, mul_size(), and offsetof.

Referenced by CreateSharedMemoryAndSemaphores(), and WalSndShmemInit().

2977 {
2978  Size size = 0;
2979 
2980  size = offsetof(WalSndCtlData, walsnds);
2981  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
2982 
2983  return size;
2984 }
int max_wal_senders
Definition: walsender.c:121
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
size_t Size
Definition: c.h:422
#define offsetof(type, field)
Definition: c.h:611

◆ WalSndShutdown()

static void WalSndShutdown ( void  )
static

Definition at line 229 of file walsender.c.

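Referenced by WalSndCheckTimeOut(), WalSndErrorCleanup(), WalSndKeepaliveIfNecessary(), WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

Note: the source excerpt below (lines 261-282) is not the body of WalSndShutdown(), which is defined at line 229. It appears to be the walsender initialization routine (InitWalSender() in the upstream source), and the documentation generator seems to have attached it to this entry by mistake.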

261 {
262  am_cascading_walsender = RecoveryInProgress();
263 
264  /* Create a per-walsender data structure in shared memory */
265  InitWalSenderSlot();
266 
267  /* Set up resource owner */
268  CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner");
269 
270  /*
271  * Let postmaster know that we're a WAL sender. Once we've declared us as
272  * a WAL sender process, postmaster will let us outlive the bgwriter and
273  * kill us last in the shutdown sequence, so we get a chance to stream all
274  * remaining WAL at shutdown, including the shutdown checkpoint. Note that
275  * there's no going back, and we mustn't write any WAL records after this.
276  */
277  MarkPostmasterChildWalSender();
278  SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
279 
280  /* Initialize empty timestamp buffer for lag tracking. */
281  memset(&LagTracker, 0, sizeof(LagTracker));
282 }
static void InitWalSenderSlot(void)
Definition: walsender.c:2228
ResourceOwner CurrentResourceOwner
Definition: resowner.c:138
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:239
bool RecoveryInProgress(void)
Definition: xlog.c:7919
static struct @26 LagTracker
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:113
bool am_cascading_walsender
Definition: walsender.c:116
ResourceOwner ResourceOwnerCreate(ResourceOwner parent, const char *name)
Definition: resowner.c:416

◆ WalSndSignals()

void WalSndSignals ( void  )

Definition at line 2952 of file walsender.c.

References die(), InitializeTimeouts(), PostgresSigHupHandler(), pqsignal(), procsignal_sigusr1_handler(), quickdie(), SIG_DFL, SIG_IGN, SIGCHLD, SIGCONT, SIGHUP, SIGPIPE, SIGQUIT, SIGTTIN, SIGTTOU, SIGUSR1, SIGUSR2, SIGWINCH, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

2953 {
2954  /* Set up signal handlers */
2955  pqsignal(SIGHUP, PostgresSigHupHandler); /* set flag to read config
2956  * file */
2957  pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
2958  pqsignal(SIGTERM, die); /* request shutdown */
2959  pqsignal(SIGQUIT, quickdie); /* hard crash time */
2960  InitializeTimeouts(); /* establishes SIGALRM handler */
2961  pqsignal(SIGPIPE, SIG_IGN);
2962  pqsignal(SIGUSR1, procsignal_sigusr1_handler);
2963  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
2964  * shutdown */
2965 
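2966  /* Reset some signals that are accepted by postmaster but not here */
2967  pqsignal(SIGCHLD, SIG_DFL);
2968  pqsignal(SIGTTIN, SIG_DFL);
2969  pqsignal(SIGTTOU, SIG_DFL);
2970  pqsignal(SIGCONT, SIG_DFL);
2971  pqsignal(SIGWINCH, SIG_DFL);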
2972 }
void InitializeTimeouts(void)
Definition: timeout.c:340
#define SIGQUIT
Definition: win32_port.h:164
#define SIGTTOU
Definition: win32_port.h:175
#define SIGTTIN
Definition: win32_port.h:174
#define SIGUSR1
Definition: win32_port.h:177
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:2940
#define SIGCHLD
Definition: win32_port.h:173
#define SIGWINCH
Definition: win32_port.h:176
#define SIGCONT
Definition: win32_port.h:172
#define SIGPIPE
Definition: win32_port.h:168
#define SIGUSR2
Definition: win32_port.h:178
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:2687
#define SIGHUP
Definition: win32_port.h:163
void PostgresSigHupHandler(SIGNAL_ARGS)
Definition: postgres.c:2727
#define SIG_IGN
Definition: win32_port.h:160
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:168
#define SIG_DFL
Definition: win32_port.h:158
void die(SIGNAL_ARGS)
Definition: postgres.c:2656
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:261
void quickdie(SIGNAL_ARGS)
Definition: postgres.c:2595

◆ WalSndUpdateProgress()

static void WalSndUpdateProgress ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid 
)
static

Definition at line 1251 of file walsender.c.

References GetCurrentTimestamp(), LagTrackerWrite(), now(), TimestampDifferenceExceeds(), and WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS.

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

1252 {
1253  static TimestampTz sendTime = 0;
1254  TimestampTz now = GetCurrentTimestamp();
1255 
1256  /*
1257  * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1258  * avoid flooding the lag tracker when we commit frequently.
1259  */
1260 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1261  if (!TimestampDifferenceExceeds(sendTime, now,
1262  WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
1263  return;
1264 
1265  LagTrackerWrite(lsn, now);
1266  sendTime = now;
1267 }
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
int64 TimestampTz
Definition: timestamp.h:39
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1649
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition: walsender.c:3393
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534

◆ WalSndWaitForWal()

static XLogRecPtr WalSndWaitForWal ( XLogRecPtr  loc)
static

Definition at line 1277 of file walsender.c.

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, WalSnd::flush, GetCurrentTimestamp(), GetFlushRecPtr(), GetXLogReplayRecPtr(), got_STOPPING, InvalidXLogRecPtr, MyLatch, MyProcPort, now(), PGC_SIGHUP, PostmasterIsAlive(), pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), RecoveryInProgress(), ResetLatch(), sentPtr, SetLatch(), sleeptime, Port::sock, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_WAIT_WAL, waiting_for_ping_response, WaitLatchOrSocket(), WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepalive(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WL_LATCH_SET, WL_POSTMASTER_DEATH, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, WL_TIMEOUT, WalSnd::write, and XLogBackgroundFlush().

Referenced by logical_read_xlog_page().

1278 {
1279  int wakeEvents;
1280  static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1281 
1282 
1283  /*
1284  * Fast path to avoid acquiring the spinlock in case we already know we
1285  * have enough WAL available. This is particularly interesting if we're
1286  * far behind.
1287  */
1288  if (RecentFlushPtr != InvalidXLogRecPtr &&
1289  loc <= RecentFlushPtr)
1290  return RecentFlushPtr;
1291 
1292  /* Get a more recent flush pointer. */
1293  if (!RecoveryInProgress())
1294  RecentFlushPtr = GetFlushRecPtr();
1295  else
1296  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1297 
1298  for (;;)
1299  {
1300  long sleeptime;
1301  TimestampTz now;
1302 
1303  /*
1304  * Emergency bailout if postmaster has died. This is to avoid the
1305  * necessity for manual cleanup of all postmaster children.
1306  */
1307  if (!PostmasterIsAlive())
1308  exit(1);
1309 
1310  /* Clear any already-pending wakeups */
1312 
1314 
1315  /* Process any requests or signals received recently */
1316  if (ConfigReloadPending)
1317  {
1318  ConfigReloadPending = false;
1321  }
1322 
1323  /* Check for input from the client */
1325 
1326  /*
1327  * If we're shutting down, trigger pending WAL to be written out,
1328  * otherwise we'd possibly end up waiting for WAL that never gets
1329  * written, because walwriter has shut down already.
1330  */
1331  if (got_STOPPING)
1333 
1334  /* Update our idea of the currently flushed position. */
1335  if (!RecoveryInProgress())
1336  RecentFlushPtr = GetFlushRecPtr();
1337  else
1338  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1339 
1340  /*
1341  * If postmaster asked us to stop, don't wait anymore.
1342  *
1343  * It's important to do this check after the recomputation of
1344  * RecentFlushPtr, so we can send all remaining data before shutting
1345  * down.
1346  */
1347  if (got_STOPPING)
1348  break;
1349 
1350  /*
1351  * We only send regular messages to the client for full decoded
1352  * transactions, but a synchronous replication and walsender shutdown
1353  * possibly are waiting for a later location. So we send pings
1354  * containing the flush location every now and then.
1355  */
1356  if (MyWalSnd->flush < sentPtr &&
1357  MyWalSnd->write < sentPtr &&
1359  {
1360  WalSndKeepalive(false);
1362  }
1363 
1364  /* check whether we're done */
1365  if (loc <= RecentFlushPtr)
1366  break;
1367 
1368  /* Waiting for new WAL. Since we need to wait, we're now caught up. */
1369  WalSndCaughtUp = true;
1370 
1371  /*
1372  * Try to flush any pending output to the client.
1373  */
1374  if (pq_flush_if_writable() != 0)
1375  WalSndShutdown();
1376 
1377  /*
1378  * If we have received CopyDone from the client, sent CopyDone
1379  * ourselves, and the output buffer is empty, it's time to exit
1380  * streaming, so fail the current WAL fetch request.
1381  */
1383  !pq_is_send_pending())
1384  break;
1385 
1386  now = GetCurrentTimestamp();
1387 
1388  /* die if timeout was reached */
1389  WalSndCheckTimeOut(now);
1390 
1391  /* Send keepalive if the time has come */
1393 
1394  /*
1395  * Sleep until something happens or we time out. Also wait for the
1396  * socket becoming writable, if there's still pending output.
1397  * Otherwise we might sit on sendable output data while waiting for
1398  * new WAL to be generated. (But if we have nothing to send, we don't
1399  * want to wake on socket-writable.)
1400  */
1401  sleeptime = WalSndComputeSleeptime(now);
1402 
1403  wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
1405 
1406  if (pq_is_send_pending())
1407  wakeEvents |= WL_SOCKET_WRITEABLE;
1408 
1409  WaitLatchOrSocket(MyLatch, wakeEvents,
1410  MyProcPort->sock, sleeptime,
1412  }
1413 
1414  /* reactivate latch so WalSndLoop knows to continue */
1415  SetLatch(MyLatch);
1416  return RecentFlushPtr;
1417 }
#define pq_is_send_pending()
Definition: libpq.h:41
XLogRecPtr write
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
struct Port * MyProcPort
Definition: globals.c:41
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
int64 TimestampTz
Definition: timestamp.h:39
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:188
#define WL_SOCKET_READABLE
Definition: latch.h:125
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8251
int sleeptime
Definition: pg_standby.c:42
void ResetLatch(volatile Latch *latch)
Definition: latch.c:497
bool RecoveryInProgress(void)
Definition: xlog.c:7919
pgsocket sock
Definition: libpq-be.h:118
XLogRecPtr flush
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3335
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11155
bool XLogBackgroundFlush(void)
Definition: xlog.c:2952
static bool streamingDoneSending
Definition: walsender.c:180
bool PostmasterIsAlive(void)
Definition: pmsignal.c:272
#define pq_flush_if_writable()
Definition: libpq.h:40
void SyncRepInitConfig(void)
Definition: syncrep.c:382
static bool WalSndCaughtUp
Definition: walsender.c:184
@ PGC_SIGHUP
Definition: guc.h:72
volatile sig_atomic_t ConfigReloadPending
Definition: globals.c:34
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
static void WalSndShutdown(void)
Definition: walsender.c:229
WalSnd * MyWalSnd
Definition: walsender.c:112
static XLogRecPtr sentPtr
Definition: walsender.c:158
static void WalSndKeepaliveIfNecessary(TimestampTz now)
Definition: walsender.c:3354
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2025
void SetLatch(volatile Latch *latch)
Definition: latch.c:414
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static void WalSndCheckTimeOut(TimestampTz now)
Definition: walsender.c:2067
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:356
static bool streamingDoneReceiving
Definition: walsender.c:181
struct Latch * MyLatch
Definition: globals.c:52
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:98
static bool waiting_for_ping_response
Definition: walsender.c:172
#define WL_LATCH_SET
Definition: latch.h:124
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1592
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 3075 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, pg_usleep(), WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

3076 {
3077  for (;;)
3078  {
3079  int i;
3080  bool all_stopped = true;
3081 
3082  for (i = 0; i < max_wal_senders; i++)
3083  {
3084  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3085 
3086  SpinLockAcquire(&walsnd->mutex);
3087 
3088  if (walsnd->pid == 0)
3089  {
3090  SpinLockRelease(&walsnd->mutex);
3091  continue;
3092  }
3093 
3094  if (walsnd->state != WALSNDSTATE_STOPPING)
3095  {
3096  all_stopped = false;
3097  SpinLockRelease(&walsnd->mutex);
3098  break;
3099  }
3100  SpinLockRelease(&walsnd->mutex);
3101  }
3102 
3103  /* safe to leave if confirmation is done for all WAL senders */
3104  if (all_stopped)
3105  return;
3106 
3107  pg_usleep(10000L); /* wait for 10 msec */
3108  }
3109 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pg_usleep(long microsec)
Definition: signal.c:53
WalSndState state
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
int i

◆ WalSndWakeup()

void WalSndWakeup ( void  )

Definition at line 3020 of file walsender.c.

References i, WalSnd::latch, max_wal_senders, WalSnd::mutex, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive(), StartupXLOG(), and XLogWalRcvFlush().

3021 {
3022  int i;
3023 
3024  for (i = 0; i < max_wal_senders; i++)
3025  {
3026  Latch *latch;
3027  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3028 
3029  /*
3030  * Get latch pointer with spinlock held, for the unlikely case that
3031  * pointer reads aren't atomic (as they're 8 bytes).
3032  */
3033  SpinLockAcquire(&walsnd->mutex);
3034  latch = walsnd->latch;
3035  SpinLockRelease(&walsnd->mutex);
3036 
3037  if (latch != NULL)
3038  SetLatch(latch);
3039  }
3040 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch * latch
Definition: latch.h:110
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
void SetLatch(volatile Latch *latch)
Definition: latch.c:414
int i

◆ WalSndWriteData()

static void WalSndWriteData ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1153 of file walsender.c.

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, StringInfoData::data, GetCurrentTimestamp(), last_reply_timestamp, StringInfoData::len, MyLatch, MyProcPort, now(), LogicalDecodingContext::out, PGC_SIGHUP, PostmasterIsAlive(), pq_flush_if_writable, pq_is_send_pending, pq_putmessage_noblock, pq_sendint64(), ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), resetStringInfo(), SetLatch(), sleeptime, Port::sock, SyncRepInitConfig(), TimestampTzPlusMilliseconds, WAIT_EVENT_WAL_SENDER_WRITE_DATA, WaitLatchOrSocket(), wal_sender_timeout, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WL_LATCH_SET, WL_POSTMASTER_DEATH, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, and WL_TIMEOUT.

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

1155 {
1156  TimestampTz now;
1157 
1158  /* output previously gathered data in a CopyData packet */
1159  pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1160 
1161  /*
1162  * Fill the send timestamp last, so that it is taken as late as possible.
1163  * This is somewhat ugly, but the protocol is set as it's already used for
1164  * several releases by streaming physical replication.
1165  */
1167  now = GetCurrentTimestamp();
1168  pq_sendint64(&tmpbuf, now);
1169  memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1170  tmpbuf.data, sizeof(int64));
1171 
1173 
1174  /* Try to flush pending output to the client */
1175  if (pq_flush_if_writable() != 0)
1176  WalSndShutdown();
1177 
1178  /* Try taking fast path unless we get too close to walsender timeout. */
1180  wal_sender_timeout / 2) &&
1181  !pq_is_send_pending())
1182  {
1183  return;
1184  }
1185 
1186  /* If we have pending write here, go to slow path */
1187  for (;;)
1188  {
1189  int wakeEvents;
1190  long sleeptime;
1191 
1192  /* Check for input from the client */
1194 
1195  now = GetCurrentTimestamp();
1196 
1197  /* die if timeout was reached */
1198  WalSndCheckTimeOut(now);
1199 
1200  /* Send keepalive if the time has come */
1202 
1203  if (!pq_is_send_pending())
1204  break;
1205 
1206  sleeptime = WalSndComputeSleeptime(now);
1207 
1208  wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
1210 
1211  /* Sleep until something happens or we time out */
1212  WaitLatchOrSocket(MyLatch, wakeEvents,
1213  MyProcPort->sock, sleeptime,
1215 
1216  /*
1217  * Emergency bailout if postmaster has died. This is to avoid the
1218  * necessity for manual cleanup of all postmaster children.
1219  */
1220  if (!PostmasterIsAlive())
1221  exit(1);
1222 
1223  /* Clear any already-pending wakeups */
1225 
1227 
1228  /* Process any requests or signals received recently */
1229  if (ConfigReloadPending)
1230  {
1231  ConfigReloadPending = false;
1234  }
1235 
1236  /* Try to flush pending output to the client */
1237  if (pq_flush_if_writable() != 0)
1238  WalSndShutdown();
1239  }
1240 
1241  /* reactivate latch so WalSndLoop knows to continue */
1242  SetLatch(MyLatch);
1243 }
#define pq_is_send_pending()
Definition: libpq.h:41
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
int wal_sender_timeout
Definition: walsender.c:123
struct Port * MyProcPort
Definition: globals.c:41
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
int64 TimestampTz
Definition: timestamp.h:39
static void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.h:156
#define WL_SOCKET_READABLE
Definition: latch.h:125
int sleeptime
Definition: pg_standby.c:42
void ResetLatch(volatile Latch *latch)
Definition: latch.c:497
pgsocket sock
Definition: libpq-be.h:118
bool PostmasterIsAlive(void)
Definition: pmsignal.c:272
#define pq_flush_if_writable()
Definition: libpq.h:40
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
void SyncRepInitConfig(void)
Definition: syncrep.c:382
@ PGC_SIGHUP
Definition: guc.h:72
volatile sig_atomic_t ConfigReloadPending
Definition: globals.c:34
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
static void WalSndShutdown(void)
Definition: walsender.c:229
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
static void WalSndKeepaliveIfNecessary(TimestampTz now)
Definition: walsender.c:3354
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2025
void SetLatch(volatile Latch *latch)
Definition: latch.c:414
static void WalSndCheckTimeOut(TimestampTz now)
Definition: walsender.c:2067
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:356
static StringInfoData tmpbuf
Definition: walsender.c:163
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
StringInfo out
Definition: logical.h:73
struct Latch * MyLatch
Definition: globals.c:52
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:98
#define WL_LATCH_SET
Definition: latch.h:124
static TimestampTz last_reply_timestamp
Definition: walsender.c:169
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1592
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534

◆ XLogRead()

static void XLogRead ( char *  buf,
XLogRecPtr  startptr,
Size  count 
)
static

Definition at line 2317 of file walsender.c.

References am_cascading_walsender, BasicOpenFile(), buf, CheckXLogRemoved(), close, curFileTimeLine, ereport, errcode_for_file_access(), errmsg(), ERROR, MAXPGPATH, WalSnd::mutex, MyWalSnd, WalSnd::needreload, PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), read, sendFile, sendOff, sendSegNo, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, SpinLockAcquire, SpinLockRelease, ThisTimeLineID, WAIT_EVENT_WAL_READ, wal_segment_size, XLByteInSeg, XLByteToSeg, XLogFileNameP(), XLogFilePath, and XLogSegmentOffset.

Referenced by logical_read_xlog_page(), and XLogSendPhysical().

2318 {
2319  char *p;
2320  XLogRecPtr recptr;
2321  Size nbytes;
2322  XLogSegNo segno;
2323 
2324 retry:
2325  p = buf;
2326  recptr = startptr;
2327  nbytes = count;
2328 
2329  while (nbytes > 0)
2330  {
2331  uint32 startoff;
2332  int segbytes;
2333  int readbytes;
2334 
2335  startoff = XLogSegmentOffset(recptr, wal_segment_size);
2336 
2337  if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, wal_segment_size))
2338  {
2339  char path[MAXPGPATH];
2340 
2341  /* Switch to another logfile segment */
2342  if (sendFile >= 0)
2343  close(sendFile);
2344 
2346 
2347  /*-------
2348  * When reading from a historic timeline, and there is a timeline
2349  * switch within this segment, read from the WAL segment belonging
2350  * to the new timeline.
2351  *
2352  * For example, imagine that this server is currently on timeline
2353  * 5, and we're streaming timeline 4. The switch from timeline 4
2354  * to 5 happened at 0/13002088. In pg_wal, we have these files:
2355  *
2356  * ...
2357  * 000000040000000000000012
2358  * 000000040000000000000013
2359  * 000000050000000000000013
2360  * 000000050000000000000014
2361  * ...
2362  *
2363  * In this situation, when requested to send the WAL from
2364  * segment 0x13, on timeline 4, we read the WAL from file
2365  * 000000050000000000000013. Archive recovery prefers files from
2366  * newer timelines, so if the segment was restored from the
2367  * archive on this server, the file belonging to the old timeline,
2368  * 000000040000000000000013, might not exist. Their contents are
2369  * equal up to the switchpoint, because at a timeline switch, the
2370  * used portion of the old segment is copied to the new file.
2371  *-------
2372  */
2375  {
2376  XLogSegNo endSegNo;
2377 
2379  if (sendSegNo == endSegNo)
2381  }
2382 
2384 
2385  sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY);
2386  if (sendFile < 0)
2387  {
2388  /*
2389  * If the file is not found, assume it's because the standby
2390  * asked for a too old WAL segment that has already been
2391  * removed or recycled.
2392  */
2393  if (errno == ENOENT)
2394  ereport(ERROR,
2396  errmsg("requested WAL segment %s has already been removed",
2398  else
2399  ereport(ERROR,
2401  errmsg("could not open file \"%s\": %m",
2402  path)));
2403  }
2404  sendOff = 0;
2405  }
2406 
2407  /* Need to seek in the file? */
2408  if (sendOff != startoff)
2409  {
2410  if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
2411  ereport(ERROR,
2413  errmsg("could not seek in log segment %s to offset %u: %m",
2415  startoff)));
2416  sendOff = startoff;
2417  }
2418 
2419  /* How many bytes are within this segment? */
2420  if (nbytes > (wal_segment_size - startoff))
2421  segbytes = wal_segment_size - startoff;
2422  else
2423  segbytes = nbytes;
2424 
2426  readbytes = read(sendFile, p, segbytes);
2428  if (readbytes <= 0)
2429  {
2430  ereport(ERROR,
2432  errmsg("could not read from log segment %s, offset %u, length %lu: %m",
2434  sendOff, (unsigned long) segbytes)));
2435  }
2436 
2437  /* Update state for read */
2438  recptr += readbytes;
2439 
2440  sendOff += readbytes;
2441  nbytes -= readbytes;
2442  p += readbytes;
2443  }
2444 
2445  /*
2446  * After reading into the buffer, check that what we read was valid. We do
2447  * this after reading, because even though the segment was present when we
2448  * opened it, it might get recycled or removed while we read it. The
2449  * read() succeeds in that case, but the data we tried to read might
2450  * already have been overwritten with new WAL records.
2451  */
2452  XLByteToSeg(startptr, segno, wal_segment_size);
2454 
2455  /*
2456  * During recovery, the currently-open WAL file might be replaced with the
2457  * file of the same name retrieved from archive. So we always need to
2458  * check what we read was valid after reading into the buffer. If it's
2459  * invalid, we try to open and read the file again.
2460  */
2462  {
2463  WalSnd *walsnd = MyWalSnd;
2464  bool reload;
2465 
2466  SpinLockAcquire(&walsnd->mutex);
2467  reload = walsnd->needreload;
2468  walsnd->needreload = false;
2469  SpinLockRelease(&walsnd->mutex);
2470 
2471  if (reload && sendFile >= 0)
2472  {
2473  close(sendFile);
2474  sendFile = -1;
2475 
2476  goto retry;
2477  }
2478  }
2479 }
int wal_segment_size
Definition: xlog.c:113
static int sendFile
Definition: walsender.c:136
#define PG_BINARY
Definition: c.h:1069
slock_t mutex
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define ERROR
Definition: elog.h:43
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:3774
#define MAXPGPATH
static TimeLineID curFileTimeLine
Definition: walsender.c:141
char * XLogFileNameP(TimeLineID tli, XLogSegNo segno)
Definition: xlog.c:10158
static char * buf
Definition: pg_test_fsync.c:67
uint64 XLogSegNo
Definition: xlogdefs.h:34
int errcode_for_file_access(void)
Definition: elog.c:598
unsigned int uint32
Definition: c.h:314
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1260
#define ereport(elevel, rest)
Definition: elog.h:122
#define SpinLockRelease(lock)
Definition: spin.h:64
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
WalSnd * MyWalSnd
Definition: walsender.c:112
TimeLineID ThisTimeLineID
Definition: xlog.c:181
static TimeLineID sendTimeLine
Definition: walsender.c:149
uint64 XLogRecPtr
Definition: xlogdefs.h:21
bool needreload
size_t Size
Definition: c.h:422
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1236
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:150
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:152
static XLogSegNo sendSegNo
Definition: walsender.c:137
int BasicOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:939
#define XLogFilePath(path, tli, logSegNo, wal_segsz_bytes)
int errmsg(const char *fmt,...)
Definition: elog.c:797
static uint32 sendOff
Definition: walsender.c:138
#define close(a)
Definition: win32.h:12
#define read(a, b, c)
Definition: win32.h:13
static bool sendTimeLineIsHistoric
Definition: walsender.c:151
bool am_cascading_walsender
Definition: walsender.c:116
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

◆ XLogSendLogical()

static void XLogSendLogical ( void  )
static

Definition at line 2755 of file walsender.c.

References elog, XLogReaderState::EndRecPtr, ERROR, GetFlushRecPtr(), got_SIGUSR2, got_STOPPING, InvalidXLogRecPtr, logical_startptr, LogicalDecodingProcessRecord(), WalSnd::mutex, MyWalSnd, LogicalDecodingContext::reader, WalSnd::sentPtr, sentPtr, SpinLockAcquire, SpinLockRelease, WalSndCaughtUp, and XLogReadRecord().

Referenced by StartLogicalReplication().

2756 {
2757  XLogRecord *record;
2758  char *errm;
2759 
2760  /*
2761  * Don't know whether we've caught up yet. We'll set it to true in
2762  * WalSndWaitForWal, if we're actually waiting. We also set to true if
2763  * XLogReadRecord() had to stop reading but WalSndWaitForWal didn't wait -
2764  * i.e. when we're shutting down.
2765  */
2766  WalSndCaughtUp = false;
2767 
2770 
2771  /* xlog record was invalid */
2772  if (errm != NULL)
2773  elog(ERROR, "%s", errm);
2774 
2775  if (record != NULL)
2776  {
2777  /*
2778  * Note the lack of any call to LagTrackerWrite() which is handled by
2779  * WalSndUpdateProgress which is called by output plugin through
2780  * logical decoding write api.
2781  */
2783 
2785  }
2786  else
2787  {
2788  /*
2789  * If the record we just wanted read is at or beyond the flushed
2790  * point, then we're caught up.
2791  */
2793  {
2794  WalSndCaughtUp = true;
2795 
2796  /*
2797  * Have WalSndLoop() terminate the connection in an orderly
2798  * manner, after writing out all the pending data.
2799  */
2800  if (got_STOPPING)
2801  got_SIGUSR2 = true;
2802  }
2803  }
2804 
2805  /* Update shared memory status */
2806  {
2807  WalSnd *walsnd = MyWalSnd;
2808 
2809  SpinLockAcquire(&walsnd->mutex);
2810  walsnd->sentPtr = sentPtr;
2811  SpinLockRelease(&walsnd->mutex);
2812  }
2813 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:188
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8251
slock_t mutex
XLogRecord * XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
Definition: xlogreader.c:195
XLogRecPtr EndRecPtr
Definition: xlogreader.h:120
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define ERROR
Definition: elog.h:43
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:96
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:198
static XLogRecPtr logical_startptr
Definition: walsender.c:199
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:187
static bool WalSndCaughtUp
Definition: walsender.c:184
#define SpinLockRelease(lock)
Definition: spin.h:64
XLogRecPtr sentPtr
WalSnd * MyWalSnd
Definition: walsender.c:112
static XLogRecPtr sentPtr
Definition: walsender.c:158
XLogReaderState * reader
Definition: logical.h:44
#define elog
Definition: elog.h:219

◆ XLogSendPhysical()

static void XLogSendPhysical ( void  )
static

Definition at line 2492 of file walsender.c.

References am_cascading_walsender, Assert, close, StringInfoData::data, DEBUG1, elog, enlargeStringInfo(), GetCurrentTimestamp(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), got_STOPPING, LagTrackerWrite(), StringInfoData::len, list_free_deep(), MAX_SEND_SIZE, WalSnd::mutex, MyWalSnd, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), readTimeLineHistory(), RecoveryInProgress(), resetStringInfo(), sendFile, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, WalSnd::sentPtr, sentPtr, set_ps_display(), snprintf(), SpinLockAcquire, SpinLockRelease, streamingDoneSending, ThisTimeLineID, tliSwitchPoint(), update_process_title, WalSndCaughtUp, WalSndSetState(), WALSNDSTATE_STOPPING, and XLogRead().

Referenced by StartReplication().

2493 {
2494  XLogRecPtr SendRqstPtr;
2495  XLogRecPtr startptr;
2496  XLogRecPtr endptr;
2497  Size nbytes;
2498 
2499  /* If requested switch the WAL sender to the stopping state. */
2500  if (got_STOPPING)
2502 
2504  {
2505  WalSndCaughtUp = true;
2506  return;
2507  }
2508 
2509  /* Figure out how far we can safely send the WAL. */
2511  {
2512  /*
2513  * Streaming an old timeline that's in this server's history, but is
2514  * not the one we're currently inserting or replaying. It can be
2515  * streamed up to the point where we switched off that timeline.
2516  */
2517  SendRqstPtr = sendTimeLineValidUpto;
2518  }
2519  else if (am_cascading_walsender)
2520  {
2521  /*
2522  * Streaming the latest timeline on a standby.
2523  *
2524  * Attempt to send all WAL that has already been replayed, so that we
2525  * know it's valid. If we're receiving WAL through streaming
2526  * replication, it's also OK to send any WAL that has been received
2527  * but not replayed.
2528  *
2529  * The timeline we're recovering from can change, or we can be
2530  * promoted. In either case, the current timeline becomes historic. We
2531  * need to detect that so that we don't try to stream past the point
2532  * where we switched to another timeline. We check for promotion or
2533  * timeline switch after calculating FlushPtr, to avoid a race
2534  * condition: if the timeline becomes historic just after we checked
2535  * that it was still current, it's still be OK to stream it up to the
2536  * FlushPtr that was calculated before it became historic.
2537  */
2538  bool becameHistoric = false;
2539 
2540  SendRqstPtr = GetStandbyFlushRecPtr();
2541 
2542  if (!RecoveryInProgress())
2543  {
2544  /*
2545  * We have been promoted. RecoveryInProgress() updated
2546  * ThisTimeLineID to the new current timeline.
2547  */
2548  am_cascading_walsender = false;
2549  becameHistoric = true;
2550  }
2551  else
2552  {
2553  /*
2554  * Still a cascading standby. But is the timeline we're sending
2555  * still the one recovery is recovering from? ThisTimeLineID was
2556  * updated by the GetStandbyFlushRecPtr() call above.
2557  */
2559  becameHistoric = true;
2560  }
2561 
2562  if (becameHistoric)
2563  {
2564  /*
2565  * The timeline we were sending has become historic. Read the
2566  * timeline history file of the new timeline to see where exactly
2567  * we forked off from the timeline we were sending.
2568  */
2569  List *history;
2570 
2573 
2575  list_free_deep(history);
2576 
2577  sendTimeLineIsHistoric = true;
2578 
2579  SendRqstPtr = sendTimeLineValidUpto;
2580  }
2581  }
2582  else
2583  {
2584  /*
2585  * Streaming the current timeline on a master.
2586  *
2587  * Attempt to send all data that's already been written out and
2588  * fsync'd to disk. We cannot go further than what's been written out
2589  * given the current implementation of XLogRead(). And in any case
2590  * it's unsafe to send WAL that is not securely down to disk on the
2591  * master: if the master subsequently crashes and restarts, standbys
2592  * must not have applied any WAL that got lost on the master.
2593  */
2594  SendRqstPtr = GetFlushRecPtr();
2595  }
2596 
2597  /*
2598  * Record the current system time as an approximation of the time at which
2599  * this WAL location was written for the purposes of lag tracking.
2600  *
2601  * In theory we could make XLogFlush() record a time in shmem whenever WAL
2602  * is flushed and we could get that time as well as the LSN when we call
2603  * GetFlushRecPtr() above (and likewise for the cascading standby
2604  * equivalent), but rather than putting any new code into the hot WAL path
2605  * it seems good enough to capture the time here. We should reach this
2606  * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
2607  * may take some time, we read the WAL flush pointer and take the time
2608  * very close to together here so that we'll get a later position if it is
2609  * still moving.
2610  *
2611  * Because LagTrackerWriter ignores samples when the LSN hasn't advanced,
2612  * this gives us a cheap approximation for the WAL flush time for this
2613  * LSN.
2614  *
2615  * Note that the LSN is not necessarily the LSN for the data contained in
2616  * the present message; it's the end of the WAL, which might be further
2617  * ahead. All the lag tracking machinery cares about is finding out when
2618  * that arbitrary LSN is eventually reported as written, flushed and
2619  * applied, so that it can measure the elapsed time.
2620  */
2621  LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());
2622 
2623  /*
2624  * If this is a historic timeline and we've reached the point where we
2625  * forked to the next timeline, stop streaming.
2626  *
2627  * Note: We might already have sent WAL > sendTimeLineValidUpto. The
2628  * startup process will normally replay all WAL that has been received
2629  * from the master, before promoting, but if the WAL streaming is
2630  * terminated at a WAL page boundary, the valid portion of the timeline
2631  * might end in the middle of a WAL record. We might've already sent the
2632  * first half of that partial WAL record to the cascading standby, so that
2633  * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
2634  * replay the partial WAL record either, so it can still follow our
2635  * timeline switch.
2636  */
2638  {
2639  /* close the current file. */
2640  if (sendFile >= 0)
2641  close(sendFile);
2642  sendFile = -1;
2643 
2644  /* Send CopyDone */
2645  pq_putmessage_noblock('c', NULL, 0);
2646  streamingDoneSending = true;
2647 
2648  WalSndCaughtUp = true;
2649 
2650