PostgreSQL Source Code  git master
walsender.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/printtup.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "access/xlogutils.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
#include "commands/defrem.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/basebackup.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/logicalfuncs.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/dest.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/portal.h"
#include "utils/ps_status.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
Include dependency graph for walsender.c:

Go to the source code of this file.

Data Structures

struct  WalTimeSample
 
struct  LagTracker
 

Macros

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)
 
#define LAG_TRACKER_BUFFER_SIZE   8192
 
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000
 
#define PG_STAT_GET_WAL_SENDERS_COLS   15
 

Typedefs

typedef void(* WalSndSendDataCallback) (void)
 

Functions

static void WalSndLastCycleHandler (SIGNAL_ARGS)
 
static void WalSndLoop (WalSndSendDataCallback send_data)
 
static void InitWalSenderSlot (void)
 
static void WalSndKill (int code, Datum arg)
 
static void WalSndShutdown (void)
 
void WalSndErrorCleanup (void)
 
static void IdentifySystem (void)
 
static void SendTimeLineHistory (TimeLineHistoryCmd *cmd)
 
static void StartReplication (StartReplicationCmd *cmd)
 
static int logical_read_xlog_page (XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
 
static void parseCreateReplSlotOptions (CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action)
 
static void CreateReplicationSlot (CreateReplicationSlotCmd *cmd)
 
static void DropReplicationSlot (DropReplicationSlotCmd *cmd)
 
static void StartLogicalReplication (StartReplicationCmd *cmd)
 
static void WalSndPrepareWrite (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndWriteData (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndUpdateProgress (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
 
static XLogRecPtr WalSndWaitForWal (XLogRecPtr loc)
 
bool exec_replication_command (const char *cmd_string)
 
static void ProcessRepliesIfAny (void)
 
static void ProcessStandbyMessage (void)
 
static void PhysicalConfirmReceivedLocation (XLogRecPtr lsn)
 
static void ProcessStandbyReplyMessage (void)
 
static void PhysicalReplicationSlotNewXmin (TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
 
static bool TransactionIdInRecentPast (TransactionId xid, uint32 epoch)
 
static void ProcessStandbyHSFeedbackMessage (void)
 
static long WalSndComputeSleeptime (TimestampTz now)
 
static void WalSndCheckTimeOut (void)
 
static int WalSndSegmentOpen (XLogSegNo nextSegNo, WALSegmentContext *segcxt, TimeLineID *tli_p)
 
static void XLogSendPhysical (void)
 
static void XLogSendLogical (void)
 
static void WalSndDone (WalSndSendDataCallback send_data)
 
static XLogRecPtr GetStandbyFlushRecPtr (void)
 
void WalSndRqstFileReload (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (void)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void WalSndSetState (WalSndState state)
 
static const char * WalSndGetStateString (WalSndState state)
 
static Interval * offset_to_interval (TimeOffset offset)
 
Datum pg_stat_get_wal_senders (PG_FUNCTION_ARGS)
 
static void WalSndKeepalive (bool requestReply)
 
static void WalSndKeepaliveIfNecessary (void)
 
static void LagTrackerWrite (XLogRecPtr lsn, TimestampTz local_flush_time)
 
static TimeOffset LagTrackerRead (int head, XLogRecPtr lsn, TimestampTz now)
 
static void UpdateSpillStats (LogicalDecodingContext *ctx)
 

Variables

WalSndCtlData * WalSndCtl = NULL
 
WalSnd * MyWalSnd = NULL
 
bool am_walsender = false
 
bool am_cascading_walsender = false
 
bool am_db_walsender = false
 
int max_wal_senders = 0
 
int wal_sender_timeout = 60 * 1000
 
bool log_replication_commands = false
 
bool wake_wal_senders = false
 
static WALOpenSegment * sendSeg = NULL
 
static WALSegmentContext * sendCxt = NULL
 
static TimeLineID sendTimeLine = 0
 
static TimeLineID sendTimeLineNextTLI = 0
 
static bool sendTimeLineIsHistoric = false
 
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
 
static XLogRecPtr sentPtr = 0
 
static StringInfoData output_message
 
static StringInfoData reply_message
 
static StringInfoData tmpbuf
 
static TimestampTz last_processing = 0
 
static TimestampTz last_reply_timestamp = 0
 
static bool waiting_for_ping_response = false
 
static bool streamingDoneSending
 
static bool streamingDoneReceiving
 
static bool WalSndCaughtUp = false
 
static volatile sig_atomic_t got_SIGUSR2 = false
 
static volatile sig_atomic_t got_STOPPING = false
 
static volatile sig_atomic_t replication_active = false
 
static LogicalDecodingContext * logical_decoding_ctx = NULL
 
static XLogRecPtr logical_startptr = InvalidXLogRecPtr
 
static LagTracker * lag_tracker
 

Macro Definition Documentation

◆ LAG_TRACKER_BUFFER_SIZE

#define LAG_TRACKER_BUFFER_SIZE   8192

Definition at line 204 of file walsender.c.

Referenced by LagTrackerRead(), and LagTrackerWrite().

◆ MAX_SEND_SIZE

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)

Definition at line 106 of file walsender.c.

Referenced by XLogSendPhysical().

◆ PG_STAT_GET_WAL_SENDERS_COLS

#define PG_STAT_GET_WAL_SENDERS_COLS   15

Referenced by pg_stat_get_wal_senders().

◆ WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS

#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000

Referenced by WalSndUpdateProgress().

Typedef Documentation

◆ WalSndSendDataCallback

typedef void(* WalSndSendDataCallback) (void)

Definition at line 222 of file walsender.c.

Function Documentation

◆ CreateReplicationSlot()

static void CreateReplicationSlot ( CreateReplicationSlotCmd * cmd)
static

Definition at line 876 of file walsender.c.

References Assert, begin_tup_output_tupdesc(), CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDestReceiver(), CreateInitDecodingContext(), CreateTemplateTupleDesc(), CRS_EXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, CStringGetTextDatum, ReplicationSlot::data, DecodingContextFindStartpoint(), generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errmsg(), ERROR, FirstSnapshotSet, FreeDecodingContext(), InvalidXLogRecPtr, IsSubTransaction(), IsTransactionBlock(), CreateReplicationSlotCmd::kind, last_reply_timestamp, logical_read_xlog_page(), MAXFNAMELEN, MemSet, MyProc, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, NIL, parseCreateReplSlotOptions(), CreateReplicationSlotCmd::plugin, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, ReplicationSlotCreate(), ReplicationSlotMarkDirty(), ReplicationSlotPersist(), ReplicationSlotRelease(), ReplicationSlotReserveWal(), ReplicationSlotSave(), RestoreTransactionSnapshot(), RS_EPHEMERAL, RS_PERSISTENT, RS_TEMPORARY, sendTimeLine, sendTimeLineIsHistoric, CreateReplicationSlotCmd::slotname, SnapBuildExportSnapshot(), SnapBuildInitialSnapshot(), LogicalDecodingContext::snapshot_builder, snprintf, CreateReplicationSlotCmd::temporary, ThisTimeLineID, TTSOpsVirtual, TupleDescInitBuiltinEntry(), values, WalSndPrepareWrite(), WalSndUpdateProgress(), WalSndWriteData(), XACT_REPEATABLE_READ, and XactIsoLevel.

Referenced by exec_replication_command(), main(), and StartLogStreamer().

877 {
878  const char *snapshot_name = NULL;
879  char xloc[MAXFNAMELEN];
880  char *slot_name;
881  bool reserve_wal = false;
882  CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
884  TupOutputState *tstate;
885  TupleDesc tupdesc;
886  Datum values[4];
887  bool nulls[4];
888 
890 
891  parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action);
892 
893  /* setup state for XLogRead */
894  sendTimeLineIsHistoric = false;
896 
897  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
898  {
899  ReplicationSlotCreate(cmd->slotname, false,
901  }
902  else
903  {
905 
906  /*
907  * Initially create persistent slot as ephemeral - that allows us to
908  * nicely handle errors during initialization because it'll get
909  * dropped if this transaction fails. We'll make it persistent at the
910  * end. Temporary slots can be created as temporary from beginning as
911  * they get dropped on error as well.
912  */
913  ReplicationSlotCreate(cmd->slotname, true,
915  }
916 
917  if (cmd->kind == REPLICATION_KIND_LOGICAL)
918  {
920  bool need_full_snapshot = false;
921 
922  /*
923  * Do options check early so that we can bail before calling the
924  * DecodingContextFindStartpoint which can take long time.
925  */
926  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
927  {
928  if (IsTransactionBlock())
929  ereport(ERROR,
930  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
931  (errmsg("%s must not be called inside a transaction",
932  "CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT")));
933 
934  need_full_snapshot = true;
935  }
936  else if (snapshot_action == CRS_USE_SNAPSHOT)
937  {
938  if (!IsTransactionBlock())
939  ereport(ERROR,
940  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
941  (errmsg("%s must be called inside a transaction",
942  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
943 
945  ereport(ERROR,
946  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
947  (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
948  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
949 
950  if (FirstSnapshotSet)
951  ereport(ERROR,
952  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
953  (errmsg("%s must be called before any query",
954  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
955 
956  if (IsSubTransaction())
957  ereport(ERROR,
958  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
959  (errmsg("%s must not be called in a subtransaction",
960  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
961 
962  need_full_snapshot = true;
963  }
964 
965  ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
970 
971  /*
972  * Signal that we don't need the timeout mechanism. We're just
973  * creating the replication slot and don't yet accept feedback
974  * messages or send keepalives. As we possibly need to wait for
975  * further WAL the walsender would otherwise possibly be killed too
976  * soon.
977  */
979 
980  /* build initial snapshot, might take a while */
982 
983  /*
984  * Export or use the snapshot if we've been asked to do so.
985  *
986  * NB. We will convert the snapbuild.c kind of snapshot to normal
987  * snapshot when doing this.
988  */
989  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
990  {
991  snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
992  }
993  else if (snapshot_action == CRS_USE_SNAPSHOT)
994  {
995  Snapshot snap;
996 
999  }
1000 
1001  /* don't need the decoding context anymore */
1002  FreeDecodingContext(ctx);
1003 
1004  if (!cmd->temporary)
1006  }
1007  else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
1008  {
1010 
1012 
1013  /* Write this slot to disk if it's a permanent one. */
1014  if (!cmd->temporary)
1016  }
1017 
1018  snprintf(xloc, sizeof(xloc), "%X/%X",
1021 
1023  MemSet(nulls, false, sizeof(nulls));
1024 
1025  /*----------
1026  * Need a tuple descriptor representing four columns:
1027  * - first field: the slot name
1028  * - second field: LSN at which we became consistent
1029  * - third field: exported snapshot's name
1030  * - fourth field: output plugin
1031  *----------
1032  */
1033  tupdesc = CreateTemplateTupleDesc(4);
1034  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
1035  TEXTOID, -1, 0);
1036  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
1037  TEXTOID, -1, 0);
1038  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
1039  TEXTOID, -1, 0);
1040  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1041  TEXTOID, -1, 0);
1042 
1043  /* prepare for projection of tuples */
1044  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
1045 
1046  /* slot_name */
1047  slot_name = NameStr(MyReplicationSlot->data.name);
1048  values[0] = CStringGetTextDatum(slot_name);
1049 
1050  /* consistent wal location */
1051  values[1] = CStringGetTextDatum(xloc);
1052 
1053  /* snapshot name, or NULL if none */
1054  if (snapshot_name != NULL)
1055  values[2] = CStringGetTextDatum(snapshot_name);
1056  else
1057  nulls[2] = true;
1058 
1059  /* plugin, or NULL if none */
1060  if (cmd->plugin != NULL)
1061  values[3] = CStringGetTextDatum(cmd->plugin);
1062  else
1063  nulls[3] = true;
1064 
1065  /* send it to dest */
1066  do_tup_output(tstate, values, nulls);
1067  end_tup_output(tstate);
1068 
1070 }
#define NIL
Definition: pg_list.h:65
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:44
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action)
Definition: walsender.c:823
PGPROC * MyProc
Definition: proc.c:67
#define XACT_REPEATABLE_READ
Definition: xact.h:38
void RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
Definition: snapmgr.c:2225
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2256
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency)
Definition: slot.c:221
#define MemSet(start, val, len)
Definition: c.h:962
void ReplicationSlotSave(void)
Definition: slot.c:645
ReplicationSlotPersistentData data
Definition: slot.h:132
XLogRecPtr confirmed_flush
Definition: slot.h:80
ReplicationKind kind
Definition: replnodes.h:56
bool IsTransactionBlock(void)
Definition: xact.c:4635
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:631
void ReplicationSlotReserveWal(void)
Definition: slot.c:997
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:462
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2314
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:676
#define ERROR
Definition: elog.h:43
bool FirstSnapshotSet
Definition: snapmgr.c:205
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2236
void ReplicationSlotPersist(void)
Definition: slot.c:680
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1203
unsigned int uint32
Definition: c.h:359
void ReplicationSlotRelease(void)
Definition: slot.c:424
#define ereport(elevel, rest)
Definition: elog.h:141
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1176
#define MAXFNAMELEN
LogicalDecodingContext * CreateInitDecodingContext(char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:227
uintptr_t Datum
Definition: postgres.h:367
TimeLineID ThisTimeLineID
Definition: xlog.c:187
struct SnapBuild * snapshot_builder
Definition: logical.h:43
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
static TimeLineID sendTimeLine
Definition: walsender.c:141
#define Assert(condition)
Definition: c.h:739
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:508
int XactIsoLevel
Definition: xact.c:74
bool IsSubTransaction(void)
Definition: xact.c:4708
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition: snapbuild.c:539
static Datum values[MAXATTR]
Definition: bootstrap.c:167
int errmsg(const char *fmt,...)
Definition: elog.c:822
CRSSnapshotAction
Definition: walsender.h:20
#define NameStr(name)
Definition: c.h:616
#define CStringGetTextDatum(s)
Definition: builtins.h:83
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition: walsender.c:767
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:75
#define snprintf
Definition: port.h:192
int16 AttrNumber
Definition: attnum.h:21
static TimestampTz last_reply_timestamp
Definition: walsender.c:164
static bool sendTimeLineIsHistoric
Definition: walsender.c:143
void ReplicationSlotMarkDirty(void)
Definition: slot.c:663
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
Definition: walsender.c:1293

◆ DropReplicationSlot()

static void DropReplicationSlot ( DropReplicationSlotCmd * cmd)
static

Definition at line 1076 of file walsender.c.

References DestRemote, EndCommand(), ReplicationSlotDrop(), DropReplicationSlotCmd::slotname, and DropReplicationSlotCmd::wait.

Referenced by exec_replication_command(), and main().

1077 {
1078  ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1079  EndCommand("DROP_REPLICATION_SLOT", DestRemote);
1080 }
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:166
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:517

◆ exec_replication_command()

bool exec_replication_command ( const char *  cmd_string)

Definition at line 1462 of file walsender.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, generate_unaccent_rules::dest, DestRemote, DestRemoteSimple, DropReplicationSlot(), elog, EndCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsA, IsAbortedTransactionBlockState(), IsTransactionBlock(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextDelete(), MemoryContextSwitchTo(), MyDatabaseId, VariableShowStmt::name, pgstat_report_activity(), PreventInTransactionBlock(), REPLICATION_KIND_PHYSICAL, replication_parse_result, replication_scanner_init(), replication_yyparse(), SendBaseBackup(), SendTimeLineHistory(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), StartTransactionCommand(), WalSnd::state, STATE_IDLE, STATE_RUNNING, T_BaseBackupCmd, T_CreateReplicationSlotCmd, T_DropReplicationSlotCmd, T_IdentifySystemCmd, T_SQLCmd, T_StartReplicationCmd, T_TimeLineHistoryCmd, T_VariableShowStmt, TimeLineHistoryCmd::type, Node::type, WalSndSetState(), and WALSNDSTATE_STOPPING.

Referenced by PostgresMain().

1463 {
1464  int parse_rc;
1465  Node *cmd_node;
1466  MemoryContext cmd_context;
1467  MemoryContext old_context;
1468 
1469  /*
1470  * If WAL sender has been told that shutdown is getting close, switch its
1471  * status accordingly to handle the next replication commands correctly.
1472  */
1473  if (got_STOPPING)
1475 
1476  /*
1477  * Throw error if in stopping mode. We need prevent commands that could
1478  * generate WAL while the shutdown checkpoint is being written. To be
1479  * safe, we just prohibit all new commands.
1480  */
1482  ereport(ERROR,
1483  (errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1484 
1485  /*
1486  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1487  * command arrives. Clean up the old stuff if there's anything.
1488  */
1490 
1492 
1494  "Replication command context",
1496  old_context = MemoryContextSwitchTo(cmd_context);
1497 
1498  replication_scanner_init(cmd_string);
1499  parse_rc = replication_yyparse();
1500  if (parse_rc != 0)
1501  ereport(ERROR,
1502  (errcode(ERRCODE_SYNTAX_ERROR),
1503  (errmsg_internal("replication command parser returned %d",
1504  parse_rc))));
1505 
1506  cmd_node = replication_parse_result;
1507 
1508  /*
1509  * Log replication command if log_replication_commands is enabled. Even
1510  * when it's disabled, log the command with DEBUG1 level for backward
1511  * compatibility. Note that SQL commands are not logged here, and will be
1512  * logged later if log_statement is enabled.
1513  */
1514  if (cmd_node->type != T_SQLCmd)
1516  (errmsg("received replication command: %s", cmd_string)));
1517 
1518  /*
1519  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was
1520  * called outside of transaction the snapshot should be cleared here.
1521  */
1522  if (!IsTransactionBlock())
1524 
1525  /*
1526  * For aborted transactions, don't allow anything except pure SQL, the
1527  * exec_simple_query() will handle it correctly.
1528  */
1529  if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
1530  ereport(ERROR,
1531  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1532  errmsg("current transaction is aborted, "
1533  "commands ignored until end of transaction block")));
1534 
1536 
1537  /*
1538  * Allocate buffers that will be used for each outgoing and incoming
1539  * message. We do this just once per command to reduce palloc overhead.
1540  */
1544 
1545  /* Report to pgstat that this process is running */
1547 
1548  switch (cmd_node->type)
1549  {
1550  case T_IdentifySystemCmd:
1551  IdentifySystem();
1552  break;
1553 
1554  case T_BaseBackupCmd:
1555  PreventInTransactionBlock(true, "BASE_BACKUP");
1556  SendBaseBackup((BaseBackupCmd *) cmd_node);
1557  break;
1558 
1561  break;
1562 
1565  break;
1566 
1567  case T_StartReplicationCmd:
1568  {
1569  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1570 
1571  PreventInTransactionBlock(true, "START_REPLICATION");
1572 
1573  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1574  StartReplication(cmd);
1575  else
1577  break;
1578  }
1579 
1580  case T_TimeLineHistoryCmd:
1581  PreventInTransactionBlock(true, "TIMELINE_HISTORY");
1583  break;
1584 
1585  case T_VariableShowStmt:
1586  {
1588  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1589 
1590  /* syscache access needs a transaction environment */
1592  GetPGVariable(n->name, dest);
1594  }
1595  break;
1596 
1597  case T_SQLCmd:
1598  if (MyDatabaseId == InvalidOid)
1599  ereport(ERROR,
1600  (errmsg("cannot execute SQL commands in WAL sender for physical replication")));
1601 
1602  /* Report to pgstat that this process is now idle */
1604 
1605  /* Tell the caller that this wasn't a WalSender command. */
1606  return false;
1607 
1608  default:
1609  elog(ERROR, "unrecognized replication command node tag: %u",
1610  cmd_node->type);
1611  }
1612 
1613  /* done */
1614  MemoryContextSwitchTo(old_context);
1615  MemoryContextDelete(cmd_context);
1616 
1617  /* Send CommandComplete message */
1618  EndCommand("SELECT", DestRemote);
1619 
1620  /* Report to pgstat that this process is now idle */
1622 
1623  return true;
1624 }
#define IsA(nodeptr, _type_)
Definition: nodes.h:576
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:211
#define AllocSetContextCreate
Definition: memutils.h:170
#define DEBUG1
Definition: elog.h:25
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:3114
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:436
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1076
void CommitTransactionCommand(void)
Definition: xact.c:2898
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:375
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Definition: nodes.h:525
static StringInfoData output_message
Definition: walsender.c:153
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:183
int errcode(int sqlerrcode)
Definition: elog.c:608
#define LOG
Definition: elog.h:26
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc.c:8814
Node * replication_parse_result
bool IsTransactionBlock(void)
Definition: xact.c:4635
ReplicationKind kind
Definition: replnodes.h:82
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:166
#define ERROR
Definition: elog.h:43
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:747
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
WalSndState state
NodeTag type
Definition: nodes.h:527
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:542
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3331
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
#define ereport(elevel, rest)
Definition: elog.h:141
int replication_yyparse(void)
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
static StringInfoData reply_message
Definition: walsender.c:154
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1087
Oid MyDatabaseId
Definition: globals.c:85
WalSnd * MyWalSnd
Definition: walsender.c:112
#define InvalidOid
Definition: postgres_ext.h:36
int errmsg_internal(const char *fmt,...)
Definition: elog.c:909
void WalSndSetState(WalSndState state)
Definition: walsender.c:3137
void StartTransactionCommand(void)
Definition: xact.c:2797
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:876
static StringInfoData tmpbuf
Definition: walsender.c:155
bool log_replication_commands
Definition: walsender.c:125
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define elog(elevel,...)
Definition: elog.h:228
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
static void IdentifySystem(void)
Definition: walsender.c:347
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:692
void replication_scanner_init(const char *query_string)

◆ GetStandbyFlushRecPtr()

static XLogRecPtr GetStandbyFlushRecPtr ( void  )
static

Definition at line 2894 of file walsender.c.

References GetWalRcvWriteRecPtr(), GetXLogReplayRecPtr(), receiveTLI, and ThisTimeLineID.

Referenced by IdentifySystem(), StartReplication(), and XLogSendPhysical().

2895 {
2896  XLogRecPtr replayPtr;
2897  TimeLineID replayTLI;
2898  XLogRecPtr receivePtr;
2900  XLogRecPtr result;
2901 
2902  /*
2903  * We can safely send what's already been replayed. Also, if walreceiver
2904  * is streaming WAL from the same timeline, we can send anything that it
2905  * has streamed, but hasn't been replayed yet.
2906  */
2907 
2908  receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI);
2909  replayPtr = GetXLogReplayRecPtr(&replayTLI);
2910 
2911  ThisTimeLineID = replayTLI;
2912 
2913  result = replayPtr;
2914  if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
2915  result = receivePtr;
2916 
2917  return result;
2918 }
uint32 TimeLineID
Definition: xlogdefs.h:52
XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11193
static TimeLineID receiveTLI
Definition: xlog.c:209
TimeLineID ThisTimeLineID
Definition: xlog.c:187
uint64 XLogRecPtr
Definition: xlogdefs.h:21

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 2947 of file walsender.c.

References am_walsender, Assert, got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

2948 {
2950 
2951  /*
2952  * If replication has not yet started, die like with SIGTERM. If
2953  * replication is active, only set a flag and wake up the main loop. It
2954  * will send any outstanding WAL, wait for it to be replicated to the
2955  * standby, and then exit gracefully.
2956  */
2957  if (!replication_active)
2958  kill(MyProcPid, SIGTERM);
2959  else
2960  got_STOPPING = true;
2961 }
int MyProcPid
Definition: globals.c:40
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:183
#define kill(pid, sig)
Definition: win32_port.h:426
bool am_walsender
Definition: walsender.c:115
static volatile sig_atomic_t replication_active
Definition: walsender.c:191
#define Assert(condition)
Definition: c.h:739

◆ IdentifySystem()

static void IdentifySystem ( void  )
static

Definition at line 347 of file walsender.c.

References am_cascading_walsender, begin_tup_output_tupdesc(), CommitTransactionCommand(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, cur, CurrentMemoryContext, dbname, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), get_database_name(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetSystemIdentifier(), Int32GetDatum, InvalidOid, MAXFNAMELEN, MemoryContextSwitchTo(), MemSet, MyDatabaseId, RecoveryInProgress(), snprintf, StartTransactionCommand(), ThisTimeLineID, TTSOpsVirtual, TupleDescInitBuiltinEntry(), UINT64_FORMAT, and values.

Referenced by exec_replication_command().

348 {
349  char sysid[32];
350  char xloc[MAXFNAMELEN];
351  XLogRecPtr logptr;
352  char *dbname = NULL;
354  TupOutputState *tstate;
355  TupleDesc tupdesc;
356  Datum values[4];
357  bool nulls[4];
358 
359  /*
360  * Reply with a result set with one row, four columns. First col is system
361  * ID, second is timeline ID, third is current xlog location and the
362  * fourth contains the database name if we are connected to one.
363  */
364 
365  snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
367 
370  {
371  /* this also updates ThisTimeLineID */
372  logptr = GetStandbyFlushRecPtr();
373  }
374  else
375  logptr = GetFlushRecPtr();
376 
377  snprintf(xloc, sizeof(xloc), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
378 
379  if (MyDatabaseId != InvalidOid)
380  {
382 
383  /* syscache access needs a transaction env. */
385  /* make dbname live outside TX context */
389  /* CommitTransactionCommand switches to TopMemoryContext */
391  }
392 
394  MemSet(nulls, false, sizeof(nulls));
395 
396  /* need a tuple descriptor representing four columns */
397  tupdesc = CreateTemplateTupleDesc(4);
398  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
399  TEXTOID, -1, 0);
400  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
401  INT4OID, -1, 0);
402  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
403  TEXTOID, -1, 0);
404  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
405  TEXTOID, -1, 0);
406 
407  /* prepare for projection of tuples */
408  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
409 
410  /* column 1: system identifier */
411  values[0] = CStringGetTextDatum(sysid);
412 
413  /* column 2: timeline */
414  values[1] = Int32GetDatum(ThisTimeLineID);
415 
416  /* column 3: wal location */
417  values[2] = CStringGetTextDatum(xloc);
418 
419  /* column 4: database name, or NULL if none */
420  if (dbname)
421  values[3] = CStringGetTextDatum(dbname);
422  else
423  nulls[3] = true;
424 
425  /* send it to dest */
426  do_tup_output(tstate, values, nulls);
427 
428  end_tup_output(tstate);
429 }
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:44
void CommitTransactionCommand(void)
Definition: xact.c:2898
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2256
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
struct cursor * cur
Definition: ecpg.c:28
#define MemSet(start, val, len)
Definition: c.h:962
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8262
bool RecoveryInProgress(void)
Definition: xlog.c:7930
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2314
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:676
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2236
char * get_database_name(Oid dbid)
Definition: dbcommands.c:2155
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
unsigned int uint32
Definition: c.h:359
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
#define MAXFNAMELEN
uintptr_t Datum
Definition: postgres.h:367
Oid MyDatabaseId
Definition: globals.c:85
#define InvalidOid
Definition: postgres_ext.h:36
TimeLineID ThisTimeLineID
Definition: xlog.c:187
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void StartTransactionCommand(void)
Definition: xact.c:2797
char * dbname
Definition: streamutil.c:50
static Datum values[MAXATTR]
Definition: bootstrap.c:167
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4798
#define Int32GetDatum(X)
Definition: postgres.h:479
#define CStringGetTextDatum(s)
Definition: builtins.h:83
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2894
#define snprintf
Definition: port.h:192
int16 AttrNumber
Definition: attnum.h:21
#define UINT64_FORMAT
Definition: c.h:402
bool am_cascading_walsender
Definition: walsender.c:116

◆ InitWalSenderSlot()

static void InitWalSenderSlot ( void  )
static

Definition at line 2309 of file walsender.c.

References WalSnd::apply, WalSnd::applyLag, Assert, WalSnd::flush, WalSnd::flushLag, i, InvalidXLogRecPtr, WalSnd::latch, max_wal_senders, WalSnd::mutex, MyProc, MyProcPid, on_shmem_exit(), WalSnd::pid, PGPROC::procLatch, WalSnd::replyTime, WalSnd::sentPtr, WalSnd::spillBytes, WalSnd::spillCount, WalSnd::spillTxns, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndKill(), WalSndCtlData::walsnds, WALSNDSTATE_STARTUP, WalSnd::write, and WalSnd::writeLag.

2310 {
2311  int i;
2312 
2313  /*
2314  * WalSndCtl should be set up already (we inherit this by fork() or
2315  * EXEC_BACKEND mechanism from the postmaster).
2316  */
2317  Assert(WalSndCtl != NULL);
2318  Assert(MyWalSnd == NULL);
2319 
2320  /*
2321  * Find a free walsender slot and reserve it. This must not fail due to
2322  * the prior check for free WAL senders in InitProcess().
2323  */
2324  for (i = 0; i < max_wal_senders; i++)
2325  {
2326  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2327 
2328  SpinLockAcquire(&walsnd->mutex);
2329 
2330  if (walsnd->pid != 0)
2331  {
2332  SpinLockRelease(&walsnd->mutex);
2333  continue;
2334  }
2335  else
2336  {
2337  /*
2338  * Found a free slot. Reserve it for us.
2339  */
2340  walsnd->pid = MyProcPid;
2341  walsnd->sentPtr = InvalidXLogRecPtr;
2342  walsnd->write = InvalidXLogRecPtr;
2343  walsnd->flush = InvalidXLogRecPtr;
2344  walsnd->apply = InvalidXLogRecPtr;
2345  walsnd->writeLag = -1;
2346  walsnd->flushLag = -1;
2347  walsnd->applyLag = -1;
2348  walsnd->state = WALSNDSTATE_STARTUP;
2349  walsnd->latch = &MyProc->procLatch;
2350  walsnd->replyTime = 0;
2351  walsnd->spillTxns = 0;
2352  walsnd->spillCount = 0;
2353  walsnd->spillBytes = 0;
2354  SpinLockRelease(&walsnd->mutex);
2355  /* don't need the lock anymore */
2356  MyWalSnd = (WalSnd *) walsnd;
2357 
2358  break;
2359  }
2360  }
2361 
2362  Assert(MyWalSnd != NULL);
2363 
2364  /* Arrange to clean up at walsender exit */
2366 }
XLogRecPtr write
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
int MyProcPid
Definition: globals.c:40
PGPROC * MyProc
Definition: proc.c:67
TimeOffset flushLag
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
static void WalSndKill(int code, Datum arg)
Definition: walsender.c:2370
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
TimeOffset writeLag
slock_t mutex
Latch procLatch
Definition: proc.h:104
int64 spillCount
int64 spillBytes
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
Latch * latch
int64 spillTxns
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
WalSndState state
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
XLogRecPtr sentPtr
WalSnd * MyWalSnd
Definition: walsender.c:112
TimeOffset applyLag
#define Assert(condition)
Definition: c.h:739
int i
TimestampTz replyTime
XLogRecPtr apply

◆ LagTrackerRead()

static TimeOffset LagTrackerRead ( int  head,
XLogRecPtr  lsn,
TimestampTz  now 
)
static

Definition at line 3499 of file walsender.c.

References Assert, LagTracker::buffer, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_read, WalTimeSample::lsn, next, LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by ProcessStandbyReplyMessage().

3500 {
3501  TimestampTz time = 0;
3502 
3503  /* Read all unread samples up to this LSN or end of buffer. */
3504  while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
3505  lag_tracker->buffer[lag_tracker->read_heads[head]].lsn <= lsn)
3506  {
3507  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3508  lag_tracker->last_read[head] =
3510  lag_tracker->read_heads[head] =
3512  }
3513 
3514  /*
3515  * If the lag tracker is empty, that means the standby has processed
3516  * everything we've ever sent so we should now clear 'last_read'. If we
3517  * didn't do that, we'd risk using a stale and irrelevant sample for
3518  * interpolation at the beginning of the next burst of WAL after a period
3519  * of idleness.
3520  */
3522  lag_tracker->last_read[head].time = 0;
3523 
3524  if (time > now)
3525  {
3526  /* If the clock somehow went backwards, treat as not found. */
3527  return -1;
3528  }
3529  else if (time == 0)
3530  {
3531  /*
3532  * We didn't cross a time. If there is a future sample that we
3533  * haven't reached yet, and we've already reached at least one sample,
3534  * let's interpolate the local flushed time. This is mainly useful
3535  * for reporting a completely stuck apply position as having
3536  * increasing lag, since otherwise we'd have to wait for it to
3537  * eventually start moving again and cross one of our samples before
3538  * we can show the lag increasing.
3539  */
3541  {
3542  /* There are no future samples, so we can't interpolate. */
3543  return -1;
3544  }
3545  else if (lag_tracker->last_read[head].time != 0)
3546  {
3547  /* We can interpolate between last_read and the next sample. */
3548  double fraction;
3549  WalTimeSample prev = lag_tracker->last_read[head];
3551 
3552  if (lsn < prev.lsn)
3553  {
3554  /*
3555  * Reported LSNs shouldn't normally go backwards, but it's
3556  * possible when there is a timeline change. Treat as not
3557  * found.
3558  */
3559  return -1;
3560  }
3561 
3562  Assert(prev.lsn < next.lsn);
3563 
3564  if (prev.time > next.time)
3565  {
3566  /* If the clock somehow went backwards, treat as not found. */
3567  return -1;
3568  }
3569 
3570  /* See how far we are between the previous and next samples. */
3571  fraction =
3572  (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
3573 
3574  /* Scale the local flush time proportionally. */
3575  time = (TimestampTz)
3576  ((double) prev.time + (next.time - prev.time) * fraction);
3577  }
3578  else
3579  {
3580  /*
3581  * We have only a future sample, implying that we were entirely
3582  * caught up and now there is a new burst of WAL and the
3583  * standby hasn't processed the first sample yet. Until the
3584  * standby reaches the future sample the best we can do is report
3585  * the hypothetical lag if that sample were to be replayed now.
3586  */
3587  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3588  }
3589  }
3590 
3591  /* Return the elapsed time since local flush time in microseconds. */
3592  Assert(time != 0);
3593  return now - time;
3594 }
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:210
static int32 next
Definition: blutils.c:217
int write_head
Definition: walsender.c:211
int64 TimestampTz
Definition: timestamp.h:39
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:213
TimestampTz time
Definition: walsender.c:200
static LagTracker * lag_tracker
Definition: walsender.c:216
XLogRecPtr lsn
Definition: walsender.c:199
#define Assert(condition)
Definition: c.h:739
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:212
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:204
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1547

◆ LagTrackerWrite()

static void LagTrackerWrite ( XLogRecPtr  lsn,
TimestampTz  local_flush_time 
)
static

Definition at line 3434 of file walsender.c.

References am_walsender, LagTracker::buffer, i, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_lsn, WalTimeSample::lsn, NUM_SYNC_REP_WAIT_MODE, LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by WalSndUpdateProgress(), and XLogSendPhysical().

3435 {
3436  bool buffer_full;
3437  int new_write_head;
3438  int i;
3439 
3440  if (!am_walsender)
3441  return;
3442 
3443  /*
3444  * If the lsn hasn't advanced since last time, then do nothing. This way
3445  * we only record a new sample when new WAL has been written.
3446  */
3447  if (lag_tracker->last_lsn == lsn)
3448  return;
3449  lag_tracker->last_lsn = lsn;
3450 
3451  /*
3452  * If advancing the write head of the circular buffer would crash into any
3453  * of the read heads, then the buffer is full. In other words, the
3454  * slowest reader (presumably apply) is the one that controls the release
3455  * of space.
3456  */
3457  new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
3458  buffer_full = false;
3459  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
3460  {
3461  if (new_write_head == lag_tracker->read_heads[i])
3462  buffer_full = true;
3463  }
3464 
3465  /*
3466  * If the buffer is full, for now we just rewind by one slot and overwrite
3467  * the last sample, as a simple (if somewhat uneven) way to lower the
3468  * sampling rate. There may be better adaptive compaction algorithms.
3469  */
3470  if (buffer_full)
3471  {
3472  new_write_head = lag_tracker->write_head;
3473  if (lag_tracker->write_head > 0)
3475  else
3477  }
3478 
3479  /* Store a sample at the current write head position. */
3481  lag_tracker->buffer[lag_tracker->write_head].time = local_flush_time;
3482  lag_tracker->write_head = new_write_head;
3483 }
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:210
int write_head
Definition: walsender.c:211
TimestampTz time
Definition: walsender.c:200
static LagTracker * lag_tracker
Definition: walsender.c:216
bool am_walsender
Definition: walsender.c:115
XLogRecPtr lsn
Definition: walsender.c:199
XLogRecPtr last_lsn
Definition: walsender.c:209
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:212
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:204
int i
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28

◆ logical_read_xlog_page()

static int logical_read_xlog_page ( XLogReaderState state,
XLogRecPtr  targetPagePtr,
int  reqLen,
XLogRecPtr  targetRecPtr,
char *  cur_page 
)
static

Definition at line 767 of file walsender.c.

References CheckXLogRemoved(), XLogReaderState::currTLI, XLogReaderState::currTLIValidUntil, XLogReaderState::nextTLI, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, ThisTimeLineID, WALRead(), WALReadRaiseError(), WalSndSegmentOpen(), WalSndWaitForWal(), WALSegmentContext::ws_segsize, WALOpenSegment::ws_tli, XLByteToSeg, and XLogReadDetermineTimeline().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

769 {
770  XLogRecPtr flushptr;
771  int count;
772  WALReadError errinfo;
773  XLogSegNo segno;
774 
775  XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
777  sendTimeLine = state->currTLI;
779  sendTimeLineNextTLI = state->nextTLI;
780 
781  /* make sure we have enough WAL available */
782  flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
783 
784  /* fail if not (implies we are going to shut down) */
785  if (flushptr < targetPagePtr + reqLen)
786  return -1;
787 
788  if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
789  count = XLOG_BLCKSZ; /* more than one block available */
790  else
791  count = flushptr - targetPagePtr; /* part of the page available */
792 
793  /* now actually read the data, we know it's there */
794  if (!WALRead(cur_page,
795  targetPagePtr,
796  XLOG_BLCKSZ,
797  sendSeg->ws_tli, /* Pass the current TLI because only
798  * WalSndSegmentOpen controls whether new
799  * TLI is needed. */
800  sendSeg,
801  sendCxt,
803  &errinfo))
804  WALReadRaiseError(&errinfo);
805 
806  /*
807  * After reading into the buffer, check that what we read was valid. We do
808  * this after reading, because even though the segment was present when we
809  * opened it, it might get recycled or removed while we read it. The
810  * read() succeeds in that case, but the data we tried to read might
811  * already have been overwritten with new WAL records.
812  */
813  XLByteToSeg(targetPagePtr, segno, sendCxt->ws_segsize);
815 
816  return count;
817 }
static WALSegmentContext * sendCxt
Definition: walsender.c:133
void WALReadRaiseError(WALReadError *errinfo)
Definition: xlogutils.c:944
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
Definition: xlogutils.c:681
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:3859
uint64 XLogSegNo
Definition: xlogdefs.h:41
bool WALRead(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALOpenSegment *seg, WALSegmentContext *segcxt, WALSegmentOpen openSegment, WALReadError *errinfo)
Definition: xlogreader.c:1033
XLogRecPtr currTLIValidUntil
Definition: xlogreader.h:192
static WALOpenSegment * sendSeg
Definition: walsender.c:132
static int WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt, TimeLineID *tli_p)
Definition: walsender.c:2388
TimeLineID nextTLI
Definition: xlogreader.h:198
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition: walsender.c:1324
TimeLineID ThisTimeLineID
Definition: xlog.c:187
TimeLineID currTLI
Definition: xlogreader.h:182
static TimeLineID sendTimeLine
Definition: walsender.c:141
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:142
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:144
TimeLineID ws_tli
Definition: xlogreader.h:39
static bool sendTimeLineIsHistoric
Definition: walsender.c:143
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

◆ offset_to_interval()

static Interval* offset_to_interval ( TimeOffset  offset)
static

Definition at line 3175 of file walsender.c.

References Interval::day, Interval::month, palloc(), and Interval::time.

Referenced by pg_stat_get_wal_senders().

3176 {
        /*
         * Wrap a TimeOffset in a newly palloc'd Interval and return it.
         * The allocation comes from palloc(), so the result is not static
         * storage; each call yields a fresh Interval.
         */
3177  Interval *result = palloc(sizeof(Interval));
3178 
        /* Month and day stay zero: the entire offset is carried in 'time'. */
3179  result->month = 0;
3180  result->day = 0;
3181  result->time = offset;
3182 
3183  return result;
3184 }
int32 day
Definition: timestamp.h:47
TimeOffset time
Definition: timestamp.h:45
int32 month
Definition: timestamp.h:48
void * palloc(Size size)
Definition: mcxt.c:949

◆ parseCreateReplSlotOptions()

static void parseCreateReplSlotOptions ( CreateReplicationSlotCmd cmd,
bool reserve_wal,
CRSSnapshotAction snapshot_action 
)
static

Definition at line 823 of file walsender.c.

References CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, defGetBoolean(), DefElem::defname, elog, ereport, errcode(), errmsg(), ERROR, CreateReplicationSlotCmd::kind, lfirst, CreateReplicationSlotCmd::options, REPLICATION_KIND_LOGICAL, and REPLICATION_KIND_PHYSICAL.

Referenced by CreateReplicationSlot().

826 {
827  ListCell *lc;
828  bool snapshot_action_given = false;
829  bool reserve_wal_given = false;
830 
831  /* Parse options */
832  foreach(lc, cmd->options)
833  {
834  DefElem *defel = (DefElem *) lfirst(lc);
835 
836  if (strcmp(defel->defname, "export_snapshot") == 0)
837  {
838  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
839  ereport(ERROR,
840  (errcode(ERRCODE_SYNTAX_ERROR),
841  errmsg("conflicting or redundant options")));
842 
843  snapshot_action_given = true;
844  *snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT :
846  }
847  else if (strcmp(defel->defname, "use_snapshot") == 0)
848  {
849  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
850  ereport(ERROR,
851  (errcode(ERRCODE_SYNTAX_ERROR),
852  errmsg("conflicting or redundant options")));
853 
854  snapshot_action_given = true;
855  *snapshot_action = CRS_USE_SNAPSHOT;
856  }
857  else if (strcmp(defel->defname, "reserve_wal") == 0)
858  {
859  if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
860  ereport(ERROR,
861  (errcode(ERRCODE_SYNTAX_ERROR),
862  errmsg("conflicting or redundant options")));
863 
864  reserve_wal_given = true;
865  *reserve_wal = true;
866  }
867  else
868  elog(ERROR, "unrecognized option: %s", defel->defname);
869  }
870 }
int errcode(int sqlerrcode)
Definition: elog.c:608
ReplicationKind kind
Definition: replnodes.h:56
bool defGetBoolean(DefElem *def)
Definition: define.c:111
#define ERROR
Definition: elog.h:43
#define ereport(elevel, rest)
Definition: elog.h:141
#define lfirst(lc)
Definition: pg_list.h:190
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define elog(elevel,...)
Definition: elog.h:228
char * defname
Definition: parsenodes.h:730

◆ pg_stat_get_wal_senders()

Datum pg_stat_get_wal_senders ( PG_FUNCTION_ARGS  )

Definition at line 3191 of file walsender.c.

References ReturnSetInfo::allowedModes, WalSnd::apply, WalSnd::applyLag, CStringGetTextDatum, ReturnSetInfo::econtext, ExprContext::ecxt_per_query_memory, elog, ereport, errcode(), errmsg(), ERROR, WalSnd::flush, WalSnd::flushLag, get_call_result_type(), GetUserId(), i, Int32GetDatum, Int64GetDatum(), IntervalPGetDatum, is_member_of_role(), IsA, list_member_int(), LSNGetDatum, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_wal_senders, MemoryContextSwitchTo(), MemSet, WalSnd::mutex, offset_to_interval(), PG_STAT_GET_WAL_SENDERS_COLS, WalSnd::pid, WalSnd::replyTime, ReturnSetInfo::returnMode, WalSnd::sentPtr, sentPtr, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SFRM_Materialize, WalSnd::spillBytes, WalSnd::spillCount, WalSnd::spillTxns, SpinLockAcquire, SpinLockRelease, WalSnd::state, SYNC_REP_PRIORITY, WalSnd::sync_standby_priority, SyncRepConfigData::syncrep_method, SyncRepConfig, SyncRepGetSyncStandbys(), TimestampTzGetDatum, tuplestore_begin_heap(), tuplestore_donestoring, tuplestore_putvalues(), TYPEFUNC_COMPOSITE, values, WalSndGetStateString(), WalSndCtlData::walsnds, work_mem, write, WalSnd::write, WalSnd::writeLag, and XLogRecPtrIsInvalid.

3192 {
3193 #define PG_STAT_GET_WAL_SENDERS_COLS 15
3194  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3195  TupleDesc tupdesc;
3196  Tuplestorestate *tupstore;
3197  MemoryContext per_query_ctx;
3198  MemoryContext oldcontext;
3199  List *sync_standbys;
3200  int i;
3201 
3202  /* check to see if caller supports us returning a tuplestore */
3203  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
3204  ereport(ERROR,
3205  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3206  errmsg("set-valued function called in context that cannot accept a set")));
3207  if (!(rsinfo->allowedModes & SFRM_Materialize))
3208  ereport(ERROR,
3209  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3210  errmsg("materialize mode required, but it is not allowed in this context")));
3211 
3212  /* Build a tuple descriptor for our result type */
3213  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
3214  elog(ERROR, "return type must be a row type");
3215 
3216  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
3217  oldcontext = MemoryContextSwitchTo(per_query_ctx);
3218 
3219  tupstore = tuplestore_begin_heap(true, false, work_mem);
3220  rsinfo->returnMode = SFRM_Materialize;
3221  rsinfo->setResult = tupstore;
3222  rsinfo->setDesc = tupdesc;
3223 
3224  MemoryContextSwitchTo(oldcontext);
3225 
3226  /*
3227  * Get the currently active synchronous standbys.
3228  */
3229  LWLockAcquire(SyncRepLock, LW_SHARED);
3230  sync_standbys = SyncRepGetSyncStandbys(NULL);
3231  LWLockRelease(SyncRepLock);
3232 
3233  for (i = 0; i < max_wal_senders; i++)
3234  {
3235  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3237  XLogRecPtr write;
3238  XLogRecPtr flush;
3239  XLogRecPtr apply;
3240  TimeOffset writeLag;
3241  TimeOffset flushLag;
3242  TimeOffset applyLag;
3243  int priority;
3244  int pid;
3246  TimestampTz replyTime;
3247  int64 spillTxns;
3248  int64 spillCount;
3249  int64 spillBytes;
3251  bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
3252 
3253  SpinLockAcquire(&walsnd->mutex);
3254  if (walsnd->pid == 0)
3255  {
3256  SpinLockRelease(&walsnd->mutex);
3257  continue;
3258  }
3259  pid = walsnd->pid;
3260  sentPtr = walsnd->sentPtr;
3261  state = walsnd->state;
3262  write = walsnd->write;
3263  flush = walsnd->flush;
3264  apply = walsnd->apply;
3265  writeLag = walsnd->writeLag;
3266  flushLag = walsnd->flushLag;
3267  applyLag = walsnd->applyLag;
3268  priority = walsnd->sync_standby_priority;
3269  replyTime = walsnd->replyTime;
3270  spillTxns = walsnd->spillTxns;
3271  spillCount = walsnd->spillCount;
3272  spillBytes = walsnd->spillBytes;
3273  SpinLockRelease(&walsnd->mutex);
3274 
3275  memset(nulls, 0, sizeof(nulls));
3276  values[0] = Int32GetDatum(pid);
3277 
3278  if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS))
3279  {
3280  /*
3281  * Only superusers and members of pg_read_all_stats can see
3282  * details. Other users only get the pid value to know it's a
3283  * walsender, but no details.
3284  */
3285  MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3286  }
3287  else
3288  {
3289  values[1] = CStringGetTextDatum(WalSndGetStateString(state));
3290 
3291  if (XLogRecPtrIsInvalid(sentPtr))
3292  nulls[2] = true;
3293  values[2] = LSNGetDatum(sentPtr);
3294 
3295  if (XLogRecPtrIsInvalid(write))
3296  nulls[3] = true;
3297  values[3] = LSNGetDatum(write);
3298 
3299  if (XLogRecPtrIsInvalid(flush))
3300  nulls[4] = true;
3301  values[4] = LSNGetDatum(flush);
3302 
3303  if (XLogRecPtrIsInvalid(apply))
3304  nulls[5] = true;
3305  values[5] = LSNGetDatum(apply);
3306 
3307  /*
3308  * Treat a standby such as a pg_basebackup background process
3309  * which always returns an invalid flush location, as an
3310  * asynchronous standby.
3311  */
3312  priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
3313 
3314  if (writeLag < 0)
3315  nulls[6] = true;
3316  else
3317  values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
3318 
3319  if (flushLag < 0)
3320  nulls[7] = true;
3321  else
3322  values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
3323 
3324  if (applyLag < 0)
3325  nulls[8] = true;
3326  else
3327  values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
3328 
3329  values[9] = Int32GetDatum(priority);
3330 
3331  /*
3332  * More easily understood version of standby state. This is purely
3333  * informational.
3334  *
3335  * In quorum-based sync replication, the role of each standby
3336  * listed in synchronous_standby_names can be changing very
3337  * frequently. Any standbys considered as "sync" at one moment can
3338  * be switched to "potential" ones at the next moment. So, it's
3339  * basically useless to report "sync" or "potential" as their sync
3340  * states. We report just "quorum" for them.
3341  */
3342  if (priority == 0)
3343  values[10] = CStringGetTextDatum("async");
3344  else if (list_member_int(sync_standbys, i))
3346  CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3347  else
3348  values[10] = CStringGetTextDatum("potential");
3349 
3350  if (replyTime == 0)
3351  nulls[11] = true;
3352  else
3353  values[11] = TimestampTzGetDatum(replyTime);
3354 
3355  /* spill to disk */
3356  values[12] = Int64GetDatum(spillTxns);
3357  values[13] = Int64GetDatum(spillCount);
3358  values[14] = Int64GetDatum(spillBytes);
3359  }
3360 
3361  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
3362  }
3363 
3364  /* clean up and return the tuplestore */
3365  tuplestore_donestoring(tupstore);
3366 
3367  return (Datum) 0;
3368 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
XLogRecPtr write
#define IsA(nodeptr, _type_)
Definition: nodes.h:576
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:196
#define SYNC_REP_PRIORITY
Definition: syncrep.h:36
Oid GetUserId(void)
Definition: miscinit.c:380
#define write(a, b, c)
Definition: win32.h:14
int64 TimestampTz
Definition: timestamp.h:39
uint8 syncrep_method
Definition: syncrep.h:51
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
TimeOffset flushLag
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
int errcode(int sqlerrcode)
Definition: elog.c:608
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
#define MemSet(start, val, len)
Definition: c.h:962
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
List * SyncRepGetSyncStandbys(bool *am_sync)
Definition: syncrep.c:700
TimeOffset writeLag
SyncRepConfigData * SyncRepConfig
Definition: syncrep.c:97
slock_t mutex
int64 spillCount
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
int64 spillBytes
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
#define ERROR
Definition: elog.h:43
#define IntervalPGetDatum(X)
Definition: timestamp.h:33
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
int64 spillTxns
bool list_member_int(const List *list, int datum)
Definition: list.c:655
WalSndState state
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1699
#define ereport(elevel, rest)
Definition: elog.h:141
int max_wal_senders
Definition: walsender.c:121
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
int64 TimeOffset
Definition: timestamp.h:40
uintptr_t Datum
Definition: postgres.h:367
static const char * WalSndGetStateString(WalSndState state)
Definition: walsender.c:3156
XLogRecPtr sentPtr
int work_mem
Definition: globals.c:121
static XLogRecPtr sentPtr
Definition: walsender.c:150
int allowedModes
Definition: execnodes.h:302
bool is_member_of_role(Oid member, Oid role)
Definition: acl.c:4924
TimeOffset applyLag
SetFunctionReturnMode returnMode
Definition: execnodes.h:304
uint64 XLogRecPtr
Definition: xlogdefs.h:21
int sync_standby_priority
Definition: regguts.h:298
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:230
WalSndState
Tuplestorestate * setResult
Definition: execnodes.h:307
static Datum values[MAXATTR]
Definition: bootstrap.c:167
#define PG_STAT_GET_WAL_SENDERS_COLS
ExprContext * econtext
Definition: execnodes.h:300
#define Int32GetDatum(X)
Definition: postgres.h:479
TupleDesc setDesc
Definition: execnodes.h:308
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define elog(elevel,...)
Definition: elog.h:228
int i
#define CStringGetTextDatum(s)
Definition: builtins.h:83
TimestampTz replyTime
static Interval * offset_to_interval(TimeOffset offset)
Definition: walsender.c:3175
XLogRecPtr apply
Definition: pg_list.h:50

◆ PhysicalConfirmReceivedLocation()

static void PhysicalConfirmReceivedLocation ( XLogRecPtr  lsn)
static

Definition at line 1766 of file walsender.c.

References Assert, ReplicationSlot::data, InvalidXLogRecPtr, ReplicationSlot::mutex, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, SpinLockAcquire, and SpinLockRelease.

Referenced by ProcessStandbyReplyMessage().

1767 {
1768  bool changed = false;
1770 
1771  Assert(lsn != InvalidXLogRecPtr);
1772  SpinLockAcquire(&slot->mutex);
1773  if (slot->data.restart_lsn != lsn)
1774  {
1775  changed = true;
1776  slot->data.restart_lsn = lsn;
1777  }
1778  SpinLockRelease(&slot->mutex);
1779 
1780  if (changed)
1781  {
1784  }
1785 
1786  /*
1787  * One could argue that the slot should be saved to disk now, but that'd
1788  * be energy wasted - the worst lost information can do here is give us
1789  * wrong information in a statistics view - we'll just potentially be more
1790  * conservative in removing files.
1791  */
1792 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
ReplicationSlotPersistentData data
Definition: slot.h:132
#define SpinLockAcquire(lock)
Definition: spin.h:62
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:748
#define SpinLockRelease(lock)
Definition: spin.h:64
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:739
XLogRecPtr restart_lsn
Definition: slot.h:72
slock_t mutex
Definition: slot.h:105
void ReplicationSlotMarkDirty(void)
Definition: slot.c:663

◆ PhysicalReplicationSlotNewXmin()

static void PhysicalReplicationSlotNewXmin ( TransactionId  feedbackXmin,
TransactionId  feedbackCatalogXmin 
)
static

Definition at line 1903 of file walsender.c.

References ReplicationSlotPersistentData::catalog_xmin, ReplicationSlot::data, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, InvalidTransactionId, ReplicationSlot::mutex, MyPgXact, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredXmin(), SpinLockAcquire, SpinLockRelease, TransactionIdIsNormal, TransactionIdPrecedes(), ReplicationSlotPersistentData::xmin, and PGXACT::xmin.

Referenced by ProcessStandbyHSFeedbackMessage().

1904 {
1905  bool changed = false;
1907 
1908  SpinLockAcquire(&slot->mutex);
1910 
1911  /*
1912  * For physical replication we don't need the interlock provided by xmin
1913  * and effective_xmin since the consequences of a missed increase are
1914  * limited to query cancellations, so set both at once.
1915  */
1916  if (!TransactionIdIsNormal(slot->data.xmin) ||
1917  !TransactionIdIsNormal(feedbackXmin) ||
1918  TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
1919  {
1920  changed = true;
1921  slot->data.xmin = feedbackXmin;
1922  slot->effective_xmin = feedbackXmin;
1923  }
1924  if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
1925  !TransactionIdIsNormal(feedbackCatalogXmin) ||
1926  TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
1927  {
1928  changed = true;
1929  slot->data.catalog_xmin = feedbackCatalogXmin;
1930  slot->effective_catalog_xmin = feedbackCatalogXmin;
1931  }
1932  SpinLockRelease(&slot->mutex);
1933 
1934  if (changed)
1935  {
1938  }
1939 }
TransactionId xmin
Definition: proc.h:228
ReplicationSlotPersistentData data
Definition: slot.h:132
PGXACT * MyPgXact
Definition: proc.c:68
#define SpinLockAcquire(lock)
Definition: spin.h:62
TransactionId effective_xmin
Definition: slot.h:128
TransactionId catalog_xmin
Definition: slot.h:69
#define InvalidTransactionId
Definition: transam.h:31
TransactionId xmin
Definition: slot.h:61
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
#define SpinLockRelease(lock)
Definition: spin.h:64
TransactionId effective_catalog_xmin
Definition: slot.h:129
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
slock_t mutex
Definition: slot.h:105
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:702
void ReplicationSlotMarkDirty(void)
Definition: slot.c:663

◆ ProcessRepliesIfAny()

static void ProcessRepliesIfAny ( void  )
static

Definition at line 1631 of file walsender.c.

References COMMERROR, ereport, errcode(), errmsg(), FATAL, GetCurrentTimestamp(), last_processing, last_reply_timestamp, pq_endmsgread(), pq_getbyte_if_available(), pq_getmessage(), pq_putmessage_noblock, pq_startmsgread(), proc_exit(), ProcessStandbyMessage(), resetStringInfo(), streamingDoneReceiving, streamingDoneSending, and waiting_for_ping_response.

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

1632 {
1633  unsigned char firstchar;
1634  int r;
1635  bool received = false;
1636 
1638 
1639  for (;;)
1640  {
1641  pq_startmsgread();
1642  r = pq_getbyte_if_available(&firstchar);
1643  if (r < 0)
1644  {
1645  /* unexpected error or EOF */
1647  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1648  errmsg("unexpected EOF on standby connection")));
1649  proc_exit(0);
1650  }
1651  if (r == 0)
1652  {
1653  /* no data available without blocking */
1654  pq_endmsgread();
1655  break;
1656  }
1657 
1658  /* Read the message contents */
1660  if (pq_getmessage(&reply_message, 0))
1661  {
1663  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1664  errmsg("unexpected EOF on standby connection")));
1665  proc_exit(0);
1666  }
1667 
1668  /*
1669  * If we already received a CopyDone from the frontend, the frontend
1670  * should not send us anything until we've closed our end of the COPY.
1671  * XXX: In theory, the frontend could already send the next command
1672  * before receiving the CopyDone, but libpq doesn't currently allow
1673  * that.
1674  */
1675  if (streamingDoneReceiving && firstchar != 'X')
1676  ereport(FATAL,
1677  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1678  errmsg("unexpected standby message type \"%c\", after receiving CopyDone",
1679  firstchar)));
1680 
1681  /* Handle the very limited subset of commands expected in this phase */
1682  switch (firstchar)
1683  {
1684  /*
1685  * 'd' means a standby reply wrapped in a CopyData packet.
1686  */
1687  case 'd':
1689  received = true;
1690  break;
1691 
1692  /*
1693  * CopyDone means the standby requested to finish streaming.
1694  * Reply with CopyDone, if we had not sent that already.
1695  */
1696  case 'c':
1697  if (!streamingDoneSending)
1698  {
1699  pq_putmessage_noblock('c', NULL, 0);
1700  streamingDoneSending = true;
1701  }
1702 
1703  streamingDoneReceiving = true;
1704  received = true;
1705  break;
1706 
1707  /*
1708  * 'X' means that the standby is closing down the socket.
1709  */
1710  case 'X':
1711  proc_exit(0);
1712 
1713  default:
1714  ereport(FATAL,
1715  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1716  errmsg("invalid standby message type \"%c\"",
1717  firstchar)));
1718  }
1719  }
1720 
1721  /*
1722  * Save the last reply timestamp if we've received at least one reply.
1723  */
1724  if (received)
1725  {
1727  waiting_for_ping_response = false;
1728  }
1729 }
static void ProcessStandbyMessage(void)
Definition: walsender.c:1735
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:608
static TimestampTz last_processing
Definition: walsender.c:158
void pq_startmsgread(void)
Definition: pqcomm.c:1211
#define FATAL
Definition: elog.h:52
int pq_getbyte_if_available(unsigned char *c)
Definition: pqcomm.c:1041
static bool streamingDoneSending
Definition: walsender.c:175
#define COMMERROR
Definition: elog.h:30
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
#define ereport(elevel, rest)
Definition: elog.h:141
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1273
static StringInfoData reply_message
Definition: walsender.c:154
void pq_endmsgread(void)
Definition: pqcomm.c:1235
static bool streamingDoneReceiving
Definition: walsender.c:176
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
static bool waiting_for_ping_response
Definition: walsender.c:167
static TimestampTz last_reply_timestamp
Definition: walsender.c:164

◆ ProcessStandbyHSFeedbackMessage()

static void ProcessStandbyHSFeedbackMessage ( void  )
static

Definition at line 1983 of file walsender.c.

References DEBUG2, elog, InvalidTransactionId, log_min_messages, WalSnd::mutex, MyPgXact, MyReplicationSlot, MyWalSnd, pfree(), PhysicalReplicationSlotNewXmin(), pq_getmsgint(), pq_getmsgint64(), pstrdup(), WalSnd::replyTime, SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), TransactionIdInRecentPast(), TransactionIdIsNormal, TransactionIdPrecedes(), and PGXACT::xmin.

Referenced by ProcessStandbyMessage().

1984 {
1985  TransactionId feedbackXmin;
1986  uint32 feedbackEpoch;
1987  TransactionId feedbackCatalogXmin;
1988  uint32 feedbackCatalogEpoch;
1989  TimestampTz replyTime;
1990 
1991  /*
1992  * Decipher the reply message. The caller already consumed the msgtype
1993  * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
1994  * of this message.
1995  */
1996  replyTime = pq_getmsgint64(&reply_message);
1997  feedbackXmin = pq_getmsgint(&reply_message, 4);
1998  feedbackEpoch = pq_getmsgint(&reply_message, 4);
1999  feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
2000  feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
2001 
2002  if (log_min_messages <= DEBUG2)
2003  {
2004  char *replyTimeStr;
2005 
2006  /* Copy because timestamptz_to_str returns a static buffer */
2007  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2008 
2009  elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
2010  feedbackXmin,
2011  feedbackEpoch,
2012  feedbackCatalogXmin,
2013  feedbackCatalogEpoch,
2014  replyTimeStr);
2015 
2016  pfree(replyTimeStr);
2017  }
2018 
2019  /*
2020  * Update shared state for this WalSender process based on reply data from
2021  * standby.
2022  */
2023  {
2024  WalSnd *walsnd = MyWalSnd;
2025 
2026  SpinLockAcquire(&walsnd->mutex);
2027  walsnd->replyTime = replyTime;
2028  SpinLockRelease(&walsnd->mutex);
2029  }
2030 
2031  /*
2032  * Unset WalSender's xmins if the feedback message values are invalid.
2033  * This happens when the downstream turned hot_standby_feedback off.
2034  */
2035  if (!TransactionIdIsNormal(feedbackXmin)
2036  && !TransactionIdIsNormal(feedbackCatalogXmin))
2037  {
2039  if (MyReplicationSlot != NULL)
2040  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2041  return;
2042  }
2043 
2044  /*
2045  * Check that the provided xmin/epoch are sane, that is, not in the future
2046  * and not so far back as to be already wrapped around. Ignore if not.
2047  */
2048  if (TransactionIdIsNormal(feedbackXmin) &&
2049  !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
2050  return;
2051 
2052  if (TransactionIdIsNormal(feedbackCatalogXmin) &&
2053  !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
2054  return;
2055 
2056  /*
2057  * Set the WalSender's xmin equal to the standby's requested xmin, so that
2058  * the xmin will be taken into account by GetOldestXmin. This will hold
2059  * back the removal of dead rows and thereby prevent the generation of
2060  * cleanup conflicts on the standby server.
2061  *
2062  * There is a small window for a race condition here: although we just
2063  * checked that feedbackXmin precedes nextXid, the nextXid could have
2064  * gotten advanced between our fetching it and applying the xmin below,
2065  * perhaps far enough to make feedbackXmin wrap around. In that case the
2066  * xmin we set here would be "in the future" and have no effect. No point
2067  * in worrying about this since it's too late to save the desired data
2068  * anyway. Assuming that the standby sends us an increasing sequence of
2069  * xmins, this could only happen during the first reply cycle, else our
2070  * own xmin would prevent nextXid from advancing so far.
2071  *
2072  * We don't bother taking the ProcArrayLock here. Setting the xmin field
2073  * is assumed atomic, and there's no real need to prevent a concurrent
2074  * GetOldestXmin. (If we're moving our xmin forward, this is obviously
2075  * safe, and if we're moving it backwards, well, the data is at risk
2076  * already since a VACUUM could have just finished calling GetOldestXmin.)
2077  *
2078  * If we're using a replication slot we reserve the xmin via that,
2079  * otherwise via the walsender's PGXACT entry. We can only track the
2080  * catalog xmin separately when using a slot, so we store the least of the
2081  * two provided when not using a slot.
2082  *
2083  * XXX: It might make sense to generalize the ephemeral slot concept and
2084  * always use the slot mechanism to handle the feedback xmin.
2085  */
2086  if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
2087  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2088  else
2089  {
2090  if (TransactionIdIsNormal(feedbackCatalogXmin)
2091  && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
2092  MyPgXact->xmin = feedbackCatalogXmin;
2093  else
2094  MyPgXact->xmin = feedbackXmin;
2095  }
2096 }
uint32 TransactionId
Definition: c.h:514
TransactionId xmin
Definition: proc.h:228
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1186
slock_t mutex
PGXACT * MyPgXact
Definition: proc.c:68
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition: walsender.c:1952
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pfree(void *pointer)
Definition: mcxt.c:1056
#define DEBUG2
Definition: elog.h:24
#define InvalidTransactionId
Definition: transam.h:31
unsigned int uint32
Definition: c.h:359
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static StringInfoData reply_message
Definition: walsender.c:154
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:112
int log_min_messages
Definition: guc.c:518
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define elog(elevel,...)
Definition: elog.h:228
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition: walsender.c:1903
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
TimestampTz replyTime
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1743

◆ ProcessStandbyMessage()

static void ProcessStandbyMessage ( void  )
static

Definition at line 1735 of file walsender.c.

References COMMERROR, ereport, errcode(), errmsg(), pq_getmsgbyte(), proc_exit(), ProcessStandbyHSFeedbackMessage(), and ProcessStandbyReplyMessage().

Referenced by ProcessRepliesIfAny().

1736 {
1737  char msgtype;
1738 
1739  /*
1740  * Check message type from the first byte.
1741  */
1742  msgtype = pq_getmsgbyte(&reply_message);
1743 
1744  switch (msgtype)
1745  {
1746  case 'r':
1748  break;
1749 
1750  case 'h':
1752  break;
1753 
1754  default:
1756  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1757  errmsg("unexpected message type \"%c\"", msgtype)));
1758  proc_exit(0);
1759  }
1760 }
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:608
#define COMMERROR
Definition: elog.h:30
#define ereport(elevel, rest)
Definition: elog.h:141
static StringInfoData reply_message
Definition: walsender.c:154
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
static void ProcessStandbyReplyMessage(void)
Definition: walsender.c:1798
int errmsg(const char *fmt,...)
Definition: elog.c:822
static void ProcessStandbyHSFeedbackMessage(void)
Definition: walsender.c:1983

◆ ProcessStandbyReplyMessage()

static void ProcessStandbyReplyMessage ( void  )
static

Definition at line 1798 of file walsender.c.

References am_cascading_walsender, WalSnd::apply, WalSnd::applyLag, DEBUG2, elog, WalSnd::flush, WalSnd::flushLag, GetCurrentTimestamp(), InvalidXLogRecPtr, LagTrackerRead(), log_min_messages, LogicalConfirmReceivedLocation(), WalSnd::mutex, MyReplicationSlot, MyWalSnd, now(), pfree(), PhysicalConfirmReceivedLocation(), pq_getmsgbyte(), pq_getmsgint64(), pstrdup(), WalSnd::replyTime, sentPtr, SlotIsLogical, SpinLockAcquire, SpinLockRelease, SYNC_REP_WAIT_APPLY, SYNC_REP_WAIT_FLUSH, SYNC_REP_WAIT_WRITE, SyncRepReleaseWaiters(), timestamptz_to_str(), WalSndKeepalive(), WalSnd::write, and WalSnd::writeLag.

Referenced by ProcessStandbyMessage().

1799 {
1800  XLogRecPtr writePtr,
1801  flushPtr,
1802  applyPtr;
1803  bool replyRequested;
1804  TimeOffset writeLag,
1805  flushLag,
1806  applyLag;
1807  bool clearLagTimes;
1808  TimestampTz now;
1809  TimestampTz replyTime;
1810 
1811  static bool fullyAppliedLastTime = false;
1812 
1813  /* the caller already consumed the msgtype byte */
1814  writePtr = pq_getmsgint64(&reply_message);
1815  flushPtr = pq_getmsgint64(&reply_message);
1816  applyPtr = pq_getmsgint64(&reply_message);
1817  replyTime = pq_getmsgint64(&reply_message);
1818  replyRequested = pq_getmsgbyte(&reply_message);
1819 
1820  if (log_min_messages <= DEBUG2)
1821  {
1822  char *replyTimeStr;
1823 
1824  /* Copy because timestamptz_to_str returns a static buffer */
1825  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
1826 
1827  elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
1828  (uint32) (writePtr >> 32), (uint32) writePtr,
1829  (uint32) (flushPtr >> 32), (uint32) flushPtr,
1830  (uint32) (applyPtr >> 32), (uint32) applyPtr,
1831  replyRequested ? " (reply requested)" : "",
1832  replyTimeStr);
1833 
1834  pfree(replyTimeStr);
1835  }
1836 
1837  /* See if we can compute the round-trip lag for these positions. */
1838  now = GetCurrentTimestamp();
1839  writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
1840  flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
1841  applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
1842 
1843  /*
1844  * If the standby reports that it has fully replayed the WAL in two
1845  * consecutive reply messages, then the second such message must result
1846  * from wal_receiver_status_interval expiring on the standby. This is a
1847  * convenient time to forget the lag times measured when it last
1848  * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
1849  * until more WAL traffic arrives.
1850  */
1851  clearLagTimes = false;
1852  if (applyPtr == sentPtr)
1853  {
1854  if (fullyAppliedLastTime)
1855  clearLagTimes = true;
1856  fullyAppliedLastTime = true;
1857  }
1858  else
1859  fullyAppliedLastTime = false;
1860 
1861  /* Send a reply if the standby requested one. */
1862  if (replyRequested)
1863  WalSndKeepalive(false);
1864 
1865  /*
1866  * Update shared state for this WalSender process based on reply data from
1867  * standby.
1868  */
1869  {
1870  WalSnd *walsnd = MyWalSnd;
1871 
1872  SpinLockAcquire(&walsnd->mutex);
1873  walsnd->write = writePtr;
1874  walsnd->flush = flushPtr;
1875  walsnd->apply = applyPtr;
1876  if (writeLag != -1 || clearLagTimes)
1877  walsnd->writeLag = writeLag;
1878  if (flushLag != -1 || clearLagTimes)
1879  walsnd->flushLag = flushLag;
1880  if (applyLag != -1 || clearLagTimes)
1881  walsnd->applyLag = applyLag;
1882  walsnd->replyTime = replyTime;
1883  SpinLockRelease(&walsnd->mutex);
1884  }
1885 
1888 
1889  /*
1890  * Advance our local xmin horizon when the client confirmed a flush.
1891  */
1892  if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
1893  {
1896  else
1898  }
1899 }
XLogRecPtr write
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1186
TimeOffset flushLag
TimeOffset writeLag
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
void pfree(void *pointer)
Definition: mcxt.c:1056
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3376
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: walsender.c:1766
#define SYNC_REP_WAIT_APPLY
Definition: syncrep.h:26
#define DEBUG2
Definition: elog.h:24
unsigned int uint32
Definition: c.h:359
#define SlotIsLogical(slot)
Definition: slot.h:154
#define SYNC_REP_WAIT_FLUSH
Definition: syncrep.h:25
static StringInfoData reply_message
Definition: walsender.c:154
#define SpinLockRelease(lock)
Definition: spin.h:64
int64 TimeOffset
Definition: timestamp.h:40
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition: walsender.c:3499
WalSnd * MyWalSnd
Definition: walsender.c:112
static XLogRecPtr sentPtr
Definition: walsender.c:150
int log_min_messages
Definition: guc.c:518
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
TimeOffset applyLag
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:1001
#define elog(elevel,...)
Definition: elog.h:228
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
TimestampTz replyTime
XLogRecPtr apply
void SyncRepReleaseWaiters(void)
Definition: syncrep.c:426
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1547
bool am_cascading_walsender
Definition: walsender.c:116
#define SYNC_REP_WAIT_WRITE
Definition: syncrep.h:24
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1743

◆ SendTimeLineHistory()

static void SendTimeLineHistory ( TimeLineHistoryCmd cmd)
static

Definition at line 436 of file walsender.c.

References buf, CloseTransientFile(), PGAlignedBlock::data, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg(), ERROR, fd(), MAXFNAMELEN, MAXPGPATH, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), pq_beginmessage(), pq_endmessage(), pq_sendbytes(), pq_sendint16(), pq_sendint32(), pq_sendstring(), read, TimeLineHistoryCmd::timeline, TLHistoryFileName, TLHistoryFilePath, and WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ.

Referenced by exec_replication_command().

437 {
439  char histfname[MAXFNAMELEN];
440  char path[MAXPGPATH];
441  int fd;
442  off_t histfilelen;
443  off_t bytesleft;
444  Size len;
445 
446  /*
447  * Reply with a result set with one row, and two columns. The first col is
448  * the name of the history file, 2nd is the contents.
449  */
450 
451  TLHistoryFileName(histfname, cmd->timeline);
452  TLHistoryFilePath(path, cmd->timeline);
453 
454  /* Send a RowDescription message */
455  pq_beginmessage(&buf, 'T');
456  pq_sendint16(&buf, 2); /* 2 fields */
457 
458  /* first field */
459  pq_sendstring(&buf, "filename"); /* col name */
460  pq_sendint32(&buf, 0); /* table oid */
461  pq_sendint16(&buf, 0); /* attnum */
462  pq_sendint32(&buf, TEXTOID); /* type oid */
463  pq_sendint16(&buf, -1); /* typlen */
464  pq_sendint32(&buf, 0); /* typmod */
465  pq_sendint16(&buf, 0); /* format code */
466 
467  /* second field */
468  pq_sendstring(&buf, "content"); /* col name */
469  pq_sendint32(&buf, 0); /* table oid */
470  pq_sendint16(&buf, 0); /* attnum */
471  pq_sendint32(&buf, BYTEAOID); /* type oid */
472  pq_sendint16(&buf, -1); /* typlen */
473  pq_sendint32(&buf, 0); /* typmod */
474  pq_sendint16(&buf, 0); /* format code */
475  pq_endmessage(&buf);
476 
477  /* Send a DataRow message */
478  pq_beginmessage(&buf, 'D');
479  pq_sendint16(&buf, 2); /* # of columns */
480  len = strlen(histfname);
481  pq_sendint32(&buf, len); /* col1 len */
482  pq_sendbytes(&buf, histfname, len);
483 
484  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
485  if (fd < 0)
486  ereport(ERROR,
488  errmsg("could not open file \"%s\": %m", path)));
489 
490  /* Determine file length and send it to client */
491  histfilelen = lseek(fd, 0, SEEK_END);
492  if (histfilelen < 0)
493  ereport(ERROR,
495  errmsg("could not seek to end of file \"%s\": %m", path)));
496  if (lseek(fd, 0, SEEK_SET) != 0)
497  ereport(ERROR,
499  errmsg("could not seek to beginning of file \"%s\": %m", path)));
500 
501  pq_sendint32(&buf, histfilelen); /* col2 len */
502 
503  bytesleft = histfilelen;
504  while (bytesleft > 0)
505  {
506  PGAlignedBlock rbuf;
507  int nread;
508 
510  nread = read(fd, rbuf.data, sizeof(rbuf));
512  if (nread < 0)
513  ereport(ERROR,
515  errmsg("could not read file \"%s\": %m",
516  path)));
517  else if (nread == 0)
518  ereport(ERROR,
520  errmsg("could not read file \"%s\": read %d of %zu",
521  path, nread, (Size) bytesleft)));
522 
523  pq_sendbytes(&buf, rbuf.data, nread);
524  bytesleft -= nread;
525  }
526 
527  if (CloseTransientFile(fd) != 0)
528  ereport(ERROR,
530  errmsg("could not close file \"%s\": %m", path)));
531 
532  pq_endmessage(&buf);
533 }
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
int errcode(int sqlerrcode)
Definition: elog.c:608
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1221
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
TimeLineID timeline
Definition: replnodes.h:97
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
char data[BLCKSZ]
Definition: c.h:1091
#define ERROR
Definition: elog.h:43
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2292
#define MAXPGPATH
#define TLHistoryFileName(fname, tli)
static char * buf
Definition: pg_test_fsync.c:67
int errcode_for_file_access(void)
Definition: elog.c:631
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1344
#define ereport(elevel, rest)
Definition: elog.h:141
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:45
int CloseTransientFile(int fd)
Definition: fd.c:2469
#define MAXFNAMELEN
size_t Size
Definition: c.h:467
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1320
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:125
int errmsg(const char *fmt,...)
Definition: elog.c:822
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
#define read(a, b, c)
Definition: win32.h:13
#define TLHistoryFilePath(path, tli)

◆ StartLogicalReplication()

static void StartLogicalReplication ( StartReplicationCmd cmd)
static

Definition at line 1087 of file walsender.c.

References am_cascading_walsender, Assert, buf, CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDecodingContext(), ReplicationSlot::data, DestRemote, EndCommand(), ereport, errmsg(), FreeDecodingContext(), got_STOPPING, LOG, logical_read_xlog_page(), logical_startptr, WalSnd::mutex, MyReplicationSlot, StartReplicationCmd::options, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), proc_exit(), RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), ReplicationSlotPersistentData::restart_lsn, WalSnd::sentPtr, sentPtr, StartReplicationCmd::slotname, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, SyncRepInitConfig(), WalSndLoop(), WalSndPrepareWrite(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WalSndUpdateProgress(), WalSndWriteData(), and XLogSendLogical().

Referenced by exec_replication_command().

1088 {
1090 
1091  /* make sure that our requirements are still fulfilled */
1093 
1095 
1096  ReplicationSlotAcquire(cmd->slotname, true);
1097 
1098  /*
1099  * Force a disconnect, so that the decoding code doesn't need to care
1100  * about an eventual switch from running in recovery, to running in a
1101  * normal environment. Client code is expected to handle reconnects.
1102  */
1104  {
1105  ereport(LOG,
1106  (errmsg("terminating walsender process after promotion")));
1107  got_STOPPING = true;
1108  }
1109 
1110  /*
1111  * Create our decoding context, making it start at the previously ack'ed
1112  * position.
1113  *
1114  * Do this before sending a CopyBothResponse message, so that any errors
1115  * are reported early.
1116  */
1118  CreateDecodingContext(cmd->startpoint, cmd->options, false,
1122 
1123 
1125 
1126  /* Send a CopyBothResponse message, and start streaming */
1127  pq_beginmessage(&buf, 'W');
1128  pq_sendbyte(&buf, 0);
1129  pq_sendint16(&buf, 0);
1130  pq_endmessage(&buf);
1131  pq_flush();
1132 
1133 
1134  /* Start reading WAL from the oldest required WAL. */
1136 
1137  /*
1138  * Report the location after which we'll send out further commits as the
1139  * current sentPtr.
1140  */
1142 
1143  /* Also update the sent position status in shared memory */
1147 
1148  replication_active = true;
1149 
1151 
1152  /* Main loop of walsender */
1154 
1157 
1158  replication_active = false;
1159  if (got_STOPPING)
1160  proc_exit(0);
1162 
1163  /* Get out of COPY mode (CommandComplete). */
1164  EndCommand("COPY 0", DestRemote);
1165 }
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
XLogRecPtr startpoint
Definition: replnodes.h:85
#define pq_flush()
Definition: libpq.h:39
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:330
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:183
void proc_exit(int code)
Definition: ipc.c:104
ReplicationSlotPersistentData data
Definition: slot.h:132
#define LOG
Definition: elog.h:26
bool RecoveryInProgress(void)
Definition: xlog.c:7930
slock_t mutex
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
XLogRecPtr confirmed_flush
Definition: slot.h:80
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
#define SpinLockAcquire(lock)
Definition: spin.h:62
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:166
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:193
static char * buf
Definition: pg_test_fsync.c:67
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1203
static XLogRecPtr logical_startptr
Definition: walsender.c:194
void ReplicationSlotRelease(void)
Definition: slot.c:424
void SyncRepInitConfig(void)
Definition: syncrep.c:398
#define ereport(elevel, rest)
Definition: elog.h:141
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2183
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1176
#define SpinLockRelease(lock)
Definition: spin.h:64
static volatile sig_atomic_t replication_active
Definition: walsender.c:191
XLogRecPtr sentPtr
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:370
WalSnd * MyWalSnd
Definition: walsender.c:112
static XLogRecPtr sentPtr
Definition: walsender.c:150
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:739
void WalSndSetState(WalSndState state)
Definition: walsender.c:3137
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:508
static void XLogSendLogical(void)
Definition: walsender.c:2773
XLogRecPtr restart_lsn
Definition: slot.h:72
int errmsg(const char *fmt,...)
Definition: elog.c:822
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition: walsender.c:767
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:75
bool am_cascading_walsender
Definition: walsender.c:116
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
Definition: walsender.c:1293

◆ StartReplication()

static void StartReplication ( StartReplicationCmd cmd)
static

Definition at line 542 of file walsender.c.

References am_cascading_walsender, Assert, begin_tup_output_tupdesc(), buf, CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errcode(), errdetail(), errmsg(), ERROR, GetFlushRecPtr(), GetStandbyFlushRecPtr(), got_STOPPING, Int64GetDatum(), InvalidXLogRecPtr, list_free_deep(), MemSet, WalSnd::mutex, MyReplicationSlot, pq_beginmessage(), pq_endmessage(), pq_flush, pq_puttextmessage(), pq_sendbyte(), pq_sendint16(), proc_exit(), readTimeLineHistory(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, WalSnd::sentPtr, sentPtr, SlotIsLogical, StartReplicationCmd::slotname, snprintf, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), ThisTimeLineID, StartReplicationCmd::timeline, tliSwitchPoint(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), values, WalSndLoop(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, XLogRecPtrIsInvalid, and XLogSendPhysical().

Referenced by exec_replication_command().

543 {
545  XLogRecPtr FlushPtr;
546 
547  if (ThisTimeLineID == 0)
548  ereport(ERROR,
549  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
550  errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
551 
552  /*
553  * We assume here that we're logging enough information in the WAL for
554  * log-shipping, since this is checked in PostmasterMain().
555  *
556  * NOTE: wal_level can only change at shutdown, so in most cases it is
557  * difficult for there to be WAL data that we can still see that was
558  * written at wal_level='minimal'.
559  */
560 
561  if (cmd->slotname)
562  {
563  ReplicationSlotAcquire(cmd->slotname, true);
564  if (SlotIsLogical(MyReplicationSlot))
565  ereport(ERROR,
566  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
567  (errmsg("cannot use a logical replication slot for physical replication"))));
568  }
569 
570  /*
571  * Select the timeline. If it was given explicitly by the client, use
572  * that. Otherwise use the timeline of the last replayed record, which is
573  * kept in ThisTimeLineID.
574  */
575  if (am_cascading_walsender)
576  {
577  /* this also updates ThisTimeLineID */
578  FlushPtr = GetStandbyFlushRecPtr();
579  }
580  else
581  FlushPtr = GetFlushRecPtr();
582 
583  if (cmd->timeline != 0)
584  {
585  XLogRecPtr switchpoint;
586 
587  sendTimeLine = cmd->timeline;
588  if (sendTimeLine == ThisTimeLineID)
589  {
590  sendTimeLineIsHistoric = false;
592  }
593  else
594  {
595  List *timeLineHistory;
596 
597  sendTimeLineIsHistoric = true;
598 
599  /*
600  * Check that the timeline the client requested exists, and the
601  * requested start location is on that timeline.
602  */
603  timeLineHistory = readTimeLineHistory(ThisTimeLineID);
604  switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
605  &sendTimeLineNextTLI);
606  list_free_deep(timeLineHistory);
607 
608  /*
609  * Found the requested timeline in the history. Check that
610  * requested startpoint is on that timeline in our history.
611  *
612  * This is quite loose on purpose. We only check that we didn't
613  * fork off the requested timeline before the switchpoint. We
614  * don't check that we switched *to* it before the requested
615  * starting point. This is because the client can legitimately
616  * request to start replication from the beginning of the WAL
617  * segment that contains switchpoint, but on the new timeline, so
618  * that it doesn't end up with a partial segment. If you ask for
619  * too old a starting point, you'll get an error later when we
620  * fail to find the requested WAL segment in pg_wal.
621  *
622  * XXX: we could be more strict here and only allow a startpoint
623  * that's older than the switchpoint, if it's still in the same
624  * WAL segment.
625  */
626  if (!XLogRecPtrIsInvalid(switchpoint) &&
627  switchpoint < cmd->startpoint)
628  {
629  ereport(ERROR,
630  (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
631  (uint32) (cmd->startpoint >> 32),
632  (uint32) (cmd->startpoint),
633  cmd->timeline),
634  errdetail("This server's history forked from timeline %u at %X/%X.",
635  cmd->timeline,
636  (uint32) (switchpoint >> 32),
637  (uint32) (switchpoint))));
638  }
639  sendTimeLineValidUpto = switchpoint;
640  }
641  }
642  else
643  {
646  sendTimeLineIsHistoric = false;
647  }
648 
650 
651  /* If there is nothing to stream, don't even enter COPY mode */
652  if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
653  {
654  /*
655  * When we first start replication the standby will be behind the
656  * primary. For some applications, for example synchronous
657  * replication, it is important to have a clear state for this initial
658  * catchup mode, so we can trigger actions when we change streaming
659  * state later. We may stay in this state for a long time, which is
660  * exactly why we want to be able to monitor whether or not we are
661  * still here.
662  */
664 
665  /* Send a CopyBothResponse message, and start streaming */
666  pq_beginmessage(&buf, 'W');
667  pq_sendbyte(&buf, 0);
668  pq_sendint16(&buf, 0);
669  pq_endmessage(&buf);
670  pq_flush();
671 
672  /*
673  * Don't allow a request to stream from a future point in WAL that
674  * hasn't been flushed to disk in this server yet.
675  */
676  if (FlushPtr < cmd->startpoint)
677  {
678  ereport(ERROR,
679  (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
680  (uint32) (cmd->startpoint >> 32),
681  (uint32) (cmd->startpoint),
682  (uint32) (FlushPtr >> 32),
683  (uint32) (FlushPtr))));
684  }
685 
686  /* Start streaming from the requested point */
687  sentPtr = cmd->startpoint;
688 
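689  /* Initialize shared memory status, too */
690  SpinLockAcquire(&MyWalSnd->mutex);
691  MyWalSnd->sentPtr = sentPtr;
692  SpinLockRelease(&MyWalSnd->mutex);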
693 
695 
696  /* Main loop of walsender */
697  replication_active = true;
698 
700 
701  replication_active = false;
702  if (got_STOPPING)
703  proc_exit(0);
705 
707  }
708 
709  if (cmd->slotname)
710  ReplicationSlotRelease();
711 
712  /*
713  * Copy is finished now. Send a single-row result set indicating the next
714  * timeline.
715  */
716  if (sendTimeLineIsHistoric)
717  {
718  char startpos_str[8 + 1 + 8 + 1];
720  TupOutputState *tstate;
721  TupleDesc tupdesc;
722  Datum values[2];
723  bool nulls[2];
724 
725  snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
726  (uint32) (sendTimeLineValidUpto >> 32),
727  (uint32) sendTimeLineValidUpto);
728 
730  MemSet(nulls, false, sizeof(nulls));
731 
732  /*
733  * Need a tuple descriptor representing two columns. int8 may seem
734  * like a surprising data type for this, but in theory int4 would not
735  * be wide enough for this, as TimeLineID is unsigned.
736  */
737  tupdesc = CreateTemplateTupleDesc(2);
738  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
739  INT8OID, -1, 0);
740  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
741  TEXTOID, -1, 0);
742 
743  /* prepare for projection of tuple */
744  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
745 
746  values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
747  values[1] = CStringGetTextDatum(startpos_str);
748 
749  /* send it to dest */
750  do_tup_output(tstate, values, nulls);
751 
752  end_tup_output(tstate);
753  }
754 
755  /* Send CommandComplete message */
756  pq_puttextmessage('C', "START_STREAMING");
757 }
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr startpoint
Definition: replnodes.h:85
#define pq_flush()
Definition: libpq.h:39
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:44
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:330
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:75
static void XLogSendPhysical(void)
Definition: walsender.c:2468
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2256
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:183
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:608
#define MemSet(start, val, len)
Definition: c.h:962
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8262
void list_free_deep(List *list)
Definition: list.c:1391
slock_t mutex
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
#define SpinLockAcquire(lock)
Definition: spin.h:62
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2314
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:676
#define ERROR
Definition: elog.h:43
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2236
static char * buf
Definition: pg_test_fsync.c:67
static bool streamingDoneSending
Definition: walsender.c:175
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
int errdetail(const char *fmt,...)
Definition: elog.c:955
unsigned int uint32
Definition: c.h:359
void ReplicationSlotRelease(void)
Definition: slot.c:424
#define SlotIsLogical(slot)
Definition: slot.h:154
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1699
void SyncRepInitConfig(void)
Definition: syncrep.c:398
#define ereport(elevel, rest)
Definition: elog.h:141
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2183
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
static volatile sig_atomic_t replication_active
Definition: walsender.c:191
uintptr_t Datum
Definition: postgres.h:367
XLogRecPtr sentPtr
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:563
WalSnd * MyWalSnd
Definition: walsender.c:112
static XLogRecPtr sentPtr
Definition: walsender.c:150
TimeLineID ThisTimeLineID
Definition: xlog.c:187
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
static TimeLineID sendTimeLine
Definition: walsender.c:141
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:739
void WalSndSetState(WalSndState state)
Definition: walsender.c:3137
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:142
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:144
static bool streamingDoneReceiving
Definition: walsender.c:176
static Datum values[MAXATTR]
Definition: bootstrap.c:167
int errmsg(const char *fmt,...)
Definition: elog.c:822
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
TimeLineID timeline
Definition: replnodes.h:84
#define CStringGetTextDatum(s)
Definition: builtins.h:83
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2894
void pq_puttextmessage(char msgtype, const char *str)
Definition: pqformat.c:369
Definition: pg_list.h:50
#define snprintf
Definition: port.h:192
int16 AttrNumber
Definition: attnum.h:21
static bool sendTimeLineIsHistoric
Definition: walsender.c:143
bool am_cascading_walsender
Definition: walsender.c:116

◆ TransactionIdInRecentPast()

static bool TransactionIdInRecentPast ( TransactionId  xid,
uint32  epoch 
)
static

Definition at line 1952 of file walsender.c.

References EpochFromFullTransactionId, ReadNextFullTransactionId(), TransactionIdPrecedesOrEquals(), and XidFromFullTransactionId.

Referenced by ProcessStandbyHSFeedbackMessage().

1953 {
1954  FullTransactionId nextFullXid;
1955  TransactionId nextXid;
1956  uint32 nextEpoch;
1957 
1958  nextFullXid = ReadNextFullTransactionId();
1959  nextXid = XidFromFullTransactionId(nextFullXid);
1960  nextEpoch = EpochFromFullTransactionId(nextFullXid);
1961 
1962  if (xid <= nextXid)
1963  {
1964  if (epoch != nextEpoch)
1965  return false;
1966  }
1967  else
1968  {
1969  if (epoch + 1 != nextEpoch)
1970  return false;
1971  }
1972 
1973  if (!TransactionIdPrecedesOrEquals(xid, nextXid))
1974  return false; /* epoch OK, but it's wrapped around */
1975 
1976  return true;
1977 }
uint32 TransactionId
Definition: c.h:514
#define XidFromFullTransactionId(x)
Definition: transam.h:48
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:319
unsigned int uint32
Definition: c.h:359
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:246
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
static const unsigned __int64 epoch
Definition: gettimeofday.c:34

◆ UpdateSpillStats()

static void UpdateSpillStats ( LogicalDecodingContext ctx)
static

Definition at line 3597 of file walsender.c.

References DEBUG2, elog, WalSnd::mutex, LogicalDecodingContext::reorder, WalSnd::spillBytes, ReorderBuffer::spillBytes, WalSnd::spillCount, ReorderBuffer::spillCount, WalSnd::spillTxns, ReorderBuffer::spillTxns, SpinLockAcquire, and SpinLockRelease.

Referenced by WalSndUpdateProgress().

3598 {
3599  ReorderBuffer *rb = ctx->reorder;
3600 
3602 
3603  MyWalSnd->spillTxns = rb->spillTxns;
3606 
3607  elog(DEBUG2, "UpdateSpillStats: updating stats %p %lld %lld %lld",
3608  rb,
3609  (long long) rb->spillTxns,
3610  (long long) rb->spillCount,
3611  (long long) rb->spillBytes);
3612 
3614 }
struct ReorderBuffer * reorder
Definition: logical.h:42
slock_t mutex
int64 spillCount
int64 spillBytes
#define SpinLockAcquire(lock)
Definition: spin.h:62
int64 spillTxns
#define DEBUG2
Definition: elog.h:24
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:112
#define elog(elevel,...)
Definition: elog.h:228

◆ WalSndCheckTimeOut()

static void WalSndCheckTimeOut ( void  )
static

Definition at line 2156 of file walsender.c.

References COMMERROR, ereport, errmsg(), last_processing, last_reply_timestamp, TimestampTzPlusMilliseconds, wal_sender_timeout, and WalSndShutdown().

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

2157 {
2158  TimestampTz timeout;
2159 
2160  /* don't bail out if we're doing something that doesn't require timeouts */
2161  if (last_reply_timestamp <= 0)
2162  return;
2163 
2164  timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
2165  wal_sender_timeout);
2166 
2167  if (wal_sender_timeout > 0 && last_processing >= timeout)
2168  {
2169  /*
2170  * Since typically expiration of replication timeout means
2171  * communication problem, we don't send the error message to the
2172  * standby.
2173  */
2174  ereport(COMMERROR,
2175  (errmsg("terminating walsender process due to replication timeout")));
2176 
2177  WalSndShutdown();
2178  }
2179 }
int wal_sender_timeout
Definition: walsender.c:123
int64 TimestampTz
Definition: timestamp.h:39
static TimestampTz last_processing
Definition: walsender.c:158
#define COMMERROR
Definition: elog.h:30
#define ereport(elevel, rest)
Definition: elog.h:141
static void WalSndShutdown(void)
Definition: walsender.c:226
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
int errmsg(const char *fmt,...)
Definition: elog.c:822
static TimestampTz last_reply_timestamp
Definition: walsender.c:164

◆ WalSndComputeSleeptime()

static long WalSndComputeSleeptime ( TimestampTz  now)
static

Definition at line 2106 of file walsender.c.

References last_reply_timestamp, sleeptime, TimestampDifference(), TimestampTzPlusMilliseconds, waiting_for_ping_response, and wal_sender_timeout.

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

2107 {
2108  long sleeptime = 10000; /* 10 s */
2109 
2110  if (wal_sender_timeout > 0 && last_reply_timestamp > 0)
2111  {
2112  TimestampTz wakeup_time;
2113  long sec_to_timeout;
2114  int microsec_to_timeout;
2115 
2116  /*
2117  * At the latest stop sleeping once wal_sender_timeout has been
2118  * reached.
2119  */
2120  wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
2121  wal_sender_timeout);
2122 
2123  /*
2124  * If no ping has been sent yet, wakeup when it's time to do so.
2125  * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2126  * the timeout passed without a response.
2127  */
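2128  if (!waiting_for_ping_response)
2129  wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
2130  wal_sender_timeout / 2);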
2131 
2132  /* Compute relative time until wakeup. */
2133  TimestampDifference(now, wakeup_time,
2134  &sec_to_timeout, &microsec_to_timeout);
2135 
2136  sleeptime = sec_to_timeout * 1000 +
2137  microsec_to_timeout / 1000;
2138  }
2139 
2140  return sleeptime;
2141 }
int wal_sender_timeout
Definition: walsender.c:123
int64 TimestampTz
Definition: timestamp.h:39
int sleeptime
Definition: pg_standby.c:41
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: timestamp.c:1657
static bool waiting_for_ping_response
Definition: walsender.c:167
static TimestampTz last_reply_timestamp
Definition: walsender.c:164
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1547

◆ WalSndDone()

static void WalSndDone ( WalSndSendDataCallback  send_data)
static

Definition at line 2854 of file walsender.c.

References DestRemote, EndCommand(), WalSnd::flush, pq_flush, pq_is_send_pending, proc_exit(), sentPtr, waiting_for_ping_response, WalSndCaughtUp, WalSndKeepalive(), WalSnd::write, and XLogRecPtrIsInvalid.

Referenced by WalSndLoop().

2855 {
2856  XLogRecPtr replicatedPtr;
2857 
2858  /* ... let's just be real sure we're caught up ... */
2859  send_data();
2860 
2861  /*
2862  * To figure out whether all WAL has successfully been replicated, check
2863  * flush location if valid, write otherwise. Tools like pg_receivewal will
2864  * usually (unless in synchronous mode) return an invalid flush location.
2865  */
2866  replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
2867  MyWalSnd->write : MyWalSnd->flush;
2868 
2869  if (WalSndCaughtUp && sentPtr == replicatedPtr &&
2870  !pq_is_send_pending())
2871  {
2872  /* Inform the standby that XLOG streaming is done */
2873  EndCommand("COPY 0", DestRemote);
2874  pq_flush();
2875 
2876  proc_exit(0);
2877  }
2879  {
2880  WalSndKeepalive(true);
2882  }
2883 }
#define pq_is_send_pending()
Definition: libpq.h:41
XLogRecPtr write
#define pq_flush()
Definition: libpq.h:39
void proc_exit(int code)
Definition: ipc.c:104
XLogRecPtr flush
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3376
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:166
static bool WalSndCaughtUp
Definition: walsender.c:179
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
WalSnd * MyWalSnd
Definition: walsender.c:112
static XLogRecPtr sentPtr
Definition: walsender.c:150
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool waiting_for_ping_response
Definition: walsender.c:167

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 300 of file walsender.c.

References close, ConditionVariableCancelSleep(), DestNone, DestRemote, got_SIGUSR2, got_STOPPING, LWLockReleaseAll(), MyReplicationSlot, pgstat_report_wait_end(), proc_exit(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), WalSndSetState(), WalSndShutdown(), WALSNDSTATE_STARTUP, whereToSendOutput, and WALOpenSegment::ws_file.

Referenced by PostgresMain().

301 {
305 
306  if (sendSeg->ws_file >= 0)
307  {
309  sendSeg->ws_file = -1;
310  }
311 
312  if (MyReplicationSlot != NULL)
314 
316 
317  replication_active = false;
318 
319  if (got_STOPPING || got_SIGUSR2)
320  proc_exit(0);
321 
322  /* Revert back to startup state */
324 }
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:183
void proc_exit(int code)
Definition: ipc.c:104
void ConditionVariableCancelSleep(void)
void ReplicationSlotRelease(void)
Definition: slot.c:424
static WALOpenSegment * sendSeg
Definition: walsender.c:132
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1344
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:182
static volatile sig_atomic_t replication_active
Definition: walsender.c:191
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
void WalSndSetState(WalSndState state)
Definition: walsender.c:3137
void ReplicationSlotCleanup(void)
Definition: slot.c:479
void LWLockReleaseAll(void)
Definition: lwlock.c:1825
#define close(a)
Definition: win32.h:12

◆ WalSndGetStateString()

static const char* WalSndGetStateString ( WalSndState  state)
static

Definition at line 3156 of file walsender.c.

References WALSNDSTATE_BACKUP, WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WALSNDSTATE_STOPPING, and WALSNDSTATE_STREAMING.

Referenced by pg_stat_get_wal_senders().

3157 {
3158  switch (state)
3159  {
3160  case WALSNDSTATE_STARTUP:
3161  return "startup";
3162  case WALSNDSTATE_BACKUP:
3163  return "backup";
3164  case WALSNDSTATE_CATCHUP:
3165  return "catchup";
3166  case WALSNDSTATE_STREAMING:
3167  return "streaming";
3168  case WALSNDSTATE_STOPPING:
3169  return "stopping";
3170  }
3171  return "UNKNOWN";
3172 }
Definition: regguts.h:298

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 3073 of file walsender.c.

References i, InvalidBackendId, max_wal_senders, WalSnd::mutex, WalSnd::pid, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

3074 {
3075  int i;
3076 
3077  for (i = 0; i < max_wal_senders; i++)
3078  {
3079  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3080  pid_t pid;
3081 
3082  SpinLockAcquire(&walsnd->mutex);
3083  pid = walsnd->pid;
3084  SpinLockRelease(&walsnd->mutex);
3085 
3086  if (pid == 0)
3087  continue;
3088 
3089  SendProcSignal(pid, PROCSIG_WALSND_INIT_STOPPING, InvalidBackendId);
3090 
3091 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:250
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
#define InvalidBackendId
Definition: backendid.h:23
int i

◆ WalSndKeepalive()

static void WalSndKeepalive ( bool  requestReply)
static

Definition at line 3376 of file walsender.c.

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), StringInfoData::len, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), resetStringInfo(), and sentPtr.

Referenced by ProcessStandbyReplyMessage(), WalSndDone(), WalSndKeepaliveIfNecessary(), and WalSndWaitForWal().

3377 {
3378  elog(DEBUG2, "sending replication keepalive");
3379 
3380  /* construct the message... */
3382  pq_sendbyte(&output_message, 'k');
3385  pq_sendbyte(&output_message, requestReply ? 1 : 0);
3386 
3387  /* ... and send it wrapped in CopyData */
3389 }
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
static StringInfoData output_message
Definition: walsender.c:153
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
#define DEBUG2
Definition: elog.h:24
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
static XLogRecPtr sentPtr
Definition: walsender.c:150
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
#define elog(elevel,...)
Definition: elog.h:228

◆ WalSndKeepaliveIfNecessary()

static void WalSndKeepaliveIfNecessary ( void  )
static

Definition at line 3395 of file walsender.c.

References last_processing, last_reply_timestamp, pq_flush_if_writable, TimestampTzPlusMilliseconds, waiting_for_ping_response, wal_sender_timeout, WalSndKeepalive(), and WalSndShutdown().

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

3396 {
3397  TimestampTz ping_time;
3398 
3399  /*
3400  * Don't send keepalive messages if timeouts are globally disabled or
3401  * we're doing something not partaking in timeouts.
3402  */
3403  if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3404  return;
3405 
3406  if (waiting_for_ping_response)
3407  return;
3408 
3409  /*
3410  * If half of wal_sender_timeout has lapsed without receiving any reply
3411  * from the standby, send a keep-alive message to the standby requesting
3412  * an immediate reply.
3413  */
3414  ping_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
3415  wal_sender_timeout / 2);
3416  if (last_processing >= ping_time)
3417  {
3418  WalSndKeepalive(true);
3420 
3421  /* Try to flush pending output to the client */
3422  if (pq_flush_if_writable() != 0)
3423  WalSndShutdown();
3424  }
3425 }
int wal_sender_timeout
Definition: walsender.c:123
int64 TimestampTz
Definition: timestamp.h:39
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3376
static TimestampTz last_processing
Definition: walsender.c:158
#define pq_flush_if_writable()
Definition: libpq.h:40
static void WalSndShutdown(void)
Definition: walsender.c:226
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
static bool waiting_for_ping_response
Definition: walsender.c:167
static TimestampTz last_reply_timestamp
Definition: walsender.c:164

◆ WalSndKill()

static void WalSndKill ( int  code,
Datum  arg 
)
static

Definition at line 2370 of file walsender.c.

References Assert, WalSnd::latch, WalSnd::mutex, MyWalSnd, WalSnd::pid, SpinLockAcquire, and SpinLockRelease.

Referenced by InitWalSenderSlot().

2371 {
2372  WalSnd *walsnd = MyWalSnd;
2373 
2374  Assert(walsnd != NULL);
2375 
2376  MyWalSnd = NULL;
2377 
2378  SpinLockAcquire(&walsnd->mutex);
2379  /* clear latch while holding the spinlock, so it can safely be read */
2380  walsnd->latch = NULL;
2381  /* Mark WalSnd struct as no longer being in use. */
2382  walsnd->pid = 0;
2383  SpinLockRelease(&walsnd->mutex);
2384 }
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch * latch
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:112
#define Assert(condition)
Definition: c.h:739

◆ WalSndLastCycleHandler()

static void WalSndLastCycleHandler ( SIGNAL_ARGS  )
static

Definition at line 2969 of file walsender.c.

References got_SIGUSR2, MyLatch, and SetLatch().

Referenced by WalSndSignals().

2970 {
2971  int save_errno = errno;
2972 
2973  got_SIGUSR2 = true;
2974  SetLatch(MyLatch);
2975 
2976  errno = save_errno;
2977 }
void SetLatch(Latch *latch)
Definition: latch.c:436
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:182
struct Latch * MyLatch
Definition: globals.c:54

◆ WalSndLoop()

static void WalSndLoop ( WalSndSendDataCallback  send_data)
static

Definition at line 2183 of file walsender.c.

References application_name, CHECK_FOR_INTERRUPTS, ConfigReloadPending, DEBUG1, ereport, errmsg(), GetCurrentTimestamp(), got_SIGUSR2, last_reply_timestamp, MyLatch, MyProcPort, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), sleeptime, Port::sock, WalSnd::state, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_MAIN, waiting_for_ping_response, WaitLatchOrSocket(), WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndDone(), WalSndKeepaliveIfNecessary(), WalSndSetState(), WalSndShutdown(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STREAMING, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, and WL_TIMEOUT.

Referenced by StartLogicalReplication(), and StartReplication().

2184 {
2185  /*
2186  * Initialize the last reply timestamp. That enables timeout processing
2187  * from hereon.
2188  */
2190  waiting_for_ping_response = false;
2191 
2192  /*
2193  * Loop until we reach the end of this timeline or the client requests to
2194  * stop streaming.
2195  */
2196  for (;;)
2197  {
2198  /* Clear any already-pending wakeups */
2200 
2202 
2203  /* Process any requests or signals received recently */
2204  if (ConfigReloadPending)
2205  {
2206  ConfigReloadPending = false;
2209  }
2210 
2211  /* Check for input from the client */
2213 
2214  /*
2215  * If we have received CopyDone from the client, sent CopyDone
2216  * ourselves, and the output buffer is empty, it's time to exit
2217  * streaming.
2218  */
2219  if (streamingDoneReceiving && streamingDoneSending &&
2220  !pq_is_send_pending())
2221  break;
2222 
2223  /*
2224  * If we don't have any pending data in the output buffer, try to send
2225  * some more. If there is some, we don't bother to call send_data
2226  * again until we've flushed it ... but we'd better assume we are not
2227  * caught up.
2228  */
2229  if (!pq_is_send_pending())
2230  send_data();
2231  else
2232  WalSndCaughtUp = false;
2233 
2234  /* Try to flush pending output to the client */
2235  if (pq_flush_if_writable() != 0)
2236  WalSndShutdown();
2237 
2238  /* If nothing remains to be sent right now ... */
2239  if (WalSndCaughtUp && !pq_is_send_pending())
2240  {
2241  /*
2242  * If we're in catchup state, move to streaming. This is an
2243  * important state change for users to know about, since before
2244  * this point data loss might occur if the primary dies and we
2245  * need to failover to the standby. The state change is also
2246  * important for synchronous replication, since commits that
2247  * started to wait at that point might wait for some time.
2248  */
2249  if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
2250  {
2251  ereport(DEBUG1,
2252  (errmsg("\"%s\" has now caught up with upstream server",
2253  application_name)));
2255  }
2256 
2257  /*
2258  * When SIGUSR2 arrives, we send any outstanding logs up to the
2259  * shutdown checkpoint record (i.e., the latest record), wait for
2260  * them to be replicated to the standby, and exit. This may be a
2261  * normal termination at shutdown, or a promotion, the walsender
2262  * is not sure which.
2263  */
2264  if (got_SIGUSR2)
2265  WalSndDone(send_data);
2266  }
2267 
2268  /* Check for replication timeout. */
2270 
2271  /* Send keepalive if the time has come */
2273 
2274  /*
2275  * We don't block if not caught up, unless there is unsent data
2276  * pending in which case we'd better block until the socket is
2277  * write-ready. This test is only needed for the case where the
2278  * send_data callback handled a subset of the available data but then
2279  * pq_flush_if_writable flushed it all --- we should immediately try
2280  * to send more.
2281  */
2282  if ((WalSndCaughtUp && !streamingDoneSending) || pq_is_send_pending())
2283  {
2284  long sleeptime;
2285  int wakeEvents;
2286 
2287  wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT |
2288  WL_SOCKET_READABLE;
2289 
2290  /*
2291  * Use fresh timestamp, not last_processing, to reduce the chance
2292  * of reaching wal_sender_timeout before sending a keepalive.
2293  */
2295 
2296  if (pq_is_send_pending())
2297  wakeEvents |= WL_SOCKET_WRITEABLE;
2298 
2299  /* Sleep until something happens or we time out */
2300  (void) WaitLatchOrSocket(MyLatch, wakeEvents,
2301  MyProcPort->sock, sleeptime,
2302  WAIT_EVENT_WAL_SENDER_MAIN);
2303  }
2304  }
2305 }
#define pq_is_send_pending()
Definition: libpq.h:41
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
#define DEBUG1
Definition: elog.h:25
struct Port * MyProcPort
Definition: globals.c:43
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
static void WalSndDone(WalSndSendDataCallback send_data)
Definition: walsender.c:2854
#define WL_SOCKET_READABLE
Definition: latch.h:125
int sleeptime
Definition: pg_standby.c:41
pgsocket sock
Definition: libpq-be.h:122
void ResetLatch(Latch *latch)
Definition: latch.c:519
WalSndState state
static bool streamingDoneSending
Definition: walsender.c:175
#define pq_flush_if_writable()
Definition: libpq.h:40
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:369
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:182
void SyncRepInitConfig(void)
Definition: syncrep.c:398
#define ereport(elevel, rest)
Definition: elog.h:141
static bool WalSndCaughtUp
Definition: walsender.c:179
Definition: guc.h:72
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3395
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2156
static void WalSndShutdown(void)
Definition: walsender.c:226
WalSnd * MyWalSnd
Definition: walsender.c:112
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2106
void WalSndSetState(WalSndState state)
Definition: walsender.c:3137
static bool streamingDoneReceiving
Definition: walsender.c:176
char * application_name
Definition: guc.c:541
int errmsg(const char *fmt,...)
Definition: elog.c:822
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:26
struct Latch * MyLatch
Definition: globals.c:54
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
static bool waiting_for_ping_response
Definition: walsender.c:167
#define WL_LATCH_SET
Definition: latch.h:124
static TimestampTz last_reply_timestamp
Definition: walsender.c:164
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1631
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129

◆ WalSndPrepareWrite()

static void WalSndPrepareWrite ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1176 of file walsender.c.

References InvalidXLogRecPtr, LogicalDecodingContext::out, pq_sendbyte(), pq_sendint64(), and resetStringInfo().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

1177 {
1178  /* can't have sync rep confused by sending the same LSN several times */
1179  if (!last_write)
1180  lsn = InvalidXLogRecPtr;
1181 
1182  resetStringInfo(ctx->out);
1183 
1184  pq_sendbyte(ctx->out, 'w');
1185  pq_sendint64(ctx->out, lsn); /* dataStart */
1186  pq_sendint64(ctx->out, lsn); /* walEnd */
1187 
1188  /*
1189  * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1190  * reserve space here.
1191  */
1192  pq_sendint64(ctx->out, 0); /* sendtime */
1193 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
StringInfo out
Definition: logical.h:70

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 2924 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

2925 {
2926  int i;
2927 
2928  for (i = 0; i < max_wal_senders; i++)
2929  {
2930  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2931 
2932  SpinLockAcquire(&walsnd->mutex);
2933  if (walsnd->pid == 0)
2934  {
2935  SpinLockRelease(&walsnd->mutex);
2936  continue;
2937  }
2938  walsnd->needreload = true;
2939  SpinLockRelease(&walsnd->mutex);
2940  }
2941 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
bool needreload
int i

◆ WalSndSegmentOpen()

static int WalSndSegmentOpen ( XLogSegNo  nextSegNo,
WALSegmentContext segcxt,
TimeLineID tli_p 
)
static

Definition at line 2388 of file walsender.c.

References BasicOpenFile(), ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), MAXFNAMELEN, MAXPGPATH, PG_BINARY, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, wal_segment_size, WALOpenSegment::ws_segno, WALSegmentContext::ws_segsize, XLByteToSeg, XLogFileName, and XLogFilePath.

Referenced by logical_read_xlog_page(), and XLogSendPhysical().

2390 {
2391  char path[MAXPGPATH];
2392  int fd;
2393 
2394  /*-------
2395  * When reading from a historic timeline, and there is a timeline switch
2396  * within this segment, read from the WAL segment belonging to the new
2397  * timeline.
2398  *
2399  * For example, imagine that this server is currently on timeline 5, and
2400  * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
2401  * 0/13002088. In pg_wal, we have these files:
2402  *
2403  * ...
2404  * 000000040000000000000012
2405  * 000000040000000000000013
2406  * 000000050000000000000013
2407  * 000000050000000000000014
2408  * ...
2409  *
2410  * In this situation, when requested to send the WAL from segment 0x13, on
2411  * timeline 4, we read the WAL from file 000000050000000000000013. Archive
2412  * recovery prefers files from newer timelines, so if the segment was
2413  * restored from the archive on this server, the file belonging to the old
2414  * timeline, 000000040000000000000013, might not exist. Their contents are
2415  * equal up to the switchpoint, because at a timeline switch, the used
2416  * portion of the old segment is copied to the new file. -------
2417  */
2418  *tli_p = sendTimeLine;
2419  if (sendTimeLineIsHistoric)
2420  {
2421  XLogSegNo endSegNo;
2422 
2423  XLByteToSeg(sendTimeLineValidUpto, endSegNo, segcxt->ws_segsize);
2424  if (sendSeg->ws_segno == endSegNo)
2425  *tli_p = sendTimeLineNextTLI;
2426  }
2427 
2428  XLogFilePath(path, *tli_p, nextSegNo, segcxt->ws_segsize);
2429  fd = BasicOpenFile(path, O_RDONLY | PG_BINARY);
2430  if (fd >= 0)
2431  return fd;
2432 
2433  /*
2434  * If the file is not found, assume it's because the standby asked for a
2435  * too old WAL segment that has already been removed or recycled.
2436  */
2437  if (errno == ENOENT)
2438  {
2439  char xlogfname[MAXFNAMELEN];
2440  int save_errno = errno;
2441 
2442  XLogFileName(xlogfname, *tli_p, nextSegNo, wal_segment_size);
2443  errno = save_errno;
2444  ereport(ERROR,
2445  (errcode_for_file_access(),
2446  errmsg("requested WAL segment %s has already been removed",
2447  xlogfname)));
2448  }
2449  else
2450  ereport(ERROR,
2451  (errcode_for_file_access(),
2452  errmsg("could not open file \"%s\": %m",
2453  path)));
2454  return -1; /* keep compiler quiet */
2455 }
int wal_segment_size
Definition: xlog.c:112
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1221
#define ERROR
Definition: elog.h:43
#define MAXPGPATH
uint64 XLogSegNo
Definition: xlogdefs.h:41
XLogSegNo ws_segno
Definition: xlogreader.h:38
int errcode_for_file_access(void)
Definition: elog.c:631
static WALOpenSegment * sendSeg
Definition: walsender.c:132
#define ereport(elevel, rest)
Definition: elog.h:141
#define MAXFNAMELEN
static TimeLineID sendTimeLine
Definition: walsender.c:141
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes)
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:142
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:144
int BasicOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:981
#define XLogFilePath(path, tli, logSegNo, wal_segsz_bytes)
int errmsg(const char *fmt,...)
Definition: elog.c:822
static bool sendTimeLineIsHistoric
Definition: walsender.c:143
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

◆ WalSndSetState()

void WalSndSetState ( WalSndState  state)

Definition at line 3137 of file walsender.c.

References am_walsender, Assert, WalSnd::mutex, MyWalSnd, SpinLockAcquire, SpinLockRelease, and WalSnd::state.

Referenced by exec_replication_command(), SendBaseBackup(), StartLogicalReplication(), StartReplication(), WalSndErrorCleanup(), WalSndLoop(), and XLogSendPhysical().

3138 {
3139  WalSnd *walsnd = MyWalSnd;
3140 
3141  Assert(am_walsender);
3142 
3143  if (walsnd->state == state)
3144  return;
3145 
3146  SpinLockAcquire(&walsnd->mutex);
3147  walsnd->state = state;
3148  SpinLockRelease(&walsnd->mutex);
3149 }
slock_t mutex
bool am_walsender
Definition: walsender.c:115
#define SpinLockAcquire(lock)
Definition: spin.h:62
WalSndState state
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:112
#define Assert(condition)
Definition: c.h:739
Definition: regguts.h:298

◆ WalSndShmemInit()

void WalSndShmemInit ( void  )

Definition at line 3012 of file walsender.c.

References i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SHMQueueInit(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

3013 {
3014  bool found;
3015  int i;
3016 
3017  WalSndCtl = (WalSndCtlData *)
3018  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3019 
3020  if (!found)
3021  {
3022  /* First time through, so initialize */
3023  MemSet(WalSndCtl, 0, WalSndShmemSize());
3024 
3025  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3026  SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
3027 
3028  for (i = 0; i < max_wal_senders; i++)
3029  {
3030  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3031 
3032  SpinLockInit(&walsnd->mutex);
3033  }
3034  }
3035 }
Size WalSndShmemSize(void)
Definition: walsender.c:3000
#define SpinLockInit(lock)
Definition: spin.h:60
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
#define MemSet(start, val, len)
Definition: c.h:962
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:392
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
int max_wal_senders
Definition: walsender.c:121
void SHMQueueInit(SHM_QUEUE *queue)
Definition: shmqueue.c:36
int i
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28

◆ WalSndShmemSize()

Size WalSndShmemSize ( void  )

Definition at line 3000 of file walsender.c.

References add_size(), max_wal_senders, mul_size(), and offsetof.

Referenced by CreateSharedMemoryAndSemaphores(), and WalSndShmemInit().

3001 {
3002  Size size = 0;
3003 
3004  size = offsetof(WalSndCtlData, walsnds);
3005  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3006 
3007  return size;
3008 }
int max_wal_senders
Definition: walsender.c:121
Size mul_size(Size s1, Size s2)
Definition: shmem.c:515
Size add_size(Size s1, Size s2)
Definition: shmem.c:498
size_t Size
Definition: c.h:467
#define offsetof(type, field)
Definition: c.h:662

◆ WalSndShutdown()

static void WalSndShutdown ( void  )
static

Definition at line 226 of file walsender.c.

Referenced by WalSndCheckTimeOut(), WalSndErrorCleanup(), WalSndKeepaliveIfNecessary(), WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

260 {
262 
263  /* Create a per-walsender data structure in shared memory */
265 
266  /*
267  * We don't currently need any ResourceOwner in a walsender process, but
268  * if we did, we could call CreateAuxProcessResourceOwner here.
269  */
270 
271  /*
272  * Let postmaster know that we're a WAL sender. Once we've declared us as
273  * a WAL sender process, postmaster will let us outlive the bgwriter and
274  * kill us last in the shutdown sequence, so we get a chance to stream all
275  * remaining WAL at shutdown, including the shutdown checkpoint. Note that
276  * there's no going back, and we mustn't write any WAL records after this.
277  */
280 
281  /* Initialize empty timestamp buffer for lag tracking. */
283 
284  /* Make sure we can remember the current read position in XLOG. */
285  sendSeg = (WALOpenSegment *)
290 }
static void InitWalSenderSlot(void)
Definition: walsender.c:2309
int wal_segment_size
Definition: xlog.c:112
static WALSegmentContext * sendCxt
Definition: walsender.c:133
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:272
void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt, int segsize, const char *waldir)
Definition: xlogreader.c:209
bool RecoveryInProgress(void)
Definition: xlog.c:7930
static LagTracker * lag_tracker
Definition: walsender.c:216
static WALOpenSegment * sendSeg
Definition: walsender.c:132
MemoryContext TopMemoryContext
Definition: mcxt.c:44
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:839
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:146
bool am_cascading_walsender
Definition: walsender.c:116

◆ WalSndSignals()

void WalSndSignals ( void  )

Definition at line 2981 of file walsender.c.

References die, InitializeTimeouts(), pqsignal(), procsignal_sigusr1_handler(), quickdie(), SIG_DFL, SIG_IGN, SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGQUIT, SIGUSR1, SIGUSR2, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

2982 {
2983  /* Set up signal handlers */
2984  pqsignal(SIGHUP, SignalHandlerForConfigReload);
2985  pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
2986  pqsignal(SIGTERM, die); /* request shutdown */
2987  pqsignal(SIGQUIT, quickdie); /* hard crash time */
2988  InitializeTimeouts(); /* establishes SIGALRM handler */
2989  pqsignal(SIGPIPE, SIG_IGN);
2990  pqsignal(SIGUSR1, procsignal_sigusr1_handler);
2991  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
2992  * shutdown */
2993 
2994  /* Reset some signals that are accepted by postmaster but not here */
2995  pqsignal(SIGCHLD, SIG_DFL);
2996 }
void InitializeTimeouts(void)
Definition: timeout.c:346
#define SIGQUIT
Definition: win32_port.h:155
#define SIGUSR1
Definition: win32_port.h:166
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:2969
#define SIGCHLD
Definition: win32_port.h:164
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:56
#define SIGPIPE
Definition: win32_port.h:159
#define SIGUSR2
Definition: win32_port.h:167
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:2833
#define SIGHUP
Definition: win32_port.h:154
#define SIG_IGN
Definition: win32_port.h:151
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
#define SIG_DFL
Definition: win32_port.h:149
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:533
void quickdie(SIGNAL_ARGS)
Definition: postgres.c:2735
#define die(msg)
Definition: pg_test_fsync.c:96

◆ WalSndUpdateProgress()

static void WalSndUpdateProgress ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid 
)
static

Definition at line 1293 of file walsender.c.

References GetCurrentTimestamp(), LagTrackerWrite(), now(), TimestampDifferenceExceeds(), UpdateSpillStats(), and WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS.

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

1294 {
1295  static TimestampTz sendTime = 0;
1296  TimestampTz now = GetCurrentTimestamp();
1297 
1298  /*
1299  * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1300  * avoid flooding the lag tracker when we commit frequently.
1301  */
1302 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1303  if (!TimestampDifferenceExceeds(sendTime, now,
1304  WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
1305  return;
1306 
1307  LagTrackerWrite(lsn, now);
1308  sendTime = now;
1309 
1310  /*
1311  * Update statistics about transactions that spilled to disk.
1312  */
1313  UpdateSpillStats(ctx);
1314 }
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
int64 TimestampTz
Definition: timestamp.h:39
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1682
static void UpdateSpillStats(LogicalDecodingContext *ctx)
Definition: walsender.c:3597
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition: walsender.c:3434
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1547

◆ WalSndWaitForWal()

static XLogRecPtr WalSndWaitForWal ( XLogRecPtr  loc)
static

Definition at line 1324 of file walsender.c.

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, WalSnd::flush, GetCurrentTimestamp(), GetFlushRecPtr(), GetXLogReplayRecPtr(), got_STOPPING, InvalidXLogRecPtr, MyLatch, MyProcPort, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), RecoveryInProgress(), ResetLatch(), sentPtr, SetLatch(), sleeptime, Port::sock, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_WAIT_WAL, waiting_for_ping_response, WaitLatchOrSocket(), WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepalive(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, WL_TIMEOUT, WalSnd::write, and XLogBackgroundFlush().

Referenced by logical_read_xlog_page().

1325 {
1326  int wakeEvents;
1327  static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1328 
1329  /*
1330  * Fast path to avoid acquiring the spinlock in case we already know we
1331  * have enough WAL available. This is particularly interesting if we're
1332  * far behind.
1333  */
1334  if (RecentFlushPtr != InvalidXLogRecPtr &&
1335  loc <= RecentFlushPtr)
1336  return RecentFlushPtr;
1337 
1338  /* Get a more recent flush pointer. */
1339  if (!RecoveryInProgress())
1340  RecentFlushPtr = GetFlushRecPtr();
1341  else
1342  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1343 
1344  for (;;)
1345  {
1346  long sleeptime;
1347 
1348  /* Clear any already-pending wakeups */
1349  ResetLatch(MyLatch);
1350 
1351  CHECK_FOR_INTERRUPTS();
1352 
1353  /* Process any requests or signals received recently */
1354  if (ConfigReloadPending)
1355  {
1356  ConfigReloadPending = false;
1357  ProcessConfigFile(PGC_SIGHUP);
1358  SyncRepInitConfig();
1359  }
1360 
1361  /* Check for input from the client */
1362  ProcessRepliesIfAny();
1363 
1364  /*
1365  * If we're shutting down, trigger pending WAL to be written out,
1366  * otherwise we'd possibly end up waiting for WAL that never gets
1367  * written, because walwriter has shut down already.
1368  */
1369  if (got_STOPPING)
1370  XLogBackgroundFlush();
1371 
1372  /* Update our idea of the currently flushed position. */
1373  if (!RecoveryInProgress())
1374  RecentFlushPtr = GetFlushRecPtr();
1375  else
1376  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1377 
1378  /*
1379  * If postmaster asked us to stop, don't wait anymore.
1380  *
1381  * It's important to do this check after the recomputation of
1382  * RecentFlushPtr, so we can send all remaining data before shutting
1383  * down.
1384  */
1385  if (got_STOPPING)
1386  break;
1387 
1388  /*
1389  * We only send regular messages to the client for full decoded
1390  * transactions, but a synchronous replication and walsender shutdown
1391  * possibly are waiting for a later location. So we send pings
1392  * containing the flush location every now and then.
1393  */
1394  if (MyWalSnd->flush < sentPtr &&
1395  MyWalSnd->write < sentPtr &&
1396  !waiting_for_ping_response)
1397  {
1398  WalSndKeepalive(false);
1399  waiting_for_ping_response = true;
1400  }
1401 
1402  /* check whether we're done */
1403  if (loc <= RecentFlushPtr)
1404  break;
1405 
1406  /* Waiting for new WAL. Since we need to wait, we're now caught up. */
1407  WalSndCaughtUp = true;
1408 
1409  /*
1410  * Try to flush any pending output to the client.
1411  */
1412  if (pq_flush_if_writable() != 0)
1413  WalSndShutdown();
1414 
1415  /*
1416  * If we have received CopyDone from the client, sent CopyDone
1417  * ourselves, and the output buffer is empty, it's time to exit
1418  * streaming, so fail the current WAL fetch request.
1419  */
1420  if (streamingDoneReceiving && streamingDoneSending &&
1421  !pq_is_send_pending())
1422  break;
1423 
1424  /* die if timeout was reached */
1425  WalSndCheckTimeOut();
1426 
1427  /* Send keepalive if the time has come */
1428  WalSndKeepaliveIfNecessary();
1429 
1430  /*
1431  * Sleep until something happens or we time out. Also wait for the
1432  * socket becoming writable, if there's still pending output.
1433  * Otherwise we might sit on sendable output data while waiting for
1434  * new WAL to be generated. (But if we have nothing to send, we don't
1435  * want to wake on socket-writable.)
1436  */
1437  sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
1438 
1439  wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH |
1440  WL_SOCKET_READABLE | WL_TIMEOUT;
1441 
1442  if (pq_is_send_pending())
1443  wakeEvents |= WL_SOCKET_WRITEABLE;
1444 
1445  (void) WaitLatchOrSocket(MyLatch, wakeEvents,
1446  MyProcPort->sock, sleeptime,
1447  WAIT_EVENT_WAL_SENDER_WAIT_WAL);
1448  }
1449 
1450  /* reactivate latch so WalSndLoop knows to continue */
1451  SetLatch(MyLatch);
1452  return RecentFlushPtr;
1453 }
#define pq_is_send_pending()
Definition: libpq.h:41
XLogRecPtr write
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
struct Port * MyProcPort
Definition: globals.c:43
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:183
#define WL_SOCKET_READABLE
Definition: latch.h:125
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8262
int sleeptime
Definition: pg_standby.c:41
bool RecoveryInProgress(void)
Definition: xlog.c:7930
void SetLatch(Latch *latch)
Definition: latch.c:436
pgsocket sock
Definition: libpq-be.h:122
void ResetLatch(Latch *latch)
Definition: latch.c:519
XLogRecPtr flush
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3376
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11193
bool XLogBackgroundFlush(void)
Definition: xlog.c:2993
static bool streamingDoneSending
Definition: walsender.c:175
#define pq_flush_if_writable()
Definition: libpq.h:40
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:369
void SyncRepInitConfig(void)
Definition: syncrep.c:398
static bool WalSndCaughtUp
Definition: walsender.c:179
Definition: guc.h:72
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3395
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2156
static void WalSndShutdown(void)
Definition: walsender.c:226
WalSnd * MyWalSnd
Definition: walsender.c:112
static XLogRecPtr sentPtr
Definition: walsender.c:150
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2106
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool streamingDoneReceiving
Definition: walsender.c:176
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:26
struct Latch * MyLatch
Definition: globals.c:54
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
static bool waiting_for_ping_response
Definition: walsender.c:167
#define WL_LATCH_SET
Definition: latch.h:124
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1631
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 3099 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, pg_usleep(), WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

3100 {
3101  for (;;)
3102  {
3103  int i;
3104  bool all_stopped = true;
3105 
3106  for (i = 0; i < max_wal_senders; i++)
3107  {
3108  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3109 
3110  SpinLockAcquire(&walsnd->mutex);
3111 
3112  if (walsnd->pid == 0)
3113  {
3114  SpinLockRelease(&walsnd->mutex);
3115  continue;
3116  }
3117 
3118  if (walsnd->state != WALSNDSTATE_STOPPING)
3119  {
3120  all_stopped = false;
3121  SpinLockRelease(&walsnd->mutex);
3122  break;
3123  }
3124  SpinLockRelease(&walsnd->mutex);
3125  }
3126 
3127  /* safe to leave if confirmation is done for all WAL senders */
3128  if (all_stopped)
3129  return;
3130 
3131  pg_usleep(10000L); /* wait for 10 msec */
3132  }
3133 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pg_usleep(long microsec)
Definition: signal.c:53
WalSndState state
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
int i

◆ WalSndWakeup()

void WalSndWakeup ( void  )

Definition at line 3044 of file walsender.c.

References i, WalSnd::latch, max_wal_senders, WalSnd::mutex, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive(), StartupXLOG(), and XLogWalRcvFlush().

3045 {
3046  int i;
3047 
3048  for (i = 0; i < max_wal_senders; i++)
3049  {
3050  Latch *latch;
3051  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3052 
3053  /*
3054  * Get latch pointer with spinlock held, for the unlikely case that
3055  * pointer reads aren't atomic (as they're 8 bytes).
3056  */
3057  SpinLockAcquire(&walsnd->mutex);
3058  latch = walsnd->latch;
3059  SpinLockRelease(&walsnd->mutex);
3060 
3061  if (latch != NULL)
3062  SetLatch(latch);
3063  }
3064 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
void SetLatch(Latch *latch)
Definition: latch.c:436
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch * latch
Definition: latch.h:110
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
int i

◆ WalSndWriteData()

static void WalSndWriteData ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1203 of file walsender.c.

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, StringInfoData::data, GetCurrentTimestamp(), last_reply_timestamp, StringInfoData::len, MyLatch, MyProcPort, now(), LogicalDecodingContext::out, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, pq_putmessage_noblock, pq_sendint64(), ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), resetStringInfo(), SetLatch(), sleeptime, Port::sock, SyncRepInitConfig(), TimestampTzPlusMilliseconds, WAIT_EVENT_WAL_SENDER_WRITE_DATA, WaitLatchOrSocket(), wal_sender_timeout, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, and WL_TIMEOUT.

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

1205 {
1206  TimestampTz now;
1207 
1208  /*
1209  * Fill the send timestamp last, so that it is taken as late as possible.
1210  * This is somewhat ugly, but the protocol is set as it's already used for
1211  * several releases by streaming physical replication.
1212  */
1213  resetStringInfo(&tmpbuf);
1214  now = GetCurrentTimestamp();
1215  pq_sendint64(&tmpbuf, now);
1216  memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1217  tmpbuf.data, sizeof(int64));
1218 
1219  /* output previously gathered data in a CopyData packet */
1220  pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1221 
1222  CHECK_FOR_INTERRUPTS();
1223 
1224  /* Try to flush pending output to the client */
1225  if (pq_flush_if_writable() != 0)
1226  WalSndShutdown();
1227 
1228  /* Try taking fast path unless we get too close to walsender timeout. */
1229  if (now < TimestampTzPlusMilliseconds(last_reply_timestamp,
1230  wal_sender_timeout / 2) &&
1231  !pq_is_send_pending())
1232  {
1233  return;
1234  }
1235 
1236  /* If we have pending write here, go to slow path */
1237  for (;;)
1238  {
1239  int wakeEvents;
1240  long sleeptime;
1241 
1242  /* Check for input from the client */
1243  ProcessRepliesIfAny();
1244 
1245  /* die if timeout was reached */
1246  WalSndCheckTimeOut();
1247 
1248  /* Send keepalive if the time has come */
1249  WalSndKeepaliveIfNecessary();
1250 
1251  if (!pq_is_send_pending())
1252  break;
1253 
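1254  sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
1255 
1256  wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH |
1257  WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT;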
1258 
1259  /* Sleep until something happens or we time out */
1260  (void) WaitLatchOrSocket(MyLatch, wakeEvents,
1261  MyProcPort->sock, sleeptime,
1262  WAIT_EVENT_WAL_SENDER_WRITE_DATA);
1263 
1264  /* Clear any already-pending wakeups */
1265  ResetLatch(MyLatch);
1266 
1267  CHECK_FOR_INTERRUPTS();
1268 
1269  /* Process any requests or signals received recently */
1270  if (ConfigReloadPending)
1271  {
1272  ConfigReloadPending = false;
1273  ProcessConfigFile(PGC_SIGHUP);
1274  SyncRepInitConfig();
1275  }
1276 
1277  /* Try to flush pending output to the client */
1278  if (pq_flush_if_writable() != 0)
1279  WalSndShutdown();
1280  }
1281 
1282  /* reactivate latch so WalSndLoop knows to continue */
1283  SetLatch(MyLatch);
1284 }
#define pq_is_send_pending()
Definition: libpq.h:41
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
int wal_sender_timeout
Definition: walsender.c:123
struct Port * MyProcPort
Definition: globals.c:43
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
int64 TimestampTz
Definition: timestamp.h:39
#define WL_SOCKET_READABLE
Definition: latch.h:125
int sleeptime
Definition: pg_standby.c:41
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
void SetLatch(Latch *latch)
Definition: latch.c:436
pgsocket sock
Definition: libpq-be.h:122
void ResetLatch(Latch *latch)
Definition: latch.c:519
#define pq_flush_if_writable()
Definition: libpq.h:40
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:369
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
void SyncRepInitConfig(void)
Definition: syncrep.c:398
Definition: guc.h:72
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3395
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2156
static void WalSndShutdown(void)
Definition: walsender.c:226
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2106
static StringInfoData tmpbuf
Definition: walsender.c:155
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
StringInfo out
Definition: logical.h:70
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:26
struct Latch * MyLatch
Definition: globals.c:54
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
#define WL_LATCH_SET
Definition: latch.h:124
static TimestampTz last_reply_timestamp
Definition: walsender.c:164
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1631
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1547
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129

◆ XLogSendLogical()

static void XLogSendLogical ( void  )
static

Definition at line 2773 of file walsender.c.

References elog, XLogReaderState::EndRecPtr, ERROR, GetFlushRecPtr(), got_SIGUSR2, got_STOPPING, InvalidXLogRecPtr, logical_startptr, LogicalDecodingProcessRecord(), WalSnd::mutex, MyWalSnd, LogicalDecodingContext::reader, WalSnd::sentPtr, sentPtr, SpinLockAcquire, SpinLockRelease, WalSndCaughtUp, and XLogReadRecord().

Referenced by StartLogicalReplication().

2774 {
2775  XLogRecord *record;
2776  char *errm;
2777 
2778  /*
2779  * We'll use the current flush point to determine whether we've caught up.
2780  * This variable is static in order to cache it across calls. Caching is
2781  * helpful because GetFlushRecPtr() needs to acquire a heavily-contended
2782  * spinlock.
2783  */
2784  static XLogRecPtr flushPtr = InvalidXLogRecPtr;
2785 
2786  /*
2787  * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
2788  * true in WalSndWaitForWal, if we're actually waiting. We also set to
2789  * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
2790  * didn't wait - i.e. when we're shutting down.
2791  */
2792  WalSndCaughtUp = false;
2793 
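2794  record = XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &errm);
2795  logical_startptr = InvalidXLogRecPtr;
2796 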
2797  /* xlog record was invalid */
2798  if (errm != NULL)
2799  elog(ERROR, "%s", errm);
2800 
2801  if (record != NULL)
2802  {
2803  /*
2804  * Note the lack of any call to LagTrackerWrite() which is handled by
2805  * WalSndUpdateProgress which is called by output plugin through
2806  * logical decoding write api.
2807  */
2809 
2811  }
2812 
2813  /*
2814  * If first time through in this session, initialize flushPtr. Otherwise,
2815  * we only need to update flushPtr if EndRecPtr is past it.
2816  */
2817  if (flushPtr == InvalidXLogRecPtr)
2818  flushPtr = GetFlushRecPtr();
2819  else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
2820  flushPtr = GetFlushRecPtr();
2821 
2822  /* If EndRecPtr is still past our flushPtr, it means we caught up. */
2823  if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
2824  WalSndCaughtUp = true;
2825 
2826  /*
2827  * If we're caught up and have been requested to stop, have WalSndLoop()
2828  * terminate the connection in an orderly manner, after writing out all
2829  * the pending data.
2830  */
2831  if (WalSndCaughtUp && got_STOPPING)
2832  got_SIGUSR2 = true;
2833 
2834  /* Update shared memory status */
2835  {
2836  WalSnd *walsnd = MyWalSnd;
2837 
2838  SpinLockAcquire(&walsnd->mutex);
2839  walsnd->sentPtr = sentPtr;
2840  SpinLockRelease(&walsnd->mutex);
2841  }
2842 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:183
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8262
slock_t mutex
XLogRecord * XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
Definition: xlogreader.c:238
XLogRecPtr EndRecPtr
Definition: xlogreader.h:132
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define ERROR
Definition: elog.h:43
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:94
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:193
static XLogRecPtr logical_startptr
Definition: walsender.c:194
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:182
static bool WalSndCaughtUp
Definition: walsender.c:179
#define SpinLockRelease(lock)
Definition: spin.h:64
XLogRecPtr sentPtr
WalSnd * MyWalSnd
Definition: walsender.c:112
static XLogRecPtr sentPtr
Definition: walsender.c:150
uint64 XLogRecPtr
Definition: xlogdefs.h:21
XLogReaderState * reader
Definition: logical.h:41
#define elog(elevel,...)
Definition: elog.h:228

◆ XLogSendPhysical()