PostgreSQL Source Code  git master
walsender.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/printtup.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "access/xlogutils.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
#include "commands/defrem.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "pgstat.h"
#include "replication/basebackup.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/logicalfuncs.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/dest.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/portal.h"
#include "utils/ps_status.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
Include dependency graph for walsender.c:

Go to the source code of this file.

Data Structures

struct  WalTimeSample
 
struct  LagTracker
 

Macros

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)
 
#define LAG_TRACKER_BUFFER_SIZE   8192
 
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000
 
#define PG_STAT_GET_WAL_SENDERS_COLS   12
 

Typedefs

typedef void(* WalSndSendDataCallback) (void)
 

Functions

static void WalSndLastCycleHandler (SIGNAL_ARGS)
 
static void WalSndLoop (WalSndSendDataCallback send_data)
 
static void InitWalSenderSlot (void)
 
static void WalSndKill (int code, Datum arg)
 
static void WalSndShutdown (void)
 
void WalSndErrorCleanup (void)
 
static void IdentifySystem (void)
 
static void SendTimeLineHistory (TimeLineHistoryCmd *cmd)
 
static void StartReplication (StartReplicationCmd *cmd)
 
static int logical_read_xlog_page (XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
 
static void parseCreateReplSlotOptions (CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action)
 
static void CreateReplicationSlot (CreateReplicationSlotCmd *cmd)
 
static void DropReplicationSlot (DropReplicationSlotCmd *cmd)
 
static void StartLogicalReplication (StartReplicationCmd *cmd)
 
static void WalSndPrepareWrite (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndWriteData (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndUpdateProgress (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
 
static XLogRecPtr WalSndWaitForWal (XLogRecPtr loc)
 
bool exec_replication_command (const char *cmd_string)
 
static void ProcessRepliesIfAny (void)
 
static void ProcessStandbyMessage (void)
 
static void PhysicalConfirmReceivedLocation (XLogRecPtr lsn)
 
static void ProcessStandbyReplyMessage (void)
 
static void PhysicalReplicationSlotNewXmin (TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
 
static bool TransactionIdInRecentPast (TransactionId xid, uint32 epoch)
 
static void ProcessStandbyHSFeedbackMessage (void)
 
static long WalSndComputeSleeptime (TimestampTz now)
 
static void WalSndCheckTimeOut (void)
 
static void XLogRead (WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count)
 
static void XLogSendPhysical (void)
 
static void XLogSendLogical (void)
 
static void WalSndDone (WalSndSendDataCallback send_data)
 
static XLogRecPtr GetStandbyFlushRecPtr (void)
 
void WalSndRqstFileReload (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (void)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void WalSndSetState (WalSndState state)
 
static const char * WalSndGetStateString (WalSndState state)
 
static Interval * offset_to_interval (TimeOffset offset)
 
Datum pg_stat_get_wal_senders (PG_FUNCTION_ARGS)
 
static void WalSndKeepalive (bool requestReply)
 
static void WalSndKeepaliveIfNecessary (void)
 
static void LagTrackerWrite (XLogRecPtr lsn, TimestampTz local_flush_time)
 
static TimeOffset LagTrackerRead (int head, XLogRecPtr lsn, TimestampTz now)
 

Variables

WalSndCtlData * WalSndCtl = NULL
 
WalSnd * MyWalSnd = NULL
 
bool am_walsender = false
 
bool am_cascading_walsender = false
 
bool am_db_walsender = false
 
int max_wal_senders = 0
 
int wal_sender_timeout = 60 * 1000
 
bool log_replication_commands = false
 
bool wake_wal_senders = false
 
static WALOpenSegment * sendSeg = NULL
 
static WALSegmentContext * sendCxt = NULL
 
static TimeLineID sendTimeLine = 0
 
static TimeLineID sendTimeLineNextTLI = 0
 
static bool sendTimeLineIsHistoric = false
 
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
 
static XLogRecPtr sentPtr = 0
 
static StringInfoData output_message
 
static StringInfoData reply_message
 
static StringInfoData tmpbuf
 
static TimestampTz last_processing = 0
 
static TimestampTz last_reply_timestamp = 0
 
static bool waiting_for_ping_response = false
 
static bool streamingDoneSending
 
static bool streamingDoneReceiving
 
static bool WalSndCaughtUp = false
 
static volatile sig_atomic_t got_SIGUSR2 = false
 
static volatile sig_atomic_t got_STOPPING = false
 
static volatile sig_atomic_t replication_active = false
 
static LogicalDecodingContext * logical_decoding_ctx = NULL
 
static XLogRecPtr logical_startptr = InvalidXLogRecPtr
 
static LagTracker * lag_tracker
 

Macro Definition Documentation

◆ LAG_TRACKER_BUFFER_SIZE

#define LAG_TRACKER_BUFFER_SIZE   8192

Definition at line 203 of file walsender.c.

Referenced by LagTrackerRead(), and LagTrackerWrite().

◆ MAX_SEND_SIZE

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)

Definition at line 105 of file walsender.c.

Referenced by XLogSendPhysical().

◆ PG_STAT_GET_WAL_SENDERS_COLS

#define PG_STAT_GET_WAL_SENDERS_COLS   12

Referenced by pg_stat_get_wal_senders().

◆ WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS

#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000

Referenced by WalSndUpdateProgress().

Typedef Documentation

◆ WalSndSendDataCallback

typedef void(* WalSndSendDataCallback) (void)

Definition at line 221 of file walsender.c.

Function Documentation

◆ CreateReplicationSlot()

static void CreateReplicationSlot ( CreateReplicationSlotCmd *cmd)
static

Definition at line 851 of file walsender.c.

References Assert, begin_tup_output_tupdesc(), CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDestReceiver(), CreateInitDecodingContext(), CreateTemplateTupleDesc(), CRS_EXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, CStringGetTextDatum, ReplicationSlot::data, DecodingContextFindStartpoint(), generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errmsg(), ERROR, FirstSnapshotSet, FreeDecodingContext(), InvalidXLogRecPtr, IsSubTransaction(), IsTransactionBlock(), CreateReplicationSlotCmd::kind, last_reply_timestamp, logical_read_xlog_page(), MAXFNAMELEN, MemSet, MyProc, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, NIL, parseCreateReplSlotOptions(), CreateReplicationSlotCmd::plugin, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, ReplicationSlotCreate(), ReplicationSlotMarkDirty(), ReplicationSlotPersist(), ReplicationSlotRelease(), ReplicationSlotReserveWal(), ReplicationSlotSave(), RestoreTransactionSnapshot(), RS_EPHEMERAL, RS_PERSISTENT, RS_TEMPORARY, sendTimeLine, sendTimeLineIsHistoric, CreateReplicationSlotCmd::slotname, SnapBuildExportSnapshot(), SnapBuildInitialSnapshot(), LogicalDecodingContext::snapshot_builder, snprintf, CreateReplicationSlotCmd::temporary, ThisTimeLineID, TTSOpsVirtual, TupleDescInitBuiltinEntry(), values, WalSndPrepareWrite(), WalSndUpdateProgress(), WalSndWriteData(), XACT_REPEATABLE_READ, and XactIsoLevel.

Referenced by exec_replication_command(), main(), and StartLogStreamer().

852 {
853  const char *snapshot_name = NULL;
854  char xloc[MAXFNAMELEN];
855  char *slot_name;
856  bool reserve_wal = false;
857  CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
859  TupOutputState *tstate;
860  TupleDesc tupdesc;
861  Datum values[4];
862  bool nulls[4];
863 
865 
866  parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action);
867 
868  /* setup state for XLogRead */
869  sendTimeLineIsHistoric = false;
871 
872  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
873  {
874  ReplicationSlotCreate(cmd->slotname, false,
876  }
877  else
878  {
880 
881  /*
882  * Initially create persistent slot as ephemeral - that allows us to
883  * nicely handle errors during initialization because it'll get
884  * dropped if this transaction fails. We'll make it persistent at the
885  * end. Temporary slots can be created as temporary from beginning as
886  * they get dropped on error as well.
887  */
888  ReplicationSlotCreate(cmd->slotname, true,
890  }
891 
892  if (cmd->kind == REPLICATION_KIND_LOGICAL)
893  {
895  bool need_full_snapshot = false;
896 
897  /*
898  * Do options check early so that we can bail before calling the
899  * DecodingContextFindStartpoint which can take long time.
900  */
901  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
902  {
903  if (IsTransactionBlock())
904  ereport(ERROR,
905  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
906  (errmsg("%s must not be called inside a transaction",
907  "CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT")));
908 
909  need_full_snapshot = true;
910  }
911  else if (snapshot_action == CRS_USE_SNAPSHOT)
912  {
913  if (!IsTransactionBlock())
914  ereport(ERROR,
915  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
916  (errmsg("%s must be called inside a transaction",
917  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
918 
920  ereport(ERROR,
921  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
922  (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
923  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
924 
925  if (FirstSnapshotSet)
926  ereport(ERROR,
927  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
928  (errmsg("%s must be called before any query",
929  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
930 
931  if (IsSubTransaction())
932  ereport(ERROR,
933  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
934  (errmsg("%s must not be called in a subtransaction",
935  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
936 
937  need_full_snapshot = true;
938  }
939 
940  ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
945 
946  /*
947  * Signal that we don't need the timeout mechanism. We're just
948  * creating the replication slot and don't yet accept feedback
949  * messages or send keepalives. As we possibly need to wait for
950  * further WAL the walsender would otherwise possibly be killed too
951  * soon.
952  */
954 
955  /* build initial snapshot, might take a while */
957 
958  /*
959  * Export or use the snapshot if we've been asked to do so.
960  *
961  * NB. We will convert the snapbuild.c kind of snapshot to normal
962  * snapshot when doing this.
963  */
964  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
965  {
966  snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
967  }
968  else if (snapshot_action == CRS_USE_SNAPSHOT)
969  {
970  Snapshot snap;
971 
974  }
975 
976  /* don't need the decoding context anymore */
977  FreeDecodingContext(ctx);
978 
979  if (!cmd->temporary)
981  }
982  else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
983  {
985 
987 
988  /* Write this slot to disk if it's a permanent one. */
989  if (!cmd->temporary)
991  }
992 
993  snprintf(xloc, sizeof(xloc), "%X/%X",
996 
998  MemSet(nulls, false, sizeof(nulls));
999 
1000  /*----------
1001  * Need a tuple descriptor representing four columns:
1002  * - first field: the slot name
1003  * - second field: LSN at which we became consistent
1004  * - third field: exported snapshot's name
1005  * - fourth field: output plugin
1006  *----------
1007  */
1008  tupdesc = CreateTemplateTupleDesc(4);
1009  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
1010  TEXTOID, -1, 0);
1011  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
1012  TEXTOID, -1, 0);
1013  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
1014  TEXTOID, -1, 0);
1015  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1016  TEXTOID, -1, 0);
1017 
1018  /* prepare for projection of tuples */
1019  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
1020 
1021  /* slot_name */
1022  slot_name = NameStr(MyReplicationSlot->data.name);
1023  values[0] = CStringGetTextDatum(slot_name);
1024 
1025  /* consistent wal location */
1026  values[1] = CStringGetTextDatum(xloc);
1027 
1028  /* snapshot name, or NULL if none */
1029  if (snapshot_name != NULL)
1030  values[2] = CStringGetTextDatum(snapshot_name);
1031  else
1032  nulls[2] = true;
1033 
1034  /* plugin, or NULL if none */
1035  if (cmd->plugin != NULL)
1036  values[3] = CStringGetTextDatum(cmd->plugin);
1037  else
1038  nulls[3] = true;
1039 
1040  /* send it to dest */
1041  do_tup_output(tstate, values, nulls);
1042  end_tup_output(tstate);
1043 
1045 }
#define NIL
Definition: pg_list.h:65
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:44
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action)
Definition: walsender.c:798
PGPROC * MyProc
Definition: proc.c:68
#define XACT_REPEATABLE_READ
Definition: xact.h:38
void RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
Definition: snapmgr.c:2225
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:84
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2252
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency)
Definition: slot.c:221
#define MemSet(start, val, len)
Definition: c.h:955
void ReplicationSlotSave(void)
Definition: slot.c:645
ReplicationSlotPersistentData data
Definition: slot.h:132
XLogRecPtr confirmed_flush
Definition: slot.h:80
ReplicationKind kind
Definition: replnodes.h:56
bool IsTransactionBlock(void)
Definition: xact.c:4633
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:636
void ReplicationSlotReserveWal(void)
Definition: slot.c:997
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:466
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2310
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:676
#define ERROR
Definition: elog.h:43
bool FirstSnapshotSet
Definition: snapmgr.c:205
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2232
void ReplicationSlotPersist(void)
Definition: slot.c:680
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1178
unsigned int uint32
Definition: c.h:358
void ReplicationSlotRelease(void)
Definition: slot.c:424
#define ereport(elevel, rest)
Definition: elog.h:141
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1151
#define MAXFNAMELEN
LogicalDecodingContext * CreateInitDecodingContext(char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:231
uintptr_t Datum
Definition: postgres.h:367
TimeLineID ThisTimeLineID
Definition: xlog.c:187
struct SnapBuild * snapshot_builder
Definition: logical.h:44
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
static TimeLineID sendTimeLine
Definition: walsender.c:140
#define Assert(condition)
Definition: c.h:732
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:512
int XactIsoLevel
Definition: xact.c:75
bool IsSubTransaction(void)
Definition: xact.c:4706
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition: snapbuild.c:544
static Datum values[MAXATTR]
Definition: bootstrap.c:167
int errmsg(const char *fmt,...)
Definition: elog.c:784
CRSSnapshotAction
Definition: walsender.h:20
#define NameStr(name)
Definition: c.h:609
#define CStringGetTextDatum(s)
Definition: builtins.h:83
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition: walsender.c:764
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:79
#define snprintf
Definition: port.h:192
int16 AttrNumber
Definition: attnum.h:21
static TimestampTz last_reply_timestamp
Definition: walsender.c:163
static bool sendTimeLineIsHistoric
Definition: walsender.c:142
void ReplicationSlotMarkDirty(void)
Definition: slot.c:663
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
Definition: walsender.c:1267

◆ DropReplicationSlot()

static void DropReplicationSlot ( DropReplicationSlotCmd *cmd)
static

Definition at line 1051 of file walsender.c.

References DestRemote, EndCommand(), ReplicationSlotDrop(), DropReplicationSlotCmd::slotname, and DropReplicationSlotCmd::wait.

Referenced by exec_replication_command(), and main().

1052 {
1053  ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1054  EndCommand("DROP_REPLICATION_SLOT", DestRemote);
1055 }
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:166
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:517

◆ exec_replication_command()

bool exec_replication_command ( const char *  cmd_string)

Definition at line 1431 of file walsender.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, generate_unaccent_rules::dest, DestRemote, DestRemoteSimple, DropReplicationSlot(), elog, EndCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsA, IsAbortedTransactionBlockState(), IsTransactionBlock(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextDelete(), MemoryContextSwitchTo(), MyDatabaseId, VariableShowStmt::name, pgstat_report_activity(), PreventInTransactionBlock(), REPLICATION_KIND_PHYSICAL, replication_parse_result, replication_scanner_init(), replication_yyparse(), SendBaseBackup(), SendTimeLineHistory(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), StartTransactionCommand(), WalSnd::state, STATE_IDLE, STATE_RUNNING, T_BaseBackupCmd, T_CreateReplicationSlotCmd, T_DropReplicationSlotCmd, T_IdentifySystemCmd, T_SQLCmd, T_StartReplicationCmd, T_TimeLineHistoryCmd, T_VariableShowStmt, TimeLineHistoryCmd::type, Node::type, WalSndSetState(), and WALSNDSTATE_STOPPING.

Referenced by PostgresMain().

1432 {
1433  int parse_rc;
1434  Node *cmd_node;
1435  MemoryContext cmd_context;
1436  MemoryContext old_context;
1437 
1438  /*
1439  * If WAL sender has been told that shutdown is getting close, switch its
1440  * status accordingly to handle the next replication commands correctly.
1441  */
1442  if (got_STOPPING)
1444 
1445  /*
1446  * Throw error if in stopping mode. We need prevent commands that could
1447  * generate WAL while the shutdown checkpoint is being written. To be
1448  * safe, we just prohibit all new commands.
1449  */
1451  ereport(ERROR,
1452  (errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1453 
1454  /*
1455  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1456  * command arrives. Clean up the old stuff if there's anything.
1457  */
1459 
1461 
1463  "Replication command context",
1465  old_context = MemoryContextSwitchTo(cmd_context);
1466 
1467  replication_scanner_init(cmd_string);
1468  parse_rc = replication_yyparse();
1469  if (parse_rc != 0)
1470  ereport(ERROR,
1471  (errcode(ERRCODE_SYNTAX_ERROR),
1472  (errmsg_internal("replication command parser returned %d",
1473  parse_rc))));
1474 
1475  cmd_node = replication_parse_result;
1476 
1477  /*
1478  * Log replication command if log_replication_commands is enabled. Even
1479  * when it's disabled, log the command with DEBUG1 level for backward
1480  * compatibility. Note that SQL commands are not logged here, and will be
1481  * logged later if log_statement is enabled.
1482  */
1483  if (cmd_node->type != T_SQLCmd)
1485  (errmsg("received replication command: %s", cmd_string)));
1486 
1487  /*
1488  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was
1489  * called outside of transaction the snapshot should be cleared here.
1490  */
1491  if (!IsTransactionBlock())
1493 
1494  /*
1495  * For aborted transactions, don't allow anything except pure SQL, the
1496  * exec_simple_query() will handle it correctly.
1497  */
1498  if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
1499  ereport(ERROR,
1500  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1501  errmsg("current transaction is aborted, "
1502  "commands ignored until end of transaction block")));
1503 
1505 
1506  /*
1507  * Allocate buffers that will be used for each outgoing and incoming
1508  * message. We do this just once per command to reduce palloc overhead.
1509  */
1513 
1514  /* Report to pgstat that this process is running */
1516 
1517  switch (cmd_node->type)
1518  {
1519  case T_IdentifySystemCmd:
1520  IdentifySystem();
1521  break;
1522 
1523  case T_BaseBackupCmd:
1524  PreventInTransactionBlock(true, "BASE_BACKUP");
1525  SendBaseBackup((BaseBackupCmd *) cmd_node);
1526  break;
1527 
1530  break;
1531 
1534  break;
1535 
1536  case T_StartReplicationCmd:
1537  {
1538  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1539 
1540  PreventInTransactionBlock(true, "START_REPLICATION");
1541 
1542  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1543  StartReplication(cmd);
1544  else
1546  break;
1547  }
1548 
1549  case T_TimeLineHistoryCmd:
1550  PreventInTransactionBlock(true, "TIMELINE_HISTORY");
1552  break;
1553 
1554  case T_VariableShowStmt:
1555  {
1557  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1558 
1559  /* syscache access needs a transaction environment */
1561  GetPGVariable(n->name, dest);
1563  }
1564  break;
1565 
1566  case T_SQLCmd:
1567  if (MyDatabaseId == InvalidOid)
1568  ereport(ERROR,
1569  (errmsg("cannot execute SQL commands in WAL sender for physical replication")));
1570 
1571  /* Report to pgstat that this process is now idle */
1573 
1574  /* Tell the caller that this wasn't a WalSender command. */
1575  return false;
1576 
1577  default:
1578  elog(ERROR, "unrecognized replication command node tag: %u",
1579  cmd_node->type);
1580  }
1581 
1582  /* done */
1583  MemoryContextSwitchTo(old_context);
1584  MemoryContextDelete(cmd_context);
1585 
1586  /* Send CommandComplete message */
1587  EndCommand("SELECT", DestRemote);
1588 
1589  /* Report to pgstat that this process is now idle */
1591 
1592  return true;
1593 }
#define IsA(nodeptr, _type_)
Definition: nodes.h:576
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:211
#define AllocSetContextCreate
Definition: memutils.h:170
#define DEBUG1
Definition: elog.h:25
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:3121
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:433
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1051
void CommitTransactionCommand(void)
Definition: xact.c:2895
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:376
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Definition: nodes.h:525
static StringInfoData output_message
Definition: walsender.c:152
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:182
int errcode(int sqlerrcode)
Definition: elog.c:570
#define LOG
Definition: elog.h:26
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc.c:8738
Node * replication_parse_result
bool IsTransactionBlock(void)
Definition: xact.c:4633
ReplicationKind kind
Definition: replnodes.h:82
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:166
#define ERROR
Definition: elog.h:43
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:760
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
WalSndState state
NodeTag type
Definition: nodes.h:527
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:539
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3328
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
#define ereport(elevel, rest)
Definition: elog.h:141
int replication_yyparse(void)
void initStringInfo(StringInfo str)
Definition: stringinfo.c:46
static StringInfoData reply_message
Definition: walsender.c:153
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1062
Oid MyDatabaseId
Definition: globals.c:85
WalSnd * MyWalSnd
Definition: walsender.c:111
#define InvalidOid
Definition: postgres_ext.h:36
int errmsg_internal(const char *fmt,...)
Definition: elog.c:814
void WalSndSetState(WalSndState state)
Definition: walsender.c:3166
void StartTransactionCommand(void)
Definition: xact.c:2794
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:851
static StringInfoData tmpbuf
Definition: walsender.c:154
bool log_replication_commands
Definition: walsender.c:124
int errmsg(const char *fmt,...)
Definition: elog.c:784
#define elog(elevel,...)
Definition: elog.h:226
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
static void IdentifySystem(void)
Definition: walsender.c:344
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:697
void replication_scanner_init(const char *query_string)

◆ GetStandbyFlushRecPtr()

static XLogRecPtr GetStandbyFlushRecPtr ( void  )
static

Definition at line 2922 of file walsender.c.

References GetWalRcvWriteRecPtr(), GetXLogReplayRecPtr(), receiveTLI, and ThisTimeLineID.

Referenced by IdentifySystem(), StartReplication(), and XLogSendPhysical().

2923 {
2924  XLogRecPtr replayPtr;
2925  TimeLineID replayTLI;
2926  XLogRecPtr receivePtr;
2928  XLogRecPtr result;
2929 
2930  /*
2931  * We can safely send what's already been replayed. Also, if walreceiver
2932  * is streaming WAL from the same timeline, we can send anything that it
2933  * has streamed, but hasn't been replayed yet.
2934  */
2935 
2936  receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI);
2937  replayPtr = GetXLogReplayRecPtr(&replayTLI);
2938 
2939  ThisTimeLineID = replayTLI;
2940 
2941  result = replayPtr;
2942  if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
2943  result = receivePtr;
2944 
2945  return result;
2946 }
uint32 TimeLineID
Definition: xlogdefs.h:52
XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11149
static TimeLineID receiveTLI
Definition: xlog.c:209
TimeLineID ThisTimeLineID
Definition: xlog.c:187
uint64 XLogRecPtr
Definition: xlogdefs.h:21

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 2975 of file walsender.c.

References am_walsender, Assert, got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

2976 {
2978 
2979  /*
2980  * If replication has not yet started, die like with SIGTERM. If
2981  * replication is active, only set a flag and wake up the main loop. It
2982  * will send any outstanding WAL, wait for it to be replicated to the
2983  * standby, and then exit gracefully.
2984  */
2985  if (!replication_active)
2986  kill(MyProcPid, SIGTERM);
2987  else
2988  got_STOPPING = true;
2989 }
int MyProcPid
Definition: globals.c:40
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:182
#define kill(pid, sig)
Definition: win32_port.h:426
bool am_walsender
Definition: walsender.c:114
static volatile sig_atomic_t replication_active
Definition: walsender.c:190
#define Assert(condition)
Definition: c.h:732

◆ IdentifySystem()

static void IdentifySystem ( void  )
static

Definition at line 344 of file walsender.c.

References am_cascading_walsender, begin_tup_output_tupdesc(), CommitTransactionCommand(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, cur, CurrentMemoryContext, dbname, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), get_database_name(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetSystemIdentifier(), Int32GetDatum, InvalidOid, MAXFNAMELEN, MemoryContextSwitchTo(), MemSet, MyDatabaseId, RecoveryInProgress(), snprintf, StartTransactionCommand(), ThisTimeLineID, TTSOpsVirtual, TupleDescInitBuiltinEntry(), UINT64_FORMAT, and values.

Referenced by exec_replication_command().

345 {
346  char sysid[32];
347  char xloc[MAXFNAMELEN];
348  XLogRecPtr logptr;
349  char *dbname = NULL;
351  TupOutputState *tstate;
352  TupleDesc tupdesc;
353  Datum values[4];
354  bool nulls[4];
355 
356  /*
357  * Reply with a result set with one row, four columns. First col is system
358  * ID, second is timeline ID, third is current xlog location and the
359  * fourth contains the database name if we are connected to one.
360  */
361 
362  snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
364 
367  {
368  /* this also updates ThisTimeLineID */
369  logptr = GetStandbyFlushRecPtr();
370  }
371  else
372  logptr = GetFlushRecPtr();
373 
374  snprintf(xloc, sizeof(xloc), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
375 
376  if (MyDatabaseId != InvalidOid)
377  {
379 
380  /* syscache access needs a transaction env. */
382  /* make dbname live outside TX context */
386  /* CommitTransactionCommand switches to TopMemoryContext */
388  }
389 
391  MemSet(nulls, false, sizeof(nulls));
392 
393  /* need a tuple descriptor representing four columns */
394  tupdesc = CreateTemplateTupleDesc(4);
395  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
396  TEXTOID, -1, 0);
397  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
398  INT4OID, -1, 0);
399  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
400  TEXTOID, -1, 0);
401  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
402  TEXTOID, -1, 0);
403 
404  /* prepare for projection of tuples */
405  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
406 
407  /* column 1: system identifier */
408  values[0] = CStringGetTextDatum(sysid);
409 
410  /* column 2: timeline */
411  values[1] = Int32GetDatum(ThisTimeLineID);
412 
413  /* column 3: wal location */
414  values[2] = CStringGetTextDatum(xloc);
415 
416  /* column 4: database name, or NULL if none */
417  if (dbname)
418  values[3] = CStringGetTextDatum(dbname);
419  else
420  nulls[3] = true;
421 
422  /* send it to dest */
423  do_tup_output(tstate, values, nulls);
424 
425  end_tup_output(tstate);
426 }
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:44
void CommitTransactionCommand(void)
Definition: xact.c:2895
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:84
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2252
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
struct cursor * cur
Definition: ecpg.c:28
#define MemSet(start, val, len)
Definition: c.h:955
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8249
bool RecoveryInProgress(void)
Definition: xlog.c:7917
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2310
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:676
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2232
char * get_database_name(Oid dbid)
Definition: dbcommands.c:2100
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
unsigned int uint32
Definition: c.h:358
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
#define MAXFNAMELEN
uintptr_t Datum
Definition: postgres.h:367
Oid MyDatabaseId
Definition: globals.c:85
#define InvalidOid
Definition: postgres_ext.h:36
TimeLineID ThisTimeLineID
Definition: xlog.c:187
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void StartTransactionCommand(void)
Definition: xact.c:2794
char * dbname
Definition: streamutil.c:52
static Datum values[MAXATTR]
Definition: bootstrap.c:167
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4802
#define Int32GetDatum(X)
Definition: postgres.h:479
#define CStringGetTextDatum(s)
Definition: builtins.h:83
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2922
#define snprintf
Definition: port.h:192
int16 AttrNumber
Definition: attnum.h:21
#define UINT64_FORMAT
Definition: c.h:401
bool am_cascading_walsender
Definition: walsender.c:115

◆ InitWalSenderSlot()

static void InitWalSenderSlot ( void  )
static

Definition at line 2279 of file walsender.c.

References WalSnd::apply, WalSnd::applyLag, Assert, WalSnd::flush, WalSnd::flushLag, i, InvalidXLogRecPtr, WalSnd::latch, max_wal_senders, WalSnd::mutex, MyProc, MyProcPid, on_shmem_exit(), WalSnd::pid, PGPROC::procLatch, WalSnd::replyTime, WalSnd::sentPtr, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndKill(), WalSndCtlData::walsnds, WALSNDSTATE_STARTUP, WalSnd::write, and WalSnd::writeLag.

/*
 * InitWalSenderSlot -- claim a free entry of WalSndCtl->walsnds[] for this
 * process and publish it through MyWalSnd.
 *
 * Entries are scanned in index order, each one inspected under its own
 * spinlock.  The first entry whose pid is 0 is taken: MyProcPid is stored in
 * it and its position, lag, state, latch and reply-time fields are reset
 * before the lock is released.
 *
 * NOTE(review): original line 2332 is absent from this listing.  Going by
 * the "Arrange to clean up" comment and this function's reference list it
 * presumably registers WalSndKill() via on_shmem_exit() -- confirm against
 * the full source.
 */
2280 {
2281  int i;
2282 
2283  /*
2284  * WalSndCtl should be set up already (we inherit this by fork() or
2285  * EXEC_BACKEND mechanism from the postmaster).
2286  */
2287  Assert(WalSndCtl != NULL);
2288  Assert(MyWalSnd == NULL);
2289 
2290  /*
2291  * Find a free walsender slot and reserve it. This must not fail due to
2292  * the prior check for free WAL senders in InitProcess().
2293  */
2294  for (i = 0; i < max_wal_senders; i++)
2295  {
2296  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2297 
2298  SpinLockAcquire(&walsnd->mutex);
2299 
 /* a nonzero pid marks the entry as already taken; try the next one */
2300  if (walsnd->pid != 0)
2301  {
2302  SpinLockRelease(&walsnd->mutex);
2303  continue;
2304  }
2305  else
2306  {
2307  /*
2308  * Found a free slot. Reserve it for us.
2309  */
2310  walsnd->pid = MyProcPid;
2311  walsnd->sentPtr = InvalidXLogRecPtr;
2312  walsnd->write = InvalidXLogRecPtr;
2313  walsnd->flush = InvalidXLogRecPtr;
2314  walsnd->apply = InvalidXLogRecPtr;
 /* lag fields start negative; pg_stat_get_wal_senders() shows lag < 0 as NULL */
2315  walsnd->writeLag = -1;
2316  walsnd->flushLag = -1;
2317  walsnd->applyLag = -1;
2318  walsnd->state = WALSNDSTATE_STARTUP;
2319  walsnd->latch = &MyProc->procLatch;
2320  walsnd->replyTime = 0;
2321  SpinLockRelease(&walsnd->mutex);
2322  /* don't need the lock anymore */
2323  MyWalSnd = (WalSnd *) walsnd;
2324 
2325  break;
2326  }
2327  }
2328 
 /* the loop must have claimed an entry (see the comment above the loop) */
2329  Assert(MyWalSnd != NULL);
2330 
2331  /* Arrange to clean up at walsender exit */
2333 }
XLogRecPtr write
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
int MyProcPid
Definition: globals.c:40
PGPROC * MyProc
Definition: proc.c:68
TimeOffset flushLag
WalSndCtlData * WalSndCtl
Definition: walsender.c:108
static void WalSndKill(int code, Datum arg)
Definition: walsender.c:2337
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
TimeOffset writeLag
slock_t mutex
Latch procLatch
Definition: proc.h:104
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
Latch * latch
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
WalSndState state
int max_wal_senders
Definition: walsender.c:120
#define SpinLockRelease(lock)
Definition: spin.h:64
XLogRecPtr sentPtr
WalSnd * MyWalSnd
Definition: walsender.c:111
TimeOffset applyLag
#define Assert(condition)
Definition: c.h:732
int i
TimestampTz replyTime
XLogRecPtr apply

◆ LagTrackerRead()

static TimeOffset LagTrackerRead ( int  head,
XLogRecPtr  lsn,
TimestampTz  now 
)
static

Definition at line 3518 of file walsender.c.

References Assert, LagTracker::buffer, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_read, WalTimeSample::lsn, next, LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by ProcessStandbyReplyMessage().

/*
 * LagTrackerRead -- consume the lag tracker's samples for read head 'head'
 * up to and including 'lsn', and return how much time has passed between
 * the matching sample's timestamp and 'now'.
 *
 * If at least one sample was crossed, the timestamp of the last crossed
 * sample is used.  If none was crossed, the timestamp is either
 * interpolated between last_read[head] and the next unread sample, or taken
 * from the next unread sample when last_read[head] has been cleared.
 *
 * Returns -1 ("not found") when the chosen timestamp is later than 'now',
 * when there is no future sample to interpolate with, when 'lsn' is behind
 * last_read[head], or when the two samples' timestamps are out of order.
 *
 * NOTE(review): original lines 3528, 3530, 3540, 3559 and 3569 are absent
 * from this listing -- apparently the right-hand sides of the two
 * assignments inside the loop, the conditions guarding lines 3541 and 3560,
 * and the declaration of 'next'.  Consult the full source for them.
 */
3519 {
3520  TimestampTz time = 0;
3521 
3522  /* Read all unread samples up to this LSN or end of buffer. */
3523  while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
3524  lag_tracker->buffer[lag_tracker->read_heads[head]].lsn <= lsn)
3525  {
3526  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3527  lag_tracker->last_read[head] =
3529  lag_tracker->read_heads[head] =
3531  }
3532 
3533  /*
3534  * If the lag tracker is empty, that means the standby has processed
3535  * everything we've ever sent so we should now clear 'last_read'. If we
3536  * didn't do that, we'd risk using a stale and irrelevant sample for
3537  * interpolation at the beginning of the next burst of WAL after a period
3538  * of idleness.
3539  */
3541  lag_tracker->last_read[head].time = 0;
3542 
3543  if (time > now)
3544  {
3545  /* If the clock somehow went backwards, treat as not found. */
3546  return -1;
3547  }
3548  else if (time == 0)
3549  {
3550  /*
3551  * We didn't cross a time. If there is a future sample that we
3552  * haven't reached yet, and we've already reached at least one sample,
3553  * let's interpolate the local flushed time. This is mainly useful
3554  * for reporting a completely stuck apply position as having
3555  * increasing lag, since otherwise we'd have to wait for it to
3556  * eventually start moving again and cross one of our samples before
3557  * we can show the lag increasing.
3558  */
3560  {
3561  /* There are no future samples, so we can't interpolate. */
3562  return -1;
3563  }
3564  else if (lag_tracker->last_read[head].time != 0)
3565  {
3566  /* We can interpolate between last_read and the next sample. */
3567  double fraction;
3568  WalTimeSample prev = lag_tracker->last_read[head];
3570 
3571  if (lsn < prev.lsn)
3572  {
3573  /*
3574  * Reported LSNs shouldn't normally go backwards, but it's
3575  * possible when there is a timeline change. Treat as not
3576  * found.
3577  */
3578  return -1;
3579  }
3580 
3581  Assert(prev.lsn < next.lsn);
3582 
3583  if (prev.time > next.time)
3584  {
3585  /* If the clock somehow went backwards, treat as not found. */
3586  return -1;
3587  }
3588 
3589  /* See how far we are between the previous and next samples. */
3590  fraction =
3591  (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
3592 
3593  /* Scale the local flush time proportionally. */
3594  time = (TimestampTz)
3595  ((double) prev.time + (next.time - prev.time) * fraction);
3596  }
3597  else
3598  {
3599  /*
3600  * We have only a future sample, implying that we were entirely
3601  * caught up, and now there is a new burst of WAL and the
3602  * standby hasn't processed the first sample yet. Until the
3603  * standby reaches the future sample the best we can do is report
3604  * the hypothetical lag if that sample were to be replayed now.
3605  */
3606  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3607  }
3608  }
3609 
3610  /* Return the elapsed time since local flush time in microseconds. */
3611  Assert(time != 0);
3612  return now - time;
3613 }
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:209
static int32 next
Definition: blutils.c:215
int write_head
Definition: walsender.c:210
int64 TimestampTz
Definition: timestamp.h:39
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:212
TimestampTz time
Definition: walsender.c:199
static LagTracker * lag_tracker
Definition: walsender.c:215
XLogRecPtr lsn
Definition: walsender.c:198
#define Assert(condition)
Definition: c.h:732
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:211
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:203
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1547

◆ LagTrackerWrite()

static void LagTrackerWrite ( XLogRecPtr  lsn,
TimestampTz  local_flush_time 
)
static

Definition at line 3453 of file walsender.c.

References am_walsender, LagTracker::buffer, i, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_lsn, WalTimeSample::lsn, NUM_SYNC_REP_WAIT_MODE, LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by WalSndUpdateProgress(), and XLogSendPhysical().

/*
 * LagTrackerWrite -- record the pair (lsn, local_flush_time) as a new sample
 * in the lag tracker's circular buffer.
 *
 * Does nothing when the process is not a walsender or when 'lsn' equals the
 * previously recorded last_lsn.  When advancing the write head would land on
 * any read head the buffer counts as full, and the newest sample is
 * overwritten instead of a new slot being consumed.
 *
 * NOTE(review): original lines 3493, 3495 and 3499 are absent from this
 * listing -- apparently the two branches that step write_head back by one
 * slot (with wrap-around) and the store of 'lsn' into the sample.  Consult
 * the full source for them.
 */
3454 {
3455  bool buffer_full;
3456  int new_write_head;
3457  int i;
3458 
3459  if (!am_walsender)
3460  return;
3461 
3462  /*
3463  * If the lsn hasn't advanced since last time, then do nothing. This way
3464  * we only record a new sample when new WAL has been written.
3465  */
3466  if (lag_tracker->last_lsn == lsn)
3467  return;
3468  lag_tracker->last_lsn = lsn;
3469 
3470  /*
3471  * If advancing the write head of the circular buffer would crash into any
3472  * of the read heads, then the buffer is full. In other words, the
3473  * slowest reader (presumably apply) is the one that controls the release
3474  * of space.
3475  */
3476  new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
3477  buffer_full = false;
3478  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
3479  {
3480  if (new_write_head == lag_tracker->read_heads[i])
3481  buffer_full = true;
3482  }
3483 
3484  /*
3485  * If the buffer is full, for now we just rewind by one slot and overwrite
3486  * the last sample, as a simple (if somewhat uneven) way to lower the
3487  * sampling rate. There may be better adaptive compaction algorithms.
3488  */
3489  if (buffer_full)
3490  {
 /* keep the write head where it is once this sample has been stored */
3491  new_write_head = lag_tracker->write_head;
3492  if (lag_tracker->write_head > 0)
3494  else
3496  }
3497 
3498  /* Store a sample at the current write head position. */
3500  lag_tracker->buffer[lag_tracker->write_head].time = local_flush_time;
3501  lag_tracker->write_head = new_write_head;
3502 }
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:209
int write_head
Definition: walsender.c:210
TimestampTz time
Definition: walsender.c:199
static LagTracker * lag_tracker
Definition: walsender.c:215
bool am_walsender
Definition: walsender.c:114
XLogRecPtr lsn
Definition: walsender.c:198
XLogRecPtr last_lsn
Definition: walsender.c:208
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:211
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:203
int i
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28

◆ logical_read_xlog_page()

static int logical_read_xlog_page ( XLogReaderState state,
XLogRecPtr  targetPagePtr,
int  reqLen,
XLogRecPtr  targetRecPtr,
char *  cur_page 
)
static

Definition at line 764 of file walsender.c.

References XLogReaderState::currTLI, XLogReaderState::currTLIValidUntil, XLogReaderState::nextTLI, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, ThisTimeLineID, WalSndWaitForWal(), XLogRead(), and XLogReadDetermineTimeline().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

/*
 * logical_read_xlog_page -- page-read routine used for logical decoding in
 * a walsender.
 *
 * Determines the timeline for the requested page, waits until WAL up to
 * targetPagePtr + reqLen is available, then reads the page into cur_page.
 *
 * Returns the number of bytes of the page that are covered by the flush
 * pointer (XLOG_BLCKSZ, or less for a partially available page), or -1 when
 * the wait returned before enough WAL was available.
 *
 * NOTE(review): original lines 771 and 773 are absent from this listing;
 * going by the reference list they presumably set the remaining
 * sendTimeLine* variables -- confirm against the full source.
 *
 * NOTE(review): XLogRead() is asked for a full XLOG_BLCKSZ even when 'count'
 * is smaller; verify that reading past the flush pointer is intended here.
 */
766 {
767  XLogRecPtr flushptr;
768  int count;
769 
770  XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
772  sendTimeLine = state->currTLI;
774  sendTimeLineNextTLI = state->nextTLI;
775 
776  /* make sure we have enough WAL available */
777  flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
778 
779  /* fail if not (implies we are going to shut down) */
780  if (flushptr < targetPagePtr + reqLen)
781  return -1;
782 
783  if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
784  count = XLOG_BLCKSZ; /* more than one block available */
785  else
786  count = flushptr - targetPagePtr; /* part of the page available */
787 
788  /* now actually read the data, we know it's there */
789  XLogRead(sendCxt, cur_page, targetPagePtr, XLOG_BLCKSZ);
790 
791  return count;
792 }
static WALSegmentContext * sendCxt
Definition: walsender.c:132
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
Definition: xlogutils.c:803
static void XLogRead(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count)
Definition: walsender.c:2365
XLogRecPtr currTLIValidUntil
Definition: xlogreader.h:192
TimeLineID nextTLI
Definition: xlogreader.h:198
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition: walsender.c:1293
TimeLineID ThisTimeLineID
Definition: xlog.c:187
TimeLineID currTLI
Definition: xlogreader.h:182
static TimeLineID sendTimeLine
Definition: walsender.c:140
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:141
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:143
static bool sendTimeLineIsHistoric
Definition: walsender.c:142

◆ offset_to_interval()

static Interval* offset_to_interval ( TimeOffset  offset)
static

Definition at line 3204 of file walsender.c.

References Interval::day, Interval::month, palloc(), and Interval::time.

Referenced by pg_stat_get_wal_senders().

/*
 * offset_to_interval -- wrap a TimeOffset in a freshly palloc'd Interval.
 *
 * The result has month and day set to zero and 'offset' stored in the time
 * field.  The allocation comes from palloc() and is returned to the caller.
 */
3205 {
3206  Interval *result = palloc(sizeof(Interval));
3207 
3208  result->month = 0;
3209  result->day = 0;
3210  result->time = offset;
3211 
3212  return result;
3213 }
int32 day
Definition: timestamp.h:47
TimeOffset time
Definition: timestamp.h:45
int32 month
Definition: timestamp.h:48
void * palloc(Size size)
Definition: mcxt.c:949

◆ parseCreateReplSlotOptions()

static void parseCreateReplSlotOptions ( CreateReplicationSlotCmd cmd,
bool reserve_wal,
CRSSnapshotAction snapshot_action 
)
static

Definition at line 798 of file walsender.c.

References CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, defGetBoolean(), DefElem::defname, elog, ereport, errcode(), errmsg(), ERROR, CreateReplicationSlotCmd::kind, lfirst, CreateReplicationSlotCmd::options, REPLICATION_KIND_LOGICAL, and REPLICATION_KIND_PHYSICAL.

Referenced by CreateReplicationSlot().

/*
 * parseCreateReplSlotOptions -- walk cmd->options and fill in the caller's
 * *reserve_wal and *snapshot_action.
 *
 * Recognized option names are "export_snapshot" and "use_snapshot" (logical
 * slots only, and at most one of the two may be given) and "reserve_wal"
 * (physical slots only, at most once).  A repeated option or one used with
 * the wrong slot kind raises ERRCODE_SYNTAX_ERROR; any other option name
 * raises an "unrecognized option" error.
 *
 * Outputs are only written for options that are present, so the caller must
 * initialize them beforehand.
 *
 * NOTE(review): original line 820 is absent from this listing; going by the
 * reference list it presumably holds the CRS_NOEXPORT_SNAPSHOT arm of the
 * conditional expression -- confirm against the full source.
 */
801 {
802  ListCell *lc;
803  bool snapshot_action_given = false;
804  bool reserve_wal_given = false;
805 
806  /* Parse options */
807  foreach(lc, cmd->options)
808  {
809  DefElem *defel = (DefElem *) lfirst(lc);
810 
811  if (strcmp(defel->defname, "export_snapshot") == 0)
812  {
813  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
814  ereport(ERROR,
815  (errcode(ERRCODE_SYNTAX_ERROR),
816  errmsg("conflicting or redundant options")));
817 
818  snapshot_action_given = true;
819  *snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT :
821  }
822  else if (strcmp(defel->defname, "use_snapshot") == 0)
823  {
 /* shares snapshot_action_given with export_snapshot: the two are exclusive */
824  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
825  ereport(ERROR,
826  (errcode(ERRCODE_SYNTAX_ERROR),
827  errmsg("conflicting or redundant options")));
828 
829  snapshot_action_given = true;
830  *snapshot_action = CRS_USE_SNAPSHOT;
831  }
832  else if (strcmp(defel->defname, "reserve_wal") == 0)
833  {
834  if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
835  ereport(ERROR,
836  (errcode(ERRCODE_SYNTAX_ERROR),
837  errmsg("conflicting or redundant options")));
838 
839  reserve_wal_given = true;
840  *reserve_wal = true;
841  }
842  else
843  elog(ERROR, "unrecognized option: %s", defel->defname);
844  }
845 }
int errcode(int sqlerrcode)
Definition: elog.c:570
ReplicationKind kind
Definition: replnodes.h:56
bool defGetBoolean(DefElem *def)
Definition: define.c:111
#define ERROR
Definition: elog.h:43
#define ereport(elevel, rest)
Definition: elog.h:141
#define lfirst(lc)
Definition: pg_list.h:190
int errmsg(const char *fmt,...)
Definition: elog.c:784
#define elog(elevel,...)
Definition: elog.h:226
char * defname
Definition: parsenodes.h:730

◆ pg_stat_get_wal_senders()

Datum pg_stat_get_wal_senders ( PG_FUNCTION_ARGS  )

Definition at line 3220 of file walsender.c.

References ReturnSetInfo::allowedModes, WalSnd::apply, WalSnd::applyLag, CStringGetTextDatum, ReturnSetInfo::econtext, ExprContext::ecxt_per_query_memory, elog, ereport, errcode(), errmsg(), ERROR, WalSnd::flush, WalSnd::flushLag, get_call_result_type(), GetUserId(), i, Int32GetDatum, IntervalPGetDatum, is_member_of_role(), IsA, list_member_int(), LSNGetDatum, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_wal_senders, MemoryContextSwitchTo(), MemSet, WalSnd::mutex, offset_to_interval(), PG_STAT_GET_WAL_SENDERS_COLS, WalSnd::pid, WalSnd::replyTime, ReturnSetInfo::returnMode, WalSnd::sentPtr, sentPtr, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SFRM_Materialize, SpinLockAcquire, SpinLockRelease, WalSnd::state, SYNC_REP_PRIORITY, WalSnd::sync_standby_priority, SyncRepConfigData::syncrep_method, SyncRepConfig, SyncRepGetSyncStandbys(), TimestampTzGetDatum, tuplestore_begin_heap(), tuplestore_donestoring, tuplestore_putvalues(), TYPEFUNC_COMPOSITE, values, WalSndGetStateString(), WalSndCtlData::walsnds, work_mem, write, WalSnd::write, WalSnd::writeLag, and XLogRecPtrIsInvalid.

/*
 * pg_stat_get_wal_senders -- set-returning function that emits one row per
 * active walsender entry into a materialized tuplestore.
 *
 * The caller must supply a ReturnSetInfo that allows SFRM_Materialize;
 * otherwise an ERRCODE_FEATURE_NOT_SUPPORTED error is raised.  The
 * tuplestore is created in the per-query memory context and handed back
 * through rsinfo; the function's own Datum result is always 0.
 *
 * Columns, by values[] index: 0 pid, 1 state, 2 sent, 3 write, 4 flush,
 * 5 apply, 6 write lag, 7 flush lag, 8 apply lag, 9 sync priority,
 * 10 sync state, 11 reply time.  Callers that are not members of
 * DEFAULT_ROLE_READ_ALL_STATS get NULL for every column but the pid.
 *
 * NOTE(review): original lines 3266, 3275, 3277 and 3369 are absent from
 * this listing -- apparently the declarations of sentPtr, state and values[]
 * and the first half of the "sync"/"quorum" assignment to values[10].
 * Consult the full source for them.
 */
3221 {
3222 #define PG_STAT_GET_WAL_SENDERS_COLS 12
3223  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3224  TupleDesc tupdesc;
3225  Tuplestorestate *tupstore;
3226  MemoryContext per_query_ctx;
3227  MemoryContext oldcontext;
3228  List *sync_standbys;
3229  int i;
3230 
3231  /* check to see if caller supports us returning a tuplestore */
3232  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
3233  ereport(ERROR,
3234  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3235  errmsg("set-valued function called in context that cannot accept a set")));
3236  if (!(rsinfo->allowedModes & SFRM_Materialize))
3237  ereport(ERROR,
3238  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3239  errmsg("materialize mode required, but it is not " \
3240  "allowed in this context")));
3241 
3242  /* Build a tuple descriptor for our result type */
3243  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
3244  elog(ERROR, "return type must be a row type");
3245 
 /* create the tuplestore while the per-query memory context is current */
3246  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
3247  oldcontext = MemoryContextSwitchTo(per_query_ctx);
3248 
3249  tupstore = tuplestore_begin_heap(true, false, work_mem);
3250  rsinfo->returnMode = SFRM_Materialize;
3251  rsinfo->setResult = tupstore;
3252  rsinfo->setDesc = tupdesc;
3253 
3254  MemoryContextSwitchTo(oldcontext);
3255 
3256  /*
3257  * Get the currently active synchronous standbys.
3258  */
3259  LWLockAcquire(SyncRepLock, LW_SHARED);
3260  sync_standbys = SyncRepGetSyncStandbys(NULL);
3261  LWLockRelease(SyncRepLock);
3262 
3263  for (i = 0; i < max_wal_senders; i++)
3264  {
3265  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3267  XLogRecPtr write;
3268  XLogRecPtr flush;
3269  XLogRecPtr apply;
3270  TimeOffset writeLag;
3271  TimeOffset flushLag;
3272  TimeOffset applyLag;
3273  int priority;
3274  int pid;
3275  TimestampTz replyTime;
3278  bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
3279 
 /*
  * Copy the entry's fields into locals under its spinlock; everything
  * after SpinLockRelease() works on the copies.  Entries with pid == 0
  * are skipped.
  */
3280  SpinLockAcquire(&walsnd->mutex);
3281  if (walsnd->pid == 0)
3282  {
3283  SpinLockRelease(&walsnd->mutex);
3284  continue;
3285  }
3286  pid = walsnd->pid;
3287  sentPtr = walsnd->sentPtr;
3288  state = walsnd->state;
3289  write = walsnd->write;
3290  flush = walsnd->flush;
3291  apply = walsnd->apply;
3292  writeLag = walsnd->writeLag;
3293  flushLag = walsnd->flushLag;
3294  applyLag = walsnd->applyLag;
3295  priority = walsnd->sync_standby_priority;
3296  replyTime = walsnd->replyTime;
3297  SpinLockRelease(&walsnd->mutex);
3298 
3299  memset(nulls, 0, sizeof(nulls));
3300  values[0] = Int32GetDatum(pid);
3301 
3302  if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS))
3303  {
3304  /*
3305  * Only superusers and members of pg_read_all_stats can see
3306  * details. Other users only get the pid value to know it's a
3307  * walsender, but no details.
3308  */
3309  MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3310  }
3311  else
3312  {
3313  values[1] = CStringGetTextDatum(WalSndGetStateString(state));
3314 
 /* invalid positions are flagged NULL; values[] is still assigned */
3315  if (XLogRecPtrIsInvalid(sentPtr))
3316  nulls[2] = true;
3317  values[2] = LSNGetDatum(sentPtr);
3318 
3319  if (XLogRecPtrIsInvalid(write))
3320  nulls[3] = true;
3321  values[3] = LSNGetDatum(write);
3322 
3323  if (XLogRecPtrIsInvalid(flush))
3324  nulls[4] = true;
3325  values[4] = LSNGetDatum(flush);
3326 
3327  if (XLogRecPtrIsInvalid(apply))
3328  nulls[5] = true;
3329  values[5] = LSNGetDatum(apply);
3330 
3331  /*
3332  * Treat a standby such as a pg_basebackup background process
3333  * which always returns an invalid flush location, as an
3334  * asynchronous standby.
3335  */
3336  priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
3337 
 /* negative lag values are reported as NULL */
3338  if (writeLag < 0)
3339  nulls[6] = true;
3340  else
3341  values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
3342 
3343  if (flushLag < 0)
3344  nulls[7] = true;
3345  else
3346  values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
3347 
3348  if (applyLag < 0)
3349  nulls[8] = true;
3350  else
3351  values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
3352 
3353  values[9] = Int32GetDatum(priority);
3354 
3355  /*
3356  * More easily understood version of standby state. This is purely
3357  * informational.
3358  *
3359  * In quorum-based sync replication, the role of each standby
3360  * listed in synchronous_standby_names can be changing very
3361  * frequently. Any standbys considered as "sync" at one moment can
3362  * be switched to "potential" ones at the next moment. So, it's
3363  * basically useless to report "sync" or "potential" as their sync
3364  * states. We report just "quorum" for them.
3365  */
3366  if (priority == 0)
3367  values[10] = CStringGetTextDatum("async");
3368  else if (list_member_int(sync_standbys, i))
3370  CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3371  else
3372  values[10] = CStringGetTextDatum("potential");
3373 
 /* a reply time of 0 is reported as NULL */
3374  if (replyTime == 0)
3375  nulls[11] = true;
3376  else
3377  values[11] = TimestampTzGetDatum(replyTime);
3378  }
3379 
3380  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
3381  }
3382 
3383  /* clean up and return the tuplestore */
3384  tuplestore_donestoring(tupstore);
3385 
3386  return (Datum) 0;
3387 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
XLogRecPtr write
#define IsA(nodeptr, _type_)
Definition: nodes.h:576
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:196
#define SYNC_REP_PRIORITY
Definition: syncrep.h:36
Oid GetUserId(void)
Definition: miscinit.c:380
#define write(a, b, c)
Definition: win32.h:14
int64 TimestampTz
Definition: timestamp.h:39
uint8 syncrep_method
Definition: syncrep.h:51
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
TimeOffset flushLag
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
WalSndCtlData * WalSndCtl
Definition: walsender.c:108
int errcode(int sqlerrcode)
Definition: elog.c:570
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
#define MemSet(start, val, len)
Definition: c.h:955
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
List * SyncRepGetSyncStandbys(bool *am_sync)
Definition: syncrep.c:681
TimeOffset writeLag
SyncRepConfigData * SyncRepConfig
Definition: syncrep.c:97
slock_t mutex
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
#define ERROR
Definition: elog.h:43
#define IntervalPGetDatum(X)
Definition: timestamp.h:33
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
bool list_member_int(const List *list, int datum)
Definition: list.c:655
WalSndState state
#define ereport(elevel, rest)
Definition: elog.h:141
int max_wal_senders
Definition: walsender.c:120
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
int64 TimeOffset
Definition: timestamp.h:40
uintptr_t Datum
Definition: postgres.h:367
static const char * WalSndGetStateString(WalSndState state)
Definition: walsender.c:3185
XLogRecPtr sentPtr
int work_mem
Definition: globals.c:121
static XLogRecPtr sentPtr
Definition: walsender.c:149
int allowedModes
Definition: execnodes.h:303
bool is_member_of_role(Oid member, Oid role)
Definition: acl.c:4932
TimeOffset applyLag
SetFunctionReturnMode returnMode
Definition: execnodes.h:305
uint64 XLogRecPtr
Definition: xlogdefs.h:21
int sync_standby_priority
Definition: regguts.h:298
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:231
WalSndState
Tuplestorestate * setResult
Definition: execnodes.h:308
static Datum values[MAXATTR]
Definition: bootstrap.c:167
#define PG_STAT_GET_WAL_SENDERS_COLS
ExprContext * econtext
Definition: execnodes.h:301
#define Int32GetDatum(X)
Definition: postgres.h:479
TupleDesc setDesc
Definition: execnodes.h:309
int errmsg(const char *fmt,...)
Definition: elog.c:784
#define elog(elevel,...)
Definition: elog.h:226
int i
#define CStringGetTextDatum(s)
Definition: builtins.h:83
TimestampTz replyTime
static Interval * offset_to_interval(TimeOffset offset)
Definition: walsender.c:3204
XLogRecPtr apply
Definition: pg_list.h:50

◆ PhysicalConfirmReceivedLocation()

static void PhysicalConfirmReceivedLocation ( XLogRecPtr  lsn)
static

Definition at line 1735 of file walsender.c.

References Assert, ReplicationSlot::data, InvalidXLogRecPtr, ReplicationSlot::mutex, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, SpinLockAcquire, and SpinLockRelease.

Referenced by ProcessStandbyReplyMessage().

/*
 * PhysicalConfirmReceivedLocation -- store 'lsn' as the slot's restart_lsn.
 *
 * 'lsn' must not be InvalidXLogRecPtr (asserted).  The comparison and the
 * assignment happen under the slot's spinlock; follow-up work is done only
 * when the value actually changed, and only after the lock is released.
 *
 * NOTE(review): original lines 1738, 1751 and 1752 are absent from this
 * listing.  Going by the reference list they presumably declare 'slot' as
 * MyReplicationSlot and call ReplicationSlotMarkDirty() and
 * ReplicationSlotsComputeRequiredLSN() -- confirm against the full source.
 */
1736 {
1737  bool changed = false;
1739 
1740  Assert(lsn != InvalidXLogRecPtr);
1741  SpinLockAcquire(&slot->mutex);
1742  if (slot->data.restart_lsn != lsn)
1743  {
1744  changed = true;
1745  slot->data.restart_lsn = lsn;
1746  }
1747  SpinLockRelease(&slot->mutex);
1748 
1749  if (changed)
1750  {
1753  }
1754 
1755  /*
1756  * One could argue that the slot should be saved to disk now, but that'd
1757  * be energy wasted - the worst lost information can do here is give us
1758  * wrong information in a statistics view - we'll just potentially be more
1759  * conservative in removing files.
1760  */
1761 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
ReplicationSlotPersistentData data
Definition: slot.h:132
#define SpinLockAcquire(lock)
Definition: spin.h:62
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:748
#define SpinLockRelease(lock)
Definition: spin.h:64
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:732
XLogRecPtr restart_lsn
Definition: slot.h:72
slock_t mutex
Definition: slot.h:105
void ReplicationSlotMarkDirty(void)
Definition: slot.c:663

◆ PhysicalReplicationSlotNewXmin()

static void PhysicalReplicationSlotNewXmin ( TransactionId  feedbackXmin,
TransactionId  feedbackCatalogXmin 
)
static

Definition at line 1872 of file walsender.c.

References ReplicationSlotPersistentData::catalog_xmin, ReplicationSlot::data, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, InvalidTransactionId, ReplicationSlot::mutex, MyPgXact, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredXmin(), SpinLockAcquire, SpinLockRelease, TransactionIdIsNormal, TransactionIdPrecedes(), ReplicationSlotPersistentData::xmin, and PGXACT::xmin.

Referenced by ProcessStandbyHSFeedbackMessage().

/*
 * PhysicalReplicationSlotNewXmin -- apply hot-standby feedback xmin values
 * to the slot.
 *
 * Under the slot's spinlock, feedbackXmin replaces both data.xmin and
 * effective_xmin when the stored xmin is not a normal XID, when
 * feedbackXmin is not a normal XID, or when the stored xmin precedes
 * feedbackXmin.  The same rule is applied independently to
 * feedbackCatalogXmin and the catalog_xmin fields.  Follow-up work is done
 * after the lock is released, and only if either pair changed.
 *
 * NOTE(review): original lines 1875, 1878, 1905 and 1906 are absent from
 * this listing.  Going by the reference list they presumably declare 'slot'
 * as MyReplicationSlot, reset MyPgXact->xmin, and call
 * ReplicationSlotMarkDirty() and ReplicationSlotsComputeRequiredXmin() --
 * confirm against the full source.
 */
1873 {
1874  bool changed = false;
1876 
1877  SpinLockAcquire(&slot->mutex);
1879 
1880  /*
1881  * For physical replication we don't need the interlock provided by xmin
1882  * and effective_xmin since the consequences of a missed increase are
1883  * limited to query cancellations, so set both at once.
1884  */
1885  if (!TransactionIdIsNormal(slot->data.xmin) ||
1886  !TransactionIdIsNormal(feedbackXmin) ||
1887  TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
1888  {
1889  changed = true;
1890  slot->data.xmin = feedbackXmin;
1891  slot->effective_xmin = feedbackXmin;
1892  }
1893  if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
1894  !TransactionIdIsNormal(feedbackCatalogXmin) ||
1895  TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
1896  {
1897  changed = true;
1898  slot->data.catalog_xmin = feedbackCatalogXmin;
1899  slot->effective_catalog_xmin = feedbackCatalogXmin;
1900  }
1901  SpinLockRelease(&slot->mutex);
1902 
1903  if (changed)
1904  {
1907  }
1908 }
TransactionId xmin
Definition: proc.h:228
ReplicationSlotPersistentData data
Definition: slot.h:132
PGXACT * MyPgXact
Definition: proc.c:69
#define SpinLockAcquire(lock)
Definition: spin.h:62
TransactionId effective_xmin
Definition: slot.h:128
TransactionId catalog_xmin
Definition: slot.h:69
#define InvalidTransactionId
Definition: transam.h:31
TransactionId xmin
Definition: slot.h:61
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
#define SpinLockRelease(lock)
Definition: spin.h:64
TransactionId effective_catalog_xmin
Definition: slot.h:129
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
slock_t mutex
Definition: slot.h:105
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:702
void ReplicationSlotMarkDirty(void)
Definition: slot.c:663

◆ ProcessRepliesIfAny()

static void ProcessRepliesIfAny ( void  )
static

Definition at line 1600 of file walsender.c.

References COMMERROR, ereport, errcode(), errmsg(), FATAL, GetCurrentTimestamp(), last_processing, last_reply_timestamp, pq_endmsgread(), pq_getbyte_if_available(), pq_getmessage(), pq_putmessage_noblock, pq_startmsgread(), proc_exit(), ProcessStandbyMessage(), resetStringInfo(), streamingDoneReceiving, streamingDoneSending, and waiting_for_ping_response.

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

/*
 * ProcessRepliesIfAny -- drain and handle every message the standby has
 * already sent, without blocking.
 *
 * Loops until pq_getbyte_if_available() reports that no more data can be
 * read without blocking.  Message types handled:
 *   'd'  standby reply wrapped in CopyData
 *   'c'  CopyDone: answer with CopyDone unless already sent, and remember
 *        that the standby has finished streaming
 *   'X'  standby is closing the connection: proc_exit(0)
 * Any other type, or anything other than 'X' after CopyDone was received,
 * is a FATAL protocol violation.  EOF on the connection leads to
 * proc_exit(0).
 *
 * If at least one 'd' or 'c' message was received,
 * waiting_for_ping_response is cleared.
 *
 * NOTE(review): original lines 1606, 1615, 1628, 1631, 1657 and 1695 are
 * absent from this listing.  Going by the reference list they presumably
 * set last_processing, open the two ereport(COMMERROR, ...) calls, reset
 * reply_message, call ProcessStandbyMessage(), and update
 * last_reply_timestamp -- confirm against the full source.
 */
1601 {
1602  unsigned char firstchar;
1603  int r;
1604  bool received = false;
1605 
1607 
1608  for (;;)
1609  {
1610  pq_startmsgread();
1611  r = pq_getbyte_if_available(&firstchar);
1612  if (r < 0)
1613  {
1614  /* unexpected error or EOF */
1616  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1617  errmsg("unexpected EOF on standby connection")));
1618  proc_exit(0);
1619  }
1620  if (r == 0)
1621  {
1622  /* no data available without blocking */
1623  pq_endmsgread();
1624  break;
1625  }
1626 
1627  /* Read the message contents */
1629  if (pq_getmessage(&reply_message, 0))
1630  {
1632  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1633  errmsg("unexpected EOF on standby connection")));
1634  proc_exit(0);
1635  }
1636 
1637  /*
1638  * If we already received a CopyDone from the frontend, the frontend
1639  * should not send us anything until we've closed our end of the COPY.
1640  * XXX: In theory, the frontend could already send the next command
1641  * before receiving the CopyDone, but libpq doesn't currently allow
1642  * that.
1643  */
1644  if (streamingDoneReceiving && firstchar != 'X')
1645  ereport(FATAL,
1646  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1647  errmsg("unexpected standby message type \"%c\", after receiving CopyDone",
1648  firstchar)));
1649 
1650  /* Handle the very limited subset of commands expected in this phase */
1651  switch (firstchar)
1652  {
1653  /*
1654  * 'd' means a standby reply wrapped in a CopyData packet.
1655  */
1656  case 'd':
1658  received = true;
1659  break;
1660 
1661  /*
1662  * CopyDone means the standby requested to finish streaming.
1663  * Reply with CopyDone, if we had not sent that already.
1664  */
1665  case 'c':
1666  if (!streamingDoneSending)
1667  {
1668  pq_putmessage_noblock('c', NULL, 0);
1669  streamingDoneSending = true;
1670  }
1671 
1672  streamingDoneReceiving = true;
1673  received = true;
1674  break;
1675 
1676  /*
1677  * 'X' means that the standby is closing down the socket.
1678  */
1679  case 'X':
1680  proc_exit(0);
1681 
1682  default:
1683  ereport(FATAL,
1684  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1685  errmsg("invalid standby message type \"%c\"",
1686  firstchar)));
1687  }
1688  }
1689 
1690  /*
1691  * Save the last reply timestamp if we've received at least one reply.
1692  */
1693  if (received)
1694  {
1696  waiting_for_ping_response = false;
1697  }
1698 }
static void ProcessStandbyMessage(void)
Definition: walsender.c:1704
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:570
static TimestampTz last_processing
Definition: walsender.c:157
void pq_startmsgread(void)
Definition: pqcomm.c:1211
#define FATAL
Definition: elog.h:52
int pq_getbyte_if_available(unsigned char *c)
Definition: pqcomm.c:1041
static bool streamingDoneSending
Definition: walsender.c:174
#define COMMERROR
Definition: elog.h:30
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
#define ereport(elevel, rest)
Definition: elog.h:141
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1273
static StringInfoData reply_message
Definition: walsender.c:153
void pq_endmsgread(void)
Definition: pqcomm.c:1235
static bool streamingDoneReceiving
Definition: walsender.c:175
int errmsg(const char *fmt,...)
Definition: elog.c:784
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
static bool waiting_for_ping_response
Definition: walsender.c:166
static TimestampTz last_reply_timestamp
Definition: walsender.c:163

◆ ProcessStandbyHSFeedbackMessage()

static void ProcessStandbyHSFeedbackMessage ( void  )
static

Definition at line 1952 of file walsender.c.

References DEBUG2, elog, InvalidTransactionId, log_min_messages, WalSnd::mutex, MyPgXact, MyReplicationSlot, MyWalSnd, pfree(), PhysicalReplicationSlotNewXmin(), pq_getmsgint(), pq_getmsgint64(), pstrdup(), WalSnd::replyTime, SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), TransactionIdInRecentPast(), TransactionIdIsNormal, TransactionIdPrecedes(), and PGXACT::xmin.

Referenced by ProcessStandbyMessage().

1953 {
1954  TransactionId feedbackXmin;
1955  uint32 feedbackEpoch;
1956  TransactionId feedbackCatalogXmin;
1957  uint32 feedbackCatalogEpoch;
1958  TimestampTz replyTime;
1959 
1960  /*
1961  * Decipher the reply message. The caller already consumed the msgtype
1962  * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
1963  * of this message.
1964  */
1965  replyTime = pq_getmsgint64(&reply_message);
1966  feedbackXmin = pq_getmsgint(&reply_message, 4);
1967  feedbackEpoch = pq_getmsgint(&reply_message, 4);
1968  feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
1969  feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
1970 
1971  if (log_min_messages <= DEBUG2)
1972  {
1973  char *replyTimeStr;
1974 
1975  /* Copy because timestamptz_to_str returns a static buffer */
1976  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
1977 
1978  elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
1979  feedbackXmin,
1980  feedbackEpoch,
1981  feedbackCatalogXmin,
1982  feedbackCatalogEpoch,
1983  replyTimeStr);
1984 
1985  pfree(replyTimeStr);
1986  }
1987 
1988  /*
1989  * Update shared state for this WalSender process based on reply data from
1990  * standby.
1991  */
1992  {
1993  WalSnd *walsnd = MyWalSnd;
1994 
1995  SpinLockAcquire(&walsnd->mutex);
1996  walsnd->replyTime = replyTime;
1997  SpinLockRelease(&walsnd->mutex);
1998  }
1999 
2000  /*
2001  * Unset WalSender's xmins if the feedback message values are invalid.
2002  * This happens when the downstream turned hot_standby_feedback off.
2003  */
2004  if (!TransactionIdIsNormal(feedbackXmin)
2005  && !TransactionIdIsNormal(feedbackCatalogXmin))
2006  {
2007  MyPgXact->xmin = InvalidTransactionId;
2008  if (MyReplicationSlot != NULL)
2009  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2010  return;
2011  }
2012 
2013  /*
2014  * Check that the provided xmin/epoch are sane, that is, not in the future
2015  * and not so far back as to be already wrapped around. Ignore if not.
2016  */
2017  if (TransactionIdIsNormal(feedbackXmin) &&
2018  !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
2019  return;
2020 
2021  if (TransactionIdIsNormal(feedbackCatalogXmin) &&
2022  !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
2023  return;
2024 
2025  /*
2026  * Set the WalSender's xmin equal to the standby's requested xmin, so that
2027  * the xmin will be taken into account by GetOldestXmin. This will hold
2028  * back the removal of dead rows and thereby prevent the generation of
2029  * cleanup conflicts on the standby server.
2030  *
2031  * There is a small window for a race condition here: although we just
2032  * checked that feedbackXmin precedes nextXid, the nextXid could have
2033  * gotten advanced between our fetching it and applying the xmin below,
2034  * perhaps far enough to make feedbackXmin wrap around. In that case the
2035  * xmin we set here would be "in the future" and have no effect. No point
2036  * in worrying about this since it's too late to save the desired data
2037  * anyway. Assuming that the standby sends us an increasing sequence of
2038  * xmins, this could only happen during the first reply cycle, else our
2039  * own xmin would prevent nextXid from advancing so far.
2040  *
2041  * We don't bother taking the ProcArrayLock here. Setting the xmin field
2042  * is assumed atomic, and there's no real need to prevent a concurrent
2043  * GetOldestXmin. (If we're moving our xmin forward, this is obviously
2044  * safe, and if we're moving it backwards, well, the data is at risk
2045  * already since a VACUUM could have just finished calling GetOldestXmin.)
2046  *
2047  * If we're using a replication slot we reserve the xmin via that,
2048  * otherwise via the walsender's PGXACT entry. We can only track the
2049  * catalog xmin separately when using a slot, so we store the least of the
2050  * two provided when not using a slot.
2051  *
2052  * XXX: It might make sense to generalize the ephemeral slot concept and
2053  * always use the slot mechanism to handle the feedback xmin.
2054  */
2055  if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
2056  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2057  else
2058  {
2059  if (TransactionIdIsNormal(feedbackCatalogXmin)
2060  && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
2061  MyPgXact->xmin = feedbackCatalogXmin;
2062  else
2063  MyPgXact->xmin = feedbackXmin;
2064  }
2065 }
uint32 TransactionId
Definition: c.h:507
TransactionId xmin
Definition: proc.h:228
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1186
slock_t mutex
PGXACT * MyPgXact
Definition: proc.c:69
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition: walsender.c:1921
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pfree(void *pointer)
Definition: mcxt.c:1056
#define DEBUG2
Definition: elog.h:24
#define InvalidTransactionId
Definition: transam.h:31
unsigned int uint32
Definition: c.h:358
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static StringInfoData reply_message
Definition: walsender.c:153
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:111
int log_min_messages
Definition: guc.c:510
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define elog(elevel,...)
Definition: elog.h:226
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition: walsender.c:1872
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
TimestampTz replyTime
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1743

◆ ProcessStandbyMessage()

static void ProcessStandbyMessage ( void  )
static

Definition at line 1704 of file walsender.c.

References COMMERROR, ereport, errcode(), errmsg(), pq_getmsgbyte(), proc_exit(), ProcessStandbyHSFeedbackMessage(), and ProcessStandbyReplyMessage().

Referenced by ProcessRepliesIfAny().

1705 {
1706  char msgtype;
1707 
1708  /*
1709  * Check message type from the first byte.
1710  */
1711  msgtype = pq_getmsgbyte(&reply_message);
1712 
1713  switch (msgtype)
1714  {
1715  case 'r':
1716  ProcessStandbyReplyMessage();
1717  break;
1718 
1719  case 'h':
1720  ProcessStandbyHSFeedbackMessage();
1721  break;
1722 
1723  default:
1724  ereport(COMMERROR,
1725  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1726  errmsg("unexpected message type \"%c\"", msgtype)));
1727  proc_exit(0);
1728  }
1729 }
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:570
#define COMMERROR
Definition: elog.h:30
#define ereport(elevel, rest)
Definition: elog.h:141
static StringInfoData reply_message
Definition: walsender.c:153
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
static void ProcessStandbyReplyMessage(void)
Definition: walsender.c:1767
int errmsg(const char *fmt,...)
Definition: elog.c:784
static void ProcessStandbyHSFeedbackMessage(void)
Definition: walsender.c:1952

◆ ProcessStandbyReplyMessage()

static void ProcessStandbyReplyMessage ( void  )
static

Definition at line 1767 of file walsender.c.

References am_cascading_walsender, WalSnd::apply, WalSnd::applyLag, DEBUG2, elog, WalSnd::flush, WalSnd::flushLag, GetCurrentTimestamp(), InvalidXLogRecPtr, LagTrackerRead(), log_min_messages, LogicalConfirmReceivedLocation(), WalSnd::mutex, MyReplicationSlot, MyWalSnd, now(), pfree(), PhysicalConfirmReceivedLocation(), pq_getmsgbyte(), pq_getmsgint64(), pstrdup(), WalSnd::replyTime, sentPtr, SlotIsLogical, SpinLockAcquire, SpinLockRelease, SYNC_REP_WAIT_APPLY, SYNC_REP_WAIT_FLUSH, SYNC_REP_WAIT_WRITE, SyncRepReleaseWaiters(), timestamptz_to_str(), WalSndKeepalive(), WalSnd::write, and WalSnd::writeLag.

Referenced by ProcessStandbyMessage().

1768 {
1769  XLogRecPtr writePtr,
1770  flushPtr,
1771  applyPtr;
1772  bool replyRequested;
1773  TimeOffset writeLag,
1774  flushLag,
1775  applyLag;
1776  bool clearLagTimes;
1777  TimestampTz now;
1778  TimestampTz replyTime;
1779 
1780  static bool fullyAppliedLastTime = false;
1781 
1782  /* the caller already consumed the msgtype byte */
1783  writePtr = pq_getmsgint64(&reply_message);
1784  flushPtr = pq_getmsgint64(&reply_message);
1785  applyPtr = pq_getmsgint64(&reply_message);
1786  replyTime = pq_getmsgint64(&reply_message);
1787  replyRequested = pq_getmsgbyte(&reply_message);
1788 
1789  if (log_min_messages <= DEBUG2)
1790  {
1791  char *replyTimeStr;
1792 
1793  /* Copy because timestamptz_to_str returns a static buffer */
1794  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
1795 
1796  elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
1797  (uint32) (writePtr >> 32), (uint32) writePtr,
1798  (uint32) (flushPtr >> 32), (uint32) flushPtr,
1799  (uint32) (applyPtr >> 32), (uint32) applyPtr,
1800  replyRequested ? " (reply requested)" : "",
1801  replyTimeStr);
1802 
1803  pfree(replyTimeStr);
1804  }
1805 
1806  /* See if we can compute the round-trip lag for these positions. */
1807  now = GetCurrentTimestamp();
1808  writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
1809  flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
1810  applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
1811 
1812  /*
1813  * If the standby reports that it has fully replayed the WAL in two
1814  * consecutive reply messages, then the second such message must result
1815  * from wal_receiver_status_interval expiring on the standby. This is a
1816  * convenient time to forget the lag times measured when it last
1817  * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
1818  * until more WAL traffic arrives.
1819  */
1820  clearLagTimes = false;
1821  if (applyPtr == sentPtr)
1822  {
1823  if (fullyAppliedLastTime)
1824  clearLagTimes = true;
1825  fullyAppliedLastTime = true;
1826  }
1827  else
1828  fullyAppliedLastTime = false;
1829 
1830  /* Send a reply if the standby requested one. */
1831  if (replyRequested)
1832  WalSndKeepalive(false);
1833 
1834  /*
1835  * Update shared state for this WalSender process based on reply data from
1836  * standby.
1837  */
1838  {
1839  WalSnd *walsnd = MyWalSnd;
1840 
1841  SpinLockAcquire(&walsnd->mutex);
1842  walsnd->write = writePtr;
1843  walsnd->flush = flushPtr;
1844  walsnd->apply = applyPtr;
1845  if (writeLag != -1 || clearLagTimes)
1846  walsnd->writeLag = writeLag;
1847  if (flushLag != -1 || clearLagTimes)
1848  walsnd->flushLag = flushLag;
1849  if (applyLag != -1 || clearLagTimes)
1850  walsnd->applyLag = applyLag;
1851  walsnd->replyTime = replyTime;
1852  SpinLockRelease(&walsnd->mutex);
1853  }
1854 
1855  if (!am_cascading_walsender)
1856  SyncRepReleaseWaiters();
1857 
1858  /*
1859  * Advance our local xmin horizon when the client confirmed a flush.
1860  */
1861  if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
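1862  {
1863  if (SlotIsLogical(MyReplicationSlot))
1864  LogicalConfirmReceivedLocation(flushPtr);
1865  else
1866  PhysicalConfirmReceivedLocation(flushPtr);
1867  }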
1868 }
XLogRecPtr write
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1186
TimeOffset flushLag
TimeOffset writeLag
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
void pfree(void *pointer)
Definition: mcxt.c:1056
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3395
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: walsender.c:1735
#define SYNC_REP_WAIT_APPLY
Definition: syncrep.h:26
#define DEBUG2
Definition: elog.h:24
unsigned int uint32
Definition: c.h:358
#define SlotIsLogical(slot)
Definition: slot.h:154
#define SYNC_REP_WAIT_FLUSH
Definition: syncrep.h:25
static StringInfoData reply_message
Definition: walsender.c:153
#define SpinLockRelease(lock)
Definition: spin.h:64
int64 TimeOffset
Definition: timestamp.h:40
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition: walsender.c:3518
WalSnd * MyWalSnd
Definition: walsender.c:111
static XLogRecPtr sentPtr
Definition: walsender.c:149
int log_min_messages
Definition: guc.c:510
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
TimeOffset applyLag
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:1005
#define elog(elevel,...)
Definition: elog.h:226
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
TimestampTz replyTime
XLogRecPtr apply
void SyncRepReleaseWaiters(void)
Definition: syncrep.c:411
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1547
bool am_cascading_walsender
Definition: walsender.c:115
#define SYNC_REP_WAIT_WRITE
Definition: syncrep.h:24
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1743

◆ SendTimeLineHistory()

static void SendTimeLineHistory ( TimeLineHistoryCmd cmd)
static

Definition at line 433 of file walsender.c.

References buf, CloseTransientFile(), PGAlignedBlock::data, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg(), ERROR, fd(), MAXFNAMELEN, MAXPGPATH, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), pq_beginmessage(), pq_endmessage(), pq_sendbytes(), pq_sendint16(), pq_sendint32(), pq_sendstring(), read, TimeLineHistoryCmd::timeline, TLHistoryFileName, TLHistoryFilePath, and WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ.

Referenced by exec_replication_command().

434 {
435  StringInfoData buf;
436  char histfname[MAXFNAMELEN];
437  char path[MAXPGPATH];
438  int fd;
439  off_t histfilelen;
440  off_t bytesleft;
441  Size len;
442 
443  /*
444  * Reply with a result set with one row, and two columns. The first col is
445  * the name of the history file, 2nd is the contents.
446  */
447 
448  TLHistoryFileName(histfname, cmd->timeline);
449  TLHistoryFilePath(path, cmd->timeline);
450 
451  /* Send a RowDescription message */
452  pq_beginmessage(&buf, 'T');
453  pq_sendint16(&buf, 2); /* 2 fields */
454 
455  /* first field */
456  pq_sendstring(&buf, "filename"); /* col name */
457  pq_sendint32(&buf, 0); /* table oid */
458  pq_sendint16(&buf, 0); /* attnum */
459  pq_sendint32(&buf, TEXTOID); /* type oid */
460  pq_sendint16(&buf, -1); /* typlen */
461  pq_sendint32(&buf, 0); /* typmod */
462  pq_sendint16(&buf, 0); /* format code */
463 
464  /* second field */
465  pq_sendstring(&buf, "content"); /* col name */
466  pq_sendint32(&buf, 0); /* table oid */
467  pq_sendint16(&buf, 0); /* attnum */
468  pq_sendint32(&buf, BYTEAOID); /* type oid */
469  pq_sendint16(&buf, -1); /* typlen */
470  pq_sendint32(&buf, 0); /* typmod */
471  pq_sendint16(&buf, 0); /* format code */
472  pq_endmessage(&buf);
473 
474  /* Send a DataRow message */
475  pq_beginmessage(&buf, 'D');
476  pq_sendint16(&buf, 2); /* # of columns */
477  len = strlen(histfname);
478  pq_sendint32(&buf, len); /* col1 len */
479  pq_sendbytes(&buf, histfname, len);
480 
481  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
482  if (fd < 0)
483  ereport(ERROR,
484  (errcode_for_file_access(),
485  errmsg("could not open file \"%s\": %m", path)));
486 
487  /* Determine file length and send it to client */
488  histfilelen = lseek(fd, 0, SEEK_END);
489  if (histfilelen < 0)
490  ereport(ERROR,
491  (errcode_for_file_access(),
492  errmsg("could not seek to end of file \"%s\": %m", path)));
493  if (lseek(fd, 0, SEEK_SET) != 0)
494  ereport(ERROR,
495  (errcode_for_file_access(),
496  errmsg("could not seek to beginning of file \"%s\": %m", path)));
497 
498  pq_sendint32(&buf, histfilelen); /* col2 len */
499 
500  bytesleft = histfilelen;
501  while (bytesleft > 0)
502  {
503  PGAlignedBlock rbuf;
504  int nread;
505 
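506  pgstat_report_wait_start(WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ);
507  nread = read(fd, rbuf.data, sizeof(rbuf));
508  pgstat_report_wait_end();
509  if (nread < 0)
510  ereport(ERROR,
511  (errcode_for_file_access(),
512  errmsg("could not read file \"%s\": %m",
513  path)));
514  else if (nread == 0)
515  ereport(ERROR,
516  (errcode(ERRCODE_DATA_CORRUPTED),
517  errmsg("could not read file \"%s\": read %d of %zu",
518  path, nread, (Size) bytesleft)));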
519 
520  pq_sendbytes(&buf, rbuf.data, nread);
521  bytesleft -= nread;
522  }
523 
524  if (CloseTransientFile(fd) != 0)
525  ereport(ERROR,
526  (errcode_for_file_access(),
527  errmsg("could not close file \"%s\": %m", path)));
528 
529  pq_endmessage(&buf);
530 }
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
int errcode(int sqlerrcode)
Definition: elog.c:570
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1191
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
TimeLineID timeline
Definition: replnodes.h:97
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
char data[BLCKSZ]
Definition: c.h:1060
#define ERROR
Definition: elog.h:43
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2255
#define MAXPGPATH
#define TLHistoryFileName(fname, tli)
static char * buf
Definition: pg_test_fsync.c:68
int errcode_for_file_access(void)
Definition: elog.c:593
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1342
#define ereport(elevel, rest)
Definition: elog.h:141
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:45
int CloseTransientFile(int fd)
Definition: fd.c:2432
#define MAXFNAMELEN
size_t Size
Definition: c.h:466
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1318
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:125
int errmsg(const char *fmt,...)
Definition: elog.c:784
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
#define read(a, b, c)
Definition: win32.h:13
#define TLHistoryFilePath(path, tli)

◆ StartLogicalReplication()

static void StartLogicalReplication ( StartReplicationCmd cmd)
static

Definition at line 1062 of file walsender.c.

References am_cascading_walsender, Assert, buf, CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDecodingContext(), ReplicationSlot::data, DestRemote, EndCommand(), ereport, errmsg(), FreeDecodingContext(), got_STOPPING, LOG, logical_read_xlog_page(), logical_startptr, WalSnd::mutex, MyReplicationSlot, StartReplicationCmd::options, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), proc_exit(), RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), ReplicationSlotPersistentData::restart_lsn, WalSnd::sentPtr, sentPtr, StartReplicationCmd::slotname, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, SyncRepInitConfig(), WalSndLoop(), WalSndPrepareWrite(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WalSndUpdateProgress(), WalSndWriteData(), and XLogSendLogical().

Referenced by exec_replication_command().

1063 {
1064  StringInfoData buf;
1065 
1066  /* make sure that our requirements are still fulfilled */
1067  CheckLogicalDecodingRequirements();
1068 
1069  Assert(!MyReplicationSlot);
1070 
1071  ReplicationSlotAcquire(cmd->slotname, true);
1072 
1073  /*
1074  * Force a disconnect, so that the decoding code doesn't need to care
1075  * about an eventual switch from running in recovery, to running in a
1076  * normal environment. Client code is expected to handle reconnects.
1077  */
1078  if (am_cascading_walsender && !RecoveryInProgress())
1079  {
1080  ereport(LOG,
1081  (errmsg("terminating walsender process after promotion")));
1082  got_STOPPING = true;
1083  }
1084 
1085  /*
1086  * Create our decoding context, making it start at the previously ack'ed
1087  * position.
1088  *
1089  * Do this before sending a CopyBothResponse message, so that any errors
1090  * are reported early.
1091  */
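1092  logical_decoding_ctx =
1093  CreateDecodingContext(cmd->startpoint, cmd->options, false,
1094  logical_read_xlog_page,
1095  WalSndPrepareWrite, WalSndWriteData,
1096  WalSndUpdateProgress);
1097 
1098 
1099  WalSndSetState(WALSNDSTATE_CATCHUP);
1100 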
1101  /* Send a CopyBothResponse message, and start streaming */
1102  pq_beginmessage(&buf, 'W');
1103  pq_sendbyte(&buf, 0);
1104  pq_sendint16(&buf, 0);
1105  pq_endmessage(&buf);
1106  pq_flush();
1107 
1108 
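1109  /* Start reading WAL from the oldest required WAL. */
1110  logical_startptr = MyReplicationSlot->data.restart_lsn;
1111 
1112  /*
1113  * Report the location after which we'll send out further commits as the
1114  * current sentPtr.
1115  */
1116  sentPtr = MyReplicationSlot->data.confirmed_flush;
1117 
1118  /* Also update the sent position status in shared memory */
1119  SpinLockAcquire(&MyWalSnd->mutex);
1120  MyWalSnd->sentPtr = MyReplicationSlot->data.restart_lsn;
1121  SpinLockRelease(&MyWalSnd->mutex);
1122 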
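1123  replication_active = true;
1124 
1125  SyncRepInitConfig();
1126 
1127  /* Main loop of walsender */
1128  WalSndLoop(XLogSendLogical);
1129 
1130  FreeDecodingContext(logical_decoding_ctx);
1131  ReplicationSlotRelease();
1132 
1133  replication_active = false;
1134  if (got_STOPPING)
1135  proc_exit(0);
1136  WalSndSetState(WALSNDSTATE_STARTUP);
1137 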
1138  /* Get out of COPY mode (CommandComplete). */
1139  EndCommand("COPY 0", DestRemote);
1140 }
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
XLogRecPtr startpoint
Definition: replnodes.h:85
#define pq_flush()
Definition: libpq.h:39
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:330
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:182
void proc_exit(int code)
Definition: ipc.c:104
ReplicationSlotPersistentData data
Definition: slot.h:132
#define LOG
Definition: elog.h:26
bool RecoveryInProgress(void)
Definition: xlog.c:7917
slock_t mutex
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
XLogRecPtr confirmed_flush
Definition: slot.h:80
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
#define SpinLockAcquire(lock)
Definition: spin.h:62
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:166
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:192
static char * buf
Definition: pg_test_fsync.c:68
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1178
static XLogRecPtr logical_startptr
Definition: walsender.c:193
void ReplicationSlotRelease(void)
Definition: slot.c:424
void SyncRepInitConfig(void)
Definition: syncrep.c:383
#define ereport(elevel, rest)
Definition: elog.h:141
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2152
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1151
#define SpinLockRelease(lock)
Definition: spin.h:64
static volatile sig_atomic_t replication_active
Definition: walsender.c:190
XLogRecPtr sentPtr
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:374
WalSnd * MyWalSnd
Definition: walsender.c:111
static XLogRecPtr sentPtr
Definition: walsender.c:149
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:732
void WalSndSetState(WalSndState state)
Definition: walsender.c:3166
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:512
static void XLogSendLogical(void)
Definition: walsender.c:2812
XLogRecPtr restart_lsn
Definition: slot.h:72
int errmsg(const char *fmt,...)
Definition: elog.c:784
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition: walsender.c:764
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:79
bool am_cascading_walsender
Definition: walsender.c:115
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
Definition: walsender.c:1267

◆ StartReplication()

static void StartReplication ( StartReplicationCmd cmd)
static

Definition at line 539 of file walsender.c.

References am_cascading_walsender, Assert, begin_tup_output_tupdesc(), buf, CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errcode(), errdetail(), errmsg(), ERROR, GetFlushRecPtr(), GetStandbyFlushRecPtr(), got_STOPPING, Int64GetDatum(), InvalidXLogRecPtr, list_free_deep(), MemSet, WalSnd::mutex, MyReplicationSlot, pq_beginmessage(), pq_endmessage(), pq_flush, pq_puttextmessage(), pq_sendbyte(), pq_sendint16(), proc_exit(), readTimeLineHistory(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, WalSnd::sentPtr, sentPtr, SlotIsLogical, StartReplicationCmd::slotname, snprintf, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), ThisTimeLineID, StartReplicationCmd::timeline, tliSwitchPoint(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), values, WalSndLoop(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, XLogRecPtrIsInvalid, and XLogSendPhysical().

Referenced by exec_replication_command().

540 {
541  StringInfoData buf;
542  XLogRecPtr FlushPtr;
543 
544  if (ThisTimeLineID == 0)
545  ereport(ERROR,
546  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
547  errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
548 
549  /*
550  * We assume here that we're logging enough information in the WAL for
551  * log-shipping, since this is checked in PostmasterMain().
552  *
553  * NOTE: wal_level can only change at shutdown, so in most cases it is
554  * difficult for there to be WAL data that we can still see that was
555  * written at wal_level='minimal'.
556  */
557 
558  if (cmd->slotname)
559  {
560  ReplicationSlotAcquire(cmd->slotname, true);
561  if (SlotIsLogical(MyReplicationSlot))
562  ereport(ERROR,
563  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
564  (errmsg("cannot use a logical replication slot for physical replication"))));
565  }
566 
567  /*
568  * Select the timeline. If it was given explicitly by the client, use
569  * that. Otherwise use the timeline of the last replayed record, which is
570  * kept in ThisTimeLineID.
571  */
572  if (am_cascading_walsender)
573  {
574  /* this also updates ThisTimeLineID */
575  FlushPtr = GetStandbyFlushRecPtr();
576  }
577  else
578  FlushPtr = GetFlushRecPtr();
579 
580  if (cmd->timeline != 0)
581  {
582  XLogRecPtr switchpoint;
583 
584  sendTimeLine = cmd->timeline;
585  if (sendTimeLine == ThisTimeLineID)
586  {
587  sendTimeLineIsHistoric = false;
588  sendTimeLineValidUpto = InvalidXLogRecPtr;
589  }
590  else
591  {
592  List *timeLineHistory;
593 
594  sendTimeLineIsHistoric = true;
595 
596  /*
597  * Check that the timeline the client requested exists, and the
598  * requested start location is on that timeline.
599  */
600  timeLineHistory = readTimeLineHistory(ThisTimeLineID);
601  switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
602  &sendTimeLineNextTLI);
603  list_free_deep(timeLineHistory);
604 
605  /*
606  * Found the requested timeline in the history. Check that
607  * requested startpoint is on that timeline in our history.
608  *
609  * This is quite loose on purpose. We only check that we didn't
610  * fork off the requested timeline before the switchpoint. We
611  * don't check that we switched *to* it before the requested
612  * starting point. This is because the client can legitimately
613  * request to start replication from the beginning of the WAL
614  * segment that contains switchpoint, but on the new timeline, so
615  * that it doesn't end up with a partial segment. If you ask for
616  * too old a starting point, you'll get an error later when we
617  * fail to find the requested WAL segment in pg_wal.
618  *
619  * XXX: we could be more strict here and only allow a startpoint
620  * that's older than the switchpoint, if it's still in the same
621  * WAL segment.
622  */
623  if (!XLogRecPtrIsInvalid(switchpoint) &&
624  switchpoint < cmd->startpoint)
625  {
626  ereport(ERROR,
627  (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
628  (uint32) (cmd->startpoint >> 32),
629  (uint32) (cmd->startpoint),
630  cmd->timeline),
631  errdetail("This server's history forked from timeline %u at %X/%X.",
632  cmd->timeline,
633  (uint32) (switchpoint >> 32),
634  (uint32) (switchpoint))));
635  }
636  sendTimeLineValidUpto = switchpoint;
637  }
638  }
639  else
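640  {
641  sendTimeLine = ThisTimeLineID;
642  sendTimeLineValidUpto = InvalidXLogRecPtr;
643  sendTimeLineIsHistoric = false;
644  }
645 
646  streamingDoneSending = streamingDoneReceiving = false;
647 
648  /* If there is nothing to stream, don't even enter COPY mode */
649  if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
650  {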
651  /*
652  * When we first start replication the standby will be behind the
653  * primary. For some applications, for example synchronous
654  * replication, it is important to have a clear state for this initial
655  * catchup mode, so we can trigger actions when we change streaming
656  * state later. We may stay in this state for a long time, which is
657  * exactly why we want to be able to monitor whether or not we are
658  * still here.
659  */
660  WalSndSetState(WALSNDSTATE_CATCHUP);
661 
662  /* Send a CopyBothResponse message, and start streaming */
663  pq_beginmessage(&buf, 'W');
664  pq_sendbyte(&buf, 0);
665  pq_sendint16(&buf, 0);
666  pq_endmessage(&buf);
667  pq_flush();
668 
669  /*
670  * Don't allow a request to stream from a future point in WAL that
671  * hasn't been flushed to disk in this server yet.
672  */
673  if (FlushPtr < cmd->startpoint)
674  {
675  ereport(ERROR,
676  (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
677  (uint32) (cmd->startpoint >> 32),
678  (uint32) (cmd->startpoint),
679  (uint32) (FlushPtr >> 32),
680  (uint32) (FlushPtr))));
681  }
682 
683  /* Start streaming from the requested point */
684  sentPtr = cmd->startpoint;
685 
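686  /* Initialize shared memory status, too */
687  SpinLockAcquire(&MyWalSnd->mutex);
688  MyWalSnd->sentPtr = sentPtr;
689  SpinLockRelease(&MyWalSnd->mutex);
690 
691  SyncRepInitConfig();
692 
693  /* Main loop of walsender */
694  replication_active = true;
695 
696  WalSndLoop(XLogSendPhysical);
697 
698  replication_active = false;
699  if (got_STOPPING)
700  proc_exit(0);
701  WalSndSetState(WALSNDSTATE_STARTUP);
702 
703  Assert(streamingDoneSending && streamingDoneReceiving);
704  }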
705 
706  if (cmd->slotname)
707  ReplicationSlotRelease();
708 
709  /*
710  * Copy is finished now. Send a single-row result set indicating the next
711  * timeline.
712  */
713  if (sendTimeLineIsHistoric)
714  {
715  char startpos_str[8 + 1 + 8 + 1];
716  DestReceiver *dest;
717  TupOutputState *tstate;
718  TupleDesc tupdesc;
719  Datum values[2];
720  bool nulls[2];
721 
722  snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
723  (uint32) (sendTimeLineValidUpto >> 32),
724  (uint32) sendTimeLineValidUpto);
725 
726  dest = CreateDestReceiver(DestRemoteSimple);
727  MemSet(nulls, false, sizeof(nulls));
728 
729  /*
730  * Need a tuple descriptor representing two columns. int8 may seem
731  * like a surprising data type for this, but in theory int4 would not
732  * be wide enough for this, as TimeLineID is unsigned.
733  */
734  tupdesc = CreateTemplateTupleDesc(2);
735  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
736  INT8OID, -1, 0);
737  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
738  TEXTOID, -1, 0);
739 
740  /* prepare for projection of tuple */
741  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
742 
743  values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
744  values[1] = CStringGetTextDatum(startpos_str);
745 
746  /* send it to dest */
747  do_tup_output(tstate, values, nulls);
748 
749  end_tup_output(tstate);
750  }
751 
752  /* Send CommandComplete message */
753  pq_puttextmessage('C', "START_STREAMING");
754 }
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr startpoint
Definition: replnodes.h:85
#define pq_flush()
Definition: libpq.h:39
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:44
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:330
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:84
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:75
static void XLogSendPhysical(void)
Definition: walsender.c:2549
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2252
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:182
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:570
#define MemSet(start, val, len)
Definition: c.h:955
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8249
void list_free_deep(List *list)
Definition: list.c:1391
slock_t mutex
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
#define SpinLockAcquire(lock)
Definition: spin.h:62
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2310
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:676
#define ERROR
Definition: elog.h:43
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2232
static char * buf
Definition: pg_test_fsync.c:68
static bool streamingDoneSending
Definition: walsender.c:174
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
int errdetail(const char *fmt,...)
Definition: elog.c:860
unsigned int uint32
Definition: c.h:358
void ReplicationSlotRelease(void)
Definition: slot.c:424
#define SlotIsLogical(slot)
Definition: slot.h:154
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1699
void SyncRepInitConfig(void)
Definition: syncrep.c:383
#define ereport(elevel, rest)
Definition: elog.h:141
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2152
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
static volatile sig_atomic_t replication_active
Definition: walsender.c:190
uintptr_t Datum
Definition: postgres.h:367
XLogRecPtr sentPtr
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:563
WalSnd * MyWalSnd
Definition: walsender.c:111
static XLogRecPtr sentPtr
Definition: walsender.c:149
TimeLineID ThisTimeLineID
Definition: xlog.c:187
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
static TimeLineID sendTimeLine
Definition: walsender.c:140
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:732
void WalSndSetState(WalSndState state)
Definition: walsender.c:3166
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:141
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:143
static bool streamingDoneReceiving
Definition: walsender.c:175
static Datum values[MAXATTR]
Definition: bootstrap.c:167
int errmsg(const char *fmt,...)
Definition: elog.c:784
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
TimeLineID timeline
Definition: replnodes.h:84
#define CStringGetTextDatum(s)
Definition: builtins.h:83
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2922
void pq_puttextmessage(char msgtype, const char *str)
Definition: pqformat.c:369
Definition: pg_list.h:50
#define snprintf
Definition: port.h:192
int16 AttrNumber
Definition: attnum.h:21
static bool sendTimeLineIsHistoric
Definition: walsender.c:142
bool am_cascading_walsender
Definition: walsender.c:115

◆ TransactionIdInRecentPast()

static bool TransactionIdInRecentPast ( TransactionId  xid,
uint32  epoch 
)
static

Definition at line 1921 of file walsender.c.

References EpochFromFullTransactionId, ReadNextFullTransactionId(), TransactionIdPrecedesOrEquals(), and XidFromFullTransactionId.

Referenced by ProcessStandbyHSFeedbackMessage().

1922 {
      /*
       * Report whether the (epoch, xid) pair supplied by the caller is
       * consistent with the server's next full transaction id.  Returns true
       * only when the epoch check and the TransactionIdPrecedesOrEquals()
       * check below both pass; returns false otherwise.
       */
1923  FullTransactionId nextFullXid;
1924  TransactionId nextXid;
1925  uint32 nextEpoch;
1926 
      /* Read the next full transaction id and split it into xid and epoch. */
1927  nextFullXid = ReadNextFullTransactionId();
1928  nextXid = XidFromFullTransactionId(nextFullXid);
1929  nextEpoch = EpochFromFullTransactionId(nextFullXid);
1930 
      /*
       * Plain unsigned comparison of the 32-bit xids selects which epoch is
       * acceptable: a xid numerically at or below nextXid must carry
       * nextEpoch itself, while a numerically larger xid is accepted only
       * when epoch + 1 == nextEpoch (presumably a xid assigned before the
       * counter last wrapped -- confirm against callers).
       */
1931  if (xid <= nextXid)
1932  {
1933  if (epoch != nextEpoch)
1934  return false;
1935  }
1936  else
1937  {
1938  if (epoch + 1 != nextEpoch)
1939  return false;
1940  }
1941 
      /* Second check, using the transam.c comparison rather than raw "<=". */
1942  if (!TransactionIdPrecedesOrEquals(xid, nextXid))
1943  return false; /* epoch OK, but it's wrapped around */
1944 
1945  return true;
1946 }
uint32 TransactionId
Definition: c.h:507
#define XidFromFullTransactionId(x)
Definition: transam.h:48
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:319
unsigned int uint32
Definition: c.h:358
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:246
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
static const unsigned __int64 epoch
Definition: gettimeofday.c:34

◆ WalSndCheckTimeOut()

static void WalSndCheckTimeOut ( void  )
static

Definition at line 2125 of file walsender.c.

References COMMERROR, ereport, errmsg(), last_processing, last_reply_timestamp, TimestampTzPlusMilliseconds, wal_sender_timeout, and WalSndShutdown().

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

2126 {
2127  TimestampTz timeout;
2128 
2129  /* don't bail out if we're doing something that doesn't require timeouts */
2130  if (last_reply_timestamp <= 0)
2131  return;
2132 
2135 
2136  if (wal_sender_timeout > 0 && last_processing >= timeout)
2137  {
2138  /*
2139  * Since typically expiration of replication timeout means
2140  * communication problem, we don't send the error message to the
2141  * standby.
2142  */
2144  (errmsg("terminating walsender process due to replication timeout")));
2145 
2146  WalSndShutdown();
2147  }
2148 }
int wal_sender_timeout
Definition: walsender.c:122
int64 TimestampTz
Definition: timestamp.h:39
static TimestampTz last_processing
Definition: walsender.c:157
#define COMMERROR
Definition: elog.h:30
#define ereport(elevel, rest)
Definition: elog.h:141
static void WalSndShutdown(void)
Definition: walsender.c:225
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
int errmsg(const char *fmt,...)
Definition: elog.c:784
static TimestampTz last_reply_timestamp
Definition: walsender.c:163

◆ WalSndComputeSleeptime()

static long WalSndComputeSleeptime ( TimestampTz  now)
static

Definition at line 2075 of file walsender.c.

References last_reply_timestamp, sleeptime, TimestampDifference(), TimestampTzPlusMilliseconds, waiting_for_ping_response, and wal_sender_timeout.

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

2076 {
2077  long sleeptime = 10000; /* 10 s */
2078 
2080  {
2081  TimestampTz wakeup_time;
2082  long sec_to_timeout;
2083  int microsec_to_timeout;
2084 
2085  /*
2086  * At the latest stop sleeping once wal_sender_timeout has been
2087  * reached.
2088  */
2091 
2092  /*
2093  * If no ping has been sent yet, wakeup when it's time to do so.
2094  * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2095  * the timeout passed without a response.
2096  */
2099  wal_sender_timeout / 2);
2100 
2101  /* Compute relative time until wakeup. */
2102  TimestampDifference(now, wakeup_time,
2103  &sec_to_timeout, &microsec_to_timeout);
2104 
2105  sleeptime = sec_to_timeout * 1000 +
2106  microsec_to_timeout / 1000;
2107  }
2108 
2109  return sleeptime;
2110 }
int wal_sender_timeout
Definition: walsender.c:122
int64 TimestampTz
Definition: timestamp.h:39
int sleeptime
Definition: pg_standby.c:42
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: timestamp.c:1657
static bool waiting_for_ping_response
Definition: walsender.c:166
static TimestampTz last_reply_timestamp
Definition: walsender.c:163
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1547

◆ WalSndDone()

static void WalSndDone ( WalSndSendDataCallback  send_data)
static

Definition at line 2882 of file walsender.c.

References DestRemote, EndCommand(), WalSnd::flush, pq_flush, pq_is_send_pending, proc_exit(), sentPtr, waiting_for_ping_response, WalSndCaughtUp, WalSndKeepalive(), WalSnd::write, and XLogRecPtrIsInvalid.

Referenced by WalSndLoop().

2883 {
2884  XLogRecPtr replicatedPtr;
2885 
2886  /* ... let's just be real sure we're caught up ... */
2887  send_data();
2888 
2889  /*
2890  * To figure out whether all WAL has successfully been replicated, check
2891  * flush location if valid, write otherwise. Tools like pg_receivewal will
2892  * usually (unless in synchronous mode) return an invalid flush location.
2893  */
2894  replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
2896 
2897  if (WalSndCaughtUp && sentPtr == replicatedPtr &&
2898  !pq_is_send_pending())
2899  {
2900  /* Inform the standby that XLOG streaming is done */
2901  EndCommand("COPY 0", DestRemote);
2902  pq_flush();
2903 
2904  proc_exit(0);
2905  }
2907  {
2908  WalSndKeepalive(true);
2910  }
2911 }
#define pq_is_send_pending()
Definition: libpq.h:41
XLogRecPtr write
#define pq_flush()
Definition: libpq.h:39
void proc_exit(int code)
Definition: ipc.c:104
XLogRecPtr flush
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3395
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:166
static bool WalSndCaughtUp
Definition: walsender.c:178
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
WalSnd * MyWalSnd
Definition: walsender.c:111
static XLogRecPtr sentPtr
Definition: walsender.c:149
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool waiting_for_ping_response
Definition: walsender.c:166

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 297 of file walsender.c.

References close, ConditionVariableCancelSleep(), DestNone, DestRemote, got_SIGUSR2, got_STOPPING, LWLockReleaseAll(), MyReplicationSlot, pgstat_report_wait_end(), proc_exit(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), WalSndSetState(), WalSndShutdown(), WALSNDSTATE_STARTUP, whereToSendOutput, and WALOpenSegment::ws_file.

Referenced by PostgresMain().

298 {
302 
303  if (sendSeg->ws_file >= 0)
304  {
306  sendSeg->ws_file = -1;
307  }
308 
309  if (MyReplicationSlot != NULL)
311 
313 
314  replication_active = false;
315 
316  if (got_STOPPING || got_SIGUSR2)
317  proc_exit(0);
318 
319  /* Revert back to startup state */
321 }
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:182
void proc_exit(int code)
Definition: ipc.c:104
void ConditionVariableCancelSleep(void)
void ReplicationSlotRelease(void)
Definition: slot.c:424
static WALOpenSegment * sendSeg
Definition: walsender.c:131
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1342
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:181
static volatile sig_atomic_t replication_active
Definition: walsender.c:190
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
void WalSndSetState(WalSndState state)
Definition: walsender.c:3166
void ReplicationSlotCleanup(void)
Definition: slot.c:479
void LWLockReleaseAll(void)
Definition: lwlock.c:1825
#define close(a)
Definition: win32.h:12

◆ WalSndGetStateString()

static const char* WalSndGetStateString ( WalSndState  state)
static

Definition at line 3185 of file walsender.c.

References WALSNDSTATE_BACKUP, WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WALSNDSTATE_STOPPING, and WALSNDSTATE_STREAMING.

Referenced by pg_stat_get_wal_senders().

3186 {
      /*
       * Map a WalSndState value to a constant, lower-case label.  The
       * returned pointer refers to a string literal, so the caller must not
       * modify or free it.
       */
3187  switch (state)
3188  {
3189  case WALSNDSTATE_STARTUP:
3190  return "startup";
3191  case WALSNDSTATE_BACKUP:
3192  return "backup";
3193  case WALSNDSTATE_CATCHUP:
3194  return "catchup";
3195  case WALSNDSTATE_STREAMING:
3196  return "streaming";
3197  case WALSNDSTATE_STOPPING:
3198  return "stopping";
3199  }
      /*
       * No "default:" label in the switch above; this return is reached only
       * when state matches none of the listed cases.
       */
3200  return "UNKNOWN";
3201 }
Definition: regguts.h:298

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 3102 of file walsender.c.

References i, InvalidBackendId, max_wal_senders, WalSnd::mutex, WalSnd::pid, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

3103 {
3104  int i;
3105 
3106  for (i = 0; i < max_wal_senders; i++)
3107  {
3108  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3109  pid_t pid;
3110 
3111  SpinLockAcquire(&walsnd->mutex);
3112  pid = walsnd->pid;
3113  SpinLockRelease(&walsnd->mutex);
3114 
3115  if (pid == 0)
3116  continue;
3117 
3119  }
3120 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:108
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:180
int max_wal_senders
Definition: walsender.c:120
#define SpinLockRelease(lock)
Definition: spin.h:64
#define InvalidBackendId
Definition: backendid.h:23
int i

◆ WalSndKeepalive()

static void WalSndKeepalive ( bool  requestReply)
static

Definition at line 3395 of file walsender.c.

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), StringInfoData::len, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), resetStringInfo(), and sentPtr.

Referenced by ProcessStandbyReplyMessage(), WalSndDone(), WalSndKeepaliveIfNecessary(), and WalSndWaitForWal().

3396 {
3397  elog(DEBUG2, "sending replication keepalive");
3398 
3399  /* construct the message... */
3401  pq_sendbyte(&output_message, 'k');
3404  pq_sendbyte(&output_message, requestReply ? 1 : 0);
3405 
3406  /* ... and send it wrapped in CopyData */
3408 }
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
static StringInfoData output_message
Definition: walsender.c:152
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
#define DEBUG2
Definition: elog.h:24
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
static XLogRecPtr sentPtr
Definition: walsender.c:149
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
#define elog(elevel,...)
Definition: elog.h:226

◆ WalSndKeepaliveIfNecessary()

static void WalSndKeepaliveIfNecessary ( void  )
static

Definition at line 3414 of file walsender.c.

References last_processing, last_reply_timestamp, pq_flush_if_writable, TimestampTzPlusMilliseconds, waiting_for_ping_response, wal_sender_timeout, WalSndKeepalive(), and WalSndShutdown().

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

3415 {
3416  TimestampTz ping_time;
3417 
3418  /*
3419  * Don't send keepalive messages if timeouts are globally disabled or
3420  * we're doing something not partaking in timeouts.
3421  */
3422  if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3423  return;
3424 
3426  return;
3427 
3428  /*
3429  * If half of wal_sender_timeout has lapsed without receiving any reply
3430  * from the standby, send a keep-alive message to the standby requesting
3431  * an immediate reply.
3432  */
3434  wal_sender_timeout / 2);
3435  if (last_processing >= ping_time)
3436  {
3437  WalSndKeepalive(true);
3439 
3440  /* Try to flush pending output to the client */
3441  if (pq_flush_if_writable() != 0)
3442  WalSndShutdown();
3443  }
3444 }
int wal_sender_timeout
Definition: walsender.c:122
int64 TimestampTz
Definition: timestamp.h:39
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3395
static TimestampTz last_processing
Definition: walsender.c:157
#define pq_flush_if_writable()
Definition: libpq.h:40
static void WalSndShutdown(void)
Definition: walsender.c:225
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
static bool waiting_for_ping_response
Definition: walsender.c:166
static TimestampTz last_reply_timestamp
Definition: walsender.c:163

◆ WalSndKill()

static void WalSndKill ( int  code,
Datum  arg 
)
static

Definition at line 2337 of file walsender.c.

References Assert, WalSnd::latch, WalSnd::mutex, MyWalSnd, WalSnd::pid, SpinLockAcquire, and SpinLockRelease.

Referenced by InitWalSenderSlot().

2338 {
      /*
       * Give up this process's WalSnd entry: forget the process-local
       * MyWalSnd pointer, then clear the entry's latch and pid while holding
       * the entry's spinlock.  Neither parameter (code, arg) is used in this
       * body.  NOTE(review): the (int, Datum) signature suggests this is
       * registered as an exit callback by InitWalSenderSlot() -- confirm
       * there.
       */
2339  WalSnd *walsnd = MyWalSnd;
2340 
2341  Assert(walsnd != NULL);
2342 
      /* Keep using the local copy; the global is cleared before the lock. */
2343  MyWalSnd = NULL;
2344 
2345  SpinLockAcquire(&walsnd->mutex);
2346  /* clear latch while holding the spinlock, so it can safely be read */
2347  walsnd->latch = NULL;
2348  /* Mark WalSnd struct as no longer being in use. */
2349  walsnd->pid = 0;
2350  SpinLockRelease(&walsnd->mutex);
2351 }
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch * latch
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:111
#define Assert(condition)
Definition: c.h:732

◆ WalSndLastCycleHandler()

static void WalSndLastCycleHandler ( SIGNAL_ARGS  )
static

Definition at line 2997 of file walsender.c.

References got_SIGUSR2, MyLatch, and SetLatch().

Referenced by WalSndSignals().

2998 {
      /*
       * Signal handler: record that the signal arrived by setting
       * got_SIGUSR2, then set MyLatch.  errno is saved on entry and restored
       * on exit, so the interrupted code sees it unchanged.
       */
2999  int save_errno = errno;
3000 
3001  got_SIGUSR2 = true;
3002  SetLatch(MyLatch);
3003 
3004  errno = save_errno;
3005 }
void SetLatch(Latch *latch)
Definition: latch.c:436
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:181
struct Latch * MyLatch
Definition: globals.c:54

◆ WalSndLoop()

static void WalSndLoop ( WalSndSendDataCallback  send_data)
static

Definition at line 2152 of file walsender.c.

References application_name, CHECK_FOR_INTERRUPTS, ConfigReloadPending, DEBUG1, ereport, errmsg(), GetCurrentTimestamp(), got_SIGUSR2, last_reply_timestamp, MyLatch, MyProcPort, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), sleeptime, Port::sock, WalSnd::state, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_MAIN, waiting_for_ping_response, WaitLatchOrSocket(), WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndDone(), WalSndKeepaliveIfNecessary(), WalSndSetState(), WalSndShutdown(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STREAMING, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, and WL_TIMEOUT.

Referenced by StartLogicalReplication(), and StartReplication().

2153 {
2154  /*
2155  * Initialize the last reply timestamp. That enables timeout processing
2156  * from hereon.
2157  */
2159  waiting_for_ping_response = false;
2160 
2161  /*
2162  * Loop until we reach the end of this timeline or the client requests to
2163  * stop streaming.
2164  */
2165  for (;;)
2166  {
2167  /* Clear any already-pending wakeups */
2169 
2171 
2172  /* Process any requests or signals received recently */
2173  if (ConfigReloadPending)
2174  {
2175  ConfigReloadPending = false;
2178  }
2179 
2180  /* Check for input from the client */
2182 
2183  /*
2184  * If we have received CopyDone from the client, sent CopyDone
2185  * ourselves, and the output buffer is empty, it's time to exit
2186  * streaming.
2187  */
2189  !pq_is_send_pending())
2190  break;
2191 
2192  /*
2193  * If we don't have any pending data in the output buffer, try to send
2194  * some more. If there is some, we don't bother to call send_data
2195  * again until we've flushed it ... but we'd better assume we are not
2196  * caught up.
2197  */
2198  if (!pq_is_send_pending())
2199  send_data();
2200  else
2201  WalSndCaughtUp = false;
2202 
2203  /* Try to flush pending output to the client */
2204  if (pq_flush_if_writable() != 0)
2205  WalSndShutdown();
2206 
2207  /* If nothing remains to be sent right now ... */
2209  {
2210  /*
2211  * If we're in catchup state, move to streaming. This is an
2212  * important state change for users to know about, since before
2213  * this point data loss might occur if the primary dies and we
2214  * need to failover to the standby. The state change is also
2215  * important for synchronous replication, since commits that
2216  * started to wait at that point might wait for some time.
2217  */
2219  {
2220  ereport(DEBUG1,
2221  (errmsg("\"%s\" has now caught up with upstream server",
2222  application_name)));
2224  }
2225 
2226  /*
2227  * When SIGUSR2 arrives, we send any outstanding logs up to the
2228  * shutdown checkpoint record (i.e., the latest record), wait for
2229  * them to be replicated to the standby, and exit. This may be a
2230  * normal termination at shutdown, or a promotion, the walsender
2231  * is not sure which.
2232  */
2233  if (got_SIGUSR2)
2234  WalSndDone(send_data);
2235  }
2236 
2237  /* Check for replication timeout. */
2239 
2240  /* Send keepalive if the time has come */
2242 
2243  /*
2244  * We don't block if not caught up, unless there is unsent data
2245  * pending in which case we'd better block until the socket is
2246  * write-ready. This test is only needed for the case where the
2247  * send_data callback handled a subset of the available data but then
2248  * pq_flush_if_writable flushed it all --- we should immediately try
2249  * to send more.
2250  */
2252  {
2253  long sleeptime;
2254  int wakeEvents;
2255 
2256  wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT |
2258 
2259  /*
2260  * Use fresh timestamp, not last_processing, to reduce the chance
2261  * of reaching wal_sender_timeout before sending a keepalive.
2262  */
2264 
2265  if (pq_is_send_pending())
2266  wakeEvents |= WL_SOCKET_WRITEABLE;
2267 
2268  /* Sleep until something happens or we time out */
2269  (void) WaitLatchOrSocket(MyLatch, wakeEvents,
2270  MyProcPort->sock, sleeptime,
2272  }
2273  }
2274  return;
2275 }
#define pq_is_send_pending()
Definition: libpq.h:41
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
#define DEBUG1
Definition: elog.h:25
struct Port * MyProcPort
Definition: globals.c:43
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
static void WalSndDone(WalSndSendDataCallback send_data)
Definition: walsender.c:2882
#define WL_SOCKET_READABLE
Definition: latch.h:125
int sleeptime
Definition: pg_standby.c:42
pgsocket sock
Definition: libpq-be.h:122
void ResetLatch(Latch *latch)
Definition: latch.c:519
WalSndState state
static bool streamingDoneSending
Definition: walsender.c:174
#define pq_flush_if_writable()
Definition: libpq.h:40
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:369
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:181
void SyncRepInitConfig(void)
Definition: syncrep.c:383
#define ereport(elevel, rest)
Definition: elog.h:141
static bool WalSndCaughtUp
Definition: walsender.c:178
Definition: guc.h:72
volatile sig_atomic_t ConfigReloadPending
Definition: globals.c:35
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3414
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2125
static void WalSndShutdown(void)
Definition: walsender.c:225
WalSnd * MyWalSnd
Definition: walsender.c:111
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2075
void WalSndSetState(WalSndState state)
Definition: walsender.c:3166
static bool streamingDoneReceiving
Definition: walsender.c:175
char * application_name
Definition: guc.c:529
int errmsg(const char *fmt,...)
Definition: elog.c:784
struct Latch * MyLatch
Definition: globals.c:54
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
static bool waiting_for_ping_response
Definition: walsender.c:166
#define WL_LATCH_SET
Definition: latch.h:124
static TimestampTz last_reply_timestamp
Definition: walsender.c:163
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1600
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129

◆ WalSndPrepareWrite()

static void WalSndPrepareWrite ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1151 of file walsender.c.

References InvalidXLogRecPtr, LogicalDecodingContext::out, pq_sendbyte(), pq_sendint64(), and resetStringInfo().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

1152 {
      /*
       * Start a new message in ctx->out: reset the buffer, then append the
       * byte 'w', the LSN twice (dataStart and walEnd) and a zero 64-bit
       * placeholder for the send time.  The xid parameter is not used in
       * this body.
       */
1153  /* can't have sync rep confused by sending the same LSN several times */
1154  if (!last_write)
1155  lsn = InvalidXLogRecPtr;
1156 
      /* Discard whatever the output buffer held before. */
1157  resetStringInfo(ctx->out);
1158 
1159  pq_sendbyte(ctx->out, 'w');
1160  pq_sendint64(ctx->out, lsn); /* dataStart */
1161  pq_sendint64(ctx->out, lsn); /* walEnd */
1162 
1163  /*
1164  * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1165  * reserve space here.
1166  */
1167  pq_sendint64(ctx->out, 0); /* sendtime */
1168 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
StringInfo out
Definition: logical.h:71

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 2952 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

2953 {
      /*
       * Walk all max_wal_senders entries of WalSndCtl->walsnds and set
       * needreload = true on every entry whose pid is non-zero.  Each entry
       * is examined and updated under its own spinlock.
       */
2954  int i;
2955 
2956  for (i = 0; i < max_wal_senders; i++)
2957  {
2958  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2959 
2960  SpinLockAcquire(&walsnd->mutex);
      /* pid == 0: skip this entry (presumably an unused slot -- confirm). */
2961  if (walsnd->pid == 0)
2962  {
2963  SpinLockRelease(&walsnd->mutex);
2964  continue;
2965  }
2966  walsnd->needreload = true;
2967  SpinLockRelease(&walsnd->mutex);
2968  }
2969 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:108
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
int max_wal_senders
Definition: walsender.c:120
#define SpinLockRelease(lock)
Definition: spin.h:64
bool needreload
int i

◆ WalSndSetState()

void WalSndSetState ( WalSndState  state)

Definition at line 3166 of file walsender.c.

References am_walsender, Assert, WalSnd::mutex, MyWalSnd, SpinLockAcquire, SpinLockRelease, and WalSnd::state.

Referenced by exec_replication_command(), SendBaseBackup(), StartLogicalReplication(), StartReplication(), WalSndErrorCleanup(), WalSndLoop(), and XLogSendPhysical().

3167 {
3168  WalSnd *walsnd = MyWalSnd;
3169 
3171 
3172  if (walsnd->state == state)
3173  return;
3174 
3175  SpinLockAcquire(&walsnd->mutex);
3176  walsnd->state = state;
3177  SpinLockRelease(&walsnd->mutex);
3178 }
slock_t mutex
bool am_walsender
Definition: walsender.c:114
#define SpinLockAcquire(lock)
Definition: spin.h:62
WalSndState state
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:111
#define Assert(condition)
Definition: c.h:732
Definition: regguts.h:298

◆ WalSndShmemInit()

void WalSndShmemInit ( void  )

Definition at line 3041 of file walsender.c.

References i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SHMQueueInit(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

3042 {
3043  bool found;
3044  int i;
3045 
3046  WalSndCtl = (WalSndCtlData *)
3047  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3048 
3049  if (!found)
3050  {
3051  /* First time through, so initialize */
3053 
3054  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3056 
3057  for (i = 0; i < max_wal_senders; i++)
3058  {
3059  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3060 
3061  SpinLockInit(&walsnd->mutex);
3062  }
3063  }
3064 }
Size WalSndShmemSize(void)
Definition: walsender.c:3029
#define SpinLockInit(lock)
Definition: spin.h:60
WalSndCtlData * WalSndCtl
Definition: walsender.c:108
#define MemSet(start, val, len)
Definition: c.h:955
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
int max_wal_senders
Definition: walsender.c:120
void SHMQueueInit(SHM_QUEUE *queue)
Definition: shmqueue.c:36
int i
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28

◆ WalSndShmemSize()

Size WalSndShmemSize ( void  )

Definition at line 3029 of file walsender.c.

References add_size(), max_wal_senders, mul_size(), and offsetof.

Referenced by CreateSharedMemoryAndSemaphores(), and WalSndShmemInit().

3030 {
      /*
       * Return the number of bytes needed for the WalSndCtlData structure:
       * the fixed part up to the walsnds member, plus max_wal_senders
       * elements of sizeof(WalSnd) each.
       */
      /* The 0 initializer is overwritten by the assignment just below. */
3031  Size size = 0;
3032 
3033  size = offsetof(WalSndCtlData, walsnds);
      /*
       * NOTE(review): add_size()/mul_size() come from shmem.c; presumably
       * they guard against Size overflow -- confirm there.
       */
3034  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3035 
3036  return size;
3037 }
int max_wal_senders
Definition: walsender.c:120
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
size_t Size
Definition: c.h:466
#define offsetof(type, field)
Definition: c.h:655

◆ WalSndShutdown()

static void WalSndShutdown ( void  )
static

Definition at line 225 of file walsender.c.

Referenced by WalSndCheckTimeOut(), WalSndErrorCleanup(), WalSndKeepaliveIfNecessary(), WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

257 {
259 
260  /* Create a per-walsender data structure in shared memory */
262 
263  /*
264  * We don't currently need any ResourceOwner in a walsender process, but
265  * if we did, we could call CreateAuxProcessResourceOwner here.
266  */
267 
268  /*
269  * Let postmaster know that we're a WAL sender. Once we've declared us as
270  * a WAL sender process, postmaster will let us outlive the bgwriter and
271  * kill us last in the shutdown sequence, so we get a chance to stream all
272  * remaining WAL at shutdown, including the shutdown checkpoint. Note that
273  * there's no going back, and we mustn't write any WAL records after this.
274  */
277 
278  /* Initialize empty timestamp buffer for lag tracking. */
280 
281  /* Make sure we can remember the current read position in XLOG. */
282  sendSeg = (WALOpenSegment *)
287 }
static void InitWalSenderSlot(void)
Definition: walsender.c:2279
int wal_segment_size
Definition: xlog.c:112
static WALSegmentContext * sendCxt
Definition: walsender.c:132
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:272
void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt, int segsize, const char *waldir)
Definition: xlogreader.c:207
bool RecoveryInProgress(void)
Definition: xlog.c:7917
static LagTracker * lag_tracker
Definition: walsender.c:215
static WALOpenSegment * sendSeg
Definition: walsender.c:131
MemoryContext TopMemoryContext
Definition: mcxt.c:44
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:839
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:146
bool am_cascading_walsender
Definition: walsender.c:115

◆ WalSndSignals()

void WalSndSignals ( void  )

Definition at line 3009 of file walsender.c.

References die, InitializeTimeouts(), PostgresSigHupHandler(), pqsignal(), procsignal_sigusr1_handler(), quickdie(), SIG_DFL, SIG_IGN, SIGCHLD, SIGHUP, SIGPIPE, SIGQUIT, SIGUSR1, SIGUSR2, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

3010 {
3011  /* Set up signal handlers */
3012  pqsignal(SIGHUP, PostgresSigHupHandler); /* set flag to read config
3013  * file */
3014  pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3015  pqsignal(SIGTERM, die); /* request shutdown */
3016  pqsignal(SIGQUIT, quickdie); /* hard crash time */
3017  InitializeTimeouts(); /* establishes SIGALRM handler */
3020  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3021  * shutdown */
3022 
3023  /* Reset some signals that are accepted by postmaster but not here */
3025 }
void InitializeTimeouts(void)
Definition: timeout.c:340
#define SIGQUIT
Definition: win32_port.h:155
#define SIGUSR1
Definition: win32_port.h:166
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:2997
#define SIGCHLD
Definition: win32_port.h:164
#define SIGPIPE
Definition: win32_port.h:159
#define SIGUSR2
Definition: win32_port.h:167
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:2792
#define SIGHUP
Definition: win32_port.h:154
void PostgresSigHupHandler(SIGNAL_ARGS)
Definition: postgres.c:2832
#define SIG_IGN
Definition: win32_port.h:151
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
#define SIG_DFL
Definition: win32_port.h:149
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:261
void quickdie(SIGNAL_ARGS)
Definition: postgres.c:2694
#define die(msg)
Definition: pg_test_fsync.c:97

◆ WalSndUpdateProgress()

static void WalSndUpdateProgress ( LogicalDecodingContext *  ctx,
XLogRecPtr  lsn,
TransactionId  xid 
)
static

Definition at line 1267 of file walsender.c.

References GetCurrentTimestamp(), LagTrackerWrite(), now(), TimestampDifferenceExceeds(), and WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS.

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

1268 {
1269  static TimestampTz sendTime = 0;
1270  TimestampTz now = GetCurrentTimestamp();
1271 
1272  /*
1273  * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1274  * avoid flooding the lag tracker when we commit frequently.
1275  */
1276 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1277  if (!TimestampDifferenceExceeds(sendTime, now,
1278  WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
1279  return;
1280 
1281  LagTrackerWrite(lsn, now);
1282  sendTime = now;
1283 }
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
int64 TimestampTz
Definition: timestamp.h:39
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1682
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition: walsender.c:3453
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1547

◆ WalSndWaitForWal()

static XLogRecPtr WalSndWaitForWal ( XLogRecPtr  loc)
static

Definition at line 1293 of file walsender.c.

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, WalSnd::flush, GetCurrentTimestamp(), GetFlushRecPtr(), GetXLogReplayRecPtr(), got_STOPPING, InvalidXLogRecPtr, MyLatch, MyProcPort, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), RecoveryInProgress(), ResetLatch(), sentPtr, SetLatch(), sleeptime, Port::sock, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_WAIT_WAL, waiting_for_ping_response, WaitLatchOrSocket(), WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepalive(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, WL_TIMEOUT, WalSnd::write, and XLogBackgroundFlush().

Referenced by logical_read_xlog_page().

1294 {
1295  int wakeEvents;
1296  static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1297 
1298  /*
1299  * Fast path to avoid acquiring the spinlock in case we already know we
1300  * have enough WAL available. This is particularly interesting if we're
1301  * far behind.
1302  */
1303  if (RecentFlushPtr != InvalidXLogRecPtr &&
1304  loc <= RecentFlushPtr)
1305  return RecentFlushPtr;
1306 
1307  /* Get a more recent flush pointer. */
1308  if (!RecoveryInProgress())
1309  RecentFlushPtr = GetFlushRecPtr();
1310  else
1311  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1312 
1313  for (;;)
1314  {
1315  long sleeptime;
1316 
1317  /* Clear any already-pending wakeups */
1318  ResetLatch(MyLatch);
1319 
1320  CHECK_FOR_INTERRUPTS();
1321 
1322  /* Process any requests or signals received recently */
1323  if (ConfigReloadPending)
1324  {
1325  ConfigReloadPending = false;
1326  ProcessConfigFile(PGC_SIGHUP);
1327  SyncRepInitConfig();
1328  }
1329 
1330  /* Check for input from the client */
1331  ProcessRepliesIfAny();
1332 
1333  /*
1334  * If we're shutting down, trigger pending WAL to be written out,
1335  * otherwise we'd possibly end up waiting for WAL that never gets
1336  * written, because walwriter has shut down already.
1337  */
1338  if (got_STOPPING)
1339  XLogBackgroundFlush();
1340 
1341  /* Update our idea of the currently flushed position. */
1342  if (!RecoveryInProgress())
1343  RecentFlushPtr = GetFlushRecPtr();
1344  else
1345  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1346 
1347  /*
1348  * If postmaster asked us to stop, don't wait anymore.
1349  *
1350  * It's important to do this check after the recomputation of
1351  * RecentFlushPtr, so we can send all remaining data before shutting
1352  * down.
1353  */
1354  if (got_STOPPING)
1355  break;
1356 
1357  /*
1358  * We only send regular messages to the client for full decoded
1359  * transactions, but a synchronous replication and walsender shutdown
1360  * possibly are waiting for a later location. So we send pings
1361  * containing the flush location every now and then.
1362  */
1363  if (MyWalSnd->flush < sentPtr &&
1364  MyWalSnd->write < sentPtr &&
1365  !waiting_for_ping_response)
1366  {
1367  WalSndKeepalive(false);
1368  waiting_for_ping_response = true;
1369  }
1370 
1371  /* check whether we're done */
1372  if (loc <= RecentFlushPtr)
1373  break;
1374 
1375  /* Waiting for new WAL. Since we need to wait, we're now caught up. */
1376  WalSndCaughtUp = true;
1377 
1378  /*
1379  * Try to flush any pending output to the client.
1380  */
1381  if (pq_flush_if_writable() != 0)
1382  WalSndShutdown();
1383 
1384  /*
1385  * If we have received CopyDone from the client, sent CopyDone
1386  * ourselves, and the output buffer is empty, it's time to exit
1387  * streaming, so fail the current WAL fetch request.
1388  */
1389  if (streamingDoneReceiving && streamingDoneSending &&
1390  !pq_is_send_pending())
1391  break;
1392 
1393  /* die if timeout was reached */
1394  WalSndCheckTimeOut();
1395 
1396  /* Send keepalive if the time has come */
1397  WalSndKeepaliveIfNecessary();
1398 
1399  /*
1400  * Sleep until something happens or we time out. Also wait for the
1401  * socket becoming writable, if there's still pending output.
1402  * Otherwise we might sit on sendable output data while waiting for
1403  * new WAL to be generated. (But if we have nothing to send, we don't
1404  * want to wake on socket-writable.)
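1405  */
1406  sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
1407 
1408  wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH |
1409  WL_SOCKET_READABLE | WL_TIMEOUT;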
1410 
1411  if (pq_is_send_pending())
1412  wakeEvents |= WL_SOCKET_WRITEABLE;
1413 
1414  (void) WaitLatchOrSocket(MyLatch, wakeEvents,
1415  MyProcPort->sock, sleeptime,
1416  WAIT_EVENT_WAL_SENDER_WAIT_WAL);
1417  }
1418 
1419  /* reactivate latch so WalSndLoop knows to continue */
1420  SetLatch(MyLatch);
1421  return RecentFlushPtr;
1422 }
#define pq_is_send_pending()
Definition: libpq.h:41
XLogRecPtr write
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
struct Port * MyProcPort
Definition: globals.c:43
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:182
#define WL_SOCKET_READABLE
Definition: latch.h:125
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8249
int sleeptime
Definition: pg_standby.c:42
bool RecoveryInProgress(void)
Definition: xlog.c:7917
void SetLatch(Latch *latch)
Definition: latch.c:436
pgsocket sock
Definition: libpq-be.h:122
void ResetLatch(Latch *latch)
Definition: latch.c:519
XLogRecPtr flush
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3395
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11149
bool XLogBackgroundFlush(void)
Definition: xlog.c:2987
static bool streamingDoneSending
Definition: walsender.c:174
#define pq_flush_if_writable()
Definition: libpq.h:40
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:369
void SyncRepInitConfig(void)
Definition: syncrep.c:383
static bool WalSndCaughtUp
Definition: walsender.c:178
PGC_SIGHUP
Definition: guc.h:72
volatile sig_atomic_t ConfigReloadPending
Definition: globals.c:35
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3414
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2125
static void WalSndShutdown(void)
Definition: walsender.c:225
WalSnd * MyWalSnd
Definition: walsender.c:111
static XLogRecPtr sentPtr
Definition: walsender.c:149
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2075
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool streamingDoneReceiving
Definition: walsender.c:175
struct Latch * MyLatch
Definition: globals.c:54
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
static bool waiting_for_ping_response
Definition: walsender.c:166
#define WL_LATCH_SET
Definition: latch.h:124
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1600
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 3128 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, pg_usleep(), WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

3129 {
3130  for (;;)
3131  {
3132  int i;
3133  bool all_stopped = true;
3134 
3135  for (i = 0; i < max_wal_senders; i++)
3136  {
3137  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3138 
3139  SpinLockAcquire(&walsnd->mutex);
3140 
3141  if (walsnd->pid == 0)
3142  {
3143  SpinLockRelease(&walsnd->mutex);
3144  continue;
3145  }
3146 
3147  if (walsnd->state != WALSNDSTATE_STOPPING)
3148  {
3149  all_stopped = false;
3150  SpinLockRelease(&walsnd->mutex);
3151  break;
3152  }
3153  SpinLockRelease(&walsnd->mutex);
3154  }
3155 
3156  /* safe to leave if confirmation is done for all WAL senders */
3157  if (all_stopped)
3158  return;
3159 
3160  pg_usleep(10000L); /* wait for 10 msec */
3161  }
3162 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:108
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pg_usleep(long microsec)
Definition: signal.c:53
WalSndState state
int max_wal_senders
Definition: walsender.c:120
#define SpinLockRelease(lock)
Definition: spin.h:64
int i

◆ WalSndWakeup()

void WalSndWakeup ( void  )

Definition at line 3073 of file walsender.c.

References i, WalSnd::latch, max_wal_senders, WalSnd::mutex, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive(), StartupXLOG(), and XLogWalRcvFlush().

3074 {
3075  int i;
3076 
3077  for (i = 0; i < max_wal_senders; i++)
3078  {
3079  Latch *latch;
3080  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3081 
3082  /*
3083  * Get latch pointer with spinlock held, for the unlikely case that
3084  * pointer reads aren't atomic (as they're 8 bytes).
3085  */
3086  SpinLockAcquire(&walsnd->mutex);
3087  latch = walsnd->latch;
3088  SpinLockRelease(&walsnd->mutex);
3089 
3090  if (latch != NULL)
3091  SetLatch(latch);
3092  }
3093 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:108
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
void SetLatch(Latch *latch)
Definition: latch.c:436
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch * latch
Definition: latch.h:110
int max_wal_senders
Definition: walsender.c:120
#define SpinLockRelease(lock)
Definition: spin.h:64
int i

◆ WalSndWriteData()

static void WalSndWriteData ( LogicalDecodingContext *  ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1178 of file walsender.c.

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, StringInfoData::data, GetCurrentTimestamp(), last_reply_timestamp, StringInfoData::len, MyLatch, MyProcPort, now(), LogicalDecodingContext::out, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, pq_putmessage_noblock, pq_sendint64(), ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), resetStringInfo(), SetLatch(), sleeptime, Port::sock, SyncRepInitConfig(), TimestampTzPlusMilliseconds, WAIT_EVENT_WAL_SENDER_WRITE_DATA, WaitLatchOrSocket(), wal_sender_timeout, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, and WL_TIMEOUT.

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

1180 {
1181  TimestampTz now;
1182 
1183  /* output previously gathered data in a CopyData packet */
1184  pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1185 
1186  /*
1187  * Fill the send timestamp last, so that it is taken as late as possible.
1188  * This is somewhat ugly, but the protocol is set as it's already used for
1189  * several releases by streaming physical replication.
1190  */
1191  resetStringInfo(&tmpbuf);
1192  now = GetCurrentTimestamp();
1193  pq_sendint64(&tmpbuf, now);
1194  memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1195  tmpbuf.data, sizeof(int64));
1196 
1197  CHECK_FOR_INTERRUPTS();
1198 
1199  /* Try to flush pending output to the client */
1200  if (pq_flush_if_writable() != 0)
1201  WalSndShutdown();
1202 
1203  /* Try taking fast path unless we get too close to walsender timeout. */
1204  if (now < TimestampTzPlusMilliseconds(last_reply_timestamp,
1205  wal_sender_timeout / 2) &&
1206  !pq_is_send_pending())
1207  {
1208  return;
1209  }
1210 
1211  /* If we have pending write here, go to slow path */
1212  for (;;)
1213  {
1214  int wakeEvents;
1215  long sleeptime;
1216 
1217  /* Check for input from the client */
1218  ProcessRepliesIfAny();
1219 
1220  /* die if timeout was reached */
1221  WalSndCheckTimeOut();
1222 
1223  /* Send keepalive if the time has come */
1224  WalSndKeepaliveIfNecessary();
1225 
1226  if (!pq_is_send_pending())
1227  break;
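1228 
1229  sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
1230 
1231  wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH |
1232  WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT;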
1233 
1234  /* Sleep until something happens or we time out */
1235  (void) WaitLatchOrSocket(MyLatch, wakeEvents,
1236  MyProcPort->sock, sleeptime,
1237  WAIT_EVENT_WAL_SENDER_WRITE_DATA);
1238 
1239  /* Clear any already-pending wakeups */
1240  ResetLatch(MyLatch);
1241 
1242  CHECK_FOR_INTERRUPTS();
1243 
1244  /* Process any requests or signals received recently */
1245  if (ConfigReloadPending)
1246  {
1247  ConfigReloadPending = false;
1248  ProcessConfigFile(PGC_SIGHUP);
1249  SyncRepInitConfig();
1250  }
1251 
1252  /* Try to flush pending output to the client */
1253  if (pq_flush_if_writable() != 0)
1254  WalSndShutdown();
1255  }
1256 
1257  /* reactivate latch so WalSndLoop knows to continue */
1258  SetLatch(MyLatch);
1259 }
#define pq_is_send_pending()
Definition: libpq.h:41
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
int wal_sender_timeout
Definition: walsender.c:122
struct Port * MyProcPort
Definition: globals.c:43
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
int64 TimestampTz
Definition: timestamp.h:39
#define WL_SOCKET_READABLE
Definition: latch.h:125
int sleeptime
Definition: pg_standby.c:42
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
void SetLatch(Latch *latch)
Definition: latch.c:436
pgsocket sock
Definition: libpq-be.h:122
void ResetLatch(Latch *latch)
Definition: latch.c:519
#define pq_flush_if_writable()
Definition: libpq.h:40
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:369
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
void SyncRepInitConfig(void)
Definition: syncrep.c:383
PGC_SIGHUP
Definition: guc.h:72
volatile sig_atomic_t ConfigReloadPending
Definition: globals.c:35
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3414
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2125
static void WalSndShutdown(void)
Definition: walsender.c:225
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2075
static StringInfoData tmpbuf
Definition: walsender.c:154
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
StringInfo out
Definition: logical.h:71
struct Latch * MyLatch
Definition: globals.c:54
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
#define WL_LATCH_SET
Definition: latch.h:124
static TimestampTz last_reply_timestamp
Definition: walsender.c:163
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1600
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1547
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129

◆ XLogRead()

static void XLogRead ( WALSegmentContext *  segcxt,
char *  buf,
XLogRecPtr  startptr,
Size  count 
)
static

Definition at line 2365 of file walsender.c.

References am_cascading_walsender, BasicOpenFile(), buf, CheckXLogRemoved(), close, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg(), ERROR, MAXPGPATH, WalSnd::mutex, MyWalSnd, WalSnd::needreload, PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), read, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, SpinLockAcquire, SpinLockRelease, startptr, ThisTimeLineID, WAIT_EVENT_WAL_READ, WALOpenSegment::ws_file, WALOpenSegment::ws_off, WALOpenSegment::ws_segno, WALSegmentContext::ws_segsize, WALOpenSegment::ws_tli, XLByteInSeg, XLByteToSeg, XLogFileNameP(), XLogFilePath, and XLogSegmentOffset.

Referenced by logical_read_xlog_page(), and XLogSendPhysical().

2366 {
2367  char *p;
2368  XLogRecPtr recptr;
2369  Size nbytes;
2370  XLogSegNo segno;
2371 
2372 retry:
2373  p = buf;
2374  recptr = startptr;
2375  nbytes = count;
2376 
2377  while (nbytes > 0)
2378  {
2379  uint32 startoff;
2380  int segbytes;
2381  int readbytes;
2382 
2383  startoff = XLogSegmentOffset(recptr, segcxt->ws_segsize);
2384 
2385  if (sendSeg->ws_file < 0 ||
2386  !XLByteInSeg(recptr, sendSeg->ws_segno, segcxt->ws_segsize))
2387  {
2388  char path[MAXPGPATH];
2389 
2390  /* Switch to another logfile segment */
2391  if (sendSeg->ws_file >= 0)
2392  close(sendSeg->ws_file);
2393 
2394  XLByteToSeg(recptr, sendSeg->ws_segno, segcxt->ws_segsize);
2395 
2396  /*-------
2397  * When reading from a historic timeline, and there is a timeline
2398  * switch within this segment, read from the WAL segment belonging
2399  * to the new timeline.
2400  *
2401  * For example, imagine that this server is currently on timeline
2402  * 5, and we're streaming timeline 4. The switch from timeline 4
2403  * to 5 happened at 0/13002088. In pg_wal, we have these files:
2404  *
2405  * ...
2406  * 000000040000000000000012
2407  * 000000040000000000000013
2408  * 000000050000000000000013
2409  * 000000050000000000000014
2410  * ...
2411  *
2412  * In this situation, when requested to send the WAL from
2413  * segment 0x13, on timeline 4, we read the WAL from file
2414  * 000000050000000000000013. Archive recovery prefers files from
2415  * newer timelines, so if the segment was restored from the
2416  * archive on this server, the file belonging to the old timeline,
2417  * 000000040000000000000013, might not exist. Their contents are
2418  * equal up to the switchpoint, because at a timeline switch, the
2419  * used portion of the old segment is copied to the new file.
2420  *-------
2421  */
2422  sendSeg->ws_tli = sendTimeLine;
2423  if (sendTimeLineIsHistoric)
2424  {
2425  XLogSegNo endSegNo;
2426 
2427  XLByteToSeg(sendTimeLineValidUpto, endSegNo, segcxt->ws_segsize);
2428  if (sendSeg->ws_segno == endSegNo)
2429  sendSeg->ws_tli = sendTimeLineNextTLI;
2430  }
2431 
2432  XLogFilePath(path, sendSeg->ws_tli, sendSeg->ws_segno, segcxt->ws_segsize);
2433 
2434  sendSeg->ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
2435  if (sendSeg->ws_file < 0)
2436  {
2437  /*
2438  * If the file is not found, assume it's because the standby
2439  * asked for a too old WAL segment that has already been
2440  * removed or recycled.
2441  */
2442  if (errno == ENOENT)
2443  ereport(ERROR,
2445  errmsg("requested WAL segment %s has already been removed",
2447  else
2448  ereport(ERROR,
2450  errmsg("could not open file \"%s\": %m",
2451  path)));
2452  }
2453  sendSeg->ws_off = 0;
2454  }
2455 
2456  /* Need to seek in the file? */
2457  if (sendSeg->ws_off != startoff)
2458  {
2459  if (lseek(sendSeg->ws_file, (off_t) startoff, SEEK_SET) < 0)
2460  ereport(ERROR,
2462  errmsg("could not seek in log segment %s to offset %u: %m",
2464  startoff)));
2465  sendSeg->ws_off = startoff;
2466  }
2467 
2468  /* How many bytes are within this segment? */
2469  if (nbytes > (segcxt->ws_segsize - startoff))
2470  segbytes = segcxt->ws_segsize - startoff;
2471  else
2472  segbytes = nbytes;
2473 
2474  pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
2475  readbytes = read(sendSeg->ws_file, p, segbytes);
2476  pgstat_report_wait_end();
2477  if (readbytes < 0)
2478  {
2479  ereport(ERROR,
2481  errmsg("could not read from log segment %s, offset %u, length %zu: %m",
2483  sendSeg->ws_off, (Size) segbytes)));
2484  }
2485  else if (readbytes == 0)
2486  {
2487  ereport(ERROR,
2489  errmsg("could not read from log segment %s, offset %u: read %d of %zu",
2491  sendSeg->ws_off, readbytes, (Size) segbytes)));
2492  }
2493 
2494  /* Update state for read */
2495  recptr += readbytes;
2496 
2497  sendSeg->ws_off += readbytes;
2498  nbytes -= readbytes;
2499  p += readbytes;
2500  }
2501 
2502  /*
2503  * After reading into the buffer, check that what we read was valid. We do
2504  * this after reading, because even though the segment was present when we
2505  * opened it, it might get recycled or removed while we read it. The
2506  * read() succeeds in that case, but the data we tried to read might
2507  * already have been overwritten with new WAL records.
2508  */
2509  XLByteToSeg(startptr, segno, segcxt->ws_segsize);
2510  CheckXLogRemoved(segno, ThisTimeLineID);
2511 
2512  /*
2513  * During recovery, the currently-open WAL file might be replaced with the
2514  * file of the same name retrieved from archive. So we always need to
2515  * check what we read was valid after reading into the buffer. If it's
2516  * invalid, we try to open and read the file again.
2517  */
2518  if (am_cascading_walsender)
2519  {
2520  WalSnd *walsnd = MyWalSnd;
2521  bool reload;
2522 
2523  SpinLockAcquire(&walsnd->mutex);
2524  reload = walsnd->needreload;
2525  walsnd->needreload = false;
2526  SpinLockRelease(&walsnd->mutex);
2527 
2528  if (reload && sendSeg->ws_file >= 0)
2529  {
2530  close(sendSeg->ws_file);
2531  sendSeg->ws_file = -1;
2532 
2533  goto retry;
2534  }
2535  }
2536 }
int errcode(int sqlerrcode)
Definition: elog.c:570
#define PG_BINARY
Definition: c.h:1191
slock_t mutex
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define ERROR
Definition: elog.h:43
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:3846
#define MAXPGPATH
char * XLogFileNameP(TimeLineID tli, XLogSegNo segno)
Definition: xlog.c:10149
static char * buf
Definition: pg_test_fsync.c:68
uint64 XLogSegNo
Definition: xlogdefs.h:41
XLogSegNo ws_segno
Definition: xlogreader.h:38
int errcode_for_file_access(void)
Definition: elog.c:593
unsigned int uint32
Definition: c.h:358
static WALOpenSegment * sendSeg
Definition: walsender.c:131
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1342
#define ereport(elevel, rest)
Definition: elog.h:141
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:45
#define SpinLockRelease(lock)
Definition: spin.h:64
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
WalSnd * MyWalSnd
Definition: walsender.c:111
TimeLineID ThisTimeLineID
Definition: xlog.c:187
static TimeLineID sendTimeLine
Definition: walsender.c:140
uint64 XLogRecPtr
Definition: xlogdefs.h:21
bool needreload
size_t Size
Definition: c.h:466
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1318
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:141
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:143
TimeLineID ws_tli
Definition: xlogreader.h:40
int BasicOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:944
#define XLogFilePath(path, tli, logSegNo, wal_segsz_bytes)
int errmsg(const char *fmt,...)
Definition: elog.c:784
#define close(a)
Definition: win32.h:12
uint32 ws_off
Definition: xlogreader.h:39
#define read(a, b, c)
Definition: win32.h:13
static bool sendTimeLineIsHistoric
Definition: walsender.c:142
bool am_cascading_walsender
Definition: walsender.c:115
static XLogRecPtr startptr
Definition: basebackup.c:118
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

◆ XLogSendLogical()

static void XLogSendLogical ( void  )
static

Definition at line 2812 of file walsender.c.

References elog, XLogReaderState::EndRecPtr, ERROR, GetFlushRecPtr(), got_SIGUSR2, got_STOPPING, InvalidXLogRecPtr, logical_startptr, LogicalDecodingProcessRecord(), WalSnd::mutex, MyWalSnd, LogicalDecodingContext::reader, WalSnd::sentPtr, sentPtr, SpinLockAcquire, SpinLockRelease, WalSndCaughtUp, and XLogReadRecord().

Referenced by StartLogicalReplication().

2813 {
2814  XLogRecord *record;
2815  char *errm;
2816  XLogRecPtr flushPtr;
2817 
2818  /*
2819  * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
2820  * true in WalSndWaitForWal, if we're actually waiting. We also set to
2821  * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
2822  * didn't wait - i.e. when we're shutting down.
2823  */
2824  WalSndCaughtUp = false;
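2825 
2826  record = XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &errm);
2827  logical_startptr = InvalidXLogRecPtr;
2828 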
2829  /* xlog record was invalid */
2830  if (errm != NULL)
2831  elog(ERROR, "%s", errm);
2832 
2833  /*
2834  * We'll use the current flush point to determine whether we've caught up.
2835  */
2836  flushPtr = GetFlushRecPtr();
2837 
2838  if (record != NULL)
2839  {
2840  /*
2841  * Note the lack of any call to LagTrackerWrite() which is handled by
2842  * WalSndUpdateProgress which is called by output plugin through
2843  * logical decoding write api.
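2844  */
2845  LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
2846 
2847  sentPtr = logical_decoding_ctx->reader->EndRecPtr;
2848  }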
2849 
2850  /* Set flag if we're caught up. */
2851  if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
2852  WalSndCaughtUp = true;
2853 
2854  /*
2855  * If we're caught up and have been requested to stop, have WalSndLoop()
2856  * terminate the connection in an orderly manner, after writing out all
2857  * the pending data.
2858  */
2859  if (WalSndCaughtUp && got_STOPPING)
2860  got_SIGUSR2 = true;
2861 
2862  /* Update shared memory status */
2863  {
2864  WalSnd *walsnd = MyWalSnd;
2865 
2866  SpinLockAcquire(&walsnd->mutex);
2867  walsnd->sentPtr = sentPtr;
2868  SpinLockRelease(&walsnd->mutex);
2869  }
2870 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:182
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8249
slock_t mutex
XLogRecord * XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
Definition: xlogreader.c:237
XLogRecPtr EndRecPtr
Definition: xlogreader.h:133
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define ERROR
Definition: elog.h:43
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:97
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:192
static XLogRecPtr logical_startptr
Definition: walsender.c:193
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:181
static bool WalSndCaughtUp
Definition: walsender.c:178
#define SpinLockRelease(lock)
Definition: spin.h:64
XLogRecPtr sentPtr
WalSnd * MyWalSnd
Definition: walsender.c:111
static XLogRecPtr sentPtr
Definition: walsender.c:149
uint64 XLogRecPtr
Definition: xlogdefs.h:21
XLogReaderState * reader
Definition: logical.h:42
#define elog(elevel,...)
Definition: elog.h:226

◆ XLogSendPhysical()