PostgreSQL Source Code  git master
walsender.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/printtup.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "access/xlogreader.h"
#include "access/xlogrecovery.h"
#include "access/xlogutils.h"
#include "backup/basebackup.h"
#include "backup/basebackup_incremental.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
#include "commands/defrem.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/slotsync.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/dest.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/portal.h"
#include "utils/ps_status.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
Include dependency graph for walsender.c:

Go to the source code of this file.

Data Structures

struct  WalTimeSample
 
struct  LagTracker
 

Macros

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)
 
#define LAG_TRACKER_BUFFER_SIZE   8192
 
#define READ_REPLICATION_SLOT_COLS   3
 
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000
 
#define PG_STAT_GET_WAL_SENDERS_COLS   12
 

Typedefs

typedef void(* WalSndSendDataCallback) (void)
 

Functions

static void WalSndLastCycleHandler (SIGNAL_ARGS)
 
static void WalSndLoop (WalSndSendDataCallback send_data)
 
static void InitWalSenderSlot (void)
 
static void WalSndKill (int code, Datum arg)
 
static void WalSndShutdown (void)
 
void WalSndErrorCleanup (void)
 
void WalSndResourceCleanup (bool isCommit)
 
static void IdentifySystem (void)
 
static void ReadReplicationSlot (ReadReplicationSlotCmd *cmd)
 
static void SendTimeLineHistory (TimeLineHistoryCmd *cmd)
 
static void UploadManifest (void)
 
static bool HandleUploadManifestPacket (StringInfo buf, off_t *offset, IncrementalBackupInfo *ib)
 
static void StartReplication (StartReplicationCmd *cmd)
 
static int logical_read_xlog_page (XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
 
static void parseCreateReplSlotOptions (CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase, bool *failover)
 
static void CreateReplicationSlot (CreateReplicationSlotCmd *cmd)
 
static void DropReplicationSlot (DropReplicationSlotCmd *cmd)
 
static void ParseAlterReplSlotOptions (AlterReplicationSlotCmd *cmd, bool *failover)
 
static void AlterReplicationSlot (AlterReplicationSlotCmd *cmd)
 
static void StartLogicalReplication (StartReplicationCmd *cmd)
 
static void WalSndPrepareWrite (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndWriteData (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void ProcessPendingWrites (void)
 
static void WalSndUpdateProgress (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool skipped_xact)
 
static XLogRecPtr WalSndWaitForWal (XLogRecPtr loc)
 
bool exec_replication_command (const char *cmd_string)
 
static void ProcessRepliesIfAny (void)
 
static void ProcessStandbyMessage (void)
 
static void PhysicalConfirmReceivedLocation (XLogRecPtr lsn)
 
static void ProcessStandbyReplyMessage (void)
 
static void PhysicalReplicationSlotNewXmin (TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
 
static bool TransactionIdInRecentPast (TransactionId xid, uint32 epoch)
 
static void ProcessStandbyHSFeedbackMessage (void)
 
static long WalSndComputeSleeptime (TimestampTz now)
 
static void WalSndCheckTimeOut (void)
 
static void WalSndSegmentOpen (XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
 
static void XLogSendPhysical (void)
 
static void XLogSendLogical (void)
 
static void WalSndDone (WalSndSendDataCallback send_data)
 
XLogRecPtr GetStandbyFlushRecPtr (TimeLineID *tli)
 
void WalSndRqstFileReload (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (bool physical, bool logical)
 
static void WalSndWait (uint32 socket_events, long timeout, uint32 wait_event)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void WalSndSetState (WalSndState state)
 
static const char * WalSndGetStateString (WalSndState state)
 
static Interval * offset_to_interval (TimeOffset offset)
 
Datum pg_stat_get_wal_senders (PG_FUNCTION_ARGS)
 
static void WalSndKeepalive (bool requestReply, XLogRecPtr writePtr)
 
static void WalSndKeepaliveIfNecessary (void)
 
static void LagTrackerWrite (XLogRecPtr lsn, TimestampTz local_flush_time)
 
static TimeOffset LagTrackerRead (int head, XLogRecPtr lsn, TimestampTz now)
 

Variables

WalSndCtlData * WalSndCtl = NULL
 
WalSnd * MyWalSnd = NULL
 
bool am_walsender = false
 
bool am_cascading_walsender = false
 
bool am_db_walsender = false
 
int max_wal_senders = 10
 
int wal_sender_timeout = 60 * 1000
 
bool log_replication_commands = false
 
bool wake_wal_senders = false
 
static XLogReaderState * xlogreader = NULL
 
static IncrementalBackupInfo * uploaded_manifest = NULL
 
static MemoryContext uploaded_manifest_mcxt = NULL
 
static TimeLineID sendTimeLine = 0
 
static TimeLineID sendTimeLineNextTLI = 0
 
static bool sendTimeLineIsHistoric = false
 
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
 
static XLogRecPtr sentPtr = InvalidXLogRecPtr
 
static StringInfoData output_message
 
static StringInfoData reply_message
 
static StringInfoData tmpbuf
 
static TimestampTz last_processing = 0
 
static TimestampTz last_reply_timestamp = 0
 
static bool waiting_for_ping_response = false
 
static bool streamingDoneSending
 
static bool streamingDoneReceiving
 
static bool WalSndCaughtUp = false
 
static volatile sig_atomic_t got_SIGUSR2 = false
 
static volatile sig_atomic_t got_STOPPING = false
 
static volatile sig_atomic_t replication_active = false
 
static LogicalDecodingContext * logical_decoding_ctx = NULL
 
static LagTracker * lag_tracker
 

Macro Definition Documentation

◆ LAG_TRACKER_BUFFER_SIZE

#define LAG_TRACKER_BUFFER_SIZE   8192

Definition at line 221 of file walsender.c.

◆ MAX_SEND_SIZE

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)

Definition at line 109 of file walsender.c.

◆ PG_STAT_GET_WAL_SENDERS_COLS

#define PG_STAT_GET_WAL_SENDERS_COLS   12

◆ READ_REPLICATION_SLOT_COLS

#define READ_REPLICATION_SLOT_COLS   3

◆ WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS

#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000

Typedef Documentation

◆ WalSndSendDataCallback

typedef void(* WalSndSendDataCallback) (void)

Definition at line 239 of file walsender.c.

Function Documentation

◆ AlterReplicationSlot()

static void AlterReplicationSlot ( AlterReplicationSlotCmd *  cmd)
static

Definition at line 1442 of file walsender.c.

1443 {
1444  bool failover = false;
1445 
1446  ParseAlterReplSlotOptions(cmd, &failover);
1447  ReplicationSlotAlter(cmd->slotname, failover);
1448 }
void ReplicationSlotAlter(const char *name, bool failover)
Definition: slot.c:743
static void ParseAlterReplSlotOptions(AlterReplicationSlotCmd *cmd, bool *failover)
Definition: walsender.c:1417

References ParseAlterReplSlotOptions(), ReplicationSlotAlter(), and AlterReplicationSlotCmd::slotname.

Referenced by exec_replication_command().

◆ CreateReplicationSlot()

static void CreateReplicationSlot ( CreateReplicationSlotCmd *  cmd)
static

Definition at line 1203 of file walsender.c.

1204 {
1205  const char *snapshot_name = NULL;
1206  char xloc[MAXFNAMELEN];
1207  char *slot_name;
1208  bool reserve_wal = false;
1209  bool two_phase = false;
1210  bool failover = false;
1211  CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
1212  DestReceiver *dest;
1213  TupOutputState *tstate;
1214  TupleDesc tupdesc;
1215  Datum values[4];
1216  bool nulls[4] = {0};
1217 
1219 
1220  parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase,
1221  &failover);
1222 
1223  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1224  {
1225  ReplicationSlotCreate(cmd->slotname, false,
1227  false, false, false);
1228 
1229  if (reserve_wal)
1230  {
1232 
1234 
1235  /* Write this slot to disk if it's a permanent one. */
1236  if (!cmd->temporary)
1238  }
1239  }
1240  else
1241  {
1243  bool need_full_snapshot = false;
1244 
1246 
1248 
1249  /*
1250  * Initially create persistent slot as ephemeral - that allows us to
1251  * nicely handle errors during initialization because it'll get
1252  * dropped if this transaction fails. We'll make it persistent at the
1253  * end. Temporary slots can be created as temporary from beginning as
1254  * they get dropped on error as well.
1255  */
1256  ReplicationSlotCreate(cmd->slotname, true,
1258  two_phase, failover, false);
1259 
1260  /*
1261  * Do options check early so that we can bail before calling the
1262  * DecodingContextFindStartpoint which can take long time.
1263  */
1264  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1265  {
1266  if (IsTransactionBlock())
1267  ereport(ERROR,
1268  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1269  (errmsg("%s must not be called inside a transaction",
1270  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'export')")));
1271 
1272  need_full_snapshot = true;
1273  }
1274  else if (snapshot_action == CRS_USE_SNAPSHOT)
1275  {
1276  if (!IsTransactionBlock())
1277  ereport(ERROR,
1278  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1279  (errmsg("%s must be called inside a transaction",
1280  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1281 
1282  if (XactIsoLevel != XACT_REPEATABLE_READ)
1283  ereport(ERROR,
1284  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1285  (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
1286  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1287  if (!XactReadOnly)
1288  ereport(ERROR,
1289  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1290  (errmsg("%s must be called in a read-only transaction",
1291  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1292 
1293  if (FirstSnapshotSet)
1294  ereport(ERROR,
1295  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1296  (errmsg("%s must be called before any query",
1297  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1298 
1299  if (IsSubTransaction())
1300  ereport(ERROR,
1301  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1302  (errmsg("%s must not be called in a subtransaction",
1303  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1304 
1305  need_full_snapshot = true;
1306  }
1307 
1308  ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
1310  XL_ROUTINE(.page_read = logical_read_xlog_page,
1311  .segment_open = WalSndSegmentOpen,
1312  .segment_close = wal_segment_close),
1315 
1316  /*
1317  * Signal that we don't need the timeout mechanism. We're just
1318  * creating the replication slot and don't yet accept feedback
1319  * messages or send keepalives. As we possibly need to wait for
1320  * further WAL the walsender would otherwise possibly be killed too
1321  * soon.
1322  */
1324 
1325  /* build initial snapshot, might take a while */
1327 
1328  /*
1329  * Export or use the snapshot if we've been asked to do so.
1330  *
1331  * NB. We will convert the snapbuild.c kind of snapshot to normal
1332  * snapshot when doing this.
1333  */
1334  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1335  {
1336  snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
1337  }
1338  else if (snapshot_action == CRS_USE_SNAPSHOT)
1339  {
1340  Snapshot snap;
1341 
1344  }
1345 
1346  /* don't need the decoding context anymore */
1347  FreeDecodingContext(ctx);
1348 
1349  if (!cmd->temporary)
1351  }
1352 
1353  snprintf(xloc, sizeof(xloc), "%X/%X",
1355 
1357 
1358  /*----------
1359  * Need a tuple descriptor representing four columns:
1360  * - first field: the slot name
1361  * - second field: LSN at which we became consistent
1362  * - third field: exported snapshot's name
1363  * - fourth field: output plugin
1364  */
1365  tupdesc = CreateTemplateTupleDesc(4);
1366  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
1367  TEXTOID, -1, 0);
1368  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
1369  TEXTOID, -1, 0);
1370  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
1371  TEXTOID, -1, 0);
1372  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1373  TEXTOID, -1, 0);
1374 
1375  /* prepare for projection of tuples */
1376  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
1377 
1378  /* slot_name */
1379  slot_name = NameStr(MyReplicationSlot->data.name);
1380  values[0] = CStringGetTextDatum(slot_name);
1381 
1382  /* consistent wal location */
1383  values[1] = CStringGetTextDatum(xloc);
1384 
1385  /* snapshot name, or NULL if none */
1386  if (snapshot_name != NULL)
1387  values[2] = CStringGetTextDatum(snapshot_name);
1388  else
1389  nulls[2] = true;
1390 
1391  /* plugin, or NULL if none */
1392  if (cmd->plugin != NULL)
1393  values[3] = CStringGetTextDatum(cmd->plugin);
1394  else
1395  nulls[3] = true;
1396 
1397  /* send it to dest */
1398  do_tup_output(tstate, values, nulls);
1399  end_tup_output(tstate);
1400 
1402 }
int16 AttrNumber
Definition: attnum.h:21
static Datum values[MAXATTR]
Definition: bootstrap.c:156
#define CStringGetTextDatum(s)
Definition: builtins.h:97
#define NameStr(name)
Definition: c.h:735
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
@ DestRemoteSimple
Definition: dest.h:91
int errmsg(const char *fmt,...)
Definition: elog.c:1075
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
void do_tup_output(TupOutputState *tstate, const Datum *values, const bool *isnull)
Definition: execTuples.c:2274
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2332
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2254
Assert(fmt[strlen(fmt) - 1] !='\n')
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:688
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:644
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:110
LogicalDecodingContext * CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:330
#define NIL
Definition: pg_list.h:68
static bool two_phase
#define snprintf
Definition: port.h:238
uintptr_t Datum
Definition: postgres.h:64
@ REPLICATION_KIND_PHYSICAL
Definition: replnodes.h:22
@ REPLICATION_KIND_LOGICAL
Definition: replnodes.h:23
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool failover, bool synced)
Definition: slot.c:272
void ReplicationSlotMarkDirty(void)
Definition: slot.c:946
void ReplicationSlotReserveWal(void)
Definition: slot.c:1337
void ReplicationSlotPersist(void)
Definition: slot.c:963
ReplicationSlot * MyReplicationSlot
Definition: slot.c:116
void ReplicationSlotSave(void)
Definition: slot.c:928
void ReplicationSlotRelease(void)
Definition: slot.c:606
@ RS_PERSISTENT
Definition: slot.h:35
@ RS_EPHEMERAL
Definition: slot.h:36
@ RS_TEMPORARY
Definition: slot.h:37
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:669
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition: snapbuild.c:570
bool FirstSnapshotSet
Definition: snapmgr.c:142
void RestoreTransactionSnapshot(Snapshot snapshot, void *source_pgproc)
Definition: snapmgr.c:1846
PGPROC * MyProc
Definition: proc.c:68
ReplicationKind kind
Definition: replnodes.h:56
struct SnapBuild * snapshot_builder
Definition: logical.h:44
XLogRecPtr confirmed_flush
Definition: slot.h:104
ReplicationSlotPersistentData data
Definition: slot.h:178
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:67
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:678
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase, bool *failover)
Definition: walsender.c:1126
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
Definition: walsender.c:2883
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1575
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool skipped_xact)
Definition: walsender.c:1671
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition: walsender.c:1054
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1548
static TimestampTz last_reply_timestamp
Definition: walsender.c:182
CRSSnapshotAction
Definition: walsender.h:21
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
@ CRS_EXPORT_SNAPSHOT
Definition: walsender.h:22
bool XactReadOnly
Definition: xact.c:82
int XactIsoLevel
Definition: xact.c:79
bool IsSubTransaction(void)
Definition: xact.c:4905
bool IsTransactionBlock(void)
Definition: xact.c:4832
#define XACT_REPEATABLE_READ
Definition: xact.h:38
#define MAXFNAMELEN
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define XL_ROUTINE(...)
Definition: xlogreader.h:117
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:845

References Assert(), begin_tup_output_tupdesc(), CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDestReceiver(), CreateInitDecodingContext(), CreateTemplateTupleDesc(), CRS_EXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, CStringGetTextDatum, ReplicationSlot::data, DecodingContextFindStartpoint(), generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errmsg(), ERROR, FirstSnapshotSet, FreeDecodingContext(), InvalidXLogRecPtr, IsSubTransaction(), IsTransactionBlock(), CreateReplicationSlotCmd::kind, last_reply_timestamp, logical_read_xlog_page(), LSN_FORMAT_ARGS, MAXFNAMELEN, MyProc, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, NIL, parseCreateReplSlotOptions(), CreateReplicationSlotCmd::plugin, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, ReplicationSlotCreate(), ReplicationSlotMarkDirty(), ReplicationSlotPersist(), ReplicationSlotRelease(), ReplicationSlotReserveWal(), ReplicationSlotSave(), RestoreTransactionSnapshot(), RS_EPHEMERAL, RS_PERSISTENT, RS_TEMPORARY, CreateReplicationSlotCmd::slotname, SnapBuildExportSnapshot(), SnapBuildInitialSnapshot(), LogicalDecodingContext::snapshot_builder, snprintf, CreateReplicationSlotCmd::temporary, TTSOpsVirtual, TupleDescInitBuiltinEntry(), two_phase, values, wal_segment_close(), WalSndPrepareWrite(), WalSndSegmentOpen(), WalSndUpdateProgress(), WalSndWriteData(), XACT_REPEATABLE_READ, XactIsoLevel, XactReadOnly, and XL_ROUTINE.

Referenced by exec_replication_command(), main(), and StartLogStreamer().

◆ DropReplicationSlot()

static void DropReplicationSlot ( DropReplicationSlotCmd *  cmd)
static

Definition at line 1408 of file walsender.c.

1409 {
1410  ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1411 }
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:720

References ReplicationSlotDrop(), DropReplicationSlotCmd::slotname, and DropReplicationSlotCmd::wait.

Referenced by exec_replication_command(), and main().

◆ exec_replication_command()

bool exec_replication_command ( const char *  cmd_string)

Definition at line 1873 of file walsender.c.

1874 {
1875  int parse_rc;
1876  Node *cmd_node;
1877  const char *cmdtag;
1878  MemoryContext cmd_context;
1879  MemoryContext old_context;
1880 
1881  /*
1882  * If WAL sender has been told that shutdown is getting close, switch its
1883  * status accordingly to handle the next replication commands correctly.
1884  */
1885  if (got_STOPPING)
1886  WalSndSetState(WALSNDSTATE_STOPPING);
1887 
1888  /*
1889  * Throw error if in stopping mode. We need prevent commands that could
1890  * generate WAL while the shutdown checkpoint is being written. To be
1891  * safe, we just prohibit all new commands.
1892  */
1893  if (MyWalSnd->state == WALSNDSTATE_STOPPING)
1894  ereport(ERROR,
1895  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1896  errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1897 
1898  /*
1899  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1900  * command arrives. Clean up the old stuff if there's anything.
1901  */
1903 
1905 
1906  /*
1907  * Prepare to parse and execute the command.
1908  */
1910  "Replication command context",
1912  old_context = MemoryContextSwitchTo(cmd_context);
1913 
1914  replication_scanner_init(cmd_string);
1915 
1916  /*
1917  * Is it a WalSender command?
1918  */
1920  {
1921  /* Nope; clean up and get out. */
1923 
1924  MemoryContextSwitchTo(old_context);
1925  MemoryContextDelete(cmd_context);
1926 
1927  /* XXX this is a pretty random place to make this check */
1928  if (MyDatabaseId == InvalidOid)
1929  ereport(ERROR,
1930  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1931  errmsg("cannot execute SQL commands in WAL sender for physical replication")));
1932 
1933  /* Tell the caller that this wasn't a WalSender command. */
1934  return false;
1935  }
1936 
1937  /*
1938  * Looks like a WalSender command, so parse it.
1939  */
1940  parse_rc = replication_yyparse();
1941  if (parse_rc != 0)
1942  ereport(ERROR,
1943  (errcode(ERRCODE_SYNTAX_ERROR),
1944  errmsg_internal("replication command parser returned %d",
1945  parse_rc)));
1947 
1948  cmd_node = replication_parse_result;
1949 
1950  /*
1951  * Report query to various monitoring facilities. For this purpose, we
1952  * report replication commands just like SQL commands.
1953  */
1954  debug_query_string = cmd_string;
1955 
1957 
1958  /*
1959  * Log replication command if log_replication_commands is enabled. Even
1960  * when it's disabled, log the command with DEBUG1 level for backward
1961  * compatibility.
1962  */
1964  (errmsg("received replication command: %s", cmd_string)));
1965 
1966  /*
1967  * Disallow replication commands in aborted transaction blocks.
1968  */
1970  ereport(ERROR,
1971  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1972  errmsg("current transaction is aborted, "
1973  "commands ignored until end of transaction block")));
1974 
1976 
1977  /*
1978  * Allocate buffers that will be used for each outgoing and incoming
1979  * message. We do this just once per command to reduce palloc overhead.
1980  */
1984 
1985  switch (cmd_node->type)
1986  {
1987  case T_IdentifySystemCmd:
1988  cmdtag = "IDENTIFY_SYSTEM";
1989  set_ps_display(cmdtag);
1990  IdentifySystem();
1991  EndReplicationCommand(cmdtag);
1992  break;
1993 
1994  case T_ReadReplicationSlotCmd:
1995  cmdtag = "READ_REPLICATION_SLOT";
1996  set_ps_display(cmdtag);
1998  EndReplicationCommand(cmdtag);
1999  break;
2000 
2001  case T_BaseBackupCmd:
2002  cmdtag = "BASE_BACKUP";
2003  set_ps_display(cmdtag);
2004  PreventInTransactionBlock(true, cmdtag);
2006  EndReplicationCommand(cmdtag);
2007  break;
2008 
2009  case T_CreateReplicationSlotCmd:
2010  cmdtag = "CREATE_REPLICATION_SLOT";
2011  set_ps_display(cmdtag);
2013  EndReplicationCommand(cmdtag);
2014  break;
2015 
2016  case T_DropReplicationSlotCmd:
2017  cmdtag = "DROP_REPLICATION_SLOT";
2018  set_ps_display(cmdtag);
2020  EndReplicationCommand(cmdtag);
2021  break;
2022 
2023  case T_AlterReplicationSlotCmd:
2024  cmdtag = "ALTER_REPLICATION_SLOT";
2025  set_ps_display(cmdtag);
2027  EndReplicationCommand(cmdtag);
2028  break;
2029 
2030  case T_StartReplicationCmd:
2031  {
2032  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
2033 
2034  cmdtag = "START_REPLICATION";
2035  set_ps_display(cmdtag);
2036  PreventInTransactionBlock(true, cmdtag);
2037 
2038  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
2039  StartReplication(cmd);
2040  else
2042 
2043  /* dupe, but necessary per libpqrcv_endstreaming */
2044  EndReplicationCommand(cmdtag);
2045 
2046  Assert(xlogreader != NULL);
2047  break;
2048  }
2049 
2050  case T_TimeLineHistoryCmd:
2051  cmdtag = "TIMELINE_HISTORY";
2052  set_ps_display(cmdtag);
2053  PreventInTransactionBlock(true, cmdtag);
2055  EndReplicationCommand(cmdtag);
2056  break;
2057 
2058  case T_VariableShowStmt:
2059  {
2061  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
2062 
2063  cmdtag = "SHOW";
2064  set_ps_display(cmdtag);
2065 
2066  /* syscache access needs a transaction environment */
2068  GetPGVariable(n->name, dest);
2070  EndReplicationCommand(cmdtag);
2071  }
2072  break;
2073 
2074  case T_UploadManifestCmd:
2075  cmdtag = "UPLOAD_MANIFEST";
2076  set_ps_display(cmdtag);
2077  PreventInTransactionBlock(true, cmdtag);
2078  UploadManifest();
2079  EndReplicationCommand(cmdtag);
2080  break;
2081 
2082  default:
2083  elog(ERROR, "unrecognized replication command node tag: %u",
2084  cmd_node->type);
2085  }
2086 
2087  /* done */
2088  MemoryContextSwitchTo(old_context);
2089  MemoryContextDelete(cmd_context);
2090 
2091  /*
2092  * We need not update ps display or pg_stat_activity, because PostgresMain
2093  * will reset those to "idle". But we must reset debug_query_string to
2094  * ensure it doesn't become a dangling pointer.
2095  */
2096  debug_query_string = NULL;
2097 
2098  return true;
2099 }
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
void SendBaseBackup(BaseBackupCmd *cmd, IncrementalBackupInfo *ib)
Definition: basebackup.c:991
void EndReplicationCommand(const char *commandTag)
Definition: dest.c:201
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1162
int errcode(int sqlerrcode)
Definition: elog.c:860
#define LOG
Definition: elog.h:31
#define DEBUG1
Definition: elog.h:30
Oid MyDatabaseId
Definition: globals.c:90
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc_funcs.c:382
MemoryContext CurrentMemoryContext
Definition: mcxt.c:135
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:403
#define AllocSetContextCreate
Definition: memutils.h:128
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:152
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
const char * debug_query_string
Definition: postgres.c:86
#define InvalidOid
Definition: postgres_ext.h:36
static void set_ps_display(const char *activity)
Definition: ps_status.h:40
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:730
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
Definition: nodes.h:129
NodeTag type
Definition: nodes.h:130
ReplicationKind kind
Definition: replnodes.h:94
WalSndState state
static void AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
Definition: walsender.c:1442
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:592
WalSnd * MyWalSnd
Definition: walsender.c:115
static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
Definition: walsender.c:493
static StringInfoData tmpbuf
Definition: walsender.c:173
static void IdentifySystem(void)
Definition: walsender.c:410
static StringInfoData reply_message
Definition: walsender.c:172
void WalSndSetState(WalSndState state)
Definition: walsender.c:3697
static StringInfoData output_message
Definition: walsender.c:171
static void UploadManifest(void)
Definition: walsender.c:682
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:201
bool log_replication_commands
Definition: walsender.c:128
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:1203
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1455
static IncrementalBackupInfo * uploaded_manifest
Definition: walsender.c:150
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1408
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:822
static XLogReaderState * xlogreader
Definition: walsender.c:140
PGDLLIMPORT Node * replication_parse_result
@ WALSNDSTATE_STOPPING
void replication_scanner_finish(void)
int replication_yyparse(void)
void replication_scanner_init(const char *str)
bool replication_scanner_is_replication_command(void)
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3497
void StartTransactionCommand(void)
Definition: xact.c:2953
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:398
void CommitTransactionCommand(void)
Definition: xact.c:3050

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, AlterReplicationSlot(), Assert(), CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, debug_query_string, generate_unaccent_rules::dest, DestRemoteSimple, DropReplicationSlot(), elog(), EndReplicationCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsAbortedTransactionBlockState(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextDelete(), MemoryContextSwitchTo(), MyDatabaseId, MyWalSnd, VariableShowStmt::name, output_message, pgstat_report_activity(), PreventInTransactionBlock(), ReadReplicationSlot(), REPLICATION_KIND_PHYSICAL, replication_parse_result, replication_scanner_finish(), replication_scanner_init(), replication_scanner_is_replication_command(), replication_yyparse(), reply_message, SendBaseBackup(), SendTimeLineHistory(), set_ps_display(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), StartTransactionCommand(), WalSnd::state, STATE_RUNNING, tmpbuf, Node::type, ReadReplicationSlotCmd::type, uploaded_manifest, UploadManifest(), WalSndSetState(), WALSNDSTATE_STOPPING, and xlogreader.

Referenced by PostgresMain().

◆ GetStandbyFlushRecPtr()

XLogRecPtr GetStandbyFlushRecPtr ( TimeLineID *  tli)

Definition at line 3399 of file walsender.c.

3400 {
3401  XLogRecPtr replayPtr;
3402  TimeLineID replayTLI;
3403  XLogRecPtr receivePtr;
3405  XLogRecPtr result;
3406 
3408 
3409  /*
3410  * We can safely send what's already been replayed. Also, if walreceiver
3411  * is streaming WAL from the same timeline, we can send anything that it
3412  * has streamed, but hasn't been replayed yet.
3413  */
3414 
3415  receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
3416  replayPtr = GetXLogReplayRecPtr(&replayTLI);
3417 
3418  if (tli)
3419  *tli = replayTLI;
3420 
3421  result = replayPtr;
3422  if (receiveTLI == replayTLI && receivePtr > replayPtr)
3423  result = receivePtr;
3424 
3425  return result;
3426 }
bool IsSyncingReplicationSlots(void)
Definition: slotsync.c:1429
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
bool am_cascading_walsender
Definition: walsender.c:119
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint32 TimeLineID
Definition: xlogdefs.h:59
static TimeLineID receiveTLI
Definition: xlogrecovery.c:264
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References am_cascading_walsender, Assert(), GetWalRcvFlushRecPtr(), GetXLogReplayRecPtr(), IsSyncingReplicationSlots(), and receiveTLI.

Referenced by IdentifySystem(), StartReplication(), synchronize_one_slot(), XLogSendLogical(), and XLogSendPhysical().

◆ HandleUploadManifestPacket()

static bool HandleUploadManifestPacket ( StringInfo  buf,
off_t *  offset,
IncrementalBackupInfo * ib 
)
static

Definition at line 746 of file walsender.c.

748 {
749  int mtype;
750  int maxmsglen;
751 
752  HOLD_CANCEL_INTERRUPTS();
753 
754  pq_startmsgread();
755  mtype = pq_getbyte();
756  if (mtype == EOF)
757  ereport(ERROR,
758  (errcode(ERRCODE_CONNECTION_FAILURE),
759  errmsg("unexpected EOF on client connection with an open transaction")));
760 
761  switch (mtype)
762  {
763  case 'd': /* CopyData */
764  maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
765  break;
766  case 'c': /* CopyDone */
767  case 'f': /* CopyFail */
768  case 'H': /* Flush */
769  case 'S': /* Sync */
770  maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
771  break;
772  default:
773  ereport(ERROR,
774  (errcode(ERRCODE_PROTOCOL_VIOLATION),
775  errmsg("unexpected message type 0x%02X during COPY from stdin",
776  mtype)));
777  maxmsglen = 0; /* keep compiler quiet */
778  break;
779  }
780 
781  /* Now collect the message body */
782  if (pq_getmessage(buf, maxmsglen))
783  ereport(ERROR,
784  (errcode(ERRCODE_CONNECTION_FAILURE),
785  errmsg("unexpected EOF on client connection with an open transaction")));
786  RESUME_CANCEL_INTERRUPTS();
787 
788  /* Process the message */
789  switch (mtype)
790  {
791  case 'd': /* CopyData */
792  AppendIncrementalManifestData(ib, buf->data, buf->len);
793  return true;
794 
795  case 'c': /* CopyDone */
796  return false;
797 
798  case 'H': /* Flush */
799  case 'S': /* Sync */
800  /* Ignore these while in CopyOut mode as we do elsewhere. */
801  return true;
802 
803  case 'f':
804  ereport(ERROR,
805  (errcode(ERRCODE_QUERY_CANCELED),
806  errmsg("COPY from stdin failed: %s",
807  pq_getmsgstring(buf))));
808  }
809 
810  /* Not reached. */
811  Assert(false);
812  return false;
813 }
void AppendIncrementalManifestData(IncrementalBackupInfo *ib, const char *data, int len)
#define PQ_SMALL_MESSAGE_LIMIT
Definition: libpq.h:30
#define PQ_LARGE_MESSAGE_LIMIT
Definition: libpq.h:31
#define HOLD_CANCEL_INTERRUPTS()
Definition: miscadmin.h:141
#define RESUME_CANCEL_INTERRUPTS()
Definition: miscadmin.h:143
static char * buf
Definition: pg_test_fsync.c:73
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1218
int pq_getbyte(void)
Definition: pqcomm.c:981
void pq_startmsgread(void)
Definition: pqcomm.c:1156
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:582

References AppendIncrementalManifestData(), Assert(), buf, ereport, errcode(), errmsg(), ERROR, HOLD_CANCEL_INTERRUPTS, pq_getbyte(), pq_getmessage(), pq_getmsgstring(), PQ_LARGE_MESSAGE_LIMIT, PQ_SMALL_MESSAGE_LIMIT, pq_startmsgread(), and RESUME_CANCEL_INTERRUPTS.

Referenced by UploadManifest().

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 3455 of file walsender.c.

3456 {
3457  Assert(am_walsender);
3458 
3459  /*
3460  * If replication has not yet started, die like with SIGTERM. If
3461  * replication is active, only set a flag and wake up the main loop. It
3462  * will send any outstanding WAL, wait for it to be replicated to the
3463  * standby, and then exit gracefully.
3464  */
3465  if (!replication_active)
3466  kill(MyProcPid, SIGTERM);
3467  else
3468  got_STOPPING = true;
3469 }
int MyProcPid
Definition: globals.c:45
bool am_walsender
Definition: walsender.c:118
static volatile sig_atomic_t replication_active
Definition: walsender.c:209
#define kill(pid, sig)
Definition: win32_port.h:485

References am_walsender, Assert(), got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

◆ IdentifySystem()

static void IdentifySystem ( void  )
static

Definition at line 410 of file walsender.c.

411 {
412  char sysid[32];
413  char xloc[MAXFNAMELEN];
414  XLogRecPtr logptr;
415  char *dbname = NULL;
417  TupOutputState *tstate;
418  TupleDesc tupdesc;
419  Datum values[4];
420  bool nulls[4] = {0};
421  TimeLineID currTLI;
422 
423  /*
424  * Reply with a result set with one row, four columns. First col is system
425  * ID, second is timeline ID, third is current xlog location and the
426  * fourth contains the database name if we are connected to one.
427  */
428 
429  snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
431 
434  logptr = GetStandbyFlushRecPtr(&currTLI);
435  else
436  logptr = GetFlushRecPtr(&currTLI);
437 
438  snprintf(xloc, sizeof(xloc), "%X/%X", LSN_FORMAT_ARGS(logptr));
439 
440  if (MyDatabaseId != InvalidOid)
441  {
443 
444  /* syscache access needs a transaction env. */
446  /* make dbname live outside TX context */
450  /* CommitTransactionCommand switches to TopMemoryContext */
452  }
453 
455 
456  /* need a tuple descriptor representing four columns */
457  tupdesc = CreateTemplateTupleDesc(4);
458  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
459  TEXTOID, -1, 0);
460  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
461  INT8OID, -1, 0);
462  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
463  TEXTOID, -1, 0);
464  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
465  TEXTOID, -1, 0);
466 
467  /* prepare for projection of tuples */
468  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
469 
470  /* column 1: system identifier */
471  values[0] = CStringGetTextDatum(sysid);
472 
473  /* column 2: timeline */
474  values[1] = Int64GetDatum(currTLI);
475 
476  /* column 3: wal location */
477  values[2] = CStringGetTextDatum(xloc);
478 
479  /* column 4: database name, or NULL if none */
480  if (dbname)
482  else
483  nulls[3] = true;
484 
485  /* send it to dest */
486  do_tup_output(tstate, values, nulls);
487 
488  end_tup_output(tstate);
489 }
#define UINT64_FORMAT
Definition: c.h:538
char * get_database_name(Oid dbid)
Definition: dbcommands.c:3089
struct cursor * cur
Definition: ecpg.c:28
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1807
char * dbname
Definition: streamutil.c:51
XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli)
Definition: walsender.c:3399
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4463
bool RecoveryInProgress(void)
Definition: xlog.c:6211
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition: xlog.c:6376

References am_cascading_walsender, begin_tup_output_tupdesc(), CommitTransactionCommand(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, cur, CurrentMemoryContext, dbname, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), get_database_name(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetSystemIdentifier(), Int64GetDatum(), InvalidOid, LSN_FORMAT_ARGS, MAXFNAMELEN, MemoryContextSwitchTo(), MyDatabaseId, RecoveryInProgress(), snprintf, StartTransactionCommand(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), UINT64_FORMAT, and values.

Referenced by exec_replication_command().

◆ InitWalSenderSlot()

static void InitWalSenderSlot ( void  )
static

Definition at line 2788 of file walsender.c.

2789 {
2790  int i;
2791 
2792  /*
2793  * WalSndCtl should be set up already (we inherit this by fork() or
2794  * EXEC_BACKEND mechanism from the postmaster).
2795  */
2796  Assert(WalSndCtl != NULL);
2797  Assert(MyWalSnd == NULL);
2798 
2799  /*
2800  * Find a free walsender slot and reserve it. This must not fail due to
2801  * the prior check for free WAL senders in InitProcess().
2802  */
2803  for (i = 0; i < max_wal_senders; i++)
2804  {
2805  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2806 
2807  SpinLockAcquire(&walsnd->mutex);
2808 
2809  if (walsnd->pid != 0)
2810  {
2811  SpinLockRelease(&walsnd->mutex);
2812  continue;
2813  }
2814  else
2815  {
2816  /*
2817  * Found a free slot. Reserve it for us.
2818  */
2819  walsnd->pid = MyProcPid;
2820  walsnd->state = WALSNDSTATE_STARTUP;
2821  walsnd->sentPtr = InvalidXLogRecPtr;
2822  walsnd->needreload = false;
2823  walsnd->write = InvalidXLogRecPtr;
2824  walsnd->flush = InvalidXLogRecPtr;
2825  walsnd->apply = InvalidXLogRecPtr;
2826  walsnd->writeLag = -1;
2827  walsnd->flushLag = -1;
2828  walsnd->applyLag = -1;
2829  walsnd->sync_standby_priority = 0;
2830  walsnd->latch = &MyProc->procLatch;
2831  walsnd->replyTime = 0;
2832 
2833  /*
2834  * The kind assignment is done here and not in StartReplication()
2835  * and StartLogicalReplication(). Indeed, the logical walsender
2836  * needs to read WAL records (like snapshot of running
2837  * transactions) during the slot creation. So it needs to be woken
2838  * up based on its kind.
2839  *
2840  * The kind assignment could also be done in StartReplication(),
2841  * StartLogicalReplication() and CREATE_REPLICATION_SLOT but it
2842  * seems better to set it in one place.
2843  */
2844  if (MyDatabaseId == InvalidOid)
2845  walsnd->kind = REPLICATION_KIND_PHYSICAL;
2846  else
2847  walsnd->kind = REPLICATION_KIND_LOGICAL;
2848 
2849  SpinLockRelease(&walsnd->mutex);
2850  /* don't need the lock anymore */
2851  MyWalSnd = (WalSnd *) walsnd;
2852 
2853  break;
2854  }
2855  }
2856 
2857  Assert(MyWalSnd != NULL);
2858 
2859  /* Arrange to clean up at walsender exit */
2861 }
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:365
int i
Definition: isn.c:73
#define SpinLockRelease(lock)
Definition: spin.h:64
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch procLatch
Definition: proc.h:170
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
TimeOffset writeLag
slock_t mutex
XLogRecPtr flush
XLogRecPtr sentPtr
TimeOffset flushLag
ReplicationKind kind
XLogRecPtr write
TimeOffset applyLag
int sync_standby_priority
bool needreload
Latch * latch
TimestampTz replyTime
XLogRecPtr apply
int max_wal_senders
Definition: walsender.c:124
static void WalSndKill(int code, Datum arg)
Definition: walsender.c:2865
WalSndCtlData * WalSndCtl
Definition: walsender.c:112
@ WALSNDSTATE_STARTUP

References WalSnd::apply, WalSnd::applyLag, Assert(), WalSnd::flush, WalSnd::flushLag, i, InvalidOid, InvalidXLogRecPtr, WalSnd::kind, WalSnd::latch, max_wal_senders, WalSnd::mutex, MyDatabaseId, MyProc, MyProcPid, MyWalSnd, WalSnd::needreload, on_shmem_exit(), WalSnd::pid, PGPROC::procLatch, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, WalSnd::replyTime, WalSnd::sentPtr, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSnd::sync_standby_priority, WalSndCtl, WalSndKill(), WalSndCtlData::walsnds, WALSNDSTATE_STARTUP, WalSnd::write, and WalSnd::writeLag.

◆ LagTrackerRead()

static TimeOffset LagTrackerRead ( int  head,
XLogRecPtr  lsn,
TimestampTz  now 
)
static

Definition at line 4048 of file walsender.c.

4049 {
4050  TimestampTz time = 0;
4051 
4052  /* Read all unread samples up to this LSN or end of buffer. */
4053  while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
4054  lag_tracker->buffer[lag_tracker->read_heads[head]].lsn <= lsn)
4055  {
4056  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
4057  lag_tracker->last_read[head] =
4059  lag_tracker->read_heads[head] =
4061  }
4062 
4063  /*
4064  * If the lag tracker is empty, that means the standby has processed
4065  * everything we've ever sent so we should now clear 'last_read'. If we
4066  * didn't do that, we'd risk using a stale and irrelevant sample for
4067  * interpolation at the beginning of the next burst of WAL after a period
4068  * of idleness.
4069  */
4071  lag_tracker->last_read[head].time = 0;
4072 
4073  if (time > now)
4074  {
4075  /* If the clock somehow went backwards, treat as not found. */
4076  return -1;
4077  }
4078  else if (time == 0)
4079  {
4080  /*
4081  * We didn't cross a time. If there is a future sample that we
4082  * haven't reached yet, and we've already reached at least one sample,
4083  * let's interpolate the local flushed time. This is mainly useful
4084  * for reporting a completely stuck apply position as having
4085  * increasing lag, since otherwise we'd have to wait for it to
4086  * eventually start moving again and cross one of our samples before
4087  * we can show the lag increasing.
4088  */
4090  {
4091  /* There are no future samples, so we can't interpolate. */
4092  return -1;
4093  }
4094  else if (lag_tracker->last_read[head].time != 0)
4095  {
4096  /* We can interpolate between last_read and the next sample. */
4097  double fraction;
4098  WalTimeSample prev = lag_tracker->last_read[head];
4100 
4101  if (lsn < prev.lsn)
4102  {
4103  /*
4104  * Reported LSNs shouldn't normally go backwards, but it's
4105  * possible when there is a timeline change. Treat as not
4106  * found.
4107  */
4108  return -1;
4109  }
4110 
4111  Assert(prev.lsn < next.lsn);
4112 
4113  if (prev.time > next.time)
4114  {
4115  /* If the clock somehow went backwards, treat as not found. */
4116  return -1;
4117  }
4118 
4119  /* See how far we are between the previous and next samples. */
4120  fraction =
4121  (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
4122 
4123  /* Scale the local flush time proportionally. */
4124  time = (TimestampTz)
4125  ((double) prev.time + (next.time - prev.time) * fraction);
4126  }
4127  else
4128  {
4129  /*
4130  * We have only a future sample, implying that we were entirely
4131  * caught up and now there is a new burst of WAL and the
4132  * standby hasn't processed the first sample yet. Until the
4133  * standby reaches the future sample the best we can do is report
4134  * the hypothetical lag if that sample were to be replayed now.
4135  */
4136  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
4137  }
4138  }
4139 
4140  /* Return the elapsed time since local flush time in microseconds. */
4141  Assert(time != 0);
4142  return now - time;
4143 }
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1619
static int32 next
Definition: blutils.c:221
int64 TimestampTz
Definition: timestamp.h:39
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:227
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:229
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:230
int write_head
Definition: walsender.c:228
TimestampTz time
Definition: walsender.c:217
XLogRecPtr lsn
Definition: walsender.c:216
static LagTracker * lag_tracker
Definition: walsender.c:233
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:221

References Assert(), LagTracker::buffer, lag_tracker, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_read, WalTimeSample::lsn, next, now(), LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by ProcessStandbyReplyMessage().

◆ LagTrackerWrite()

static void LagTrackerWrite ( XLogRecPtr  lsn,
TimestampTz  local_flush_time 
)
static

Definition at line 3983 of file walsender.c.

3984 {
3985  bool buffer_full;
3986  int new_write_head;
3987  int i;
3988 
3989  if (!am_walsender)
3990  return;
3991 
3992  /*
3993  * If the lsn hasn't advanced since last time, then do nothing. This way
3994  * we only record a new sample when new WAL has been written.
3995  */
3996  if (lag_tracker->last_lsn == lsn)
3997  return;
3998  lag_tracker->last_lsn = lsn;
3999 
4000  /*
4001  * If advancing the write head of the circular buffer would crash into any
4002  * of the read heads, then the buffer is full. In other words, the
4003  * slowest reader (presumably apply) is the one that controls the release
4004  * of space.
4005  */
4006  new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
4007  buffer_full = false;
4008  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
4009  {
4010  if (new_write_head == lag_tracker->read_heads[i])
4011  buffer_full = true;
4012  }
4013 
4014  /*
4015  * If the buffer is full, for now we just rewind by one slot and overwrite
4016  * the last sample, as a simple (if somewhat uneven) way to lower the
4017  * sampling rate. There may be better adaptive compaction algorithms.
4018  */
4019  if (buffer_full)
4020  {
4021  new_write_head = lag_tracker->write_head;
4022  if (lag_tracker->write_head > 0)
4024  else
4026  }
4027 
4028  /* Store a sample at the current write head position. */
4030  lag_tracker->buffer[lag_tracker->write_head].time = local_flush_time;
4031  lag_tracker->write_head = new_write_head;
4032 }
XLogRecPtr last_lsn
Definition: walsender.c:226
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:27

References am_walsender, LagTracker::buffer, i, lag_tracker, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_lsn, WalTimeSample::lsn, NUM_SYNC_REP_WAIT_MODE, LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by WalSndUpdateProgress(), and XLogSendPhysical().

◆ logical_read_xlog_page()

static int logical_read_xlog_page ( XLogReaderState * state,
XLogRecPtr  targetPagePtr,
int  reqLen,
XLogRecPtr  targetRecPtr,
char *  cur_page 
)
static

Definition at line 1054 of file walsender.c.

1056 {
1057  XLogRecPtr flushptr;
1058  int count;
1059  WALReadError errinfo;
1060  XLogSegNo segno;
1061  TimeLineID currTLI;
1062 
1063  /*
1064  * Make sure we have enough WAL available before retrieving the current
1065  * timeline. This is needed to determine am_cascading_walsender accurately
1066  * which is needed to determine the current timeline.
1067  */
1068  flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
1069 
1070  /*
1071  * Since logical decoding is also permitted on a standby server, we need
1072  * to check if the server is in recovery to decide how to get the current
1073  * timeline ID (so that it also covers the promotion or timeline change
1074  * cases).
1075  */
1077 
1079  GetXLogReplayRecPtr(&currTLI);
1080  else
1081  currTLI = GetWALInsertionTimeLine();
1082 
1083  XLogReadDetermineTimeline(state, targetPagePtr, reqLen, currTLI);
1084  sendTimeLineIsHistoric = (state->currTLI != currTLI);
1085  sendTimeLine = state->currTLI;
1086  sendTimeLineValidUpto = state->currTLIValidUntil;
1087  sendTimeLineNextTLI = state->nextTLI;
1088 
1089  /* fail if not (implies we are going to shut down) */
1090  if (flushptr < targetPagePtr + reqLen)
1091  return -1;
1092 
1093  if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
1094  count = XLOG_BLCKSZ; /* more than one block available */
1095  else
1096  count = flushptr - targetPagePtr; /* part of the page available */
1097 
1098  /* now actually read the data, we know it's there */
1099  if (!WALRead(state,
1100  cur_page,
1101  targetPagePtr,
1102  count,
1103  currTLI, /* Pass the current TLI because only
1104  * WalSndSegmentOpen controls whether new TLI
1105  * is needed. */
1106  &errinfo))
1107  WALReadRaiseError(&errinfo);
1108 
1109  /*
1110  * After reading into the buffer, check that what we read was valid. We do
1111  * this after reading, because even though the segment was present when we
1112  * opened it, it might get recycled or removed while we read it. The
1113  * read() succeeds in that case, but the data we tried to read might
1114  * already have been overwritten with new WAL records.
1115  */
1116  XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize);
1117  CheckXLogRemoved(segno, state->seg.ws_tli);
1118 
1119  return count;
1120 }
Definition: regguts.h:323
static TimeLineID sendTimeLine
Definition: walsender.c:159
static bool sendTimeLineIsHistoric
Definition: walsender.c:161
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition: walsender.c:1739
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:160
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:162
TimeLineID GetWALInsertionTimeLine(void)
Definition: xlog.c:6399
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:3662
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
uint64 XLogSegNo
Definition: xlogdefs.h:48
bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALReadError *errinfo)
Definition: xlogreader.c:1505
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength, TimeLineID currTLI)
Definition: xlogutils.c:721
void WALReadRaiseError(WALReadError *errinfo)
Definition: xlogutils.c:1023

References am_cascading_walsender, CheckXLogRemoved(), GetWALInsertionTimeLine(), GetXLogReplayRecPtr(), RecoveryInProgress(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, WALRead(), WALReadRaiseError(), WalSndWaitForWal(), XLByteToSeg, and XLogReadDetermineTimeline().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ offset_to_interval()

static Interval* offset_to_interval ( TimeOffset  offset)
static

Definition at line 3735 of file walsender.c.

3736 {
3737  Interval *result = palloc(sizeof(Interval));
3738 
3739  result->month = 0;
3740  result->day = 0;
3741  result->time = offset;
3742 
3743  return result;
3744 }
void * palloc(Size size)
Definition: mcxt.c:1201
int32 day
Definition: timestamp.h:51
int32 month
Definition: timestamp.h:52
TimeOffset time
Definition: timestamp.h:49

References Interval::day, Interval::month, palloc(), and Interval::time.

Referenced by pg_stat_get_wal_senders().

◆ ParseAlterReplSlotOptions()

static void ParseAlterReplSlotOptions ( AlterReplicationSlotCmd * cmd,
bool * failover 
)
static

Definition at line 1417 of file walsender.c.

1418 {
1419  bool failover_given = false;
1420 
1421  /* Parse options */
1422  foreach_ptr(DefElem, defel, cmd->options)
1423  {
1424  if (strcmp(defel->defname, "failover") == 0)
1425  {
1426  if (failover_given)
1427  ereport(ERROR,
1428  (errcode(ERRCODE_SYNTAX_ERROR),
1429  errmsg("conflicting or redundant options")));
1430  failover_given = true;
1431  *failover = defGetBoolean(defel);
1432  }
1433  else
1434  elog(ERROR, "unrecognized option: %s", defel->defname);
1435  }
1436 }
bool defGetBoolean(DefElem *def)
Definition: define.c:108
#define foreach_ptr(type, var, lst)
Definition: pg_list.h:469

References defGetBoolean(), elog(), ereport, errcode(), errmsg(), ERROR, foreach_ptr, and AlterReplicationSlotCmd::options.

Referenced by AlterReplicationSlot().

◆ parseCreateReplSlotOptions()

static void parseCreateReplSlotOptions ( CreateReplicationSlotCmd * cmd,
bool * reserve_wal,
CRSSnapshotAction * snapshot_action,
bool * two_phase,
bool * failover 
)
static

Definition at line 1126 of file walsender.c.

1130 {
1131  ListCell *lc;
1132  bool snapshot_action_given = false;
1133  bool reserve_wal_given = false;
1134  bool two_phase_given = false;
1135  bool failover_given = false;
1136 
1137  /* Parse options */
1138  foreach(lc, cmd->options)
1139  {
1140  DefElem *defel = (DefElem *) lfirst(lc);
1141 
1142  if (strcmp(defel->defname, "snapshot") == 0)
1143  {
1144  char *action;
1145 
1146  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
1147  ereport(ERROR,
1148  (errcode(ERRCODE_SYNTAX_ERROR),
1149  errmsg("conflicting or redundant options")));
1150 
1151  action = defGetString(defel);
1152  snapshot_action_given = true;
1153 
1154  if (strcmp(action, "export") == 0)
1155  *snapshot_action = CRS_EXPORT_SNAPSHOT;
1156  else if (strcmp(action, "nothing") == 0)
1157  *snapshot_action = CRS_NOEXPORT_SNAPSHOT;
1158  else if (strcmp(action, "use") == 0)
1159  *snapshot_action = CRS_USE_SNAPSHOT;
1160  else
1161  ereport(ERROR,
1162  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1163  errmsg("unrecognized value for CREATE_REPLICATION_SLOT option \"%s\": \"%s\"",
1164  defel->defname, action)));
1165  }
1166  else if (strcmp(defel->defname, "reserve_wal") == 0)
1167  {
1168  if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
1169  ereport(ERROR,
1170  (errcode(ERRCODE_SYNTAX_ERROR),
1171  errmsg("conflicting or redundant options")));
1172 
1173  reserve_wal_given = true;
1174  *reserve_wal = defGetBoolean(defel);
1175  }
1176  else if (strcmp(defel->defname, "two_phase") == 0)
1177  {
1178  if (two_phase_given || cmd->kind != REPLICATION_KIND_LOGICAL)
1179  ereport(ERROR,
1180  (errcode(ERRCODE_SYNTAX_ERROR),
1181  errmsg("conflicting or redundant options")));
1182  two_phase_given = true;
1183  *two_phase = defGetBoolean(defel);
1184  }
1185  else if (strcmp(defel->defname, "failover") == 0)
1186  {
1187  if (failover_given || cmd->kind != REPLICATION_KIND_LOGICAL)
1188  ereport(ERROR,
1189  (errcode(ERRCODE_SYNTAX_ERROR),
1190  errmsg("conflicting or redundant options")));
1191  failover_given = true;
1192  *failover = defGetBoolean(defel);
1193  }
1194  else
1195  elog(ERROR, "unrecognized option: %s", defel->defname);
1196  }
1197 }
char * defGetString(DefElem *def)
Definition: define.c:49
#define lfirst(lc)
Definition: pg_list.h:172
char * defname
Definition: parsenodes.h:802
@ CRS_NOEXPORT_SNAPSHOT
Definition: walsender.h:23

References generate_unaccent_rules::action, CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, defGetBoolean(), defGetString(), DefElem::defname, elog(), ereport, errcode(), errmsg(), ERROR, CreateReplicationSlotCmd::kind, lfirst, CreateReplicationSlotCmd::options, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, and two_phase.

Referenced by CreateReplicationSlot().

◆ pg_stat_get_wal_senders()

Datum pg_stat_get_wal_senders ( PG_FUNCTION_ARGS  )

Definition at line 3751 of file walsender.c.

3752 {
3753 #define PG_STAT_GET_WAL_SENDERS_COLS 12
3754  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3755  SyncRepStandbyData *sync_standbys;
3756  int num_standbys;
3757  int i;
3758 
3759  InitMaterializedSRF(fcinfo, 0);
3760 
3761  /*
3762  * Get the currently active synchronous standbys. This could be out of
3763  * date before we're done, but we'll use the data anyway.
3764  */
3765  num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
3766 
3767  for (i = 0; i < max_wal_senders; i++)
3768  {
3769  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3770  XLogRecPtr sent_ptr;
3771  XLogRecPtr write;
3772  XLogRecPtr flush;
3773  XLogRecPtr apply;
3774  TimeOffset writeLag;
3775  TimeOffset flushLag;
3776  TimeOffset applyLag;
3777  int priority;
3778  int pid;
3780  TimestampTz replyTime;
3781  bool is_sync_standby;
3783  bool nulls[PG_STAT_GET_WAL_SENDERS_COLS] = {0};
3784  int j;
3785 
3786  /* Collect data from shared memory */
3787  SpinLockAcquire(&walsnd->mutex);
3788  if (walsnd->pid == 0)
3789  {
3790  SpinLockRelease(&walsnd->mutex);
3791  continue;
3792  }
3793  pid = walsnd->pid;
3794  sent_ptr = walsnd->sentPtr;
3795  state = walsnd->state;
3796  write = walsnd->write;
3797  flush = walsnd->flush;
3798  apply = walsnd->apply;
3799  writeLag = walsnd->writeLag;
3800  flushLag = walsnd->flushLag;
3801  applyLag = walsnd->applyLag;
3802  priority = walsnd->sync_standby_priority;
3803  replyTime = walsnd->replyTime;
3804  SpinLockRelease(&walsnd->mutex);
3805 
3806  /*
3807  * Detect whether walsender is/was considered synchronous. We can
3808  * provide some protection against stale data by checking the PID
3809  * along with walsnd_index.
3810  */
3811  is_sync_standby = false;
3812  for (j = 0; j < num_standbys; j++)
3813  {
3814  if (sync_standbys[j].walsnd_index == i &&
3815  sync_standbys[j].pid == pid)
3816  {
3817  is_sync_standby = true;
3818  break;
3819  }
3820  }
3821 
3822  values[0] = Int32GetDatum(pid);
3823 
3824  if (!has_privs_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS))
3825  {
3826  /*
3827  * Only superusers and roles with privileges of pg_read_all_stats
3828  * can see details. Other users only get the pid value to know
3829  * it's a walsender, but no details.
3830  */
3831  MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3832  }
3833  else
3834  {
3836 
3837  if (XLogRecPtrIsInvalid(sent_ptr))
3838  nulls[2] = true;
3839  values[2] = LSNGetDatum(sent_ptr);
3840 
3842  nulls[3] = true;
3843  values[3] = LSNGetDatum(write);
3844 
3845  if (XLogRecPtrIsInvalid(flush))
3846  nulls[4] = true;
3847  values[4] = LSNGetDatum(flush);
3848 
3849  if (XLogRecPtrIsInvalid(apply))
3850  nulls[5] = true;
3851  values[5] = LSNGetDatum(apply);
3852 
3853  /*
3854  * Treat a standby such as a pg_basebackup background process
3855  * which always returns an invalid flush location, as an
3856  * asynchronous standby.
3857  */
3858  priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
3859 
3860  if (writeLag < 0)
3861  nulls[6] = true;
3862  else
3864 
3865  if (flushLag < 0)
3866  nulls[7] = true;
3867  else
3869 
3870  if (applyLag < 0)
3871  nulls[8] = true;
3872  else
3874 
3875  values[9] = Int32GetDatum(priority);
3876 
3877  /*
3878  * More easily understood version of standby state. This is purely
3879  * informational.
3880  *
3881  * In quorum-based sync replication, the role of each standby
3882  * listed in synchronous_standby_names can be changing very
3883  * frequently. Any standbys considered as "sync" at one moment can
3884  * be switched to "potential" ones at the next moment. So, it's
3885  * basically useless to report "sync" or "potential" as their sync
3886  * states. We report just "quorum" for them.
3887  */
3888  if (priority == 0)
3889  values[10] = CStringGetTextDatum("async");
3890  else if (is_sync_standby)
3892  CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3893  else
3894  values[10] = CStringGetTextDatum("potential");
3895 
3896  if (replyTime == 0)
3897  nulls[11] = true;
3898  else
3899  values[11] = TimestampTzGetDatum(replyTime);
3900  }
3901 
3902  tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
3903  values, nulls);
3904  }
3905 
3906  return (Datum) 0;
3907 }
bool has_privs_of_role(Oid member, Oid role)
Definition: acl.c:5060
#define MemSet(start, val, len)
Definition: c.h:1009
int64 TimeOffset
Definition: timestamp.h:40
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
Definition: funcapi.c:76
#define write(a, b, c)
Definition: win32.h:14
int j
Definition: isn.c:74
Oid GetUserId(void)
Definition: miscinit.c:515
static Datum LSNGetDatum(XLogRecPtr X)
Definition: pg_lsn.h:28
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:212
TupleDesc setDesc
Definition: execnodes.h:340
Tuplestorestate * setResult
Definition: execnodes.h:339
uint8 syncrep_method
Definition: syncrep.h:68
SyncRepConfigData * SyncRepConfig
Definition: syncrep.c:99
int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
Definition: syncrep.c:713
#define SYNC_REP_PRIORITY
Definition: syncrep.h:35
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
Definition: tuplestore.c:750
static Datum TimestampTzGetDatum(TimestampTz X)
Definition: timestamp.h:52
static Datum IntervalPGetDatum(const Interval *X)
Definition: timestamp.h:58
static Interval * offset_to_interval(TimeOffset offset)
Definition: walsender.c:3735
#define PG_STAT_GET_WAL_SENDERS_COLS
static const char * WalSndGetStateString(WalSndState state)
Definition: walsender.c:3716
WalSndState
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29

References WalSnd::apply, WalSnd::applyLag, CStringGetTextDatum, WalSnd::flush, WalSnd::flushLag, GetUserId(), has_privs_of_role(), i, InitMaterializedSRF(), Int32GetDatum(), IntervalPGetDatum(), j, LSNGetDatum(), max_wal_senders, MemSet, WalSnd::mutex, offset_to_interval(), PG_STAT_GET_WAL_SENDERS_COLS, WalSnd::pid, WalSnd::replyTime, WalSnd::sentPtr, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SpinLockAcquire, SpinLockRelease, WalSnd::state, SYNC_REP_PRIORITY, WalSnd::sync_standby_priority, SyncRepConfigData::syncrep_method, SyncRepConfig, SyncRepGetCandidateStandbys(), TimestampTzGetDatum(), tuplestore_putvalues(), values, WalSndCtl, WalSndGetStateString(), WalSndCtlData::walsnds, WalSnd::write, write, WalSnd::writeLag, and XLogRecPtrIsInvalid.

◆ PhysicalConfirmReceivedLocation()

static void PhysicalConfirmReceivedLocation ( XLogRecPtr  lsn)
static

Definition at line 2250 of file walsender.c.

2251 {
2252  bool changed = false;
2253  ReplicationSlot *slot = MyReplicationSlot;
2254 
2255  Assert(lsn != InvalidXLogRecPtr);
2256  SpinLockAcquire(&slot->mutex);
2257  if (slot->data.restart_lsn != lsn)
2258  {
2259  changed = true;
2260  slot->data.restart_lsn = lsn;
2261  }
2262  SpinLockRelease(&slot->mutex);
2263 
2264  if (changed)
2265  {
2266  ReplicationSlotMarkDirty();
2267  ReplicationSlotsComputeRequiredLSN();
2268  }
2269 
2270  /*
2271  * One could argue that the slot should be saved to disk now, but that'd
2272  * be energy wasted - the worst thing lost information could cause here is
2273  * to give wrong information in a statistics view - we'll just potentially
2274  * be more conservative in removing files.
2275  */
2276 }
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:1041
XLogRecPtr restart_lsn
Definition: slot.h:93
slock_t mutex
Definition: slot.h:151

References Assert(), ReplicationSlot::data, InvalidXLogRecPtr, ReplicationSlot::mutex, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, SpinLockAcquire, and SpinLockRelease.

Referenced by ProcessStandbyReplyMessage().

◆ PhysicalReplicationSlotNewXmin()

static void PhysicalReplicationSlotNewXmin ( TransactionId  feedbackXmin,
TransactionId  feedbackCatalogXmin 
)
static

Definition at line 2387 of file walsender.c.

2388 {
2389  bool changed = false;
2390  ReplicationSlot *slot = MyReplicationSlot;
2391 
2392  SpinLockAcquire(&slot->mutex);
2393  MyProc->xmin = InvalidTransactionId;
2394 
2395  /*
2396  * For physical replication we don't need the interlock provided by xmin
2397  * and effective_xmin since the consequences of a missed increase are
2398  * limited to query cancellations, so set both at once.
2399  */
2400  if (!TransactionIdIsNormal(slot->data.xmin) ||
2401  !TransactionIdIsNormal(feedbackXmin) ||
2402  TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
2403  {
2404  changed = true;
2405  slot->data.xmin = feedbackXmin;
2406  slot->effective_xmin = feedbackXmin;
2407  }
2408  if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
2409  !TransactionIdIsNormal(feedbackCatalogXmin) ||
2410  TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
2411  {
2412  changed = true;
2413  slot->data.catalog_xmin = feedbackCatalogXmin;
2414  slot->effective_catalog_xmin = feedbackCatalogXmin;
2415  }
2416  SpinLockRelease(&slot->mutex);
2417 
2418  if (changed)
2419  {
2420  ReplicationSlotMarkDirty();
2421  ReplicationSlotsComputeRequiredXmin(false);
2422  }
2423 }
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:985
TransactionId xmin
Definition: proc.h:178
TransactionId xmin
Definition: slot.h:82
TransactionId catalog_xmin
Definition: slot.h:90
TransactionId effective_catalog_xmin
Definition: slot.h:175
TransactionId effective_xmin
Definition: slot.h:174
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:280
#define InvalidTransactionId
Definition: transam.h:31
#define TransactionIdIsNormal(xid)
Definition: transam.h:42

References ReplicationSlotPersistentData::catalog_xmin, ReplicationSlot::data, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, InvalidTransactionId, ReplicationSlot::mutex, MyProc, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredXmin(), SpinLockAcquire, SpinLockRelease, TransactionIdIsNormal, TransactionIdPrecedes(), ReplicationSlotPersistentData::xmin, and PGPROC::xmin.

Referenced by ProcessStandbyHSFeedbackMessage().

◆ ProcessPendingWrites()

static void ProcessPendingWrites ( void  )
static

Definition at line 1617 of file walsender.c.

1618 {
1619  for (;;)
1620  {
1621  long sleeptime;
1622 
1623  /* Check for input from the client */
1624  ProcessRepliesIfAny();
1625 
1626  /* die if timeout was reached */
1627  WalSndCheckTimeOut();
1628 
1629  /* Send keepalive if the time has come */
1630  WalSndKeepaliveIfNecessary();
1631 
1632  if (!pq_is_send_pending())
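1633  break;
1634 
1635  sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
1636 
1637  /* Sleep until something happens or we time out */
1638  WalSndWait(WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE, sleeptime,
1639  WAIT_EVENT_WAL_SENDER_WRITE_DATA);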
1640 
1641  /* Clear any already-pending wakeups */
1642  ResetLatch(MyLatch);
1643 
1644  CHECK_FOR_INTERRUPTS();
1645 
1646  /* Process any requests or signals received recently */
1647  if (ConfigReloadPending)
1648  {
1649  ConfigReloadPending = false;
1650  ProcessConfigFile(PGC_SIGHUP);
1651  SyncRepInitConfig();
1652  }
1653 
1654  /* Try to flush pending output to the client */
1655  if (pq_flush_if_writable() != 0)
1656  WalSndShutdown();
1657  }
1658 
1659  /* reactivate latch so WalSndLoop knows to continue */
1660  SetLatch(MyLatch);
1661 }
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1655
struct Latch * MyLatch
Definition: globals.c:59
@ PGC_SIGHUP
Definition: guc.h:71
void ProcessConfigFile(GucContext context)
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
void SetLatch(Latch *latch)
Definition: latch.c:633
void ResetLatch(Latch *latch)
Definition: latch.c:725
#define WL_SOCKET_READABLE
Definition: latch.h:128
#define WL_SOCKET_WRITEABLE
Definition: latch.h:129
#define pq_flush_if_writable()
Definition: libpq.h:47
#define pq_is_send_pending()
Definition: libpq.h:48
void SyncRepInitConfig(void)
Definition: syncrep.c:404
static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
Definition: walsender.c:3581
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2635
static void ProcessRepliesIfAny(void)
Definition: walsender.c:2106
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3945
static void WalSndShutdown(void)
Definition: walsender.c:243
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2591

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, GetCurrentTimestamp(), MyLatch, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), SetLatch(), SyncRepInitConfig(), WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WalSndWait(), WL_SOCKET_READABLE, and WL_SOCKET_WRITEABLE.

Referenced by WalSndUpdateProgress(), and WalSndWriteData().

◆ ProcessRepliesIfAny()

static void ProcessRepliesIfAny ( void  )
static

Definition at line 2106 of file walsender.c.

2107 {
2108  unsigned char firstchar;
2109  int maxmsglen;
2110  int r;
2111  bool received = false;
2112 
2113  last_processing = GetCurrentTimestamp();
2114 
2115  /*
2116  * If we already received a CopyDone from the frontend, any subsequent
2117  * message is the beginning of a new command, and should be processed in
2118  * the main processing loop.
2119  */
2120  while (!streamingDoneReceiving)
2121  {
2122  pq_startmsgread();
2123  r = pq_getbyte_if_available(&firstchar);
2124  if (r < 0)
2125  {
2126  /* unexpected error or EOF */
2127  ereport(COMMERROR,
2128  (errcode(ERRCODE_PROTOCOL_VIOLATION),
2129  errmsg("unexpected EOF on standby connection")));
2130  proc_exit(0);
2131  }
2132  if (r == 0)
2133  {
2134  /* no data available without blocking */
2135  pq_endmsgread();
2136  break;
2137  }
2138 
2139  /* Validate message type and set packet size limit */
2140  switch (firstchar)
2141  {
2142  case PqMsg_CopyData:
2143  maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
2144  break;
2145  case PqMsg_CopyDone:
2146  case PqMsg_Terminate:
2147  maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
2148  break;
2149  default:
2150  ereport(FATAL,
2151  (errcode(ERRCODE_PROTOCOL_VIOLATION),
2152  errmsg("invalid standby message type \"%c\"",
2153  firstchar)));
2154  maxmsglen = 0; /* keep compiler quiet */
2155  break;
2156  }
2157 
2158  /* Read the message contents */
2159  resetStringInfo(&reply_message);
2160  if (pq_getmessage(&reply_message, maxmsglen))
2161  {
2162  ereport(COMMERROR,
2163  (errcode(ERRCODE_PROTOCOL_VIOLATION),
2164  errmsg("unexpected EOF on standby connection")));
2165  proc_exit(0);
2166  }
2167 
2168  /* ... and process it */
2169  switch (firstchar)
2170  {
2171  /*
2172  * 'd' means a standby reply wrapped in a CopyData packet.
2173  */
2174  case PqMsg_CopyData:
2175  ProcessStandbyMessage();
2176  received = true;
2177  break;
2178 
2179  /*
2180  * CopyDone means the standby requested to finish streaming.
2181  * Reply with CopyDone, if we had not sent that already.
2182  */
2183  case PqMsg_CopyDone:
2184  if (!streamingDoneSending)
2185  {
2186  pq_putmessage_noblock('c', NULL, 0);
2187  streamingDoneSending = true;
2188  }
2189 
2190  streamingDoneReceiving = true;
2191  received = true;
2192  break;
2193 
2194  /*
2195  * 'X' means that the standby is closing down the socket.
2196  */
2197  case PqMsg_Terminate:
2198  proc_exit(0);
2199 
2200  default:
2201  Assert(false); /* NOT REACHED */
2202  }
2203  }
2204 
2205  /*
2206  * Save the last reply timestamp if we've received at least one reply.
2207  */
2208  if (received)
2209  {
2210  last_reply_timestamp = last_processing;
2211  waiting_for_ping_response = false;
2212  }
2213 }
#define COMMERROR
Definition: elog.h:33
#define FATAL
Definition: elog.h:41
void proc_exit(int code)
Definition: ipc.c:104
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:51
int pq_getbyte_if_available(unsigned char *c)
Definition: pqcomm.c:1021
void pq_endmsgread(void)
Definition: pqcomm.c:1180
#define PqMsg_CopyDone
Definition: protocol.h:64
#define PqMsg_CopyData
Definition: protocol.h:65
#define PqMsg_Terminate
Definition: protocol.h:28
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:78
static bool waiting_for_ping_response
Definition: walsender.c:185
static TimestampTz last_processing
Definition: walsender.c:176
static bool streamingDoneSending
Definition: walsender.c:193
static void ProcessStandbyMessage(void)
Definition: walsender.c:2219
static bool streamingDoneReceiving
Definition: walsender.c:194

References Assert(), COMMERROR, ereport, errcode(), errmsg(), FATAL, GetCurrentTimestamp(), last_processing, last_reply_timestamp, pq_endmsgread(), pq_getbyte_if_available(), pq_getmessage(), PQ_LARGE_MESSAGE_LIMIT, pq_putmessage_noblock, PQ_SMALL_MESSAGE_LIMIT, pq_startmsgread(), PqMsg_CopyData, PqMsg_CopyDone, PqMsg_Terminate, proc_exit(), ProcessStandbyMessage(), reply_message, resetStringInfo(), streamingDoneReceiving, streamingDoneSending, and waiting_for_ping_response.

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ ProcessStandbyHSFeedbackMessage()

static void ProcessStandbyHSFeedbackMessage ( void  )
static

Definition at line 2467 of file walsender.c.

2468 {
2469  TransactionId feedbackXmin;
2470  uint32 feedbackEpoch;
2471  TransactionId feedbackCatalogXmin;
2472  uint32 feedbackCatalogEpoch;
2473  TimestampTz replyTime;
2474 
2475  /*
2476  * Decipher the reply message. The caller already consumed the msgtype
2477  * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
2478  * of this message.
2479  */
2480  replyTime = pq_getmsgint64(&reply_message);
2481  feedbackXmin = pq_getmsgint(&reply_message, 4);
2482  feedbackEpoch = pq_getmsgint(&reply_message, 4);
2483  feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
2484  feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
2485 
2486  if (message_level_is_interesting(DEBUG2))
2487  {
2488  char *replyTimeStr;
2489 
2490  /* Copy because timestamptz_to_str returns a static buffer */
2491  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2492 
2493  elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
2494  feedbackXmin,
2495  feedbackEpoch,
2496  feedbackCatalogXmin,
2497  feedbackCatalogEpoch,
2498  replyTimeStr);
2499 
2500  pfree(replyTimeStr);
2501  }
2502 
2503  /*
2504  * Update shared state for this WalSender process based on reply data from
2505  * standby.
2506  */
2507  {
2508  WalSnd *walsnd = MyWalSnd;
2509 
2510  SpinLockAcquire(&walsnd->mutex);
2511  walsnd->replyTime = replyTime;
2512  SpinLockRelease(&walsnd->mutex);
2513  }
2514 
2515  /*
2516  * Unset WalSender's xmins if the feedback message values are invalid.
2517  * This happens when the downstream turned hot_standby_feedback off.
2518  */
2519  if (!TransactionIdIsNormal(feedbackXmin)
2520  && !TransactionIdIsNormal(feedbackCatalogXmin))
2521  {
2522  MyProc->xmin = InvalidTransactionId;
2523  if (MyReplicationSlot != NULL)
2524  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2525  return;
2526  }
2527 
2528  /*
2529  * Check that the provided xmin/epoch are sane, that is, not in the future
2530  * and not so far back as to be already wrapped around. Ignore if not.
2531  */
2532  if (TransactionIdIsNormal(feedbackXmin) &&
2533  !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
2534  return;
2535 
2536  if (TransactionIdIsNormal(feedbackCatalogXmin) &&
2537  !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
2538  return;
2539 
2540  /*
2541  * Set the WalSender's xmin equal to the standby's requested xmin, so that
2542  * the xmin will be taken into account by GetSnapshotData() /
2543  * ComputeXidHorizons(). This will hold back the removal of dead rows and
2544  * thereby prevent the generation of cleanup conflicts on the standby
2545  * server.
2546  *
2547  * There is a small window for a race condition here: although we just
2548  * checked that feedbackXmin precedes nextXid, the nextXid could have
2549  * gotten advanced between our fetching it and applying the xmin below,
2550  * perhaps far enough to make feedbackXmin wrap around. In that case the
2551  * xmin we set here would be "in the future" and have no effect. No point
2552  * in worrying about this since it's too late to save the desired data
2553  * anyway. Assuming that the standby sends us an increasing sequence of
2554  * xmins, this could only happen during the first reply cycle, else our
2555  * own xmin would prevent nextXid from advancing so far.
2556  *
2557  * We don't bother taking the ProcArrayLock here. Setting the xmin field
2558  * is assumed atomic, and there's no real need to prevent concurrent
2559  * horizon determinations. (If we're moving our xmin forward, this is
2560  * obviously safe, and if we're moving it backwards, well, the data is at
2561  * risk already since a VACUUM could already have determined the horizon.)
2562  *
2563  * If we're using a replication slot we reserve the xmin via that,
2564  * otherwise via the walsender's PGPROC entry. We can only track the
2565  * catalog xmin separately when using a slot, so we store the least of the
2566  * two provided when not using a slot.
2567  *
2568  * XXX: It might make sense to generalize the ephemeral slot concept and
2569  * always use the slot mechanism to handle the feedback xmin.
2570  */
2571  if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
2572  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2573  else
2574  {
2575  if (TransactionIdIsNormal(feedbackCatalogXmin)
2576  && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
2577  MyProc->xmin = feedbackCatalogXmin;
2578  else
2579  MyProc->xmin = feedbackXmin;
2580  }
2581 }
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1854
unsigned int uint32
Definition: c.h:495
uint32 TransactionId
Definition: c.h:641
bool message_level_is_interesting(int elevel)
Definition: elog.c:277
#define DEBUG2
Definition: elog.h:29
char * pstrdup(const char *in)
Definition: mcxt.c:1619
void pfree(void *pointer)
Definition: mcxt.c:1431
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:418
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:456
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition: walsender.c:2387
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition: walsender.c:2436

References DEBUG2, elog(), InvalidTransactionId, message_level_is_interesting(), WalSnd::mutex, MyProc, MyReplicationSlot, MyWalSnd, pfree(), PhysicalReplicationSlotNewXmin(), pq_getmsgint(), pq_getmsgint64(), pstrdup(), reply_message, WalSnd::replyTime, SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), TransactionIdInRecentPast(), TransactionIdIsNormal, TransactionIdPrecedes(), and PGPROC::xmin.

Referenced by ProcessStandbyMessage().

◆ ProcessStandbyMessage()

static void ProcessStandbyMessage ( void  )
static

Definition at line 2219 of file walsender.c.

2220 {
2221  char msgtype;
2222 
2223  /*
2224  * Check message type from the first byte.
2225  */
2226  msgtype = pq_getmsgbyte(&reply_message);
2227 
2228  switch (msgtype)
2229  {
2230  case 'r':
2231  ProcessStandbyReplyMessage();
2232  break;
2233 
2234  case 'h':
2235  ProcessStandbyHSFeedbackMessage();
2236  break;
2237 
2238  default:
2239  ereport(COMMERROR,
2240  (errcode(ERRCODE_PROTOCOL_VIOLATION),
2241  errmsg("unexpected message type \"%c\"", msgtype)));
2242  proc_exit(0);
2243  }
2244 }
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:402
static void ProcessStandbyHSFeedbackMessage(void)
Definition: walsender.c:2467
static void ProcessStandbyReplyMessage(void)
Definition: walsender.c:2282

References COMMERROR, ereport, errcode(), errmsg(), pq_getmsgbyte(), proc_exit(), ProcessStandbyHSFeedbackMessage(), ProcessStandbyReplyMessage(), and reply_message.

Referenced by ProcessRepliesIfAny().

◆ ProcessStandbyReplyMessage()

static void ProcessStandbyReplyMessage ( void  )
static

Definition at line 2282 of file walsender.c.

2283 {
2284  XLogRecPtr writePtr,
2285  flushPtr,
2286  applyPtr;
2287  bool replyRequested;
2288  TimeOffset writeLag,
2289  flushLag,
2290  applyLag;
2291  bool clearLagTimes;
2292  TimestampTz now;
2293  TimestampTz replyTime;
2294 
2295  static bool fullyAppliedLastTime = false;
2296 
2297  /* the caller already consumed the msgtype byte */
2298  writePtr = pq_getmsgint64(&reply_message);
2299  flushPtr = pq_getmsgint64(&reply_message);
2300  applyPtr = pq_getmsgint64(&reply_message);
2301  replyTime = pq_getmsgint64(&reply_message);
2302  replyRequested = pq_getmsgbyte(&reply_message);
2303 
2304  if (message_level_is_interesting(DEBUG2))
2305  {
2306  char *replyTimeStr;
2307 
2308  /* Copy because timestamptz_to_str returns a static buffer */
2309  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2310 
2311  elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
2312  LSN_FORMAT_ARGS(writePtr),
2313  LSN_FORMAT_ARGS(flushPtr),
2314  LSN_FORMAT_ARGS(applyPtr),
2315  replyRequested ? " (reply requested)" : "",
2316  replyTimeStr);
2317 
2318  pfree(replyTimeStr);
2319  }
2320 
2321  /* See if we can compute the round-trip lag for these positions. */
2322  now = GetCurrentTimestamp();
2323  writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
2324  flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
2325  applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
2326 
2327  /*
2328  * If the standby reports that it has fully replayed the WAL in two
2329  * consecutive reply messages, then the second such message must result
2330  * from wal_receiver_status_interval expiring on the standby. This is a
2331  * convenient time to forget the lag times measured when it last
2332  * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
2333  * until more WAL traffic arrives.
2334  */
2335  clearLagTimes = false;
2336  if (applyPtr == sentPtr)
2337  {
2338  if (fullyAppliedLastTime)
2339  clearLagTimes = true;
2340  fullyAppliedLastTime = true;
2341  }
2342  else
2343  fullyAppliedLastTime = false;
2344 
2345  /* Send a reply if the standby requested one. */
2346  if (replyRequested)
2347  WalSndKeepalive(false, InvalidXLogRecPtr);
2348 
2349  /*
2350  * Update shared state for this WalSender process based on reply data from
2351  * standby.
2352  */
2353  {
2354  WalSnd *walsnd = MyWalSnd;
2355 
2356  SpinLockAcquire(&walsnd->mutex);
2357  walsnd->write = writePtr;
2358  walsnd->flush = flushPtr;
2359  walsnd->apply = applyPtr;
2360  if (writeLag != -1 || clearLagTimes)
2361  walsnd->writeLag = writeLag;
2362  if (flushLag != -1 || clearLagTimes)
2363  walsnd->flushLag = flushLag;
2364  if (applyLag != -1 || clearLagTimes)
2365  walsnd->applyLag = applyLag;
2366  walsnd->replyTime = replyTime;
2367  SpinLockRelease(&walsnd->mutex);
2368  }
2369 
2370  if (!am_cascading_walsender)
2371  SyncRepReleaseWaiters();
2372 
2373  /*
2374  * Advance our local xmin horizon when the client confirmed a flush.
2375  */
2376  if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
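2377  {
2378  if (SlotIsLogical(MyReplicationSlot))
2379  LogicalConfirmReceivedLocation(flushPtr);
2380  else
2381  PhysicalConfirmReceivedLocation(flushPtr);
2382  }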
2383 }
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:1829
#define SlotIsLogical(slot)
Definition: slot.h:207
void SyncRepReleaseWaiters(void)
Definition: syncrep.c:433
#define SYNC_REP_WAIT_WRITE
Definition: syncrep.h:23
#define SYNC_REP_WAIT_FLUSH
Definition: syncrep.h:24
#define SYNC_REP_WAIT_APPLY
Definition: syncrep.h:25
static XLogRecPtr sentPtr
Definition: walsender.c:168
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: walsender.c:2250
static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
Definition: walsender.c:3922
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition: walsender.c:4048

References am_cascading_walsender, WalSnd::apply, WalSnd::applyLag, DEBUG2, elog(), WalSnd::flush, WalSnd::flushLag, GetCurrentTimestamp(), InvalidXLogRecPtr, LagTrackerRead(), LogicalConfirmReceivedLocation(), LSN_FORMAT_ARGS, message_level_is_interesting(), WalSnd::mutex, MyReplicationSlot, MyWalSnd, now(), pfree(), PhysicalConfirmReceivedLocation(), pq_getmsgbyte(), pq_getmsgint64(), pstrdup(), reply_message, WalSnd::replyTime, sentPtr, SlotIsLogical, SpinLockAcquire, SpinLockRelease, SYNC_REP_WAIT_APPLY, SYNC_REP_WAIT_FLUSH, SYNC_REP_WAIT_WRITE, SyncRepReleaseWaiters(), timestamptz_to_str(), WalSndKeepalive(), WalSnd::write, and WalSnd::writeLag.

Referenced by ProcessStandbyMessage().

◆ ReadReplicationSlot()

static void ReadReplicationSlot ( ReadReplicationSlotCmd cmd)
static

Definition at line 493 of file walsender.c.

494 {
495 #define READ_REPLICATION_SLOT_COLS 3
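496  ReplicationSlot *slot;
497  DestReceiver *dest;
498  TupOutputState *tstate;
499  TupleDesc tupdesc;
500  Datum values[READ_REPLICATION_SLOT_COLS] = {0};
501  bool nulls[READ_REPLICATION_SLOT_COLS];
502 
503  tupdesc = CreateTemplateTupleDesc(READ_REPLICATION_SLOT_COLS);
504  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_type",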
505  TEXTOID, -1, 0);
506  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "restart_lsn",
507  TEXTOID, -1, 0);
508  /* TimeLineID is unsigned, so int4 is not wide enough. */
509  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "restart_tli",
510  INT8OID, -1, 0);
511 
512  memset(nulls, true, READ_REPLICATION_SLOT_COLS * sizeof(bool));
513 
514  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
515  slot = SearchNamedReplicationSlot(cmd->slotname, false);
516  if (slot == NULL || !slot->in_use)
517  {
518  LWLockRelease(ReplicationSlotControlLock);
519  }
520  else
521  {
522  ReplicationSlot slot_contents;
523  int i = 0;
524 
525  /* Copy slot contents while holding spinlock */
526  SpinLockAcquire(&slot->mutex);
527  slot_contents = *slot;
528  SpinLockRelease(&slot->mutex);
529  LWLockRelease(ReplicationSlotControlLock);
530 
531  if (OidIsValid(slot_contents.data.database))
532  ereport(ERROR,
533  errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
534  errmsg("cannot use %s with a logical replication slot",
535  "READ_REPLICATION_SLOT"));
536 
537  /* slot type */
538  values[i] = CStringGetTextDatum("physical");
539  nulls[i] = false;
540  i++;
541 
542  /* start LSN */
543  if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
544  {
545  char xloc[64];
546 
547  snprintf(xloc, sizeof(xloc), "%X/%X",
548  LSN_FORMAT_ARGS(slot_contents.data.restart_lsn));
549  values[i] = CStringGetTextDatum(xloc);
550  nulls[i] = false;
551  }
552  i++;
553 
554  /* timeline this WAL was produced on */
555  if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
556  {
557  TimeLineID slots_position_timeline;
558  TimeLineID current_timeline;
559  List *timeline_history = NIL;
560 
561  /*
562  * While in recovery, use as timeline the currently-replaying one
563  * to get the LSN position's history.
564  */
565  if (RecoveryInProgress())
566  (void) GetXLogReplayRecPtr(&current_timeline);
567  else
568  current_timeline = GetWALInsertionTimeLine();
569 
570  timeline_history = readTimeLineHistory(current_timeline);
571  slots_position_timeline = tliOfPointInHistory(slot_contents.data.restart_lsn,
572  timeline_history);
573  values[i] = Int64GetDatum((int64) slots_position_timeline);
574  nulls[i] = false;
575  }
576  i++;
577 
578  Assert(i == READ_REPLICATION_SLOT_COLS);
579  }
580 
581  dest = CreateDestReceiver(DestRemoteSimple);
582  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
583  do_tup_output(tstate, values, nulls);
584  end_tup_output(tstate);
585 }
TimeLineID tliOfPointInHistory(XLogRecPtr ptr, List *history)
Definition: timeline.c:544
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:76
#define OidIsValid(objectId)
Definition: c.h:764
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1168
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1781
@ LW_SHARED
Definition: lwlock.h:117
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition: slot.c:426
Definition: pg_list.h:54
bool in_use
Definition: slot.h:154
#define READ_REPLICATION_SLOT_COLS

References Assert(), begin_tup_output_tupdesc(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, ReplicationSlot::data, ReplicationSlotPersistentData::database, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errcode(), errmsg(), ERROR, GetWALInsertionTimeLine(), GetXLogReplayRecPtr(), i, ReplicationSlot::in_use, Int64GetDatum(), LSN_FORMAT_ARGS, LW_SHARED, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, NIL, OidIsValid, READ_REPLICATION_SLOT_COLS, readTimeLineHistory(), RecoveryInProgress(), ReplicationSlotPersistentData::restart_lsn, SearchNamedReplicationSlot(), ReadReplicationSlotCmd::slotname, snprintf, SpinLockAcquire, SpinLockRelease, tliOfPointInHistory(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), values, and XLogRecPtrIsInvalid.

Referenced by exec_replication_command().

◆ SendTimeLineHistory()

static void SendTimeLineHistory ( TimeLineHistoryCmd cmd)
static

Definition at line 592 of file walsender.c.

593 {
594  DestReceiver *dest;
595  TupleDesc tupdesc;
596  StringInfoData buf;
597  char histfname[MAXFNAMELEN];
598  char path[MAXPGPATH];
599  int fd;
600  off_t histfilelen;
601  off_t bytesleft;
602  Size len;
603 
604  dest = CreateDestReceiver(DestRemoteSimple);
605 
606  /*
607  * Reply with a result set with one row, and two columns. The first col is
608  * the name of the history file, 2nd is the contents.
609  */
610  tupdesc = CreateTemplateTupleDesc(2);
611  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "filename", TEXTOID, -1, 0);
612  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "content", TEXTOID, -1, 0);
613 
614  TLHistoryFileName(histfname, cmd->timeline);
615  TLHistoryFilePath(path, cmd->timeline);
616 
617  /* Send a RowDescription message */
618  dest->rStartup(dest, CMD_SELECT, tupdesc);
619 
620  /* Send a DataRow message */
621  pq_beginmessage(&buf, PqMsg_DataRow);
622  pq_sendint16(&buf, 2); /* # of columns */
623  len = strlen(histfname);
624  pq_sendint32(&buf, len); /* col1 len */
625  pq_sendbytes(&buf, histfname, len);
626 
627  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
628  if (fd < 0)
629  ereport(ERROR,
630  (errcode_for_file_access(),
631  errmsg("could not open file \"%s\": %m", path)));
632 
633  /* Determine file length and send it to client */
634  histfilelen = lseek(fd, 0, SEEK_END);
635  if (histfilelen < 0)
636  ereport(ERROR,
637  (errcode_for_file_access(),
638  errmsg("could not seek to end of file \"%s\": %m", path)));
639  if (lseek(fd, 0, SEEK_SET) != 0)
640  ereport(ERROR,
641  (errcode_for_file_access(),
642  errmsg("could not seek to beginning of file \"%s\": %m", path)));
643 
644  pq_sendint32(&buf, histfilelen); /* col2 len */
645 
646  bytesleft = histfilelen;
647  while (bytesleft > 0)
648  {
649  PGAlignedBlock rbuf;
650  int nread;
651 
652  pgstat_report_wait_start(WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ);
653  nread = read(fd, rbuf.data, sizeof(rbuf));
654  pgstat_report_wait_end();
655  if (nread < 0)
656  ereport(ERROR,
657  (errcode_for_file_access(),
658  errmsg("could not read file \"%s\": %m",
659  path)));
660  else if (nread == 0)
661  ereport(ERROR,
662  (errcode(ERRCODE_DATA_CORRUPTED),
663  errmsg("could not read file \"%s\": read %d of %zu",
664  path, nread, (Size) bytesleft)));
665 
666  pq_sendbytes(&buf, rbuf.data, nread);
667  bytesleft -= nread;
668  }
669 
670  if (CloseTransientFile(fd) != 0)
671  ereport(ERROR,
672  (errcode_for_file_access(),
673  errmsg("could not close file \"%s\": %m", path)));
674 
675  pq_endmessage(&buf);
676 }
#define PG_BINARY
Definition: c.h:1262
size_t Size
Definition: c.h:594
int errcode_for_file_access(void)
Definition: elog.c:883
int CloseTransientFile(int fd)
Definition: fd.c:2809
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2633
#define read(a, b, c)
Definition: win32.h:13
@ CMD_SELECT
Definition: nodes.h:255
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:41
#define MAXPGPATH
const void size_t len
void pq_sendbytes(StringInfo buf, const void *data, int datalen)
Definition: pqformat.c:126
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:299
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:88
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PqMsg_DataRow
Definition: protocol.h:43
TimeLineID timeline
Definition: replnodes.h:120
char data[BLCKSZ]
Definition: c.h:1108
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:88
static void pgstat_report_wait_end(void)
Definition: wait_event.h:104
static void TLHistoryFilePath(char *path, TimeLineID tli)
static void TLHistoryFileName(char *fname, TimeLineID tli)

References buf, CloseTransientFile(), CMD_SELECT, CreateDestReceiver(), CreateTemplateTupleDesc(), PGAlignedBlock::data, generate_unaccent_rules::dest, DestRemoteSimple, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg(), ERROR, fd(), len, MAXFNAMELEN, MAXPGPATH, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), pq_beginmessage(), pq_endmessage(), pq_sendbytes(), pq_sendint16(), pq_sendint32(), PqMsg_DataRow, read, TimeLineHistoryCmd::timeline, TLHistoryFileName(), TLHistoryFilePath(), and TupleDescInitBuiltinEntry().

Referenced by exec_replication_command().

◆ StartLogicalReplication()

static void StartLogicalReplication ( StartReplicationCmd cmd)
static

Definition at line 1455 of file walsender.c.

1456 {
1457  StringInfoData buf;
1458  QueryCompletion qc;
1459 
1460  /* make sure that our requirements are still fulfilled */
1461  CheckLogicalDecodingRequirements();
1462 
1463  Assert(!MyReplicationSlot);
1464 
1465  ReplicationSlotAcquire(cmd->slotname, true);
1466 
1467  /*
1468  * Force a disconnect, so that the decoding code doesn't need to care
1469  * about an eventual switch from running in recovery, to running in a
1470  * normal environment. Client code is expected to handle reconnects.
1471  */
1472  if (am_cascading_walsender && !RecoveryInProgress())
1473  {
1474  ereport(LOG,
1475  (errmsg("terminating walsender process after promotion")));
1476  got_STOPPING = true;
1477  }
1478 
1479  /*
1480  * Create our decoding context, making it start at the previously ack'ed
1481  * position.
1482  *
1483  * Do this before sending a CopyBothResponse message, so that any errors
1484  * are reported early.
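1485  */
1486  logical_decoding_ctx =
1487  CreateDecodingContext(cmd->startpoint, cmd->options, false,
1488  XL_ROUTINE(.page_read = logical_read_xlog_page,
1489  .segment_open = WalSndSegmentOpen,
1490  .segment_close = wal_segment_close),
1491  WalSndPrepareWrite, WalSndWriteData,
1492  WalSndUpdateProgress);
1493  xlogreader = logical_decoding_ctx->reader;
1494 
1495  WalSndSetState(WALSNDSTATE_CATCHUP);
1496 
1497  /* Send a CopyBothResponse message, and start streaming */
1498  pq_beginmessage(&buf, PqMsg_CopyBothResponse);
1499  pq_sendbyte(&buf, 0);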
1500  pq_sendint16(&buf, 0);
1501  pq_endmessage(&buf);
1502  pq_flush();
1503 
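1504  /* Start reading WAL from the oldest required WAL. */
1505  XLogBeginRead(logical_decoding_ctx->reader,
1506  MyReplicationSlot->data.restart_lsn);
1507 
1508  /*
1509  * Report the location after which we'll send out further commits as the
1510  * current sentPtr.
1511  */
1512  sentPtr = MyReplicationSlot->data.confirmed_flush;
1513 
1514  /* Also update the sent position status in shared memory */
1515  SpinLockAcquire(&MyWalSnd->mutex);
1516  MyWalSnd->sentPtr = MyReplicationSlot->data.restart_lsn;
1517  SpinLockRelease(&MyWalSnd->mutex);
1518 
1519  replication_active = true;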
1520 
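1521  SyncRepInitConfig();
1522 
1523  /* Main loop of walsender */
1524  WalSndLoop(XLogSendLogical);
1525 
1526  FreeDecodingContext(logical_decoding_ctx);
1527  ReplicationSlotRelease();
1528 
1529  replication_active = false;
1530  if (got_STOPPING)
1531  proc_exit(0);
1532  WalSndSetState(WALSNDSTATE_STARTUP);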
1533 
1534  /* Get out of COPY mode (CommandComplete). */
1535  SetQueryCompletion(&qc, CMDTAG_COPY, 0);
1536  EndCommand(&qc, DestRemote, false);
1537 }
static void SetQueryCompletion(QueryCompletion *qc, CommandTag commandTag, uint64 nprocessed)
Definition: cmdtag.h:38
void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
Definition: dest.c:166
@ DestRemote
Definition: dest.h:89
#define pq_flush()
Definition: libpq.h:46
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:496
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
#define PqMsg_CopyBothResponse
Definition: protocol.h:54
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:502
XLogReaderState * reader
Definition: logical.h:42
XLogRecPtr startpoint
Definition: replnodes.h:97
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2662
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:211
static void XLogSendLogical(void)
Definition: walsender.c:3271
@ WALSNDSTATE_CATCHUP
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
Definition: xlogreader.c:233

References am_cascading_walsender, Assert(), buf, CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDecodingContext(), ReplicationSlot::data, DestRemote, EndCommand(), ereport, errmsg(), FreeDecodingContext(), got_STOPPING, LOG, logical_decoding_ctx, logical_read_xlog_page(), WalSnd::mutex, MyReplicationSlot, MyWalSnd, StartReplicationCmd::options, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), PqMsg_CopyBothResponse, proc_exit(), LogicalDecodingContext::reader, RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), ReplicationSlotPersistentData::restart_lsn, sentPtr, WalSnd::sentPtr, SetQueryCompletion(), StartReplicationCmd::slotname, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, SyncRepInitConfig(), wal_segment_close(), WalSndLoop(), WalSndPrepareWrite(), WalSndSegmentOpen(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WalSndUpdateProgress(), WalSndWriteData(), XL_ROUTINE, XLogBeginRead(), xlogreader, and XLogSendLogical().

Referenced by exec_replication_command().

◆ StartReplication()

static void StartReplication ( StartReplicationCmd cmd)
static

Definition at line 822 of file walsender.c.

823 {
825  XLogRecPtr FlushPtr;
826  TimeLineID FlushTLI;
827 
828  /* create xlogreader for physical replication */
829  xlogreader =
831  XL_ROUTINE(.segment_open = WalSndSegmentOpen,
832  .segment_close = wal_segment_close),
833  NULL);
834 
835  if (!xlogreader)
836  ereport(ERROR,
837  (errcode(ERRCODE_OUT_OF_MEMORY),
838  errmsg("out of memory"),
839  errdetail("Failed while allocating a WAL reading processor.")));
840 
841  /*
842  * We assume here that we're logging enough information in the WAL for
843  * log-shipping, since this is checked in PostmasterMain().
844  *
845  * NOTE: wal_level can only change at shutdown, so in most cases it is
846  * difficult for there to be WAL data that we can still see that was
847  * written at wal_level='minimal'.
848  */
849 
850  if (cmd->slotname)
851  {
852  ReplicationSlotAcquire(cmd->slotname, true);
854  ereport(ERROR,
855  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
856  errmsg("cannot use a logical replication slot for physical replication")));
857 
858  /*
859  * We don't need to verify the slot's restart_lsn here; instead we
860  * rely on the caller requesting the starting point to use. If the
861  * WAL segment doesn't exist, we'll fail later.
862  */
863  }
864 
865  /*
866  * Select the timeline. If it was given explicitly by the client, use
867  * that. Otherwise use the timeline of the last replayed record.
868  */
871  FlushPtr = GetStandbyFlushRecPtr(&FlushTLI);
872  else
873  FlushPtr = GetFlushRecPtr(&FlushTLI);
874 
875  if (cmd->timeline != 0)
876  {
877  XLogRecPtr switchpoint;
878 
879  sendTimeLine = cmd->timeline;
880  if (sendTimeLine == FlushTLI)
881  {
882  sendTimeLineIsHistoric = false;
884  }
885  else
886  {
887  List *timeLineHistory;
888 
889  sendTimeLineIsHistoric = true;
890 
891  /*
892  * Check that the timeline the client requested exists, and the
893  * requested start location is on that timeline.
894  */
895  timeLineHistory = readTimeLineHistory(FlushTLI);
896  switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
898  list_free_deep(timeLineHistory);
899 
900  /*
901  * Found the requested timeline in the history. Check that
902  * requested startpoint is on that timeline in our history.
903  *
904  * This is quite loose on purpose. We only check that we didn't
905  * fork off the requested timeline before the switchpoint. We
906  * don't check that we switched *to* it before the requested
907  * starting point. This is because the client can legitimately
908  * request to start replication from the beginning of the WAL
909  * segment that contains switchpoint, but on the new timeline, so
910  * that it doesn't end up with a partial segment. If you ask for
911  * too old a starting point, you'll get an error later when we
912  * fail to find the requested WAL segment in pg_wal.
913  *
914  * XXX: we could be more strict here and only allow a startpoint
915  * that's older than the switchpoint, if it's still in the same
916  * WAL segment.
917  */
918  if (!XLogRecPtrIsInvalid(switchpoint) &&
919  switchpoint < cmd->startpoint)
920  {
921  ereport(ERROR,
922  (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
924  cmd->timeline),
925  errdetail("This server's history forked from timeline %u at %X/%X.",
926  cmd->timeline,
927  LSN_FORMAT_ARGS(switchpoint))));
928  }
929  sendTimeLineValidUpto = switchpoint;
930  }
931  }
932  else
933  {
934  sendTimeLine = FlushTLI;
936  sendTimeLineIsHistoric = false;
937  }
938 
940 
941  /* If there is nothing to stream, don't even enter COPY mode */
943  {
944  /*
945  * When we first start replication the standby will be behind the
946  * primary. For some applications, for example synchronous
947  * replication, it is important to have a clear state for this initial
948  * catchup mode, so we can trigger actions when we change streaming
949  * state later. We may stay in this state for a long time, which is
950  * exactly why we want to be able to monitor whether or not we are
951  * still here.
952  */
954 
955  /* Send a CopyBothResponse message, and start streaming */
957  pq_sendbyte(&buf, 0);
958  pq_sendint16(&buf, 0);
959  pq_endmessage(&buf);
960  pq_flush();
961 
962  /*
963  * Don't allow a request to stream from a future point in WAL that
964  * hasn't been flushed to disk in this server yet.
965  */
966  if (FlushPtr < cmd->startpoint)
967  {
968  ereport(ERROR,
969  (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
971  LSN_FORMAT_ARGS(FlushPtr))));
972  }
973 
974  /* Start streaming from the requested point */
975  sentPtr = cmd->startpoint;
976 
977  /* Initialize shared memory status, too */
981 
983 
984  /* Main loop of walsender */
985  replication_active = true;
986 
988 
989  replication_active = false;
990  if (got_STOPPING)
991  proc_exit(0);
993 
995  }
996 
997  if (cmd->slotname)
999 
1000  /*
1001  * Copy is finished now. Send a single-row result set indicating the next
1002  * timeline.
1003  */
1005  {
1006  char startpos_str[8 + 1 + 8 + 1];
1007  DestReceiver *dest;
1008  TupOutputState *tstate;
1009  TupleDesc tupdesc;
1010  Datum values[2];
1011  bool nulls[2] = {0};
1012 
1013  snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
1015 
1017 
1018  /*
1019  * Need a tuple descriptor representing two columns. int8 may seem
1020  * like a surprising data type for this, but in theory int4 would not
1021  * be wide enough for this, as TimeLineID is unsigned.
1022  */
1023  tupdesc = CreateTemplateTupleDesc(2);
1024  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
1025  INT8OID, -1, 0);
1026  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
1027  TEXTOID, -1, 0);
1028 
1029  /* prepare for projection of tuple */
1030  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
1031 
1033  values[1] = CStringGetTextDatum(startpos_str);
1034 
1035  /* send it to dest */
1036  do_tup_output(tstate, values, nulls);
1037 
1038  end_tup_output(tstate);
1039  }
1040 
1041  /* Send CommandComplete message */
1042  EndReplicationCommand("START_STREAMING");
1043 }
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:572
int errdetail(const char *fmt,...)
Definition: elog.c:1208
void list_free_deep(List *list)
Definition: list.c:1560
TimeLineID timeline
Definition: replnodes.h:96
static void XLogSendPhysical(void)
Definition: walsender.c:2961
int wal_segment_size
Definition: xlog.c:147
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
Definition: xlogreader.c:108

References am_cascading_walsender, Assert(), begin_tup_output_tupdesc(), buf, CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), EndReplicationCommand(), ereport, errcode(), errdetail(), errmsg(), ERROR, GetFlushRecPtr(), GetStandbyFlushRecPtr(), got_STOPPING, Int64GetDatum(), InvalidXLogRecPtr, list_free_deep(), LSN_FORMAT_ARGS, WalSnd::mutex, MyReplicationSlot, MyWalSnd, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), PqMsg_CopyBothResponse, proc_exit(), readTimeLineHistory(), RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, sentPtr, WalSnd::sentPtr, SlotIsLogical, StartReplicationCmd::slotname, snprintf, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), StartReplicationCmd::timeline, tliSwitchPoint(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), values, wal_segment_close(), wal_segment_size, WalSndLoop(), WalSndSegmentOpen(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, XL_ROUTINE, xlogreader, XLogReaderAllocate(), XLogRecPtrIsInvalid, and XLogSendPhysical().

Referenced by exec_replication_command().

◆ TransactionIdInRecentPast()

static bool TransactionIdInRecentPast ( TransactionId  xid,
uint32  epoch 
)
static

Definition at line 2436 of file walsender.c.

2437 {
      /*
       * Decide whether the (xid, epoch) pair supplied by the caller is
       * consistent with the next full transaction ID read from
       * ReadNextFullTransactionId().
       *
       * Returns true only when both checks below pass:
       *   1. the epoch matches: equal to the current epoch when
       *      xid <= nextXid, or exactly one less than the current epoch
       *      when xid > nextXid;
       *   2. TransactionIdPrecedesOrEquals(xid, nextXid) holds.
       * Otherwise returns false.
       */
2438  FullTransactionId nextFullXid;
2439  TransactionId nextXid;
2440  uint32 nextEpoch;
2441 
      /* Split the next full XID into its xid and epoch components. */
2442  nextFullXid = ReadNextFullTransactionId();
2443  nextXid = XidFromFullTransactionId(nextFullXid);
2444  nextEpoch = EpochFromFullTransactionId(nextFullXid);
2445 
      /*
       * A numerically larger xid than nextXid is only accepted if it is
       * tagged with the previous epoch (epoch + 1 == nextEpoch).
       */
2446  if (xid <= nextXid)
2447  {
2448  if (epoch != nextEpoch)
2449  return false;
2450  }
2451  else
2452  {
2453  if (epoch + 1 != nextEpoch)
2454  return false;
2455  }
2456 
2457  if (!TransactionIdPrecedesOrEquals(xid, nextXid))
2458  return false; /* epoch OK, but it's wrapped around */
2459 
2460  return true;
2461 }
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:299
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
#define XidFromFullTransactionId(x)
Definition: transam.h:48
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:288
static const unsigned __int64 epoch

References epoch, EpochFromFullTransactionId, ReadNextFullTransactionId(), TransactionIdPrecedesOrEquals(), and XidFromFullTransactionId.

Referenced by ProcessStandbyHSFeedbackMessage().

◆ UploadManifest()

static void UploadManifest ( void  )
static

Definition at line 682 of file walsender.c.

683 {
684  MemoryContext mcxt;
686  off_t offset = 0;
688 
689  /*
690  * parsing the manifest will use the cryptohash stuff, which requires a
691  * resource owner
692  */
693  Assert(CurrentResourceOwner == NULL);
694  CurrentResourceOwner = ResourceOwnerCreate(NULL, "base backup");
695 
696  /* Prepare to read manifest data into a temporary context. */
698  "incremental backup information",
700  ib = CreateIncrementalBackupInfo(mcxt);
701 
702  /* Send a CopyInResponse message */
703  pq_beginmessage(&buf, 'G');
704  pq_sendbyte(&buf, 0);
705  pq_sendint16(&buf, 0);
707  pq_flush();
708 
709  /* Receive packets from client until done. */
710  while (HandleUploadManifestPacket(&buf, &offset, ib))
711  ;
712 
713  /* Finish up manifest processing. */
715 
716  /*
717  * Discard any old manifest information and arrange to preserve the new
718  * information we just got.
719  *
720  * We assume that MemoryContextDelete and MemoryContextSetParent won't
721  * fail, and thus we shouldn't end up bailing out of here in such a way as
722  * to leave dangling pointers.
723  */
724  if (uploaded_manifest_mcxt != NULL)
727  uploaded_manifest = ib;
728  uploaded_manifest_mcxt = mcxt;
729 
730  /* clean up the resource owner we created */
731  WalSndResourceCleanup(true);
732 }
IncrementalBackupInfo * CreateIncrementalBackupInfo(MemoryContext mcxt)
void FinalizeIncrementalManifest(IncrementalBackupInfo *ib)
void MemoryContextSetParent(MemoryContext context, MemoryContext new_parent)
Definition: mcxt.c:546
MemoryContext CacheMemoryContext
Definition: mcxt.c:144
void pq_endmessage_reuse(StringInfo buf)
Definition: pqformat.c:317
ResourceOwner ResourceOwnerCreate(ResourceOwner parent, const char *name)
Definition: resowner.c:413
ResourceOwner CurrentResourceOwner
Definition: resowner.c:165
static bool HandleUploadManifestPacket(StringInfo buf, off_t *offset, IncrementalBackupInfo *ib)
Definition: walsender.c:746
void WalSndResourceCleanup(bool isCommit)
Definition: walsender.c:365
static MemoryContext uploaded_manifest_mcxt
Definition: walsender.c:151

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert(), buf, CacheMemoryContext, CreateIncrementalBackupInfo(), CurrentMemoryContext, CurrentResourceOwner, FinalizeIncrementalManifest(), HandleUploadManifestPacket(), MemoryContextDelete(), MemoryContextSetParent(), pq_beginmessage(), pq_endmessage_reuse(), pq_flush, pq_sendbyte(), pq_sendint16(), ResourceOwnerCreate(), uploaded_manifest, uploaded_manifest_mcxt, and WalSndResourceCleanup().

Referenced by exec_replication_command().

◆ WalSndCheckTimeOut()

static void WalSndCheckTimeOut ( void  )
static

Definition at line 2635 of file walsender.c.

2636 {
2637  TimestampTz timeout;
2638 
2639  /* don't bail out if we're doing something that doesn't require timeouts */
2640  if (last_reply_timestamp <= 0)
2641  return;
2642 
2645 
2646  if (wal_sender_timeout > 0 && last_processing >= timeout)
2647  {
2648  /*
2649  * Since typically expiration of replication timeout means
2650  * communication problem, we don't send the error message to the
2651  * standby.
2652  */
2654  (errmsg("terminating walsender process due to replication timeout")));
2655 
2656  WalSndShutdown();
2657  }
2658 }
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:85
int wal_sender_timeout
Definition: walsender.c:126

References COMMERROR, ereport, errmsg(), last_processing, last_reply_timestamp, TimestampTzPlusMilliseconds, wal_sender_timeout, and WalSndShutdown().

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndComputeSleeptime()

static long WalSndComputeSleeptime ( TimestampTz  now)
static

Definition at line 2591 of file walsender.c.

2592 {
2593  long sleeptime = 10000; /* 10 s */
2594 
2596  {
2597  TimestampTz wakeup_time;
2598 
2599  /*
2600  * At the latest stop sleeping once wal_sender_timeout has been
2601  * reached.
2602  */
2605 
2606  /*
2607  * If no ping has been sent yet, wakeup when it's time to do so.
2608  * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2609  * the timeout passed without a response.
2610  */
2613  wal_sender_timeout / 2);
2614 
2615  /* Compute relative time until wakeup. */
2616  sleeptime = TimestampDifferenceMilliseconds(now, wakeup_time);
2617  }
2618 
2619  return sleeptime;
2620 }
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1767

References last_reply_timestamp, now(), TimestampDifferenceMilliseconds(), TimestampTzPlusMilliseconds, waiting_for_ping_response, and wal_sender_timeout.

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndDone()

static void WalSndDone ( WalSndSendDataCallback  send_data)
static

Definition at line 3356 of file walsender.c.

3357 {
3358  XLogRecPtr replicatedPtr;
3359 
3360  /* ... let's just be real sure we're caught up ... */
3361  send_data();
3362 
3363  /*
3364  * To figure out whether all WAL has successfully been replicated, check
3365  * flush location if valid, write otherwise. Tools like pg_receivewal will
3366  * usually (unless in synchronous mode) return an invalid flush location.
3367  */
3368  replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
3370 
3371  if (WalSndCaughtUp && sentPtr == replicatedPtr &&
3372  !pq_is_send_pending())
3373  {
3374  QueryCompletion qc;
3375 
3376  /* Inform the standby that XLOG streaming is done */
3377  SetQueryCompletion(&qc, CMDTAG_COPY, 0);
3378  EndCommand(&qc, DestRemote, false);
3379  pq_flush();
3380 
3381  proc_exit(0);
3382  }
3385 }
static bool WalSndCaughtUp
Definition: walsender.c:197

References DestRemote, EndCommand(), WalSnd::flush, InvalidXLogRecPtr, MyWalSnd, pq_flush, pq_is_send_pending, proc_exit(), sentPtr, SetQueryCompletion(), waiting_for_ping_response, WalSndCaughtUp, WalSndKeepalive(), WalSnd::write, and XLogRecPtrIsInvalid.

Referenced by WalSndLoop().

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 330 of file walsender.c.

331 {
335 
336  if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
338 
339  if (MyReplicationSlot != NULL)
341 
343 
344  replication_active = false;
345 
346  /*
347  * If there is a transaction in progress, it will clean up our
348  * ResourceOwner, but if a replication command set up a resource owner
349  * without a transaction, we've got to clean that up now.
350  */
352  WalSndResourceCleanup(false);
353 
354  if (got_STOPPING || got_SIGUSR2)
355  proc_exit(0);
356 
357  /* Revert back to startup state */
359 }
bool ConditionVariableCancelSleep(void)
void LWLockReleaseAll(void)
Definition: lwlock.c:1876
void ReplicationSlotCleanup(void)
Definition: slot.c:682
WALOpenSegment seg
Definition: xlogreader.h:272
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:200
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4850

References ConditionVariableCancelSleep(), got_SIGUSR2, got_STOPPING, IsTransactionOrTransactionBlock(), LWLockReleaseAll(), MyReplicationSlot, pgstat_report_wait_end(), proc_exit(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), XLogReaderState::seg, wal_segment_close(), WalSndResourceCleanup(), WalSndSetState(), WALSNDSTATE_STARTUP, WALOpenSegment::ws_file, and xlogreader.

Referenced by PostgresMain().

◆ WalSndGetStateString()

static const char* WalSndGetStateString ( WalSndState  state)
static

Definition at line 3716 of file walsender.c.

3717 {
      /*
       * Translate a WalSndState value into a constant, human-readable
       * string. The returned pointer refers to a string literal, so the
       * caller must not modify or free it.
       *
       * The switch deliberately has no default case; a value not matched
       * by any listed case falls out of the switch and yields "UNKNOWN".
       */
3718  switch (state)
3719  {
3720  case WALSNDSTATE_STARTUP:
3721  return "startup";
3722  case WALSNDSTATE_BACKUP:
3723  return "backup";
3724  case WALSNDSTATE_CATCHUP:
3725  return "catchup";
3726  case WALSNDSTATE_STREAMING:
3727  return "streaming";
3728  case WALSNDSTATE_STOPPING:
3729  return "stopping";
3730  }
      /* Reached only for a state value not handled above. */
3731  return "UNKNOWN";
3732 }
@ WALSNDSTATE_STREAMING
@ WALSNDSTATE_BACKUP

References WALSNDSTATE_BACKUP, WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WALSNDSTATE_STOPPING, and WALSNDSTATE_STREAMING.

Referenced by pg_stat_get_wal_senders().

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 3633 of file walsender.c.

3634 {
3635  int i;
3636 
3637  for (i = 0; i < max_wal_senders; i++)
3638  {
3639  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3640  pid_t pid;
3641 
3642  SpinLockAcquire(&walsnd->mutex);
3643  pid = walsnd->pid;
3644  SpinLockRelease(&walsnd->mutex);
3645 
3646  if (pid == 0)
3647  continue;
3648 
3650  }
3651 }
#define InvalidBackendId
Definition: backendid.h:23
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:262
@ PROCSIG_WALSND_INIT_STOPPING
Definition: procsignal.h:35

References i, InvalidBackendId, max_wal_senders, WalSnd::mutex, WalSnd::pid, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

◆ WalSndKeepalive()

static void WalSndKeepalive ( bool  requestReply,
XLogRecPtr  writePtr 
)
static

Definition at line 3922 of file walsender.c.

3923 {
3924  elog(DEBUG2, "sending replication keepalive");
3925 
3926  /* construct the message... */
3928  pq_sendbyte(&output_message, 'k');
3929  pq_sendint64(&output_message, XLogRecPtrIsInvalid(writePtr) ? sentPtr : writePtr);
3931  pq_sendbyte(&output_message, requestReply ? 1 : 0);
3932 
3933  /* ... and send it wrapped in CopyData */
3935 
3936  /* Set local flag */
3937  if (requestReply)
3939 }
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153

References StringInfoData::data, DEBUG2, elog(), GetCurrentTimestamp(), StringInfoData::len, output_message, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), resetStringInfo(), sentPtr, waiting_for_ping_response, and XLogRecPtrIsInvalid.

Referenced by ProcessStandbyReplyMessage(), WalSndDone(), WalSndKeepaliveIfNecessary(), WalSndUpdateProgress(), and WalSndWaitForWal().

◆ WalSndKeepaliveIfNecessary()

static void WalSndKeepaliveIfNecessary ( void  )
static

Definition at line 3945 of file walsender.c.

3946 {
3947  TimestampTz ping_time;
3948 
3949  /*
3950  * Don't send keepalive messages if timeouts are globally disabled or
3951  * we're doing something not partaking in timeouts.
3952  */
3953  if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3954  return;
3955 
3957  return;
3958 
3959  /*
3960  * If half of wal_sender_timeout has lapsed without receiving any reply
3961  * from the standby, send a keep-alive message to the standby requesting
3962  * an immediate reply.
3963  */
3965  wal_sender_timeout / 2);
3966  if (last_processing >= ping_time)
3967  {
3969 
3970  /* Try to flush pending output to the client */
3971  if (pq_flush_if_writable() != 0)
3972  WalSndShutdown();
3973  }
3974 }

References InvalidXLogRecPtr, last_processing, last_reply_timestamp, pq_flush_if_writable, TimestampTzPlusMilliseconds, waiting_for_ping_response, wal_sender_timeout, WalSndKeepalive(), and WalSndShutdown().

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndKill()

static void WalSndKill ( int  code,
Datum  arg 
)
static

Definition at line 2865 of file walsender.c.

2866 {
      /*
       * Give up this process's WalSnd entry: forget the process-local
       * MyWalSnd pointer, then clear the entry's latch and pid while
       * holding the entry's spinlock.
       *
       * Neither parameter (code, arg) is used in this body.
       * NOTE(review): the (int, Datum) signature and the reference from
       * InitWalSenderSlot() suggest this is registered as an exit
       * callback -- confirm against that caller.
       */
2867  WalSnd *walsnd = MyWalSnd;
2868 
2869  Assert(walsnd != NULL);
2870 
      /* Drop the local pointer first; the saved copy is used from here on. */
2871  MyWalSnd = NULL;
2872 
2873  SpinLockAcquire(&walsnd->mutex);
2874  /* clear latch while holding the spinlock, so it can safely be read */
2875  walsnd->latch = NULL;
2876  /* Mark WalSnd struct as no longer being in use. */
2877  walsnd->pid = 0;
2878  SpinLockRelease(&walsnd->mutex);
2879 }

References Assert(), WalSnd::latch, WalSnd::mutex, MyWalSnd, WalSnd::pid, SpinLockAcquire, and SpinLockRelease.

Referenced by InitWalSenderSlot().

◆ WalSndLastCycleHandler()

static void WalSndLastCycleHandler ( SIGNAL_ARGS  )
static

Definition at line 3477 of file walsender.c.

3478 {
      /*
       * Signal handler body: record that the signal arrived by setting
       * got_SIGUSR2, then set this process's latch (MyLatch).
       * NOTE(review): setting the latch presumably wakes the main loop so
       * it notices the flag promptly -- confirm against WalSndWait().
       */
3479  got_SIGUSR2 = true;
3480  SetLatch(MyLatch);
3481 }

References got_SIGUSR2, MyLatch, and SetLatch().

Referenced by WalSndSignals().

◆ WalSndLoop()

static void WalSndLoop ( WalSndSendDataCallback  send_data)
static

Definition at line 2662 of file walsender.c.

2663 {
2664  /*
2665  * Initialize the last reply timestamp. That enables timeout processing
2666  * from hereon.
2667  */
2669  waiting_for_ping_response = false;
2670 
2671  /*
2672  * Loop until we reach the end of this timeline or the client requests to
2673  * stop streaming.
2674  */
2675  for (;;)
2676  {
2677  /* Clear any already-pending wakeups */
2679 
2681 
2682  /* Process any requests or signals received recently */
2683  if (ConfigReloadPending)
2684  {
2685  ConfigReloadPending = false;
2688  }
2689 
2690  /* Check for input from the client */
2692 
2693  /*
2694  * If we have received CopyDone from the client, sent CopyDone
2695  * ourselves, and the output buffer is empty, it's time to exit
2696  * streaming.
2697  */
2699  !pq_is_send_pending())
2700  break;
2701 
2702  /*
2703  * If we don't have any pending data in the output buffer, try to send
2704  * some more. If there is some, we don't bother to call send_data
2705  * again until we've flushed it ... but we'd better assume we are not
2706  * caught up.
2707  */
2708  if (!pq_is_send_pending())
2709  send_data();
2710  else
2711  WalSndCaughtUp = false;
2712 
2713  /* Try to flush pending output to the client */
2714  if (pq_flush_if_writable() != 0)
2715  WalSndShutdown();
2716 
2717  /* If nothing remains to be sent right now ... */
2719  {
2720  /*
2721  * If we're in catchup state, move to streaming. This is an
2722  * important state change for users to know about, since before
2723  * this point data loss might occur if the primary dies and we
2724  * need to failover to the standby. The state change is also
2725  * important for synchronous replication, since commits that
2726  * started to wait at that point might wait for some time.
2727  */
2729  {
2730  ereport(DEBUG1,
2731  (errmsg_internal("\"%s\" has now caught up with upstream server",
2732  application_name)));
2734  }
2735 
2736  /*
2737  * When SIGUSR2 arrives, we send any outstanding logs up to the
2738  * shutdown checkpoint record (i.e., the latest record), wait for
2739  * them to be replicated to the standby, and exit. This may be a
2740  * normal termination at shutdown, or a promotion, the walsender
2741  * is not sure which.
2742  */
2743  if (got_SIGUSR2)
2744  WalSndDone(send_data);
2745  }
2746 
2747  /* Check for replication timeout. */
2749 
2750  /* Send keepalive if the time has come */
2752 
2753  /*
2754  * Block if we have unsent data. XXX For logical replication, let
2755  * WalSndWaitForWal() handle any other blocking; idle receivers need
2756  * its additional actions. For physical replication, also block if
2757  * caught up; its send_data does not block.
2758  */
2759  if ((WalSndCaughtUp && send_data != XLogSendLogical &&
2762  {
2763  long sleeptime;
2764  int wakeEvents;
2765 
2767  wakeEvents = WL_SOCKET_READABLE;
2768  else
2769  wakeEvents = 0;
2770 
2771  /*
2772  * Use fresh timestamp, not last_processing, to reduce the chance
2773  * of reaching wal_sender_timeout before sending a keepalive.
2774  */
2776 
2777  if (pq_is_send_pending())
2778  wakeEvents |= WL_SOCKET_WRITEABLE;
2779 
2780  /* Sleep until something happens or we time out */
2781  WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
2782  }
2783  }
2784 }
char * application_name
Definition: guc_tables.c:543
static void WalSndDone(WalSndSendDataCallback send_data)
Definition: walsender.c:3356

References application_name, CHECK_FOR_INTERRUPTS, ConfigReloadPending, DEBUG1, ereport, errmsg_internal(), GetCurrentTimestamp(), got_SIGUSR2, last_reply_timestamp, MyLatch, MyWalSnd, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), WalSnd::state, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), waiting_for_ping_response, WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndDone(), WalSndKeepaliveIfNecessary(), WalSndSetState(), WalSndShutdown(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STREAMING, WalSndWait(), WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, and XLogSendLogical().

Referenced by StartLogicalReplication(), and StartReplication().

◆ WalSndPrepareWrite()

static void WalSndPrepareWrite ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1548 of file walsender.c.

1549 {
      /*
       * Start a new outgoing message in ctx->out: discard whatever the
       * buffer held and write the fixed header -- the type byte 'w',
       * the LSN twice (dataStart and walEnd), and an 8-byte zero
       * placeholder for the send time.
       *
       * When last_write is false the header carries InvalidXLogRecPtr
       * instead of the caller's lsn. The xid parameter is not used in
       * this body.
       */
1550  /* can't have sync rep confused by sending the same LSN several times */
1551  if (!last_write)
1552  lsn = InvalidXLogRecPtr;
1553 
1554  resetStringInfo(ctx->out);
1555 
1556  pq_sendbyte(ctx->out, 'w');
1557  pq_sendint64(ctx->out, lsn); /* dataStart */
1558  pq_sendint64(ctx->out, lsn); /* walEnd */
1559 
1560  /*
1561  * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1562  * reserve space here.
1563  */
1564  pq_sendint64(ctx->out, 0); /* sendtime */
1565 }
StringInfo out
Definition: logical.h:71

References InvalidXLogRecPtr, LogicalDecodingContext::out, pq_sendbyte(), pq_sendint64(), and resetStringInfo().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ WalSndResourceCleanup()

void WalSndResourceCleanup ( bool  isCommit)

Definition at line 365 of file walsender.c.

366 {
      /*
       * Release and delete the resource owner currently installed in
       * CurrentResourceOwner, if any. Does nothing when
       * CurrentResourceOwner is NULL.
       *
       * isCommit is forwarded unchanged to each ResourceOwnerRelease()
       * call; the isTopLevel argument is always passed as true. On return
       * CurrentResourceOwner is NULL.
       */
367  ResourceOwner resowner;
368 
369  if (CurrentResourceOwner == NULL)
370  return;
371 
372  /*
373  * Deleting CurrentResourceOwner is not allowed, so we must save a pointer
374  * in a local variable and clear it first.
375  */
376  resowner = CurrentResourceOwner;
377  CurrentResourceOwner = NULL;
378 
379  /* Now we can release resources and delete it. */
      /* The three phases run in order: before-locks, locks, after-locks. */
380  ResourceOwnerRelease(resowner,
381  RESOURCE_RELEASE_BEFORE_LOCKS, isCommit, true);
382  ResourceOwnerRelease(resowner,
383  RESOURCE_RELEASE_LOCKS, isCommit, true);
384  ResourceOwnerRelease(resowner,
385  RESOURCE_RELEASE_AFTER_LOCKS, isCommit, true);
386  ResourceOwnerDelete(resowner);
387 }
void ResourceOwnerRelease(ResourceOwner owner, ResourceReleasePhase phase, bool isCommit, bool isTopLevel)
Definition: resowner.c:648
void ResourceOwnerDelete(ResourceOwner owner)
Definition: resowner.c:854
@ RESOURCE_RELEASE_LOCKS
Definition: resowner.h:55
@ RESOURCE_RELEASE_BEFORE_LOCKS
Definition: resowner.h:54
@ RESOURCE_RELEASE_AFTER_LOCKS
Definition: resowner.h:56

References CurrentResourceOwner, RESOURCE_RELEASE_AFTER_LOCKS, RESOURCE_RELEASE_BEFORE_LOCKS, RESOURCE_RELEASE_LOCKS, ResourceOwnerDelete(), and ResourceOwnerRelease().

Referenced by perform_base_backup(), UploadManifest(), and WalSndErrorCleanup().

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 3432 of file walsender.c.

3433 {
      /*
       * Set the needreload flag on every WalSnd entry in WalSndCtl->walsnds
       * whose pid is non-zero. Entries with pid == 0 are skipped. Both the
       * pid check and the flag update happen under that entry's spinlock.
       */
3434  int i;
3435 
3436  for (i = 0; i < max_wal_senders; i++)
3437  {
3438  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3439 
3440  SpinLockAcquire(&walsnd->mutex);
      /* Skip this entry; release the lock before moving to the next one. */
3441  if (walsnd->pid == 0)
3442  {
3443  SpinLockRelease(&walsnd->mutex);
3444  continue;
3445  }
3446  walsnd->needreload = true;
3447  SpinLockRelease(&walsnd->mutex);
3448  }
3449 }

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

◆ WalSndSegmentOpen()

static void WalSndSegmentOpen ( XLogReaderState state,
XLogSegNo  nextSegNo,
TimeLineID tli_p 
)
static

Definition at line 2883 of file walsender.c.

2885 {
2886  char path[MAXPGPATH];
2887 
2888  /*-------
2889  * When reading from a historic timeline, and there is a timeline switch
2890  * within this segment, read from the WAL segment belonging to the new
2891  * timeline.
2892  *
2893  * For example, imagine that this server is currently on timeline 5, and
2894  * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
2895  * 0/13002088. In pg_wal, we have these files:
2896  *
2897  * ...
2898  * 000000040000000000000012
2899  * 000000040000000000000013
2900  * 000000050000000000000013
2901  * 000000050000000000000014
2902  * ...
2903  *
2904  * In this situation, when requested to send the WAL from segment 0x13, on
2905  * timeline 4, we read the WAL from file 000000050000000000000013. Archive
2906  * recovery prefers files from newer timelines, so if the segment was
2907  * restored from the archive on this server, the file belonging to the old
2908  * timeline, 000000040000000000000013, might not exist. Their contents are
2909  * equal up to the switchpoint, because at a timeline switch, the used
2910  * portion of the old segment is copied to the new file.
2911  */
2912  *tli_p = sendTimeLine;
2914  {
2915  XLogSegNo endSegNo;
2916 
2917  XLByteToSeg(sendTimeLineValidUpto, endSegNo, state->segcxt.ws_segsize);
2918  if (nextSegNo == endSegNo)
2919  *tli_p = sendTimeLineNextTLI;
2920  }
2921 
2922  XLogFilePath(path, *tli_p, nextSegNo, state->segcxt.ws_segsize);
2923  state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
2924  if (state->seg.ws_file >= 0)
2925  return;
2926 
2927  /*
2928  * If the file is not found, assume it's because the standby asked for a
2929  * too old WAL segment that has already been removed or recycled.
2930  */
2931  if (errno == ENOENT)
2932  {
2933  char xlogfname[MAXFNAMELEN];
2934  int save_errno = errno;
2935 
2936  XLogFileName(xlogfname, *tli_p, nextSegNo, wal_segment_size);
2937  errno = save_errno;
2938  ereport(ERROR,
2940  errmsg("requested WAL segment %s has already been removed",
2941  xlogfname)));
2942  }
2943  else
2944  ereport(ERROR,
2946  errmsg("could not open file \"%s\": %m",
2947  path)));
2948 }
int BasicOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1087
static void XLogFilePath(char *path, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)

References BasicOpenFile(), ereport, errcode_for_file_access(), errmsg(), ERROR, MAXFNAMELEN, MAXPGPATH, PG_BINARY, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, wal_segment_size, XLByteToSeg, XLogFileName(), and XLogFilePath().

Referenced by CreateReplicationSlot(), StartLogicalReplication(), and StartReplication().

◆ WalSndSetState()

void WalSndSetState ( WalSndState  state)

Definition at line 3697 of file walsender.c.

3698 {
3699  WalSnd *walsnd = MyWalSnd;
3700 
3702 
3703  if (walsnd->state == state)
3704  return;
3705 
3706  SpinLockAcquire(&walsnd->mutex);
3707  walsnd->state = state;
3708  SpinLockRelease(&walsnd->mutex);
3709 }

References am_walsender, Assert(), WalSnd::mutex, MyWalSnd, SpinLockAcquire, SpinLockRelease, and WalSnd::state.

Referenced by exec_replication_command(), SendBaseBackup(), StartLogicalReplication(), StartReplication(), WalSndErrorCleanup(), WalSndLoop(), and XLogSendPhysical().

◆ WalSndShmemInit()

void WalSndShmemInit ( void  )

Definition at line 3516 of file walsender.c.

3517 {
3518  bool found;
3519  int i;
3520 
3521  WalSndCtl = (WalSndCtlData *)
3522  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3523 
3524  if (!found)
3525  {
3526  /* First time through, so initialize */
3528 
3529  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3531 
3532  for (i = 0; i < max_wal_senders; i++)
3533  {
3534  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3535 
3536  SpinLockInit(&walsnd->mutex);
3537  }
3538 
3541  }
3542 }
void ConditionVariableInit(ConditionVariable *cv)
static void dlist_init(dlist_head *head)
Definition: ilist.h:314
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:388
#define SpinLockInit(lock)
Definition: spin.h:60
ConditionVariable wal_replay_cv
dlist_head SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
ConditionVariable wal_flush_cv
Size WalSndShmemSize(void)
Definition: walsender.c:3504

References ConditionVariableInit(), dlist_init(), i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, WalSndCtl, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateOrAttachShmemStructs().

◆ WalSndShmemSize()

Size WalSndShmemSize ( void  )

Definition at line 3504 of file walsender.c.

3505 {
3506  Size size = 0;
3507 
3508  size = offsetof(WalSndCtlData, walsnds);
3509  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3510 
3511  return size;
3512 }
Size add_size(Size s1, Size s2)
Definition: shmem.c:494
Size mul_size(Size s1, Size s2)
Definition: shmem.c:511

References add_size(), max_wal_senders, and mul_size().

Referenced by CalculateShmemSize(), and WalSndShmemInit().

◆ WalSndShutdown()

static void WalSndShutdown ( void  )
static

Definition at line 243 of file walsender.c.

282 {
284 
285  /* Create a per-walsender data structure in shared memory */
287 
288  /*
289  * We don't currently need any ResourceOwner in a walsender process, but
290  * if we did, we could call CreateAuxProcessResourceOwner here.
291  */
292 
293  /*
294  * Let postmaster know that we're a WAL sender. Once we've declared ourselves as
295  * a WAL sender process, postmaster will let us outlive the bgwriter and
296  * kill us last in the shutdown sequence, so we get a chance to stream all
297  * remaining WAL at shutdown, including the shutdown checkpoint. Note that
298  * there's no going back, and we mustn't write any WAL records after this.
299  */
302 
303  /*
304  * If the client didn't specify a database to connect to, show in PGPROC
305  * that our advertised xmin should affect vacuum horizons in all
306  * databases. This allows physical replication clients to send hot
307  * standby feedback that will delay vacuum cleanup in all databases.
308  */
309  if (MyDatabaseId == InvalidOid)
310  {
312  LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
315  LWLockRelease(ProcArrayLock);
316  }
317 
318  /* Initialize empty timestamp buffer for lag tracking. */
320 }
@ LW_EXCLUSIVE
Definition: lwlock.h:116
MemoryContext TopMemoryContext
Definition: mcxt.c:141
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1077
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:181
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:339
@ PMSIGNAL_ADVANCE_STATE_MACHINE
Definition: pmsignal.h:42
#define PROC_AFFECTS_ALL_HORIZONS
Definition: proc.h:61
PROC_HDR * ProcGlobal
Definition: proc.c:81
uint8 statusFlags
Definition: proc.h:228
int pgxactoff
Definition: proc.h:188
uint8 * statusFlags
Definition: proc.h:373
static void InitWalSenderSlot(void)
Definition: walsender.c:2788

Referenced by ProcessPendingWrites(), WalSndCheckTimeOut(), WalSndKeepaliveIfNecessary(), WalSndLoop(), WalSndUpdateProgress(), WalSndWaitForWal(), and WalSndWriteData().

◆ WalSndSignals()

void WalSndSignals ( void  )

Definition at line 3485 of file walsender.c.

3486 {
3487  /* Set up signal handlers */
3489  pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3490  pqsignal(SIGTERM, die); /* request shutdown */
3491  /* SIGQUIT handler was already set up by InitPostmasterChild */
3492  InitializeTimeouts(); /* establishes SIGALRM handler */
3495  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3496  * shutdown */
3497 
3498  /* Reset some signals that are accepted by postmaster but not here */
3500 }
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
#define die(msg)
pqsigfunc pqsignal(int signo, pqsigfunc func)
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:3001
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:639
void InitializeTimeouts(void)
Definition: timeout.c:470
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:3477
#define SIGCHLD
Definition: win32_port.h:178
#define SIGHUP
Definition: win32_port.h:168
#define SIG_DFL
Definition: win32_port.h:163
#define SIGPIPE
Definition: win32_port.h:173
#define SIGUSR1
Definition: win32_port.h:180
#define SIGUSR2
Definition: win32_port.h:181
#define SIG_IGN
Definition: win32_port.h:165

References die, InitializeTimeouts(), pqsignal(), procsignal_sigusr1_handler(), SIG_DFL, SIG_IGN, SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGUSR1, SIGUSR2, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

◆ WalSndUpdateProgress()

static void WalSndUpdateProgress ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  skipped_xact 
)
static

Definition at line 1671 of file walsender.c.

1673 {
1674  static TimestampTz sendTime = 0;
1676  bool pending_writes = false;
1677  bool end_xact = ctx->end_xact;
1678 
1679  /*
1680  * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1681  * avoid flooding the lag tracker when we commit frequently.
1682  *
1683  * We don't have a mechanism to get the ack for any LSN other than end
1684  * xact LSN from the downstream. So, we track lag only for end of
1685  * transaction LSN.
1686  */
1687 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1688  if (end_xact && TimestampDifferenceExceeds(sendTime, now,
1690  {
1691  LagTrackerWrite(lsn, now);
1692  sendTime = now;
1693  }
1694 
1695  /*
1696  * When skipping empty transactions in synchronous replication, we send a
1697  * keepalive message to avoid delaying such transactions.
1698  *
1699  * It is okay to check sync_standbys_defined flag without lock here as in
1700  * the worst case we will just send an extra keepalive message when it is
1701  * really not required.
1702  */
1703  if (skipped_xact &&
1704  SyncRepRequested() &&
1705  ((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
1706  {
1707  WalSndKeepalive(false, lsn);
1708 
1709  /* Try to flush pending output to the client */
1710  if (pq_flush_if_writable() != 0)
1711  WalSndShutdown();
1712 
1713  /* If we have pending write here, make sure it's actually flushed */
1714  if (pq_is_send_pending())
1715  pending_writes = true;
1716  }
1717 
1718  /*
1719  * Process pending writes if any or try to send a keepalive if required.
1720  * We don't need to try sending keep alive messages at the transaction end
1721  * as that will be done at a later point in time. This is required only
1722  * for large transactions where we don't send any changes to the
1723  * downstream and the receiver can timeout due to that.
1724  */
1725  if (pending_writes || (!end_xact &&
1727  wal_sender_timeout / 2)))
1729 }
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1791
#define SyncRepRequested()
Definition: syncrep.h:18
static void ProcessPendingWrites(void)
Definition: walsender.c:1617
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition: walsender.c:3983

References LogicalDecodingContext::end_xact, GetCurrentTimestamp(), LagTrackerWrite(), last_reply_timestamp, now(), pq_flush_if_writable, pq_is_send_pending, ProcessPendingWrites(), SyncRepRequested, TimestampDifferenceExceeds(), TimestampTzPlusMilliseconds, wal_sender_timeout, WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS, WalSndCtl, WalSndKeepalive(), and WalSndShutdown().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ WalSndWait()

static void WalSndWait ( uint32  socket_events,
long  timeout,
uint32  wait_event 
)
static

Definition at line 3581 of file walsender.c.

3582 {
3583  WaitEvent event;
3584 
3585  ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL);
3586 
3587  /*
3588  * We use a condition variable to efficiently wake up walsenders in
3589  * WalSndWakeup().
3590  *
3591  * Every walsender prepares to sleep on a shared memory CV. Note that it
3592  * just prepares to sleep on the CV (i.e., adds itself to the CV's
3593  * waitlist), but does not actually wait on the CV (IOW, it never calls
3594  * ConditionVariableSleep()). It still uses WaitEventSetWait() for
3595  * waiting, because we also need to wait for socket events. The processes
3596  * (startup process, walreceiver etc.) wanting to wake up walsenders use
3597  * ConditionVariableBroadcast(), which in turn calls SetLatch(), helping
3598  * walsenders come out of WaitEventSetWait().
3599  *
3600  * This approach is simple and efficient because one doesn't have to loop
3601  * through all the walsender slots, with a spinlock acquisition and
3602  * release for every iteration, just to wake up only the waiting
3603  * walsenders. It makes WalSndWakeup() callers' life easy.
3604  *
3605  * XXX: A desirable future improvement would be to add support for CVs
3606  * into WaitEventSetWait().
3607  *
3608  * And, we use separate shared memory CVs for physical and logical
3609  * walsenders for selective wake ups, see WalSndWakeup() for more details.
3610  */
3613  else if (MyWalSnd->kind == REPLICATION_KIND_LOGICAL)
3615 
3616  if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
3617  (event.events & WL_POSTMASTER_DEATH))
3618  {
3620  proc_exit(1);
3621  }
3622 
3624 }
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
void ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
Definition: latch.c:1050
int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents, uint32 wait_event_info)
Definition: latch.c:1425
#define WL_POSTMASTER_DEATH
Definition: latch.h:131
#define FeBeWaitSetSocketPos
Definition: libpq.h:63
WaitEventSet * FeBeWaitSet
Definition: pqcomm.c:164
uint32 events
Definition: latch.h:155

References ConditionVariableCancelSleep(), ConditionVariablePrepareToSleep(), WaitEvent::events, FeBeWaitSet, FeBeWaitSetSocketPos, WalSnd::kind, ModifyWaitEvent(), MyWalSnd, proc_exit(), REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, WaitEventSetWait(), WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, WalSndCtl, and WL_POSTMASTER_DEATH.

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndWaitForWal()

static XLogRecPtr WalSndWaitForWal ( XLogRecPtr  loc)
static

Definition at line 1739 of file walsender.c.

1740 {
1741  int wakeEvents;
1742  static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1743 
1744  /*
1745  * Fast path to avoid acquiring the spinlock in case we already know we
1746  * have enough WAL available. This is particularly interesting if we're
1747  * far behind.
1748  */
1749  if (RecentFlushPtr != InvalidXLogRecPtr &&
1750  loc <= RecentFlushPtr)
1751  return RecentFlushPtr;
1752 
1753  /* Get a more recent flush pointer. */
1754  if (!RecoveryInProgress())
1755  RecentFlushPtr = GetFlushRecPtr(NULL);
1756  else
1757  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1758 
1759  for (;;)
1760  {
1761  long sleeptime;
1762 
1763  /* Clear any already-pending wakeups */
1765 
1767 
1768  /* Process any requests or signals received recently */
1769  if (ConfigReloadPending)
1770  {
1771  ConfigReloadPending = false;
1774  }
1775 
1776  /* Check for input from the client */
1778 
1779  /*
1780  * If we're shutting down, trigger pending WAL to be written out,
1781  * otherwise we'd possibly end up waiting for WAL that never gets
1782  * written, because walwriter has shut down already.
1783  */
1784  if (got_STOPPING)
1786 
1787  /* Update our idea of the currently flushed position. */
1788  if (!RecoveryInProgress())
1789  RecentFlushPtr = GetFlushRecPtr(NULL);
1790  else
1791  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1792 
1793  /*
1794  * If postmaster asked us to stop, don't wait anymore.
1795  *
1796  * It's important to do this check after the recomputation of
1797  * RecentFlushPtr, so we can send all remaining data before shutting
1798  * down.
1799  */
1800  if (got_STOPPING)
1801  break;
1802 
1803  /*
1804  * We only send regular messages to the client for full decoded
1805  * transactions, but synchronous replication and walsender shutdown
1806  * may be waiting for a later location. So, before sleeping, we
1807  * send a ping containing the flush location. If the receiver is
1808  * otherwise idle, this keepalive will trigger a reply. Processing the
1809  * reply will update these MyWalSnd locations.
1810  */
1811  if (MyWalSnd->flush < sentPtr &&
1812  MyWalSnd->write < sentPtr &&
1815 
1816  /* check whether we're done */
1817  if (loc <= RecentFlushPtr)
1818  break;
1819 
1820  /* Waiting for new WAL. Since we need to wait, we're now caught up. */
1821  WalSndCaughtUp = true;
1822 
1823  /*
1824  * Try to flush any pending output to the client.
1825  */
1826  if (pq_flush_if_writable() != 0)
1827  WalSndShutdown();
1828 
1829  /*
1830  * If we have received CopyDone from the client, sent CopyDone
1831  * ourselves, and the output buffer is empty, it's time to exit
1832  * streaming, so fail the current WAL fetch request.
1833  */
1835  !pq_is_send_pending())
1836  break;
1837 
1838  /* die if timeout was reached */
1840 
1841  /* Send keepalive if the time has come */
1843 
1844  /*
1845  * Sleep until something happens or we time out. Also wait for the
1846  * socket becoming writable, if there's still pending output.
1847  * Otherwise we might sit on sendable output data while waiting for
1848  * new WAL to be generated. (But if we have nothing to send, we don't
1849  * want to wake on socket-writable.)
1850  */
1852 
1853  wakeEvents = WL_SOCKET_READABLE;
1854 
1855  if (pq_is_send_pending())
1856  wakeEvents |= WL_SOCKET_WRITEABLE;
1857 
1858  WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_FOR_WAL);
1859  }
1860 
1861  /* reactivate latch so WalSndLoop knows to continue */
1862  SetLatch(MyLatch);
1863  return RecentFlushPtr;
1864 }
bool XLogBackgroundFlush(void)
Definition: xlog.c:2923

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, WalSnd::flush, GetCurrentTimestamp(), GetFlushRecPtr(), GetXLogReplayRecPtr(), got_STOPPING, InvalidXLogRecPtr, MyLatch, MyWalSnd, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), RecoveryInProgress(), ResetLatch(), sentPtr, SetLatch(), streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), waiting_for_ping_response, WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepalive(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WalSndWait(), WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, WalSnd::write, and XLogBackgroundFlush().

Referenced by logical_read_xlog_page().

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 3659 of file walsender.c.

3660 {
3661  for (;;)
3662  {
3663  int i;
3664  bool all_stopped = true;
3665 
3666  for (i = 0; i < max_wal_senders; i++)
3667  {
3668  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3669 
3670  SpinLockAcquire(&walsnd->mutex);
3671 
3672  if (walsnd->pid == 0)
3673  {
3674  SpinLockRelease(&walsnd->mutex);
3675  continue;
3676  }
3677 
3678  if (walsnd->state != WALSNDSTATE_STOPPING)
3679  {
3680  all_stopped = false;
3681  SpinLockRelease(&walsnd->mutex);
3682  break;
3683  }
3684  SpinLockRelease(&walsnd->mutex);
3685  }
3686 
3687  /* safe to leave if confirmation is done for all WAL senders */
3688  if (all_stopped)
3689  return;
3690 
3691  pg_usleep(10000L); /* wait for 10 msec */
3692  }
3693 }
void pg_usleep(long microsec)
Definition: signal.c:53

References i, max_wal_senders, WalSnd::mutex, pg_usleep(), WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndCtl, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

◆ WalSndWakeup()

void WalSndWakeup ( bool  physical,
bool  logical 
)

Definition at line 3560 of file walsender.c.

3561 {
3562  /*
3563  * Wake up all the walsenders waiting on WAL being flushed or replayed
3564  * respectively. Note that waiting walsender would have prepared to sleep
3565  * on the CV (i.e., added itself to the CV's waitlist) in WalSndWait()
3566  * before actually waiting.
3567  */
3568  if (physical)
3570 
3571  if (logical)
3573 }
void ConditionVariableBroadcast(ConditionVariable *cv)

References ConditionVariableBroadcast(), WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, and WalSndCtl.

Referenced by ApplyWalRecord(), KeepFileRestoredFromArchive(), StartupXLOG(), WalSndWakeupProcessRequests(), and XLogWalRcvFlush().

◆ WalSndWriteData()

static void WalSndWriteData ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1575 of file walsender.c.

1577 {
1578  TimestampTz now;
1579 
1580  /*
1581  * Fill the send timestamp last, so that it is taken as late as possible.
1582  * This is somewhat ugly, but the protocol is set, as it has already been used for
1583  * several releases by streaming physical replication.
1584  */
1587  pq_sendint64(&tmpbuf, now);
1588  memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1589  tmpbuf.data, sizeof(int64));
1590 
1591  /* output previously gathered data in a CopyData packet */
1592  pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1593 
1595 
1596  /* Try to flush pending output to the client */
1597  if (pq_flush_if_writable() != 0)
1598  WalSndShutdown();
1599 
1600  /* Try taking fast path unless we get too close to walsender timeout. */
1602  wal_sender_timeout / 2) &&
1603  !pq_is_send_pending())
1604  {
1605  return;
1606  }
1607 
1608  /* If we have pending write here, go to slow path */
1610 }

References CHECK_FOR_INTERRUPTS, StringInfoData::data, GetCurrentTimestamp(), last_reply_timestamp, StringInfoData::len, now(), LogicalDecodingContext::out, pq_flush_if_writable, pq_is_send_pending, pq_putmessage_noblock, pq_sendint64(), ProcessPendingWrites(), resetStringInfo(), TimestampTzPlusMilliseconds, tmpbuf, wal_sender_timeout, and WalSndShutdown().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ XLogSendLogical()

static void XLogSendLogical ( void  )
static

Definition at line 3271 of file walsender.c.

3272 {
3273  XLogRecord *record;
3274  char *errm;
3275 
3276  /*
3277  * We'll use the current flush point to determine whether we've caught up.
3278  * This variable is static in order to cache it across calls. Caching is
3279  * helpful because GetFlushRecPtr() needs to acquire a heavily-contended
3280  * spinlock.
3281  */
3282  static XLogRecPtr flushPtr = InvalidXLogRecPtr;
3283 
3284  /*
3285  * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
3286  * true in WalSndWaitForWal, if we're actually waiting. We also set to
3287  * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
3288  * didn't wait - i.e. when we're shutting down.
3289  */
3290  WalSndCaughtUp = false;
3291 
3292  record = XLogReadRecord(logical_decoding_ctx->reader, &errm);
3293 
3294  /* xlog record was invalid */
3295  if (errm != NULL)
3296  elog(ERROR, "could not find record while sending logically-decoded data: %s",
3297  errm);
3298 
3299  if (record != NULL)
3300  {
3301  /*
3302  * Note the lack of any call to LagTrackerWrite() which is handled by
3303  * WalSndUpdateProgress which is called by output plugin through
3304  * logical decoding write api.
3305  */
3307 
3309  }
3310 
3311  /*
3312  * If first time through in this session, initialize flushPtr. Otherwise,
3313  * we only need to update flushPtr if EndRecPtr is past it.
3314  */
3315  if (flushPtr == InvalidXLogRecPtr ||
3316  logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
3317  {
3319  flushPtr = GetStandbyFlushRecPtr(NULL);
3320  else
3321  flushPtr = GetFlushRecPtr(NULL);
3322  }
3323 
3324  /* If EndRecPtr is still past our flushPtr, it means we caught up. */
3325  if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
3326  WalSndCaughtUp = true;
3327 
3328  /*
3329  * If we're caught up and have been requested to stop, have WalSndLoop()
3330  * terminate the connection in an orderly manner, after writing out all
3331  * the pending data.
3332  */
3334  got_SIGUSR2 = true;
3335 
3336  /* Update shared memory status */
3337  {
3338  WalSnd *walsnd = MyWalSnd;
3339 
3340  SpinLockAcquire(&walsnd->mutex);
3341  walsnd->sentPtr = sentPtr;
3342  SpinLockRelease(&walsnd->mutex);
3343  }
3344 }
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:91
XLogRecPtr EndRecPtr
Definition: xlogreader.h:207
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
Definition: xlogreader.c:391

References am_cascading_walsender, elog(), XLogReaderState::EndRecPtr, ERROR, GetFlushRecPtr(), GetStandbyFlushRecPtr(), got_SIGUSR2, got_STOPPING, InvalidXLogRecPtr, logical_decoding_ctx, LogicalDecodingProcessRecord(), WalSnd::mutex, MyWalSnd, LogicalDecodingContext::reader, sentPtr, WalSnd::sentPtr, SpinLockAcquire, SpinLockRelease, WalSndCaughtUp, and XLogReadRecord().

Referenced by StartLogicalReplication(), and WalSndLoop().

◆ XLogSendPhysical()

static void XLogSendPhysical ( void  )
static

Definition at line 2961 of file walsender.c.

2962 {
2963  XLogRecPtr SendRqstPtr;
2964  XLogRecPtr startptr;
2965  XLogRecPtr endptr;
2966  Size nbytes;
2967  XLogSegNo segno;
2968  WALReadError errinfo;
2969  Size rbytes;
2970 
2971  /* If requested switch the WAL sender to the stopping state. */
2972  if (got_STOPPING)
2974 
2976  {
2977  WalSndCaughtUp = true;
2978  return;
2979  }
2980 
2981  /* Figure out how far we can safely send the WAL. */
2983  {
2984  /*
2985  * Streaming an old timeline that's in this server's history, but is
2986  * not the one we're currently inserting or replaying. It can be
2987  * streamed up to the point where we switched off that timeline.
2988  */
2989  SendRqstPtr = sendTimeLineValidUpto;
2990  }
2991  else if (am_cascading_walsender)
2992  {
2993  TimeLineID SendRqstTLI;
2994 
2995  /*
2996  * Streaming the latest timeline on a standby.
2997  *
2998  * Attempt to send all WAL that has already been replayed, so that we
2999  * know it's valid. If we're receiving WAL through streaming
3000  * replication, it's also OK to send any WAL that has been received
3001  * but not replayed.
3002  *
3003  * The timeline we're recovering from can change, or we can be
3004  * promoted. In either case, the current timeline becomes historic. We
3005  * need to detect that so that we don't try to stream past the point
3006  * where we switched to another timeline. We check for promotion or
3007  * timeline switch after calculating FlushPtr, to avoid a race
3008  * condition: if the timeline becomes historic just after we checked
3009  * that it was still current, it's still OK to stream it up to the
3010  * FlushPtr that was calculated before it became historic.
3011  */
3012  bool becameHistoric = false;
3013 
3014  SendRqstPtr = GetStandbyFlushRecPtr(&SendRqstTLI);
3015 
3016  if (!RecoveryInProgress())
3017  {
3018  /* We have been promoted. */
3019  SendRqstTLI = GetWALInsertionTimeLine();
3020  am_cascading_walsender = false;
3021  becameHistoric = true;
3022  }
3023  else
3024  {
3025  /*
3026  * Still a cascading standby. But is the timeline we're sending
3027  * still the one recovery is recovering from?
3028  */
3029  if (sendTimeLine != SendRqstTLI)
3030  becameHistoric = true;
3031  }
3032 
3033  if (becameHistoric)
3034  {
3035  /*
3036  * The timeline we were sending has become historic. Read the
3037  * timeline history file of the new timeline to see where exactly
3038  * we forked off from the timeline we were sending.
3039  */
3040  List *history;
3041 
3042  history = readTimeLineHistory(SendRqstTLI);
3044 
3046  list_free_deep(history);
3047 
3048  sendTimeLineIsHistoric = true;
3049 
3050  SendRqstPtr = sendTimeLineValidUpto;
3051  }
3052  }
3053  else
3054  {
3055  /*
3056  * Streaming the current timeline on a primary.
3057  *
3058  * Attempt to send all data that's already been written out and
3059  * fsync'd to disk. We cannot go further than what's been written out
3060  * given the current implementation of WALRead(). And in any case
3061  * it's unsafe to send WAL that is not securely down to disk on the
3062  * primary: if the primary subsequently crashes and restarts, standbys
3063  * must not have applied any WAL that got lost on the primary.
3064  */
3065  SendRqstPtr = GetFlushRecPtr(NULL);
3066  }
3067 
3068  /*
3069  * Record the current system time as an approximation of the time at which
3070  * this WAL location was written for the purposes of lag tracking.
3071  *
3072  * In theory we could make XLogFlush() record a time in shmem whenever WAL
3073  * is flushed and we could get that time as well as the LSN when we call
3074  * GetFlushRecPtr() above (and likewise for the cascading standby
3075  * equivalent), but rather than putting any new code into the hot WAL path
3076  * it seems good enough to capture the time here. We should reach this
3077  * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
3078  * may take some time, we read the WAL flush pointer and take the time
3079  * very close together here so that we'll get a later position if it is
3080  * still moving.
3081  *
3082  * Because LagTrackerWrite ignores samples when the LSN hasn't advanced,
3083  * this gives us a cheap approximation for the WAL flush time for this
3084  * LSN.
3085  *
3086  * Note that the LSN is not necessarily the LSN for the data contained in
3087  * the present message; it's the end of the WAL, which might be further
3088  * ahead. All the lag tracking machinery cares about is finding out when
3089  * that arbitrary LSN is eventually reported as written, flushed and
3090  * applied, so that it can measure the elapsed time.
3091  */
3092  LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());
3093 
3094  /*
3095  * If this is a historic timeline and we've reached the point where we
3096  * forked to the next timeline, stop streaming.
3097  *
3098  * Note: We might already have sent WAL > sendTimeLineValidUpto. The
3099  * startup process will normally replay all WAL that has been received
3100  * from the primary, before promoting, but if the WAL streaming is
3101  * terminated at a WAL page boundary, the valid portion of the timeline
3102  * might end in the middle of a WAL record. We might've already sent the
3103  * first half of that partial WAL record to the cascading standby, so that
3104  * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
3105  * replay the partial WAL record either, so it can still follow our
3106  * timeline switch.
3107  */
3109  {
3110  /* close the current file. */
3111  if (xlogreader->seg.ws_file >= 0)
3113 
3114  /* Send CopyDone */
3115  pq_putmessage_noblock('c', NULL, 0);
3116  streamingDoneSending = true;
3117 
3118  WalSndCaughtUp = true;
3119 
3120  elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
3123  return;
3124  }
3125 
3126  /* Do we have any work to do? */
3127  Assert(sentPtr <= SendRqstPtr);
3128  if (SendRqstPtr <= sentPtr)
3129  {
3130  WalSndCaughtUp = true;
3131  return;
3132  }
3133 
3134  /*
3135  * Figure out how much to send in one message. If there's no more than
3136  * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
3137  * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
3138  *
3139  * The rounding is not only for performance reasons. Walreceiver relies on
3140  * the fact that we never split a WAL record across two messages. Since a
3141  * long WAL record is split at page boundary into continuation records,
3142  * page boundary is always a safe cut-off point. We also assume that
3143  * SendRqstPtr never points to the middle of a WAL record.
3144  */
3145  startptr = sentPtr;
3146  endptr = startptr;
3147  endptr += MAX_SEND_SIZE;
3148 
3149  /* if we went beyond SendRqstPtr, back off */
3150  if (SendRqstPtr <= endptr)
3151  {
3152  endptr = SendRqstPtr;
3154  WalSndCaughtUp = false;
3155  else
3156  WalSndCaughtUp = true;
3157  }
3158  else
3159  {
3160  /* round down to page boundary. */
3161  endptr -= (endptr % XLOG_BLCKSZ);
3162  WalSndCaughtUp = false;
3163  }
3164 
3165  nbytes = endptr - startptr;
3166  Assert(nbytes <= MAX_SEND_SIZE);
3167 
3168  /*
3169  * OK to read and send the slice.
3170  */
3172  pq_sendbyte(&output_message, 'w');
3173 
3174  pq_sendint64(&output_message, startptr); /* dataStart */
3175  pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
3176  pq_sendint64(&output_message, 0); /* sendtime, filled in last */
3177 
3178  /*
3179  * Read the log directly into the output buffer to avoid extra memcpy
3180  * calls.
3181  */
3183 
3184 retry:
3185  /* attempt to read WAL from WAL buffers first */
3187  startptr, nbytes, xlogreader->seg.ws_tli);
3188  output_message.len += rbytes;
3189  startptr += rbytes;
3190  nbytes -= rbytes;
3191 
3192  /* now read the remaining WAL from WAL file */
3193  if (nbytes > 0 &&
3196  startptr,
3197  nbytes,
3198  xlogreader->seg.ws_tli, /* Pass the current TLI because
3199  * only WalSndSegmentOpen controls
3200  * whether new TLI is needed. */
3201  &errinfo))
3202  WALReadRaiseError(&errinfo);
3203 
3204  /* See logical_read_xlog_page(). */
3205  XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize);
3207 
3208  /*
3209  * During recovery, the currently-open WAL file might be replaced with the
3210  * file of the same name retrieved from archive. So we always need to
3211  * check what we read was valid after reading into the buffer. If it's
3212  * invalid, we try to open and read the file again.
3213  */
3215  {
3216  WalSnd *walsnd = MyWalSnd;
3217  bool reload;
3218 
3219  SpinLockAcquire(&walsnd->mutex);
3220  reload = walsnd->needreload;
3221  walsnd->needreload = false;
3222  SpinLockRelease(&walsnd->mutex);
3223 
3224  if (reload && xlogreader->seg.ws_file >= 0)
3225  {
3227 
3228  goto retry;
3229  }
3230  }
3231 
3232  output_message.len += nbytes;
3234 
3235  /*
3236  * Fill the send timestamp last, so that it is taken as late as possible.
3237  */
3240  memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
3241  tmpbuf.data, sizeof(int64));
3242 
3244 
3245  sentPtr = endptr;
3246 
3247  /* Update shared memory status */
3248  {
3249  WalSnd *walsnd = MyWalSnd;
3250 
3251  SpinLockAcquire(&walsnd->mutex);
3252  walsnd->sentPtr = sentPtr;
3253  SpinLockRelease(&walsnd->mutex);
3254  }
3255 
3256  /* Report progress of XLOG streaming in PS display */
3258  {
3259  char activitymsg[50];
3260 
3261  snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
3263  set_ps_display(activitymsg);
3264  }
3265 }
bool update_process_title
Definition: ps_status.c:31
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:289
TimeLineID ws_tli
Definition: xlogreader.h:49
WALSegmentContext segcxt
Definition: xlogreader.h:271
#define MAX_SEND_SIZE
Definition: walsender.c:109
Size WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count, TimeLineID tli)
Definition: xlog.c:1723

References am_cascading_walsender, Assert(), CheckXLogRemoved(), StringInfoData::data, DEBUG1, elog(), enlargeStringInfo(), GetCurrentTimestamp(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetWALInsertionTimeLine(), got_STOPPING, LagTrackerWrite(), StringInfoData::len, list_free_deep(), LSN_FORMAT_ARGS, MAX_SEND_SIZE, WalSnd::mutex, MyWalSnd, WalSnd::needreload, output_message, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), readTimeLineHistory(), RecoveryInProgress(), resetStringInfo(), XLogReaderState::seg, XLogReaderState::segcxt, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, sentPtr, WalSnd::sentPtr, set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, streamingDoneSending, tliSwitchPoint(), tmpbuf, update_process_title, wal_segment_close(), WALRead(), WALReadFromBuffers(), WALReadRaiseError(), WalSndCaughtUp, WalSndSetState(), WALSNDSTATE_STOPPING, WALOpenSegment::ws_file, WALSegmentContext::ws_segsize, WALOpenSegment::ws_tli, XLByteToSeg, and xlogreader.

Referenced by StartReplication().

Variable Documentation

◆ am_cascading_walsender

◆ am_db_walsender

bool am_db_walsender = false

Definition at line 121 of file walsender.c.

Referenced by check_db(), ClientAuthentication(), InitPostgres(), and ProcessStartupPacket().

◆ am_walsender

◆ got_SIGUSR2

static volatile sig_atomic_t got_SIGUSR2 = false

◆ got_STOPPING

◆ lag_tracker

static LagTracker *lag_tracker

Definition at line 233 of file walsender.c.

Referenced by LagTrackerRead(), and LagTrackerWrite().

◆ last_processing

static TimestampTz last_processing = 0

◆ last_reply_timestamp

static TimestampTz last_reply_timestamp = 0

Definition at line 182 of file walsender.c.

Referenced by CreateReplicationSlot(), ProcessRepliesIfAny()