PostgreSQL Source Code  git master
walsender.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "access/xlogreader.h"
#include "access/xlogrecovery.h"
#include "access/xlogutils.h"
#include "backup/basebackup.h"
#include "backup/basebackup_incremental.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
#include "commands/defrem.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/slotsync.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "tcop/dest.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
Include dependency graph for walsender.c:

Go to the source code of this file.

Data Structures

struct  WalTimeSample
 
struct  LagTracker
 

Macros

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)
 
#define LAG_TRACKER_BUFFER_SIZE   8192
 
#define READ_REPLICATION_SLOT_COLS   3
 
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000
 
#define PG_STAT_GET_WAL_SENDERS_COLS   12
 

Typedefs

typedef void(* WalSndSendDataCallback) (void)
 

Functions

static void WalSndLastCycleHandler (SIGNAL_ARGS)
 
static void WalSndLoop (WalSndSendDataCallback send_data)
 
static void InitWalSenderSlot (void)
 
static void WalSndKill (int code, Datum arg)
 
static void WalSndShutdown (void)
 
void WalSndErrorCleanup (void)
 
void WalSndResourceCleanup (bool isCommit)
 
static void IdentifySystem (void)
 
static void ReadReplicationSlot (ReadReplicationSlotCmd *cmd)
 
static void SendTimeLineHistory (TimeLineHistoryCmd *cmd)
 
static void UploadManifest (void)
 
static bool HandleUploadManifestPacket (StringInfo buf, off_t *offset, IncrementalBackupInfo *ib)
 
static void StartReplication (StartReplicationCmd *cmd)
 
static int logical_read_xlog_page (XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
 
static void parseCreateReplSlotOptions (CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase, bool *failover)
 
static void CreateReplicationSlot (CreateReplicationSlotCmd *cmd)
 
static void DropReplicationSlot (DropReplicationSlotCmd *cmd)
 
static void AlterReplicationSlot (AlterReplicationSlotCmd *cmd)
 
static void StartLogicalReplication (StartReplicationCmd *cmd)
 
static void WalSndPrepareWrite (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndWriteData (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void ProcessPendingWrites (void)
 
static void WalSndUpdateProgress (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool skipped_xact)
 
void PhysicalWakeupLogicalWalSnd (void)
 
static bool NeedToWaitForStandbys (XLogRecPtr flushed_lsn, uint32 *wait_event)
 
static bool NeedToWaitForWal (XLogRecPtr target_lsn, XLogRecPtr flushed_lsn, uint32 *wait_event)
 
static XLogRecPtr WalSndWaitForWal (XLogRecPtr loc)
 
bool exec_replication_command (const char *cmd_string)
 
static void ProcessRepliesIfAny (void)
 
static void ProcessStandbyMessage (void)
 
static void PhysicalConfirmReceivedLocation (XLogRecPtr lsn)
 
static void ProcessStandbyReplyMessage (void)
 
static void PhysicalReplicationSlotNewXmin (TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
 
static bool TransactionIdInRecentPast (TransactionId xid, uint32 epoch)
 
static void ProcessStandbyHSFeedbackMessage (void)
 
static long WalSndComputeSleeptime (TimestampTz now)
 
static void WalSndCheckTimeOut (void)
 
static void WalSndSegmentOpen (XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
 
static void XLogSendPhysical (void)
 
static void XLogSendLogical (void)
 
static void WalSndDone (WalSndSendDataCallback send_data)
 
XLogRecPtr GetStandbyFlushRecPtr (TimeLineID *tli)
 
void WalSndRqstFileReload (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (bool physical, bool logical)
 
static void WalSndWait (uint32 socket_events, long timeout, uint32 wait_event)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void WalSndSetState (WalSndState state)
 
static const char * WalSndGetStateString (WalSndState state)
 
static Interval * offset_to_interval (TimeOffset offset)
 
Datum pg_stat_get_wal_senders (PG_FUNCTION_ARGS)
 
static void WalSndKeepalive (bool requestReply, XLogRecPtr writePtr)
 
static void WalSndKeepaliveIfNecessary (void)
 
static void LagTrackerWrite (XLogRecPtr lsn, TimestampTz local_flush_time)
 
static TimeOffset LagTrackerRead (int head, XLogRecPtr lsn, TimestampTz now)
 

Variables

WalSndCtlData * WalSndCtl = NULL
 
WalSnd * MyWalSnd = NULL
 
bool am_walsender = false
 
bool am_cascading_walsender = false
 
bool am_db_walsender = false
 
int max_wal_senders = 10
 
int wal_sender_timeout = 60 * 1000
 
bool log_replication_commands = false
 
bool wake_wal_senders = false
 
static XLogReaderState * xlogreader = NULL
 
static IncrementalBackupInfo * uploaded_manifest = NULL
 
static MemoryContext uploaded_manifest_mcxt = NULL
 
static TimeLineID sendTimeLine = 0
 
static TimeLineID sendTimeLineNextTLI = 0
 
static bool sendTimeLineIsHistoric = false
 
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
 
static XLogRecPtr sentPtr = InvalidXLogRecPtr
 
static StringInfoData output_message
 
static StringInfoData reply_message
 
static StringInfoData tmpbuf
 
static TimestampTz last_processing = 0
 
static TimestampTz last_reply_timestamp = 0
 
static bool waiting_for_ping_response = false
 
static bool streamingDoneSending
 
static bool streamingDoneReceiving
 
static bool WalSndCaughtUp = false
 
static volatile sig_atomic_t got_SIGUSR2 = false
 
static volatile sig_atomic_t got_STOPPING = false
 
static volatile sig_atomic_t replication_active = false
 
static LogicalDecodingContext * logical_decoding_ctx = NULL
 
static LagTracker * lag_tracker
 

Macro Definition Documentation

◆ LAG_TRACKER_BUFFER_SIZE

#define LAG_TRACKER_BUFFER_SIZE   8192

Definition at line 218 of file walsender.c.

◆ MAX_SEND_SIZE

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)

Definition at line 106 of file walsender.c.

◆ PG_STAT_GET_WAL_SENDERS_COLS

#define PG_STAT_GET_WAL_SENDERS_COLS   12

◆ READ_REPLICATION_SLOT_COLS

#define READ_REPLICATION_SLOT_COLS   3

◆ WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS

#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000

Typedef Documentation

◆ WalSndSendDataCallback

typedef void(* WalSndSendDataCallback) (void)

Definition at line 236 of file walsender.c.

Function Documentation

◆ AlterReplicationSlot()

static void AlterReplicationSlot ( AlterReplicationSlotCmd *  cmd)
static

Definition at line 1413 of file walsender.c.

1414 {
1415  bool failover_given = false;
1416  bool two_phase_given = false;
1417  bool failover;
1418  bool two_phase;
1419 
1420  /* Parse options */
1421  foreach_ptr(DefElem, defel, cmd->options)
1422  {
1423  if (strcmp(defel->defname, "failover") == 0)
1424  {
1425  if (failover_given)
1426  ereport(ERROR,
1427  (errcode(ERRCODE_SYNTAX_ERROR),
1428  errmsg("conflicting or redundant options")));
1429  failover_given = true;
1430  failover = defGetBoolean(defel);
1431  }
1432  else if (strcmp(defel->defname, "two_phase") == 0)
1433  {
1434  if (two_phase_given)
1435  ereport(ERROR,
1436  (errcode(ERRCODE_SYNTAX_ERROR),
1437  errmsg("conflicting or redundant options")));
1438  two_phase_given = true;
1439  two_phase = defGetBoolean(defel);
1440  }
1441  else
1442  elog(ERROR, "unrecognized option: %s", defel->defname);
1443  }
1444 
1445  ReplicationSlotAlter(cmd->slotname,
1446  failover_given ? &failover : NULL,
1447  two_phase_given ? &two_phase : NULL);
1448 }
bool defGetBoolean(DefElem *def)
Definition: define.c:107
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:225
#define ereport(elevel,...)
Definition: elog.h:149
#define foreach_ptr(type, var, lst)
Definition: pg_list.h:469
static bool two_phase
void ReplicationSlotAlter(const char *name, const bool *failover, const bool *two_phase)
Definition: slot.c:807

References defGetBoolean(), elog, ereport, errcode(), errmsg(), ERROR, foreach_ptr, AlterReplicationSlotCmd::options, ReplicationSlotAlter(), AlterReplicationSlotCmd::slotname, and two_phase.

Referenced by exec_replication_command().

◆ CreateReplicationSlot()

static void CreateReplicationSlot ( CreateReplicationSlotCmd *  cmd)
static

Definition at line 1199 of file walsender.c.

1200 {
1201  const char *snapshot_name = NULL;
1202  char xloc[MAXFNAMELEN];
1203  char *slot_name;
1204  bool reserve_wal = false;
1205  bool two_phase = false;
1206  bool failover = false;
1207  CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
1208  DestReceiver *dest;
1209  TupOutputState *tstate;
1210  TupleDesc tupdesc;
1211  Datum values[4];
1212  bool nulls[4] = {0};
1213 
1214  Assert(!MyReplicationSlot);
1215 
1216  parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase,
1217  &failover);
1218 
1219  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1220  {
1221  ReplicationSlotCreate(cmd->slotname, false,
1222  cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT,
1223  false, false, false);
1224 
1225  if (reserve_wal)
1226  {
1227  ReplicationSlotReserveWal();
1228 
1229  ReplicationSlotMarkDirty();
1230 
1231  /* Write this slot to disk if it's a permanent one. */
1232  if (!cmd->temporary)
1233  ReplicationSlotSave();
1234  }
1235  }
1236  else
1237  {
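1238  LogicalDecodingContext *ctx;
1239  bool need_full_snapshot = false;
1240 
1241  Assert(cmd->kind == REPLICATION_KIND_LOGICAL);
1242 
1243  CheckLogicalDecodingRequirements();
1244 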
1245  /*
1246  * Initially create persistent slot as ephemeral - that allows us to
1247  * nicely handle errors during initialization because it'll get
1248  * dropped if this transaction fails. We'll make it persistent at the
1249  * end. Temporary slots can be created as temporary from beginning as
1250  * they get dropped on error as well.
1251  */
1252  ReplicationSlotCreate(cmd->slotname, true,
1253  cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
1254  two_phase, failover, false);
1255 
1256  /*
1257  * Do options check early so that we can bail before calling the
1258  * DecodingContextFindStartpoint which can take long time.
1259  */
1260  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1261  {
1262  if (IsTransactionBlock())
1263  ereport(ERROR,
1264  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1265  (errmsg("%s must not be called inside a transaction",
1266  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'export')")));
1267 
1268  need_full_snapshot = true;
1269  }
1270  else if (snapshot_action == CRS_USE_SNAPSHOT)
1271  {
1272  if (!IsTransactionBlock())
1273  ereport(ERROR,
1274  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1275  (errmsg("%s must be called inside a transaction",
1276  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1277 
1278  if (XactIsoLevel != XACT_REPEATABLE_READ)
1279  ereport(ERROR,
1280  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1281  (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
1282  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1283  if (!XactReadOnly)
1284  ereport(ERROR,
1285  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1286  (errmsg("%s must be called in a read-only transaction",
1287  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1288 
1289  if (FirstSnapshotSet)
1290  ereport(ERROR,
1291  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1292  (errmsg("%s must be called before any query",
1293  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1294 
1295  if (IsSubTransaction())
1296  ereport(ERROR,
1297  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1298  (errmsg("%s must not be called in a subtransaction",
1299  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1300 
1301  need_full_snapshot = true;
1302  }
1303 
1304  ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
1305  InvalidXLogRecPtr,
1306  XL_ROUTINE(.page_read = logical_read_xlog_page,
1307  .segment_open = WalSndSegmentOpen,
1308  .segment_close = wal_segment_close),
1309  WalSndPrepareWrite, WalSndWriteData,
1310  WalSndUpdateProgress);
1311 
1312  /*
1313  * Signal that we don't need the timeout mechanism. We're just
1314  * creating the replication slot and don't yet accept feedback
1315  * messages or send keepalives. As we possibly need to wait for
1316  * further WAL the walsender would otherwise possibly be killed too
1317  * soon.
1318  */
1319  last_reply_timestamp = 0;
1320 
1321  /* build initial snapshot, might take a while */
1322  DecodingContextFindStartpoint(ctx);
1323 
1324  /*
1325  * Export or use the snapshot if we've been asked to do so.
1326  *
1327  * NB. We will convert the snapbuild.c kind of snapshot to normal
1328  * snapshot when doing this.
1329  */
1330  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1331  {
1332  snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
1333  }
1334  else if (snapshot_action == CRS_USE_SNAPSHOT)
1335  {
1336  Snapshot snap;
1337 
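1338  snap = SnapBuildInitialSnapshot(ctx->snapshot_builder);
1339  RestoreTransactionSnapshot(snap, MyProc);
1340  }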
1341 
1342  /* don't need the decoding context anymore */
1343  FreeDecodingContext(ctx);
1344 
1345  if (!cmd->temporary)
1346  ReplicationSlotPersist();
1347  }
1348 
1349  snprintf(xloc, sizeof(xloc), "%X/%X",
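1350  LSN_FORMAT_ARGS(MyReplicationSlot->data.confirmed_flush));
1351 
1352  dest = CreateDestReceiver(DestRemoteSimple);
1353 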
1354  /*----------
1355  * Need a tuple descriptor representing four columns:
1356  * - first field: the slot name
1357  * - second field: LSN at which we became consistent
1358  * - third field: exported snapshot's name
1359  * - fourth field: output plugin
1360  */
1361  tupdesc = CreateTemplateTupleDesc(4);
1362  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
1363  TEXTOID, -1, 0);
1364  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
1365  TEXTOID, -1, 0);
1366  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
1367  TEXTOID, -1, 0);
1368  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1369  TEXTOID, -1, 0);
1370 
1371  /* prepare for projection of tuples */
1372  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
1373 
1374  /* slot_name */
1375  slot_name = NameStr(MyReplicationSlot->data.name);
1376  values[0] = CStringGetTextDatum(slot_name);
1377 
1378  /* consistent wal location */
1379  values[1] = CStringGetTextDatum(xloc);
1380 
1381  /* snapshot name, or NULL if none */
1382  if (snapshot_name != NULL)
1383  values[2] = CStringGetTextDatum(snapshot_name);
1384  else
1385  nulls[2] = true;
1386 
1387  /* plugin, or NULL if none */
1388  if (cmd->plugin != NULL)
1389  values[3] = CStringGetTextDatum(cmd->plugin);
1390  else
1391  nulls[3] = true;
1392 
1393  /* send it to dest */
1394  do_tup_output(tstate, values, nulls);
1395  end_tup_output(tstate);
1396 
1397  ReplicationSlotRelease();
1398 }
int16 AttrNumber
Definition: attnum.h:21
static Datum values[MAXATTR]
Definition: bootstrap.c:150
#define CStringGetTextDatum(s)
Definition: builtins.h:97
#define NameStr(name)
Definition: c.h:746
#define Assert(condition)
Definition: c.h:858
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
@ DestRemoteSimple
Definition: dest.h:91
void do_tup_output(TupOutputState *tstate, const Datum *values, const bool *isnull)
Definition: execTuples.c:2362
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:84
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2420
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2342
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:694
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:650
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:109
LogicalDecodingContext * CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:330
#define NIL
Definition: pg_list.h:68
#define snprintf
Definition: port.h:238
uintptr_t Datum
Definition: postgres.h:64
@ REPLICATION_KIND_PHYSICAL
Definition: replnodes.h:22
@ REPLICATION_KIND_LOGICAL
Definition: replnodes.h:23
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool failover, bool synced)
Definition: slot.c:309
void ReplicationSlotMarkDirty(void)
Definition: slot.c:1038
void ReplicationSlotReserveWal(void)
Definition: slot.c:1429
void ReplicationSlotPersist(void)
Definition: slot.c:1055
ReplicationSlot * MyReplicationSlot
Definition: slot.c:138
void ReplicationSlotSave(void)
Definition: slot.c:1020
void ReplicationSlotRelease(void)
Definition: slot.c:652
@ RS_PERSISTENT
Definition: slot.h:38
@ RS_EPHEMERAL
Definition: slot.h:39
@ RS_TEMPORARY
Definition: slot.h:40
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:678
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition: snapbuild.c:579
bool FirstSnapshotSet
Definition: snapmgr.c:135
void RestoreTransactionSnapshot(Snapshot snapshot, void *source_pgproc)
Definition: snapmgr.c:1840
PGPROC * MyProc
Definition: proc.c:67
ReplicationKind kind
Definition: replnodes.h:56
struct SnapBuild * snapshot_builder
Definition: logical.h:44
XLogRecPtr confirmed_flush
Definition: slot.h:107
ReplicationSlotPersistentData data
Definition: slot.h:181
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:67
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:726
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase, bool *failover)
Definition: walsender.c:1122
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
Definition: walsender.c:2991
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1575
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool skipped_xact)
Definition: walsender.c:1671
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition: walsender.c:1049
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1548
static TimestampTz last_reply_timestamp
Definition: walsender.c:179
CRSSnapshotAction
Definition: walsender.h:21
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
@ CRS_EXPORT_SNAPSHOT
Definition: walsender.h:22
bool XactReadOnly
Definition: xact.c:81
int XactIsoLevel
Definition: xact.c:78
bool IsSubTransaction(void)
Definition: xact.c:5037
bool IsTransactionBlock(void)
Definition: xact.c:4964
#define XACT_REPEATABLE_READ
Definition: xact.h:38
#define MAXFNAMELEN
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define XL_ROUTINE(...)
Definition: xlogreader.h:117
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:842

References Assert, begin_tup_output_tupdesc(), CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDestReceiver(), CreateInitDecodingContext(), CreateTemplateTupleDesc(), CRS_EXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, CStringGetTextDatum, ReplicationSlot::data, DecodingContextFindStartpoint(), generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errmsg(), ERROR, FirstSnapshotSet, FreeDecodingContext(), InvalidXLogRecPtr, IsSubTransaction(), IsTransactionBlock(), CreateReplicationSlotCmd::kind, last_reply_timestamp, logical_read_xlog_page(), LSN_FORMAT_ARGS, MAXFNAMELEN, MyProc, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, NIL, parseCreateReplSlotOptions(), CreateReplicationSlotCmd::plugin, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, ReplicationSlotCreate(), ReplicationSlotMarkDirty(), ReplicationSlotPersist(), ReplicationSlotRelease(), ReplicationSlotReserveWal(), ReplicationSlotSave(), RestoreTransactionSnapshot(), RS_EPHEMERAL, RS_PERSISTENT, RS_TEMPORARY, CreateReplicationSlotCmd::slotname, SnapBuildExportSnapshot(), SnapBuildInitialSnapshot(), LogicalDecodingContext::snapshot_builder, snprintf, CreateReplicationSlotCmd::temporary, TTSOpsVirtual, TupleDescInitBuiltinEntry(), two_phase, values, wal_segment_close(), WalSndPrepareWrite(), WalSndSegmentOpen(), WalSndUpdateProgress(), WalSndWriteData(), XACT_REPEATABLE_READ, XactIsoLevel, XactReadOnly, and XL_ROUTINE.

Referenced by exec_replication_command(), main(), and StartLogStreamer().

◆ DropReplicationSlot()

static void DropReplicationSlot ( DropReplicationSlotCmd *  cmd)
static

Definition at line 1404 of file walsender.c.

1405 {
1406  ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1407 }
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:784

References ReplicationSlotDrop(), DropReplicationSlotCmd::slotname, and DropReplicationSlotCmd::wait.

Referenced by exec_replication_command(), and main().

◆ exec_replication_command()

bool exec_replication_command ( const char *  cmd_string)

Definition at line 1980 of file walsender.c.

1981 {
1982  int parse_rc;
1983  Node *cmd_node;
1984  const char *cmdtag;
1985  MemoryContext cmd_context;
1986  MemoryContext old_context;
1987 
1988  /*
1989  * If WAL sender has been told that shutdown is getting close, switch its
1990  * status accordingly to handle the next replication commands correctly.
1991  */
1992  if (got_STOPPING)
1993  WalSndSetState(WALSNDSTATE_STOPPING);
1994 
1995  /*
1996  * Throw error if in stopping mode. We need prevent commands that could
1997  * generate WAL while the shutdown checkpoint is being written. To be
1998  * safe, we just prohibit all new commands.
1999  */
2000  if (MyWalSnd->state == WALSNDSTATE_STOPPING)
2001  ereport(ERROR,
2002  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2003  errmsg("cannot execute new commands while WAL sender is in stopping mode")));
2004 
2005  /*
2006  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
2007  * command arrives. Clean up the old stuff if there's anything.
2008  */
2009  SnapBuildClearExportedSnapshot();
2010 
2011  CHECK_FOR_INTERRUPTS();
2012 
2013  /*
2014  * Prepare to parse and execute the command.
2015  */
2016  cmd_context = AllocSetContextCreate(CurrentMemoryContext,
2017  "Replication command context",
2018  ALLOCSET_DEFAULT_SIZES);
2019  old_context = MemoryContextSwitchTo(cmd_context);
2020 
2021  replication_scanner_init(cmd_string);
2022 
2023  /*
2024  * Is it a WalSender command?
2025  */
2026  if (!replication_scanner_is_replication_command())
2027  {
2028  /* Nope; clean up and get out. */
2029  replication_scanner_finish();
2030 
2031  MemoryContextSwitchTo(old_context);
2032  MemoryContextDelete(cmd_context);
2033 
2034  /* XXX this is a pretty random place to make this check */
2035  if (MyDatabaseId == InvalidOid)
2036  ereport(ERROR,
2037  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2038  errmsg("cannot execute SQL commands in WAL sender for physical replication")));
2039 
2040  /* Tell the caller that this wasn't a WalSender command. */
2041  return false;
2042  }
2043 
2044  /*
2045  * Looks like a WalSender command, so parse it.
2046  */
2047  parse_rc = replication_yyparse();
2048  if (parse_rc != 0)
2049  ereport(ERROR,
2050  (errcode(ERRCODE_SYNTAX_ERROR),
2051  errmsg_internal("replication command parser returned %d",
2052  parse_rc)));
2053  replication_scanner_finish();
2054 
2055  cmd_node = replication_parse_result;
2056 
2057  /*
2058  * Report query to various monitoring facilities. For this purpose, we
2059  * report replication commands just like SQL commands.
2060  */
2061  debug_query_string = cmd_string;
2062 
2063  pgstat_report_activity(STATE_RUNNING, cmd_string);
2064 
2065  /*
2066  * Log replication command if log_replication_commands is enabled. Even
2067  * when it's disabled, log the command with DEBUG1 level for backward
2068  * compatibility.
2069  */
2070  ereport(log_replication_commands ? LOG : DEBUG1,
2071  (errmsg("received replication command: %s", cmd_string)));
2072 
2073  /*
2074  * Disallow replication commands in aborted transaction blocks.
2075  */
2076  if (IsAbortedTransactionBlockState())
2077  ereport(ERROR,
2078  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
2079  errmsg("current transaction is aborted, "
2080  "commands ignored until end of transaction block")));
2081 
2082  CHECK_FOR_INTERRUPTS();
2083 
2084  /*
2085  * Allocate buffers that will be used for each outgoing and incoming
2086  * message. We do this just once per command to reduce palloc overhead.
2087  */
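2088  initStringInfo(&output_message);
2089  initStringInfo(&reply_message);
2090  initStringInfo(&tmpbuf);
2091 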
2092  switch (cmd_node->type)
2093  {
2094  case T_IdentifySystemCmd:
2095  cmdtag = "IDENTIFY_SYSTEM";
2096  set_ps_display(cmdtag);
2097  IdentifySystem();
2098  EndReplicationCommand(cmdtag);
2099  break;
2100 
2101  case T_ReadReplicationSlotCmd:
2102  cmdtag = "READ_REPLICATION_SLOT";
2103  set_ps_display(cmdtag);
2104  ReadReplicationSlot((ReadReplicationSlotCmd *) cmd_node);
2105  EndReplicationCommand(cmdtag);
2106  break;
2107 
2108  case T_BaseBackupCmd:
2109  cmdtag = "BASE_BACKUP";
2110  set_ps_display(cmdtag);
2111  PreventInTransactionBlock(true, cmdtag);
2112  SendBaseBackup((BaseBackupCmd *) cmd_node, uploaded_manifest);
2113  EndReplicationCommand(cmdtag);
2114  break;
2115 
2116  case T_CreateReplicationSlotCmd:
2117  cmdtag = "CREATE_REPLICATION_SLOT";
2118  set_ps_display(cmdtag);
2119  CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
2120  EndReplicationCommand(cmdtag);
2121  break;
2122 
2123  case T_DropReplicationSlotCmd:
2124  cmdtag = "DROP_REPLICATION_SLOT";
2125  set_ps_display(cmdtag);
2126  DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
2127  EndReplicationCommand(cmdtag);
2128  break;
2129 
2130  case T_AlterReplicationSlotCmd:
2131  cmdtag = "ALTER_REPLICATION_SLOT";
2132  set_ps_display(cmdtag);
2133  AlterReplicationSlot((AlterReplicationSlotCmd *) cmd_node);
2134  EndReplicationCommand(cmdtag);
2135  break;
2136 
2137  case T_StartReplicationCmd:
2138  {
2139  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
2140 
2141  cmdtag = "START_REPLICATION";
2142  set_ps_display(cmdtag);
2143  PreventInTransactionBlock(true, cmdtag);
2144 
2145  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
2146  StartReplication(cmd);
2147  else
2148  StartLogicalReplication(cmd);
2149 
2150  /* dupe, but necessary per libpqrcv_endstreaming */
2151  EndReplicationCommand(cmdtag);
2152 
2153  Assert(xlogreader != NULL);
2154  break;
2155  }
2156 
2157  case T_TimeLineHistoryCmd:
2158  cmdtag = "TIMELINE_HISTORY";
2159  set_ps_display(cmdtag);
2160  PreventInTransactionBlock(true, cmdtag);
2161  SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
2162  EndReplicationCommand(cmdtag);
2163  break;
2164 
2165  case T_VariableShowStmt:
2166  {
2167  DestReceiver *dest = CreateDestReceiver(DestRemoteSimple);
2168  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
2169 
2170  cmdtag = "SHOW";
2171  set_ps_display(cmdtag);
2172 
2173  /* syscache access needs a transaction environment */
2174  StartTransactionCommand();
2175  GetPGVariable(n->name, dest);
2176  CommitTransactionCommand();
2177  EndReplicationCommand(cmdtag);
2178  }
2179  break;
2180 
2181  case T_UploadManifestCmd:
2182  cmdtag = "UPLOAD_MANIFEST";
2183  set_ps_display(cmdtag);
2184  PreventInTransactionBlock(true, cmdtag);
2185  UploadManifest();
2186  EndReplicationCommand(cmdtag);
2187  break;
2188 
2189  default:
2190  elog(ERROR, "unrecognized replication command node tag: %u",
2191  cmd_node->type);
2192  }
2193 
2194  /* done */
2195  MemoryContextSwitchTo(old_context);
2196  MemoryContextDelete(cmd_context);
2197 
2198  /*
2199  * We need not update ps display or pg_stat_activity, because PostgresMain
2200  * will reset those to "idle". But we must reset debug_query_string to
2201  * ensure it doesn't become a dangling pointer.
2202  */
2203  debug_query_string = NULL;
2204 
2205  return true;
2206 }
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
void SendBaseBackup(BaseBackupCmd *cmd, IncrementalBackupInfo *ib)
Definition: basebackup.c:989
void EndReplicationCommand(const char *commandTag)
Definition: dest.c:205
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1157
#define LOG
Definition: elog.h:31
#define DEBUG1
Definition: elog.h:30
Oid MyDatabaseId
Definition: globals.c:93
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc_funcs.c:382
MemoryContext CurrentMemoryContext
Definition: mcxt.c:143
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:454
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
const char * debug_query_string
Definition: postgres.c:88
#define InvalidOid
Definition: postgres_ext.h:36
static void set_ps_display(const char *activity)
Definition: ps_status.h:40
MemoryContextSwitchTo(old_ctx)
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:739
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
Definition: nodes.h:129
NodeTag type
Definition: nodes.h:130
ReplicationKind kind
Definition: replnodes.h:94
WalSndState state
static void AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
Definition: walsender.c:1413
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:587
WalSnd * MyWalSnd
Definition: walsender.c:112
static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
Definition: walsender.c:488
static StringInfoData tmpbuf
Definition: walsender.c:170
static void IdentifySystem(void)
Definition: walsender.c:407
static StringInfoData reply_message
Definition: walsender.c:169
void WalSndSetState(WalSndState state)
Definition: walsender.c:3812
static StringInfoData output_message
Definition: walsender.c:168
static void UploadManifest(void)
Definition: walsender.c:677
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:198
bool log_replication_commands
Definition: walsender.c:125
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:1199
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1455
static IncrementalBackupInfo * uploaded_manifest
Definition: walsender.c:147
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1404
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:817
static XLogReaderState * xlogreader
Definition: walsender.c:137
PGDLLIMPORT Node * replication_parse_result
@ WALSNDSTATE_STOPPING
void replication_scanner_finish(void)
int replication_yyparse(void)
void replication_scanner_init(const char *str)
bool replication_scanner_is_replication_command(void)
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3628
void StartTransactionCommand(void)
Definition: xact.c:3039
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:406
void CommitTransactionCommand(void)
Definition: xact.c:3137

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, AlterReplicationSlot(), Assert, CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, debug_query_string, generate_unaccent_rules::dest, DestRemoteSimple, DropReplicationSlot(), elog, EndReplicationCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsAbortedTransactionBlockState(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextDelete(), MemoryContextSwitchTo(), MyDatabaseId, MyWalSnd, VariableShowStmt::name, output_message, pgstat_report_activity(), PreventInTransactionBlock(), ReadReplicationSlot(), REPLICATION_KIND_PHYSICAL, replication_parse_result, replication_scanner_finish(), replication_scanner_init(), replication_scanner_is_replication_command(), replication_yyparse(), reply_message, SendBaseBackup(), SendTimeLineHistory(), set_ps_display(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), StartTransactionCommand(), WalSnd::state, STATE_RUNNING, tmpbuf, Node::type, ReadReplicationSlotCmd::type, uploaded_manifest, UploadManifest(), WalSndSetState(), WALSNDSTATE_STOPPING, and xlogreader.

Referenced by PostgresMain().

◆ GetStandbyFlushRecPtr()

XLogRecPtr GetStandbyFlushRecPtr ( TimeLineID *  tli)

Definition at line 3507 of file walsender.c.

3508 {
3509  XLogRecPtr replayPtr;
3510  TimeLineID replayTLI;
3511  XLogRecPtr receivePtr;
3512  TimeLineID receiveTLI;
3513  XLogRecPtr result;
3514 
3515  Assert(am_cascading_walsender || IsSyncingReplicationSlots());
3516 
3517  /*
3518  * We can safely send what's already been replayed. Also, if walreceiver
3519  * is streaming WAL from the same timeline, we can send anything that it
3520  * has streamed, but hasn't been replayed yet.
3521  */
3522 
3523  receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
3524  replayPtr = GetXLogReplayRecPtr(&replayTLI);
3525 
3526  if (tli)
3527  *tli = replayTLI;
3528 
3529  result = replayPtr;
3530  if (receiveTLI == replayTLI && receivePtr > replayPtr)
3531  result = receivePtr;
3532 
3533  return result;
3534 }
bool IsSyncingReplicationSlots(void)
Definition: slotsync.c:1651
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
bool am_cascading_walsender
Definition: walsender.c:116
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint32 TimeLineID
Definition: xlogdefs.h:59
static TimeLineID receiveTLI
Definition: xlogrecovery.c:264
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References am_cascading_walsender, Assert, GetWalRcvFlushRecPtr(), GetXLogReplayRecPtr(), IsSyncingReplicationSlots(), and receiveTLI.

Referenced by IdentifySystem(), StartReplication(), synchronize_one_slot(), XLogSendLogical(), and XLogSendPhysical().

◆ HandleUploadManifestPacket()

static bool HandleUploadManifestPacket ( StringInfo  buf,
off_t *  offset,
IncrementalBackupInfo *  ib 
)
static

Definition at line 741 of file walsender.c.

743 {
744  int mtype;
745  int maxmsglen;
746 
747  HOLD_CANCEL_INTERRUPTS();
748 
749  pq_startmsgread();
750  mtype = pq_getbyte();
751  if (mtype == EOF)
752  ereport(ERROR,
753  (errcode(ERRCODE_CONNECTION_FAILURE),
754  errmsg("unexpected EOF on client connection with an open transaction")));
755 
756  switch (mtype)
757  {
758  case 'd': /* CopyData */
759  maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
760  break;
761  case 'c': /* CopyDone */
762  case 'f': /* CopyFail */
763  case 'H': /* Flush */
764  case 'S': /* Sync */
765  maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
766  break;
767  default:
768  ereport(ERROR,
769  (errcode(ERRCODE_PROTOCOL_VIOLATION),
770  errmsg("unexpected message type 0x%02X during COPY from stdin",
771  mtype)));
772  maxmsglen = 0; /* keep compiler quiet */
773  break;
774  }
775 
776  /* Now collect the message body */
777  if (pq_getmessage(buf, maxmsglen))
778  ereport(ERROR,
779  (errcode(ERRCODE_CONNECTION_FAILURE),
780  errmsg("unexpected EOF on client connection with an open transaction")));
781  RESUME_CANCEL_INTERRUPTS();
782 
783  /* Process the message */
784  switch (mtype)
785  {
786  case 'd': /* CopyData */
787  AppendIncrementalManifestData(ib, buf->data, buf->len);
788  return true;
789 
790  case 'c': /* CopyDone */
791  return false;
792 
793  case 'H': /* Flush */
794  case 'S': /* Sync */
795  /* Ignore these while in CopyIn mode as we do elsewhere. */
796  return true;
797 
798  case 'f':
799  ereport(ERROR,
800  (errcode(ERRCODE_QUERY_CANCELED),
801  errmsg("COPY from stdin failed: %s",
802  pq_getmsgstring(buf))));
803  }
804 
805  /* Not reached. */
806  Assert(false);
807  return false;
808 }
void AppendIncrementalManifestData(IncrementalBackupInfo *ib, const char *data, int len)
#define PQ_SMALL_MESSAGE_LIMIT
Definition: libpq.h:30
#define PQ_LARGE_MESSAGE_LIMIT
Definition: libpq.h:31
#define HOLD_CANCEL_INTERRUPTS()
Definition: miscadmin.h:141
#define RESUME_CANCEL_INTERRUPTS()
Definition: miscadmin.h:143
static char * buf
Definition: pg_test_fsync.c:73
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1203
int pq_getbyte(void)
Definition: pqcomm.c:964
void pq_startmsgread(void)
Definition: pqcomm.c:1141
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:579

References AppendIncrementalManifestData(), Assert, buf, ereport, errcode(), errmsg(), ERROR, HOLD_CANCEL_INTERRUPTS, pq_getbyte(), pq_getmessage(), pq_getmsgstring(), PQ_LARGE_MESSAGE_LIMIT, PQ_SMALL_MESSAGE_LIMIT, pq_startmsgread(), and RESUME_CANCEL_INTERRUPTS.

Referenced by UploadManifest().

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 3563 of file walsender.c.

3564 {
3565  Assert(am_walsender);
3566 
3567  /*
3568  * If replication has not yet started, die like with SIGTERM. If
3569  * replication is active, only set a flag and wake up the main loop. It
3570  * will send any outstanding WAL, wait for it to be replicated to the
3571  * standby, and then exit gracefully.
3572  */
3573  if (!replication_active)
3574  kill(MyProcPid, SIGTERM);
3575  else
3576  got_STOPPING = true;
3577 }
int MyProcPid
Definition: globals.c:46
bool am_walsender
Definition: walsender.c:115
static volatile sig_atomic_t replication_active
Definition: walsender.c:206
#define kill(pid, sig)
Definition: win32_port.h:503

References am_walsender, Assert, got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

◆ IdentifySystem()

static void IdentifySystem ( void  )
static

Definition at line 407 of file walsender.c.

408 {
409  char sysid[32];
410  char xloc[MAXFNAMELEN];
411  XLogRecPtr logptr;
412  char *dbname = NULL;
413  DestReceiver *dest;
414  TupOutputState *tstate;
415  TupleDesc tupdesc;
416  Datum values[4];
417  bool nulls[4] = {0};
418  TimeLineID currTLI;
419 
420  /*
421  * Reply with a result set with one row, four columns. First col is system
422  * ID, second is timeline ID, third is current xlog location and the
423  * fourth contains the database name if we are connected to one.
424  */
425 
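426  snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
427  GetSystemIdentifier());
428 
429  am_cascading_walsender = RecoveryInProgress();
430  if (am_cascading_walsender)
431  logptr = GetStandbyFlushRecPtr(&currTLI);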
432  else
433  logptr = GetFlushRecPtr(&currTLI);
434 
435  snprintf(xloc, sizeof(xloc), "%X/%X", LSN_FORMAT_ARGS(logptr));
436 
437  if (MyDatabaseId != InvalidOid)
438  {
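439  MemoryContext cur = CurrentMemoryContext;
440 
441  /* syscache access needs a transaction env. */
442  StartTransactionCommand();
443  dbname = get_database_name(MyDatabaseId);
444  /* copy dbname out of TX context */
445  dbname = MemoryContextStrdup(cur, dbname);
446  CommitTransactionCommand();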
447  }
448 
449  dest = CreateDestReceiver(DestRemoteSimple);
450 
451  /* need a tuple descriptor representing four columns */
452  tupdesc = CreateTemplateTupleDesc(4);
453  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
454  TEXTOID, -1, 0);
455  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
456  INT8OID, -1, 0);
457  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
458  TEXTOID, -1, 0);
459  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
460  TEXTOID, -1, 0);
461 
462  /* prepare for projection of tuples */
463  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
464 
465  /* column 1: system identifier */
466  values[0] = CStringGetTextDatum(sysid);
467 
468  /* column 2: timeline */
469  values[1] = Int64GetDatum(currTLI);
470 
471  /* column 3: wal location */
472  values[2] = CStringGetTextDatum(xloc);
473 
474  /* column 4: database name, or NULL if none */
475  if (dbname)
476  values[3] = CStringGetTextDatum(dbname);
477  else
478  nulls[3] = true;
479 
480  /* send it to dest */
481  do_tup_output(tstate, values, nulls);
482 
483  end_tup_output(tstate);
484 }
#define UINT64_FORMAT
Definition: c.h:549
char * get_database_name(Oid dbid)
Definition: dbcommands.c:3184
struct cursor * cur
Definition: ecpg.c:28
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1807
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition: mcxt.c:1683
char * dbname
Definition: streamutil.c:52
XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli)
Definition: walsender.c:3507
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4561
bool RecoveryInProgress(void)
Definition: xlog.c:6333
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition: xlog.c:6498

References am_cascading_walsender, begin_tup_output_tupdesc(), CommitTransactionCommand(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, cur, CurrentMemoryContext, dbname, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), get_database_name(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetSystemIdentifier(), Int64GetDatum(), InvalidOid, LSN_FORMAT_ARGS, MAXFNAMELEN, MemoryContextStrdup(), MyDatabaseId, RecoveryInProgress(), snprintf, StartTransactionCommand(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), UINT64_FORMAT, and values.

Referenced by exec_replication_command().

◆ InitWalSenderSlot()

static void InitWalSenderSlot ( void  )
static

Definition at line 2896 of file walsender.c.

2897 {
2898  int i;
2899 
2900  /*
2901  * WalSndCtl should be set up already (we inherit this by fork() or
2902  * EXEC_BACKEND mechanism from the postmaster).
2903  */
2904  Assert(WalSndCtl != NULL);
2905  Assert(MyWalSnd == NULL);
2906 
2907  /*
2908  * Find a free walsender slot and reserve it. This must not fail due to
2909  * the prior check for free WAL senders in InitProcess().
2910  */
2911  for (i = 0; i < max_wal_senders; i++)
2912  {
2913  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2914 
2915  SpinLockAcquire(&walsnd->mutex);
2916 
2917  if (walsnd->pid != 0)
2918  {
2919  SpinLockRelease(&walsnd->mutex);
2920  continue;
2921  }
2922  else
2923  {
2924  /*
2925  * Found a free slot. Reserve it for us.
2926  */
2927  walsnd->pid = MyProcPid;
2928  walsnd->state = WALSNDSTATE_STARTUP;
2929  walsnd->sentPtr = InvalidXLogRecPtr;
2930  walsnd->needreload = false;
2931  walsnd->write = InvalidXLogRecPtr;
2932  walsnd->flush = InvalidXLogRecPtr;
2933  walsnd->apply = InvalidXLogRecPtr;
2934  walsnd->writeLag = -1;
2935  walsnd->flushLag = -1;
2936  walsnd->applyLag = -1;
2937  walsnd->sync_standby_priority = 0;
2938  walsnd->latch = &MyProc->procLatch;
2939  walsnd->replyTime = 0;
2940 
2941  /*
2942  * The kind assignment is done here and not in StartReplication()
2943  * and StartLogicalReplication(). Indeed, the logical walsender
2944  * needs to read WAL records (like snapshot of running
2945  * transactions) during the slot creation. So it needs to be woken
2946  * up based on its kind.
2947  *
2948  * The kind assignment could also be done in StartReplication(),
2949  * StartLogicalReplication() and CREATE_REPLICATION_SLOT but it
2950  * seems better to set it in one place.
2951  */
2952  if (MyDatabaseId == InvalidOid)
2953  walsnd->kind = REPLICATION_KIND_PHYSICAL;
2954  else
2955  walsnd->kind = REPLICATION_KIND_LOGICAL;
2956 
2957  SpinLockRelease(&walsnd->mutex);
2958  /* don't need the lock anymore */
2959  MyWalSnd = (WalSnd *) walsnd;
2960 
2961  break;
2962  }
2963  }
2964 
2965  Assert(MyWalSnd != NULL);
2966 
2967  /* Arrange to clean up at walsender exit */
2968  on_shmem_exit(WalSndKill, 0);
2969 }
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:365
int i
Definition: isn.c:73
#define SpinLockRelease(lock)
Definition: spin.h:61
#define SpinLockAcquire(lock)
Definition: spin.h:59
Latch procLatch
Definition: proc.h:164
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
TimeOffset writeLag
slock_t mutex
XLogRecPtr flush
XLogRecPtr sentPtr
TimeOffset flushLag
ReplicationKind kind
XLogRecPtr write
TimeOffset applyLag
int sync_standby_priority
bool needreload
Latch * latch
TimestampTz replyTime
XLogRecPtr apply
int max_wal_senders
Definition: walsender.c:121
static void WalSndKill(int code, Datum arg)
Definition: walsender.c:2973
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
@ WALSNDSTATE_STARTUP

References WalSnd::apply, WalSnd::applyLag, Assert, WalSnd::flush, WalSnd::flushLag, i, InvalidOid, InvalidXLogRecPtr, WalSnd::kind, WalSnd::latch, max_wal_senders, WalSnd::mutex, MyDatabaseId, MyProc, MyProcPid, MyWalSnd, WalSnd::needreload, on_shmem_exit(), WalSnd::pid, PGPROC::procLatch, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, WalSnd::replyTime, WalSnd::sentPtr, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSnd::sync_standby_priority, WalSndCtl, WalSndKill(), WalSndCtlData::walsnds, WALSNDSTATE_STARTUP, WalSnd::write, and WalSnd::writeLag.

◆ LagTrackerRead()

static TimeOffset LagTrackerRead ( int  head,
XLogRecPtr  lsn,
TimestampTz  now 
)
static

Definition at line 4163 of file walsender.c.

4164 {
4165  TimestampTz time = 0;
4166 
4167  /* Read all unread samples up to this LSN or end of buffer. */
4168  while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
4169  lag_tracker->buffer[lag_tracker->read_heads[head]].lsn <= lsn)
4170  {
4171  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
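4172  lag_tracker->last_read[head] =
4173  lag_tracker->buffer[lag_tracker->read_heads[head]];
4174  lag_tracker->read_heads[head] =
4175  (lag_tracker->read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE;
4176  }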
4177 
4178  /*
4179  * If the lag tracker is empty, that means the standby has processed
4180  * everything we've ever sent so we should now clear 'last_read'. If we
4181  * didn't do that, we'd risk using a stale and irrelevant sample for
4182  * interpolation at the beginning of the next burst of WAL after a period
4183  * of idleness.
4184  */
4185  if (lag_tracker->read_heads[head] == lag_tracker->write_head)
4186  lag_tracker->last_read[head].time = 0;
4187 
4188  if (time > now)
4189  {
4190  /* If the clock somehow went backwards, treat as not found. */
4191  return -1;
4192  }
4193  else if (time == 0)
4194  {
4195  /*
4196  * We didn't cross a time. If there is a future sample that we
4197  * haven't reached yet, and we've already reached at least one sample,
4198  * let's interpolate the local flushed time. This is mainly useful
4199  * for reporting a completely stuck apply position as having
4200  * increasing lag, since otherwise we'd have to wait for it to
4201  * eventually start moving again and cross one of our samples before
4202  * we can show the lag increasing.
4203  */
4204  if (lag_tracker->read_heads[head] == lag_tracker->write_head)
4205  {
4206  /* There are no future samples, so we can't interpolate. */
4207  return -1;
4208  }
4209  else if (lag_tracker->last_read[head].time != 0)
4210  {
4211  /* We can interpolate between last_read and the next sample. */
4212  double fraction;
4213  WalTimeSample prev = lag_tracker->last_read[head];
4214  WalTimeSample next = lag_tracker->buffer[lag_tracker->read_heads[head]];
4215 
4216  if (lsn < prev.lsn)
4217  {
4218  /*
4219  * Reported LSNs shouldn't normally go backwards, but it's
4220  * possible when there is a timeline change. Treat as not
4221  * found.
4222  */
4223  return -1;
4224  }
4225 
4226  Assert(prev.lsn < next.lsn);
4227 
4228  if (prev.time > next.time)
4229  {
4230  /* If the clock somehow went backwards, treat as not found. */
4231  return -1;
4232  }
4233 
4234  /* See how far we are between the previous and next samples. */
4235  fraction =
4236  (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
4237 
4238  /* Scale the local flush time proportionally. */
4239  time = (TimestampTz)
4240  ((double) prev.time + (next.time - prev.time) * fraction);
4241  }
4242  else
4243  {
4244  /*
4245  * We have only a future sample, implying that we were entirely
4246  * caught up, but now there is a new burst of WAL and the
4247  * standby hasn't processed the first sample yet. Until the
4248  * standby reaches the future sample the best we can do is report
4249  * the hypothetical lag if that sample were to be replayed now.
4250  */
4251  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
4252  }
4253  }
4254 
4255  /* Return the elapsed time since local flush time in microseconds. */
4256  Assert(time != 0);
4257  return now - time;
4258 }
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1608
static int32 next
Definition: blutils.c:222
int64 TimestampTz
Definition: timestamp.h:39
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:224
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:226
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:227
int write_head
Definition: walsender.c:225
TimestampTz time
Definition: walsender.c:214
XLogRecPtr lsn
Definition: walsender.c:213
static LagTracker * lag_tracker
Definition: walsender.c:230
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:218

References Assert, LagTracker::buffer, lag_tracker, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_read, WalTimeSample::lsn, next, now(), LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by ProcessStandbyReplyMessage().

◆ LagTrackerWrite()

static void LagTrackerWrite ( XLogRecPtr  lsn,
TimestampTz  local_flush_time 
)
static

Definition at line 4098 of file walsender.c.

4099 {
4100  bool buffer_full;
4101  int new_write_head;
4102  int i;
4103 
4104  if (!am_walsender)
4105  return;
4106 
4107  /*
4108  * If the lsn hasn't advanced since last time, then do nothing. This way
4109  * we only record a new sample when new WAL has been written.
4110  */
4111  if (lag_tracker->last_lsn == lsn)
4112  return;
4113  lag_tracker->last_lsn = lsn;
4114 
4115  /*
4116  * If advancing the write head of the circular buffer would crash into any
4117  * of the read heads, then the buffer is full. In other words, the
4118  * slowest reader (presumably apply) is the one that controls the release
4119  * of space.
4120  */
4121  new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
4122  buffer_full = false;
4123  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
4124  {
4125  if (new_write_head == lag_tracker->read_heads[i])
4126  buffer_full = true;
4127  }
4128 
4129  /*
4130  * If the buffer is full, for now we just rewind by one slot and overwrite
4131  * the last sample, as a simple (if somewhat uneven) way to lower the
4132  * sampling rate. There may be better adaptive compaction algorithms.
4133  */
4134  if (buffer_full)
4135  {
4136  new_write_head = lag_tracker->write_head;
4137  if (lag_tracker->write_head > 0)
4138  lag_tracker->write_head--;
4139  else
4140  lag_tracker->write_head = LAG_TRACKER_BUFFER_SIZE - 1;
4141  }
4142 
4143  /* Store a sample at the current write head position. */
4144  lag_tracker->buffer[lag_tracker->write_head].lsn = lsn;
4145  lag_tracker->buffer[lag_tracker->write_head].time = local_flush_time;
4146  lag_tracker->write_head = new_write_head;
4147 }
XLogRecPtr last_lsn
Definition: walsender.c:223
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:27

References am_walsender, LagTracker::buffer, i, lag_tracker, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_lsn, WalTimeSample::lsn, NUM_SYNC_REP_WAIT_MODE, LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by WalSndUpdateProgress(), and XLogSendPhysical().

◆ logical_read_xlog_page()

static int logical_read_xlog_page ( XLogReaderState *  state,
XLogRecPtr  targetPagePtr,
int  reqLen,
XLogRecPtr  targetRecPtr,
char *  cur_page 
)
static

Definition at line 1049 of file walsender.c.

1051 {
1052  XLogRecPtr flushptr;
1053  int count;
1054  WALReadError errinfo;
1055  XLogSegNo segno;
1056  TimeLineID currTLI;
1057 
1058  /*
1059  * Make sure we have enough WAL available before retrieving the current
1060  * timeline.
1061  */
1062  flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
1063 
1064  /* Fail if not enough (implies we are going to shut down) */
1065  if (flushptr < targetPagePtr + reqLen)
1066  return -1;
1067 
1068  /*
1069  * Since logical decoding is also permitted on a standby server, we need
1070  * to check if the server is in recovery to decide how to get the current
1071  * timeline ID (so that it also covers the promotion or timeline change
1072  * cases). We must determine am_cascading_walsender after waiting for the
1073  * required WAL so that it is correct when the walsender wakes up after a
1074  * promotion.
1075  */
1076  am_cascading_walsender = RecoveryInProgress();
1077 
1078  if (am_cascading_walsender)
1079  GetXLogReplayRecPtr(&currTLI);
1080  else
1081  currTLI = GetWALInsertionTimeLine();
1082 
1083  XLogReadDetermineTimeline(state, targetPagePtr, reqLen, currTLI);
1084  sendTimeLineIsHistoric = (state->currTLI != currTLI);
1085  sendTimeLine = state->currTLI;
1086  sendTimeLineValidUpto = state->currTLIValidUntil;
1087  sendTimeLineNextTLI = state->nextTLI;
1088 
1089  if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
1090  count = XLOG_BLCKSZ; /* more than one block available */
1091  else
1092  count = flushptr - targetPagePtr; /* part of the page available */
1093 
1094  /* now actually read the data, we know it's there */
1095  if (!WALRead(state,
1096  cur_page,
1097  targetPagePtr,
1098  count,
1099  currTLI, /* Pass the current TLI because only
1100  * WalSndSegmentOpen controls whether new TLI
1101  * is needed. */
1102  &errinfo))
1103  WALReadRaiseError(&errinfo);
1104 
1105  /*
1106  * After reading into the buffer, check that what we read was valid. We do
1107  * this after reading, because even though the segment was present when we
1108  * opened it, it might get recycled or removed while we read it. The
1109  * read() succeeds in that case, but the data we tried to read might
1110  * already have been overwritten with new WAL records.
1111  */
1112  XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize);
1113  CheckXLogRemoved(segno, state->seg.ws_tli);
1114 
1115  return count;
1116 }
Definition: regguts.h:323
static TimeLineID sendTimeLine
Definition: walsender.c:156
static bool sendTimeLineIsHistoric
Definition: walsender.c:158
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition: walsender.c:1821
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:157
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:159
TimeLineID GetWALInsertionTimeLine(void)
Definition: xlog.c:6519
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:3720
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
uint64 XLogSegNo
Definition: xlogdefs.h:48
bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALReadError *errinfo)
Definition: xlogreader.c:1503
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength, TimeLineID currTLI)
Definition: xlogutils.c:718
void WALReadRaiseError(WALReadError *errinfo)
Definition: xlogutils.c:1020

References am_cascading_walsender, CheckXLogRemoved(), GetWALInsertionTimeLine(), GetXLogReplayRecPtr(), RecoveryInProgress(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, WALRead(), WALReadRaiseError(), WalSndWaitForWal(), XLByteToSeg, and XLogReadDetermineTimeline().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ NeedToWaitForStandbys()

static bool NeedToWaitForStandbys ( XLogRecPtr  flushed_lsn,
uint32 *  wait_event 
)
static

Definition at line 1761 of file walsender.c.

1762 {
1763  int elevel = got_STOPPING ? ERROR : WARNING;
1764  bool failover_slot;
1765 
1766  failover_slot = (replication_active && MyReplicationSlot->data.failover);
1767 
1768  /*
1769  * Note that after receiving the shutdown signal, an ERROR is reported if
1770  * any slots are dropped, invalidated, or inactive. This measure is taken
1771  * to prevent the walsender from waiting indefinitely.
1772  */
1773  if (failover_slot && !StandbySlotsHaveCaughtup(flushed_lsn, elevel))
1774  {
1775  *wait_event = WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION;
1776  return true;
1777  }
1778 
1779  *wait_event = 0;
1780  return false;
1781 }
#define WARNING
Definition: elog.h:36
bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
Definition: slot.c:2608

References ReplicationSlot::data, ERROR, ReplicationSlotPersistentData::failover, got_STOPPING, MyReplicationSlot, replication_active, StandbySlotsHaveCaughtup(), and WARNING.

Referenced by NeedToWaitForWal(), and WalSndWaitForWal().

◆ NeedToWaitForWal()

static bool NeedToWaitForWal ( XLogRecPtr  target_lsn,
XLogRecPtr  flushed_lsn,
uint32 *  wait_event 
)
static

Definition at line 1793 of file walsender.c.

1795 {
1796  /* Check if we need to wait for WALs to be flushed to disk */
1797  if (target_lsn > flushed_lsn)
1798  {
1799  *wait_event = WAIT_EVENT_WAL_SENDER_WAIT_FOR_WAL;
1800  return true;
1801  }
1802 
1803  /* Check if the standby slots have caught up to the flushed position */
1804  return NeedToWaitForStandbys(flushed_lsn, wait_event);
1805 }
static bool NeedToWaitForStandbys(XLogRecPtr flushed_lsn, uint32 *wait_event)
Definition: walsender.c:1761

References NeedToWaitForStandbys().

Referenced by WalSndWaitForWal().

◆ offset_to_interval()

static Interval* offset_to_interval ( TimeOffset  offset)
static

Definition at line 3850 of file walsender.c.

3851 {
3852  Interval *result = palloc(sizeof(Interval));
3853 
3854  result->month = 0;
3855  result->day = 0;
3856  result->time = offset;
3857 
3858  return result;
3859 }
void * palloc(Size size)
Definition: mcxt.c:1317
int32 day
Definition: timestamp.h:51
int32 month
Definition: timestamp.h:52
TimeOffset time
Definition: timestamp.h:49

References Interval::day, Interval::month, palloc(), and Interval::time.

Referenced by pg_stat_get_wal_senders().

◆ parseCreateReplSlotOptions()

static void parseCreateReplSlotOptions ( CreateReplicationSlotCmd *  cmd,
bool *  reserve_wal,
CRSSnapshotAction *  snapshot_action,
bool *  two_phase,
bool *  failover 
)
static

Definition at line 1122 of file walsender.c.

1126 {
1127  ListCell *lc;
1128  bool snapshot_action_given = false;
1129  bool reserve_wal_given = false;
1130  bool two_phase_given = false;
1131  bool failover_given = false;
1132 
1133  /* Parse options */
1134  foreach(lc, cmd->options)
1135  {
1136  DefElem *defel = (DefElem *) lfirst(lc);
1137 
1138  if (strcmp(defel->defname, "snapshot") == 0)
1139  {
1140  char *action;
1141 
1142  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
1143  ereport(ERROR,
1144  (errcode(ERRCODE_SYNTAX_ERROR),
1145  errmsg("conflicting or redundant options")));
1146 
1147  action = defGetString(defel);
1148  snapshot_action_given = true;
1149 
1150  if (strcmp(action, "export") == 0)
1151  *snapshot_action = CRS_EXPORT_SNAPSHOT;
1152  else if (strcmp(action, "nothing") == 0)
1153  *snapshot_action = CRS_NOEXPORT_SNAPSHOT;
1154  else if (strcmp(action, "use") == 0)
1155  *snapshot_action = CRS_USE_SNAPSHOT;
1156  else
1157  ereport(ERROR,
1158  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1159  errmsg("unrecognized value for CREATE_REPLICATION_SLOT option \"%s\": \"%s\"",
1160  defel->defname, action)));
1161  }
1162  else if (strcmp(defel->defname, "reserve_wal") == 0)
1163  {
1164  if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
1165  ereport(ERROR,
1166  (errcode(ERRCODE_SYNTAX_ERROR),
1167  errmsg("conflicting or redundant options")));
1168 
1169  reserve_wal_given = true;
1170  *reserve_wal = defGetBoolean(defel);
1171  }
1172  else if (strcmp(defel->defname, "two_phase") == 0)
1173  {
1174  if (two_phase_given || cmd->kind != REPLICATION_KIND_LOGICAL)
1175  ereport(ERROR,
1176  (errcode(ERRCODE_SYNTAX_ERROR),
1177  errmsg("conflicting or redundant options")));
1178  two_phase_given = true;
1179  *two_phase = defGetBoolean(defel);
1180  }
1181  else if (strcmp(defel->defname, "failover") == 0)
1182  {
1183  if (failover_given || cmd->kind != REPLICATION_KIND_LOGICAL)
1184  ereport(ERROR,
1185  (errcode(ERRCODE_SYNTAX_ERROR),
1186  errmsg("conflicting or redundant options")));
1187  failover_given = true;
1188  *failover = defGetBoolean(defel);
1189  }
1190  else
1191  elog(ERROR, "unrecognized option: %s", defel->defname);
1192  }
1193 }
char * defGetString(DefElem *def)
Definition: define.c:48
#define lfirst(lc)
Definition: pg_list.h:172
char * defname
Definition: parsenodes.h:817
@ CRS_NOEXPORT_SNAPSHOT
Definition: walsender.h:23

References generate_unaccent_rules::action, CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, defGetBoolean(), defGetString(), DefElem::defname, elog, ereport, errcode(), errmsg(), ERROR, CreateReplicationSlotCmd::kind, lfirst, CreateReplicationSlotCmd::options, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, and two_phase.

Referenced by CreateReplicationSlot().

◆ pg_stat_get_wal_senders()

Datum pg_stat_get_wal_senders ( PG_FUNCTION_ARGS  )

Definition at line 3866 of file walsender.c.

3867 {
3868 #define PG_STAT_GET_WAL_SENDERS_COLS 12
3869  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3870  SyncRepStandbyData *sync_standbys;
3871  int num_standbys;
3872  int i;
3873 
3874  InitMaterializedSRF(fcinfo, 0);
3875 
3876  /*
3877  * Get the currently active synchronous standbys. This could be out of
3878  * date before we're done, but we'll use the data anyway.
3879  */
3880  num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
3881 
3882  for (i = 0; i < max_wal_senders; i++)
3883  {
3884  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3885  XLogRecPtr sent_ptr;
3886  XLogRecPtr write;
3887  XLogRecPtr flush;
3888  XLogRecPtr apply;
3889  TimeOffset writeLag;
3890  TimeOffset flushLag;
3891  TimeOffset applyLag;
3892  int priority;
3893  int pid;
3894  WalSndState state;
3895  TimestampTz replyTime;
3896  bool is_sync_standby;
3897  Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
3898  bool nulls[PG_STAT_GET_WAL_SENDERS_COLS] = {0};
3899  int j;
3900 
3901  /* Collect data from shared memory */
3902  SpinLockAcquire(&walsnd->mutex);
3903  if (walsnd->pid == 0)
3904  {
3905  SpinLockRelease(&walsnd->mutex);
3906  continue;
3907  }
3908  pid = walsnd->pid;
3909  sent_ptr = walsnd->sentPtr;
3910  state = walsnd->state;
3911  write = walsnd->write;
3912  flush = walsnd->flush;
3913  apply = walsnd->apply;
3914  writeLag = walsnd->writeLag;
3915  flushLag = walsnd->flushLag;
3916  applyLag = walsnd->applyLag;
3917  priority = walsnd->sync_standby_priority;
3918  replyTime = walsnd->replyTime;
3919  SpinLockRelease(&walsnd->mutex);
3920 
3921  /*
3922  * Detect whether walsender is/was considered synchronous. We can
3923  * provide some protection against stale data by checking the PID
3924  * along with walsnd_index.
3925  */
3926  is_sync_standby = false;
3927  for (j = 0; j < num_standbys; j++)
3928  {
3929  if (sync_standbys[j].walsnd_index == i &&
3930  sync_standbys[j].pid == pid)
3931  {
3932  is_sync_standby = true;
3933  break;
3934  }
3935  }
3936 
3937  values[0] = Int32GetDatum(pid);
3938 
3939  if (!has_privs_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS))
3940  {
3941  /*
3942  * Only superusers and roles with privileges of pg_read_all_stats
3943  * can see details. Other users only get the pid value to know
3944  * it's a walsender, but no details.
3945  */
3946  MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3947  }
3948  else
3949  {
3950  values[1] = CStringGetTextDatum(WalSndGetStateString(state));
3951 
3952  if (XLogRecPtrIsInvalid(sent_ptr))
3953  nulls[2] = true;
3954  values[2] = LSNGetDatum(sent_ptr);
3955 
3956  if (XLogRecPtrIsInvalid(write))
3957  nulls[3] = true;
3958  values[3] = LSNGetDatum(write);
3959 
3960  if (XLogRecPtrIsInvalid(flush))
3961  nulls[4] = true;
3962  values[4] = LSNGetDatum(flush);
3963 
3964  if (XLogRecPtrIsInvalid(apply))
3965  nulls[5] = true;
3966  values[5] = LSNGetDatum(apply);
3967 
3968  /*
3969  * Treat a standby such as a pg_basebackup background process
3970  * which always returns an invalid flush location, as an
3971  * asynchronous standby.
3972  */
3973  priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
3974 
3975  if (writeLag < 0)
3976  nulls[6] = true;
3977  else
3978  values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
3979 
3980  if (flushLag < 0)
3981  nulls[7] = true;
3982  else
3983  values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
3984 
3985  if (applyLag < 0)
3986  nulls[8] = true;
3987  else
3988  values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
3989 
3990  values[9] = Int32GetDatum(priority);
3991 
3992  /*
3993  * More easily understood version of standby state. This is purely
3994  * informational.
3995  *
3996  * In quorum-based sync replication, the role of each standby
3997  * listed in synchronous_standby_names can be changing very
3998  * frequently. Any standbys considered as "sync" at one moment can
3999  * be switched to "potential" ones at the next moment. So, it's
4000  * basically useless to report "sync" or "potential" as their sync
4001  * states. We report just "quorum" for them.
4002  */
4003  if (priority == 0)
4004  values[10] = CStringGetTextDatum("async");
4005  else if (is_sync_standby)
4006  values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
4007  CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
4008  else
4009  values[10] = CStringGetTextDatum("potential");
4010 
4011  if (replyTime == 0)
4012  nulls[11] = true;
4013  else
4014  values[11] = TimestampTzGetDatum(replyTime);
4015  }
4016 
4017  tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
4018  values, nulls);
4019  }
4020 
4021  return (Datum) 0;
4022 }
bool has_privs_of_role(Oid member, Oid role)
Definition: acl.c:5268
#define MemSet(start, val, len)
Definition: c.h:1020
int64 TimeOffset
Definition: timestamp.h:40
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
Definition: funcapi.c:76
#define write(a, b, c)
Definition: win32.h:14
int j
Definition: isn.c:74
Oid GetUserId(void)
Definition: miscinit.c:514
static Datum LSNGetDatum(XLogRecPtr X)
Definition: pg_lsn.h:28
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:212
TupleDesc setDesc
Definition: execnodes.h:342
Tuplestorestate * setResult
Definition: execnodes.h:341
uint8 syncrep_method
Definition: syncrep.h:68
SyncRepConfigData * SyncRepConfig
Definition: syncrep.c:97
int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
Definition: syncrep.c:711
#define SYNC_REP_PRIORITY
Definition: syncrep.h:35
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
Definition: tuplestore.c:784
static Datum TimestampTzGetDatum(TimestampTz X)
Definition: timestamp.h:52
static Datum IntervalPGetDatum(const Interval *X)
Definition: timestamp.h:58
static Interval * offset_to_interval(TimeOffset offset)
Definition: walsender.c:3850
#define PG_STAT_GET_WAL_SENDERS_COLS
static const char * WalSndGetStateString(WalSndState state)
Definition: walsender.c:3831
WalSndState
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29

References WalSnd::apply, WalSnd::applyLag, CStringGetTextDatum, WalSnd::flush, WalSnd::flushLag, GetUserId(), has_privs_of_role(), i, InitMaterializedSRF(), Int32GetDatum(), IntervalPGetDatum(), j, LSNGetDatum(), max_wal_senders, MemSet, WalSnd::mutex, offset_to_interval(), PG_STAT_GET_WAL_SENDERS_COLS, WalSnd::pid, WalSnd::replyTime, WalSnd::sentPtr, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SpinLockAcquire, SpinLockRelease, WalSnd::state, SYNC_REP_PRIORITY, WalSnd::sync_standby_priority, SyncRepConfigData::syncrep_method, SyncRepConfig, SyncRepGetCandidateStandbys(), TimestampTzGetDatum(), tuplestore_putvalues(), values, WalSndCtl, WalSndGetStateString(), WalSndCtlData::walsnds, WalSnd::write, write, WalSnd::writeLag, and XLogRecPtrIsInvalid.

◆ PhysicalConfirmReceivedLocation()

static void PhysicalConfirmReceivedLocation ( XLogRecPtr  lsn)
static

Definition at line 2357 of file walsender.c.

2358 {
2359  bool changed = false;
2360  ReplicationSlot *slot = MyReplicationSlot;
2361 
2362  Assert(lsn != InvalidXLogRecPtr);
2363  SpinLockAcquire(&slot->mutex);
2364  if (slot->data.restart_lsn != lsn)
2365  {
2366  changed = true;
2367  slot->data.restart_lsn = lsn;
2368  }
2369  SpinLockRelease(&slot->mutex);
2370 
2371  if (changed)
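2372  {
2373  ReplicationSlotMarkDirty();
2374  ReplicationSlotsComputeRequiredLSN();
2375  PhysicalWakeupLogicalWalSnd();
2376  }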
2377 
2378  /*
2379  * One could argue that the slot should be saved to disk now, but that'd
2380  * be energy wasted - the worst thing lost information could cause here is
2381  * to give wrong information in a statistics view - we'll just potentially
2382  * be more conservative in removing files.
2383  */
2384 }
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:1133
XLogRecPtr restart_lsn
Definition: slot.h:96
slock_t mutex
Definition: slot.h:154
void PhysicalWakeupLogicalWalSnd(void)
Definition: walsender.c:1736

References Assert, ReplicationSlot::data, InvalidXLogRecPtr, ReplicationSlot::mutex, MyReplicationSlot, PhysicalWakeupLogicalWalSnd(), ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, SpinLockAcquire, and SpinLockRelease.

Referenced by ProcessStandbyReplyMessage().

◆ PhysicalReplicationSlotNewXmin()

static void PhysicalReplicationSlotNewXmin ( TransactionId  feedbackXmin,
TransactionId  feedbackCatalogXmin 
)
static

Definition at line 2495 of file walsender.c.

2496 {
2497  bool changed = false;
2498  ReplicationSlot *slot = MyReplicationSlot;
2499 
2500  SpinLockAcquire(&slot->mutex);
2501  MyProc->xmin = InvalidTransactionId;
2502 
2503  /*
2504  * For physical replication we don't need the interlock provided by xmin
2505  * and effective_xmin since the consequences of a missed increase are
2506  * limited to query cancellations, so set both at once.
2507  */
2508  if (!TransactionIdIsNormal(slot->data.xmin) ||
2509  !TransactionIdIsNormal(feedbackXmin) ||
2510  TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
2511  {
2512  changed = true;
2513  slot->data.xmin = feedbackXmin;
2514  slot->effective_xmin = feedbackXmin;
2515  }
2516  if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
2517  !TransactionIdIsNormal(feedbackCatalogXmin) ||
2518  TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
2519  {
2520  changed = true;
2521  slot->data.catalog_xmin = feedbackCatalogXmin;
2522  slot->effective_catalog_xmin = feedbackCatalogXmin;
2523  }
2524  SpinLockRelease(&slot->mutex);
2525 
2526  if (changed)
2527  {
2528  ReplicationSlotMarkDirty();
2529  ReplicationSlotsComputeRequiredXmin(false);
2530  }
2531 }
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:1077
TransactionId xmin
Definition: proc.h:172
TransactionId xmin
Definition: slot.h:85
TransactionId catalog_xmin
Definition: slot.h:93
TransactionId effective_catalog_xmin
Definition: slot.h:178
TransactionId effective_xmin
Definition: slot.h:177
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:280
#define InvalidTransactionId
Definition: transam.h:31
#define TransactionIdIsNormal(xid)
Definition: transam.h:42

References ReplicationSlotPersistentData::catalog_xmin, ReplicationSlot::data, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, InvalidTransactionId, ReplicationSlot::mutex, MyProc, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredXmin(), SpinLockAcquire, SpinLockRelease, TransactionIdIsNormal, TransactionIdPrecedes(), ReplicationSlotPersistentData::xmin, and PGPROC::xmin.

Referenced by ProcessStandbyHSFeedbackMessage().

◆ PhysicalWakeupLogicalWalSnd()

void PhysicalWakeupLogicalWalSnd ( void  )

Definition at line 1736 of file walsender.c.

1737 {
1738  Assert(MyReplicationSlot && SlotIsPhysical(MyReplicationSlot));
1739 
1740  /*
1741  * If we are running in a standby, there is no need to wake up walsenders.
1742  * This is because we do not support syncing slots to cascading standbys,
1743  * so, there are no walsenders waiting for standbys to catch up.
1744  */
1745  if (RecoveryInProgress())
1746  return;
1747 
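1748  if (SlotExistsInSyncStandbySlots(NameStr(MyReplicationSlot->data.name)))
1749  ConditionVariableBroadcast(&WalSndCtl->wal_confirm_rcv_cv);
1750 }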
void ConditionVariableBroadcast(ConditionVariable *cv)
bool SlotExistsInSyncStandbySlots(const char *slot_name)
Definition: slot.c:2575
#define SlotIsPhysical(slot)
Definition: slot.h:212
ConditionVariable wal_confirm_rcv_cv

References Assert, ConditionVariableBroadcast(), ReplicationSlot::data, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, RecoveryInProgress(), SlotExistsInSyncStandbySlots(), SlotIsPhysical, WalSndCtlData::wal_confirm_rcv_cv, and WalSndCtl.

Referenced by pg_physical_replication_slot_advance(), and PhysicalConfirmReceivedLocation().

◆ ProcessPendingWrites()

static void ProcessPendingWrites ( void  )
static

Definition at line 1617 of file walsender.c.

1618 {
1619  for (;;)
1620  {
1621  long sleeptime;
1622 
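1623  /* Check for input from the client */
1624  ProcessRepliesIfAny();
1625 
1626  /* die if timeout was reached */
1627  WalSndCheckTimeOut();
1628 
1629  /* Send keepalive if the time has come */
1630  WalSndKeepaliveIfNecessary();
1631 
1632  if (!pq_is_send_pending())
1633  break;
1634 
1635  sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
1636 
1637  /* Sleep until something happens or we time out */
1638  WalSndWait(WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE, sleeptime,
1639  WAIT_EVENT_WAL_SENDER_WRITE_DATA);
1640 
1641  /* Clear any already-pending wakeups */
1642  ResetLatch(MyLatch);
1643 
1644  CHECK_FOR_INTERRUPTS();
1645 
1646  /* Process any requests or signals received recently */
1647  if (ConfigReloadPending)
1648  {
1649  ConfigReloadPending = false;
1650  ProcessConfigFile(PGC_SIGHUP);
1651  SyncRepInitConfig();
1652  }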
1653 
1654  /* Try to flush pending output to the client */
1655  if (pq_flush_if_writable() != 0)
1656  WalSndShutdown();
1657  }
1658 
1659  /* reactivate latch so WalSndLoop knows to continue */
1660  SetLatch(MyLatch);
1661 }
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1644
struct Latch * MyLatch
Definition: globals.c:62
@ PGC_SIGHUP
Definition: guc.h:71
void ProcessConfigFile(GucContext context)
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
void SetLatch(Latch *latch)
Definition: latch.c:632
void ResetLatch(Latch *latch)
Definition: latch.c:724
#define WL_SOCKET_READABLE
Definition: latch.h:128
#define WL_SOCKET_WRITEABLE
Definition: latch.h:129
#define pq_flush_if_writable()
Definition: libpq.h:47
#define pq_is_send_pending()
Definition: libpq.h:48
void SyncRepInitConfig(void)
Definition: syncrep.c:402
static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
Definition: walsender.c:3690
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2743
static void ProcessRepliesIfAny(void)
Definition: walsender.c:2213
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:4060
static void WalSndShutdown(void)
Definition: walsender.c:240
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2699

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, GetCurrentTimestamp(), MyLatch, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), SetLatch(), SyncRepInitConfig(), WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WalSndWait(), WL_SOCKET_READABLE, and WL_SOCKET_WRITEABLE.

Referenced by WalSndUpdateProgress(), and WalSndWriteData().

◆ ProcessRepliesIfAny()

static void ProcessRepliesIfAny ( void  )
static

Definition at line 2213 of file walsender.c.

2214 {
2215  unsigned char firstchar;
2216  int maxmsglen;
2217  int r;
2218  bool received = false;
2219 
2220  last_processing = GetCurrentTimestamp();
2221 
2222  /*
2223  * If we already received a CopyDone from the frontend, any subsequent
2224  * message is the beginning of a new command, and should be processed in
2225  * the main processing loop.
2226  */
2227  while (!streamingDoneReceiving)
2228  {
2229  pq_startmsgread();
2230  r = pq_getbyte_if_available(&firstchar);
2231  if (r < 0)
2232  {
2233  /* unexpected error or EOF */
2234  ereport(COMMERROR,
2235  (errcode(ERRCODE_PROTOCOL_VIOLATION),
2236  errmsg("unexpected EOF on standby connection")));
2237  proc_exit(0);
2238  }
2239  if (r == 0)
2240  {
2241  /* no data available without blocking */
2242  pq_endmsgread();
2243  break;
2244  }
2245 
2246  /* Validate message type and set packet size limit */
2247  switch (firstchar)
2248  {
2249  case PqMsg_CopyData:
2250  maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
2251  break;
2252  case PqMsg_CopyDone:
2253  case PqMsg_Terminate:
2254  maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
2255  break;
2256  default:
2257  ereport(FATAL,
2258  (errcode(ERRCODE_PROTOCOL_VIOLATION),
2259  errmsg("invalid standby message type \"%c\"",
2260  firstchar)));
2261  maxmsglen = 0; /* keep compiler quiet */
2262  break;
2263  }
2264 
2265  /* Read the message contents */
2266  resetStringInfo(&reply_message);
2267  if (pq_getmessage(&reply_message, maxmsglen))
2268  {
2269  ereport(COMMERROR,
2270  (errcode(ERRCODE_PROTOCOL_VIOLATION),
2271  errmsg("unexpected EOF on standby connection")));
2272  proc_exit(0);
2273  }
2274 
2275  /* ... and process it */
2276  switch (firstchar)
2277  {
2278  /*
2279  * 'd' means a standby reply wrapped in a CopyData packet.
2280  */
2281  case PqMsg_CopyData:
2282  ProcessStandbyMessage();
2283  received = true;
2284  break;
2285 
2286  /*
2287  * CopyDone means the standby requested to finish streaming.
2288  * Reply with CopyDone, if we had not sent that already.
2289  */
2290  case PqMsg_CopyDone:
2291  if (!streamingDoneSending)
2292  {
2293  pq_putmessage_noblock('c', NULL, 0);
2294  streamingDoneSending = true;
2295  }
2296 
2297  streamingDoneReceiving = true;
2298  received = true;
2299  break;
2300 
2301  /*
2302  * 'X' means that the standby is closing down the socket.
2303  */
2304  case PqMsg_Terminate:
2305  proc_exit(0);
2306 
2307  default:
2308  Assert(false); /* NOT REACHED */
2309  }
2310  }
2311 
2312  /*
2313  * Save the last reply timestamp if we've received at least one reply.
2314  */
2315  if (received)
2316  {
2317  last_reply_timestamp = last_processing;
2318  waiting_for_ping_response = false;
2319  }
2320 }
#define COMMERROR
Definition: elog.h:33
#define FATAL
Definition: elog.h:41
void proc_exit(int code)
Definition: ipc.c:104
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:51
int pq_getbyte_if_available(unsigned char *c)
Definition: pqcomm.c:1004
void pq_endmsgread(void)
Definition: pqcomm.c:1165
#define PqMsg_CopyDone
Definition: protocol.h:64
#define PqMsg_CopyData
Definition: protocol.h:65
#define PqMsg_Terminate
Definition: protocol.h:28
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:78
static bool waiting_for_ping_response
Definition: walsender.c:182
static TimestampTz last_processing
Definition: walsender.c:173
static bool streamingDoneSending
Definition: walsender.c:190
static void ProcessStandbyMessage(void)
Definition: walsender.c:2326
static bool streamingDoneReceiving
Definition: walsender.c:191

References Assert, COMMERROR, ereport, errcode(), errmsg(), FATAL, GetCurrentTimestamp(), last_processing, last_reply_timestamp, pq_endmsgread(), pq_getbyte_if_available(), pq_getmessage(), PQ_LARGE_MESSAGE_LIMIT, pq_putmessage_noblock, PQ_SMALL_MESSAGE_LIMIT, pq_startmsgread(), PqMsg_CopyData, PqMsg_CopyDone, PqMsg_Terminate, proc_exit(), ProcessStandbyMessage(), reply_message, resetStringInfo(), streamingDoneReceiving, streamingDoneSending, and waiting_for_ping_response.

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ ProcessStandbyHSFeedbackMessage()

static void ProcessStandbyHSFeedbackMessage ( void  )
static

Definition at line 2575 of file walsender.c.

2576 {
2577  TransactionId feedbackXmin;
2578  uint32 feedbackEpoch;
2579  TransactionId feedbackCatalogXmin;
2580  uint32 feedbackCatalogEpoch;
2581  TimestampTz replyTime;
2582 
2583  /*
2584  * Decipher the reply message. The caller already consumed the msgtype
2585  * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
2586  * of this message.
2587  */
2588  replyTime = pq_getmsgint64(&reply_message);
2589  feedbackXmin = pq_getmsgint(&reply_message, 4);
2590  feedbackEpoch = pq_getmsgint(&reply_message, 4);
2591  feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
2592  feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
2593 
2594  if (message_level_is_interesting(DEBUG2))
2595  {
2596  char *replyTimeStr;
2597 
2598  /* Copy because timestamptz_to_str returns a static buffer */
2599  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2600 
2601  elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
2602  feedbackXmin,
2603  feedbackEpoch,
2604  feedbackCatalogXmin,
2605  feedbackCatalogEpoch,
2606  replyTimeStr);
2607 
2608  pfree(replyTimeStr);
2609  }
2610 
2611  /*
2612  * Update shared state for this WalSender process based on reply data from
2613  * standby.
2614  */
2615  {
2616  WalSnd *walsnd = MyWalSnd;
2617 
2618  SpinLockAcquire(&walsnd->mutex);
2619  walsnd->replyTime = replyTime;
2620  SpinLockRelease(&walsnd->mutex);
2621  }
2622 
2623  /*
2624  * Unset WalSender's xmins if the feedback message values are invalid.
2625  * This happens when the downstream turned hot_standby_feedback off.
2626  */
2627  if (!TransactionIdIsNormal(feedbackXmin)
2628  && !TransactionIdIsNormal(feedbackCatalogXmin))
2629  {
2630  MyProc->xmin = InvalidTransactionId;
2631  if (MyReplicationSlot != NULL)
2632  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2633  return;
2634  }
2635 
2636  /*
2637  * Check that the provided xmin/epoch are sane, that is, not in the future
2638  * and not so far back as to be already wrapped around. Ignore if not.
2639  */
2640  if (TransactionIdIsNormal(feedbackXmin) &&
2641  !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
2642  return;
2643 
2644  if (TransactionIdIsNormal(feedbackCatalogXmin) &&
2645  !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
2646  return;
2647 
2648  /*
2649  * Set the WalSender's xmin equal to the standby's requested xmin, so that
2650  * the xmin will be taken into account by GetSnapshotData() /
2651  * ComputeXidHorizons(). This will hold back the removal of dead rows and
2652  * thereby prevent the generation of cleanup conflicts on the standby
2653  * server.
2654  *
2655  * There is a small window for a race condition here: although we just
2656  * checked that feedbackXmin precedes nextXid, the nextXid could have
2657  * gotten advanced between our fetching it and applying the xmin below,
2658  * perhaps far enough to make feedbackXmin wrap around. In that case the
2659  * xmin we set here would be "in the future" and have no effect. No point
2660  * in worrying about this since it's too late to save the desired data
2661  * anyway. Assuming that the standby sends us an increasing sequence of
2662  * xmins, this could only happen during the first reply cycle, else our
2663  * own xmin would prevent nextXid from advancing so far.
2664  *
2665  * We don't bother taking the ProcArrayLock here. Setting the xmin field
2666  * is assumed atomic, and there's no real need to prevent concurrent
2667  * horizon determinations. (If we're moving our xmin forward, this is
2668  * obviously safe, and if we're moving it backwards, well, the data is at
2669  * risk already since a VACUUM could already have determined the horizon.)
2670  *
2671  * If we're using a replication slot we reserve the xmin via that,
2672  * otherwise via the walsender's PGPROC entry. We can only track the
2673  * catalog xmin separately when using a slot, so we store the least of the
2674  * two provided when not using a slot.
2675  *
2676  * XXX: It might make sense to generalize the ephemeral slot concept and
2677  * always use the slot mechanism to handle the feedback xmin.
2678  */
2679  if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
2680  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2681  else
2682  {
2683  if (TransactionIdIsNormal(feedbackCatalogXmin)
2684  && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
2685  MyProc->xmin = feedbackCatalogXmin;
2686  else
2687  MyProc->xmin = feedbackXmin;
2688  }
2689 }
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1843
unsigned int uint32
Definition: c.h:506
uint32 TransactionId
Definition: c.h:652
bool message_level_is_interesting(int elevel)
Definition: elog.c:272
#define DEBUG2
Definition: elog.h:29
char * pstrdup(const char *in)
Definition: mcxt.c:1696
void pfree(void *pointer)
Definition: mcxt.c:1521
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:415
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:453
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition: walsender.c:2495
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition: walsender.c:2544

References DEBUG2, elog, InvalidTransactionId, message_level_is_interesting(), WalSnd::mutex, MyProc, MyReplicationSlot, MyWalSnd, pfree(), PhysicalReplicationSlotNewXmin(), pq_getmsgint(), pq_getmsgint64(), pstrdup(), reply_message, WalSnd::replyTime, SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), TransactionIdInRecentPast(), TransactionIdIsNormal, TransactionIdPrecedes(), and PGPROC::xmin.

Referenced by ProcessStandbyMessage().

◆ ProcessStandbyMessage()

static void ProcessStandbyMessage ( void  )
static

Definition at line 2326 of file walsender.c.

2327 {
2328  char msgtype;
2329 
2330  /*
2331  * Check message type from the first byte.
2332  */
2333  msgtype = pq_getmsgbyte(&reply_message);
2334 
2335  switch (msgtype)
2336  {
2337  case 'r':
2338  ProcessStandbyReplyMessage();
2339  break;
2340 
2341  case 'h':
2342  ProcessStandbyHSFeedbackMessage();
2343  break;
2344 
2345  default:
2346  ereport(COMMERROR,
2347  (errcode(ERRCODE_PROTOCOL_VIOLATION),
2348  errmsg("unexpected message type \"%c\"", msgtype)));
2349  proc_exit(0);
2350  }
2351 }
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:399
static void ProcessStandbyHSFeedbackMessage(void)
Definition: walsender.c:2575
static void ProcessStandbyReplyMessage(void)
Definition: walsender.c:2390

References COMMERROR, ereport, errcode(), errmsg(), pq_getmsgbyte(), proc_exit(), ProcessStandbyHSFeedbackMessage(), ProcessStandbyReplyMessage(), and reply_message.

Referenced by ProcessRepliesIfAny().

◆ ProcessStandbyReplyMessage()

static void ProcessStandbyReplyMessage ( void  )
static

Definition at line 2390 of file walsender.c.

2391 {
2392  XLogRecPtr writePtr,
2393  flushPtr,
2394  applyPtr;
2395  bool replyRequested;
2396  TimeOffset writeLag,
2397  flushLag,
2398  applyLag;
2399  bool clearLagTimes;
2400  TimestampTz now;
2401  TimestampTz replyTime;
2402 
2403  static bool fullyAppliedLastTime = false;
2404 
2405  /* the caller already consumed the msgtype byte */
2406  writePtr = pq_getmsgint64(&reply_message);
2407  flushPtr = pq_getmsgint64(&reply_message);
2408  applyPtr = pq_getmsgint64(&reply_message);
2409  replyTime = pq_getmsgint64(&reply_message);
2410  replyRequested = pq_getmsgbyte(&reply_message);
2411 
2412  if (message_level_is_interesting(DEBUG2))
2413  {
2414  char *replyTimeStr;
2415 
2416  /* Copy because timestamptz_to_str returns a static buffer */
2417  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2418 
2419  elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
2420  LSN_FORMAT_ARGS(writePtr),
2421  LSN_FORMAT_ARGS(flushPtr),
2422  LSN_FORMAT_ARGS(applyPtr),
2423  replyRequested ? " (reply requested)" : "",
2424  replyTimeStr);
2425 
2426  pfree(replyTimeStr);
2427  }
2428 
2429  /* See if we can compute the round-trip lag for these positions. */
2430  now = GetCurrentTimestamp();
2431  writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
2432  flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
2433  applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
2434 
2435  /*
2436  * If the standby reports that it has fully replayed the WAL in two
2437  * consecutive reply messages, then the second such message must result
2438  * from wal_receiver_status_interval expiring on the standby. This is a
2439  * convenient time to forget the lag times measured when it last
2440  * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
2441  * until more WAL traffic arrives.
2442  */
2443  clearLagTimes = false;
2444  if (applyPtr == sentPtr)
2445  {
2446  if (fullyAppliedLastTime)
2447  clearLagTimes = true;
2448  fullyAppliedLastTime = true;
2449  }
2450  else
2451  fullyAppliedLastTime = false;
2452 
2453  /* Send a reply if the standby requested one. */
2454  if (replyRequested)
2455  WalSndKeepalive(false, InvalidXLogRecPtr);
2456 
2457  /*
2458  * Update shared state for this WalSender process based on reply data from
2459  * standby.
2460  */
2461  {
2462  WalSnd *walsnd = MyWalSnd;
2463 
2464  SpinLockAcquire(&walsnd->mutex);
2465  walsnd->write = writePtr;
2466  walsnd->flush = flushPtr;
2467  walsnd->apply = applyPtr;
2468  if (writeLag != -1 || clearLagTimes)
2469  walsnd->writeLag = writeLag;
2470  if (flushLag != -1 || clearLagTimes)
2471  walsnd->flushLag = flushLag;
2472  if (applyLag != -1 || clearLagTimes)
2473  walsnd->applyLag = applyLag;
2474  walsnd->replyTime = replyTime;
2475  SpinLockRelease(&walsnd->mutex);
2476  }
2477 
2478  if (!am_cascading_walsender)
2479  SyncRepReleaseWaiters();
2480 
2481  /*
2482  * Advance our local xmin horizon when the client confirmed a flush.
2483  */
2484  if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
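2485  {
2486  if (SlotIsLogical(MyReplicationSlot))
2487  LogicalConfirmReceivedLocation(flushPtr);
2488  else
2489  PhysicalConfirmReceivedLocation(flushPtr);
2490  }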
2491 }
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:1835
#define SlotIsLogical(slot)
Definition: slot.h:213
void SyncRepReleaseWaiters(void)
Definition: syncrep.c:431
#define SYNC_REP_WAIT_WRITE
Definition: syncrep.h:23
#define SYNC_REP_WAIT_FLUSH
Definition: syncrep.h:24
#define SYNC_REP_WAIT_APPLY
Definition: syncrep.h:25
static XLogRecPtr sentPtr
Definition: walsender.c:165
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: walsender.c:2357
static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
Definition: walsender.c:4037
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition: walsender.c:4163

References am_cascading_walsender, WalSnd::apply, WalSnd::applyLag, DEBUG2, elog, WalSnd::flush, WalSnd::flushLag, GetCurrentTimestamp(), InvalidXLogRecPtr, LagTrackerRead(), LogicalConfirmReceivedLocation(), LSN_FORMAT_ARGS, message_level_is_interesting(), WalSnd::mutex, MyReplicationSlot, MyWalSnd, now(), pfree(), PhysicalConfirmReceivedLocation(), pq_getmsgbyte(), pq_getmsgint64(), pstrdup(), reply_message, WalSnd::replyTime, sentPtr, SlotIsLogical, SpinLockAcquire, SpinLockRelease, SYNC_REP_WAIT_APPLY, SYNC_REP_WAIT_FLUSH, SYNC_REP_WAIT_WRITE, SyncRepReleaseWaiters(), timestamptz_to_str(), WalSndKeepalive(), WalSnd::write, and WalSnd::writeLag.

Referenced by ProcessStandbyMessage().

◆ ReadReplicationSlot()

static void ReadReplicationSlot ( ReadReplicationSlotCmd cmd)
static

Definition at line 488 of file walsender.c.

489 {
490 #define READ_REPLICATION_SLOT_COLS 3
491  ReplicationSlot *slot;
492  DestReceiver *dest;
493  TupOutputState *tstate;
494  TupleDesc tupdesc;
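495  Datum values[READ_REPLICATION_SLOT_COLS] = {0};
496  bool nulls[READ_REPLICATION_SLOT_COLS];
497 
498  tupdesc = CreateTemplateTupleDesc(READ_REPLICATION_SLOT_COLS);
499  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_type",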
500  TEXTOID, -1, 0);
501  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "restart_lsn",
502  TEXTOID, -1, 0);
503  /* TimeLineID is unsigned, so int4 is not wide enough. */
504  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "restart_tli",
505  INT8OID, -1, 0);
506 
507  memset(nulls, true, READ_REPLICATION_SLOT_COLS * sizeof(bool));
508 
509  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
510  slot = SearchNamedReplicationSlot(cmd->slotname, false);
511  if (slot == NULL || !slot->in_use)
512  {
513  LWLockRelease(ReplicationSlotControlLock);
514  }
515  else
516  {
517  ReplicationSlot slot_contents;
518  int i = 0;
519 
520  /* Copy slot contents while holding spinlock */
521  SpinLockAcquire(&slot->mutex);
522  slot_contents = *slot;
523  SpinLockRelease(&slot->mutex);
524  LWLockRelease(ReplicationSlotControlLock);
525 
526  if (OidIsValid(slot_contents.data.database))
527  ereport(ERROR,
528  errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
529  errmsg("cannot use %s with a logical replication slot",
530  "READ_REPLICATION_SLOT"));
531 
532  /* slot type */
533  values[i] = CStringGetTextDatum("physical");
534  nulls[i] = false;
535  i++;
536 
537  /* start LSN */
538  if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
539  {
540  char xloc[64];
541 
542  snprintf(xloc, sizeof(xloc), "%X/%X",
543  LSN_FORMAT_ARGS(slot_contents.data.restart_lsn));
544  values[i] = CStringGetTextDatum(xloc);
545  nulls[i] = false;
546  }
547  i++;
548 
549  /* timeline this WAL was produced on */
550  if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
551  {
552  TimeLineID slots_position_timeline;
553  TimeLineID current_timeline;
554  List *timeline_history = NIL;
555 
556  /*
557  * While in recovery, use as timeline the currently-replaying one
558  * to get the LSN position's history.
559  */
560  if (RecoveryInProgress())
561  (void) GetXLogReplayRecPtr(&current_timeline);
562  else
563  current_timeline = GetWALInsertionTimeLine();
564 
565  timeline_history = readTimeLineHistory(current_timeline);
566  slots_position_timeline = tliOfPointInHistory(slot_contents.data.restart_lsn,
567  timeline_history);
568  values[i] = Int64GetDatum((int64) slots_position_timeline);
569  nulls[i] = false;
570  }
571  i++;
572 
573  Assert(i == READ_REPLICATION_SLOT_COLS);
574  }
575 
576  dest = CreateDestReceiver(DestRemoteSimple);
577  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
578  do_tup_output(tstate, values, nulls);
579  end_tup_output(tstate);
580 }
TimeLineID tliOfPointInHistory(XLogRecPtr ptr, List *history)
Definition: timeline.c:544
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:76
#define OidIsValid(objectId)
Definition: c.h:775
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1168
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1781
@ LW_SHARED
Definition: lwlock.h:115
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition: slot.c:464
Definition: pg_list.h:54
bool in_use
Definition: slot.h:157
#define READ_REPLICATION_SLOT_COLS

References Assert, begin_tup_output_tupdesc(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, ReplicationSlot::data, ReplicationSlotPersistentData::database, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errcode(), errmsg(), ERROR, GetWALInsertionTimeLine(), GetXLogReplayRecPtr(), i, ReplicationSlot::in_use, Int64GetDatum(), LSN_FORMAT_ARGS, LW_SHARED, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, NIL, OidIsValid, READ_REPLICATION_SLOT_COLS, readTimeLineHistory(), RecoveryInProgress(), ReplicationSlotPersistentData::restart_lsn, SearchNamedReplicationSlot(), ReadReplicationSlotCmd::slotname, snprintf, SpinLockAcquire, SpinLockRelease, tliOfPointInHistory(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), values, and XLogRecPtrIsInvalid.

Referenced by exec_replication_command().

◆ SendTimeLineHistory()

static void SendTimeLineHistory ( TimeLineHistoryCmd cmd)
static

Definition at line 587 of file walsender.c.

588 {
589  DestReceiver *dest;
590  TupleDesc tupdesc;
591  StringInfoData buf;
592  char histfname[MAXFNAMELEN];
593  char path[MAXPGPATH];
594  int fd;
595  off_t histfilelen;
596  off_t bytesleft;
597  Size len;
598 
599  dest = CreateDestReceiver(DestRemoteSimple);
600 
601  /*
602  * Reply with a result set with one row, and two columns. The first col is
603  * the name of the history file, 2nd is the contents.
604  */
605  tupdesc = CreateTemplateTupleDesc(2);
606  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "filename", TEXTOID, -1, 0);
607  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "content", TEXTOID, -1, 0);
608 
609  TLHistoryFileName(histfname, cmd->timeline);
610  TLHistoryFilePath(path, cmd->timeline);
611 
612  /* Send a RowDescription message */
613  dest->rStartup(dest, CMD_SELECT, tupdesc);
614 
615  /* Send a DataRow message */
616  pq_beginmessage(&buf, PqMsg_DataRow);
617  pq_sendint16(&buf, 2); /* # of columns */
618  len = strlen(histfname);
619  pq_sendint32(&buf, len); /* col1 len */
620  pq_sendbytes(&buf, histfname, len);
621 
622  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
623  if (fd < 0)
624  ereport(ERROR,
625  (errcode_for_file_access(),
626  errmsg("could not open file \"%s\": %m", path)));
627 
628  /* Determine file length and send it to client */
629  histfilelen = lseek(fd, 0, SEEK_END);
630  if (histfilelen < 0)
631  ereport(ERROR,
632  (errcode_for_file_access(),
633  errmsg("could not seek to end of file \"%s\": %m", path)));
634  if (lseek(fd, 0, SEEK_SET) != 0)
635  ereport(ERROR,
636  (errcode_for_file_access(),
637  errmsg("could not seek to beginning of file \"%s\": %m", path)));
638 
639  pq_sendint32(&buf, histfilelen); /* col2 len */
640 
641  bytesleft = histfilelen;
642  while (bytesleft > 0)
643  {
644  PGAlignedBlock rbuf;
645  int nread;
646 
647  pgstat_report_wait_start(WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ);
648  nread = read(fd, rbuf.data, sizeof(rbuf));
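649  pgstat_report_wait_end();
650  if (nread < 0)
651  ereport(ERROR,
652  (errcode_for_file_access(),
653  errmsg("could not read file \"%s\": %m",
654  path)));
655  else if (nread == 0)
656  ereport(ERROR,
657  (errcode(ERRCODE_DATA_CORRUPTED),
658  errmsg("could not read file \"%s\": read %d of %zu",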
659  path, nread, (Size) bytesleft)));
660 
661  pq_sendbytes(&buf, rbuf.data, nread);
662  bytesleft -= nread;
663  }
664 
665  if (CloseTransientFile(fd) != 0)
666  ereport(ERROR,
667  (errcode_for_file_access(),
668  errmsg("could not close file \"%s\": %m", path)));
669 
670  pq_endmessage(&buf);
671 }
#define PG_BINARY
Definition: c.h:1273
size_t Size
Definition: c.h:605
int errcode_for_file_access(void)
Definition: elog.c:876
int CloseTransientFile(int fd)
Definition: fd.c:2832
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2656
#define read(a, b, c)
Definition: win32.h:13
@ CMD_SELECT
Definition: nodes.h:265
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:41
#define MAXPGPATH
const void size_t len
void pq_sendbytes(StringInfo buf, const void *data, int datalen)
Definition: pqformat.c:126
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:296
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:88
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:144
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:136
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PqMsg_DataRow
Definition: protocol.h:43
TimeLineID timeline
Definition: replnodes.h:120
char data[BLCKSZ]
Definition: c.h:1119
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:85
static void pgstat_report_wait_end(void)
Definition: wait_event.h:101
static void TLHistoryFilePath(char *path, TimeLineID tli)
static void TLHistoryFileName(char *fname, TimeLineID tli)

References buf, CloseTransientFile(), CMD_SELECT, CreateDestReceiver(), CreateTemplateTupleDesc(), PGAlignedBlock::data, generate_unaccent_rules::dest, DestRemoteSimple, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg(), ERROR, fd(), len, MAXFNAMELEN, MAXPGPATH, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), pq_beginmessage(), pq_endmessage(), pq_sendbytes(), pq_sendint16(), pq_sendint32(), PqMsg_DataRow, read, TimeLineHistoryCmd::timeline, TLHistoryFileName(), TLHistoryFilePath(), and TupleDescInitBuiltinEntry().

Referenced by exec_replication_command().

◆ StartLogicalReplication()

static void StartLogicalReplication ( StartReplicationCmd cmd)
static

Definition at line 1455 of file walsender.c.

1456 {
1458  QueryCompletion qc;
1459 
1460  /* make sure that our requirements are still fulfilled */
1462 
1464 
1465  ReplicationSlotAcquire(cmd->slotname, true);
1466 
1467  /*
1468  * Force a disconnect, so that the decoding code doesn't need to care
1469  * about an eventual switch from running in recovery, to running in a
1470  * normal environment. Client code is expected to handle reconnects.
1471  */
1473  {
1474  ereport(LOG,
1475  (errmsg("terminating walsender process after promotion")));
1476  got_STOPPING = true;
1477  }
1478 
1479  /*
1480  * Create our decoding context, making it start at the previously ack'ed
1481  * position.
1482  *
1483  * Do this before sending a CopyBothResponse message, so that any errors
1484  * are reported early.
1485  */
1487  CreateDecodingContext(cmd->startpoint, cmd->options, false,
1488  XL_ROUTINE(.page_read = logical_read_xlog_page,
1489  .segment_open = WalSndSegmentOpen,
1490  .segment_close = wal_segment_close),
1494 
1496 
1497  /* Send a CopyBothResponse message, and start streaming */
1499  pq_sendbyte(&buf, 0);
1500  pq_sendint16(&buf, 0);
1501  pq_endmessage(&buf);
1502  pq_flush();
1503 
1504  /* Start reading WAL from the oldest required WAL. */
1507 
1508  /*
1509  * Report the location after which we'll send out further commits as the
1510  * current sentPtr.
1511  */
1513 
1514  /* Also update the sent position status in shared memory */
1518 
1519  replication_active = true;
1520 
1522 
1523  /* Main loop of walsender */
1525 
1528 
1529  replication_active = false;
1530  if (got_STOPPING)
1531  proc_exit(0);
1533 
1534  /* Get out of COPY mode (CommandComplete). */
1535  SetQueryCompletion(&qc, CMDTAG_COPY, 0);
1536  EndCommand(&qc, DestRemote, false);
1537 }
static void SetQueryCompletion(QueryCompletion *qc, CommandTag commandTag, uint64 nprocessed)
Definition: cmdtag.h:37
void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
Definition: dest.c:169
@ DestRemote
Definition: dest.h:89
#define pq_flush()
Definition: libpq.h:46
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:496
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:160
#define PqMsg_CopyBothResponse
Definition: protocol.h:54
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:540
XLogReaderState * reader
Definition: logical.h:42
XLogRecPtr startpoint
Definition: replnodes.h:97
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2770
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:208
static void XLogSendLogical(void)
Definition: walsender.c:3379
@ WALSNDSTATE_CATCHUP
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
Definition: xlogreader.c:231

References am_cascading_walsender, Assert, buf, CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDecodingContext(), ReplicationSlot::data, DestRemote, EndCommand(), ereport, errmsg(), FreeDecodingContext(), got_STOPPING, LOG, logical_decoding_ctx, logical_read_xlog_page(), WalSnd::mutex, MyReplicationSlot, MyWalSnd, StartReplicationCmd::options, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), PqMsg_CopyBothResponse, proc_exit(), LogicalDecodingContext::reader, RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), ReplicationSlotPersistentData::restart_lsn, sentPtr, WalSnd::sentPtr, SetQueryCompletion(), StartReplicationCmd::slotname, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, SyncRepInitConfig(), wal_segment_close(), WalSndLoop(), WalSndPrepareWrite(), WalSndSegmentOpen(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WalSndUpdateProgress(), WalSndWriteData(), XL_ROUTINE, XLogBeginRead(), xlogreader, and XLogSendLogical().

Referenced by exec_replication_command().

◆ StartReplication()

static void StartReplication ( StartReplicationCmd cmd)
static

Definition at line 817 of file walsender.c.

818 {
820  XLogRecPtr FlushPtr;
821  TimeLineID FlushTLI;
822 
823  /* create xlogreader for physical replication */
824  xlogreader =
826  XL_ROUTINE(.segment_open = WalSndSegmentOpen,
827  .segment_close = wal_segment_close),
828  NULL);
829 
830  if (!xlogreader)
831  ereport(ERROR,
832  (errcode(ERRCODE_OUT_OF_MEMORY),
833  errmsg("out of memory"),
834  errdetail("Failed while allocating a WAL reading processor.")));
835 
836  /*
837  * We assume here that we're logging enough information in the WAL for
838  * log-shipping, since this is checked in PostmasterMain().
839  *
840  * NOTE: wal_level can only change at shutdown, so in most cases it is
841  * difficult for there to be WAL data that we can still see that was
842  * written at wal_level='minimal'.
843  */
844 
845  if (cmd->slotname)
846  {
847  ReplicationSlotAcquire(cmd->slotname, true);
849  ereport(ERROR,
850  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
851  errmsg("cannot use a logical replication slot for physical replication")));
852 
853  /*
854  * We don't need to verify the slot's restart_lsn here; instead we
855  * rely on the caller requesting the starting point to use. If the
856  * WAL segment doesn't exist, we'll fail later.
857  */
858  }
859 
860  /*
861  * Select the timeline. If it was given explicitly by the client, use
862  * that. Otherwise use the timeline of the last replayed record.
863  */
866  FlushPtr = GetStandbyFlushRecPtr(&FlushTLI);
867  else
868  FlushPtr = GetFlushRecPtr(&FlushTLI);
869 
870  if (cmd->timeline != 0)
871  {
872  XLogRecPtr switchpoint;
873 
874  sendTimeLine = cmd->timeline;
875  if (sendTimeLine == FlushTLI)
876  {
877  sendTimeLineIsHistoric = false;
879  }
880  else
881  {
882  List *timeLineHistory;
883 
884  sendTimeLineIsHistoric = true;
885 
886  /*
887  * Check that the timeline the client requested exists, and the
888  * requested start location is on that timeline.
889  */
890  timeLineHistory = readTimeLineHistory(FlushTLI);
891  switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
893  list_free_deep(timeLineHistory);
894 
895  /*
896  * Found the requested timeline in the history. Check that
897  * requested startpoint is on that timeline in our history.
898  *
899  * This is quite loose on purpose. We only check that we didn't
900  * fork off the requested timeline before the switchpoint. We
901  * don't check that we switched *to* it before the requested
902  * starting point. This is because the client can legitimately
903  * request to start replication from the beginning of the WAL
904  * segment that contains switchpoint, but on the new timeline, so
905  * that it doesn't end up with a partial segment. If you ask for
906  * too old a starting point, you'll get an error later when we
907  * fail to find the requested WAL segment in pg_wal.
908  *
909  * XXX: we could be more strict here and only allow a startpoint
910  * that's older than the switchpoint, if it's still in the same
911  * WAL segment.
912  */
913  if (!XLogRecPtrIsInvalid(switchpoint) &&
914  switchpoint < cmd->startpoint)
915  {
916  ereport(ERROR,
917  (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
919  cmd->timeline),
920  errdetail("This server's history forked from timeline %u at %X/%X.",
921  cmd->timeline,
922  LSN_FORMAT_ARGS(switchpoint))));
923  }
924  sendTimeLineValidUpto = switchpoint;
925  }
926  }
927  else
928  {
929  sendTimeLine = FlushTLI;
931  sendTimeLineIsHistoric = false;
932  }
933 
935 
936  /* If there is nothing to stream, don't even enter COPY mode */
938  {
939  /*
940  * When we first start replication the standby will be behind the
941  * primary. For some applications, for example synchronous
942  * replication, it is important to have a clear state for this initial
943  * catchup mode, so we can trigger actions when we change streaming
944  * state later. We may stay in this state for a long time, which is
945  * exactly why we want to be able to monitor whether or not we are
946  * still here.
947  */
949 
950  /* Send a CopyBothResponse message, and start streaming */
952  pq_sendbyte(&buf, 0);
953  pq_sendint16(&buf, 0);
954  pq_endmessage(&buf);
955  pq_flush();
956 
957  /*
958  * Don't allow a request to stream from a future point in WAL that
959  * hasn't been flushed to disk in this server yet.
960  */
961  if (FlushPtr < cmd->startpoint)
962  {
963  ereport(ERROR,
964  (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
966  LSN_FORMAT_ARGS(FlushPtr))));
967  }
968 
969  /* Start streaming from the requested point */
970  sentPtr = cmd->startpoint;
971 
972  /* Initialize shared memory status, too */
976 
978 
979  /* Main loop of walsender */
980  replication_active = true;
981 
983 
984  replication_active = false;
985  if (got_STOPPING)
986  proc_exit(0);
988 
990  }
991 
992  if (cmd->slotname)
994 
995  /*
996  * Copy is finished now. Send a single-row result set indicating the next
997  * timeline.
998  */
1000  {
1001  char startpos_str[8 + 1 + 8 + 1];
1002  DestReceiver *dest;
1003  TupOutputState *tstate;
1004  TupleDesc tupdesc;
1005  Datum values[2];
1006  bool nulls[2] = {0};
1007 
1008  snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
1010 
1012 
1013  /*
1014  * Need a tuple descriptor representing two columns. int8 may seem
1015  * like a surprising data type for this, but in theory int4 would not
1016  * be wide enough for this, as TimeLineID is unsigned.
1017  */
1018  tupdesc = CreateTemplateTupleDesc(2);
1019  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
1020  INT8OID, -1, 0);
1021  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
1022  TEXTOID, -1, 0);
1023 
1024  /* prepare for projection of tuple */
1025  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
1026 
1028  values[1] = CStringGetTextDatum(startpos_str);
1029 
1030  /* send it to dest */
1031  do_tup_output(tstate, values, nulls);
1032 
1033  end_tup_output(tstate);
1034  }
1035 
1036  /* Send CommandComplete message */
1037  EndReplicationCommand("START_STREAMING");
1038 }
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:572
int errdetail(const char *fmt,...)
Definition: elog.c:1203
void list_free_deep(List *list)
Definition: list.c:1560
TimeLineID timeline
Definition: replnodes.h:96
static void XLogSendPhysical(void)
Definition: walsender.c:3069
int wal_segment_size
Definition: xlog.c:142
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
Definition: xlogreader.c:106

References am_cascading_walsender, Assert, begin_tup_output_tupdesc(), buf, CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), EndReplicationCommand(), ereport, errcode(), errdetail(), errmsg(), ERROR, GetFlushRecPtr(), GetStandbyFlushRecPtr(), got_STOPPING, Int64GetDatum(), InvalidXLogRecPtr, list_free_deep(), LSN_FORMAT_ARGS, WalSnd::mutex, MyReplicationSlot, MyWalSnd, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), PqMsg_CopyBothResponse, proc_exit(), readTimeLineHistory(), RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, sentPtr, WalSnd::sentPtr, SlotIsLogical, StartReplicationCmd::slotname, snprintf, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), StartReplicationCmd::timeline, tliSwitchPoint(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), values, wal_segment_close(), wal_segment_size, WalSndLoop(), WalSndSegmentOpen(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, XL_ROUTINE, xlogreader, XLogReaderAllocate(), XLogRecPtrIsInvalid, and XLogSendPhysical().

Referenced by exec_replication_command().

◆ TransactionIdInRecentPast()

static bool TransactionIdInRecentPast ( TransactionId  xid,
uint32  epoch 
)
static

Definition at line 2544 of file walsender.c.

/*
 * Decide whether the (epoch, xid) pair is consistent with the next full
 * transaction ID obtained from ReadNextFullTransactionId().
 *
 * Returns true only when both checks below pass:
 *   - the supplied epoch equals the next XID's epoch when xid <= next XID,
 *     or is exactly one less than it when xid > next XID; and
 *   - TransactionIdPrecedesOrEquals(xid, next XID) holds.
 * Otherwise returns false.
 */
2545 {
2546  FullTransactionId nextFullXid;
2547  TransactionId nextXid;
2548  uint32 nextEpoch;
2549 
2550  nextFullXid = ReadNextFullTransactionId();
2551  nextXid = XidFromFullTransactionId(nextFullXid);
2552  nextEpoch = EpochFromFullTransactionId(nextFullXid);
2553 
2554  if (xid <= nextXid) /* numerically at or below the next XID: epochs must match */
2555  {
2556  if (epoch != nextEpoch)
2557  return false;
2558  }
2559  else /* numerically above the next XID: only acceptable from the preceding epoch */
2560  {
2561  if (epoch + 1 != nextEpoch)
2562  return false;
2563  }
2564 
2565  if (!TransactionIdPrecedesOrEquals(xid, nextXid))
2566  return false; /* epoch OK, but it's wrapped around */
2567 
2568  return true;
2569 }
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:299
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
#define XidFromFullTransactionId(x)
Definition: transam.h:48
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:288
static const unsigned __int64 epoch

References epoch, EpochFromFullTransactionId, ReadNextFullTransactionId(), TransactionIdPrecedesOrEquals(), and XidFromFullTransactionId.

Referenced by ProcessStandbyHSFeedbackMessage().

◆ UploadManifest()

static void UploadManifest ( void  )
static

Definition at line 677 of file walsender.c.

678 {
679  MemoryContext mcxt;
681  off_t offset = 0;
683 
684  /*
685  * parsing the manifest will use the cryptohash stuff, which requires a
686  * resource owner
687  */
688  Assert(CurrentResourceOwner == NULL);
689  CurrentResourceOwner = ResourceOwnerCreate(NULL, "base backup");
690 
691  /* Prepare to read manifest data into a temporary context. */
693  "incremental backup information",
695  ib = CreateIncrementalBackupInfo(mcxt);
696 
697  /* Send a CopyInResponse message */
699  pq_sendbyte(&buf, 0);
700  pq_sendint16(&buf, 0);
702  pq_flush();
703 
704  /* Receive packets from client until done. */
705  while (HandleUploadManifestPacket(&buf, &offset, ib))
706  ;
707 
708  /* Finish up manifest processing. */
710 
711  /*
712  * Discard any old manifest information and arrange to preserve the new
713  * information we just got.
714  *
715  * We assume that MemoryContextDelete and MemoryContextSetParent won't
716  * fail, and thus we shouldn't end up bailing out of here in such a way as
717  * to leave dangling pointers.
718  */
719  if (uploaded_manifest_mcxt != NULL)
722  uploaded_manifest = ib;
723  uploaded_manifest_mcxt = mcxt;
724 
725  /* clean up the resource owner we created */
726  WalSndResourceCleanup(true);
727 }
IncrementalBackupInfo * CreateIncrementalBackupInfo(MemoryContext mcxt)
void FinalizeIncrementalManifest(IncrementalBackupInfo *ib)
void MemoryContextSetParent(MemoryContext context, MemoryContext new_parent)
Definition: mcxt.c:637
MemoryContext CacheMemoryContext
Definition: mcxt.c:152
void pq_endmessage_reuse(StringInfo buf)
Definition: pqformat.c:314
#define PqMsg_CopyInResponse
Definition: protocol.h:45
ResourceOwner ResourceOwnerCreate(ResourceOwner parent, const char *name)
Definition: resowner.c:413
ResourceOwner CurrentResourceOwner
Definition: resowner.c:165
static bool HandleUploadManifestPacket(StringInfo buf, off_t *offset, IncrementalBackupInfo *ib)
Definition: walsender.c:741
void WalSndResourceCleanup(bool isCommit)
Definition: walsender.c:362
static MemoryContext uploaded_manifest_mcxt
Definition: walsender.c:148

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, buf, CacheMemoryContext, CreateIncrementalBackupInfo(), CurrentMemoryContext, CurrentResourceOwner, FinalizeIncrementalManifest(), HandleUploadManifestPacket(), MemoryContextDelete(), MemoryContextSetParent(), pq_beginmessage(), pq_endmessage_reuse(), pq_flush, pq_sendbyte(), pq_sendint16(), PqMsg_CopyInResponse, ResourceOwnerCreate(), uploaded_manifest, uploaded_manifest_mcxt, and WalSndResourceCleanup().

Referenced by exec_replication_command().

◆ WalSndCheckTimeOut()

static void WalSndCheckTimeOut ( void  )
static

Definition at line 2743 of file walsender.c.

2744 {
2745  TimestampTz timeout;
2746 
2747  /* skip the timeout check if we're doing something that doesn't require timeouts */
2748  if (last_reply_timestamp <= 0)
2749  return;
2750 
2753 
2754  if (wal_sender_timeout > 0 && last_processing >= timeout)
2755  {
2756  /*
2757  * Since typically expiration of replication timeout means
2758  * communication problem, we don't send the error message to the
2759  * standby.
2760  */
2762  (errmsg("terminating walsender process due to replication timeout")));
2763 
2764  WalSndShutdown();
2765  }
2766 }
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:85
int wal_sender_timeout
Definition: walsender.c:123

References COMMERROR, ereport, errmsg(), last_processing, last_reply_timestamp, TimestampTzPlusMilliseconds, wal_sender_timeout, and WalSndShutdown().

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndComputeSleeptime()

static long WalSndComputeSleeptime ( TimestampTz  now)
static

Definition at line 2699 of file walsender.c.

2700 {
2701  long sleeptime = 10000; /* 10 s */
2702 
2704  {
2705  TimestampTz wakeup_time;
2706 
2707  /*
2708  * At the latest stop sleeping once wal_sender_timeout has been
2709  * reached.
2710  */
2713 
2714  /*
2715  * If no ping has been sent yet, wakeup when it's time to do so.
2716  * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2717  * the timeout passed without a response.
2718  */
2721  wal_sender_timeout / 2);
2722 
2723  /* Compute relative time until wakeup. */
2724  sleeptime = TimestampDifferenceMilliseconds(now, wakeup_time);
2725  }
2726 
2727  return sleeptime;
2728 }
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1756

References last_reply_timestamp, now(), TimestampDifferenceMilliseconds(), TimestampTzPlusMilliseconds, waiting_for_ping_response, and wal_sender_timeout.

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndDone()

static void WalSndDone ( WalSndSendDataCallback  send_data)
static

Definition at line 3464 of file walsender.c.

3465 {
3466  XLogRecPtr replicatedPtr;
3467 
3468  /* ... let's just be real sure we're caught up ... */
3469  send_data();
3470 
3471  /*
3472  * To figure out whether all WAL has successfully been replicated, check
3473  * flush location if valid, write otherwise. Tools like pg_receivewal will
3474  * usually (unless in synchronous mode) return an invalid flush location.
3475  */
3476  replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
3478 
3479  if (WalSndCaughtUp && sentPtr == replicatedPtr &&
3480  !pq_is_send_pending())
3481  {
3482  QueryCompletion qc;
3483 
3484  /* Inform the standby that XLOG streaming is done */
3485  SetQueryCompletion(&qc, CMDTAG_COPY, 0);
3486  EndCommand(&qc, DestRemote, false);
3487  pq_flush();
3488 
3489  proc_exit(0);
3490  }
3493 }
static bool WalSndCaughtUp
Definition: walsender.c:194

References DestRemote, EndCommand(), WalSnd::flush, InvalidXLogRecPtr, MyWalSnd, pq_flush, pq_is_send_pending, proc_exit(), sentPtr, SetQueryCompletion(), waiting_for_ping_response, WalSndCaughtUp, WalSndKeepalive(), WalSnd::write, and XLogRecPtrIsInvalid.

Referenced by WalSndLoop().

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 327 of file walsender.c.

328 {
332 
333  if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
335 
336  if (MyReplicationSlot != NULL)
338 
339  ReplicationSlotCleanup(false);
340 
341  replication_active = false;
342 
343  /*
344  * If there is a transaction in progress, it will clean up our
345  * ResourceOwner, but if a replication command set up a resource owner
346  * without a transaction, we've got to clean that up now.
347  */
349  WalSndResourceCleanup(false);
350 
351  if (got_STOPPING || got_SIGUSR2)
352  proc_exit(0);
353 
354  /* Revert back to startup state */
356 }
bool ConditionVariableCancelSleep(void)
void LWLockReleaseAll(void)
Definition: lwlock.c:1876
void ReplicationSlotCleanup(bool synced_only)
Definition: slot.c:745
WALOpenSegment seg
Definition: xlogreader.h:272
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:197
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4982

References ConditionVariableCancelSleep(), got_SIGUSR2, got_STOPPING, IsTransactionOrTransactionBlock(), LWLockReleaseAll(), MyReplicationSlot, pgstat_report_wait_end(), proc_exit(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), XLogReaderState::seg, wal_segment_close(), WalSndResourceCleanup(), WalSndSetState(), WALSNDSTATE_STARTUP, WALOpenSegment::ws_file, and xlogreader.

Referenced by PostgresMain().

◆ WalSndGetStateString()

static const char* WalSndGetStateString ( WalSndState  state)
static

Definition at line 3831 of file walsender.c.

/*
 * Translate a WalSndState value into a constant, lowercase display string.
 *
 * Each of the five states handled by the switch returns its own literal;
 * any value that matches no case falls out of the switch and yields
 * "UNKNOWN". The returned pointers are string literals, so callers must not
 * modify or free them.
 */
3832 {
3833  switch (state)
3834  {
3835  case WALSNDSTATE_STARTUP:
3836  return "startup";
3837  case WALSNDSTATE_BACKUP:
3838  return "backup";
3839  case WALSNDSTATE_CATCHUP:
3840  return "catchup";
3841  case WALSNDSTATE_STREAMING:
3842  return "streaming";
3843  case WALSNDSTATE_STOPPING:
3844  return "stopping";
3845  }
3846  return "UNKNOWN"; /* reached only when no case above matched */
3847 }
@ WALSNDSTATE_STREAMING
@ WALSNDSTATE_BACKUP

References WALSNDSTATE_BACKUP, WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WALSNDSTATE_STOPPING, and WALSNDSTATE_STREAMING.

Referenced by pg_stat_get_wal_senders().

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 3748 of file walsender.c.

3749 {
3750  int i;
3751 
3752  for (i = 0; i < max_wal_senders; i++)
3753  {
3754  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3755  pid_t pid;
3756 
3757  SpinLockAcquire(&walsnd->mutex);
3758  pid = walsnd->pid;
3759  SpinLockRelease(&walsnd->mutex);
3760 
3761  if (pid == 0)
3762  continue;
3763 
3765  }
3766 }
#define INVALID_PROC_NUMBER
Definition: procnumber.h:26
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition: procsignal.c:281
@ PROCSIG_WALSND_INIT_STOPPING
Definition: procsignal.h:35

References i, INVALID_PROC_NUMBER, max_wal_senders, WalSnd::mutex, WalSnd::pid, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

◆ WalSndKeepalive()

static void WalSndKeepalive ( bool  requestReply,
XLogRecPtr  writePtr 
)
static

Definition at line 4037 of file walsender.c.

4038 {
4039  elog(DEBUG2, "sending replication keepalive");
4040 
4041  /* construct the message... */
4043  pq_sendbyte(&output_message, 'k');
4044  pq_sendint64(&output_message, XLogRecPtrIsInvalid(writePtr) ? sentPtr : writePtr);
4046  pq_sendbyte(&output_message, requestReply ? 1 : 0);
4047 
4048  /* ... and send it wrapped in CopyData */
4050 
4051  /* Set local flag */
4052  if (requestReply)
4054 }
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:152

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), StringInfoData::len, output_message, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), resetStringInfo(), sentPtr, waiting_for_ping_response, and XLogRecPtrIsInvalid.

Referenced by ProcessStandbyReplyMessage(), WalSndDone(), WalSndKeepaliveIfNecessary(), WalSndUpdateProgress(), and WalSndWaitForWal().

◆ WalSndKeepaliveIfNecessary()

static void WalSndKeepaliveIfNecessary ( void  )
static

Definition at line 4060 of file walsender.c.

4061 {
4062  TimestampTz ping_time;
4063 
4064  /*
4065  * Don't send keepalive messages if timeouts are globally disabled or
4066  * we're doing something not partaking in timeouts.
4067  */
4068  if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
4069  return;
4070 
4072  return;
4073 
4074  /*
4075  * If half of wal_sender_timeout has lapsed without receiving any reply
4076  * from the standby, send a keep-alive message to the standby requesting
4077  * an immediate reply.
4078  */
4080  wal_sender_timeout / 2);
4081  if (last_processing >= ping_time)
4082  {
4084 
4085  /* Try to flush pending output to the client */
4086  if (pq_flush_if_writable() != 0)
4087  WalSndShutdown();
4088  }
4089 }

References InvalidXLogRecPtr, last_processing, last_reply_timestamp, pq_flush_if_writable, TimestampTzPlusMilliseconds, waiting_for_ping_response, wal_sender_timeout, WalSndKeepalive(), and WalSndShutdown().

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndKill()

static void WalSndKill ( int  code,
Datum  arg 
)
static

Definition at line 2973 of file walsender.c.

/*
 * Give up this process's WalSnd entry.
 *
 * Clears the process-local MyWalSnd pointer, then, while holding the
 * entry's spinlock, resets its latch pointer to NULL and its pid to 0.
 * Neither the "code" nor the "arg" parameter is used in the body.
 *
 * NOTE(review): the (int, Datum) signature suggests this is installed as an
 * exit callback by InitWalSenderSlot() -- confirm against that function.
 */
2974 {
2975  WalSnd *walsnd = MyWalSnd; /* keep a local copy; MyWalSnd is cleared below */
2976 
2977  Assert(walsnd != NULL);
2978 
2979  MyWalSnd = NULL;
2980 
2981  SpinLockAcquire(&walsnd->mutex);
2982  /* clear latch while holding the spinlock, so it can safely be read */
2983  walsnd->latch = NULL;
2984  /* Mark WalSnd struct as no longer being in use. */
2985  walsnd->pid = 0;
2986  SpinLockRelease(&walsnd->mutex);
2987 }

References Assert, WalSnd::latch, WalSnd::mutex, MyWalSnd, WalSnd::pid, SpinLockAcquire, and SpinLockRelease.

Referenced by InitWalSenderSlot().

◆ WalSndLastCycleHandler()

static void WalSndLastCycleHandler ( SIGNAL_ARGS  )
static

Definition at line 3585 of file walsender.c.

/*
 * Signal handler: record the signal's arrival in got_SIGUSR2 and set
 * MyLatch. The handler itself does no other work; code elsewhere tests the
 * got_SIGUSR2 flag.
 */
3586 {
3587  got_SIGUSR2 = true;
3588  SetLatch(MyLatch);
3589 }

References got_SIGUSR2, MyLatch, and SetLatch().

Referenced by WalSndSignals().

◆ WalSndLoop()

static void WalSndLoop ( WalSndSendDataCallback  send_data)
static

Definition at line 2770 of file walsender.c.

2771 {
2772  /*
2773  * Initialize the last reply timestamp. That enables timeout processing
2774  * from hereon.
2775  */
2777  waiting_for_ping_response = false;
2778 
2779  /*
2780  * Loop until we reach the end of this timeline or the client requests to
2781  * stop streaming.
2782  */
2783  for (;;)
2784  {
2785  /* Clear any already-pending wakeups */
2787 
2789 
2790  /* Process any requests or signals received recently */
2791  if (ConfigReloadPending)
2792  {
2793  ConfigReloadPending = false;
2796  }
2797 
2798  /* Check for input from the client */
2800 
2801  /*
2802  * If we have received CopyDone from the client, sent CopyDone
2803  * ourselves, and the output buffer is empty, it's time to exit
2804  * streaming.
2805  */
2807  !pq_is_send_pending())
2808  break;
2809 
2810  /*
2811  * If we don't have any pending data in the output buffer, try to send
2812  * some more. If there is some, we don't bother to call send_data
2813  * again until we've flushed it ... but we'd better assume we are not
2814  * caught up.
2815  */
2816  if (!pq_is_send_pending())
2817  send_data();
2818  else
2819  WalSndCaughtUp = false;
2820 
2821  /* Try to flush pending output to the client */
2822  if (pq_flush_if_writable() != 0)
2823  WalSndShutdown();
2824 
2825  /* If nothing remains to be sent right now ... */
2827  {
2828  /*
2829  * If we're in catchup state, move to streaming. This is an
2830  * important state change for users to know about, since before
2831  * this point data loss might occur if the primary dies and we
2832  * need to failover to the standby. The state change is also
2833  * important for synchronous replication, since commits that
2834  * started to wait at that point might wait for some time.
2835  */
2837  {
2838  ereport(DEBUG1,
2839  (errmsg_internal("\"%s\" has now caught up with upstream server",
2840  application_name)));
2842  }
2843 
2844  /*
2845  * When SIGUSR2 arrives, we send any outstanding logs up to the
2846  * shutdown checkpoint record (i.e., the latest record), wait for
2847  * them to be replicated to the standby, and exit. This may be a
2848  * normal termination at shutdown, or a promotion, the walsender
2849  * is not sure which.
2850  */
2851  if (got_SIGUSR2)
2852  WalSndDone(send_data);
2853  }
2854 
2855  /* Check for replication timeout. */
2857 
2858  /* Send keepalive if the time has come */
2860 
2861  /*
2862  * Block if we have unsent data. XXX For logical replication, let
2863  * WalSndWaitForWal() handle any other blocking; idle receivers need
2864  * its additional actions. For physical replication, also block if
2865  * caught up; its send_data does not block.
2866  */
2867  if ((WalSndCaughtUp && send_data != XLogSendLogical &&
2870  {
2871  long sleeptime;
2872  int wakeEvents;
2873 
2875  wakeEvents = WL_SOCKET_READABLE;
2876  else
2877  wakeEvents = 0;
2878 
2879  /*
2880  * Use fresh timestamp, not last_processing, to reduce the chance
2881  * of reaching wal_sender_timeout before sending a keepalive.
2882  */
2884 
2885  if (pq_is_send_pending())
2886  wakeEvents |= WL_SOCKET_WRITEABLE;
2887 
2888  /* Sleep until something happens or we time out */
2889  WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
2890  }
2891  }
2892 }
char * application_name
Definition: guc_tables.c:543
static void WalSndDone(WalSndSendDataCallback send_data)
Definition: walsender.c:3464

References application_name, CHECK_FOR_INTERRUPTS, ConfigReloadPending, DEBUG1, ereport, errmsg_internal(), GetCurrentTimestamp(), got_SIGUSR2, last_reply_timestamp, MyLatch, MyWalSnd, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), WalSnd::state, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), waiting_for_ping_response, WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndDone(), WalSndKeepaliveIfNecessary(), WalSndSetState(), WalSndShutdown(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STREAMING, WalSndWait(), WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, and XLogSendLogical().

Referenced by StartLogicalReplication(), and StartReplication().

◆ WalSndPrepareWrite()

static void WalSndPrepareWrite ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1548 of file walsender.c.

/*
 * Start a new 'w' message in ctx->out.
 *
 * The buffer is reset and then filled with, in order: the byte 'w', the LSN
 * as dataStart, the same LSN as walEnd, and a zero 64-bit placeholder for
 * the send time. When last_write is false the LSN written is
 * InvalidXLogRecPtr instead of the caller's value. The xid parameter is not
 * referenced in the body.
 */
1549 {
1550  /* can't have sync rep confused by sending the same LSN several times */
1551  if (!last_write)
1552  lsn = InvalidXLogRecPtr;
1553 
1554  resetStringInfo(ctx->out);
1555 
1556  pq_sendbyte(ctx->out, 'w'); /* leading tag byte of the message */
1557  pq_sendint64(ctx->out, lsn); /* dataStart */
1558  pq_sendint64(ctx->out, lsn); /* walEnd */
1559 
1560  /*
1561  * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1562  * reserve space here.
1563  */
1564  pq_sendint64(ctx->out, 0); /* sendtime */
1565 }
StringInfo out
Definition: logical.h:71

References InvalidXLogRecPtr, LogicalDecodingContext::out, pq_sendbyte(), pq_sendint64(), and resetStringInfo().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ WalSndResourceCleanup()

void WalSndResourceCleanup ( bool  isCommit)

Definition at line 362 of file walsender.c.

363 {
364  ResourceOwner resowner;
365 
366  if (CurrentResourceOwner == NULL)
367  return;
368 
369  /*
370  * Deleting CurrentResourceOwner is not allowed, so we must save a pointer
371  * in a local variable and clear it first.
372  */
373  resowner = CurrentResourceOwner;
374  CurrentResourceOwner = NULL;
375 
376  /* Now we can release resources and delete it. */
377  ResourceOwnerRelease(resowner,
378  RESOURCE_RELEASE_BEFORE_LOCKS, isCommit, true);
379  ResourceOwnerRelease(resowner,
380  RESOURCE_RELEASE_LOCKS, isCommit, true);
381  ResourceOwnerRelease(resowner,
382  RESOURCE_RELEASE_AFTER_LOCKS, isCommit, true);
383  ResourceOwnerDelete(resowner);
384 }
void ResourceOwnerRelease(ResourceOwner owner, ResourceReleasePhase phase, bool isCommit, bool isTopLevel)
Definition: resowner.c:648
void ResourceOwnerDelete(ResourceOwner owner)
Definition: resowner.c:854
@ RESOURCE_RELEASE_LOCKS
Definition: resowner.h:55
@ RESOURCE_RELEASE_BEFORE_LOCKS
Definition: resowner.h:54
@ RESOURCE_RELEASE_AFTER_LOCKS
Definition: resowner.h:56

References CurrentResourceOwner, RESOURCE_RELEASE_AFTER_LOCKS, RESOURCE_RELEASE_BEFORE_LOCKS, RESOURCE_RELEASE_LOCKS, ResourceOwnerDelete(), and ResourceOwnerRelease().

Referenced by perform_base_backup(), UploadManifest(), and WalSndErrorCleanup().

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 3540 of file walsender.c.

3541 {
3542  int i;
3543 
3544  for (i = 0; i < max_wal_senders; i++)
3545  {
3546  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3547 
3548  SpinLockAcquire(&walsnd->mutex);
3549  if (walsnd->pid == 0)
3550  {
3551  SpinLockRelease(&walsnd->mutex);
3552  continue;
3553  }
3554  walsnd->needreload = true;
3555  SpinLockRelease(&walsnd->mutex);
3556  }
3557 }

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

◆ WalSndSegmentOpen()

static void WalSndSegmentOpen ( XLogReaderState state,
XLogSegNo  nextSegNo,
TimeLineID tli_p 
)
static

Definition at line 2991 of file walsender.c.

2993 {
2994  char path[MAXPGPATH];
2995 
2996  /*-------
2997  * When reading from a historic timeline, and there is a timeline switch
2998  * within this segment, read from the WAL segment belonging to the new
2999  * timeline.
3000  *
3001  * For example, imagine that this server is currently on timeline 5, and
3002  * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
3003  * 0/13002088. In pg_wal, we have these files:
3004  *
3005  * ...
3006  * 000000040000000000000012
3007  * 000000040000000000000013
3008  * 000000050000000000000013
3009  * 000000050000000000000014
3010  * ...
3011  *
3012  * In this situation, when requested to send the WAL from segment 0x13, on
3013  * timeline 4, we read the WAL from file 000000050000000000000013. Archive
3014  * recovery prefers files from newer timelines, so if the segment was
3015  * restored from the archive on this server, the file belonging to the old
3016  * timeline, 000000040000000000000013, might not exist. Their contents are
3017  * equal up to the switchpoint, because at a timeline switch, the used
3018  * portion of the old segment is copied to the new file.
3019  */
3020  *tli_p = sendTimeLine;
3021  if (sendTimeLineIsHistoric)
3022  {
3023  XLogSegNo endSegNo;
3024 
3025  XLByteToSeg(sendTimeLineValidUpto, endSegNo, state->segcxt.ws_segsize);
3026  if (nextSegNo == endSegNo)
3027  *tli_p = sendTimeLineNextTLI;
3028  }
3029 
3030  XLogFilePath(path, *tli_p, nextSegNo, state->segcxt.ws_segsize);
3031  state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
3032  if (state->seg.ws_file >= 0)
3033  return;
3034 
3035  /*
3036  * If the file is not found, assume it's because the standby asked for a
3037  * too old WAL segment that has already been removed or recycled.
3038  */
3039  if (errno == ENOENT)
3040  {
3041  char xlogfname[MAXFNAMELEN];
3042  int save_errno = errno;
3043 
3044  XLogFileName(xlogfname, *tli_p, nextSegNo, wal_segment_size);
3045  errno = save_errno;
3046  ereport(ERROR,
3047  (errcode_for_file_access(),
3048  errmsg("requested WAL segment %s has already been removed",
3049  xlogfname)));
3050  }
3051  else
3052  ereport(ERROR,
3053  (errcode_for_file_access(),
3054  errmsg("could not open file \"%s\": %m",
3055  path)));
3056 }
int BasicOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1087
static void XLogFilePath(char *path, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)

References BasicOpenFile(), ereport, errcode_for_file_access(), errmsg(), ERROR, MAXFNAMELEN, MAXPGPATH, PG_BINARY, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, wal_segment_size, XLByteToSeg, XLogFileName(), and XLogFilePath().

Referenced by CreateReplicationSlot(), StartLogicalReplication(), and StartReplication().

◆ WalSndSetState()

void WalSndSetState ( WalSndState  state)

Definition at line 3812 of file walsender.c.

3813 {
3814  WalSnd *walsnd = MyWalSnd;
3815 
3816  Assert(am_walsender);
3817 
3818  if (walsnd->state == state)
3819  return;
3820 
3821  SpinLockAcquire(&walsnd->mutex);
3822  walsnd->state = state;
3823  SpinLockRelease(&walsnd->mutex);
3824 }

References am_walsender, Assert, WalSnd::mutex, MyWalSnd, SpinLockAcquire, SpinLockRelease, and WalSnd::state.

Referenced by exec_replication_command(), SendBaseBackup(), StartLogicalReplication(), StartReplication(), WalSndErrorCleanup(), WalSndLoop(), and XLogSendPhysical().

◆ WalSndShmemInit()

void WalSndShmemInit ( void  )

Definition at line 3624 of file walsender.c.

3625 {
3626  bool found;
3627  int i;
3628 
3629  WalSndCtl = (WalSndCtlData *)
3630  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3631 
3632  if (!found)
3633  {
3634  /* First time through, so initialize */
3635  MemSet(WalSndCtl, 0, WalSndShmemSize());
3636 
3637  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3638  dlist_init(&(WalSndCtl->SyncRepQueue[i]));
3639 
3640  for (i = 0; i < max_wal_senders; i++)
3641  {
3642  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3643 
3644  SpinLockInit(&walsnd->mutex);
3645  }
3646 
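3647  ConditionVariableInit(&WalSndCtl->wal_flush_cv);
3648  ConditionVariableInit(&WalSndCtl->wal_replay_cv);
3649  ConditionVariableInit(&WalSndCtl->wal_confirm_rcv_cv);
3650  }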
3651 }
void ConditionVariableInit(ConditionVariable *cv)
static void dlist_init(dlist_head *head)
Definition: ilist.h:314
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:387
#define SpinLockInit(lock)
Definition: spin.h:57
ConditionVariable wal_replay_cv
dlist_head SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
ConditionVariable wal_flush_cv
Size WalSndShmemSize(void)
Definition: walsender.c:3612

References ConditionVariableInit(), dlist_init(), i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::wal_confirm_rcv_cv, WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, WalSndCtl, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateOrAttachShmemStructs().

◆ WalSndShmemSize()

Size WalSndShmemSize ( void  )

Definition at line 3612 of file walsender.c.

3613 {
3614  Size size = 0;
3615 
3616  size = offsetof(WalSndCtlData, walsnds);
3617  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3618 
3619  return size;
3620 }
Size add_size(Size s1, Size s2)
Definition: shmem.c:493
Size mul_size(Size s1, Size s2)
Definition: shmem.c:510
static pg_noinline void Size size
Definition: slab.c:607

References add_size(), max_wal_senders, mul_size(), and size.

Referenced by CalculateShmemSize(), and WalSndShmemInit().

◆ WalSndShutdown()

static void WalSndShutdown ( void  )
static

Definition at line 240 of file walsender.c.

279 {
281 
282  /* Create a per-walsender data structure in shared memory */
284 
285  /*
286  * We don't currently need any ResourceOwner in a walsender process, but
287  * if we did, we could call CreateAuxProcessResourceOwner here.
288  */
289 
290  /*
291  * Let postmaster know that we're a WAL sender. Once we've declared us as
292  * a WAL sender process, postmaster will let us outlive the bgwriter and
293  * kill us last in the shutdown sequence, so we get a chance to stream all
294  * remaining WAL at shutdown, including the shutdown checkpoint. Note that
295  * there's no going back, and we mustn't write any WAL records after this.
296  */
299 
300  /*
301  * If the client didn't specify a database to connect to, show in PGPROC
302  * that our advertised xmin should affect vacuum horizons in all
303  * databases. This allows physical replication clients to send hot
304  * standby feedback that will delay vacuum cleanup in all databases.
305  */
306  if (MyDatabaseId == InvalidOid)
307  {
309  LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
312  LWLockRelease(ProcArrayLock);
313  }
314 
315  /* Initialize empty timestamp buffer for lag tracking. */
317 }
@ LW_EXCLUSIVE
Definition: lwlock.h:114
MemoryContext TopMemoryContext
Definition: mcxt.c:149
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1215
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:181
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:339
@ PMSIGNAL_ADVANCE_STATE_MACHINE
Definition: pmsignal.h:42
#define PROC_AFFECTS_ALL_HORIZONS
Definition: proc.h:62
PROC_HDR * ProcGlobal
Definition: proc.c:79
uint8 statusFlags
Definition: proc.h:237
int pgxactoff
Definition: proc.h:179
uint8 * statusFlags
Definition: proc.h:394
static void InitWalSenderSlot(void)
Definition: walsender.c:2896

Referenced by ProcessPendingWrites(), WalSndCheckTimeOut(), WalSndKeepaliveIfNecessary(), WalSndLoop(), WalSndUpdateProgress(), WalSndWaitForWal(), and WalSndWriteData().

◆ WalSndSignals()

void WalSndSignals ( void  )

Definition at line 3593 of file walsender.c.

3594 {
3595  /* Set up signal handlers */
3597  pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3598  pqsignal(SIGTERM, die); /* request shutdown */
3599  /* SIGQUIT handler was already set up by InitPostmasterChild */
3600  InitializeTimeouts(); /* establishes SIGALRM handler */
3603  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3604  * shutdown */
3605 
3606  /* Reset some signals that are accepted by postmaster but not here */
3608 }
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
#define die(msg)
pqsigfunc pqsignal(int signo, pqsigfunc func)
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:3011
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:671
void InitializeTimeouts(void)
Definition: timeout.c:470
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:3585
#define SIGCHLD
Definition: win32_port.h:178
#define SIGHUP
Definition: win32_port.h:168
#define SIG_DFL
Definition: win32_port.h:163
#define SIGPIPE
Definition: win32_port.h:173
#define SIGUSR1
Definition: win32_port.h:180
#define SIGUSR2
Definition: win32_port.h:181
#define SIG_IGN
Definition: win32_port.h:165

References die, InitializeTimeouts(), pqsignal(), procsignal_sigusr1_handler(), SIG_DFL, SIG_IGN, SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGUSR1, SIGUSR2, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

◆ WalSndUpdateProgress()

static void WalSndUpdateProgress ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  skipped_xact 
)
static

Definition at line 1671 of file walsender.c.

1673 {
1674  static TimestampTz sendTime = 0;
1676  bool pending_writes = false;
1677  bool end_xact = ctx->end_xact;
1678 
1679  /*
1680  * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1681  * avoid flooding the lag tracker when we commit frequently.
1682  *
1683  * We don't have a mechanism to get the ack for any LSN other than end
1684  * xact LSN from the downstream. So, we track lag only for end of
1685  * transaction LSN.
1686  */
1687 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1688  if (end_xact && TimestampDifferenceExceeds(sendTime, now,
1690  {
1691  LagTrackerWrite(lsn, now);
1692  sendTime = now;
1693  }
1694 
1695  /*
1696  * When skipping empty transactions in synchronous replication, we send a
1697  * keepalive message to avoid delaying such transactions.
1698  *
1699  * It is okay to check sync_standbys_defined flag without lock here as in
1700  * the worst case we will just send an extra keepalive message when it is
1701  * really not required.
1702  */
1703  if (skipped_xact &&
1704  SyncRepRequested() &&
1705  ((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
1706  {
1707  WalSndKeepalive(false, lsn);
1708 
1709  /* Try to flush pending output to the client */
1710  if (pq_flush_if_writable() != 0)
1711  WalSndShutdown();
1712 
1713  /* If we have pending write here, make sure it's actually flushed */
1714  if (pq_is_send_pending())
1715  pending_writes = true;
1716  }
1717 
1718  /*
1719  * Process pending writes if any or try to send a keepalive if required.
1720  * We don't need to try sending keep alive messages at the transaction end
1721  * as that will be done at a later point in time. This is required only
1722  * for large transactions where we don't send any changes to the
1723  * downstream and the receiver can timeout due to that.
1724  */
1725  if (pending_writes || (!end_xact &&
1727  wal_sender_timeout / 2)))
1729 }
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1780
#define SyncRepRequested()
Definition: syncrep.h:18
static void ProcessPendingWrites(void)
Definition: walsender.c:1617
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition: walsender.c:4098

References LogicalDecodingContext::end_xact, GetCurrentTimestamp(), LagTrackerWrite(), last_reply_timestamp, now(), pq_flush_if_writable, pq_is_send_pending, ProcessPendingWrites(), SyncRepRequested, TimestampDifferenceExceeds(), TimestampTzPlusMilliseconds, wal_sender_timeout, WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS, WalSndCtl, WalSndKeepalive(), and WalSndShutdown().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ WalSndWait()

static void WalSndWait ( uint32  socket_events,
long  timeout,
uint32  wait_event 
)
static

Definition at line 3690 of file walsender.c.

3691 {
3692  WaitEvent event;
3693 
3694  ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL);
3695 
3696  /*
3697  * We use a condition variable to efficiently wake up walsenders in
3698  * WalSndWakeup().
3699  *
3700  * Every walsender prepares to sleep on a shared memory CV. Note that it
3701  * just prepares to sleep on the CV (i.e., adds itself to the CV's
3702  * waitlist), but does not actually wait on the CV (IOW, it never calls
3703  * ConditionVariableSleep()). It still uses WaitEventSetWait() for
3704  * waiting, because we also need to wait for socket events. The processes
3705  * (startup process, walreceiver etc.) wanting to wake up walsenders use
3706  * ConditionVariableBroadcast(), which in turn calls SetLatch(), helping
3707  * walsenders come out of WaitEventSetWait().
3708  *
3709  * This approach is simple and efficient because one doesn't have to loop
3710  * through all the walsender slots, with a spinlock acquisition and
3711  * release for every iteration, just to wake up only the waiting
3712  * walsenders. It makes WalSndWakeup() callers' life easy.
3713  *
3714  * XXX: A desirable future improvement would be to add support for CVs
3715  * into WaitEventSetWait().
3716  *
3717  * And, we use separate shared memory CVs for physical and logical
3718  * walsenders for selective wake ups, see WalSndWakeup() for more details.
3719  *
3720  * If the wait event is WAIT_FOR_STANDBY_CONFIRMATION, wait on another CV
3721  * until awakened by physical walsenders after the walreceiver confirms
3722  * the receipt of the LSN.
3723  */
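3724  if (wait_event == WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION)
3725  ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
3726  else if (MyWalSnd->kind == REPLICATION_KIND_PHYSICAL)
3727  ConditionVariablePrepareToSleep(&WalSndCtl->wal_flush_cv);
3728  else if (MyWalSnd->kind == REPLICATION_KIND_LOGICAL)
3729  ConditionVariablePrepareToSleep(&WalSndCtl->wal_replay_cv);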
3730 
3731  if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
3732  (event.events & WL_POSTMASTER_DEATH))
3733  {
3734  ConditionVariableCancelSleep();
3735  proc_exit(1);
3736  }
3737 
3738  ConditionVariableCancelSleep();
3739 }
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
void ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
Definition: latch.c:1049
int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents, uint32 wait_event_info)
Definition: latch.c:1424
#define WL_POSTMASTER_DEATH
Definition: latch.h:131
#define FeBeWaitSetSocketPos
Definition: libpq.h:63
WaitEventSet * FeBeWaitSet
Definition: pqcomm.c:166
uint32 events
Definition: latch.h:155

References ConditionVariableCancelSleep(), ConditionVariablePrepareToSleep(), WaitEvent::events, FeBeWaitSet, FeBeWaitSetSocketPos, WalSnd::kind, ModifyWaitEvent(), MyWalSnd, proc_exit(), REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, WaitEventSetWait(), WalSndCtlData::wal_confirm_rcv_cv, WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, WalSndCtl, and WL_POSTMASTER_DEATH.

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndWaitForWal()

static XLogRecPtr WalSndWaitForWal ( XLogRecPtr  loc)
static

Definition at line 1821 of file walsender.c.

1822 {
1823  int wakeEvents;
1824  uint32 wait_event = 0;
1825  static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1826 
1827  /*
1828  * Fast path to avoid acquiring the spinlock in case we already know we
1829  * have enough WAL available and all the standby servers have confirmed
1830  * receipt of WAL up to RecentFlushPtr. This is particularly interesting
1831  * if we're far behind.
1832  */
1833  if (!XLogRecPtrIsInvalid(RecentFlushPtr) &&
1834  !NeedToWaitForWal(loc, RecentFlushPtr, &wait_event))
1835  return RecentFlushPtr;
1836 
1837  /*
1838  * Within the loop, we wait for the necessary WALs to be flushed to disk
1839  * first, followed by waiting for standbys to catch up if there are enough
1840  * WALs (see NeedToWaitForWal()) or upon receiving the shutdown signal.
1841  */
1842  for (;;)
1843  {
1844  bool wait_for_standby_at_stop = false;
1845  long sleeptime;
1846 
1847  /* Clear any already-pending wakeups */
1849 
1851 
1852  /* Process any requests or signals received recently */
1853  if (ConfigReloadPending)
1854  {
1855  ConfigReloadPending = false;
1858  }
1859 
1860  /* Check for input from the client */
1862 
1863  /*
1864  * If we're shutting down, trigger pending WAL to be written out,
1865  * otherwise we'd possibly end up waiting for WAL that never gets
1866  * written, because walwriter has shut down already.
1867  */
1868  if (got_STOPPING)
1870 
1871  /*
1872  * To avoid the scenario where standbys need to catch up to a newer
1873  * WAL location in each iteration, we update our idea of the currently
1874  * flushed position only if we are not waiting for standbys to catch
1875  * up.
1876  */
1877  if (wait_event != WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION)
1878  {
1879  if (!RecoveryInProgress())
1880  RecentFlushPtr = GetFlushRecPtr(NULL);
1881  else
1882  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1883  }
1884 
1885  /*
1886  * If postmaster asked us to stop and the standby slots have caught up
1887  * to the flushed position, don't wait anymore.
1888  *
1889  * It's important to do this check after the recomputation of
1890  * RecentFlushPtr, so we can send all remaining data before shutting
1891  * down.
1892  */
1893  if (got_STOPPING)
1894  {
1895  if (NeedToWaitForStandbys(RecentFlushPtr, &wait_event))
1896  wait_for_standby_at_stop = true;
1897  else
1898  break;
1899  }
1900 
1901  /*
1902  * We only send regular messages to the client for full decoded
1903  * transactions, but a synchronous replication and walsender shutdown
1904  * possibly are waiting for a later location. So, before sleeping, we
1905  * send a ping containing the flush location. If the receiver is
1906  * otherwise idle, this keepalive will trigger a reply. Processing the
1907  * reply will update these MyWalSnd locations.
1908  */
1909  if (MyWalSnd->flush < sentPtr &&
1910  MyWalSnd->write < sentPtr &&
1913 
1914  /*
1915  * Exit the loop if already caught up and doesn't need to wait for
1916  * standby slots.
1917  */
1918  if (!wait_for_standby_at_stop &&
1919  !NeedToWaitForWal(loc, RecentFlushPtr, &wait_event))
1920  break;
1921 
1922  /*
1923  * Waiting for new WAL or waiting for standbys to catch up. Since we
1924  * need to wait, we're now caught up.
1925  */
1926  WalSndCaughtUp = true;
1927 
1928  /*
1929  * Try to flush any pending output to the client.
1930  */
1931  if (pq_flush_if_writable() != 0)
1932  WalSndShutdown();
1933 
1934  /*
1935  * If we have received CopyDone from the client, sent CopyDone
1936  * ourselves, and the output buffer is empty, it's time to exit
1937  * streaming, so fail the current WAL fetch request.
1938  */
1940  !pq_is_send_pending())
1941  break;
1942 
1943  /* die if timeout was reached */
1945 
1946  /* Send keepalive if the time has come */
1948 
1949  /*
1950  * Sleep until something happens or we time out. Also wait for the
1951  * socket becoming writable, if there's still pending output.
1952  * Otherwise we might sit on sendable output data while waiting for
1953  * new WAL to be generated. (But if we have nothing to send, we don't
1954  * want to wake on socket-writable.)
1955  */
1957 
1958  wakeEvents = WL_SOCKET_READABLE;
1959 
1960  if (pq_is_send_pending())
1961  wakeEvents |= WL_SOCKET_WRITEABLE;
1962 
1963  Assert(wait_event != 0);
1964 
1965  WalSndWait(wakeEvents, sleeptime, wait_event);
1966  }
1967 
1968  /* reactivate latch so WalSndLoop knows to continue */
1969  SetLatch(MyLatch);
1970  return RecentFlushPtr;
1971 }
static bool NeedToWaitForWal(XLogRecPtr target_lsn, XLogRecPtr flushed_lsn, uint32 *wait_event)
Definition: walsender.c:1793
bool XLogBackgroundFlush(void)
Definition: xlog.c:2983

References Assert, CHECK_FOR_INTERRUPTS, ConfigReloadPending, WalSnd::flush, GetCurrentTimestamp(), GetFlushRecPtr(), GetXLogReplayRecPtr(), got_STOPPING, InvalidXLogRecPtr, MyLatch, MyWalSnd, NeedToWaitForStandbys(), NeedToWaitForWal(), PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), RecoveryInProgress(), ResetLatch(), sentPtr, SetLatch(), streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), waiting_for_ping_response, WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepalive(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WalSndWait(), WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, WalSnd::write, XLogBackgroundFlush(), and XLogRecPtrIsInvalid.

Referenced by logical_read_xlog_page().

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 3774 of file walsender.c.

3775 {
3776  for (;;)
3777  {
3778  int i;
3779  bool all_stopped = true;
3780 
3781  for (i = 0; i < max_wal_senders; i++)
3782  {
3783  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3784 
3785  SpinLockAcquire(&walsnd->mutex);
3786 
3787  if (walsnd->pid == 0)
3788  {
3789  SpinLockRelease(&walsnd->mutex);
3790  continue;
3791  }
3792 
3793  if (walsnd->state != WALSNDSTATE_STOPPING)
3794  {
3795  all_stopped = false;
3796  SpinLockRelease(&walsnd->mutex);
3797  break;
3798  }
3799  SpinLockRelease(&walsnd->mutex);
3800  }
3801 
3802  /* safe to leave if confirmation is done for all WAL senders */
3803  if (all_stopped)
3804  return;
3805 
3806  pg_usleep(10000L); /* wait for 10 msec */
3807  }
3808 }
void pg_usleep(long microsec)
Definition: signal.c:53

References i, max_wal_senders, WalSnd::mutex, pg_usleep(), WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndCtl, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

◆ WalSndWakeup()

void WalSndWakeup ( bool  physical,
bool  logical 
)

Definition at line 3669 of file walsender.c.

3670 {
3671  /*
3672  * Wake up all the walsenders waiting on WAL being flushed or replayed
3673  * respectively. Note that waiting walsender would have prepared to sleep
3674  * on the CV (i.e., added itself to the CV's waitlist) in WalSndWait()
3675  * before actually waiting.
3676  */
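3677  if (physical)
3678  ConditionVariableBroadcast(&WalSndCtl->wal_flush_cv);
3679 
3680  if (logical)
3681  ConditionVariableBroadcast(&WalSndCtl->wal_replay_cv);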
3682 }

References ConditionVariableBroadcast(), WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, and WalSndCtl.

Referenced by ApplyWalRecord(), KeepFileRestoredFromArchive(), StartupXLOG(), WalSndWakeupProcessRequests(), and XLogWalRcvFlush().

◆ WalSndWriteData()

static void WalSndWriteData ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1575 of file walsender.c.

1577 {
1578  TimestampTz now;
1579 
1580  /*
1581  * Fill the send timestamp last, so that it is taken as late as possible.
1582  * This is somewhat ugly, but the protocol is set as it's already used for
1583  * several releases by streaming physical replication.
1584  */
1587  pq_sendint64(&tmpbuf, now);
1588  memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1589  tmpbuf.data, sizeof(int64));
1590 
1591  /* output previously gathered data in a CopyData packet */
1592  pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1593 
1595 
1596  /* Try to flush pending output to the client */
1597  if (pq_flush_if_writable() != 0)
1598  WalSndShutdown();
1599 
1600  /* Try taking fast path unless we get too close to walsender timeout. */
1602  wal_sender_timeout / 2) &&
1603  !pq_is_send_pending())
1604  {
1605  return;
1606  }
1607 
1608  /* If we have pending write here, go to slow path */
1610 }

References CHECK_FOR_INTERRUPTS, StringInfoData::data, GetCurrentTimestamp(), last_reply_timestamp, StringInfoData::len, now(), LogicalDecodingContext::out, pq_flush_if_writable, pq_is_send_pending, pq_putmessage_noblock, pq_sendint64(), ProcessPendingWrites(), resetStringInfo(), TimestampTzPlusMilliseconds, tmpbuf, wal_sender_timeout, and WalSndShutdown().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ XLogSendLogical()

static void XLogSendLogical ( void  )
static

Definition at line 3379 of file walsender.c.

3380 {
3381  XLogRecord *record;
3382  char *errm;
3383 
3384  /*
3385  * We'll use the current flush point to determine whether we've caught up.
3386  * This variable is static in order to cache it across calls. Caching is
3387  * helpful because GetFlushRecPtr() needs to acquire a heavily-contended
3388  * spinlock.
3389  */
3390  static XLogRecPtr flushPtr = InvalidXLogRecPtr;
3391 
3392  /*
3393  * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
3394  * true in WalSndWaitForWal, if we're actually waiting. We also set to
3395  * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
3396  * didn't wait - i.e. when we're shutting down.
3397  */
3398  WalSndCaughtUp = false;
3399 
3400  record = XLogReadRecord(logical_decoding_ctx->reader, &errm);
3401 
3402  /* xlog record was invalid */
3403  if (errm != NULL)
3404  elog(ERROR, "could not find record while sending logically-decoded data: %s",
3405  errm);
3406 
3407  if (record != NULL)
3408  {
3409  /*
3410  * Note the lack of any call to LagTrackerWrite() which is handled by
3411  * WalSndUpdateProgress which is called by output plugin through
3412  * logical decoding write api.
3413  */
3415 
3417  }
3418 
3419  /*
3420  * If first time through in this session, initialize flushPtr. Otherwise,
3421  * we only need to update flushPtr if EndRecPtr is past it.
3422  */
3423  if (flushPtr == InvalidXLogRecPtr ||
3424  logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
3425  {
3427  flushPtr = GetStandbyFlushRecPtr(NULL);
3428  else
3429  flushPtr = GetFlushRecPtr(NULL);
3430  }
3431 
3432  /* If EndRecPtr is still past our flushPtr, it means we caught up. */
3433  if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
3434  WalSndCaughtUp = true;
3435 
3436  /*
3437  * If we're caught up and have been requested to stop, have WalSndLoop()
3438  * terminate the connection in an orderly manner, after writing out all
3439  * the pending data.
3440  */
3442  got_SIGUSR2 = true;
3443 
3444  /* Update shared memory status */
3445  {
3446  WalSnd *walsnd = MyWalSnd;
3447 
3448  SpinLockAcquire(&walsnd->mutex);
3449  walsnd->sentPtr = sentPtr;
3450  SpinLockRelease(&walsnd->mutex);
3451  }
3452 }
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:88
XLogRecPtr EndRecPtr
Definition: xlogreader.h:207
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
Definition: xlogreader.c:389

References am_cascading_walsender, elog, XLogReaderState::EndRecPtr, ERROR, GetFlushRecPtr(), GetStandbyFlushRecPtr(), got_SIGUSR2, got_STOPPING, InvalidXLogRecPtr, logical_decoding_ctx, LogicalDecodingProcessRecord(), WalSnd::mutex, MyWalSnd, LogicalDecodingContext::reader, sentPtr, WalSnd::sentPtr, SpinLockAcquire, SpinLockRelease, WalSndCaughtUp, and XLogReadRecord().

Referenced by StartLogicalReplication(), and WalSndLoop().

◆ XLogSendPhysical()

static void XLogSendPhysical ( void  )
static

Definition at line 3069 of file walsender.c.

3070 {
3071  XLogRecPtr SendRqstPtr;
3072  XLogRecPtr startptr;
3073  XLogRecPtr endptr;
3074  Size nbytes;
3075  XLogSegNo segno;
3076  WALReadError errinfo;
3077  Size rbytes;
3078 
3079  /* If requested switch the WAL sender to the stopping state. */
3080  if (got_STOPPING)
3082 
3084  {
3085  WalSndCaughtUp = true;
3086  return;
3087  }
3088 
3089  /* Figure out how far we can safely send the WAL. */
3091  {
3092  /*
3093  * Streaming an old timeline that's in this server's history, but is
3094  * not the one we're currently inserting or replaying. It can be
3095  * streamed up to the point where we switched off that timeline.
3096  */
3097  SendRqstPtr = sendTimeLineValidUpto;
3098  }
3099  else if (am_cascading_walsender)
3100  {
3101  TimeLineID SendRqstTLI;
3102 
3103  /*
3104  * Streaming the latest timeline on a standby.
3105  *
3106  * Attempt to send all WAL that has already been replayed, so that we
3107  * know it's valid. If we're receiving WAL through streaming
3108  * replication, it's also OK to send any WAL that has been received
3109  * but not replayed.
3110  *
3111  * The timeline we're recovering from can change, or we can be
3112  * promoted. In either case, the current timeline becomes historic. We
3113  * need to detect that so that we don't try to stream past the point
3114  * where we switched to another timeline. We check for promotion or
3115  * timeline switch after calculating FlushPtr, to avoid a race
3116  * condition: if the timeline becomes historic just after we checked
3117  * that it was still current, it's still OK to stream it up to the
3118  * FlushPtr that was calculated before it became historic.
3119  */
3120  bool becameHistoric = false;
3121 
3122  SendRqstPtr = GetStandbyFlushRecPtr(&SendRqstTLI);
3123 
3124  if (!RecoveryInProgress())
3125  {
3126  /* We have been promoted. */
3127  SendRqstTLI = GetWALInsertionTimeLine();
3128  am_cascading_walsender = false;
3129  becameHistoric = true;
3130  }
3131  else
3132  {
3133  /*
3134  * Still a cascading standby. But is the timeline we're sending
3135  * still the one recovery is recovering from?
3136  */
3137  if (sendTimeLine != SendRqstTLI)
3138  becameHistoric = true;
3139  }
3140 
3141  if (becameHistoric)
3142  {
3143  /*
3144  * The timeline we were sending has become historic. Read the
3145  * timeline history file of the new timeline to see where exactly
3146  * we forked off from the timeline we were sending.
3147  */
3148  List *history;
3149 
3150  history = readTimeLineHistory(SendRqstTLI);
3152 
3154  list_free_deep(history);
3155 
3156  sendTimeLineIsHistoric = true;
3157 
3158  SendRqstPtr = sendTimeLineValidUpto;
3159  }
3160  }
3161  else
3162  {
3163  /*
3164  * Streaming the current timeline on a primary.
3165  *
3166  * Attempt to send all data that's already been written out and
3167  * fsync'd to disk. We cannot go further than what's been written out
3168  * given the current implementation of WALRead(). And in any case
3169  * it's unsafe to send WAL that is not securely down to disk on the
3170  * primary: if the primary subsequently crashes and restarts, standbys
3171  * must not have applied any WAL that got lost on the primary.
3172  */
3173  SendRqstPtr = GetFlushRecPtr(NULL);
3174  }
3175 
3176  /*
3177  * Record the current system time as an approximation of the time at which
3178  * this WAL location was written for the purposes of lag tracking.
3179  *
3180  * In theory we could make XLogFlush() record a time in shmem whenever WAL
3181  * is flushed and we could get that time as well as the LSN when we call
3182  * GetFlushRecPtr() above (and likewise for the cascading standby
3183  * equivalent), but rather than putting any new code into the hot WAL path
3184  * it seems good enough to capture the time here. We should reach this
3185  * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
3186  * may take some time, we read the WAL flush pointer and take the time
3187  * very close together here so that we'll get a later position if it is
3188  * still moving.
3189  *
3190  * Because LagTrackerWrite ignores samples when the LSN hasn't advanced,
3191  * this gives us a cheap approximation for the WAL flush time for this
3192  * LSN.
3193  *
3194  * Note that the LSN is not necessarily the LSN for the data contained in
3195  * the present message; it's the end of the WAL, which might be further
3196  * ahead. All the lag tracking machinery cares about is finding out when
3197  * that arbitrary LSN is eventually reported as written, flushed and
3198  * applied, so that it can measure the elapsed time.
3199  */
3200  LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());
3201 
3202  /*
3203  * If this is a historic timeline and we've reached the point where we
3204  * forked to the next timeline, stop streaming.
3205  *
3206  * Note: We might already have sent WAL > sendTimeLineValidUpto. The
3207  * startup process will normally replay all WAL that has been received
3208  * from the primary, before promoting, but if the WAL streaming is
3209  * terminated at a WAL page boundary, the valid portion of the timeline
3210  * might end in the middle of a WAL record. We might've already sent the
3211  * first half of that partial WAL record to the cascading standby, so that
3212  * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
3213  * replay the partial WAL record either, so it can still follow our
3214  * timeline switch.
3215  */
3217  {
3218  /* close the current file. */
3219  if (xlogreader->seg.ws_file >= 0)
3221 
3222  /* Send CopyDone */
3223  pq_putmessage_noblock('c', NULL, 0);
3224  streamingDoneSending = true;
3225 
3226  WalSndCaughtUp = true;
3227 
3228  elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
3231  return;
3232  }
3233 
3234  /* Do we have any work to do? */
3235  Assert(sentPtr <= SendRqstPtr);
3236  if (SendRqstPtr <= sentPtr)
3237  {
3238  WalSndCaughtUp = true;
3239  return;
3240  }
3241 
3242  /*
3243  * Figure out how much to send in one message. If there's no more than
3244  * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
3245  * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
3246  *
3247  * The rounding is not only for performance reasons. Walreceiver relies on
3248  * the fact that we never split a WAL record across two messages. Since a
3249  * long WAL record is split at page boundary into continuation records,
3250  * page boundary is always a safe cut-off point. We also assume that
3251  * SendRqstPtr never points to the middle of a WAL record.
3252  */
3253  startptr = sentPtr;
3254  endptr = startptr;
3255  endptr += MAX_SEND_SIZE;
3256 
3257  /* if we went beyond SendRqstPtr, back off */
3258  if (SendRqstPtr <= endptr)
3259  {
3260  endptr = SendRqstPtr;
3262  WalSndCaughtUp = false;
3263  else
3264  WalSndCaughtUp = true;
3265  }
3266  else
3267  {
3268  /* round down to page boundary. */
3269  endptr -= (endptr % XLOG_BLCKSZ);
3270  WalSndCaughtUp = false;
3271  }
3272 
3273  nbytes = endptr - startptr;
3274  Assert(nbytes <= MAX_SEND_SIZE);
3275 
3276  /*
3277  * OK to read and send the slice.
3278  */
3280  pq_sendbyte(&output_message, 'w');
3281 
3282  pq_sendint64(&output_message, startptr); /* dataStart */
3283  pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
3284  pq_sendint64(&output_message, 0); /* sendtime, filled in last */
3285 
3286  /*
3287  * Read the log directly into the output buffer to avoid extra memcpy
3288  * calls.
3289  */
3291 
3292 retry:
3293  /* attempt to read WAL from WAL buffers first */
3295  startptr, nbytes, xlogreader->seg.ws_tli);
3296  output_message.len += rbytes;
3297  startptr += rbytes;
3298  nbytes -= rbytes;
3299 
3300  /* now read the remaining WAL from WAL file */
3301  if (nbytes > 0 &&
3304  startptr,
3305  nbytes,
3306  xlogreader->seg.ws_tli, /* Pass the current TLI because
3307  * only WalSndSegmentOpen controls
3308  * whether new TLI is needed. */
3309  &errinfo))
3310  WALReadRaiseError(&errinfo);
3311 
3312  /* See logical_read_xlog_page(). */
3313  XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize);
3315 
3316  /*
3317  * During recovery, the currently-open WAL file might be replaced with the
3318  * file of the same name retrieved from archive. So we always need to
3319  * check what we read was valid after reading into the buffer. If it's
3320  * invalid, we try to open and read the file again.
3321  */
3323  {
3324  WalSnd *walsnd = MyWalSnd;
3325  bool reload;
3326 
3327  SpinLockAcquire(&walsnd->mutex);
3328  reload = walsnd->needreload;
3329  walsnd->needreload = false;
3330  SpinLockRelease(&walsnd->mutex);
3331 
3332  if (reload && xlogreader->seg.ws_file >= 0)
3333  {