PostgreSQL Source Code  git master
walsender.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/printtup.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "access/xlogutils.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
#include "commands/defrem.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "pgstat.h"
#include "replication/basebackup.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/logicalfuncs.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/dest.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/portal.h"
#include "utils/ps_status.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
Include dependency graph for walsender.c:

Go to the source code of this file.

Data Structures

struct  WalTimeSample
 
struct  LagTracker
 

Macros

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)
 
#define LAG_TRACKER_BUFFER_SIZE   8192
 
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000
 
#define PG_STAT_GET_WAL_SENDERS_COLS   15
 

Typedefs

typedef void(* WalSndSendDataCallback) (void)
 

Functions

static void WalSndLastCycleHandler (SIGNAL_ARGS)
 
static void WalSndLoop (WalSndSendDataCallback send_data)
 
static void InitWalSenderSlot (void)
 
static void WalSndKill (int code, Datum arg)
 
static void WalSndShutdown (void)
 
void WalSndErrorCleanup (void)
 
static void IdentifySystem (void)
 
static void SendTimeLineHistory (TimeLineHistoryCmd *cmd)
 
static void StartReplication (StartReplicationCmd *cmd)
 
static int logical_read_xlog_page (XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
 
static void parseCreateReplSlotOptions (CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action)
 
static void CreateReplicationSlot (CreateReplicationSlotCmd *cmd)
 
static void DropReplicationSlot (DropReplicationSlotCmd *cmd)
 
static void StartLogicalReplication (StartReplicationCmd *cmd)
 
static void WalSndPrepareWrite (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndWriteData (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndUpdateProgress (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
 
static XLogRecPtr WalSndWaitForWal (XLogRecPtr loc)
 
bool exec_replication_command (const char *cmd_string)
 
static void ProcessRepliesIfAny (void)
 
static void ProcessStandbyMessage (void)
 
static void PhysicalConfirmReceivedLocation (XLogRecPtr lsn)
 
static void ProcessStandbyReplyMessage (void)
 
static void PhysicalReplicationSlotNewXmin (TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
 
static bool TransactionIdInRecentPast (TransactionId xid, uint32 epoch)
 
static void ProcessStandbyHSFeedbackMessage (void)
 
static long WalSndComputeSleeptime (TimestampTz now)
 
static void WalSndCheckTimeOut (void)
 
static int WalSndSegmentOpen (XLogSegNo nextSegNo, WALSegmentContext *segcxt, TimeLineID *tli_p)
 
static void XLogSendPhysical (void)
 
static void XLogSendLogical (void)
 
static void WalSndDone (WalSndSendDataCallback send_data)
 
static XLogRecPtr GetStandbyFlushRecPtr (void)
 
void WalSndRqstFileReload (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (void)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void WalSndSetState (WalSndState state)
 
static const char * WalSndGetStateString (WalSndState state)
 
static Interval * offset_to_interval (TimeOffset offset)
 
Datum pg_stat_get_wal_senders (PG_FUNCTION_ARGS)
 
static void WalSndKeepalive (bool requestReply)
 
static void WalSndKeepaliveIfNecessary (void)
 
static void LagTrackerWrite (XLogRecPtr lsn, TimestampTz local_flush_time)
 
static TimeOffset LagTrackerRead (int head, XLogRecPtr lsn, TimestampTz now)
 
static void UpdateSpillStats (LogicalDecodingContext *ctx)
 

Variables

WalSndCtlData * WalSndCtl = NULL
 
WalSnd * MyWalSnd = NULL
 
bool am_walsender = false
 
bool am_cascading_walsender = false
 
bool am_db_walsender = false
 
int max_wal_senders = 0
 
int wal_sender_timeout = 60 * 1000
 
bool log_replication_commands = false
 
bool wake_wal_senders = false
 
static WALOpenSegment * sendSeg = NULL
 
static WALSegmentContext * sendCxt = NULL
 
static TimeLineID sendTimeLine = 0
 
static TimeLineID sendTimeLineNextTLI = 0
 
static bool sendTimeLineIsHistoric = false
 
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
 
static XLogRecPtr sentPtr = 0
 
static StringInfoData output_message
 
static StringInfoData reply_message
 
static StringInfoData tmpbuf
 
static TimestampTz last_processing = 0
 
static TimestampTz last_reply_timestamp = 0
 
static bool waiting_for_ping_response = false
 
static bool streamingDoneSending
 
static bool streamingDoneReceiving
 
static bool WalSndCaughtUp = false
 
static volatile sig_atomic_t got_SIGUSR2 = false
 
static volatile sig_atomic_t got_STOPPING = false
 
static volatile sig_atomic_t replication_active = false
 
static LogicalDecodingContext * logical_decoding_ctx = NULL
 
static XLogRecPtr logical_startptr = InvalidXLogRecPtr
 
static LagTracker * lag_tracker
 

Macro Definition Documentation

◆ LAG_TRACKER_BUFFER_SIZE

#define LAG_TRACKER_BUFFER_SIZE   8192

Definition at line 203 of file walsender.c.

Referenced by LagTrackerRead(), and LagTrackerWrite().

◆ MAX_SEND_SIZE

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)

Definition at line 105 of file walsender.c.

Referenced by XLogSendPhysical().

◆ PG_STAT_GET_WAL_SENDERS_COLS

#define PG_STAT_GET_WAL_SENDERS_COLS   15

Referenced by pg_stat_get_wal_senders().

◆ WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS

#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000

Referenced by WalSndUpdateProgress().

Typedef Documentation

◆ WalSndSendDataCallback

typedef void(* WalSndSendDataCallback) (void)

Definition at line 221 of file walsender.c.

Function Documentation

◆ CreateReplicationSlot()

static void CreateReplicationSlot ( CreateReplicationSlotCmd * cmd)
static

Definition at line 875 of file walsender.c.

References Assert, begin_tup_output_tupdesc(), CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDestReceiver(), CreateInitDecodingContext(), CreateTemplateTupleDesc(), CRS_EXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, CStringGetTextDatum, ReplicationSlot::data, DecodingContextFindStartpoint(), generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errmsg(), ERROR, FirstSnapshotSet, FreeDecodingContext(), InvalidXLogRecPtr, IsSubTransaction(), IsTransactionBlock(), CreateReplicationSlotCmd::kind, last_reply_timestamp, logical_read_xlog_page(), MAXFNAMELEN, MemSet, MyProc, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, NIL, parseCreateReplSlotOptions(), CreateReplicationSlotCmd::plugin, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, ReplicationSlotCreate(), ReplicationSlotMarkDirty(), ReplicationSlotPersist(), ReplicationSlotRelease(), ReplicationSlotReserveWal(), ReplicationSlotSave(), RestoreTransactionSnapshot(), RS_EPHEMERAL, RS_PERSISTENT, RS_TEMPORARY, sendTimeLine, sendTimeLineIsHistoric, CreateReplicationSlotCmd::slotname, SnapBuildExportSnapshot(), SnapBuildInitialSnapshot(), LogicalDecodingContext::snapshot_builder, snprintf, CreateReplicationSlotCmd::temporary, ThisTimeLineID, TTSOpsVirtual, TupleDescInitBuiltinEntry(), values, WalSndPrepareWrite(), WalSndUpdateProgress(), WalSndWriteData(), XACT_REPEATABLE_READ, and XactIsoLevel.

Referenced by exec_replication_command(), main(), and StartLogStreamer().

876 {
877  const char *snapshot_name = NULL;
878  char xloc[MAXFNAMELEN];
879  char *slot_name;
880  bool reserve_wal = false;
881  CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
883  TupOutputState *tstate;
884  TupleDesc tupdesc;
885  Datum values[4];
886  bool nulls[4];
887 
889 
890  parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action);
891 
892  /* setup state for XLogRead */
893  sendTimeLineIsHistoric = false;
895 
896  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
897  {
898  ReplicationSlotCreate(cmd->slotname, false,
900  }
901  else
902  {
904 
905  /*
906  * Initially create persistent slot as ephemeral - that allows us to
907  * nicely handle errors during initialization because it'll get
908  * dropped if this transaction fails. We'll make it persistent at the
909  * end. Temporary slots can be created as temporary from beginning as
910  * they get dropped on error as well.
911  */
912  ReplicationSlotCreate(cmd->slotname, true,
914  }
915 
916  if (cmd->kind == REPLICATION_KIND_LOGICAL)
917  {
919  bool need_full_snapshot = false;
920 
921  /*
922  * Do options check early so that we can bail before calling the
923  * DecodingContextFindStartpoint which can take long time.
924  */
925  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
926  {
927  if (IsTransactionBlock())
928  ereport(ERROR,
929  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
930  (errmsg("%s must not be called inside a transaction",
931  "CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT")));
932 
933  need_full_snapshot = true;
934  }
935  else if (snapshot_action == CRS_USE_SNAPSHOT)
936  {
937  if (!IsTransactionBlock())
938  ereport(ERROR,
939  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
940  (errmsg("%s must be called inside a transaction",
941  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
942 
944  ereport(ERROR,
945  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
946  (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
947  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
948 
949  if (FirstSnapshotSet)
950  ereport(ERROR,
951  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
952  (errmsg("%s must be called before any query",
953  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
954 
955  if (IsSubTransaction())
956  ereport(ERROR,
957  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
958  (errmsg("%s must not be called in a subtransaction",
959  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
960 
961  need_full_snapshot = true;
962  }
963 
964  ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
969 
970  /*
971  * Signal that we don't need the timeout mechanism. We're just
972  * creating the replication slot and don't yet accept feedback
973  * messages or send keepalives. As we possibly need to wait for
974  * further WAL the walsender would otherwise possibly be killed too
975  * soon.
976  */
978 
979  /* build initial snapshot, might take a while */
981 
982  /*
983  * Export or use the snapshot if we've been asked to do so.
984  *
985  * NB. We will convert the snapbuild.c kind of snapshot to normal
986  * snapshot when doing this.
987  */
988  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
989  {
990  snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
991  }
992  else if (snapshot_action == CRS_USE_SNAPSHOT)
993  {
994  Snapshot snap;
995 
998  }
999 
1000  /* don't need the decoding context anymore */
1001  FreeDecodingContext(ctx);
1002 
1003  if (!cmd->temporary)
1005  }
1006  else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
1007  {
1009 
1011 
1012  /* Write this slot to disk if it's a permanent one. */
1013  if (!cmd->temporary)
1015  }
1016 
1017  snprintf(xloc, sizeof(xloc), "%X/%X",
1020 
1022  MemSet(nulls, false, sizeof(nulls));
1023 
1024  /*----------
1025  * Need a tuple descriptor representing four columns:
1026  * - first field: the slot name
1027  * - second field: LSN at which we became consistent
1028  * - third field: exported snapshot's name
1029  * - fourth field: output plugin
1030  *----------
1031  */
1032  tupdesc = CreateTemplateTupleDesc(4);
1033  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
1034  TEXTOID, -1, 0);
1035  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
1036  TEXTOID, -1, 0);
1037  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
1038  TEXTOID, -1, 0);
1039  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1040  TEXTOID, -1, 0);
1041 
1042  /* prepare for projection of tuples */
1043  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
1044 
1045  /* slot_name */
1046  slot_name = NameStr(MyReplicationSlot->data.name);
1047  values[0] = CStringGetTextDatum(slot_name);
1048 
1049  /* consistent wal location */
1050  values[1] = CStringGetTextDatum(xloc);
1051 
1052  /* snapshot name, or NULL if none */
1053  if (snapshot_name != NULL)
1054  values[2] = CStringGetTextDatum(snapshot_name);
1055  else
1056  nulls[2] = true;
1057 
1058  /* plugin, or NULL if none */
1059  if (cmd->plugin != NULL)
1060  values[3] = CStringGetTextDatum(cmd->plugin);
1061  else
1062  nulls[3] = true;
1063 
1064  /* send it to dest */
1065  do_tup_output(tstate, values, nulls);
1066  end_tup_output(tstate);
1067 
1069 }
#define NIL
Definition: pg_list.h:65
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:44
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action)
Definition: walsender.c:822
PGPROC * MyProc
Definition: proc.c:67
#define XACT_REPEATABLE_READ
Definition: xact.h:38
void RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
Definition: snapmgr.c:2225
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2256
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency)
Definition: slot.c:221
#define MemSet(start, val, len)
Definition: c.h:962
void ReplicationSlotSave(void)
Definition: slot.c:645
ReplicationSlotPersistentData data
Definition: slot.h:132
XLogRecPtr confirmed_flush
Definition: slot.h:80
ReplicationKind kind
Definition: replnodes.h:56
bool IsTransactionBlock(void)
Definition: xact.c:4635
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:631
void ReplicationSlotReserveWal(void)
Definition: slot.c:997
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:462
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2314
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:676
#define ERROR
Definition: elog.h:43
bool FirstSnapshotSet
Definition: snapmgr.c:205
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2236
void ReplicationSlotPersist(void)
Definition: slot.c:680
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1202
unsigned int uint32
Definition: c.h:359
void ReplicationSlotRelease(void)
Definition: slot.c:424
#define ereport(elevel, rest)
Definition: elog.h:141
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1175
#define MAXFNAMELEN
LogicalDecodingContext * CreateInitDecodingContext(char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:227
uintptr_t Datum
Definition: postgres.h:367
TimeLineID ThisTimeLineID
Definition: xlog.c:187
struct SnapBuild * snapshot_builder
Definition: logical.h:43
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
static TimeLineID sendTimeLine
Definition: walsender.c:140
#define Assert(condition)
Definition: c.h:739
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:508
int XactIsoLevel
Definition: xact.c:74
bool IsSubTransaction(void)
Definition: xact.c:4708
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition: snapbuild.c:539
static Datum values[MAXATTR]
Definition: bootstrap.c:167
int errmsg(const char *fmt,...)
Definition: elog.c:822
CRSSnapshotAction
Definition: walsender.h:20
#define NameStr(name)
Definition: c.h:616
#define CStringGetTextDatum(s)
Definition: builtins.h:83
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition: walsender.c:766
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:75
#define snprintf
Definition: port.h:192
int16 AttrNumber
Definition: attnum.h:21
static TimestampTz last_reply_timestamp
Definition: walsender.c:163
static bool sendTimeLineIsHistoric
Definition: walsender.c:142
void ReplicationSlotMarkDirty(void)
Definition: slot.c:663
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
Definition: walsender.c:1292

◆ DropReplicationSlot()

static void DropReplicationSlot ( DropReplicationSlotCmd * cmd)
static

Definition at line 1075 of file walsender.c.

References DestRemote, EndCommand(), ReplicationSlotDrop(), DropReplicationSlotCmd::slotname, and DropReplicationSlotCmd::wait.

Referenced by exec_replication_command(), and main().

1076 {
1077  ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1078  EndCommand("DROP_REPLICATION_SLOT", DestRemote);
1079 }
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:166
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:517

◆ exec_replication_command()

bool exec_replication_command ( const char *  cmd_string)

Definition at line 1461 of file walsender.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, generate_unaccent_rules::dest, DestRemote, DestRemoteSimple, DropReplicationSlot(), elog, EndCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsA, IsAbortedTransactionBlockState(), IsTransactionBlock(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextDelete(), MemoryContextSwitchTo(), MyDatabaseId, VariableShowStmt::name, pgstat_report_activity(), PreventInTransactionBlock(), REPLICATION_KIND_PHYSICAL, replication_parse_result, replication_scanner_init(), replication_yyparse(), SendBaseBackup(), SendTimeLineHistory(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), StartTransactionCommand(), WalSnd::state, STATE_IDLE, STATE_RUNNING, T_BaseBackupCmd, T_CreateReplicationSlotCmd, T_DropReplicationSlotCmd, T_IdentifySystemCmd, T_SQLCmd, T_StartReplicationCmd, T_TimeLineHistoryCmd, T_VariableShowStmt, TimeLineHistoryCmd::type, Node::type, WalSndSetState(), and WALSNDSTATE_STOPPING.

Referenced by PostgresMain().

1462 {
1463  int parse_rc;
1464  Node *cmd_node;
1465  MemoryContext cmd_context;
1466  MemoryContext old_context;
1467 
1468  /*
1469  * If WAL sender has been told that shutdown is getting close, switch its
1470  * status accordingly to handle the next replication commands correctly.
1471  */
1472  if (got_STOPPING)
1474 
1475  /*
1476  * Throw error if in stopping mode. We need prevent commands that could
1477  * generate WAL while the shutdown checkpoint is being written. To be
1478  * safe, we just prohibit all new commands.
1479  */
1481  ereport(ERROR,
1482  (errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1483 
1484  /*
1485  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1486  * command arrives. Clean up the old stuff if there's anything.
1487  */
1489 
1491 
1493  "Replication command context",
1495  old_context = MemoryContextSwitchTo(cmd_context);
1496 
1497  replication_scanner_init(cmd_string);
1498  parse_rc = replication_yyparse();
1499  if (parse_rc != 0)
1500  ereport(ERROR,
1501  (errcode(ERRCODE_SYNTAX_ERROR),
1502  (errmsg_internal("replication command parser returned %d",
1503  parse_rc))));
1504 
1505  cmd_node = replication_parse_result;
1506 
1507  /*
1508  * Log replication command if log_replication_commands is enabled. Even
1509  * when it's disabled, log the command with DEBUG1 level for backward
1510  * compatibility. Note that SQL commands are not logged here, and will be
1511  * logged later if log_statement is enabled.
1512  */
1513  if (cmd_node->type != T_SQLCmd)
1515  (errmsg("received replication command: %s", cmd_string)));
1516 
1517  /*
1518  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was
1519  * called outside of transaction the snapshot should be cleared here.
1520  */
1521  if (!IsTransactionBlock())
1523 
1524  /*
1525  * For aborted transactions, don't allow anything except pure SQL, the
1526  * exec_simple_query() will handle it correctly.
1527  */
1528  if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
1529  ereport(ERROR,
1530  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1531  errmsg("current transaction is aborted, "
1532  "commands ignored until end of transaction block")));
1533 
1535 
1536  /*
1537  * Allocate buffers that will be used for each outgoing and incoming
1538  * message. We do this just once per command to reduce palloc overhead.
1539  */
1543 
1544  /* Report to pgstat that this process is running */
1546 
1547  switch (cmd_node->type)
1548  {
1549  case T_IdentifySystemCmd:
1550  IdentifySystem();
1551  break;
1552 
1553  case T_BaseBackupCmd:
1554  PreventInTransactionBlock(true, "BASE_BACKUP");
1555  SendBaseBackup((BaseBackupCmd *) cmd_node);
1556  break;
1557 
1560  break;
1561 
1564  break;
1565 
1566  case T_StartReplicationCmd:
1567  {
1568  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1569 
1570  PreventInTransactionBlock(true, "START_REPLICATION");
1571 
1572  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1573  StartReplication(cmd);
1574  else
1576  break;
1577  }
1578 
1579  case T_TimeLineHistoryCmd:
1580  PreventInTransactionBlock(true, "TIMELINE_HISTORY");
1582  break;
1583 
1584  case T_VariableShowStmt:
1585  {
1587  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1588 
1589  /* syscache access needs a transaction environment */
1591  GetPGVariable(n->name, dest);
1593  }
1594  break;
1595 
1596  case T_SQLCmd:
1597  if (MyDatabaseId == InvalidOid)
1598  ereport(ERROR,
1599  (errmsg("cannot execute SQL commands in WAL sender for physical replication")));
1600 
1601  /* Report to pgstat that this process is now idle */
1603 
1604  /* Tell the caller that this wasn't a WalSender command. */
1605  return false;
1606 
1607  default:
1608  elog(ERROR, "unrecognized replication command node tag: %u",
1609  cmd_node->type);
1610  }
1611 
1612  /* done */
1613  MemoryContextSwitchTo(old_context);
1614  MemoryContextDelete(cmd_context);
1615 
1616  /* Send CommandComplete message */
1617  EndCommand("SELECT", DestRemote);
1618 
1619  /* Report to pgstat that this process is now idle */
1621 
1622  return true;
1623 }
#define IsA(nodeptr, _type_)
Definition: nodes.h:576
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:211
#define AllocSetContextCreate
Definition: memutils.h:170
#define DEBUG1
Definition: elog.h:25
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:3119
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:435
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1075
void CommitTransactionCommand(void)
Definition: xact.c:2898
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:375
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Node
Definition: nodes.h:525
static StringInfoData output_message
Definition: walsender.c:152
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:182
int errcode(int sqlerrcode)
Definition: elog.c:608
#define LOG
Definition: elog.h:26
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc.c:8801
Node * replication_parse_result
bool IsTransactionBlock(void)
Definition: xact.c:4635
ReplicationKind kind
Definition: replnodes.h:82
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:166
#define ERROR
Definition: elog.h:43
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:759
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
WalSndState state
NodeTag type
Definition: nodes.h:527
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:541
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3331
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
#define ereport(elevel, rest)
Definition: elog.h:141
int replication_yyparse(void)
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
static StringInfoData reply_message
Definition: walsender.c:153
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1086
Oid MyDatabaseId
Definition: globals.c:85
WalSnd * MyWalSnd
Definition: walsender.c:111
#define InvalidOid
Definition: postgres_ext.h:36
int errmsg_internal(const char *fmt,...)
Definition: elog.c:909
void WalSndSetState(WalSndState state)
Definition: walsender.c:3126
void StartTransactionCommand(void)
Definition: xact.c:2797
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:875
static StringInfoData tmpbuf
Definition: walsender.c:154
bool log_replication_commands
Definition: walsender.c:124
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define elog(elevel,...)
Definition: elog.h:228
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
static void IdentifySystem(void)
Definition: walsender.c:346
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:692
void replication_scanner_init(const char *query_string)

◆ GetStandbyFlushRecPtr()

static XLogRecPtr GetStandbyFlushRecPtr ( void  )
static

Definition at line 2882 of file walsender.c.

References GetWalRcvWriteRecPtr(), GetXLogReplayRecPtr(), receiveTLI, and ThisTimeLineID.

Referenced by IdentifySystem(), StartReplication(), and XLogSendPhysical().

2883 {
2884  XLogRecPtr replayPtr;
2885  TimeLineID replayTLI;
2886  XLogRecPtr receivePtr;
2888  XLogRecPtr result;
2889 
2890  /*
2891  * We can safely send what's already been replayed. Also, if walreceiver
2892  * is streaming WAL from the same timeline, we can send anything that it
2893  * has streamed, but hasn't been replayed yet.
2894  */
2895 
2896  receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI);
2897  replayPtr = GetXLogReplayRecPtr(&replayTLI);
2898 
2899  ThisTimeLineID = replayTLI;
2900 
2901  result = replayPtr;
2902  if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
2903  result = receivePtr;
2904 
2905  return result;
2906 }
uint32 TimeLineID
Definition: xlogdefs.h:52
XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11172
static TimeLineID receiveTLI
Definition: xlog.c:209
TimeLineID ThisTimeLineID
Definition: xlog.c:187
uint64 XLogRecPtr
Definition: xlogdefs.h:21

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 2935 of file walsender.c.

References am_walsender, Assert, got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

2936 {
2938 
2939  /*
2940  * If replication has not yet started, die like with SIGTERM. If
2941  * replication is active, only set a flag and wake up the main loop. It
2942  * will send any outstanding WAL, wait for it to be replicated to the
2943  * standby, and then exit gracefully.
2944  */
2945  if (!replication_active)
2946  kill(MyProcPid, SIGTERM);
2947  else
2948  got_STOPPING = true;
2949 }
int MyProcPid
Definition: globals.c:40
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:182
#define kill(pid, sig)
Definition: win32_port.h:426
bool am_walsender
Definition: walsender.c:114
static volatile sig_atomic_t replication_active
Definition: walsender.c:190
#define Assert(condition)
Definition: c.h:739

◆ IdentifySystem()

static void IdentifySystem ( void  )
static

Definition at line 346 of file walsender.c.

References am_cascading_walsender, begin_tup_output_tupdesc(), CommitTransactionCommand(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, cur, CurrentMemoryContext, dbname, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), get_database_name(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetSystemIdentifier(), Int32GetDatum, InvalidOid, MAXFNAMELEN, MemoryContextSwitchTo(), MemSet, MyDatabaseId, RecoveryInProgress(), snprintf, StartTransactionCommand(), ThisTimeLineID, TTSOpsVirtual, TupleDescInitBuiltinEntry(), UINT64_FORMAT, and values.

Referenced by exec_replication_command().

347 {
348  char sysid[32];
349  char xloc[MAXFNAMELEN];
350  XLogRecPtr logptr;
351  char *dbname = NULL;
353  TupOutputState *tstate;
354  TupleDesc tupdesc;
355  Datum values[4];
356  bool nulls[4];
357 
358  /*
359  * Reply with a result set with one row, four columns. First col is system
360  * ID, second is timeline ID, third is current xlog location and the
361  * fourth contains the database name if we are connected to one.
362  */
363 
364  snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
366 
369  {
370  /* this also updates ThisTimeLineID */
371  logptr = GetStandbyFlushRecPtr();
372  }
373  else
374  logptr = GetFlushRecPtr();
375 
376  snprintf(xloc, sizeof(xloc), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
377 
378  if (MyDatabaseId != InvalidOid)
379  {
381 
382  /* syscache access needs a transaction env. */
384  /* make dbname live outside TX context */
388  /* CommitTransactionCommand switches to TopMemoryContext */
390  }
391 
393  MemSet(nulls, false, sizeof(nulls));
394 
395  /* need a tuple descriptor representing four columns */
396  tupdesc = CreateTemplateTupleDesc(4);
397  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
398  TEXTOID, -1, 0);
399  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
400  INT4OID, -1, 0);
401  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
402  TEXTOID, -1, 0);
403  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
404  TEXTOID, -1, 0);
405 
406  /* prepare for projection of tuples */
407  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
408 
409  /* column 1: system identifier */
410  values[0] = CStringGetTextDatum(sysid);
411 
412  /* column 2: timeline */
413  values[1] = Int32GetDatum(ThisTimeLineID);
414 
415  /* column 3: wal location */
416  values[2] = CStringGetTextDatum(xloc);
417 
418  /* column 4: database name, or NULL if none */
419  if (dbname)
420  values[3] = CStringGetTextDatum(dbname);
421  else
422  nulls[3] = true;
423 
424  /* send it to dest */
425  do_tup_output(tstate, values, nulls);
426 
427  end_tup_output(tstate);
428 }
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:44
void CommitTransactionCommand(void)
Definition: xact.c:2898
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2256
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
struct cursor * cur
Definition: ecpg.c:28
#define MemSet(start, val, len)
Definition: c.h:962
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8267
bool RecoveryInProgress(void)
Definition: xlog.c:7935
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2314
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:676
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2236
char * get_database_name(Oid dbid)
Definition: dbcommands.c:2155
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
unsigned int uint32
Definition: c.h:359
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
#define MAXFNAMELEN
uintptr_t Datum
Definition: postgres.h:367
Oid MyDatabaseId
Definition: globals.c:85
#define InvalidOid
Definition: postgres_ext.h:36
TimeLineID ThisTimeLineID
Definition: xlog.c:187
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void StartTransactionCommand(void)
Definition: xact.c:2797
char * dbname
Definition: streamutil.c:50
static Datum values[MAXATTR]
Definition: bootstrap.c:167
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4799
#define Int32GetDatum(X)
Definition: postgres.h:479
#define CStringGetTextDatum(s)
Definition: builtins.h:83
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2882
#define snprintf
Definition: port.h:192
int16 AttrNumber
Definition: attnum.h:21
#define UINT64_FORMAT
Definition: c.h:402
bool am_cascading_walsender
Definition: walsender.c:115

◆ InitWalSenderSlot()

static void InitWalSenderSlot ( void  )
static

Definition at line 2308 of file walsender.c.

References WalSnd::apply, WalSnd::applyLag, Assert, WalSnd::flush, WalSnd::flushLag, i, InvalidXLogRecPtr, WalSnd::latch, max_wal_senders, WalSnd::mutex, MyProc, MyProcPid, on_shmem_exit(), WalSnd::pid, PGPROC::procLatch, WalSnd::replyTime, WalSnd::sentPtr, WalSnd::spillBytes, WalSnd::spillCount, WalSnd::spillTxns, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndKill(), WalSndCtlData::walsnds, WALSNDSTATE_STARTUP, WalSnd::write, and WalSnd::writeLag.

/*
 * InitWalSenderSlot: reserve a WalSnd entry for the current process.
 *
 * Walks WalSndCtl->walsnds[0 .. max_wal_senders), taking each entry's
 * spinlock in turn.  The first entry whose pid is 0 is claimed: pid is set
 * to MyProcPid, the sent/write/flush/apply positions to InvalidXLogRecPtr,
 * the three lag fields to -1, the state to WALSNDSTATE_STARTUP, the latch to
 * this process's procLatch, and the reply time and spill counters to 0.
 * MyWalSnd is then pointed at the claimed entry and WalSndKill is registered
 * with on_shmem_exit() so the entry is cleaned up when the process exits.
 */
2309 {
2310  int i;
2311 
2312  /*
2313  * WalSndCtl should be set up already (we inherit this by fork() or
2314  * EXEC_BACKEND mechanism from the postmaster).
2315  */
2316  Assert(WalSndCtl != NULL);
2317  Assert(MyWalSnd == NULL);
2318 
2319  /*
2320  * Find a free walsender slot and reserve it. This must not fail due to
2321  * the prior check for free WAL senders in InitProcess().
2322  */
2323  for (i = 0; i < max_wal_senders; i++)
2324  {
2325  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2326 
2327  SpinLockAcquire(&walsnd->mutex);
2328 
2329  if (walsnd->pid != 0)
2330  {
2331  SpinLockRelease(&walsnd->mutex);
2332  continue;
2333  }
2334  else
2335  {
2336  /*
2337  * Found a free slot. Reserve it for us.
2338  */
2339  walsnd->pid = MyProcPid;
2340  walsnd->sentPtr = InvalidXLogRecPtr;
2341  walsnd->write = InvalidXLogRecPtr;
2342  walsnd->flush = InvalidXLogRecPtr;
2343  walsnd->apply = InvalidXLogRecPtr;
2344  walsnd->writeLag = -1;
2345  walsnd->flushLag = -1;
2346  walsnd->applyLag = -1;
2347  walsnd->state = WALSNDSTATE_STARTUP;
2348  walsnd->latch = &MyProc->procLatch;
2349  walsnd->replyTime = 0;
2350  walsnd->spillTxns = 0;
2351  walsnd->spillCount = 0;
2352  walsnd->spillBytes = 0;
2353  SpinLockRelease(&walsnd->mutex);
2354  /* don't need the lock anymore */
2355  MyWalSnd = (WalSnd *) walsnd;
2356 
2357  break;
2358  }
2359  }
2360 
2361  Assert(MyWalSnd != NULL);
2362 
2363  /* Arrange to clean up at walsender exit */
2364  on_shmem_exit(WalSndKill, 0);
2365 }
XLogRecPtr write
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
int MyProcPid
Definition: globals.c:40
PGPROC * MyProc
Definition: proc.c:67
TimeOffset flushLag
WalSndCtlData * WalSndCtl
Definition: walsender.c:108
static void WalSndKill(int code, Datum arg)
Definition: walsender.c:2369
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
TimeOffset writeLag
slock_t mutex
Latch procLatch
Definition: proc.h:104
int64 spillCount
int64 spillBytes
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
Latch * latch
int64 spillTxns
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
WalSndState state
int max_wal_senders
Definition: walsender.c:120
#define SpinLockRelease(lock)
Definition: spin.h:64
XLogRecPtr sentPtr
WalSnd * MyWalSnd
Definition: walsender.c:111
TimeOffset applyLag
#define Assert(condition)
Definition: c.h:739
int i
TimestampTz replyTime
XLogRecPtr apply

◆ LagTrackerRead()

static TimeOffset LagTrackerRead ( int  head,
XLogRecPtr  lsn,
TimestampTz  now 
)
static

Definition at line 3489 of file walsender.c.

References Assert, LagTracker::buffer, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_read, WalTimeSample::lsn, next, LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by ProcessStandbyReplyMessage().

/*
 * LagTrackerRead: compute the lag for one read head of the lag tracker.
 *
 * 'head' selects which entry of read_heads[]/last_read[] to use, 'lsn' is
 * the position being reported, and 'now' is the current time.
 *
 * Consumes every buffered sample whose lsn is <= 'lsn', remembering the last
 * one consumed in last_read[head].  Returns now minus the local flush time
 * of the newest sample crossed.  If no sample was crossed, the time is
 * interpolated between last_read[head] and the next unread sample, or the
 * next unread sample's time is used when last_read[head] has been cleared.
 * Returns -1 when no answer is available: no future sample exists, the
 * reported lsn went backwards, or a timestamp is ahead of 'now' or of the
 * next sample.
 */
3490 {
3491  TimestampTz time = 0;
3492 
3493  /* Read all unread samples up to this LSN or end of buffer. */
3494  while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
3495  lag_tracker->buffer[lag_tracker->read_heads[head]].lsn <= lsn)
3496  {
3497  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3498  lag_tracker->last_read[head] =
3499  lag_tracker->buffer[lag_tracker->read_heads[head]];
3500  lag_tracker->read_heads[head] =
3501  (lag_tracker->read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE;
3502  }
3503 
3504  /*
3505  * If the lag tracker is empty, that means the standby has processed
3506  * everything we've ever sent so we should now clear 'last_read'. If we
3507  * didn't do that, we'd risk using a stale and irrelevant sample for
3508  * interpolation at the beginning of the next burst of WAL after a period
3509  * of idleness.
3510  */
3511  if (lag_tracker->read_heads[head] == lag_tracker->write_head)
3512  lag_tracker->last_read[head].time = 0;
3513 
3514  if (time > now)
3515  {
3516  /* If the clock somehow went backwards, treat as not found. */
3517  return -1;
3518  }
3519  else if (time == 0)
3520  {
3521  /*
3522  * We didn't cross a time. If there is a future sample that we
3523  * haven't reached yet, and we've already reached at least one sample,
3524  * let's interpolate the local flushed time. This is mainly useful
3525  * for reporting a completely stuck apply position as having
3526  * increasing lag, since otherwise we'd have to wait for it to
3527  * eventually start moving again and cross one of our samples before
3528  * we can show the lag increasing.
3529  */
3530  if (lag_tracker->read_heads[head] == lag_tracker->write_head)
3531  {
3532  /* There are no future samples, so we can't interpolate. */
3533  return -1;
3534  }
3535  else if (lag_tracker->last_read[head].time != 0)
3536  {
3537  /* We can interpolate between last_read and the next sample. */
3538  double fraction;
3539  WalTimeSample prev = lag_tracker->last_read[head];
3540  WalTimeSample next = lag_tracker->buffer[lag_tracker->read_heads[head]];
3541 
3542  if (lsn < prev.lsn)
3543  {
3544  /*
3545  * Reported LSNs shouldn't normally go backwards, but it's
3546  * possible when there is a timeline change. Treat as not
3547  * found.
3548  */
3549  return -1;
3550  }
3551 
3552  Assert(prev.lsn < next.lsn);
3553 
3554  if (prev.time > next.time)
3555  {
3556  /* If the clock somehow went backwards, treat as not found. */
3557  return -1;
3558  }
3559 
3560  /* See how far we are between the previous and next samples. */
3561  fraction =
3562  (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
3563 
3564  /* Scale the local flush time proportionally. */
3565  time = (TimestampTz)
3566  ((double) prev.time + (next.time - prev.time) * fraction);
3567  }
3568  else
3569  {
3570  /*
3571  * We have only a future sample, implying that we were entirely
3572  * caught up, but now there is a new burst of WAL and the
3573  * standby hasn't processed the first sample yet. Until the
3574  * standby reaches the future sample the best we can do is report
3575  * the hypothetical lag if that sample were to be replayed now.
3576  */
3577  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3578  }
3579  }
3580 
3581  /* Return the elapsed time since local flush time in microseconds. */
3582  Assert(time != 0);
3583  return now - time;
3584 }
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:209
static int32 next
Definition: blutils.c:213
int write_head
Definition: walsender.c:210
int64 TimestampTz
Definition: timestamp.h:39
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:212
TimestampTz time
Definition: walsender.c:199
static LagTracker * lag_tracker
Definition: walsender.c:215
XLogRecPtr lsn
Definition: walsender.c:198
#define Assert(condition)
Definition: c.h:739
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:211
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:203
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1547

◆ LagTrackerWrite()

static void LagTrackerWrite ( XLogRecPtr  lsn,
TimestampTz  local_flush_time 
)
static

Definition at line 3424 of file walsender.c.

References am_walsender, LagTracker::buffer, i, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_lsn, WalTimeSample::lsn, NUM_SYNC_REP_WAIT_MODE, LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by WalSndUpdateProgress(), and XLogSendPhysical().

/*
 * LagTrackerWrite: record a (lsn, local_flush_time) sample in the lag
 * tracker's circular buffer.
 *
 * Does nothing when the process is not a walsender or when 'lsn' equals the
 * last lsn recorded.  Otherwise the sample is stored at the write head and
 * the write head is advanced by one slot, modulo LAG_TRACKER_BUFFER_SIZE.
 * If advancing would land on any read head the buffer is considered full:
 * the write head is first stepped back one slot (wrapping from 0 to
 * LAG_TRACKER_BUFFER_SIZE - 1), so the newest existing sample is overwritten
 * and the write head ends up where it started.
 */
3425 {
3426  bool buffer_full;
3427  int new_write_head;
3428  int i;
3429 
3430  if (!am_walsender)
3431  return;
3432 
3433  /*
3434  * If the lsn hasn't advanced since last time, then do nothing. This way
3435  * we only record a new sample when new WAL has been written.
3436  */
3437  if (lag_tracker->last_lsn == lsn)
3438  return;
3439  lag_tracker->last_lsn = lsn;
3440 
3441  /*
3442  * If advancing the write head of the circular buffer would crash into any
3443  * of the read heads, then the buffer is full. In other words, the
3444  * slowest reader (presumably apply) is the one that controls the release
3445  * of space.
3446  */
3447  new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
3448  buffer_full = false;
3449  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
3450  {
3451  if (new_write_head == lag_tracker->read_heads[i])
3452  buffer_full = true;
3453  }
3454 
3455  /*
3456  * If the buffer is full, for now we just rewind by one slot and overwrite
3457  * the last sample, as a simple (if somewhat uneven) way to lower the
3458  * sampling rate. There may be better adaptive compaction algorithms.
3459  */
3460  if (buffer_full)
3461  {
3462  new_write_head = lag_tracker->write_head;
3463  if (lag_tracker->write_head > 0)
3464  lag_tracker->write_head--;
3465  else
3466  lag_tracker->write_head = LAG_TRACKER_BUFFER_SIZE - 1;
3467  }
3468 
3469  /* Store a sample at the current write head position. */
3470  lag_tracker->buffer[lag_tracker->write_head].lsn = lsn;
3471  lag_tracker->buffer[lag_tracker->write_head].time = local_flush_time;
3472  lag_tracker->write_head = new_write_head;
3473 }
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:209
int write_head
Definition: walsender.c:210
TimestampTz time
Definition: walsender.c:199
static LagTracker * lag_tracker
Definition: walsender.c:215
bool am_walsender
Definition: walsender.c:114
XLogRecPtr lsn
Definition: walsender.c:198
XLogRecPtr last_lsn
Definition: walsender.c:208
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:211
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:203
int i
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28

◆ logical_read_xlog_page()

static int logical_read_xlog_page ( XLogReaderState * state,
XLogRecPtr  targetPagePtr,
int  reqLen,
XLogRecPtr  targetRecPtr,
char *  cur_page 
)
static

Definition at line 766 of file walsender.c.

References CheckXLogRemoved(), XLogReaderState::currTLI, XLogReaderState::currTLIValidUntil, XLogReaderState::nextTLI, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, ThisTimeLineID, WALRead(), WALReadRaiseError(), WalSndSegmentOpen(), WalSndWaitForWal(), WALSegmentContext::ws_segsize, WALOpenSegment::ws_tli, XLByteToSeg, and XLogReadDetermineTimeline().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

/*
 * logical_read_xlog_page: page-read routine used for logical decoding in a
 * walsender.
 *
 * Determines the timeline to read from via XLogReadDetermineTimeline() and
 * copies the reader's timeline state into the sendTimeLine* variables.  It
 * then calls WalSndWaitForWal() for targetPagePtr + reqLen and reads the
 * page at targetPagePtr into cur_page with WALRead().
 *
 * Returns the number of valid bytes on the page (XLOG_BLCKSZ, or fewer when
 * the flush position falls inside the page).  Returns -1 if the flush
 * position returned by WalSndWaitForWal() is still short of the request.
 * A failed read is reported through WALReadRaiseError().  targetRecPtr is
 * not used in this body.
 */
768 {
769  XLogRecPtr flushptr;
770  int count;
771  WALReadError errinfo;
772  XLogSegNo segno;
773 
774  XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
775  sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID);
776  sendTimeLine = state->currTLI;
777  sendTimeLineValidUpto = state->currTLIValidUntil;
778  sendTimeLineNextTLI = state->nextTLI;
779 
780  /* make sure we have enough WAL available */
781  flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
782 
783  /* fail if not (implies we are going to shut down) */
784  if (flushptr < targetPagePtr + reqLen)
785  return -1;
786 
787  if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
788  count = XLOG_BLCKSZ; /* more than one block available */
789  else
790  count = flushptr - targetPagePtr; /* part of the page available */
791 
792  /* now actually read the data, we know it's there */
793  if (!WALRead(cur_page,
794  targetPagePtr,
795  XLOG_BLCKSZ,
796  sendSeg->ws_tli, /* Pass the current TLI because only
797  * WalSndSegmentOpen controls whether new
798  * TLI is needed. */
799  sendSeg,
800  sendCxt,
801  WalSndSegmentOpen,
802  &errinfo))
803  WALReadRaiseError(&errinfo);
804 
805  /*
806  * After reading into the buffer, check that what we read was valid. We do
807  * this after reading, because even though the segment was present when we
808  * opened it, it might get recycled or removed while we read it. The
809  * read() succeeds in that case, but the data we tried to read might
810  * already have been overwritten with new WAL records.
811  */
812  XLByteToSeg(targetPagePtr, segno, sendCxt->ws_segsize);
813  CheckXLogRemoved(segno, sendSeg->ws_tli);
814 
815  return count;
816 }
static WALSegmentContext * sendCxt
Definition: walsender.c:132
void WALReadRaiseError(WALReadError *errinfo)
Definition: xlogutils.c:944
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
Definition: xlogutils.c:681
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:3860
uint64 XLogSegNo
Definition: xlogdefs.h:41
bool WALRead(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALOpenSegment *seg, WALSegmentContext *segcxt, WALSegmentOpen openSegment, WALReadError *errinfo)
Definition: xlogreader.c:1033
XLogRecPtr currTLIValidUntil
Definition: xlogreader.h:192
static WALOpenSegment * sendSeg
Definition: walsender.c:131
static int WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt, TimeLineID *tli_p)
Definition: walsender.c:2387
TimeLineID nextTLI
Definition: xlogreader.h:198
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition: walsender.c:1323
TimeLineID ThisTimeLineID
Definition: xlog.c:187
TimeLineID currTLI
Definition: xlogreader.h:182
static TimeLineID sendTimeLine
Definition: walsender.c:140
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:141
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:143
TimeLineID ws_tli
Definition: xlogreader.h:39
static bool sendTimeLineIsHistoric
Definition: walsender.c:142
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

◆ offset_to_interval()

static Interval* offset_to_interval ( TimeOffset  offset)
static

Definition at line 3164 of file walsender.c.

References Interval::day, Interval::month, palloc(), and Interval::time.

Referenced by pg_stat_get_wal_senders().

/*
 * offset_to_interval: wrap a TimeOffset in a newly palloc'd Interval.
 *
 * The month and day fields are set to zero and 'offset' is stored unchanged
 * in the time field.  Returns the pointer obtained from palloc().
 */
3165 {
3166  Interval *result = palloc(sizeof(Interval));
3167 
3168  result->month = 0;
3169  result->day = 0;
3170  result->time = offset;
3171 
3172  return result;
3173 }
int32 day
Definition: timestamp.h:47
TimeOffset time
Definition: timestamp.h:45
int32 month
Definition: timestamp.h:48
void * palloc(Size size)
Definition: mcxt.c:949

◆ parseCreateReplSlotOptions()

static void parseCreateReplSlotOptions ( CreateReplicationSlotCmd * cmd,
bool * reserve_wal,
CRSSnapshotAction * snapshot_action 
)
static

Definition at line 822 of file walsender.c.

References CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, defGetBoolean(), DefElem::defname, elog, ereport, errcode(), errmsg(), ERROR, CreateReplicationSlotCmd::kind, lfirst, CreateReplicationSlotCmd::options, REPLICATION_KIND_LOGICAL, and REPLICATION_KIND_PHYSICAL.

Referenced by CreateReplicationSlot().

/*
 * parseCreateReplSlotOptions: interpret the option list of a
 * CREATE_REPLICATION_SLOT command.
 *
 * Iterates over cmd->options and writes results through the output
 * pointers:
 *   "export_snapshot" (logical slots only) sets *snapshot_action to
 *       CRS_EXPORT_SNAPSHOT or CRS_NOEXPORT_SNAPSHOT by its boolean value;
 *   "use_snapshot" (logical slots only) sets it to CRS_USE_SNAPSHOT;
 *   "reserve_wal" (physical slots only) sets *reserve_wal to true.
 *
 * Outputs for options that are not given are left untouched.  A snapshot
 * option given twice (or both snapshot options together), reserve_wal given
 * twice, or an option used with the wrong slot kind raises a syntax error;
 * an unknown option name raises an "unrecognized option" error.
 */
825 {
826  ListCell *lc;
827  bool snapshot_action_given = false;
828  bool reserve_wal_given = false;
829 
830  /* Parse options */
831  foreach(lc, cmd->options)
832  {
833  DefElem *defel = (DefElem *) lfirst(lc);
834 
835  if (strcmp(defel->defname, "export_snapshot") == 0)
836  {
837  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
838  ereport(ERROR,
839  (errcode(ERRCODE_SYNTAX_ERROR),
840  errmsg("conflicting or redundant options")));
841 
842  snapshot_action_given = true;
843  *snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT :
844  CRS_NOEXPORT_SNAPSHOT;
845  }
846  else if (strcmp(defel->defname, "use_snapshot") == 0)
847  {
848  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
849  ereport(ERROR,
850  (errcode(ERRCODE_SYNTAX_ERROR),
851  errmsg("conflicting or redundant options")));
852 
853  snapshot_action_given = true;
854  *snapshot_action = CRS_USE_SNAPSHOT;
855  }
856  else if (strcmp(defel->defname, "reserve_wal") == 0)
857  {
858  if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
859  ereport(ERROR,
860  (errcode(ERRCODE_SYNTAX_ERROR),
861  errmsg("conflicting or redundant options")));
862 
863  reserve_wal_given = true;
864  *reserve_wal = true;
865  }
866  else
867  elog(ERROR, "unrecognized option: %s", defel->defname);
868  }
869 }
int errcode(int sqlerrcode)
Definition: elog.c:608
ReplicationKind kind
Definition: replnodes.h:56
bool defGetBoolean(DefElem *def)
Definition: define.c:111
#define ERROR
Definition: elog.h:43
#define ereport(elevel, rest)
Definition: elog.h:141
#define lfirst(lc)
Definition: pg_list.h:190
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define elog(elevel,...)
Definition: elog.h:228
char * defname
Definition: parsenodes.h:730

◆ pg_stat_get_wal_senders()

Datum pg_stat_get_wal_senders ( PG_FUNCTION_ARGS  )

Definition at line 3180 of file walsender.c.

References ReturnSetInfo::allowedModes, WalSnd::apply, WalSnd::applyLag, CStringGetTextDatum, ReturnSetInfo::econtext, ExprContext::ecxt_per_query_memory, elog, ereport, errcode(), errmsg(), ERROR, WalSnd::flush, WalSnd::flushLag, get_call_result_type(), GetUserId(), i, Int32GetDatum, Int64GetDatum(), IntervalPGetDatum, is_member_of_role(), IsA, list_member_int(), LSNGetDatum, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_wal_senders, MemoryContextSwitchTo(), MemSet, WalSnd::mutex, offset_to_interval(), PG_STAT_GET_WAL_SENDERS_COLS, WalSnd::pid, WalSnd::replyTime, ReturnSetInfo::returnMode, WalSnd::sentPtr, sentPtr, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SFRM_Materialize, WalSnd::spillBytes, WalSnd::spillCount, WalSnd::spillTxns, SpinLockAcquire, SpinLockRelease, WalSnd::state, SYNC_REP_PRIORITY, WalSnd::sync_standby_priority, SyncRepConfigData::syncrep_method, SyncRepConfig, SyncRepGetSyncStandbys(), TimestampTzGetDatum, tuplestore_begin_heap(), tuplestore_donestoring, tuplestore_putvalues(), TYPEFUNC_COMPOSITE, values, WalSndGetStateString(), WalSndCtlData::walsnds, work_mem, write, WalSnd::write, WalSnd::writeLag, and XLogRecPtrIsInvalid.

/*
 * pg_stat_get_wal_senders: set-returning function producing one row per
 * active walsender, materialized into a tuplestore.
 *
 * Each WalSnd entry with a nonzero pid is copied under its spinlock and
 * turned into a row of PG_STAT_GET_WAL_SENDERS_COLS columns:
 *   0 pid, 1 state, 2 sent, 3 write, 4 flush, 5 apply positions,
 *   6-8 write/flush/apply lag, 9 sync priority, 10 sync state,
 *   11 reply time, 12-14 spill txns/count/bytes.
 * Invalid positions, negative lags and a zero reply time are returned as
 * NULL.  Callers that are not members of DEFAULT_ROLE_READ_ALL_STATS get
 * only the pid; every other column is NULL.
 *
 * Errors out if the caller cannot accept a materialized set or if the
 * declared result type is not a row type.  Always returns (Datum) 0; the
 * rows are delivered through rsinfo->setResult.
 */
3181 {
3182 #define PG_STAT_GET_WAL_SENDERS_COLS 15
3183  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3184  TupleDesc tupdesc;
3185  Tuplestorestate *tupstore;
3186  MemoryContext per_query_ctx;
3187  MemoryContext oldcontext;
3188  List *sync_standbys;
3189  int i;
3190 
3191  /* check to see if caller supports us returning a tuplestore */
3192  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
3193  ereport(ERROR,
3194  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3195  errmsg("set-valued function called in context that cannot accept a set")));
3196  if (!(rsinfo->allowedModes & SFRM_Materialize))
3197  ereport(ERROR,
3198  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3199  errmsg("materialize mode required, but it is not " \
3200  "allowed in this context")));
3201 
3202  /* Build a tuple descriptor for our result type */
3203  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
3204  elog(ERROR, "return type must be a row type");
3205 
3206  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
3207  oldcontext = MemoryContextSwitchTo(per_query_ctx);
3208 
3209  tupstore = tuplestore_begin_heap(true, false, work_mem);
3210  rsinfo->returnMode = SFRM_Materialize;
3211  rsinfo->setResult = tupstore;
3212  rsinfo->setDesc = tupdesc;
3213 
3214  MemoryContextSwitchTo(oldcontext);
3215 
3216  /*
3217  * Get the currently active synchronous standbys.
3218  */
3219  LWLockAcquire(SyncRepLock, LW_SHARED);
3220  sync_standbys = SyncRepGetSyncStandbys(NULL);
3221  LWLockRelease(SyncRepLock);
3222 
3223  for (i = 0; i < max_wal_senders; i++)
3224  {
3225  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3226  XLogRecPtr sentPtr;
3227  XLogRecPtr write;
3228  XLogRecPtr flush;
3229  XLogRecPtr apply;
3230  TimeOffset writeLag;
3231  TimeOffset flushLag;
3232  TimeOffset applyLag;
3233  int priority;
3234  int pid;
3235  WalSndState state;
3236  TimestampTz replyTime;
3237  int64 spillTxns;
3238  int64 spillCount;
3239  int64 spillBytes;
3240  Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
3241  bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
3242 
3243  SpinLockAcquire(&walsnd->mutex);
3244  if (walsnd->pid == 0)
3245  {
3246  SpinLockRelease(&walsnd->mutex);
3247  continue;
3248  }
3249  pid = walsnd->pid;
3250  sentPtr = walsnd->sentPtr;
3251  state = walsnd->state;
3252  write = walsnd->write;
3253  flush = walsnd->flush;
3254  apply = walsnd->apply;
3255  writeLag = walsnd->writeLag;
3256  flushLag = walsnd->flushLag;
3257  applyLag = walsnd->applyLag;
3258  priority = walsnd->sync_standby_priority;
3259  replyTime = walsnd->replyTime;
3260  spillTxns = walsnd->spillTxns;
3261  spillCount = walsnd->spillCount;
3262  spillBytes = walsnd->spillBytes;
3263  SpinLockRelease(&walsnd->mutex);
3264 
3265  memset(nulls, 0, sizeof(nulls));
3266  values[0] = Int32GetDatum(pid);
3267 
3268  if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS))
3269  {
3270  /*
3271  * Only superusers and members of pg_read_all_stats can see
3272  * details. Other users only get the pid value to know it's a
3273  * walsender, but no details.
3274  */
3275  MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3276  }
3277  else
3278  {
3279  values[1] = CStringGetTextDatum(WalSndGetStateString(state));
3280 
3281  if (XLogRecPtrIsInvalid(sentPtr))
3282  nulls[2] = true;
3283  values[2] = LSNGetDatum(sentPtr);
3284 
3285  if (XLogRecPtrIsInvalid(write))
3286  nulls[3] = true;
3287  values[3] = LSNGetDatum(write);
3288 
3289  if (XLogRecPtrIsInvalid(flush))
3290  nulls[4] = true;
3291  values[4] = LSNGetDatum(flush);
3292 
3293  if (XLogRecPtrIsInvalid(apply))
3294  nulls[5] = true;
3295  values[5] = LSNGetDatum(apply);
3296 
3297  /*
3298  * Treat a standby such as a pg_basebackup background process
3299  * which always returns an invalid flush location, as an
3300  * asynchronous standby.
3301  */
3302  priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
3303 
3304  if (writeLag < 0)
3305  nulls[6] = true;
3306  else
3307  values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
3308 
3309  if (flushLag < 0)
3310  nulls[7] = true;
3311  else
3312  values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
3313 
3314  if (applyLag < 0)
3315  nulls[8] = true;
3316  else
3317  values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
3318 
3319  values[9] = Int32GetDatum(priority);
3320 
3321  /*
3322  * More easily understood version of standby state. This is purely
3323  * informational.
3324  *
3325  * In quorum-based sync replication, the role of each standby
3326  * listed in synchronous_standby_names can be changing very
3327  * frequently. Any standbys considered as "sync" at one moment can
3328  * be switched to "potential" ones at the next moment. So, it's
3329  * basically useless to report "sync" or "potential" as their sync
3330  * states. We report just "quorum" for them.
3331  */
3332  if (priority == 0)
3333  values[10] = CStringGetTextDatum("async");
3334  else if (list_member_int(sync_standbys, i))
3335  values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
3336  CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3337  else
3338  values[10] = CStringGetTextDatum("potential");
3339 
3340  if (replyTime == 0)
3341  nulls[11] = true;
3342  else
3343  values[11] = TimestampTzGetDatum(replyTime);
3344 
3345  /* spill to disk */
3346  values[12] = Int64GetDatum(spillTxns);
3347  values[13] = Int64GetDatum(spillCount);
3348  values[14] = Int64GetDatum(spillBytes);
3349  }
3350 
3351  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
3352  }
3353 
3354  /* clean up and return the tuplestore */
3355  tuplestore_donestoring(tupstore);
3356 
3357  return (Datum) 0;
3358 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
XLogRecPtr write
#define IsA(nodeptr, _type_)
Definition: nodes.h:576
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:196
#define SYNC_REP_PRIORITY
Definition: syncrep.h:36
Oid GetUserId(void)
Definition: miscinit.c:380
#define write(a, b, c)
Definition: win32.h:14
int64 TimestampTz
Definition: timestamp.h:39
uint8 syncrep_method
Definition: syncrep.h:51
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
TimeOffset flushLag
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
WalSndCtlData * WalSndCtl
Definition: walsender.c:108
int errcode(int sqlerrcode)
Definition: elog.c:608
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
#define MemSet(start, val, len)
Definition: c.h:962
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
List * SyncRepGetSyncStandbys(bool *am_sync)
Definition: syncrep.c:700
TimeOffset writeLag
SyncRepConfigData * SyncRepConfig
Definition: syncrep.c:97
slock_t mutex
int64 spillCount
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
int64 spillBytes
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
#define ERROR
Definition: elog.h:43
#define IntervalPGetDatum(X)
Definition: timestamp.h:33
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
int64 spillTxns
bool list_member_int(const List *list, int datum)
Definition: list.c:655
WalSndState state
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1699
#define ereport(elevel, rest)
Definition: elog.h:141
int max_wal_senders
Definition: walsender.c:120
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
int64 TimeOffset
Definition: timestamp.h:40
uintptr_t Datum
Definition: postgres.h:367
static const char * WalSndGetStateString(WalSndState state)
Definition: walsender.c:3145
XLogRecPtr sentPtr
int work_mem
Definition: globals.c:121
static XLogRecPtr sentPtr
Definition: walsender.c:149
int allowedModes
Definition: execnodes.h:302
bool is_member_of_role(Oid member, Oid role)
Definition: acl.c:4924
TimeOffset applyLag
SetFunctionReturnMode returnMode
Definition: execnodes.h:304
uint64 XLogRecPtr
Definition: xlogdefs.h:21
int sync_standby_priority
Definition: regguts.h:298
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:230
WalSndState
Tuplestorestate * setResult
Definition: execnodes.h:307
static Datum values[MAXATTR]
Definition: bootstrap.c:167
#define PG_STAT_GET_WAL_SENDERS_COLS
ExprContext * econtext
Definition: execnodes.h:300
#define Int32GetDatum(X)
Definition: postgres.h:479
TupleDesc setDesc
Definition: execnodes.h:308
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define elog(elevel,...)
Definition: elog.h:228
int i
#define CStringGetTextDatum(s)
Definition: builtins.h:83
TimestampTz replyTime
static Interval * offset_to_interval(TimeOffset offset)
Definition: walsender.c:3164
XLogRecPtr apply
Definition: pg_list.h:50

◆ PhysicalConfirmReceivedLocation()

static void PhysicalConfirmReceivedLocation ( XLogRecPtr  lsn)
static

Definition at line 1765 of file walsender.c.

References Assert, ReplicationSlot::data, InvalidXLogRecPtr, ReplicationSlot::mutex, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, SpinLockAcquire, and SpinLockRelease.

Referenced by ProcessStandbyReplyMessage().

/*
 * PhysicalConfirmReceivedLocation: record 'lsn' as the restart_lsn of the
 * replication slot held by this process (MyReplicationSlot).
 *
 * The comparison and assignment happen under the slot's spinlock.  Only when
 * the stored value actually changed is the slot marked dirty and the
 * required LSN recomputed across slots.  'lsn' must not be
 * InvalidXLogRecPtr.
 */
1766 {
1767  bool changed = false;
1768  ReplicationSlot *slot = MyReplicationSlot;
1769 
1770  Assert(lsn != InvalidXLogRecPtr);
1771  SpinLockAcquire(&slot->mutex);
1772  if (slot->data.restart_lsn != lsn)
1773  {
1774  changed = true;
1775  slot->data.restart_lsn = lsn;
1776  }
1777  SpinLockRelease(&slot->mutex);
1778 
1779  if (changed)
1780  {
1781  ReplicationSlotMarkDirty();
1782  ReplicationSlotsComputeRequiredLSN();
1783  }
1784 
1785  /*
1786  * One could argue that the slot should be saved to disk now, but that'd
1787  * be energy wasted - the worst lost information can do here is give us
1788  * wrong information in a statistics view - we'll just potentially be more
1789  * conservative in removing files.
1790  */
1791 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
ReplicationSlotPersistentData data
Definition: slot.h:132
#define SpinLockAcquire(lock)
Definition: spin.h:62
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:748
#define SpinLockRelease(lock)
Definition: spin.h:64
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:739
XLogRecPtr restart_lsn
Definition: slot.h:72
slock_t mutex
Definition: slot.h:105
void ReplicationSlotMarkDirty(void)
Definition: slot.c:663

◆ PhysicalReplicationSlotNewXmin()

static void PhysicalReplicationSlotNewXmin ( TransactionId  feedbackXmin,
TransactionId  feedbackCatalogXmin 
)
static

Definition at line 1902 of file walsender.c.

References ReplicationSlotPersistentData::catalog_xmin, ReplicationSlot::data, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, InvalidTransactionId, ReplicationSlot::mutex, MyPgXact, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredXmin(), SpinLockAcquire, SpinLockRelease, TransactionIdIsNormal, TransactionIdPrecedes(), ReplicationSlotPersistentData::xmin, and PGXACT::xmin.

Referenced by ProcessStandbyHSFeedbackMessage().

/*
 * PhysicalReplicationSlotNewXmin: apply feedback xmin values to the
 * replication slot held by this process (MyReplicationSlot).
 *
 * Under the slot's spinlock, this process's own MyPgXact->xmin is reset to
 * InvalidTransactionId.  The slot's xmin/effective_xmin pair is then
 * overwritten with feedbackXmin when the stored xmin is not a normal xid,
 * when the feedback value is not a normal xid, or when the stored xmin
 * precedes the feedback value.  The same rule is applied independently to
 * catalog_xmin/effective_catalog_xmin with feedbackCatalogXmin.  If either
 * pair was written, the slot is marked dirty and the required xmin is
 * recomputed across slots.
 */
1903 {
1904  bool changed = false;
1905  ReplicationSlot *slot = MyReplicationSlot;
1906 
1907  SpinLockAcquire(&slot->mutex);
1908  MyPgXact->xmin = InvalidTransactionId;
1909 
1910  /*
1911  * For physical replication we don't need the interlock provided by xmin
1912  * and effective_xmin since the consequences of a missed increase are
1913  * limited to query cancellations, so set both at once.
1914  */
1915  if (!TransactionIdIsNormal(slot->data.xmin) ||
1916  !TransactionIdIsNormal(feedbackXmin) ||
1917  TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
1918  {
1919  changed = true;
1920  slot->data.xmin = feedbackXmin;
1921  slot->effective_xmin = feedbackXmin;
1922  }
1923  if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
1924  !TransactionIdIsNormal(feedbackCatalogXmin) ||
1925  TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
1926  {
1927  changed = true;
1928  slot->data.catalog_xmin = feedbackCatalogXmin;
1929  slot->effective_catalog_xmin = feedbackCatalogXmin;
1930  }
1931  SpinLockRelease(&slot->mutex);
1932 
1933  if (changed)
1934  {
1935  ReplicationSlotMarkDirty();
1936  ReplicationSlotsComputeRequiredXmin(false);
1937  }
1938 }
TransactionId xmin
Definition: proc.h:228
ReplicationSlotPersistentData data
Definition: slot.h:132
PGXACT * MyPgXact
Definition: proc.c:68
#define SpinLockAcquire(lock)
Definition: spin.h:62
TransactionId effective_xmin
Definition: slot.h:128
TransactionId catalog_xmin
Definition: slot.h:69
#define InvalidTransactionId
Definition: transam.h:31
TransactionId xmin
Definition: slot.h:61
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
#define SpinLockRelease(lock)
Definition: spin.h:64
TransactionId effective_catalog_xmin
Definition: slot.h:129
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
slock_t mutex
Definition: slot.h:105
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:702
void ReplicationSlotMarkDirty(void)
Definition: slot.c:663

◆ ProcessRepliesIfAny()

static void ProcessRepliesIfAny ( void  )
static

Definition at line 1630 of file walsender.c.

References COMMERROR, ereport, errcode(), errmsg(), FATAL, GetCurrentTimestamp(), last_processing, last_reply_timestamp, pq_endmsgread(), pq_getbyte_if_available(), pq_getmessage(), pq_putmessage_noblock, pq_startmsgread(), proc_exit(), ProcessStandbyMessage(), resetStringInfo(), streamingDoneReceiving, streamingDoneSending, and waiting_for_ping_response.

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

1631 {
1632  unsigned char firstchar;
1633  int r;
1634  bool received = false;
1635 
1636  last_processing = GetCurrentTimestamp();
1637 
1638  for (;;)
1639  {
1640  pq_startmsgread();
1641  r = pq_getbyte_if_available(&firstchar);
1642  if (r < 0)
1643  {
1644  /* unexpected error or EOF */
1645  ereport(COMMERROR,
1646  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1647  errmsg("unexpected EOF on standby connection")));
1648  proc_exit(0);
1649  }
1650  if (r == 0)
1651  {
1652  /* no data available without blocking */
1653  pq_endmsgread();
1654  break;
1655  }
1656 
1657  /* Read the message contents */
1658  resetStringInfo(&reply_message);
1659  if (pq_getmessage(&reply_message, 0))
1660  {
1661  ereport(COMMERROR,
1662  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1663  errmsg("unexpected EOF on standby connection")));
1664  proc_exit(0);
1665  }
1666 
1667  /*
1668  * If we already received a CopyDone from the frontend, the frontend
1669  * should not send us anything until we've closed our end of the COPY.
1670  * XXX: In theory, the frontend could already send the next command
1671  * before receiving the CopyDone, but libpq doesn't currently allow
1672  * that.
1673  */
1674  if (streamingDoneReceiving && firstchar != 'X')
1675  ereport(FATAL,
1676  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1677  errmsg("unexpected standby message type \"%c\", after receiving CopyDone",
1678  firstchar)));
1679 
1680  /* Handle the very limited subset of commands expected in this phase */
1681  switch (firstchar)
1682  {
1683  /*
1684  * 'd' means a standby reply wrapped in a CopyData packet.
1685  */
1686  case 'd':
1687  ProcessStandbyMessage();
1688  received = true;
1689  break;
1690 
1691  /*
1692  * CopyDone means the standby requested to finish streaming.
1693  * Reply with CopyDone, if we had not sent that already.
1694  */
1695  case 'c':
1696  if (!streamingDoneSending)
1697  {
1698  pq_putmessage_noblock('c', NULL, 0);
1699  streamingDoneSending = true;
1700  }
1701 
1702  streamingDoneReceiving = true;
1703  received = true;
1704  break;
1705 
1706  /*
1707  * 'X' means that the standby is closing down the socket.
1708  */
1709  case 'X':
1710  proc_exit(0);
1711 
1712  default:
1713  ereport(FATAL,
1714  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1715  errmsg("invalid standby message type \"%c\"",
1716  firstchar)));
1717  }
1718  }
1719 
1720  /*
1721  * Save the last reply timestamp if we've received at least one reply.
1722  */
1723  if (received)
1724  {
1725  last_reply_timestamp = last_processing;
1726  waiting_for_ping_response = false;
1727  }
1728 }
static void ProcessStandbyMessage(void)
Definition: walsender.c:1734
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:608
static TimestampTz last_processing
Definition: walsender.c:157
void pq_startmsgread(void)
Definition: pqcomm.c:1211
#define FATAL
Definition: elog.h:52
int pq_getbyte_if_available(unsigned char *c)
Definition: pqcomm.c:1041
static bool streamingDoneSending
Definition: walsender.c:174
#define COMMERROR
Definition: elog.h:30
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
#define ereport(elevel, rest)
Definition: elog.h:141
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1273
static StringInfoData reply_message
Definition: walsender.c:153
void pq_endmsgread(void)
Definition: pqcomm.c:1235
static bool streamingDoneReceiving
Definition: walsender.c:175
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
static bool waiting_for_ping_response
Definition: walsender.c:166
static TimestampTz last_reply_timestamp
Definition: walsender.c:163

◆ ProcessStandbyHSFeedbackMessage()

static void ProcessStandbyHSFeedbackMessage ( void  )
static

Definition at line 1982 of file walsender.c.

References DEBUG2, elog, InvalidTransactionId, log_min_messages, WalSnd::mutex, MyPgXact, MyReplicationSlot, MyWalSnd, pfree(), PhysicalReplicationSlotNewXmin(), pq_getmsgint(), pq_getmsgint64(), pstrdup(), WalSnd::replyTime, SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), TransactionIdInRecentPast(), TransactionIdIsNormal, TransactionIdPrecedes(), and PGXACT::xmin.

Referenced by ProcessStandbyMessage().

1983 {
1984  TransactionId feedbackXmin;
1985  uint32 feedbackEpoch;
1986  TransactionId feedbackCatalogXmin;
1987  uint32 feedbackCatalogEpoch;
1988  TimestampTz replyTime;
1989 
1990  /*
1991  * Decipher the reply message. The caller already consumed the msgtype
1992  * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
1993  * of this message.
1994  */
1995  replyTime = pq_getmsgint64(&reply_message);
1996  feedbackXmin = pq_getmsgint(&reply_message, 4);
1997  feedbackEpoch = pq_getmsgint(&reply_message, 4);
1998  feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
1999  feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
2000 
2001  if (log_min_messages <= DEBUG2)
2002  {
2003  char *replyTimeStr;
2004 
2005  /* Copy because timestamptz_to_str returns a static buffer */
2006  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2007 
2008  elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
2009  feedbackXmin,
2010  feedbackEpoch,
2011  feedbackCatalogXmin,
2012  feedbackCatalogEpoch,
2013  replyTimeStr);
2014 
2015  pfree(replyTimeStr);
2016  }
2017 
2018  /*
2019  * Update shared state for this WalSender process based on reply data from
2020  * standby.
2021  */
2022  {
2023  WalSnd *walsnd = MyWalSnd;
2024 
2025  SpinLockAcquire(&walsnd->mutex);
2026  walsnd->replyTime = replyTime;
2027  SpinLockRelease(&walsnd->mutex);
2028  }
2029 
2030  /*
2031  * Unset WalSender's xmins if the feedback message values are invalid.
2032  * This happens when the downstream turned hot_standby_feedback off.
2033  */
2034  if (!TransactionIdIsNormal(feedbackXmin)
2035  && !TransactionIdIsNormal(feedbackCatalogXmin))
2036  {
2037  MyPgXact->xmin = InvalidTransactionId;
2038  if (MyReplicationSlot != NULL)
2039  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2040  return;
2041  }
2042 
2043  /*
2044  * Check that the provided xmin/epoch are sane, that is, not in the future
2045  * and not so far back as to be already wrapped around. Ignore if not.
2046  */
2047  if (TransactionIdIsNormal(feedbackXmin) &&
2048  !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
2049  return;
2050 
2051  if (TransactionIdIsNormal(feedbackCatalogXmin) &&
2052  !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
2053  return;
2054 
2055  /*
2056  * Set the WalSender's xmin equal to the standby's requested xmin, so that
2057  * the xmin will be taken into account by GetOldestXmin. This will hold
2058  * back the removal of dead rows and thereby prevent the generation of
2059  * cleanup conflicts on the standby server.
2060  *
2061  * There is a small window for a race condition here: although we just
2062  * checked that feedbackXmin precedes nextXid, the nextXid could have
2063  * gotten advanced between our fetching it and applying the xmin below,
2064  * perhaps far enough to make feedbackXmin wrap around. In that case the
2065  * xmin we set here would be "in the future" and have no effect. No point
2066  * in worrying about this since it's too late to save the desired data
2067  * anyway. Assuming that the standby sends us an increasing sequence of
2068  * xmins, this could only happen during the first reply cycle, else our
2069  * own xmin would prevent nextXid from advancing so far.
2070  *
2071  * We don't bother taking the ProcArrayLock here. Setting the xmin field
2072  * is assumed atomic, and there's no real need to prevent a concurrent
2073  * GetOldestXmin. (If we're moving our xmin forward, this is obviously
2074  * safe, and if we're moving it backwards, well, the data is at risk
2075  * already since a VACUUM could have just finished calling GetOldestXmin.)
2076  *
2077  * If we're using a replication slot we reserve the xmin via that,
2078  * otherwise via the walsender's PGXACT entry. We can only track the
2079  * catalog xmin separately when using a slot, so we store the least of the
2080  * two provided when not using a slot.
2081  *
2082  * XXX: It might make sense to generalize the ephemeral slot concept and
2083  * always use the slot mechanism to handle the feedback xmin.
2084  */
2085  if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
2086  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2087  else
2088  {
2089  if (TransactionIdIsNormal(feedbackCatalogXmin)
2090  && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
2091  MyPgXact->xmin = feedbackCatalogXmin;
2092  else
2093  MyPgXact->xmin = feedbackXmin;
2094  }
2095 }
uint32 TransactionId
Definition: c.h:514
TransactionId xmin
Definition: proc.h:228
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1186
slock_t mutex
PGXACT * MyPgXact
Definition: proc.c:68
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition: walsender.c:1951
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pfree(void *pointer)
Definition: mcxt.c:1056
#define DEBUG2
Definition: elog.h:24
#define InvalidTransactionId
Definition: transam.h:31
unsigned int uint32
Definition: c.h:359
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static StringInfoData reply_message
Definition: walsender.c:153
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:111
int log_min_messages
Definition: guc.c:514
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define elog(elevel,...)
Definition: elog.h:228
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition: walsender.c:1902
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
TimestampTz replyTime
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1743

◆ ProcessStandbyMessage()

static void ProcessStandbyMessage ( void  )
static

Definition at line 1734 of file walsender.c.

References COMMERROR, ereport, errcode(), errmsg(), pq_getmsgbyte(), proc_exit(), ProcessStandbyHSFeedbackMessage(), and ProcessStandbyReplyMessage().

Referenced by ProcessRepliesIfAny().

1735 {
1736  char msgtype;
1737 
1738  /*
1739  * Check message type from the first byte.
1740  */
1741  msgtype = pq_getmsgbyte(&reply_message);
1742 
1743  switch (msgtype)
1744  {
1745  case 'r':
1746  ProcessStandbyReplyMessage();
1747  break;
1748 
1749  case 'h':
1750  ProcessStandbyHSFeedbackMessage();
1751  break;
1752 
1753  default:
1754  ereport(COMMERROR,
1755  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1756  errmsg("unexpected message type \"%c\"", msgtype)));
1757  proc_exit(0);
1758  }
1759 }
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:608
#define COMMERROR
Definition: elog.h:30
#define ereport(elevel, rest)
Definition: elog.h:141
static StringInfoData reply_message
Definition: walsender.c:153
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
static void ProcessStandbyReplyMessage(void)
Definition: walsender.c:1797
int errmsg(const char *fmt,...)
Definition: elog.c:822
static void ProcessStandbyHSFeedbackMessage(void)
Definition: walsender.c:1982

◆ ProcessStandbyReplyMessage()

static void ProcessStandbyReplyMessage ( void  )
static

Definition at line 1797 of file walsender.c.

References am_cascading_walsender, WalSnd::apply, WalSnd::applyLag, DEBUG2, elog, WalSnd::flush, WalSnd::flushLag, GetCurrentTimestamp(), InvalidXLogRecPtr, LagTrackerRead(), log_min_messages, LogicalConfirmReceivedLocation(), WalSnd::mutex, MyReplicationSlot, MyWalSnd, now(), pfree(), PhysicalConfirmReceivedLocation(), pq_getmsgbyte(), pq_getmsgint64(), pstrdup(), WalSnd::replyTime, sentPtr, SlotIsLogical, SpinLockAcquire, SpinLockRelease, SYNC_REP_WAIT_APPLY, SYNC_REP_WAIT_FLUSH, SYNC_REP_WAIT_WRITE, SyncRepReleaseWaiters(), timestamptz_to_str(), WalSndKeepalive(), WalSnd::write, and WalSnd::writeLag.

Referenced by ProcessStandbyMessage().

1798 {
1799  XLogRecPtr writePtr,
1800  flushPtr,
1801  applyPtr;
1802  bool replyRequested;
1803  TimeOffset writeLag,
1804  flushLag,
1805  applyLag;
1806  bool clearLagTimes;
1807  TimestampTz now;
1808  TimestampTz replyTime;
1809 
1810  static bool fullyAppliedLastTime = false;
1811 
1812  /* the caller already consumed the msgtype byte */
1813  writePtr = pq_getmsgint64(&reply_message);
1814  flushPtr = pq_getmsgint64(&reply_message);
1815  applyPtr = pq_getmsgint64(&reply_message);
1816  replyTime = pq_getmsgint64(&reply_message);
1817  replyRequested = pq_getmsgbyte(&reply_message);
1818 
1819  if (log_min_messages <= DEBUG2)
1820  {
1821  char *replyTimeStr;
1822 
1823  /* Copy because timestamptz_to_str returns a static buffer */
1824  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
1825 
1826  elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
1827  (uint32) (writePtr >> 32), (uint32) writePtr,
1828  (uint32) (flushPtr >> 32), (uint32) flushPtr,
1829  (uint32) (applyPtr >> 32), (uint32) applyPtr,
1830  replyRequested ? " (reply requested)" : "",
1831  replyTimeStr);
1832 
1833  pfree(replyTimeStr);
1834  }
1835 
1836  /* See if we can compute the round-trip lag for these positions. */
1837  now = GetCurrentTimestamp();
1838  writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
1839  flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
1840  applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
1841 
1842  /*
1843  * If the standby reports that it has fully replayed the WAL in two
1844  * consecutive reply messages, then the second such message must result
1845  * from wal_receiver_status_interval expiring on the standby. This is a
1846  * convenient time to forget the lag times measured when it last
1847  * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
1848  * until more WAL traffic arrives.
1849  */
1850  clearLagTimes = false;
1851  if (applyPtr == sentPtr)
1852  {
1853  if (fullyAppliedLastTime)
1854  clearLagTimes = true;
1855  fullyAppliedLastTime = true;
1856  }
1857  else
1858  fullyAppliedLastTime = false;
1859 
1860  /* Send a reply if the standby requested one. */
1861  if (replyRequested)
1862  WalSndKeepalive(false);
1863 
1864  /*
1865  * Update shared state for this WalSender process based on reply data from
1866  * standby.
1867  */
1868  {
1869  WalSnd *walsnd = MyWalSnd;
1870 
1871  SpinLockAcquire(&walsnd->mutex);
1872  walsnd->write = writePtr;
1873  walsnd->flush = flushPtr;
1874  walsnd->apply = applyPtr;
1875  if (writeLag != -1 || clearLagTimes)
1876  walsnd->writeLag = writeLag;
1877  if (flushLag != -1 || clearLagTimes)
1878  walsnd->flushLag = flushLag;
1879  if (applyLag != -1 || clearLagTimes)
1880  walsnd->applyLag = applyLag;
1881  walsnd->replyTime = replyTime;
1882  SpinLockRelease(&walsnd->mutex);
1883  }
1884 
1885  if (!am_cascading_walsender)
1886  SyncRepReleaseWaiters();
1887 
1888  /*
1889  * Advance our local xmin horizon when the client confirmed a flush.
1890  */
1891  if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
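1892  {
1893  if (SlotIsLogical(MyReplicationSlot))
1894  LogicalConfirmReceivedLocation(flushPtr);
1895  else
1896  PhysicalConfirmReceivedLocation(flushPtr);
1897  }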
1898 }
XLogRecPtr write
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1186
TimeOffset flushLag
TimeOffset writeLag
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
void pfree(void *pointer)
Definition: mcxt.c:1056
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3366
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: walsender.c:1765
#define SYNC_REP_WAIT_APPLY
Definition: syncrep.h:26
#define DEBUG2
Definition: elog.h:24
unsigned int uint32
Definition: c.h:359
#define SlotIsLogical(slot)
Definition: slot.h:154
#define SYNC_REP_WAIT_FLUSH
Definition: syncrep.h:25
static StringInfoData reply_message
Definition: walsender.c:153
#define SpinLockRelease(lock)
Definition: spin.h:64
int64 TimeOffset
Definition: timestamp.h:40
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition: walsender.c:3489
WalSnd * MyWalSnd
Definition: walsender.c:111
static XLogRecPtr sentPtr
Definition: walsender.c:149
int log_min_messages
Definition: guc.c:514
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
TimeOffset applyLag
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:1001
#define elog(elevel,...)
Definition: elog.h:228
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
TimestampTz replyTime
XLogRecPtr apply
void SyncRepReleaseWaiters(void)
Definition: syncrep.c:426
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1547
bool am_cascading_walsender
Definition: walsender.c:115
#define SYNC_REP_WAIT_WRITE
Definition: syncrep.h:24
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1743

◆ SendTimeLineHistory()

static void SendTimeLineHistory ( TimeLineHistoryCmd cmd)
static

Definition at line 435 of file walsender.c.

References buf, CloseTransientFile(), PGAlignedBlock::data, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg(), ERROR, fd(), MAXFNAMELEN, MAXPGPATH, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), pq_beginmessage(), pq_endmessage(), pq_sendbytes(), pq_sendint16(), pq_sendint32(), pq_sendstring(), read, TimeLineHistoryCmd::timeline, TLHistoryFileName, TLHistoryFilePath, and WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ.

Referenced by exec_replication_command().

436 {
437  StringInfoData buf;
438  char histfname[MAXFNAMELEN];
439  char path[MAXPGPATH];
440  int fd;
441  off_t histfilelen;
442  off_t bytesleft;
443  Size len;
444 
445  /*
446  * Reply with a result set with one row, and two columns. The first col is
447  * the name of the history file, 2nd is the contents.
448  */
449 
450  TLHistoryFileName(histfname, cmd->timeline);
451  TLHistoryFilePath(path, cmd->timeline);
452 
453  /* Send a RowDescription message */
454  pq_beginmessage(&buf, 'T');
455  pq_sendint16(&buf, 2); /* 2 fields */
456 
457  /* first field */
458  pq_sendstring(&buf, "filename"); /* col name */
459  pq_sendint32(&buf, 0); /* table oid */
460  pq_sendint16(&buf, 0); /* attnum */
461  pq_sendint32(&buf, TEXTOID); /* type oid */
462  pq_sendint16(&buf, -1); /* typlen */
463  pq_sendint32(&buf, 0); /* typmod */
464  pq_sendint16(&buf, 0); /* format code */
465 
466  /* second field */
467  pq_sendstring(&buf, "content"); /* col name */
468  pq_sendint32(&buf, 0); /* table oid */
469  pq_sendint16(&buf, 0); /* attnum */
470  pq_sendint32(&buf, BYTEAOID); /* type oid */
471  pq_sendint16(&buf, -1); /* typlen */
472  pq_sendint32(&buf, 0); /* typmod */
473  pq_sendint16(&buf, 0); /* format code */
474  pq_endmessage(&buf);
475 
476  /* Send a DataRow message */
477  pq_beginmessage(&buf, 'D');
478  pq_sendint16(&buf, 2); /* # of columns */
479  len = strlen(histfname);
480  pq_sendint32(&buf, len); /* col1 len */
481  pq_sendbytes(&buf, histfname, len);
482 
483  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
484  if (fd < 0)
485  ereport(ERROR,
486  (errcode_for_file_access(),
487  errmsg("could not open file \"%s\": %m", path)));
488 
489  /* Determine file length and send it to client */
490  histfilelen = lseek(fd, 0, SEEK_END);
491  if (histfilelen < 0)
492  ereport(ERROR,
493  (errcode_for_file_access(),
494  errmsg("could not seek to end of file \"%s\": %m", path)));
495  if (lseek(fd, 0, SEEK_SET) != 0)
496  ereport(ERROR,
497  (errcode_for_file_access(),
498  errmsg("could not seek to beginning of file \"%s\": %m", path)));
499 
500  pq_sendint32(&buf, histfilelen); /* col2 len */
501 
502  bytesleft = histfilelen;
503  while (bytesleft > 0)
504  {
505  PGAlignedBlock rbuf;
506  int nread;
507 
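508  pgstat_report_wait_start(WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ);
509  nread = read(fd, rbuf.data, sizeof(rbuf));
510  pgstat_report_wait_end();
511  if (nread < 0)
512  ereport(ERROR,
513  (errcode_for_file_access(),
514  errmsg("could not read file \"%s\": %m",
515  path)));
516  else if (nread == 0)
517  ereport(ERROR,
518  (errcode(ERRCODE_DATA_CORRUPTED),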
519  errmsg("could not read file \"%s\": read %d of %zu",
520  path, nread, (Size) bytesleft)));
521 
522  pq_sendbytes(&buf, rbuf.data, nread);
523  bytesleft -= nread;
524  }
525 
526  if (CloseTransientFile(fd) != 0)
527  ereport(ERROR,
528  (errcode_for_file_access(),
529  errmsg("could not close file \"%s\": %m", path)));
530 
531  pq_endmessage(&buf);
532 }
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
int errcode(int sqlerrcode)
Definition: elog.c:608
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1222
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
TimeLineID timeline
Definition: replnodes.h:97
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
char data[BLCKSZ]
Definition: c.h:1091
#define ERROR
Definition: elog.h:43
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2292
#define MAXPGPATH
#define TLHistoryFileName(fname, tli)
static char * buf
Definition: pg_test_fsync.c:67
int errcode_for_file_access(void)
Definition: elog.c:631
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1342
#define ereport(elevel, rest)
Definition: elog.h:141
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:45
int CloseTransientFile(int fd)
Definition: fd.c:2469
#define MAXFNAMELEN
size_t Size
Definition: c.h:467
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1318
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:125
int errmsg(const char *fmt,...)
Definition: elog.c:822
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
#define read(a, b, c)
Definition: win32.h:13
#define TLHistoryFilePath(path, tli)

◆ StartLogicalReplication()

static void StartLogicalReplication ( StartReplicationCmd cmd)
static

Definition at line 1086 of file walsender.c.

References am_cascading_walsender, Assert, buf, CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDecodingContext(), ReplicationSlot::data, DestRemote, EndCommand(), ereport, errmsg(), FreeDecodingContext(), got_STOPPING, LOG, logical_read_xlog_page(), logical_startptr, WalSnd::mutex, MyReplicationSlot, StartReplicationCmd::options, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), proc_exit(), RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), ReplicationSlotPersistentData::restart_lsn, WalSnd::sentPtr, sentPtr, StartReplicationCmd::slotname, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, SyncRepInitConfig(), WalSndLoop(), WalSndPrepareWrite(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WalSndUpdateProgress(), WalSndWriteData(), and XLogSendLogical().

Referenced by exec_replication_command().

1087 {
1088  StringInfoData buf;
1089 
1090  /* make sure that our requirements are still fulfilled */
1091  CheckLogicalDecodingRequirements();
1092 
1093  Assert(!MyReplicationSlot);
1094 
1095  ReplicationSlotAcquire(cmd->slotname, true);
1096 
1097  /*
1098  * Force a disconnect, so that the decoding code doesn't need to care
1099  * about an eventual switch from running in recovery, to running in a
1100  * normal environment. Client code is expected to handle reconnects.
1101  */
1102  if (am_cascading_walsender && !RecoveryInProgress())
1103  {
1104  ereport(LOG,
1105  (errmsg("terminating walsender process after promotion")));
1106  got_STOPPING = true;
1107  }
1108 
1109  /*
1110  * Create our decoding context, making it start at the previously ack'ed
1111  * position.
1112  *
1113  * Do this before sending a CopyBothResponse message, so that any errors
1114  * are reported early.
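1115  */
1116  logical_decoding_ctx =
1117  CreateDecodingContext(cmd->startpoint, cmd->options, false,
1118  logical_read_xlog_page,
1119  WalSndPrepareWrite, WalSndWriteData,
1120  WalSndUpdateProgress);
1121 
1122 
1123  WalSndSetState(WALSNDSTATE_CATCHUP);
1124 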
1125  /* Send a CopyBothResponse message, and start streaming */
1126  pq_beginmessage(&buf, 'W');
1127  pq_sendbyte(&buf, 0);
1128  pq_sendint16(&buf, 0);
1129  pq_endmessage(&buf);
1130  pq_flush();
1131 
1132 
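1133  /* Start reading WAL from the oldest required WAL. */
1134  logical_startptr = MyReplicationSlot->data.restart_lsn;
1135 
1136  /*
1137  * Report the location after which we'll send out further commits as the
1138  * current sentPtr.
1139  */
1140  sentPtr = MyReplicationSlot->data.confirmed_flush;
1141 
1142  /* Also update the sent position status in shared memory */
1143  SpinLockAcquire(&MyWalSnd->mutex);
1144  MyWalSnd->sentPtr = MyReplicationSlot->data.restart_lsn;
1145  SpinLockRelease(&MyWalSnd->mutex);
1146 
1147  replication_active = true;
1148 
1149  SyncRepInitConfig();
1150 
1151  /* Main loop of walsender */
1152  WalSndLoop(XLogSendLogical);
1153 
1154  FreeDecodingContext(logical_decoding_ctx);
1155  ReplicationSlotRelease();
1156 
1157  replication_active = false;
1158  if (got_STOPPING)
1159  proc_exit(0);
1160  WalSndSetState(WALSNDSTATE_STARTUP);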
1161 
1162  /* Get out of COPY mode (CommandComplete). */
1163  EndCommand("COPY 0", DestRemote);
1164 }
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
XLogRecPtr startpoint
Definition: replnodes.h:85
#define pq_flush()
Definition: libpq.h:39
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:330
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:182
void proc_exit(int code)
Definition: ipc.c:104
ReplicationSlotPersistentData data
Definition: slot.h:132
#define LOG
Definition: elog.h:26
bool RecoveryInProgress(void)
Definition: xlog.c:7935
slock_t mutex
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
XLogRecPtr confirmed_flush
Definition: slot.h:80
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
#define SpinLockAcquire(lock)
Definition: spin.h:62
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:166
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:192
static char * buf
Definition: pg_test_fsync.c:67
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1202
static XLogRecPtr logical_startptr
Definition: walsender.c:193
void ReplicationSlotRelease(void)
Definition: slot.c:424
void SyncRepInitConfig(void)
Definition: syncrep.c:398
#define ereport(elevel, rest)
Definition: elog.h:141
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2182
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1175
#define SpinLockRelease(lock)
Definition: spin.h:64
static volatile sig_atomic_t replication_active
Definition: walsender.c:190
XLogRecPtr sentPtr
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:370
WalSnd * MyWalSnd
Definition: walsender.c:111
static XLogRecPtr sentPtr
Definition: walsender.c:149
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:739
void WalSndSetState(WalSndState state)
Definition: walsender.c:3126
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:508
static void XLogSendLogical(void)
Definition: walsender.c:2772
XLogRecPtr restart_lsn
Definition: slot.h:72
int errmsg(const char *fmt,...)
Definition: elog.c:822
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition: walsender.c:766
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:75
bool am_cascading_walsender
Definition: walsender.c:115
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
Definition: walsender.c:1292

◆ StartReplication()

static void StartReplication ( StartReplicationCmd cmd)
static

Definition at line 541 of file walsender.c.

References am_cascading_walsender, Assert, begin_tup_output_tupdesc(), buf, CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errcode(), errdetail(), errmsg(), ERROR, GetFlushRecPtr(), GetStandbyFlushRecPtr(), got_STOPPING, Int64GetDatum(), InvalidXLogRecPtr, list_free_deep(), MemSet, WalSnd::mutex, MyReplicationSlot, pq_beginmessage(), pq_endmessage(), pq_flush, pq_puttextmessage(), pq_sendbyte(), pq_sendint16(), proc_exit(), readTimeLineHistory(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, WalSnd::sentPtr, sentPtr, SlotIsLogical, StartReplicationCmd::slotname, snprintf, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), ThisTimeLineID, StartReplicationCmd::timeline, tliSwitchPoint(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), values, WalSndLoop(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, XLogRecPtrIsInvalid, and XLogSendPhysical().

Referenced by exec_replication_command().

542 {
543  StringInfoData buf;
544  XLogRecPtr FlushPtr;
545 
546  if (ThisTimeLineID == 0)
547  ereport(ERROR,
548  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
549  errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
550 
551  /*
552  * We assume here that we're logging enough information in the WAL for
553  * log-shipping, since this is checked in PostmasterMain().
554  *
555  * NOTE: wal_level can only change at shutdown, so in most cases it is
556  * difficult for there to be WAL data that we can still see that was
557  * written at wal_level='minimal'.
558  */
559 
560  if (cmd->slotname)
561  {
562  ReplicationSlotAcquire(cmd->slotname, true);
563  if (SlotIsLogical(MyReplicationSlot))
564  ereport(ERROR,
565  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
566  (errmsg("cannot use a logical replication slot for physical replication"))));
567  }
568 
569  /*
570  * Select the timeline. If it was given explicitly by the client, use
571  * that. Otherwise use the timeline of the last replayed record, which is
572  * kept in ThisTimeLineID.
573  */
574  if (am_cascading_walsender)
575  {
576  /* this also updates ThisTimeLineID */
577  FlushPtr = GetStandbyFlushRecPtr();
578  }
579  else
580  FlushPtr = GetFlushRecPtr();
581 
582  if (cmd->timeline != 0)
583  {
584  XLogRecPtr switchpoint;
585 
586  sendTimeLine = cmd->timeline;
587  if (sendTimeLine == ThisTimeLineID)
588  {
589  sendTimeLineIsHistoric = false;
590  sendTimeLineValidUpto = InvalidXLogRecPtr;
591  }
592  else
593  {
594  List *timeLineHistory;
595 
596  sendTimeLineIsHistoric = true;
597 
598  /*
599  * Check that the timeline the client requested exists, and the
600  * requested start location is on that timeline.
601  */
602  timeLineHistory = readTimeLineHistory(ThisTimeLineID);
603  switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
604  &sendTimeLineNextTLI);
605  list_free_deep(timeLineHistory);
606 
607  /*
608  * Found the requested timeline in the history. Check that
609  * requested startpoint is on that timeline in our history.
610  *
611  * This is quite loose on purpose. We only check that we didn't
612  * fork off the requested timeline before the switchpoint. We
613  * don't check that we switched *to* it before the requested
614  * starting point. This is because the client can legitimately
615  * request to start replication from the beginning of the WAL
616  * segment that contains switchpoint, but on the new timeline, so
617  * that it doesn't end up with a partial segment. If you ask for
618  * too old a starting point, you'll get an error later when we
619  * fail to find the requested WAL segment in pg_wal.
620  *
621  * XXX: we could be more strict here and only allow a startpoint
622  * that's older than the switchpoint, if it's still in the same
623  * WAL segment.
624  */
625  if (!XLogRecPtrIsInvalid(switchpoint) &&
626  switchpoint < cmd->startpoint)
627  {
628  ereport(ERROR,
629  (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
630  (uint32) (cmd->startpoint >> 32),
631  (uint32) (cmd->startpoint),
632  cmd->timeline),
633  errdetail("This server's history forked from timeline %u at %X/%X.",
634  cmd->timeline,
635  (uint32) (switchpoint >> 32),
636  (uint32) (switchpoint))));
637  }
638  sendTimeLineValidUpto = switchpoint;
639  }
640  }
641  else
642  {
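643  sendTimeLine = ThisTimeLineID;
644  sendTimeLineValidUpto = InvalidXLogRecPtr;
645  sendTimeLineIsHistoric = false;
646  }
647 
648  streamingDoneSending = streamingDoneReceiving = false;
649 
650  /* If there is nothing to stream, don't even enter COPY mode */
651  if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
652  {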
653  /*
654  * When we first start replication the standby will be behind the
655  * primary. For some applications, for example synchronous
656  * replication, it is important to have a clear state for this initial
657  * catchup mode, so we can trigger actions when we change streaming
658  * state later. We may stay in this state for a long time, which is
659  * exactly why we want to be able to monitor whether or not we are
660  * still here.
661  */
662  WalSndSetState(WALSNDSTATE_CATCHUP);
663 
664  /* Send a CopyBothResponse message, and start streaming */
665  pq_beginmessage(&buf, 'W');
666  pq_sendbyte(&buf, 0);
667  pq_sendint16(&buf, 0);
668  pq_endmessage(&buf);
669  pq_flush();
670 
671  /*
672  * Don't allow a request to stream from a future point in WAL that
673  * hasn't been flushed to disk in this server yet.
674  */
675  if (FlushPtr < cmd->startpoint)
676  {
677  ereport(ERROR,
678  (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
679  (uint32) (cmd->startpoint >> 32),
680  (uint32) (cmd->startpoint),
681  (uint32) (FlushPtr >> 32),
682  (uint32) (FlushPtr))));
683  }
684 
685  /* Start streaming from the requested point */
686  sentPtr = cmd->startpoint;
687 
688  /* Initialize shared memory status, too */
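689  SpinLockAcquire(&MyWalSnd->mutex);
690  MyWalSnd->sentPtr = sentPtr;
691  SpinLockRelease(&MyWalSnd->mutex);
692 
693  SyncRepInitConfig();
694 
695  /* Main loop of walsender */
696  replication_active = true;
697 
698  WalSndLoop(XLogSendPhysical);
699 
700  replication_active = false;
701  if (got_STOPPING)
702  proc_exit(0);
703  WalSndSetState(WALSNDSTATE_STARTUP);
704 
705  Assert(streamingDoneSending && streamingDoneReceiving);
706  }
707 
708  if (cmd->slotname)
709  ReplicationSlotRelease();
710 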
711  /*
712  * Copy is finished now. Send a single-row result set indicating the next
713  * timeline.
714  */
715  if (sendTimeLineIsHistoric)
716  {
717  char startpos_str[8 + 1 + 8 + 1];
718  DestReceiver *dest;
719  TupOutputState *tstate;
720  TupleDesc tupdesc;
721  Datum values[2];
722  bool nulls[2];
723 
724  snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
725  (uint32) (sendTimeLineValidUpto >> 32),
726  (uint32) sendTimeLineValidUpto);
727 
728  dest = CreateDestReceiver(DestRemoteSimple);
729  MemSet(nulls, false, sizeof(nulls));
730 
731  /*
732  * Need a tuple descriptor representing two columns. int8 may seem
733  * like a surprising data type for this, but in theory int4 would not
734  * be wide enough for this, as TimeLineID is unsigned.
735  */
736  tupdesc = CreateTemplateTupleDesc(2);
737  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
738  INT8OID, -1, 0);
739  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
740  TEXTOID, -1, 0);
741 
742  /* prepare for projection of tuple */
743  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
744 
745  values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
746  values[1] = CStringGetTextDatum(startpos_str);
747 
748  /* send it to dest */
749  do_tup_output(tstate, values, nulls);
750 
751  end_tup_output(tstate);
752  }
753 
754  /* Send CommandComplete message */
755  pq_puttextmessage('C', "START_STREAMING");
756 }
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr startpoint
Definition: replnodes.h:85
#define pq_flush()
Definition: libpq.h:39
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:44
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:330
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:75
static void XLogSendPhysical(void)
Definition: walsender.c:2467
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2256
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:182
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:608
#define MemSet(start, val, len)
Definition: c.h:962
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8267
void list_free_deep(List *list)
Definition: list.c:1391
slock_t mutex
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
#define SpinLockAcquire(lock)
Definition: spin.h:62
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2314
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:676
#define ERROR
Definition: elog.h:43
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2236
static char * buf
Definition: pg_test_fsync.c:67
static bool streamingDoneSending
Definition: walsender.c:174
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
int errdetail(const char *fmt,...)
Definition: elog.c:955
unsigned int uint32
Definition: c.h:359
void ReplicationSlotRelease(void)
Definition: slot.c:424
#define SlotIsLogical(slot)
Definition: slot.h:154
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1699
void SyncRepInitConfig(void)
Definition: syncrep.c:398
#define ereport(elevel, rest)
Definition: elog.h:141
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2182
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
static volatile sig_atomic_t replication_active
Definition: walsender.c:190
uintptr_t Datum
Definition: postgres.h:367
XLogRecPtr sentPtr
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:563
WalSnd * MyWalSnd
Definition: walsender.c:111
static XLogRecPtr sentPtr
Definition: walsender.c:149
TimeLineID ThisTimeLineID
Definition: xlog.c:187
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
static TimeLineID sendTimeLine
Definition: walsender.c:140
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:739
void WalSndSetState(WalSndState state)
Definition: walsender.c:3126
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:141
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:143
static bool streamingDoneReceiving
Definition: walsender.c:175
static Datum values[MAXATTR]
Definition: bootstrap.c:167
int errmsg(const char *fmt,...)
Definition: elog.c:822
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
TimeLineID timeline
Definition: replnodes.h:84
#define CStringGetTextDatum(s)
Definition: builtins.h:83
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2882
void pq_puttextmessage(char msgtype, const char *str)
Definition: pqformat.c:369
List
Definition: pg_list.h:50
#define snprintf
Definition: port.h:192
int16 AttrNumber
Definition: attnum.h:21
static bool sendTimeLineIsHistoric
Definition: walsender.c:142
bool am_cascading_walsender
Definition: walsender.c:115

◆ TransactionIdInRecentPast()

static bool TransactionIdInRecentPast ( TransactionId  xid,
uint32  epoch 
)
static

Definition at line 1951 of file walsender.c.

References EpochFromFullTransactionId, ReadNextFullTransactionId(), TransactionIdPrecedesOrEquals(), and XidFromFullTransactionId.

Referenced by ProcessStandbyHSFeedbackMessage().

1952 {
1953  FullTransactionId nextFullXid;
1954  TransactionId nextXid;
1955  uint32 nextEpoch;
1956 
1957  nextFullXid = ReadNextFullTransactionId();
1958  nextXid = XidFromFullTransactionId(nextFullXid);
1959  nextEpoch = EpochFromFullTransactionId(nextFullXid);
1960 
1961  if (xid <= nextXid)
1962  {
1963  if (epoch != nextEpoch)
1964  return false;
1965  }
1966  else
1967  {
1968  if (epoch + 1 != nextEpoch)
1969  return false;
1970  }
1971 
1972  if (!TransactionIdPrecedesOrEquals(xid, nextXid))
1973  return false; /* epoch OK, but it's wrapped around */
1974 
1975  return true;
1976 }
uint32 TransactionId
Definition: c.h:514
#define XidFromFullTransactionId(x)
Definition: transam.h:48
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:319
unsigned int uint32
Definition: c.h:359
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:246
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
static const unsigned __int64 epoch
Definition: gettimeofday.c:34

◆ UpdateSpillStats()

static void UpdateSpillStats ( LogicalDecodingContext ctx)
static

Definition at line 3587 of file walsender.c.

References DEBUG2, elog, WalSnd::mutex, LogicalDecodingContext::reorder, WalSnd::spillBytes, ReorderBuffer::spillBytes, WalSnd::spillCount, ReorderBuffer::spillCount, WalSnd::spillTxns, ReorderBuffer::spillTxns, SpinLockAcquire, and SpinLockRelease.

Referenced by WalSndUpdateProgress().

3588 {
3589  ReorderBuffer *rb = ctx->reorder;
3590 
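3591  SpinLockAcquire(&MyWalSnd->mutex);
3592 
3593  MyWalSnd->spillTxns = rb->spillTxns;
3594  MyWalSnd->spillCount = rb->spillCount;
3595  MyWalSnd->spillBytes = rb->spillBytes;
3596 
3597  elog(DEBUG2, "UpdateSpillStats: updating stats %p %lld %lld %lld",
3598  rb,
3599  (long long) rb->spillTxns,
3600  (long long) rb->spillCount,
3601  (long long) rb->spillBytes);
3602 
3603  SpinLockRelease(&MyWalSnd->mutex);
3604 }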
struct ReorderBuffer * reorder
Definition: logical.h:42
slock_t mutex
int64 spillCount
int64 spillBytes
#define SpinLockAcquire(lock)
Definition: spin.h:62
int64 spillTxns
#define DEBUG2
Definition: elog.h:24
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:111
#define elog(elevel,...)
Definition: elog.h:228

◆ WalSndCheckTimeOut()

static void WalSndCheckTimeOut ( void  )
static

Definition at line 2155 of file walsender.c.

References COMMERROR, ereport, errmsg(), last_processing, last_reply_timestamp, TimestampTzPlusMilliseconds, wal_sender_timeout, and WalSndShutdown().

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

2156 {
2157  TimestampTz timeout;
2158 
2159  /* don't bail out if we're doing something that doesn't require timeouts */
2160  if (last_reply_timestamp <= 0)
2161  return;
2162 
2163  timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
2164  wal_sender_timeout);
2165 
2166  if (wal_sender_timeout > 0 && last_processing >= timeout)
2167  {
2168  /*
2169  * Since typically expiration of replication timeout means
2170  * communication problem, we don't send the error message to the
2171  * standby.
2172  */
2173  ereport(COMMERROR,
2174  (errmsg("terminating walsender process due to replication timeout")));
2175 
2176  WalSndShutdown();
2177  }
2178 }
int wal_sender_timeout
Definition: walsender.c:122
int64 TimestampTz
Definition: timestamp.h:39
static TimestampTz last_processing
Definition: walsender.c:157
#define COMMERROR
Definition: elog.h:30
#define ereport(elevel, rest)
Definition: elog.h:141
static void WalSndShutdown(void)
Definition: walsender.c:225
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
int errmsg(const char *fmt,...)
Definition: elog.c:822
static TimestampTz last_reply_timestamp
Definition: walsender.c:163

◆ WalSndComputeSleeptime()

static long WalSndComputeSleeptime ( TimestampTz  now)
static

Definition at line 2105 of file walsender.c.

References last_reply_timestamp, sleeptime, TimestampDifference(), TimestampTzPlusMilliseconds, waiting_for_ping_response, and wal_sender_timeout.

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

2106 {
2107  long sleeptime = 10000; /* 10 s */
2108 
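2109  if (wal_sender_timeout > 0 && last_reply_timestamp > 0)
2110  {
2111  TimestampTz wakeup_time;
2112  long sec_to_timeout;
2113  int microsec_to_timeout;
2114 
2115  /*
2116  * At the latest stop sleeping once wal_sender_timeout has been
2117  * reached.
2118  */
2119  wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
2120  wal_sender_timeout);
2121 
2122  /*
2123  * If no ping has been sent yet, wakeup when it's time to do so.
2124  * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2125  * the timeout passed without a response.
2126  */
2127  if (!waiting_for_ping_response)
2128  wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
2129  wal_sender_timeout / 2);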
2130 
2131  /* Compute relative time until wakeup. */
2132  TimestampDifference(now, wakeup_time,
2133  &sec_to_timeout, &microsec_to_timeout);
2134 
2135  sleeptime = sec_to_timeout * 1000 +
2136  microsec_to_timeout / 1000;
2137  }
2138 
2139  return sleeptime;
2140 }
int wal_sender_timeout
Definition: walsender.c:122
int64 TimestampTz
Definition: timestamp.h:39
int sleeptime
Definition: pg_standby.c:41
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: timestamp.c:1657
static bool waiting_for_ping_response
Definition: walsender.c:166
static TimestampTz last_reply_timestamp
Definition: walsender.c:163
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1547

◆ WalSndDone()

static void WalSndDone ( WalSndSendDataCallback  send_data)
static

Definition at line 2842 of file walsender.c.

References DestRemote, EndCommand(), WalSnd::flush, pq_flush, pq_is_send_pending, proc_exit(), sentPtr, waiting_for_ping_response, WalSndCaughtUp, WalSndKeepalive(), WalSnd::write, and XLogRecPtrIsInvalid.

Referenced by WalSndLoop().

2843 {
2844  XLogRecPtr replicatedPtr;
2845 
2846  /* ... let's just be real sure we're caught up ... */
2847  send_data();
2848 
2849  /*
2850  * To figure out whether all WAL has successfully been replicated, check
2851  * flush location if valid, write otherwise. Tools like pg_receivewal will
2852  * usually (unless in synchronous mode) return an invalid flush location.
2853  */
2854  replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
2855  MyWalSnd->write : MyWalSnd->flush;
2856 
2857  if (WalSndCaughtUp && sentPtr == replicatedPtr &&
2858  !pq_is_send_pending())
2859  {
2860  /* Inform the standby that XLOG streaming is done */
2861  EndCommand("COPY 0", DestRemote);
2862  pq_flush();
2863 
2864  proc_exit(0);
2865  }
2866  if (!waiting_for_ping_response)
2867  {
2868  WalSndKeepalive(true);
2869  waiting_for_ping_response = true;
2870  }
2871 }
#define pq_is_send_pending()
Definition: libpq.h:41
XLogRecPtr write
#define pq_flush()
Definition: libpq.h:39
void proc_exit(int code)
Definition: ipc.c:104
XLogRecPtr flush
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3366
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:166
static bool WalSndCaughtUp
Definition: walsender.c:178
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
WalSnd * MyWalSnd
Definition: walsender.c:111
static XLogRecPtr sentPtr
Definition: walsender.c:149
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool waiting_for_ping_response
Definition: walsender.c:166

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 299 of file walsender.c.

References close, ConditionVariableCancelSleep(), DestNone, DestRemote, got_SIGUSR2, got_STOPPING, LWLockReleaseAll(), MyReplicationSlot, pgstat_report_wait_end(), proc_exit(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), WalSndSetState(), WalSndShutdown(), WALSNDSTATE_STARTUP, whereToSendOutput, and WALOpenSegment::ws_file.

Referenced by PostgresMain().

300 {
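301  LWLockReleaseAll();
302  ConditionVariableCancelSleep();
303  pgstat_report_wait_end();
304 
305  if (sendSeg->ws_file >= 0)
306  {
307  close(sendSeg->ws_file);
308  sendSeg->ws_file = -1;
309  }
310 
311  if (MyReplicationSlot != NULL)
312  ReplicationSlotRelease();
313 
314  ReplicationSlotCleanup();
315 
316  replication_active = false;
317 
318  if (got_STOPPING || got_SIGUSR2)
319  proc_exit(0);
320 
321  /* Revert back to startup state */
322  WalSndSetState(WALSNDSTATE_STARTUP);
323 }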
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:182
void proc_exit(int code)
Definition: ipc.c:104
void ConditionVariableCancelSleep(void)
void ReplicationSlotRelease(void)
Definition: slot.c:424
static WALOpenSegment * sendSeg
Definition: walsender.c:131
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1342
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:181
static volatile sig_atomic_t replication_active
Definition: walsender.c:190
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
void WalSndSetState(WalSndState state)
Definition: walsender.c:3126
void ReplicationSlotCleanup(void)
Definition: slot.c:479
void LWLockReleaseAll(void)
Definition: lwlock.c:1825
#define close(a)
Definition: win32.h:12

◆ WalSndGetStateString()

static const char* WalSndGetStateString ( WalSndState  state)
static

Definition at line 3145 of file walsender.c.

References WALSNDSTATE_BACKUP, WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WALSNDSTATE_STOPPING, and WALSNDSTATE_STREAMING.

Referenced by pg_stat_get_wal_senders().

3146 {
3147  switch (state)
3148  {
3149  case WALSNDSTATE_STARTUP:
3150  return "startup";
3151  case WALSNDSTATE_BACKUP:
3152  return "backup";
3153  case WALSNDSTATE_CATCHUP:
3154  return "catchup";
3155  case WALSNDSTATE_STREAMING:
3156  return "streaming";
3157  case WALSNDSTATE_STOPPING:
3158  return "stopping";
3159  }
3160  return "UNKNOWN";
3161 }
Definition: regguts.h:298

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 3062 of file walsender.c.

References i, InvalidBackendId, max_wal_senders, WalSnd::mutex, WalSnd::pid, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

3063 {
3064  int i;
3065 
3066  for (i = 0; i < max_wal_senders; i++)
3067  {
3068  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3069  pid_t pid;
3070 
3071  SpinLockAcquire(&walsnd->mutex);
3072  pid = walsnd->pid;
3073  SpinLockRelease(&walsnd->mutex);
3074 
3075  if (pid == 0)
3076  continue;
3077 
3078  SendProcSignal(pid, PROCSIG_WALSND_INIT_STOPPING, InvalidBackendId);
3079  }
3080 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:108
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:179
int max_wal_senders
Definition: walsender.c:120
#define SpinLockRelease(lock)
Definition: spin.h:64
#define InvalidBackendId
Definition: backendid.h:23
int i

◆ WalSndKeepalive()

static void WalSndKeepalive ( bool  requestReply)
static

Definition at line 3366 of file walsender.c.

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), StringInfoData::len, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), resetStringInfo(), and sentPtr.

Referenced by ProcessStandbyReplyMessage(), WalSndDone(), WalSndKeepaliveIfNecessary(), and WalSndWaitForWal().

3367 {
3368  elog(DEBUG2, "sending replication keepalive");
3369 
3370  /* construct the message... */
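3371  resetStringInfo(&output_message);
3372  pq_sendbyte(&output_message, 'k');
3373  pq_sendint64(&output_message, sentPtr);
3374  pq_sendint64(&output_message, GetCurrentTimestamp());
3375  pq_sendbyte(&output_message, requestReply ? 1 : 0);
3376 
3377  /* ... and send it wrapped in CopyData */
3378  pq_putmessage_noblock('d', output_message.data, output_message.len);
3379 }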
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
static StringInfoData output_message
Definition: walsender.c:152
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
#define DEBUG2
Definition: elog.h:24
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
static XLogRecPtr sentPtr
Definition: walsender.c:149
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
#define elog(elevel,...)
Definition: elog.h:228

◆ WalSndKeepaliveIfNecessary()

static void WalSndKeepaliveIfNecessary ( void  )
static

Definition at line 3385 of file walsender.c.

References last_processing, last_reply_timestamp, pq_flush_if_writable, TimestampTzPlusMilliseconds, waiting_for_ping_response, wal_sender_timeout, WalSndKeepalive(), and WalSndShutdown().

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

3386 {
3387  TimestampTz ping_time;
3388 
3389  /*
3390  * Don't send keepalive messages if timeouts are globally disabled or
3391  * we're doing something not partaking in timeouts.
3392  */
3393  if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3394  return;
3395 
3396  if (waiting_for_ping_response)
3397  return;
3398 
3399  /*
3400  * If half of wal_sender_timeout has lapsed without receiving any reply
3401  * from the standby, send a keep-alive message to the standby requesting
3402  * an immediate reply.
3403  */
3404  ping_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
3405  wal_sender_timeout / 2);
3406  if (last_processing >= ping_time)
3407  {
3408  WalSndKeepalive(true);
3409  waiting_for_ping_response = true;
3410 
3411  /* Try to flush pending output to the client */
3412  if (pq_flush_if_writable() != 0)
3413  WalSndShutdown();
3414  }
3415 }
int wal_sender_timeout
Definition: walsender.c:122
int64 TimestampTz
Definition: timestamp.h:39
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3366
static TimestampTz last_processing
Definition: walsender.c:157
#define pq_flush_if_writable()
Definition: libpq.h:40
static void WalSndShutdown(void)
Definition: walsender.c:225
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
static bool waiting_for_ping_response
Definition: walsender.c:166
static TimestampTz last_reply_timestamp
Definition: walsender.c:163

◆ WalSndKill()

static void WalSndKill ( int  code,
Datum  arg 
)
static

Definition at line 2369 of file walsender.c.

References Assert, WalSnd::latch, WalSnd::mutex, MyWalSnd, WalSnd::pid, SpinLockAcquire, and SpinLockRelease.

Referenced by InitWalSenderSlot().

2370 {
2371  WalSnd *walsnd = MyWalSnd;
2372 
2373  Assert(walsnd != NULL);
2374 
2375  MyWalSnd = NULL;
2376 
2377  SpinLockAcquire(&walsnd->mutex);
2378  /* clear latch while holding the spinlock, so it can safely be read */
2379  walsnd->latch = NULL;
2380  /* Mark WalSnd struct as no longer being in use. */
2381  walsnd->pid = 0;
2382  SpinLockRelease(&walsnd->mutex);
2383 }
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch * latch
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:111
#define Assert(condition)
Definition: c.h:739

◆ WalSndLastCycleHandler()

static void WalSndLastCycleHandler ( SIGNAL_ARGS  )
static

Definition at line 2957 of file walsender.c.

References got_SIGUSR2, MyLatch, and SetLatch().

Referenced by WalSndSignals().

2958 {
2959  int save_errno = errno;
2960 
2961  got_SIGUSR2 = true;
2962  SetLatch(MyLatch);
2963 
2964  errno = save_errno;
2965 }
void SetLatch(Latch *latch)
Definition: latch.c:436
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:181
struct Latch * MyLatch
Definition: globals.c:54

◆ WalSndLoop()

static void WalSndLoop ( WalSndSendDataCallback  send_data)
static

Definition at line 2182 of file walsender.c.

References application_name, CHECK_FOR_INTERRUPTS, ConfigReloadPending, DEBUG1, ereport, errmsg(), GetCurrentTimestamp(), got_SIGUSR2, last_reply_timestamp, MyLatch, MyProcPort, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), sleeptime, Port::sock, WalSnd::state, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_MAIN, waiting_for_ping_response, WaitLatchOrSocket(), WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndDone(), WalSndKeepaliveIfNecessary(), WalSndSetState(), WalSndShutdown(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STREAMING, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, and WL_TIMEOUT.

Referenced by StartLogicalReplication(), and StartReplication().

2183 {
2184  /*
2185  * Initialize the last reply timestamp. That enables timeout processing
2186  * from hereon.
2187  */
2188  last_reply_timestamp = GetCurrentTimestamp();
2189  waiting_for_ping_response = false;
2190 
2191  /*
2192  * Loop until we reach the end of this timeline or the client requests to
2193  * stop streaming.
2194  */
2195  for (;;)
2196  {
2197  /* Clear any already-pending wakeups */
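2198  ResetLatch(MyLatch);
2199 
2200  CHECK_FOR_INTERRUPTS();
2201 
2202  /* Process any requests or signals received recently */
2203  if (ConfigReloadPending)
2204  {
2205  ConfigReloadPending = false;
2206  ProcessConfigFile(PGC_SIGHUP);
2207  SyncRepInitConfig();
2208  }
2209 
2210  /* Check for input from the client */
2211  ProcessRepliesIfAny();
2212 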
2213  /*
2214  * If we have received CopyDone from the client, sent CopyDone
2215  * ourselves, and the output buffer is empty, it's time to exit
2216  * streaming.
2217  */
2218  if (streamingDoneReceiving && streamingDoneSending &&
2219  !pq_is_send_pending())
2220  break;
2221 
2222  /*
2223  * If we don't have any pending data in the output buffer, try to send
2224  * some more. If there is some, we don't bother to call send_data
2225  * again until we've flushed it ... but we'd better assume we are not
2226  * caught up.
2227  */
2228  if (!pq_is_send_pending())
2229  send_data();
2230  else
2231  WalSndCaughtUp = false;
2232 
2233  /* Try to flush pending output to the client */
2234  if (pq_flush_if_writable() != 0)
2235  WalSndShutdown();
2236 
2237  /* If nothing remains to be sent right now ... */
2238  if (WalSndCaughtUp && !pq_is_send_pending())
2239  {
2240  /*
2241  * If we're in catchup state, move to streaming. This is an
2242  * important state change for users to know about, since before
2243  * this point data loss might occur if the primary dies and we
2244  * need to failover to the standby. The state change is also
2245  * important for synchronous replication, since commits that
2246  * started to wait at that point might wait for some time.
2247  */
2248  if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
2249  {
2250  ereport(DEBUG1,
2251  (errmsg("\"%s\" has now caught up with upstream server",
2252  application_name)));
2253  WalSndSetState(WALSNDSTATE_STREAMING);
2254  }
2255 
2256  /*
2257  * When SIGUSR2 arrives, we send any outstanding logs up to the
2258  * shutdown checkpoint record (i.e., the latest record), wait for
2259  * them to be replicated to the standby, and exit. This may be a
2260  * normal termination at shutdown, or a promotion, the walsender
2261  * is not sure which.
2262  */
2263  if (got_SIGUSR2)
2264  WalSndDone(send_data);
2265  }
2266 
2267  /* Check for replication timeout. */
2268  WalSndCheckTimeOut();
2269 
2270  /* Send keepalive if the time has come */
2271  WalSndKeepaliveIfNecessary();
2272 
2273  /*
2274  * We don't block if not caught up, unless there is unsent data
2275  * pending in which case we'd better block until the socket is
2276  * write-ready. This test is only needed for the case where the
2277  * send_data callback handled a subset of the available data but then
2278  * pq_flush_if_writable flushed it all --- we should immediately try
2279  * to send more.
2280  */
2281  if ((WalSndCaughtUp && !streamingDoneSending) || pq_is_send_pending())
2282  {
2283  long sleeptime;
2284  int wakeEvents;
2285 
2286  wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT |
2287  WL_SOCKET_READABLE;
2288 
2289  /*
2290  * Use fresh timestamp, not last_processing, to reduce the chance
2291  * of reaching wal_sender_timeout before sending a keepalive.
2292  */
2293  sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
2294 
2295  if (pq_is_send_pending())
2296  wakeEvents |= WL_SOCKET_WRITEABLE;
2297 
2298  /* Sleep until something happens or we time out */
2299  (void) WaitLatchOrSocket(MyLatch, wakeEvents,
2300  MyProcPort->sock, sleeptime,
2301  WAIT_EVENT_WAL_SENDER_MAIN);
2302  }
2303  }
2304 }
#define pq_is_send_pending()
Definition: libpq.h:41
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
#define DEBUG1
Definition: elog.h:25
struct Port * MyProcPort
Definition: globals.c:43
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
static void WalSndDone(WalSndSendDataCallback send_data)
Definition: walsender.c:2842
#define WL_SOCKET_READABLE
Definition: latch.h:125
int sleeptime
Definition: pg_standby.c:41
pgsocket sock
Definition: libpq-be.h:122
void ResetLatch(Latch *latch)
Definition: latch.c:519
WalSndState state
static bool streamingDoneSending
Definition: walsender.c:174
#define pq_flush_if_writable()
Definition: libpq.h:40
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:369
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:181
void SyncRepInitConfig(void)
Definition: syncrep.c:398
#define ereport(elevel, rest)
Definition: elog.h:141
static bool WalSndCaughtUp
Definition: walsender.c:178
PGC_SIGHUP
Definition: guc.h:72
volatile sig_atomic_t ConfigReloadPending
Definition: globals.c:35
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3385
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2155
static void WalSndShutdown(void)
Definition: walsender.c:225
WalSnd * MyWalSnd
Definition: walsender.c:111
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2105
void WalSndSetState(WalSndState state)
Definition: walsender.c:3126
static bool streamingDoneReceiving
Definition: walsender.c:175
char * application_name
Definition: guc.c:537
int errmsg(const char *fmt,...)
Definition: elog.c:822
struct Latch * MyLatch
Definition: globals.c:54
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
static bool waiting_for_ping_response
Definition: walsender.c:166
#define WL_LATCH_SET
Definition: latch.h:124
static TimestampTz last_reply_timestamp
Definition: walsender.c:163
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1630
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129

◆ WalSndPrepareWrite()

static void WalSndPrepareWrite ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1175 of file walsender.c.

References InvalidXLogRecPtr, LogicalDecodingContext::out, pq_sendbyte(), pq_sendint64(), and resetStringInfo().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

1176 {
1177  /* can't have sync rep confused by sending the same LSN several times */
1178  if (!last_write)
1179  lsn = InvalidXLogRecPtr;
1180 
1181  resetStringInfo(ctx->out);
1182 
1183  pq_sendbyte(ctx->out, 'w');
1184  pq_sendint64(ctx->out, lsn); /* dataStart */
1185  pq_sendint64(ctx->out, lsn); /* walEnd */
1186 
1187  /*
1188  * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1189  * reserve space here.
1190  */
1191  pq_sendint64(ctx->out, 0); /* sendtime */
1192 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
StringInfo out
Definition: logical.h:70

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 2912 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

2913 {
2914  int i;
2915 
2916  for (i = 0; i < max_wal_senders; i++)
2917  {
2918  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2919 
2920  SpinLockAcquire(&walsnd->mutex);
2921  if (walsnd->pid == 0)
2922  {
2923  SpinLockRelease(&walsnd->mutex);
2924  continue;
2925  }
2926  walsnd->needreload = true;
2927  SpinLockRelease(&walsnd->mutex);
2928  }
2929 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:108
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
int max_wal_senders
Definition: walsender.c:120
#define SpinLockRelease(lock)
Definition: spin.h:64
bool needreload
int i

◆ WalSndSegmentOpen()

static int WalSndSegmentOpen ( XLogSegNo  nextSegNo,
WALSegmentContext segcxt,
TimeLineID tli_p 
)
static

Definition at line 2387 of file walsender.c.

References BasicOpenFile(), ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), MAXFNAMELEN, MAXPGPATH, PG_BINARY, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, wal_segment_size, WALOpenSegment::ws_segno, WALSegmentContext::ws_segsize, XLByteToSeg, XLogFileName, and XLogFilePath.

Referenced by logical_read_xlog_page(), and XLogSendPhysical().

2389 {
2390  char path[MAXPGPATH];
2391  int fd;
2392 
2393  /*-------
2394  * When reading from a historic timeline, and there is a timeline switch
2395  * within this segment, read from the WAL segment belonging to the new
2396  * timeline.
2397  *
2398  * For example, imagine that this server is currently on timeline 5, and
2399  * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
2400  * 0/13002088. In pg_wal, we have these files:
2401  *
2402  * ...
2403  * 000000040000000000000012
2404  * 000000040000000000000013
2405  * 000000050000000000000013
2406  * 000000050000000000000014
2407  * ...
2408  *
2409  * In this situation, when requested to send the WAL from segment 0x13, on
2410  * timeline 4, we read the WAL from file 000000050000000000000013. Archive
2411  * recovery prefers files from newer timelines, so if the segment was
2412  * restored from the archive on this server, the file belonging to the old
2413  * timeline, 000000040000000000000013, might not exist. Their contents are
2414  * equal up to the switchpoint, because at a timeline switch, the used
2415  * portion of the old segment is copied to the new file. -------
2416  */
2417  *tli_p = sendTimeLine;
2419  {
2420  XLogSegNo endSegNo;
2421 
2422  XLByteToSeg(sendTimeLineValidUpto, endSegNo, segcxt->ws_segsize);
2423  if (sendSeg->ws_segno == endSegNo)
2424  *tli_p = sendTimeLineNextTLI;
2425  }
2426 
2427  XLogFilePath(path, *tli_p, nextSegNo, segcxt->ws_segsize);
2428  fd = BasicOpenFile(path, O_RDONLY | PG_BINARY);
2429  if (fd >= 0)
2430  return fd;
2431 
2432  /*
2433  * If the file is not found, assume it's because the standby asked for a
2434  * WAL segment that is too old and has already been removed or recycled.
2435  */
2436  if (errno == ENOENT)
2437  {
2438  char xlogfname[MAXFNAMELEN];
2439  int save_errno = errno;
2440 
2441  XLogFileName(xlogfname, *tli_p, nextSegNo, wal_segment_size);
2442  errno = save_errno;
2443  ereport(ERROR,
2445  errmsg("requested WAL segment %s has already been removed",
2446  xlogfname)));
2447  }
2448  else
2449  ereport(ERROR,
2451  errmsg("could not open file \"%s\": %m",
2452  path)));
2453  return -1; /* keep compiler quiet */
2454 }
int wal_segment_size
Definition: xlog.c:112
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1222
#define ERROR
Definition: elog.h:43
#define MAXPGPATH
uint64 XLogSegNo
Definition: xlogdefs.h:41
XLogSegNo ws_segno
Definition: xlogreader.h:38
int errcode_for_file_access(void)
Definition: elog.c:631
static WALOpenSegment * sendSeg
Definition: walsender.c:131
#define ereport(elevel, rest)
Definition: elog.h:141
#define MAXFNAMELEN
static TimeLineID sendTimeLine
Definition: walsender.c:140
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes)
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:141
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:143
int BasicOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:981
#define XLogFilePath(path, tli, logSegNo, wal_segsz_bytes)
int errmsg(const char *fmt,...)
Definition: elog.c:822
static bool sendTimeLineIsHistoric
Definition: walsender.c:142
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

◆ WalSndSetState()

void WalSndSetState ( WalSndState  state)

Definition at line 3126 of file walsender.c.

References am_walsender, Assert, WalSnd::mutex, MyWalSnd, SpinLockAcquire, SpinLockRelease, and WalSnd::state.

Referenced by exec_replication_command(), SendBaseBackup(), StartLogicalReplication(), StartReplication(), WalSndErrorCleanup(), WalSndLoop(), and XLogSendPhysical().

3127 {
3128  WalSnd *walsnd = MyWalSnd;
3129 
3131 
3132  if (walsnd->state == state)
3133  return;
3134 
3135  SpinLockAcquire(&walsnd->mutex);
3136  walsnd->state = state;
3137  SpinLockRelease(&walsnd->mutex);
3138 }
slock_t mutex
bool am_walsender
Definition: walsender.c:114
#define SpinLockAcquire(lock)
Definition: spin.h:62
WalSndState state
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:111
#define Assert(condition)
Definition: c.h:739
Definition: regguts.h:298

◆ WalSndShmemInit()

void WalSndShmemInit ( void  )

Definition at line 3001 of file walsender.c.

References i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SHMQueueInit(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

3002 {
3003  bool found;
3004  int i;
3005 
3006  WalSndCtl = (WalSndCtlData *)
3007  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3008 
3009  if (!found)
3010  {
3011  /* First time through, so initialize */
3013 
3014  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3016 
3017  for (i = 0; i < max_wal_senders; i++)
3018  {
3019  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3020 
3021  SpinLockInit(&walsnd->mutex);
3022  }
3023  }
3024 }
Size WalSndShmemSize(void)
Definition: walsender.c:2989
#define SpinLockInit(lock)
Definition: spin.h:60
WalSndCtlData * WalSndCtl
Definition: walsender.c:108
#define MemSet(start, val, len)
Definition: c.h:962
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
int max_wal_senders
Definition: walsender.c:120
void SHMQueueInit(SHM_QUEUE *queue)
Definition: shmqueue.c:36
int i
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28

◆ WalSndShmemSize()

Size WalSndShmemSize ( void  )

Definition at line 2989 of file walsender.c.

References add_size(), max_wal_senders, mul_size(), and offsetof.

Referenced by CreateSharedMemoryAndSemaphores(), and WalSndShmemInit().

2990 {
2991  Size size = 0;
2992 
2993  size = offsetof(WalSndCtlData, walsnds);
2994  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
2995 
2996  return size;
2997 }
int max_wal_senders
Definition: walsender.c:120
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
size_t Size
Definition: c.h:467
#define offsetof(type, field)
Definition: c.h:662

◆ WalSndShutdown()

static void WalSndShutdown ( void  )
static

Definition at line 225 of file walsender.c.

Referenced by WalSndCheckTimeOut(), WalSndErrorCleanup(), WalSndKeepaliveIfNecessary(), WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

259 {
261 
262  /* Create a per-walsender data structure in shared memory */
264 
265  /*
266  * We don't currently need any ResourceOwner in a walsender process, but
267  * if we did, we could call CreateAuxProcessResourceOwner here.
268  */
269 
270  /*
271  * Let postmaster know that we're a WAL sender. Once we've declared
272  * ourselves a WAL sender process, postmaster will let us outlive the bgwriter and
273  * kill us last in the shutdown sequence, so we get a chance to stream all
274  * remaining WAL at shutdown, including the shutdown checkpoint. Note that
275  * there's no going back, and we mustn't write any WAL records after this.
276  */
279 
280  /* Initialize empty timestamp buffer for lag tracking. */
282 
283  /* Make sure we can remember the current read position in XLOG. */
284  sendSeg = (WALOpenSegment *)
289 }
static void InitWalSenderSlot(void)
Definition: walsender.c:2308
int wal_segment_size
Definition: xlog.c:112
static WALSegmentContext * sendCxt
Definition: walsender.c:132
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:272
void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt, int segsize, const char *waldir)
Definition: xlogreader.c:209
bool RecoveryInProgress(void)
Definition: xlog.c:7935
static LagTracker * lag_tracker
Definition: walsender.c:215
static WALOpenSegment * sendSeg
Definition: walsender.c:131
MemoryContext TopMemoryContext
Definition: mcxt.c:44
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:839
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:146
bool am_cascading_walsender
Definition: walsender.c:115

◆ WalSndSignals()

void WalSndSignals ( void  )

Definition at line 2969 of file walsender.c.

References die, InitializeTimeouts(), PostgresSigHupHandler(), pqsignal(), procsignal_sigusr1_handler(), quickdie(), SIG_DFL, SIG_IGN, SIGCHLD, SIGHUP, SIGPIPE, SIGQUIT, SIGUSR1, SIGUSR2, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

2970 {
2971  /* Set up signal handlers */
2972  pqsignal(SIGHUP, PostgresSigHupHandler); /* set flag to read config
2973  * file */
2974  pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
2975  pqsignal(SIGTERM, die); /* request shutdown */
2976  pqsignal(SIGQUIT, quickdie); /* hard crash time */
2977  InitializeTimeouts(); /* establishes SIGALRM handler */
2980  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
2981  * shutdown */
2982 
2983  /* Reset some signals that are accepted by postmaster but not here */
2985 }
void InitializeTimeouts(void)
Definition: timeout.c:346
#define SIGQUIT
Definition: win32_port.h:155
#define SIGUSR1
Definition: win32_port.h:166
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:2957
#define SIGCHLD
Definition: win32_port.h:164
#define SIGPIPE
Definition: win32_port.h:159
#define SIGUSR2
Definition: win32_port.h:167
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:2832
#define SIGHUP
Definition: win32_port.h:154
void PostgresSigHupHandler(SIGNAL_ARGS)
Definition: postgres.c:2872
#define SIG_IGN
Definition: win32_port.h:151
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
#define SIG_DFL
Definition: win32_port.h:149
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:260
void quickdie(SIGNAL_ARGS)
Definition: postgres.c:2734
#define die(msg)
Definition: pg_test_fsync.c:96

◆ WalSndUpdateProgress()

static void WalSndUpdateProgress ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid 
)
static

Definition at line 1292 of file walsender.c.

References GetCurrentTimestamp(), LagTrackerWrite(), now(), TimestampDifferenceExceeds(), UpdateSpillStats(), and WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS.

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

1293 {
1294  static TimestampTz sendTime = 0;
1296 
1297  /*
1298  * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1299  * avoid flooding the lag tracker when we commit frequently.
1300  */
1301 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1302  if (!TimestampDifferenceExceeds(sendTime, now,
1304  return;
1305 
1306  LagTrackerWrite(lsn, now);
1307  sendTime = now;
1308 
1309  /*
1310  * Update statistics about transactions that spilled to disk.
1311  */
1312  UpdateSpillStats(ctx);
1313 }
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
int64 TimestampTz
Definition: timestamp.h:39
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1682
static void UpdateSpillStats(LogicalDecodingContext *ctx)
Definition: walsender.c:3587
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition: walsender.c:3424
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1547

◆ WalSndWaitForWal()

static XLogRecPtr WalSndWaitForWal ( XLogRecPtr  loc)
static

Definition at line 1323 of file walsender.c.

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, WalSnd::flush, GetCurrentTimestamp(), GetFlushRecPtr(), GetXLogReplayRecPtr(), got_STOPPING, InvalidXLogRecPtr, MyLatch, MyProcPort, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), RecoveryInProgress(), ResetLatch(), sentPtr, SetLatch(), sleeptime, Port::sock, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_WAIT_WAL, waiting_for_ping_response, WaitLatchOrSocket(), WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepalive(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, WL_TIMEOUT, WalSnd::write, and XLogBackgroundFlush().

Referenced by logical_read_xlog_page().

1324 {
1325  int wakeEvents;
1326  static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1327 
1328  /*
1329  * Fast path to avoid acquiring the spinlock in case we already know we
1330  * have enough WAL available. This is particularly interesting if we're
1331  * far behind.
1332  */
1333  if (RecentFlushPtr != InvalidXLogRecPtr &&
1334  loc <= RecentFlushPtr)
1335  return RecentFlushPtr;
1336 
1337  /* Get a more recent flush pointer. */
1338  if (!RecoveryInProgress())
1339  RecentFlushPtr = GetFlushRecPtr();
1340  else
1341  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1342 
1343  for (;;)
1344  {
1345  long sleeptime;
1346 
1347  /* Clear any already-pending wakeups */
1349 
1351 
1352  /* Process any requests or signals received recently */
1353  if (ConfigReloadPending)
1354  {
1355  ConfigReloadPending = false;
1358  }
1359 
1360  /* Check for input from the client */
1362 
1363  /*
1364  * If we're shutting down, trigger pending WAL to be written out,
1365  * otherwise we'd possibly end up waiting for WAL that never gets
1366  * written, because walwriter has shut down already.
1367  */
1368  if (got_STOPPING)
1370 
1371  /* Update our idea of the currently flushed position. */
1372  if (!RecoveryInProgress())
1373  RecentFlushPtr = GetFlushRecPtr();
1374  else
1375  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1376 
1377  /*
1378  * If postmaster asked us to stop, don't wait anymore.
1379  *
1380  * It's important to do this check after the recomputation of
1381  * RecentFlushPtr, so we can send all remaining data before shutting
1382  * down.
1383  */
1384  if (got_STOPPING)
1385  break;
1386 
1387  /*
1388  * We only send regular messages to the client for fully decoded
1389  * transactions, but synchronous replication and walsender shutdown
1390  * may be waiting for a later location. So we send pings
1391  * containing the flush location every now and then.
1392  */
1393  if (MyWalSnd->flush < sentPtr &&
1394  MyWalSnd->write < sentPtr &&
1396  {
1397  WalSndKeepalive(false);
1399  }
1400 
1401  /* check whether we're done */
1402  if (loc <= RecentFlushPtr)
1403  break;
1404 
1405  /* Waiting for new WAL. Since we need to wait, we're now caught up. */
1406  WalSndCaughtUp = true;
1407 
1408  /*
1409  * Try to flush any pending output to the client.
1410  */
1411  if (pq_flush_if_writable() != 0)
1412  WalSndShutdown();
1413 
1414  /*
1415  * If we have received CopyDone from the client, sent CopyDone
1416  * ourselves, and the output buffer is empty, it's time to exit
1417  * streaming, so fail the current WAL fetch request.
1418  */
1420  !pq_is_send_pending())
1421  break;
1422 
1423  /* die if timeout was reached */
1425 
1426  /* Send keepalive if the time has come */
1428 
1429  /*
1430  * Sleep until something happens or we time out. Also wait for the
1431  * socket becoming writable, if there's still pending output.
1432  * Otherwise we might sit on sendable output data while waiting for
1433  * new WAL to be generated. (But if we have nothing to send, we don't
1434  * want to wake on socket-writable.)
1435  */
1437 
1438  wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH |
1440 
1441  if (pq_is_send_pending())
1442  wakeEvents |= WL_SOCKET_WRITEABLE;
1443 
1444  (void) WaitLatchOrSocket(MyLatch, wakeEvents,
1445  MyProcPort->sock, sleeptime,
1447  }
1448 
1449  /* reactivate latch so WalSndLoop knows to continue */
1450  SetLatch(MyLatch);
1451  return RecentFlushPtr;
1452 }
#define pq_is_send_pending()
Definition: libpq.h:41
XLogRecPtr write
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
struct Port * MyProcPort
Definition: globals.c:43
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:182
#define WL_SOCKET_READABLE
Definition: latch.h:125
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8267
int sleeptime
Definition: pg_standby.c:41
bool RecoveryInProgress(void)
Definition: xlog.c:7935
void SetLatch(Latch *latch)
Definition: latch.c:436
pgsocket sock
Definition: libpq-be.h:122
void ResetLatch(Latch *latch)
Definition: latch.c:519
XLogRecPtr flush
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3366
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11172
bool XLogBackgroundFlush(void)
Definition: xlog.c:2994
static bool streamingDoneSending
Definition: walsender.c:174
#define pq_flush_if_writable()
Definition: libpq.h:40
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:369
void SyncRepInitConfig(void)
Definition: syncrep.c:398
static bool WalSndCaughtUp
Definition: walsender.c:178
Definition: guc.h:72
volatile sig_atomic_t ConfigReloadPending
Definition: globals.c:35
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3385
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2155
static void WalSndShutdown(void)
Definition: walsender.c:225
WalSnd * MyWalSnd
Definition: walsender.c:111
static XLogRecPtr sentPtr
Definition: walsender.c:149
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2105
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool streamingDoneReceiving
Definition: walsender.c:175
struct Latch * MyLatch
Definition: globals.c:54
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
static bool waiting_for_ping_response
Definition: walsender.c:166
#define WL_LATCH_SET
Definition: latch.h:124
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1630
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 3088 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, pg_usleep(), WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

3089 {
3090  for (;;)
3091  {
3092  int i;
3093  bool all_stopped = true;
3094 
3095  for (i = 0; i < max_wal_senders; i++)
3096  {
3097  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3098 
3099  SpinLockAcquire(&walsnd->mutex);
3100 
3101  if (walsnd->pid == 0)
3102  {
3103  SpinLockRelease(&walsnd->mutex);
3104  continue;
3105  }
3106 
3107  if (walsnd->state != WALSNDSTATE_STOPPING)
3108  {
3109  all_stopped = false;
3110  SpinLockRelease(&walsnd->mutex);
3111  break;
3112  }
3113  SpinLockRelease(&walsnd->mutex);
3114  }
3115 
3116  /* safe to leave if confirmation is done for all WAL senders */
3117  if (all_stopped)
3118  return;
3119 
3120  pg_usleep(10000L); /* wait for 10 msec */
3121  }
3122 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:108
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pg_usleep(long microsec)
Definition: signal.c:53
WalSndState state
int max_wal_senders
Definition: walsender.c:120
#define SpinLockRelease(lock)
Definition: spin.h:64
int i

◆ WalSndWakeup()

void WalSndWakeup ( void  )

Definition at line 3033 of file walsender.c.

References i, WalSnd::latch, max_wal_senders, WalSnd::mutex, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive(), StartupXLOG(), and XLogWalRcvFlush().

3034 {
3035  int i;
3036 
3037  for (i = 0; i < max_wal_senders; i++)
3038  {
3039  Latch *latch;
3040  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3041 
3042  /*
3043  * Get latch pointer with spinlock held, for the unlikely case that
3044  * pointer reads aren't atomic (as they're 8 bytes).
3045  */
3046  SpinLockAcquire(&walsnd->mutex);
3047  latch = walsnd->latch;
3048  SpinLockRelease(&walsnd->mutex);
3049 
3050  if (latch != NULL)
3051  SetLatch(latch);
3052  }
3053 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:108
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
void SetLatch(Latch *latch)
Definition: latch.c:436
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch * latch
Definition: latch.h:110
int max_wal_senders
Definition: walsender.c:120
#define SpinLockRelease(lock)
Definition: spin.h:64
int i

◆ WalSndWriteData()

static void WalSndWriteData ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1202 of file walsender.c.

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, StringInfoData::data, GetCurrentTimestamp(), last_reply_timestamp, StringInfoData::len, MyLatch, MyProcPort, now(), LogicalDecodingContext::out, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, pq_putmessage_noblock, pq_sendint64(), ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), resetStringInfo(), SetLatch(), sleeptime, Port::sock, SyncRepInitConfig(), TimestampTzPlusMilliseconds, WAIT_EVENT_WAL_SENDER_WRITE_DATA, WaitLatchOrSocket(), wal_sender_timeout, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, and WL_TIMEOUT.

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

1204 {
1205  TimestampTz now;
1206 
1207  /*
1208  * Fill the send timestamp last, so that it is taken as late as possible.
1209  * This is somewhat ugly, but the protocol is fixed, as it has already been
1210  * used for several releases by streaming physical replication.
1211  */
1213  now = GetCurrentTimestamp();
1214  pq_sendint64(&tmpbuf, now);
1215  memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1216  tmpbuf.data, sizeof(int64));
1217 
1218  /* output previously gathered data in a CopyData packet */
1219  pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1220 
1222 
1223  /* Try to flush pending output to the client */
1224  if (pq_flush_if_writable() != 0)
1225  WalSndShutdown();
1226 
1227  /* Try taking fast path unless we get too close to walsender timeout. */
1229  wal_sender_timeout / 2) &&
1230  !pq_is_send_pending())
1231  {
1232  return;
1233  }
1234 
1235  /* If we have pending write here, go to slow path */
1236  for (;;)
1237  {
1238  int wakeEvents;
1239  long sleeptime;
1240 
1241  /* Check for input from the client */
1243 
1244  /* die if timeout was reached */
1246 
1247  /* Send keepalive if the time has come */
1249 
1250  if (!pq_is_send_pending())
1251  break;
1252 
1254 
1255  wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH |
1257 
1258  /* Sleep until something happens or we time out */
1259  (void) WaitLatchOrSocket(MyLatch, wakeEvents,
1260  MyProcPort->sock, sleeptime,
1262 
1263  /* Clear any already-pending wakeups */
1265 
1267 
1268  /* Process any requests or signals received recently */
1269  if (ConfigReloadPending)
1270  {
1271  ConfigReloadPending = false;
1274  }
1275 
1276  /* Try to flush pending output to the client */
1277  if (pq_flush_if_writable() != 0)
1278  WalSndShutdown();
1279  }
1280 
1281  /* reactivate latch so WalSndLoop knows to continue */
1282  SetLatch(MyLatch);
1283 }
#define pq_is_send_pending()
Definition: libpq.h:41
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
int wal_sender_timeout
Definition: walsender.c:122
struct Port * MyProcPort
Definition: globals.c:43
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
int64 TimestampTz
Definition: timestamp.h:39
#define WL_SOCKET_READABLE
Definition: latch.h:125
int sleeptime
Definition: pg_standby.c:41
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
void SetLatch(Latch *latch)
Definition: latch.c:436
pgsocket sock
Definition: libpq-be.h:122
void ResetLatch(Latch *latch)
Definition: latch.c:519
#define pq_flush_if_writable()
Definition: libpq.h:40
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:369
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
void SyncRepInitConfig(void)
Definition: syncrep.c:398
Definition: guc.h:72
volatile sig_atomic_t ConfigReloadPending
Definition: globals.c:35
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3385
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2155
static void WalSndShutdown(void)
Definition: walsender.c:225
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2105
static StringInfoData tmpbuf
Definition: walsender.c:154
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
StringInfo out
Definition: logical.h:70
struct Latch * MyLatch
Definition: globals.c:54
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
#define WL_LATCH_SET
Definition: latch.h:124
static TimestampTz last_reply_timestamp
Definition: walsender.c:163
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1630
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1547
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129

◆ XLogSendLogical()

static void XLogSendLogical ( void  )
static

Definition at line 2772 of file walsender.c.

References elog, XLogReaderState::EndRecPtr, ERROR, GetFlushRecPtr(), got_SIGUSR2, got_STOPPING, InvalidXLogRecPtr, logical_startptr, LogicalDecodingProcessRecord(), WalSnd::mutex, MyWalSnd, LogicalDecodingContext::reader, WalSnd::sentPtr, sentPtr, SpinLockAcquire, SpinLockRelease, WalSndCaughtUp, and XLogReadRecord().

Referenced by StartLogicalReplication().

2773 {
2774  XLogRecord *record;
2775  char *errm;
2776  XLogRecPtr flushPtr;
2777 
2778  /*
2779  * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
2780  * true in WalSndWaitForWal, if we're actually waiting. We also set it to
2781  * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
2782  * didn't wait - i.e. when we're shutting down.
2783  */
2784  WalSndCaughtUp = false;
2785 
2788 
2789  /* xlog record was invalid */
2790  if (errm != NULL)
2791  elog(ERROR, "%s", errm);
2792 
2793  /*
2794  * We'll use the current flush point to determine whether we've caught up.
2795  */
2796  flushPtr = GetFlushRecPtr();
2797 
2798  if (record != NULL)
2799  {
2800  /*
2801  * Note the lack of any call to LagTrackerWrite() here; that is handled
2802  * by WalSndUpdateProgress, which is called by the output plugin through
2803  * the logical decoding write API.
2804  */
2806 
2808  }
2809 
2810  /* Set flag if we're caught up. */
2811  if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
2812  WalSndCaughtUp = true;
2813 
2814  /*
2815  * If we're caught up and have been requested to stop, have WalSndLoop()
2816  * terminate the connection in an orderly manner, after writing out all
2817  * the pending data.
2818  */
2820  got_SIGUSR2 = true;
2821 
2822  /* Update shared memory status */
2823  {
2824  WalSnd *walsnd = MyWalSnd;
2825 
2826  SpinLockAcquire(&walsnd->mutex);
2827  walsnd->sentPtr = sentPtr;
2828  SpinLockRelease(&walsnd->mutex);
2829  }
2830 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:182
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8267
slock_t mutex
XLogRecord * XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
Definition: xlogreader.c:238
XLogRecPtr EndRecPtr
Definition: xlogreader.h:132
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define ERROR
Definition: elog.h:43
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:94
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:192
static XLogRecPtr logical_startptr
Definition: walsender.c:193
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:181
static bool WalSndCaughtUp
Definition: walsender.c:178
#define SpinLockRelease(lock)
Definition: spin.h:64
XLogRecPtr sentPtr
WalSnd * MyWalSnd
Definition: walsender.c:111
static XLogRecPtr sentPtr
Definition: walsender.c:149
uint64 XLogRecPtr
Definition: xlogdefs.h:21
XLogReaderState * reader
Definition: logical.h:41
#define elog(elevel,...)
Definition: elog.h:228

◆ XLogSendPhysical()

static void XLogSendPhysical ( void  )
static

Definition at line 2467 of file walsender.c.

References am_cascading_walsender, Assert, CheckXLogRemoved(), close, StringInfoData::data, DEBUG1, elog, enlargeStringInfo(), GetCurrentTimestamp(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), got_STOPPING, LagTrackerWrite(), StringInfoData::len, list_free_deep(), MAX_SEND_SIZE, WalSnd::mutex, MyWalSnd, WalSnd::needreload, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), readTimeLineHistory(), RecoveryInProgress(), resetStringInfo(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, WalSnd::sentPtr, sentPtr, set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, startptr, streamingDoneSending, ThisTimeLineID, tliSwitchPoint(), update_process_title, WALRead(), WALReadRaiseError(), WalSndCaughtUp, WalSndSegmentOpen(), WalSndSetState(), WALSNDSTATE_STOPPING, WALOpenSegment::ws_file, WALSegmentContext::ws_segsize, WALOpenSegment::ws_tli, and XLByteToSeg.

Referenced by StartReplication().

2468 {
2469  XLogRecPtr SendRqstPtr;
2471  XLogRecPtr endptr;
2472  Size nbytes;
2473  XLogSegNo segno;
2474  WALReadError errinfo;
2475 
2476  /* If requested switch the WAL sender to the stopping state. */
2477  if (got_STOPPING)
2479 
2481  {
2482  WalSndCaughtUp = true;
2483  return;
2484  }
2485 
2486  /* Figure out how far we can safely send the WAL. */
2488  {
2489  /*
2490  * Streaming an old timeline that's in this server's history, but is