PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
walsender.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/printtup.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "access/xlogutils.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
#include "commands/defrem.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "pgstat.h"
#include "replication/basebackup.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/logicalfuncs.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/dest.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/portal.h"
#include "utils/ps_status.h"
#include "utils/resowner.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
Include dependency graph for walsender.c:

Go to the source code of this file.

Data Structures

struct  WalTimeSample
 

Macros

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)
 
#define LAG_TRACKER_BUFFER_SIZE   8192
 
#define PG_STAT_GET_WAL_SENDERS_COLS   11
 

Typedefs

typedef void(* WalSndSendDataCallback )(void)
 

Functions

static void WalSndSigHupHandler (SIGNAL_ARGS)
 
static void WalSndXLogSendHandler (SIGNAL_ARGS)
 
static void WalSndLastCycleHandler (SIGNAL_ARGS)
 
static void WalSndLoop (WalSndSendDataCallback send_data)
 
static void InitWalSenderSlot (void)
 
static void WalSndKill (int code, Datum arg)
 
static void WalSndShutdown (void)
 
void WalSndErrorCleanup (void)
 
static void IdentifySystem (void)
 
static void SendTimeLineHistory (TimeLineHistoryCmd *cmd)
 
static void StartReplication (StartReplicationCmd *cmd)
 
static int logical_read_xlog_page (XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
 
static void parseCreateReplSlotOptions (CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action)
 
static void CreateReplicationSlot (CreateReplicationSlotCmd *cmd)
 
static void DropReplicationSlot (DropReplicationSlotCmd *cmd)
 
static void StartLogicalReplication (StartReplicationCmd *cmd)
 
static void WalSndPrepareWrite (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndWriteData (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static XLogRecPtr WalSndWaitForWal (XLogRecPtr loc)
 
bool exec_replication_command (const char *cmd_string)
 
static void ProcessRepliesIfAny (void)
 
static void ProcessStandbyMessage (void)
 
static void PhysicalConfirmReceivedLocation (XLogRecPtr lsn)
 
static void ProcessStandbyReplyMessage (void)
 
static void PhysicalReplicationSlotNewXmin (TransactionId feedbackXmin)
 
static void ProcessStandbyHSFeedbackMessage (void)
 
static long WalSndComputeSleeptime (TimestampTz now)
 
static void WalSndCheckTimeOut (TimestampTz now)
 
static void XLogRead (char *buf, XLogRecPtr startptr, Size count)
 
static void XLogSendPhysical (void)
 
static void XLogSendLogical (void)
 
static void WalSndDone (WalSndSendDataCallback send_data)
 
static XLogRecPtr GetStandbyFlushRecPtr (void)
 
void WalSndRqstFileReload (void)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (void)
 
void WalSndSetState (WalSndState state)
 
static const char * WalSndGetStateString (WalSndState state)
 
static Interval * offset_to_interval (TimeOffset offset)
 
Datum pg_stat_get_wal_senders (PG_FUNCTION_ARGS)
 
static void WalSndKeepalive (bool requestReply)
 
static void WalSndKeepaliveIfNecessary (TimestampTz now)
 
void LagTrackerWrite (XLogRecPtr lsn, TimestampTz local_flush_time)
 
static TimeOffset LagTrackerRead (int head, XLogRecPtr lsn, TimestampTz now)
 

Variables

WalSndCtlData * WalSndCtl = NULL
 
WalSnd * MyWalSnd = NULL
 
bool am_walsender = false
 
bool am_cascading_walsender = false
 
bool am_db_walsender = false
 
int max_wal_senders = 0
 
int wal_sender_timeout = 60 * 1000
 
bool log_replication_commands = false
 
bool wake_wal_senders = false
 
static int sendFile = -1
 
static XLogSegNo sendSegNo = 0
 
static uint32 sendOff = 0
 
static TimeLineID curFileTimeLine = 0
 
static TimeLineID sendTimeLine = 0
 
static TimeLineID sendTimeLineNextTLI = 0
 
static bool sendTimeLineIsHistoric = false
 
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
 
static XLogRecPtr sentPtr = 0
 
static StringInfoData output_message
 
static StringInfoData reply_message
 
static StringInfoData tmpbuf
 
static TimestampTz last_reply_timestamp = 0
 
static bool waiting_for_ping_response = false
 
static bool streamingDoneSending
 
static bool streamingDoneReceiving
 
static bool WalSndCaughtUp = false
 
static volatile sig_atomic_t got_SIGHUP = false
 
static volatile sig_atomic_t walsender_ready_to_stop = false
 
static volatile sig_atomic_t replication_active = false
 
static LogicalDecodingContext * logical_decoding_ctx = NULL
 
static XLogRecPtr logical_startptr = InvalidXLogRecPtr
 
struct {
   XLogRecPtr   last_lsn
 
   WalTimeSample   buffer [LAG_TRACKER_BUFFER_SIZE]
 
   int   write_head
 
   int   read_heads [NUM_SYNC_REP_WAIT_MODE]
 
   WalTimeSample   last_read [NUM_SYNC_REP_WAIT_MODE]
 
} LagTracker
 

Macro Definition Documentation

#define LAG_TRACKER_BUFFER_SIZE   8192

Definition at line 201 of file walsender.c.

Referenced by LagTrackerRead(), and LagTrackerWrite().

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)

Definition at line 99 of file walsender.c.

Referenced by XLogSendPhysical().

#define PG_STAT_GET_WAL_SENDERS_COLS   11

Referenced by pg_stat_get_wal_senders().

Typedef Documentation

typedef void(* WalSndSendDataCallback)(void)

Definition at line 219 of file walsender.c.

Function Documentation

static void CreateReplicationSlot ( CreateReplicationSlotCmd * cmd)
static

Definition at line 831 of file walsender.c.

References Assert, begin_tup_output_tupdesc(), CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDestReceiver(), CreateInitDecodingContext(), CreateTemplateTupleDesc(), CRS_EXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, CStringGetTextDatum, ReplicationSlot::data, DecodingContextFindStartpoint(), DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errmsg(), ERROR, FirstSnapshotSet, FreeDecodingContext(), IsSubTransaction(), IsTransactionBlock(), CreateReplicationSlotCmd::kind, last_reply_timestamp, logical_read_xlog_page(), MAXFNAMELEN, MemSet, MyProc, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, NIL, NULL, parseCreateReplSlotOptions(), CreateReplicationSlotCmd::plugin, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, ReplicationSlotCreate(), ReplicationSlotMarkDirty(), ReplicationSlotPersist(), ReplicationSlotRelease(), ReplicationSlotReserveWal(), ReplicationSlotSave(), RestoreTransactionSnapshot(), RS_EPHEMERAL, RS_PERSISTENT, RS_TEMPORARY, sendTimeLine, sendTimeLineIsHistoric, CreateReplicationSlotCmd::slotname, SnapBuildExportSnapshot(), SnapBuildInitalSnapshot(), LogicalDecodingContext::snapshot_builder, snprintf(), CreateReplicationSlotCmd::temporary, TEXTOID, ThisTimeLineID, TupleDescInitBuiltinEntry(), values, WalSndPrepareWrite(), WalSndWriteData(), XACT_REPEATABLE_READ, and XactIsoLevel.

Referenced by exec_replication_command(), and main().

832 {
833  const char *snapshot_name = NULL;
834  char xpos[MAXFNAMELEN];
835  char *slot_name;
836  bool reserve_wal = false;
837  CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
838  DestReceiver *dest;
839  TupOutputState *tstate;
840  TupleDesc tupdesc;
841  Datum values[4];
842  bool nulls[4];
843 
845 
846  parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action);
847 
848  /* setup state for XLogReadPage */
849  sendTimeLineIsHistoric = false;
851 
852  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
853  {
854  ReplicationSlotCreate(cmd->slotname, false,
856  }
857  else
858  {
860 
861  /*
862  * Initially create persistent slot as ephemeral - that allows us to
863  * nicely handle errors during initialization because it'll get
864  * dropped if this transaction fails. We'll make it persistent at the
865  * end. Temporary slots can be created as temporary from beginning as
866  * they get dropped on error as well.
867  */
868  ReplicationSlotCreate(cmd->slotname, true,
870  }
871 
872  if (cmd->kind == REPLICATION_KIND_LOGICAL)
873  {
875 
876  /*
877  * Do options check early so that we can bail before calling the
878  * DecodingContextFindStartpoint which can take long time.
879  */
880  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
881  {
882  if (IsTransactionBlock())
883  ereport(ERROR,
884  (errmsg("CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT "
885  "must not be called inside a transaction")));
886  }
887  else if (snapshot_action == CRS_USE_SNAPSHOT)
888  {
889  if (!IsTransactionBlock())
890  ereport(ERROR,
891  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
892  "must be called inside a transaction")));
893 
895  ereport(ERROR,
896  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
897  "must be called in REPEATABLE READ isolation mode transaction")));
898 
899  if (FirstSnapshotSet)
900  ereport(ERROR,
901  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
902  "must be called before any query")));
903 
904  if (IsSubTransaction())
905  ereport(ERROR,
906  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
907  "must not be called in a subtransaction")));
908  }
909 
913 
914  /*
915  * Signal that we don't need the timeout mechanism. We're just
916  * creating the replication slot and don't yet accept feedback
917  * messages or send keepalives. As we possibly need to wait for
918  * further WAL the walsender would otherwise possibly be killed too
919  * soon.
920  */
922 
923  /* build initial snapshot, might take a while */
925 
926  /*
927  * Export or use the snapshot if we've been asked to do so.
928  *
929  * NB. We will convert the snapbuild.c kind of snapshot to normal
930  * snapshot when doing this.
931  */
932  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
933  {
934  snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
935  }
936  else if (snapshot_action == CRS_USE_SNAPSHOT)
937  {
938  Snapshot snap;
939 
942  }
943 
944  /* don't need the decoding context anymore */
945  FreeDecodingContext(ctx);
946 
947  if (!cmd->temporary)
949  }
950  else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
951  {
953 
955 
956  /* Write this slot to disk if it's permanent one. */
957  if (!cmd->temporary)
959  }
960 
961  snprintf(xpos, sizeof(xpos), "%X/%X",
964 
966  MemSet(nulls, false, sizeof(nulls));
967 
968  /*----------
969  * Need a tuple descriptor representing four columns:
970  * - first field: the slot name
971  * - second field: LSN at which we became consistent
972  * - third field: exported snapshot's name
973  * - fourth field: output plugin
974  *----------
975  */
976  tupdesc = CreateTemplateTupleDesc(4, false);
977  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
978  TEXTOID, -1, 0);
979  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
980  TEXTOID, -1, 0);
981  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
982  TEXTOID, -1, 0);
983  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
984  TEXTOID, -1, 0);
985 
986  /* prepare for projection of tuples */
987  tstate = begin_tup_output_tupdesc(dest, tupdesc);
988 
989  /* slot_name */
990  slot_name = NameStr(MyReplicationSlot->data.name);
991  values[0] = CStringGetTextDatum(slot_name);
992 
993  /* consistent wal location */
994  values[1] = CStringGetTextDatum(xpos);
995 
996  /* snapshot name, or NULL if none */
997  if (snapshot_name != NULL)
998  values[2] = CStringGetTextDatum(snapshot_name);
999  else
1000  nulls[2] = true;
1001 
1002  /* plugin, or NULL if none */
1003  if (cmd->plugin != NULL)
1004  values[3] = CStringGetTextDatum(cmd->plugin);
1005  else
1006  nulls[3] = true;
1007 
1008  /* send it to dest */
1009  do_tup_output(tstate, values, nulls);
1010  end_tup_output(tstate);
1011 
1013 }
#define NIL
Definition: pg_list.h:69
Snapshot SnapBuildInitalSnapshot(SnapBuild *builder)
Definition: snapbuild.c:509
#define TEXTOID
Definition: pg_type.h:324
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action)
Definition: walsender.c:778
PGPROC * MyProc
Definition: proc.c:67
#define XACT_REPEATABLE_READ
Definition: xact.h:30
void RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
Definition: snapmgr.c:2139
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:1250
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency)
Definition: slot.c:220
#define MemSet(start, val, len)
Definition: c.h:857
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3, 4)
void ReplicationSlotSave(void)
Definition: slot.c:563
ReplicationSlotPersistentData data
Definition: slot.h:115
XLogRecPtr confirmed_flush
Definition: slot.h:76
ReplicationKind kind
Definition: replnodes.h:56
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
Definition: walsender.c:743
bool IsTransactionBlock(void)
Definition: xact.c:4304
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:587
void ReplicationSlotReserveWal(void)
Definition: slot.c:825
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:414
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:1308
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:563
#define ERROR
Definition: elog.h:43
bool FirstSnapshotSet
Definition: snapmgr.c:203
void ReplicationSlotPersist(void)
Definition: slot.c:598
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:109
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1144
unsigned int uint32
Definition: c.h:268
void ReplicationSlotRelease(void)
Definition: slot.c:375
#define ereport(elevel, rest)
Definition: elog.h:122
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc)
Definition: execTuples.c:1232
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1117
#define MAXFNAMELEN
uintptr_t Datum
Definition: postgres.h:372
TimeLineID ThisTimeLineID
Definition: xlog.c:179
struct SnapBuild * snapshot_builder
Definition: logical.h:40
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define NULL
Definition: c.h:229
static TimeLineID sendTimeLine
Definition: walsender.c:141
#define Assert(condition)
Definition: c.h:675
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:457
int XactIsoLevel
Definition: xact.c:74
TupleDesc CreateTemplateTupleDesc(int natts, bool hasoid)
Definition: tupdesc.c:41
bool IsSubTransaction(void)
Definition: xact.c:4376
static Datum values[MAXATTR]
Definition: bootstrap.c:162
int errmsg(const char *fmt,...)
Definition: elog.c:797
CRSSnapshotAction
Definition: walsender.h:22
LogicalDecodingContext * CreateInitDecodingContext(char *plugin, List *output_plugin_options, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write)
Definition: logical.c:211
#define NameStr(name)
Definition: c.h:499
#define CStringGetTextDatum(s)
Definition: builtins.h:91
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:76
int16 AttrNumber
Definition: attnum.h:21
static TimestampTz last_reply_timestamp
Definition: walsender.c:161
static bool sendTimeLineIsHistoric
Definition: walsender.c:143
void ReplicationSlotMarkDirty(void)
Definition: slot.c:581
static void DropReplicationSlot ( DropReplicationSlotCmd * cmd)
static

Definition at line 1019 of file walsender.c.

References DestRemote, EndCommand(), ReplicationSlotDrop(), and DropReplicationSlotCmd::slotname.

Referenced by exec_replication_command(), and main().

1020 {
1022  EndCommand("DROP_REPLICATION_SLOT", DestRemote);
1023 }
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:157
void ReplicationSlotDrop(const char *name)
Definition: slot.c:440
bool exec_replication_command ( const char *  cmd_string)

Definition at line 1363 of file walsender.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate(), CHECK_FOR_INTERRUPTS, CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, DestRemote, DestRemoteSimple, DropReplicationSlot(), elog, EndCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), IdentifySystem(), initStringInfo(), InvalidOid, IsA, IsAbortedTransactionBlockState(), IsTransactionBlock(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextDelete(), MemoryContextSwitchTo(), MyDatabaseId, VariableShowStmt::name, PreventTransactionChain(), REPLICATION_KIND_PHYSICAL, replication_parse_result, replication_scanner_init(), replication_yyparse(), SendBaseBackup(), SendTimeLineHistory(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), T_BaseBackupCmd, T_CreateReplicationSlotCmd, T_DropReplicationSlotCmd, T_IdentifySystemCmd, T_SQLCmd, T_StartReplicationCmd, T_TimeLineHistoryCmd, T_VariableShowStmt, and Node::type.

Referenced by PostgresMain().

1364 {
1365  int parse_rc;
1366  Node *cmd_node;
1367  MemoryContext cmd_context;
1368  MemoryContext old_context;
1369 
1370  /*
1371  * Log replication command if log_replication_commands is enabled. Even
1372  * when it's disabled, log the command with DEBUG1 level for backward
1373  * compatibility.
1374  */
1376  (errmsg("received replication command: %s", cmd_string)));
1377 
1378  /*
1379  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1380  * command arrives. Clean up the old stuff if there's anything.
1381  */
1383 
1385 
1387  "Replication command context",
1389  old_context = MemoryContextSwitchTo(cmd_context);
1390 
1391  replication_scanner_init(cmd_string);
1392  parse_rc = replication_yyparse();
1393  if (parse_rc != 0)
1394  ereport(ERROR,
1395  (errcode(ERRCODE_SYNTAX_ERROR),
1396  (errmsg_internal("replication command parser returned %d",
1397  parse_rc))));
1398 
1399  cmd_node = replication_parse_result;
1400 
1401  /*
1402  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was
1403  * called outside of transaction the snapshot should be cleared here.
1404  */
1405  if (!IsTransactionBlock())
1407 
1408  /*
1409  * For aborted transactions, don't allow anything except pure SQL,
1410  * the exec_simple_query() will handle it correctly.
1411  */
1412  if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
1413  ereport(ERROR,
1414  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1415  errmsg("current transaction is aborted, "
1416  "commands ignored until end of transaction block")));
1417 
1419 
1420  /*
1421  * Allocate buffers that will be used for each outgoing and incoming
1422  * message. We do this just once per command to reduce palloc overhead.
1423  */
1427 
1428  switch (cmd_node->type)
1429  {
1430  case T_IdentifySystemCmd:
1431  IdentifySystem();
1432  break;
1433 
1434  case T_BaseBackupCmd:
1435  PreventTransactionChain(true, "BASE_BACKUP");
1436  SendBaseBackup((BaseBackupCmd *) cmd_node);
1437  break;
1438 
1441  break;
1442 
1445  break;
1446 
1447  case T_StartReplicationCmd:
1448  {
1449  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1450 
1451  PreventTransactionChain(true, "START_REPLICATION");
1452 
1453  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1454  StartReplication(cmd);
1455  else
1457  break;
1458  }
1459 
1460  case T_TimeLineHistoryCmd:
1461  PreventTransactionChain(true, "TIMELINE_HISTORY");
1463  break;
1464 
1465  case T_VariableShowStmt:
1466  {
1468  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1469 
1470  GetPGVariable(n->name, dest);
1471  }
1472  break;
1473 
1474  case T_SQLCmd:
1475  if (MyDatabaseId == InvalidOid)
1476  ereport(ERROR,
1477  (errmsg("not connected to database")));
1478 
1479  /* Tell the caller that this wasn't a WalSender command. */
1480  return false;
1481 
1482  default:
1483  elog(ERROR, "unrecognized replication command node tag: %u",
1484  cmd_node->type);
1485  }
1486 
1487  /* done */
1488  MemoryContextSwitchTo(old_context);
1489  MemoryContextDelete(cmd_context);
1490 
1491  /* Send CommandComplete message */
1492  EndCommand("SELECT", DestRemote);
1493 
1494  return true;
1495 }
#define IsA(nodeptr, _type_)
Definition: nodes.h:570
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:200
#define DEBUG1
Definition: elog.h:25
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:418
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1019
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:370
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Node
Definition: nodes.h:519
static StringInfoData output_message
Definition: walsender.c:153
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LOG
Definition: elog.h:26
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc.c:7870
Node * replication_parse_result
bool IsTransactionBlock(void)
Definition: xact.c:4304
ReplicationKind kind
Definition: replnodes.h:81
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:157
#define ERROR
Definition: elog.h:43
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:686
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:165
NodeTag type
Definition: nodes.h:521
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:109
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:514
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
#define ereport(elevel, rest)
Definition: elog.h:122
int replication_yyparse(void)
void initStringInfo(StringInfo str)
Definition: stringinfo.c:65
static StringInfoData reply_message
Definition: walsender.c:154
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:322
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1030
Oid MyDatabaseId
Definition: globals.c:76
#define InvalidOid
Definition: postgres_ext.h:36
int errmsg_internal(const char *fmt,...)
Definition: elog.c:827
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:831
static StringInfoData tmpbuf
Definition: walsender.c:155
bool log_replication_commands
Definition: walsender.c:117
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
#define elog
Definition: elog.h:219
static void IdentifySystem(void)
Definition: walsender.c:329
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:648
void PreventTransactionChain(bool isTopLevel, const char *stmtType)
Definition: xact.c:3154
void replication_scanner_init(const char *query_string)
static XLogRecPtr GetStandbyFlushRecPtr ( void  )
static

Definition at line 2716 of file walsender.c.

References GetWalRcvWriteRecPtr(), GetXLogReplayRecPtr(), NULL, receiveTLI, and ThisTimeLineID.

Referenced by IdentifySystem(), StartReplication(), and XLogSendPhysical().

2717 {
2718  XLogRecPtr replayPtr;
2719  TimeLineID replayTLI;
2720  XLogRecPtr receivePtr;
2722  XLogRecPtr result;
2723 
2724  /*
2725  * We can safely send what's already been replayed. Also, if walreceiver
2726  * is streaming WAL from the same timeline, we can send anything that it
2727  * has streamed, but hasn't been replayed yet.
2728  */
2729 
2730  receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI);
2731  replayPtr = GetXLogReplayRecPtr(&replayTLI);
2732 
2733  ThisTimeLineID = replayTLI;
2734 
2735  result = replayPtr;
2736  if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
2737  result = receivePtr;
2738 
2739  return result;
2740 }
uint32 TimeLineID
Definition: xlogdefs.h:45
XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11037
static TimeLineID receiveTLI
Definition: xlog.c:201
TimeLineID ThisTimeLineID
Definition: xlog.c:179
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static void IdentifySystem ( void  )
static

Definition at line 329 of file walsender.c.

References am_cascading_walsender, begin_tup_output_tupdesc(), CommitTransactionCommand(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, cur, CurrentMemoryContext, dbname, DestRemoteSimple, do_tup_output(), end_tup_output(), get_database_name(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetSystemIdentifier(), Int32GetDatum, INT4OID, InvalidOid, MAXFNAMELEN, MemoryContextSwitchTo(), MemSet, MyDatabaseId, NULL, RecoveryInProgress(), snprintf(), StartTransactionCommand(), TEXTOID, ThisTimeLineID, TupleDescInitBuiltinEntry(), UINT64_FORMAT, and values.

Referenced by exec_replication_command().

330 {
331  char sysid[32];
332  char xpos[MAXFNAMELEN];
333  XLogRecPtr logptr;
334  char *dbname = NULL;
335  DestReceiver *dest;
336  TupOutputState *tstate;
337  TupleDesc tupdesc;
338  Datum values[4];
339  bool nulls[4];
340 
341  /*
342  * Reply with a result set with one row, four columns. First col is system
343  * ID, second is timeline ID, third is current xlog location and the
344  * fourth contains the database name if we are connected to one.
345  */
346 
347  snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
349 
352  {
353  /* this also updates ThisTimeLineID */
354  logptr = GetStandbyFlushRecPtr();
355  }
356  else
357  logptr = GetFlushRecPtr();
358 
359  snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
360 
361  if (MyDatabaseId != InvalidOid)
362  {
364 
365  /* syscache access needs a transaction env. */
367  /* make dbname live outside TX context */
371  /* CommitTransactionCommand switches to TopMemoryContext */
373  }
374 
376  MemSet(nulls, false, sizeof(nulls));
377 
378  /* need a tuple descriptor representing four columns */
379  tupdesc = CreateTemplateTupleDesc(4, false);
380  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
381  TEXTOID, -1, 0);
382  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
383  INT4OID, -1, 0);
384  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
385  TEXTOID, -1, 0);
386  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
387  TEXTOID, -1, 0);
388 
389  /* prepare for projection of tuples */
390  tstate = begin_tup_output_tupdesc(dest, tupdesc);
391 
392  /* column 1: system identifier */
393  values[0] = CStringGetTextDatum(sysid);
394 
395  /* column 2: timeline */
396  values[1] = Int32GetDatum(ThisTimeLineID);
397 
398  /* column 3: xlog position */
399  values[2] = CStringGetTextDatum(xpos);
400 
401  /* column 4: database name, or NULL if none */
402  if (dbname)
403  values[3] = CStringGetTextDatum(dbname);
404  else
405  nulls[3] = true;
406 
407  /* send it to dest */
408  do_tup_output(tstate, values, nulls);
409 
410  end_tup_output(tstate);
411 }
#define TEXTOID
Definition: pg_type.h:324
void CommitTransactionCommand(void)
Definition: xact.c:2747
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:1250
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define INT4OID
Definition: pg_type.h:316
struct cursor * cur
Definition: ecpg.c:28
#define MemSet(start, val, len)
Definition: c.h:857
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3, 4)
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8204
bool RecoveryInProgress(void)
Definition: xlog.c:7855
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:1308
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:563
char * get_database_name(Oid dbid)
Definition: dbcommands.c:2048
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:109
unsigned int uint32
Definition: c.h:268
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc)
Definition: execTuples.c:1232
#define MAXFNAMELEN
uintptr_t Datum
Definition: postgres.h:372
Oid MyDatabaseId
Definition: globals.c:76
#define InvalidOid
Definition: postgres_ext.h:36
TimeLineID ThisTimeLineID
Definition: xlog.c:179
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void StartTransactionCommand(void)
Definition: xact.c:2677
char * dbname
Definition: streamutil.c:38
TupleDesc CreateTemplateTupleDesc(int natts, bool hasoid)
Definition: tupdesc.c:41
static Datum values[MAXATTR]
Definition: bootstrap.c:162
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4684
#define Int32GetDatum(X)
Definition: postgres.h:485
#define CStringGetTextDatum(s)
Definition: builtins.h:91
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2716
int16 AttrNumber
Definition: attnum.h:21
#define UINT64_FORMAT
Definition: c.h:316
bool am_cascading_walsender
Definition: walsender.c:109
static void InitWalSenderSlot ( void  )
static

Definition at line 2091 of file walsender.c.

References WalSnd::apply, WalSnd::applyLag, Assert, ereport, errcode(), errmsg(), FATAL, WalSnd::flush, WalSnd::flushLag, i, InvalidXLogRecPtr, WalSnd::latch, max_wal_senders, WalSnd::mutex, MyProc, MyProcPid, NULL, on_shmem_exit(), WalSnd::pid, PGPROC::procLatch, WalSnd::sentPtr, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndKill(), WalSndCtlData::walsnds, WALSNDSTATE_STARTUP, WalSnd::write, and WalSnd::writeLag.

Referenced by WalSndShutdown().

2092 {
2093  int i;
2094 
2095  /*
2096  * WalSndCtl should be set up already (we inherit this by fork() or
2097  * EXEC_BACKEND mechanism from the postmaster).
2098  */
2099  Assert(WalSndCtl != NULL);
2100  Assert(MyWalSnd == NULL);
2101 
2102  /*
2103  * Find a free walsender slot and reserve it. If this fails, we must be
2104  * out of WalSnd structures.
2105  */
2106  for (i = 0; i < max_wal_senders; i++)
2107  {
2108  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2109 
2110  SpinLockAcquire(&walsnd->mutex);
2111 
2112  if (walsnd->pid != 0)
2113  {
2114  SpinLockRelease(&walsnd->mutex);
2115  continue;
2116  }
2117  else
2118  {
2119  /*
2120  * Found a free slot. Reserve it for us.
2121  */
2122  walsnd->pid = MyProcPid;
2123  walsnd->sentPtr = InvalidXLogRecPtr;
2124  walsnd->write = InvalidXLogRecPtr;
2125  walsnd->flush = InvalidXLogRecPtr;
2126  walsnd->apply = InvalidXLogRecPtr;
2127  walsnd->writeLag = -1;
2128  walsnd->flushLag = -1;
2129  walsnd->applyLag = -1;
2130  walsnd->state = WALSNDSTATE_STARTUP;
2131  walsnd->latch = &MyProc->procLatch;
2132  SpinLockRelease(&walsnd->mutex);
2133  /* don't need the lock anymore */
2134  MyWalSnd = (WalSnd *) walsnd;
2135 
2136  break;
2137  }
2138  }
2139  if (MyWalSnd == NULL)
2140  ereport(FATAL,
2141  (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
2142  errmsg("number of requested standby connections "
2143  "exceeds max_wal_senders (currently %d)",
2144  max_wal_senders)));
2145 
2146  /* Arrange to clean up at walsender exit */
2148 }
XLogRecPtr write
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
int MyProcPid
Definition: globals.c:38
PGPROC * MyProc
Definition: proc.c:67
TimeOffset flushLag
WalSndCtlData * WalSndCtl
Definition: walsender.c:102
int errcode(int sqlerrcode)
Definition: elog.c:575
static void WalSndKill(int code, Datum arg)
Definition: walsender.c:2152
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
TimeOffset writeLag
slock_t mutex
Latch procLatch
Definition: proc.h:98
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
#define FATAL
Definition: elog.h:52
Latch * latch
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:348
WalSndState state
#define ereport(elevel, rest)
Definition: elog.h:122
int max_wal_senders
Definition: walsender.c:114
#define SpinLockRelease(lock)
Definition: spin.h:64
XLogRecPtr sentPtr
WalSnd * MyWalSnd
Definition: walsender.c:105
TimeOffset applyLag
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
int errmsg(const char *fmt,...)
Definition: elog.c:797
int i
XLogRecPtr apply
static TimeOffset LagTrackerRead ( int  head,
XLogRecPtr  lsn,
TimestampTz  now 
)
static

Definition at line 3241 of file walsender.c.

References Assert, LAG_TRACKER_BUFFER_SIZE, LagTracker, WalTimeSample::lsn, next, and WalTimeSample::time.

Referenced by ProcessStandbyReplyMessage().

3242 {
3243  TimestampTz time = 0;
3244 
3245  /* Read all unread samples up to this LSN or end of buffer. */
3246  while (LagTracker.read_heads[head] != LagTracker.write_head &&
3247  LagTracker.buffer[LagTracker.read_heads[head]].lsn <= lsn)
3248  {
3249  time = LagTracker.buffer[LagTracker.read_heads[head]].time;
3250  LagTracker.last_read[head] =
3251  LagTracker.buffer[LagTracker.read_heads[head]];
3252  LagTracker.read_heads[head] =
3253  (LagTracker.read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE;
3254  }
3255 
3256  if (time > now)
3257  {
3258  /* If the clock somehow went backwards, treat as not found. */
3259  return -1;
3260  }
3261  else if (time == 0)
3262  {
3263  /*
3264  * We didn't cross a time. If there is a future sample that we
3265  * haven't reached yet, and we've already reached at least one sample,
3266  * let's interpolate the local flushed time. This is mainly useful for
3267  * reporting a completely stuck apply position as having increasing
3268  * lag, since otherwise we'd have to wait for it to eventually start
3269  * moving again and cross one of our samples before we can show the
3270  * lag increasing.
3271  */
3272  if (LagTracker.read_heads[head] != LagTracker.write_head &&
3273  LagTracker.last_read[head].time != 0)
3274  {
3275  double fraction;
3276  WalTimeSample prev = LagTracker.last_read[head];
3277  WalTimeSample next = LagTracker.buffer[LagTracker.read_heads[head]];
3278 
3279  Assert(lsn >= prev.lsn);
3280  Assert(prev.lsn < next.lsn);
3281 
3282  if (prev.time > next.time)
3283  {
3284  /* If the clock somehow went backwards, treat as not found. */
3285  return -1;
3286  }
3287 
3288  /* See how far we are between the previous and next samples. */
3289  fraction =
3290  (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
3291 
3292  /* Scale the local flush time proportionally. */
3293  time = (TimestampTz)
3294  ((double) prev.time + (next.time - prev.time) * fraction);
3295  }
3296  else
3297  {
3298  /* Couldn't interpolate due to lack of data. */
3299  return -1;
3300  }
3301  }
3302 
3303  /* Return the elapsed time since local flush time in microseconds. */
3304  Assert(time != 0);
3305  return now - time;
3306 }
static int32 next
Definition: blutils.c:210
int64 TimestampTz
Definition: timestamp.h:39
TimestampTz time
Definition: walsender.c:197
static struct @25 LagTracker
XLogRecPtr lsn
Definition: walsender.c:196
#define Assert(condition)
Definition: c.h:675
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:201
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1533
void LagTrackerWrite ( XLogRecPtr  lsn,
TimestampTz  local_flush_time 
)

Definition at line 3176 of file walsender.c.

References am_walsender, i, LAG_TRACKER_BUFFER_SIZE, LagTracker, and NUM_SYNC_REP_WAIT_MODE.

Referenced by XLogSendPhysical().

3177 {
3178  bool buffer_full;
3179  int new_write_head;
3180  int i;
3181 
3182  if (!am_walsender)
3183  return;
3184 
3185  /*
3186  * If the lsn hasn't advanced since last time, then do nothing. This way
3187  * we only record a new sample when new WAL has been written.
3188  */
3189  if (LagTracker.last_lsn == lsn)
3190  return;
3191  LagTracker.last_lsn = lsn;
3192 
3193  /*
3194  * If advancing the write head of the circular buffer would crash into any
3195  * of the read heads, then the buffer is full. In other words, the
3196  * slowest reader (presumably apply) is the one that controls the release
3197  * of space.
3198  */
3199  new_write_head = (LagTracker.write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
3200  buffer_full = false;
3201  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
3202  {
3203  if (new_write_head == LagTracker.read_heads[i])
3204  buffer_full = true;
3205  }
3206 
3207  /*
3208  * If the buffer is full, for now we just rewind by one slot and overwrite
3209  * the last sample, as a simple (if somewhat uneven) way to lower the
3210  * sampling rate. There may be better adaptive compaction algorithms.
3211  */
3212  if (buffer_full)
3213  {
3214  new_write_head = LagTracker.write_head;
3215  if (LagTracker.write_head > 0)
3216  LagTracker.write_head--;
3217  else
3218  LagTracker.write_head = LAG_TRACKER_BUFFER_SIZE - 1;
3219  }
3220 
3221  /* Store a sample at the current write head position. */
3222  LagTracker.buffer[LagTracker.write_head].lsn = lsn;
3223  LagTracker.buffer[LagTracker.write_head].time = local_flush_time;
3224  LagTracker.write_head = new_write_head;
3225 }
bool am_walsender
Definition: walsender.c:108
static struct @25 LagTracker
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:201
int i
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28
static int logical_read_xlog_page ( XLogReaderState state,
XLogRecPtr  targetPagePtr,
int  reqLen,
XLogRecPtr  targetRecPtr,
char *  cur_page,
TimeLineID pageTLI 
)
static

Definition at line 743 of file walsender.c.

References XLogReaderState::currTLI, XLogReaderState::currTLIValidUntil, XLogReaderState::nextTLI, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, ThisTimeLineID, WalSndWaitForWal(), XLogRead(), and XLogReadDetermineTimeline().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

745 {
746  XLogRecPtr flushptr;
747  int count;
748 
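749  XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
750  sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID);
751  sendTimeLine = state->currTLI;
752  sendTimeLineValidUpto = state->currTLIValidUntil;
753  sendTimeLineNextTLI = state->nextTLI;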
754 
755  /* make sure we have enough WAL available */
756  flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
757 
758  /* more than one block available */
759  if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
760  count = XLOG_BLCKSZ;
761  /* not enough WAL synced, that can happen during shutdown */
762  else if (targetPagePtr + reqLen > flushptr)
763  return -1;
764  /* part of the page available */
765  else
766  count = flushptr - targetPagePtr;
767 
768  /* now actually read the data, we know it's there */
769  XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
770 
771  return count;
772 }
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
Definition: xlogutils.c:799
XLogRecPtr currTLIValidUntil
Definition: xlogreader.h:174
static void XLogRead(char *buf, XLogRecPtr startptr, Size count)
Definition: walsender.c:2180
TimeLineID nextTLI
Definition: xlogreader.h:179
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition: walsender.c:1232
TimeLineID ThisTimeLineID
Definition: xlog.c:179
TimeLineID currTLI
Definition: xlogreader.h:165
static TimeLineID sendTimeLine
Definition: walsender.c:141
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:142
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:144
static bool sendTimeLineIsHistoric
Definition: walsender.c:143
static Interval* offset_to_interval ( TimeOffset  offset)
static

Definition at line 2938 of file walsender.c.

References Interval::day, Interval::month, palloc(), and Interval::time.

Referenced by pg_stat_get_wal_senders().

2939 {
2940  Interval *result = palloc(sizeof(Interval));
2941 
2942  result->month = 0;
2943  result->day = 0;
2944  result->time = offset;
2945 
2946  return result;
2947 }
int32 day
Definition: timestamp.h:47
TimeOffset time
Definition: timestamp.h:45
int32 month
Definition: timestamp.h:48
void * palloc(Size size)
Definition: mcxt.c:849
static void parseCreateReplSlotOptions ( CreateReplicationSlotCmd cmd,
bool reserve_wal,
CRSSnapshotAction snapshot_action 
)
static

Definition at line 778 of file walsender.c.

References CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, defGetBoolean(), DefElem::defname, elog, ereport, errcode(), errmsg(), ERROR, CreateReplicationSlotCmd::kind, lfirst, CreateReplicationSlotCmd::options, REPLICATION_KIND_LOGICAL, and REPLICATION_KIND_PHYSICAL.

Referenced by CreateReplicationSlot().

781 {
782  ListCell *lc;
783  bool snapshot_action_given = false;
784  bool reserve_wal_given = false;
785 
786  /* Parse options */
787  foreach (lc, cmd->options)
788  {
789  DefElem *defel = (DefElem *) lfirst(lc);
790 
791  if (strcmp(defel->defname, "export_snapshot") == 0)
792  {
793  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
794  ereport(ERROR,
795  (errcode(ERRCODE_SYNTAX_ERROR),
796  errmsg("conflicting or redundant options")));
797 
798  snapshot_action_given = true;
799  *snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT :
800  CRS_NOEXPORT_SNAPSHOT;
801  }
802  else if (strcmp(defel->defname, "use_snapshot") == 0)
803  {
804  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
805  ereport(ERROR,
806  (errcode(ERRCODE_SYNTAX_ERROR),
807  errmsg("conflicting or redundant options")));
808 
809  snapshot_action_given = true;
810  *snapshot_action = CRS_USE_SNAPSHOT;
811  }
812  else if (strcmp(defel->defname, "reserve_wal") == 0)
813  {
814  if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
815  ereport(ERROR,
816  (errcode(ERRCODE_SYNTAX_ERROR),
817  errmsg("conflicting or redundant options")));
818 
819  reserve_wal_given = true;
820  *reserve_wal = true;
821  }
822  else
823  elog(ERROR, "unrecognized option: %s", defel->defname);
824  }
825 }
int errcode(int sqlerrcode)
Definition: elog.c:575
ReplicationKind kind
Definition: replnodes.h:56
bool defGetBoolean(DefElem *def)
Definition: define.c:111
#define ERROR
Definition: elog.h:43
#define ereport(elevel, rest)
Definition: elog.h:122
#define lfirst(lc)
Definition: pg_list.h:106
int errmsg(const char *fmt,...)
Definition: elog.c:797
char * defname
Definition: parsenodes.h:708
#define elog
Definition: elog.h:219
Datum pg_stat_get_wal_senders ( PG_FUNCTION_ARGS  )

Definition at line 2954 of file walsender.c.

References ReturnSetInfo::allowedModes, WalSnd::apply, WalSnd::applyLag, CStringGetTextDatum, ReturnSetInfo::econtext, ExprContext::ecxt_per_query_memory, elog, ereport, errcode(), errmsg(), ERROR, WalSnd::flush, WalSnd::flushLag, get_call_result_type(), i, Int32GetDatum, IntervalPGetDatum, IsA, list_member_int(), LSNGetDatum, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_wal_senders, MemoryContextSwitchTo(), MemSet, WalSnd::mutex, NULL, offset_to_interval(), PG_STAT_GET_WAL_SENDERS_COLS, WalSnd::pid, ReturnSetInfo::returnMode, WalSnd::sentPtr, sentPtr, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SFRM_Materialize, SpinLockAcquire, SpinLockRelease, WalSnd::state, superuser(), SYNC_REP_PRIORITY, WalSnd::sync_standby_priority, SyncRepConfigData::syncrep_method, SyncRepConfig, SyncRepGetSyncStandbys(), tuplestore_begin_heap(), tuplestore_donestoring, tuplestore_putvalues(), TYPEFUNC_COMPOSITE, values, WalSndGetStateString(), WalSndCtlData::walsnds, work_mem, write, WalSnd::write, WalSnd::writeLag, and XLogRecPtrIsInvalid.

2955 {
2956 #define PG_STAT_GET_WAL_SENDERS_COLS 11
2957  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
2958  TupleDesc tupdesc;
2959  Tuplestorestate *tupstore;
2960  MemoryContext per_query_ctx;
2961  MemoryContext oldcontext;
2962  List *sync_standbys;
2963  int i;
2964 
2965  /* check to see if caller supports us returning a tuplestore */
2966  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
2967  ereport(ERROR,
2968  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2969  errmsg("set-valued function called in context that cannot accept a set")));
2970  if (!(rsinfo->allowedModes & SFRM_Materialize))
2971  ereport(ERROR,
2972  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2973  errmsg("materialize mode required, but it is not " \
2974  "allowed in this context")));
2975 
2976  /* Build a tuple descriptor for our result type */
2977  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
2978  elog(ERROR, "return type must be a row type");
2979 
2980  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
2981  oldcontext = MemoryContextSwitchTo(per_query_ctx);
2982 
2983  tupstore = tuplestore_begin_heap(true, false, work_mem);
2984  rsinfo->returnMode = SFRM_Materialize;
2985  rsinfo->setResult = tupstore;
2986  rsinfo->setDesc = tupdesc;
2987 
2988  MemoryContextSwitchTo(oldcontext);
2989 
2990  /*
2991  * Get the currently active synchronous standbys.
2992  */
2993  LWLockAcquire(SyncRepLock, LW_SHARED);
2994  sync_standbys = SyncRepGetSyncStandbys(NULL);
2995  LWLockRelease(SyncRepLock);
2996 
2997  for (i = 0; i < max_wal_senders; i++)
2998  {
2999  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3000  XLogRecPtr sentPtr;
3001  XLogRecPtr write;
3002  XLogRecPtr flush;
3003  XLogRecPtr apply;
3004  TimeOffset writeLag;
3005  TimeOffset flushLag;
3006  TimeOffset applyLag;
3007  int priority;
3008  WalSndState state;
3009  Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
3010  bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
3011 
3012  if (walsnd->pid == 0)
3013  continue;
3014 
3015  SpinLockAcquire(&walsnd->mutex);
3016  sentPtr = walsnd->sentPtr;
3017  state = walsnd->state;
3018  write = walsnd->write;
3019  flush = walsnd->flush;
3020  apply = walsnd->apply;
3021  writeLag = walsnd->writeLag;
3022  flushLag = walsnd->flushLag;
3023  applyLag = walsnd->applyLag;
3024  priority = walsnd->sync_standby_priority;
3025  SpinLockRelease(&walsnd->mutex);
3026 
3027  memset(nulls, 0, sizeof(nulls));
3028  values[0] = Int32GetDatum(walsnd->pid);
3029 
3030  if (!superuser())
3031  {
3032  /*
3033  * Only superusers can see details. Other users only get the pid
3034  * value to know it's a walsender, but no details.
3035  */
3036  MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3037  }
3038  else
3039  {
3040  values[1] = CStringGetTextDatum(WalSndGetStateString(state));
3041 
3042  if (XLogRecPtrIsInvalid(sentPtr))
3043  nulls[2] = true;
3044  values[2] = LSNGetDatum(sentPtr);
3045 
3046  if (XLogRecPtrIsInvalid(write))
3047  nulls[3] = true;
3048  values[3] = LSNGetDatum(write);
3049 
3050  if (XLogRecPtrIsInvalid(flush))
3051  nulls[4] = true;
3052  values[4] = LSNGetDatum(flush);
3053 
3054  if (XLogRecPtrIsInvalid(apply))
3055  nulls[5] = true;
3056  values[5] = LSNGetDatum(apply);
3057 
3058  /*
3059  * Treat a standby such as a pg_basebackup background process
3060  * which always returns an invalid flush location, as an
3061  * asynchronous standby.
3062  */
3063  priority = XLogRecPtrIsInvalid(walsnd->flush) ? 0 : priority;
3064 
3065  if (writeLag < 0)
3066  nulls[6] = true;
3067  else
3068  values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
3069 
3070  if (flushLag < 0)
3071  nulls[7] = true;
3072  else
3073  values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
3074 
3075  if (applyLag < 0)
3076  nulls[8] = true;
3077  else
3078  values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
3079 
3080  values[9] = Int32GetDatum(priority);
3081 
3082  /*
3083  * More easily understood version of standby state. This is purely
3084  * informational.
3085  *
3086  * In quorum-based sync replication, the role of each standby
3087  * listed in synchronous_standby_names can be changing very
3088  * frequently. Any standbys considered as "sync" at one moment can
3089  * be switched to "potential" ones at the next moment. So, it's
3090  * basically useless to report "sync" or "potential" as their sync
3091  * states. We report just "quorum" for them.
3092  */
3093  if (priority == 0)
3094  values[10] = CStringGetTextDatum("async");
3095  else if (list_member_int(sync_standbys, i))
3096  values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
3097  CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3098  else
3099  values[10] = CStringGetTextDatum("potential");
3100  }
3101 
3102  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
3103  }
3104 
3105  /* clean up and return the tuplestore */
3106  tuplestore_donestoring(tupstore);
3107 
3108  return (Datum) 0;
3109 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:735
XLogRecPtr write
#define IsA(nodeptr, _type_)
Definition: nodes.h:570
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:211
#define SYNC_REP_PRIORITY
Definition: syncrep.h:36
#define write(a, b, c)
Definition: win32.h:19
uint8 syncrep_method
Definition: syncrep.h:51
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
TimeOffset flushLag
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
WalSndCtlData * WalSndCtl
Definition: walsender.c:102
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
bool superuser(void)
Definition: superuser.c:47
#define MemSet(start, val, len)
Definition: c.h:857
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
List * SyncRepGetSyncStandbys(bool *am_sync)
Definition: syncrep.c:672
TimeOffset writeLag
SyncRepConfigData * SyncRepConfig
Definition: syncrep.c:93
slock_t mutex
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1715
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
#define ERROR
Definition: elog.h:43
#define IntervalPGetDatum(X)
Definition: timestamp.h:33
bool list_member_int(const List *list, int datum)
Definition: list.c:485
WalSndState state
#define ereport(elevel, rest)
Definition: elog.h:122
int max_wal_senders
Definition: walsender.c:114
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:316
int64 TimeOffset
Definition: timestamp.h:40
uintptr_t Datum
Definition: postgres.h:372
static const char * WalSndGetStateString(WalSndState state)
Definition: walsender.c:2921
XLogRecPtr sentPtr
int work_mem
Definition: globals.c:112
static XLogRecPtr sentPtr
Definition: walsender.c:150
int allowedModes
Definition: execnodes.h:201
TimeOffset applyLag
SetFunctionReturnMode returnMode
Definition: execnodes.h:203
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
int sync_standby_priority
Definition: regguts.h:298
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1111
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:135
WalSndState
Tuplestorestate * setResult
Definition: execnodes.h:206
static Datum values[MAXATTR]
Definition: bootstrap.c:162
#define PG_STAT_GET_WAL_SENDERS_COLS
ExprContext * econtext
Definition: execnodes.h:199
#define Int32GetDatum(X)
Definition: postgres.h:485
TupleDesc setDesc
Definition: execnodes.h:207
int errmsg(const char *fmt,...)
Definition: elog.c:797
int i
#define CStringGetTextDatum(s)
Definition: builtins.h:91
#define elog
Definition: elog.h:219
static Interval * offset_to_interval(TimeOffset offset)
Definition: walsender.c:2938
XLogRecPtr apply
Definition: pg_list.h:45
static void PhysicalConfirmReceivedLocation ( XLogRecPtr  lsn)
static

Definition at line 1635 of file walsender.c.

References Assert, ReplicationSlot::data, InvalidXLogRecPtr, ReplicationSlot::mutex, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, SpinLockAcquire, and SpinLockRelease.

Referenced by ProcessStandbyReplyMessage().

1636 {
1637  bool changed = false;
1638  ReplicationSlot *slot = MyReplicationSlot;
1639 
1640  Assert(lsn != InvalidXLogRecPtr);
1641  SpinLockAcquire(&slot->mutex);
1642  if (slot->data.restart_lsn != lsn)
1643  {
1644  changed = true;
1645  slot->data.restart_lsn = lsn;
1646  }
1647  SpinLockRelease(&slot->mutex);
1648 
1649  if (changed)
1650  {
1651  ReplicationSlotMarkDirty();
1652  ReplicationSlotsComputeRequiredLSN();
1653  }
1654 
1655  /*
1656  * One could argue that the slot should be saved to disk now, but that'd
1657  * be energy wasted - the worst lost information can do here is give us
1658  * wrong information in a statistics view - we'll just potentially be more
1659  * conservative in removing files.
1660  */
1661 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
ReplicationSlotPersistentData data
Definition: slot.h:115
#define SpinLockAcquire(lock)
Definition: spin.h:62
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:665
#define SpinLockRelease(lock)
Definition: spin.h:64
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:675
XLogRecPtr restart_lsn
Definition: slot.h:68
slock_t mutex
Definition: slot.h:88
void ReplicationSlotMarkDirty(void)
Definition: slot.c:581
static void PhysicalReplicationSlotNewXmin ( TransactionId  feedbackXmin)
static

Definition at line 1759 of file walsender.c.

References ReplicationSlot::data, ReplicationSlot::effective_xmin, InvalidTransactionId, ReplicationSlot::mutex, MyPgXact, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredXmin(), SpinLockAcquire, SpinLockRelease, TransactionIdIsNormal, TransactionIdPrecedes(), ReplicationSlotPersistentData::xmin, and PGXACT::xmin.

Referenced by ProcessStandbyHSFeedbackMessage().

1760 {
1761  bool changed = false;
1762  ReplicationSlot *slot = MyReplicationSlot;
1763 
1764  SpinLockAcquire(&slot->mutex);
1765  MyPgXact->xmin = InvalidTransactionId;
1766 
1767  /*
1768  * For physical replication we don't need the interlock provided by xmin
1769  * and effective_xmin since the consequences of a missed increase are
1770  * limited to query cancellations, so set both at once.
1771  */
1772  if (!TransactionIdIsNormal(slot->data.xmin) ||
1773  !TransactionIdIsNormal(feedbackXmin) ||
1774  TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
1775  {
1776  changed = true;
1777  slot->data.xmin = feedbackXmin;
1778  slot->effective_xmin = feedbackXmin;
1779  }
1780  SpinLockRelease(&slot->mutex);
1781 
1782  if (changed)
1783  {
1784  ReplicationSlotMarkDirty();
1785  ReplicationSlotsComputeRequiredXmin(false);
1786  }
1787 }
TransactionId xmin
Definition: proc.h:208
ReplicationSlotPersistentData data
Definition: slot.h:115
PGXACT * MyPgXact
Definition: proc.c:68
#define SpinLockAcquire(lock)
Definition: spin.h:62
TransactionId effective_xmin
Definition: slot.h:111
#define InvalidTransactionId
Definition: transam.h:31
TransactionId xmin
Definition: slot.h:57
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
#define SpinLockRelease(lock)
Definition: spin.h:64
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
slock_t mutex
Definition: slot.h:88
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:617
void ReplicationSlotMarkDirty(void)
Definition: slot.c:581
static void ProcessRepliesIfAny ( void  )
static

Definition at line 1502 of file walsender.c.

References COMMERROR, ereport, errcode(), errmsg(), FATAL, GetCurrentTimestamp(), last_reply_timestamp, NULL, pq_endmsgread(), pq_getbyte_if_available(), pq_getmessage(), pq_putmessage_noblock, pq_startmsgread(), proc_exit(), ProcessStandbyMessage(), resetStringInfo(), streamingDoneReceiving, streamingDoneSending, and waiting_for_ping_response.

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

1503 {
1504  unsigned char firstchar;
1505  int r;
1506  bool received = false;
1507 
1508  for (;;)
1509  {
1510  pq_startmsgread();
1511  r = pq_getbyte_if_available(&firstchar);
1512  if (r < 0)
1513  {
1514  /* unexpected error or EOF */
1515  ereport(COMMERROR,
1516  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1517  errmsg("unexpected EOF on standby connection")));
1518  proc_exit(0);
1519  }
1520  if (r == 0)
1521  {
1522  /* no data available without blocking */
1523  pq_endmsgread();
1524  break;
1525  }
1526 
1527  /* Read the message contents */
1528  resetStringInfo(&reply_message);
1529  if (pq_getmessage(&reply_message, 0))
1530  {
1531  ereport(COMMERROR,
1532  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1533  errmsg("unexpected EOF on standby connection")));
1534  proc_exit(0);
1535  }
1536 
1537  /*
1538  * If we already received a CopyDone from the frontend, the frontend
1539  * should not send us anything until we've closed our end of the COPY.
1540  * XXX: In theory, the frontend could already send the next command
1541  * before receiving the CopyDone, but libpq doesn't currently allow
1542  * that.
1543  */
1544  if (streamingDoneReceiving && firstchar != 'X')
1545  ereport(FATAL,
1546  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1547  errmsg("unexpected standby message type \"%c\", after receiving CopyDone",
1548  firstchar)));
1549 
1550  /* Handle the very limited subset of commands expected in this phase */
1551  switch (firstchar)
1552  {
1553  /*
1554  * 'd' means a standby reply wrapped in a CopyData packet.
1555  */
1556  case 'd':
1557  ProcessStandbyMessage();
1558  received = true;
1559  break;
1560 
1561  /*
1562  * CopyDone means the standby requested to finish streaming.
1563  * Reply with CopyDone, if we had not sent that already.
1564  */
1565  case 'c':
1566  if (!streamingDoneSending)
1567  {
1568  pq_putmessage_noblock('c', NULL, 0);
1569  streamingDoneSending = true;
1570  }
1571 
1572  streamingDoneReceiving = true;
1573  received = true;
1574  break;
1575 
1576  /*
1577  * 'X' means that the standby is closing down the socket.
1578  */
1579  case 'X':
1580  proc_exit(0);
1581 
1582  default:
1583  ereport(FATAL,
1584  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1585  errmsg("invalid standby message type \"%c\"",
1586  firstchar)));
1587  }
1588  }
1589 
1590  /*
1591  * Save the last reply timestamp if we've received at least one reply.
1592  */
1593  if (received)
1594  {
1595  last_reply_timestamp = GetCurrentTimestamp();
1596  waiting_for_ping_response = false;
1597  }
1598 }
static void ProcessStandbyMessage(void)
Definition: walsender.c:1604
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1569
void proc_exit(int code)
Definition: ipc.c:99
int errcode(int sqlerrcode)
Definition: elog.c:575
void pq_startmsgread(void)
Definition: pqcomm.c:1191
#define FATAL
Definition: elog.h:52
int pq_getbyte_if_available(unsigned char *c)
Definition: pqcomm.c:1021
static bool streamingDoneSending
Definition: walsender.c:172
#define COMMERROR
Definition: elog.h:30
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:94
#define ereport(elevel, rest)
Definition: elog.h:122
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1253
static StringInfoData reply_message
Definition: walsender.c:154
void pq_endmsgread(void)
Definition: pqcomm.c:1215
#define NULL
Definition: c.h:229
static bool streamingDoneReceiving
Definition: walsender.c:173
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
static bool waiting_for_ping_response
Definition: walsender.c:164
static TimestampTz last_reply_timestamp
Definition: walsender.c:161
static void ProcessStandbyHSFeedbackMessage ( void  )
static

Definition at line 1793 of file walsender.c.

References DEBUG2, elog, GetNextXidAndEpoch(), InvalidTransactionId, MyPgXact, MyReplicationSlot, NULL, PhysicalReplicationSlotNewXmin(), pq_getmsgint(), pq_getmsgint64(), TransactionIdIsNormal, TransactionIdPrecedesOrEquals(), and PGXACT::xmin.

Referenced by ProcessStandbyMessage().

1794 {
1795  TransactionId nextXid;
1796  uint32 nextEpoch;
1797  TransactionId feedbackXmin;
1798  uint32 feedbackEpoch;
1799 
1800  /*
1801  * Decipher the reply message. The caller already consumed the msgtype
1802  * byte.
1803  */
1804  (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
1805  feedbackXmin = pq_getmsgint(&reply_message, 4);
1806  feedbackEpoch = pq_getmsgint(&reply_message, 4);
1807 
1808  elog(DEBUG2, "hot standby feedback xmin %u epoch %u",
1809  feedbackXmin,
1810  feedbackEpoch);
1811 
1812  /* Unset WalSender's xmin if the feedback message value is invalid */
1813  if (!TransactionIdIsNormal(feedbackXmin))
1814  {
1815  MyPgXact->xmin = InvalidTransactionId;
1816  if (MyReplicationSlot != NULL)
1817  PhysicalReplicationSlotNewXmin(feedbackXmin);
1818  return;
1819  }
1820 
1821  /*
1822  * Check that the provided xmin/epoch are sane, that is, not in the future
1823  * and not so far back as to be already wrapped around. Ignore if not.
1824  *
1825  * Epoch of nextXid should be same as standby, or if the counter has
1826  * wrapped, then one greater than standby.
1827  */
1828  GetNextXidAndEpoch(&nextXid, &nextEpoch);
1829 
1830  if (feedbackXmin <= nextXid)
1831  {
1832  if (feedbackEpoch != nextEpoch)
1833  return;
1834  }
1835  else
1836  {
1837  if (feedbackEpoch + 1 != nextEpoch)
1838  return;
1839  }
1840 
1841  if (!TransactionIdPrecedesOrEquals(feedbackXmin, nextXid))
1842  return; /* epoch OK, but it's wrapped around */
1843 
1844  /*
1845  * Set the WalSender's xmin equal to the standby's requested xmin, so that
1846  * the xmin will be taken into account by GetOldestXmin. This will hold
1847  * back the removal of dead rows and thereby prevent the generation of
1848  * cleanup conflicts on the standby server.
1849  *
1850  * There is a small window for a race condition here: although we just
1851  * checked that feedbackXmin precedes nextXid, the nextXid could have
1852  * gotten advanced between our fetching it and applying the xmin below,
1853  * perhaps far enough to make feedbackXmin wrap around. In that case the
1854  * xmin we set here would be "in the future" and have no effect. No point
1855  * in worrying about this since it's too late to save the desired data
1856  * anyway. Assuming that the standby sends us an increasing sequence of
1857  * xmins, this could only happen during the first reply cycle, else our
1858  * own xmin would prevent nextXid from advancing so far.
1859  *
1860  * We don't bother taking the ProcArrayLock here. Setting the xmin field
1861  * is assumed atomic, and there's no real need to prevent a concurrent
1862  * GetOldestXmin. (If we're moving our xmin forward, this is obviously
1863  * safe, and if we're moving it backwards, well, the data is at risk
1864  * already since a VACUUM could have just finished calling GetOldestXmin.)
1865  *
1866  * If we're using a replication slot we reserve the xmin via that,
1867  * otherwise via the walsender's PGXACT entry.
1868  *
1869  * XXX: It might make sense to generalize the ephemeral slot concept and
1870  * always use the slot mechanism to handle the feedback xmin.
1871  */
1872  if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
1873  PhysicalReplicationSlotNewXmin(feedbackXmin);
1874  else
1875  MyPgXact->xmin = feedbackXmin;
1876 }
uint32 TransactionId
Definition: c.h:397
TransactionId xmin
Definition: proc.h:208
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
Definition: walsender.c:1759
PGXACT * MyPgXact
Definition: proc.c:68
void GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch)
Definition: xlog.c:8273
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:319
#define DEBUG2
Definition: elog.h:24
#define InvalidTransactionId
Definition: transam.h:31
unsigned int uint32
Definition: c.h:268
static StringInfoData reply_message
Definition: walsender.c:154
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define NULL
Definition: c.h:229
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:486
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:448
#define elog
Definition: elog.h:219
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
static void ProcessStandbyMessage ( void  )
static

Definition at line 1604 of file walsender.c.

References COMMERROR, ereport, errcode(), errmsg(), pq_getmsgbyte(), proc_exit(), ProcessStandbyHSFeedbackMessage(), and ProcessStandbyReplyMessage().

Referenced by ProcessRepliesIfAny().

1605 {
1606  char msgtype;
1607 
1608  /*
1609  * Check message type from the first byte.
1610  */
1611  msgtype = pq_getmsgbyte(&reply_message);
1612 
1613  switch (msgtype)
1614  {
1615  case 'r':
1617  break;
1618 
1619  case 'h':
1621  break;
1622 
1623  default:
1625  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1626  errmsg("unexpected message type \"%c\"", msgtype)));
1627  proc_exit(0);
1628  }
1629 }
void proc_exit(int code)
Definition: ipc.c:99
int errcode(int sqlerrcode)
Definition: elog.c:575
#define COMMERROR
Definition: elog.h:30
#define ereport(elevel, rest)
Definition: elog.h:122
static StringInfoData reply_message
Definition: walsender.c:154
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:432
static void ProcessStandbyReplyMessage(void)
Definition: walsender.c:1667
int errmsg(const char *fmt,...)
Definition: elog.c:797
static void ProcessStandbyHSFeedbackMessage(void)
Definition: walsender.c:1793
static void ProcessStandbyReplyMessage ( void  )
static

Definition at line 1667 of file walsender.c.

References am_cascading_walsender, WalSnd::apply, WalSnd::applyLag, DEBUG2, elog, WalSnd::flush, WalSnd::flushLag, GetCurrentTimestamp(), InvalidXLogRecPtr, LagTrackerRead(), LogicalConfirmReceivedLocation(), WalSnd::mutex, MyReplicationSlot, MyWalSnd, now(), PhysicalConfirmReceivedLocation(), pq_getmsgbyte(), pq_getmsgint64(), sentPtr, SlotIsLogical, SpinLockAcquire, SpinLockRelease, SYNC_REP_WAIT_APPLY, SYNC_REP_WAIT_FLUSH, SYNC_REP_WAIT_WRITE, SyncRepReleaseWaiters(), WalSndKeepalive(), WalSnd::write, and WalSnd::writeLag.

Referenced by ProcessStandbyMessage().

1668 {
1669  XLogRecPtr writePtr,
1670  flushPtr,
1671  applyPtr;
1672  bool replyRequested;
1673  TimeOffset writeLag,
1674  flushLag,
1675  applyLag;
1676  bool clearLagTimes;
1677  TimestampTz now;
1678 
1679  static bool fullyAppliedLastTime = false;
1680 
1681  /* the caller already consumed the msgtype byte */
1682  writePtr = pq_getmsgint64(&reply_message);
1683  flushPtr = pq_getmsgint64(&reply_message);
1684  applyPtr = pq_getmsgint64(&reply_message);
1685  (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
1686  replyRequested = pq_getmsgbyte(&reply_message);
1687 
1688  elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s",
1689  (uint32) (writePtr >> 32), (uint32) writePtr,
1690  (uint32) (flushPtr >> 32), (uint32) flushPtr,
1691  (uint32) (applyPtr >> 32), (uint32) applyPtr,
1692  replyRequested ? " (reply requested)" : "");
1693 
1694  /* See if we can compute the round-trip lag for these positions. */
1695  now = GetCurrentTimestamp();
1696  writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
1697  flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
1698  applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
1699 
1700  /*
1701  * If the standby reports that it has fully replayed the WAL in two
1702  * consecutive reply messages, then the second such message must result
1703  * from wal_receiver_status_interval expiring on the standby. This is a
1704  * convenient time to forget the lag times measured when it last
1705  * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
1706  * until more WAL traffic arrives.
1707  */
1708  clearLagTimes = false;
1709  if (applyPtr == sentPtr)
1710  {
1711  if (fullyAppliedLastTime)
1712  clearLagTimes = true;
1713  fullyAppliedLastTime = true;
1714  }
1715  else
1716  fullyAppliedLastTime = false;
1717 
1718  /* Send a reply if the standby requested one. */
1719  if (replyRequested)
1720  WalSndKeepalive(false);
1721 
1722  /*
1723  * Update shared state for this WalSender process based on reply data from
1724  * standby.
1725  */
1726  {
1727  WalSnd *walsnd = MyWalSnd;
1728 
1729  SpinLockAcquire(&walsnd->mutex);
1730  walsnd->write = writePtr;
1731  walsnd->flush = flushPtr;
1732  walsnd->apply = applyPtr;
1733  if (writeLag != -1 || clearLagTimes)
1734  walsnd->writeLag = writeLag;
1735  if (flushLag != -1 || clearLagTimes)
1736  walsnd->flushLag = flushLag;
1737  if (applyLag != -1 || clearLagTimes)
1738  walsnd->applyLag = applyLag;
1739  SpinLockRelease(&walsnd->mutex);
1740  }
1741 
1744 
1745  /*
1746  * Advance our local xmin horizon when the client confirmed a flush.
1747  */
1748  if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
1749  {
1752  else
1754  }
1755 }
XLogRecPtr write
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1569
int64 TimestampTz
Definition: timestamp.h:39
TimeOffset flushLag
TimeOffset writeLag
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3117
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: walsender.c:1635
#define SYNC_REP_WAIT_APPLY
Definition: syncrep.h:26
#define DEBUG2
Definition: elog.h:24
unsigned int uint32
Definition: c.h:268
#define SlotIsLogical(slot)
Definition: slot.h:134
#define SYNC_REP_WAIT_FLUSH
Definition: syncrep.h:25
static StringInfoData reply_message
Definition: walsender.c:154
#define SpinLockRelease(lock)
Definition: spin.h:64
int64 TimeOffset
Definition: timestamp.h:40
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition: walsender.c:3241
WalSnd * MyWalSnd
Definition: walsender.c:105
static XLogRecPtr sentPtr
Definition: walsender.c:150
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:432
TimeOffset applyLag
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:884
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:486
#define elog
Definition: elog.h:219
XLogRecPtr apply
void SyncRepReleaseWaiters(void)
Definition: syncrep.c:405
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1533
bool am_cascading_walsender
Definition: walsender.c:109
#define SYNC_REP_WAIT_WRITE
Definition: syncrep.h:24
static void SendTimeLineHistory ( TimeLineHistoryCmd cmd)
static

Definition at line 418 of file walsender.c.

References buf, BYTEAOID, CloseTransientFile(), ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), MAXFNAMELEN, MAXPGPATH, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), pq_beginmessage(), pq_endmessage(), pq_sendbytes(), pq_sendint(), pq_sendstring(), read, TEXTOID, TimeLineHistoryCmd::timeline, TLHistoryFileName, TLHistoryFilePath, and WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ.

Referenced by exec_replication_command().

419 {
421  char histfname[MAXFNAMELEN];
422  char path[MAXPGPATH];
423  int fd;
424  off_t histfilelen;
425  off_t bytesleft;
426  Size len;
427 
428  /*
429  * Reply with a result set with one row, and two columns. The first col is
430  * the name of the history file, 2nd is the contents.
431  */
432 
433  TLHistoryFileName(histfname, cmd->timeline);
434  TLHistoryFilePath(path, cmd->timeline);
435 
436  /* Send a RowDescription message */
437  pq_beginmessage(&buf, 'T');
438  pq_sendint(&buf, 2, 2); /* 2 fields */
439 
440  /* first field */
441  pq_sendstring(&buf, "filename"); /* col name */
442  pq_sendint(&buf, 0, 4); /* table oid */
443  pq_sendint(&buf, 0, 2); /* attnum */
444  pq_sendint(&buf, TEXTOID, 4); /* type oid */
445  pq_sendint(&buf, -1, 2); /* typlen */
446  pq_sendint(&buf, 0, 4); /* typmod */
447  pq_sendint(&buf, 0, 2); /* format code */
448 
449  /* second field */
450  pq_sendstring(&buf, "content"); /* col name */
451  pq_sendint(&buf, 0, 4); /* table oid */
452  pq_sendint(&buf, 0, 2); /* attnum */
453  pq_sendint(&buf, BYTEAOID, 4); /* type oid */
454  pq_sendint(&buf, -1, 2); /* typlen */
455  pq_sendint(&buf, 0, 4); /* typmod */
456  pq_sendint(&buf, 0, 2); /* format code */
457  pq_endmessage(&buf);
458 
459  /* Send a DataRow message */
460  pq_beginmessage(&buf, 'D');
461  pq_sendint(&buf, 2, 2); /* # of columns */
462  len = strlen(histfname);
463  pq_sendint(&buf, len, 4); /* col1 len */
464  pq_sendbytes(&buf, histfname, len);
465 
466  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0666);
467  if (fd < 0)
468  ereport(ERROR,
470  errmsg("could not open file \"%s\": %m", path)));
471 
472  /* Determine file length and send it to client */
473  histfilelen = lseek(fd, 0, SEEK_END);
474  if (histfilelen < 0)
475  ereport(ERROR,
477  errmsg("could not seek to end of file \"%s\": %m", path)));
478  if (lseek(fd, 0, SEEK_SET) != 0)
479  ereport(ERROR,
481  errmsg("could not seek to beginning of file \"%s\": %m", path)));
482 
483  pq_sendint(&buf, histfilelen, 4); /* col2 len */
484 
485  bytesleft = histfilelen;
486  while (bytesleft > 0)
487  {
488  char rbuf[BLCKSZ];
489  int nread;
490 
492  nread = read(fd, rbuf, sizeof(rbuf));
494  if (nread <= 0)
495  ereport(ERROR,
497  errmsg("could not read file \"%s\": %m",
498  path)));
499  pq_sendbytes(&buf, rbuf, nread);
500  bytesleft -= nread;
501  }
502  CloseTransientFile(fd);
503 
504  pq_endmessage(&buf);
505 }
#define TEXTOID
Definition: pg_type.h:324
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:186
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1038
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:88
TimeLineID timeline
Definition: replnodes.h:96
#define ERROR
Definition: elog.h:43
#define MAXPGPATH
#define TLHistoryFileName(fname, tli)
static char * buf
Definition: pg_test_fsync.c:65
int OpenTransientFile(FileName fileName, int fileFlags, int fileMode)
Definition: fd.c:2107
int errcode_for_file_access(void)
Definition: elog.c:598
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1205
#define ereport(elevel, rest)
Definition: elog.h:122
int CloseTransientFile(int fd)
Definition: fd.c:2268
#define MAXFNAMELEN
size_t Size
Definition: c.h:356
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1181
#define BYTEAOID
Definition: pg_type.h:292
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:115
int errmsg(const char *fmt,...)
Definition: elog.c:797
void pq_sendint(StringInfo buf, int i, int b)
Definition: pqformat.c:236
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:344
#define read(a, b, c)
Definition: win32.h:18
#define TLHistoryFilePath(path, tli)
static void StartLogicalReplication ( StartReplicationCmd cmd)
static

Definition at line 1030 of file walsender.c.

References am_cascading_walsender, Assert, buf, CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDecodingContext(), ReplicationSlot::data, DestRemote, EndCommand(), ereport, errmsg(), FreeDecodingContext(), LOG, logical_read_xlog_page(), logical_startptr, WalSnd::mutex, MyReplicationSlot, MyWalSnd, StartReplicationCmd::options, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint(), proc_exit(), RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), ReplicationSlotPersistentData::restart_lsn, WalSnd::sentPtr, sentPtr, StartReplicationCmd::slotname, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, SyncRepInitConfig(), walsender_ready_to_stop, WalSndLoop(), WalSndPrepareWrite(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WalSndWriteData(), and XLogSendLogical().

Referenced by exec_replication_command().

1031 {
1033 
1034  /* make sure that our requirements are still fulfilled */
1036 
1038 
1040 
1041  /*
1042  * Force a disconnect, so that the decoding code doesn't need to care
1043  * about an eventual switch from running in recovery, to running in a
1044  * normal environment. Client code is expected to handle reconnects.
1045  */
1047  {
1048  ereport(LOG,
1049  (errmsg("terminating walsender process after promotion")));
1050  walsender_ready_to_stop = true;
1051  }
1052 
1054 
1055  /* Send a CopyBothResponse message, and start streaming */
1056  pq_beginmessage(&buf, 'W');
1057  pq_sendbyte(&buf, 0);
1058  pq_sendint(&buf, 0, 2);
1059  pq_endmessage(&buf);
1060  pq_flush();
1061 
1062  /*
1063  * Initialize position to the last ack'ed one, then the xlog records begin
1064  * to be shipped from that position.
1065  */
1067  cmd->startpoint, cmd->options,
1070 
1071  /* Start reading WAL from the oldest required WAL. */
1073 
1074  /*
1075  * Report the location after which we'll send out further commits as the
1076  * current sentPtr.
1077  */
1079 
1080  /* Also update the sent position status in shared memory */
1081  {
1082  WalSnd *walsnd = MyWalSnd;
1083 
1084  SpinLockAcquire(&walsnd->mutex);
1086  SpinLockRelease(&walsnd->mutex);
1087  }
1088 
1089  replication_active = true;
1090 
1092 
1093  /* Main loop of walsender */
1095 
1098 
1099  replication_active = false;
1101  proc_exit(0);
1103 
1104  /* Get out of COPY mode (CommandComplete). */
1105  EndCommand("COPY 0", DestRemote);
1106 }
XLogRecPtr startpoint
Definition: replnodes.h:84
#define pq_flush()
Definition: libpq.h:39
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
void proc_exit(int code)
Definition: ipc.c:99
ReplicationSlotPersistentData data
Definition: slot.h:115
#define LOG
Definition: elog.h:26
bool RecoveryInProgress(void)
Definition: xlog.c:7855
slock_t mutex
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:88
XLogRecPtr confirmed_flush
Definition: slot.h:76
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
Definition: walsender.c:743
#define SpinLockAcquire(lock)
Definition: spin.h:62
static volatile sig_atomic_t walsender_ready_to_stop
Definition: walsender.c:180
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:157
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:190
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write)
Definition: logical.c:327
static char * buf
Definition: pg_test_fsync.c:65
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1144
static XLogRecPtr logical_startptr
Definition: walsender.c:191
void ReplicationSlotRelease(void)
Definition: slot.c:375
void SyncRepInitConfig(void)
Definition: syncrep.c:377
#define ereport(elevel, rest)
Definition: elog.h:122
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:1955
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1117
#define SpinLockRelease(lock)
Definition: spin.h:64
static volatile sig_atomic_t replication_active
Definition: walsender.c:188
XLogRecPtr sentPtr
WalSnd * MyWalSnd
Definition: walsender.c:105
static XLogRecPtr sentPtr
Definition: walsender.c:150
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:675
void WalSndSetState(WalSndState state)
Definition: walsender.c:2902
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:457
static void XLogSendLogical(void)
Definition: walsender.c:2614
XLogRecPtr restart_lsn
Definition: slot.h:68
void ReplicationSlotAcquire(const char *name)
Definition: slot.c:326
int errmsg(const char *fmt,...)
Definition: elog.c:797
void pq_sendint(StringInfo buf, int i, int b)
Definition: pqformat.c:236
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:344
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:76
bool am_cascading_walsender
Definition: walsender.c:109
static void StartReplication ( StartReplicationCmd cmd)
static

Definition at line 514 of file walsender.c.

References am_cascading_walsender, Assert, begin_tup_output_tupdesc(), buf, CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errcode(), errdetail(), errmsg(), ERROR, GetFlushRecPtr(), GetStandbyFlushRecPtr(), Int64GetDatum(), INT8OID, InvalidXLogRecPtr, list_free_deep(), MemSet, WalSnd::mutex, MyReplicationSlot, MyWalSnd, pq_beginmessage(), pq_endmessage(), pq_flush, pq_puttextmessage(), pq_sendbyte(), pq_sendint(), proc_exit(), readTimeLineHistory(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, WalSnd::sentPtr, sentPtr, SlotIsLogical, StartReplicationCmd::slotname, snprintf(), SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), TEXTOID, ThisTimeLineID, StartReplicationCmd::timeline, tliSwitchPoint(), TupleDescInitBuiltinEntry(), values, walsender_ready_to_stop, WalSndLoop(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, XLogRecPtrIsInvalid, and XLogSendPhysical().

Referenced by exec_replication_command().

515 {
517  XLogRecPtr FlushPtr;
518 
519  if (ThisTimeLineID == 0)
520  ereport(ERROR,
521  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
522  errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
523 
524  /*
525  * We assume here that we're logging enough information in the WAL for
526  * log-shipping, since this is checked in PostmasterMain().
527  *
528  * NOTE: wal_level can only change at shutdown, so in most cases it is
529  * difficult for there to be WAL data that we can still see that was
530  * written at wal_level='minimal'.
531  */
532 
533  if (cmd->slotname)
534  {
537  ereport(ERROR,
538  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
539  (errmsg("cannot use a logical replication slot for physical replication"))));
540  }
541 
542  /*
543  * Select the timeline. If it was given explicitly by the client, use
544  * that. Otherwise use the timeline of the last replayed record, which is
545  * kept in ThisTimeLineID.
546  */
548  {
549  /* this also updates ThisTimeLineID */
550  FlushPtr = GetStandbyFlushRecPtr();
551  }
552  else
553  FlushPtr = GetFlushRecPtr();
554 
555  if (cmd->timeline != 0)
556  {
557  XLogRecPtr switchpoint;
558 
559  sendTimeLine = cmd->timeline;
561  {
562  sendTimeLineIsHistoric = false;
564  }
565  else
566  {
567  List *timeLineHistory;
568 
569  sendTimeLineIsHistoric = true;
570 
571  /*
572  * Check that the timeline the client requested exists, and
573  * the requested start location is on that timeline.
574  */
575  timeLineHistory = readTimeLineHistory(ThisTimeLineID);
576  switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
578  list_free_deep(timeLineHistory);
579 
580  /*
581  * Found the requested timeline in the history. Check that
582  * requested startpoint is on that timeline in our history.
583  *
584  * This is quite loose on purpose. We only check that we didn't
585  * fork off the requested timeline before the switchpoint. We
586  * don't check that we switched *to* it before the requested
587  * starting point. This is because the client can legitimately
588  * request to start replication from the beginning of the WAL
589  * segment that contains switchpoint, but on the new timeline, so
590  * that it doesn't end up with a partial segment. If you ask for a
591  * starting point that is too old, you'll get an error later when we
592  * fail to find the requested WAL segment in pg_wal.
593  *
594  * XXX: we could be more strict here and only allow a startpoint
595  * that's older than the switchpoint, if it's still in the same
596  * WAL segment.
597  */
598  if (!XLogRecPtrIsInvalid(switchpoint) &&
599  switchpoint < cmd->startpoint)
600  {
601  ereport(ERROR,
602  (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
603  (uint32) (cmd->startpoint >> 32),
604  (uint32) (cmd->startpoint),
605  cmd->timeline),
606  errdetail("This server's history forked from timeline %u at %X/%X.",
607  cmd->timeline,
608  (uint32) (switchpoint >> 32),
609  (uint32) (switchpoint))));
610  }
611  sendTimeLineValidUpto = switchpoint;
612  }
613  }
614  else
615  {
618  sendTimeLineIsHistoric = false;
619  }
620 
622 
623  /* If there is nothing to stream, don't even enter COPY mode */
625  {
626  /*
627  * When we first start replication the standby will be behind the
628  * primary. For some applications, for example, synchronous
629  * replication, it is important to have a clear state for this initial
630  * catchup mode, so we can trigger actions when we change streaming
631  * state later. We may stay in this state for a long time, which is
632  * exactly why we want to be able to monitor whether or not we are
633  * still here.
634  */
636 
637  /* Send a CopyBothResponse message, and start streaming */
638  pq_beginmessage(&buf, 'W');
639  pq_sendbyte(&buf, 0);
640  pq_sendint(&buf, 0, 2);
641  pq_endmessage(&buf);
642  pq_flush();
643 
644  /*
645  * Don't allow a request to stream from a future point in WAL that
646  * hasn't been flushed to disk in this server yet.
647  */
648  if (FlushPtr < cmd->startpoint)
649  {
650  ereport(ERROR,
651  (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
652  (uint32) (cmd->startpoint >> 32),
653  (uint32) (cmd->startpoint),
654  (uint32) (FlushPtr >> 32),
655  (uint32) (FlushPtr))));
656  }
657 
658  /* Start streaming from the requested point */
659  sentPtr = cmd->startpoint;
660 
661  /* Initialize shared memory status, too */
662  {
663  WalSnd *walsnd = MyWalSnd;
664 
665  SpinLockAcquire(&walsnd->mutex);
666  walsnd->sentPtr = sentPtr;
667  SpinLockRelease(&walsnd->mutex);
668  }
669 
671 
672  /* Main loop of walsender */
673  replication_active = true;
674 
676 
677  replication_active = false;
679  proc_exit(0);
681 
683  }
684 
685  if (cmd->slotname)
687 
688  /*
689  * Copy is finished now. Send a single-row result set indicating the next
690  * timeline.
691  */
693  {
694  char startpos_str[8 + 1 + 8 + 1];
695  DestReceiver *dest;
696  TupOutputState *tstate;
697  TupleDesc tupdesc;
698  Datum values[2];
699  bool nulls[2];
700 
701  snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
702  (uint32) (sendTimeLineValidUpto >> 32),
704 
706  MemSet(nulls, false, sizeof(nulls));
707 
708  /*
709  * Need a tuple descriptor representing two columns.
710  * int8 may seem like a surprising data type for this, but in theory
711  * int4 would not be wide enough for this, as TimeLineID is unsigned.
712  */
713  tupdesc = CreateTemplateTupleDesc(2, false);
714  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
715  INT8OID, -1, 0);
716  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
717  TEXTOID, -1, 0);
718 
719  /* prepare for projection of tuple */
720  tstate = begin_tup_output_tupdesc(dest, tupdesc);
721 
722  values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
723  values[1] = CStringGetTextDatum(startpos_str);
724 
725  /* send it to dest */
726  do_tup_output(tstate, values, nulls);
727 
728  end_tup_output(tstate);
729  }
730 
731  /* Send CommandComplete message */
732  pq_puttextmessage('C', "START_STREAMING");
733 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr startpoint
Definition: replnodes.h:84
#define pq_flush()
Definition: libpq.h:39
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
#define TEXTOID
Definition: pg_type.h:324
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:75
static void XLogSendPhysical(void)
Definition: walsender.c:2355
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:1250
void proc_exit(int code)
Definition: ipc.c:99
int errcode(int sqlerrcode)
Definition: elog.c:575
#define MemSet(start, val, len)
Definition: c.h:857
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8204
void list_free_deep(List *list)
Definition: list.c:1147
slock_t mutex
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:88
#define SpinLockAcquire(lock)
Definition: spin.h:62
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:1308
static volatile sig_atomic_t walsender_ready_to_stop
Definition: walsender.c:180
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:563
#define ERROR
Definition: elog.h:43
static char * buf
Definition: pg_test_fsync.c:65
static bool streamingDoneSending
Definition: walsender.c:172
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:109
int errdetail(const char *fmt,...)
Definition: elog.c:873
unsigned int uint32
Definition: c.h:268
void ReplicationSlotRelease(void)
Definition: slot.c:375
#define SlotIsLogical(slot)
Definition: slot.h:134
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:2152
void SyncRepInitConfig(void)
Definition: syncrep.c:377
#define ereport(elevel, rest)
Definition: elog.h:122
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc)
Definition: execTuples.c:1232
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:1955
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
static volatile sig_atomic_t replication_active
Definition: walsender.c:188
uintptr_t Datum
Definition: postgres.h:372
XLogRecPtr sentPtr
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:563
WalSnd * MyWalSnd
Definition: walsender.c:105
static XLogRecPtr sentPtr
Definition: walsender.c:150
TimeLineID ThisTimeLineID
Definition: xlog.c:179
#define INT8OID
Definition: pg_type.h:304
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
static TimeLineID sendTimeLine
Definition: walsender.c:141
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:675
void WalSndSetState(WalSndState state)
Definition: walsender.c:2902
void ReplicationSlotAcquire(const char *name)
Definition: slot.c:326
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:142
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:144
TupleDesc CreateTemplateTupleDesc(int natts, bool hasoid)
Definition: tupdesc.c:41
static bool streamingDoneReceiving
Definition: walsender.c:173
static Datum values[MAXATTR]
Definition: bootstrap.c:162
int errmsg(const char *fmt,...)
Definition: elog.c:797
void pq_sendint(StringInfo buf, int i, int b)
Definition: pqformat.c:236
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:344
TimeLineID timeline
Definition: replnodes.h:83
#define CStringGetTextDatum(s)
Definition: builtins.h:91
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2716
void pq_puttextmessage(char msgtype, const char *str)
Definition: pqformat.c:400
Definition: pg_list.h:45
int16 AttrNumber
Definition: attnum.h:21
static bool sendTimeLineIsHistoric
Definition: walsender.c:143
bool am_cascading_walsender
Definition: walsender.c:109
static void WalSndCheckTimeOut ( TimestampTz  now)
static

Definition at line 1928 of file walsender.c.

References COMMERROR, ereport, errmsg(), last_reply_timestamp, TimestampTzPlusMilliseconds, wal_sender_timeout, and WalSndShutdown().

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

1929 {
1930  TimestampTz timeout;
1931 
1932  /* don't bail out if we're doing something that doesn't require timeouts */
1933  if (last_reply_timestamp <= 0)
1934  return;
1935 
1938 
1939  if (wal_sender_timeout > 0 && now >= timeout)
1940  {
1941  /*
1942  * Since expiration of the replication timeout typically means a
1943  * communication problem, we don't send the error message to the
1944  * standby.
1945  */
1947  (errmsg("terminating walsender process due to replication timeout")));
1948 
1949  WalSndShutdown();
1950  }
1951 }
int wal_sender_timeout
Definition: walsender.c:115
int64 TimestampTz
Definition: timestamp.h:39
#define COMMERROR
Definition: elog.h:30
#define ereport(elevel, rest)
Definition: elog.h:122
static void WalSndShutdown(void)
Definition: walsender.c:223
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
int errmsg(const char *fmt,...)
Definition: elog.c:797
static TimestampTz last_reply_timestamp
Definition: walsender.c:161
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1533
static long WalSndComputeSleeptime ( TimestampTz  now)
static

Definition at line 1886 of file walsender.c.

References last_reply_timestamp, sleeptime, TimestampDifference(), TimestampTzPlusMilliseconds, waiting_for_ping_response, and wal_sender_timeout.

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

1887 {
1888  long sleeptime = 10000; /* 10 s */
1889 
1891  {
1892  TimestampTz wakeup_time;
1893  long sec_to_timeout;
1894  int microsec_to_timeout;
1895 
1896  /*
1897  * At the latest stop sleeping once wal_sender_timeout has been
1898  * reached.
1899  */
1902 
1903  /*
1904  * If no ping has been sent yet, wake up when it's time to do so.
1905  * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
1906  * the timeout passed without a response.
1907  */
1910  wal_sender_timeout / 2);
1911 
1912  /* Compute relative time until wakeup. */
1913  TimestampDifference(now, wakeup_time,
1914  &sec_to_timeout, &microsec_to_timeout);
1915 
1916  sleeptime = sec_to_timeout * 1000 +
1917  microsec_to_timeout / 1000;
1918  }
1919 
1920  return sleeptime;
1921 }
int wal_sender_timeout
Definition: walsender.c:115
int64 TimestampTz
Definition: timestamp.h:39
int sleeptime
Definition: pg_standby.c:40
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: timestamp.c:1623
static bool waiting_for_ping_response
Definition: walsender.c:164
static TimestampTz last_reply_timestamp
Definition: walsender.c:161
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1533
static void WalSndDone ( WalSndSendDataCallback  send_data)
static

Definition at line 2675 of file walsender.c.

References DestRemote, EndCommand(), WalSnd::flush, pq_flush, pq_is_send_pending, proc_exit(), sentPtr, waiting_for_ping_response, WalSndCaughtUp, WalSndKeepalive(), WalSnd::write, and XLogRecPtrIsInvalid.

Referenced by WalSndLoop().

2676 {
2677  XLogRecPtr replicatedPtr;
2678 
2679  /* ... let's just be real sure we're caught up ... */
2680  send_data();
2681 
2682  /*
2683  * To figure out whether all WAL has successfully been replicated, check
2684  * flush location if valid, write otherwise. Tools like pg_receivewal
2685  * will usually (unless in synchronous mode) return an invalid flush
2686  * location.
2687  */
2688  replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
2690 
2691  if (WalSndCaughtUp && sentPtr == replicatedPtr &&
2692  !pq_is_send_pending())
2693  {
2694  /* Inform the standby that XLOG streaming is done */
2695  EndCommand("COPY 0", DestRemote);
2696  pq_flush();
2697 
2698  proc_exit(0);
2699  }
2701  {
2702  WalSndKeepalive(true);
2704  }
2705 }
#define pq_is_send_pending()
Definition: libpq.h:41
XLogRecPtr write
#define pq_flush()
Definition: libpq.h:39
void proc_exit(int code)
Definition: ipc.c:99
XLogRecPtr flush
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3117
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:157
static bool WalSndCaughtUp
Definition: walsender.c:176
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
WalSnd * MyWalSnd
Definition: walsender.c:105
static XLogRecPtr sentPtr
Definition: walsender.c:150
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool waiting_for_ping_response
Definition: walsender.c:164
void WalSndErrorCleanup ( void  )

Definition at line 283 of file walsender.c.

References close, ConditionVariableCancelSleep(), LWLockReleaseAll(), MyReplicationSlot, NULL, pgstat_report_wait_end(), proc_exit(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), sendFile, walsender_ready_to_stop, WalSndSetState(), and WALSNDSTATE_STARTUP.

Referenced by PostgresMain().

284 {
288 
289  if (sendFile >= 0)
290  {
291  close(sendFile);
292  sendFile = -1;
293  }
294 
295  if (MyReplicationSlot != NULL)
297 
299 
300  replication_active = false;
302  proc_exit(0);
303 
304  /* Revert back to startup state */
306 }
static int sendFile
Definition: walsender.c:128
void proc_exit(int code)
Definition: ipc.c:99
void ReplicationSlotCleanup()
Definition: slot.c:413
void ConditionVariableCancelSleep(void)
static volatile sig_atomic_t walsender_ready_to_stop
Definition: walsender.c:180
void ReplicationSlotRelease(void)
Definition: slot.c:375
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1205
static volatile sig_atomic_t replication_active
Definition: walsender.c:188
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define NULL
Definition: c.h:229
void WalSndSetState(WalSndState state)
Definition: walsender.c:2902
void LWLockReleaseAll(void)
Definition: lwlock.c:1814
#define close(a)
Definition: win32.h:17
static const char* WalSndGetStateString ( WalSndState  state)
static

Definition at line 2921 of file walsender.c.

References WALSNDSTATE_BACKUP, WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, and WALSNDSTATE_STREAMING.

Referenced by pg_stat_get_wal_senders().

2922 {
2923  switch (state)
2924  {
2925  case WALSNDSTATE_STARTUP:
2926  return "startup";
2927  case WALSNDSTATE_BACKUP:
2928  return "backup";
2929  case WALSNDSTATE_CATCHUP:
2930  return "catchup";
2931  case WALSNDSTATE_STREAMING:
2932  return "streaming";
2933  }
2934  return "UNKNOWN";
2935 }
Definition: regguts.h:298
static void WalSndKeepalive ( bool  requestReply)
static

Definition at line 3117 of file walsender.c.

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), StringInfoData::len, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), resetStringInfo(), and sentPtr.

Referenced by ProcessStandbyReplyMessage(), WalSndDone(), WalSndKeepaliveIfNecessary(), and WalSndWaitForWal().

3118 {
3119  elog(DEBUG2, "sending replication keepalive");
3120 
3121  /* construct the message... */
3123  pq_sendbyte(&output_message, 'k');
3126  pq_sendbyte(&output_message, requestReply ? 1 : 0);
3127 
3128  /* ... and send it wrapped in CopyData */
3130 }
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1569
static StringInfoData output_message
Definition: walsender.c:153
#define DEBUG2
Definition: elog.h:24
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:94
static XLogRecPtr sentPtr
Definition: walsender.c:150
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
#define elog
Definition: elog.h:219
void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.c:271
static void WalSndKeepaliveIfNecessary ( TimestampTz  now)
static

Definition at line 3136 of file walsender.c.

References last_reply_timestamp, pq_flush_if_writable, TimestampTzPlusMilliseconds, waiting_for_ping_response, wal_sender_timeout, WalSndKeepalive(), and WalSndShutdown().

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

3137 {
3138  TimestampTz ping_time;
3139 
3140  /*
3141  * Don't send keepalive messages if timeouts are globally disabled or
3142  * we're doing something not partaking in timeouts.
3143  */
3144  if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3145  return;
3146 
3148  return;
3149 
3150  /*
3151  * If half of wal_sender_timeout has lapsed without receiving any reply
3152  * from the standby, send a keep-alive message to the standby requesting
3153  * an immediate reply.
3154  */
3156  wal_sender_timeout / 2);
3157  if (now >= ping_time)
3158  {
3159  WalSndKeepalive(true);
3161 
3162  /* Try to flush pending output to the client */
3163  if (pq_flush_if_writable() != 0)
3164  WalSndShutdown();
3165  }
3166 }
int wal_sender_timeout
Definition: walsender.c:115
int64 TimestampTz
Definition: timestamp.h:39
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3117
#define pq_flush_if_writable()
Definition: libpq.h:40
static void WalSndShutdown(void)
Definition: walsender.c:223
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
static bool waiting_for_ping_response
Definition: walsender.c:164
static TimestampTz last_reply_timestamp
Definition: walsender.c:161
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1533
static void WalSndKill ( int  code,
Datum  arg 
)
static

Definition at line 2152 of file walsender.c.

References Assert, WalSnd::latch, WalSnd::mutex, MyWalSnd, NULL, WalSnd::pid, SpinLockAcquire, and SpinLockRelease.

Referenced by InitWalSenderSlot().

2153 {
2154  WalSnd *walsnd = MyWalSnd;
2155 
2156  Assert(walsnd != NULL);
2157 
2158  MyWalSnd = NULL;
2159 
2160  SpinLockAcquire(&walsnd->mutex);
2161  /* clear latch while holding the spinlock, so it can safely be read */
2162  walsnd->latch = NULL;
2163  /* Mark WalSnd struct as no longer being in use. */
2164  walsnd->pid = 0;
2165  SpinLockRelease(&walsnd->mutex);
2166 }
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch * latch
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:105
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
static void WalSndLastCycleHandler ( SIGNAL_ARGS  )
static

Definition at line 2789 of file walsender.c.

References MyLatch, MyProcPid, replication_active, SetLatch(), and walsender_ready_to_stop.

Referenced by WalSndSignals().

2790 {
2791  int save_errno = errno;
2792 
2793  /*
2794  * If replication has not yet started, die like with SIGTERM. If
2795  * replication is active, only set a flag and wake up the main loop. It
2796  * will send any outstanding WAL, wait for it to be replicated to the
2797  * standby, and then exit gracefully.
2798  */
2799  if (!replication_active)
2800  kill(MyProcPid, SIGTERM);
2801 
2802  walsender_ready_to_stop = true;
2803  SetLatch(MyLatch);
2804 
2805  errno = save_errno;
2806 }
int MyProcPid
Definition: globals.c:38
static volatile sig_atomic_t walsender_ready_to_stop
Definition: walsender.c:180
static volatile sig_atomic_t replication_active
Definition: walsender.c:188
void SetLatch(volatile Latch *latch)
Definition: latch.c:379
struct Latch * MyLatch
Definition: globals.c:51
static void WalSndLoop ( WalSndSendDataCallback  send_data)
static

Definition at line 1955 of file walsender.c.

References application_name, CHECK_FOR_INTERRUPTS, DEBUG1, ereport, errmsg(), GetCurrentTimestamp(), got_SIGHUP, last_reply_timestamp, MyLatch, MyProcPort, now(), PGC_SIGHUP, pgstat_report_activity(), PostmasterIsAlive(), pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), sleeptime, Port::sock, WalSnd::state, STATE_RUNNING, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_MAIN, waiting_for_ping_response, WaitLatchOrSocket(), walsender_ready_to_stop, WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndDone(), WalSndKeepaliveIfNecessary(), WalSndSetState(), WalSndShutdown(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STREAMING, WL_LATCH_SET, WL_POSTMASTER_DEATH, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, and WL_TIMEOUT.

Referenced by StartLogicalReplication(), and StartReplication().

1956 {
1957  /*
1958  * Initialize the last reply timestamp. That enables timeout processing
1959  * from here on.
1960  */
1962  waiting_for_ping_response = false;
1963 
1964  /* Report to pgstat that this process is a WAL sender */
1965  pgstat_report_activity(STATE_RUNNING, "walsender");
1966 
1967  /*
1968  * Loop until we reach the end of this timeline or the client requests to
1969  * stop streaming.
1970  */
1971  for (;;)
1972  {
1973  TimestampTz now;
1974 
1975  /*
1976  * Emergency bailout if postmaster has died. This is to avoid the
1977  * necessity for manual cleanup of all postmaster children.
1978  */
1979  if (!PostmasterIsAlive())
1980  exit(1);
1981 
1982  /* Clear any already-pending wakeups */
1984 
1986 
1987  /* Process any requests or signals received recently */
1988  if (got_SIGHUP)
1989  {
1990  got_SIGHUP = false;
1993  }
1994 
1995  /* Check for input from the client */
1997 
1998  /*
1999  * If we have received CopyDone from the client, sent CopyDone
2000  * ourselves, and the output buffer is empty, it's time to exit
2001  * streaming.
2002  */
2004  break;
2005 
2006  /*
2007  * If we don't have any pending data in the output buffer, try to send
2008  * some more. If there is some, we don't bother to call send_data
2009  * again until we've flushed it ... but we'd better assume we are not
2010  * caught up.
2011  */
2012  if (!pq_is_send_pending())
2013  send_data();
2014  else
2015  WalSndCaughtUp = false;
2016 
2017  /* Try to flush pending output to the client */
2018  if (pq_flush_if_writable() != 0)
2019  WalSndShutdown();
2020 
2021  /* If nothing remains to be sent right now ... */
2023  {
2024  /*
2025  * If we're in catchup state, move to streaming. This is an
2026  * important state change for users to know about, since before
2027  * this point data loss might occur if the primary dies and we
2028  * need to failover to the standby. The state change is also
2029  * important for synchronous replication, since commits that
2030  * started to wait at that point might wait for some time.
2031  */
2033  {
2034  ereport(DEBUG1,
2035  (errmsg("standby \"%s\" has now caught up with primary",
2036  application_name)));
2038  }
2039 
2040  /*
2041  * When SIGUSR2 arrives, we send any outstanding logs up to the
2042  * shutdown checkpoint record (i.e., the latest record), wait for
2043  * them to be replicated to the standby, and exit. This may be a
2044  * normal termination at shutdown, or a promotion; the walsender
2045  * is not sure which.
2046  */
2048  WalSndDone(send_data);
2049  }
2050 
2051  now = GetCurrentTimestamp();
2052 
2053  /* Check for replication timeout. */
2054  WalSndCheckTimeOut(now);
2055 
2056  /* Send keepalive if the time has come */
2058 
2059  /*
2060  * We don't block if not caught up, unless there is unsent data
2061  * pending in which case we'd better block until the socket is
2062  * write-ready. This test is only needed for the case where the
2063  * send_data callback handled a subset of the available data but then
2064  * pq_flush_if_writable flushed it all --- we should immediately try
2065  * to send more.
2066  */
2068  {
2069  long sleeptime;
2070  int wakeEvents;
2071 
2072  wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT |
2074 
2075  sleeptime = WalSndComputeSleeptime(now);
2076 
2077  if (pq_is_send_pending())
2078  wakeEvents |= WL_SOCKET_WRITEABLE;
2079 
2080  /* Sleep until something happens or we time out */
2081  WaitLatchOrSocket(MyLatch, wakeEvents,
2082  MyProcPort->sock, sleeptime,
2084  }
2085  }
2086  return;
2087 }
#define pq_is_send_pending()
Definition: libpq.h:41
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
#define DEBUG1
Definition: elog.h:25
struct Port * MyProcPort
Definition: globals.c:40
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1569
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:2806
int64 TimestampTz
Definition: timestamp.h:39
static void WalSndDone(WalSndSendDataCallback send_data)
Definition: walsender.c:2675
#define WL_SOCKET_READABLE
Definition: latch.h:125
int sleeptime
Definition: pg_standby.c:40
void ResetLatch(volatile Latch *latch)
Definition: latch.c:461
pgsocket sock
Definition: libpq-be.h:118
static volatile sig_atomic_t got_SIGHUP
Definition: walsender.c:179
static volatile sig_atomic_t walsender_ready_to_stop
Definition: walsender.c:180
WalSndState state
static bool streamingDoneSending
Definition: walsender.c:172
bool PostmasterIsAlive(void)
Definition: pmsignal.c:272
#define pq_flush_if_writable()
Definition: libpq.h:40
void SyncRepInitConfig(void)
Definition: syncrep.c:377
#define ereport(elevel, rest)
Definition: elog.h:122
static bool WalSndCaughtUp
Definition: walsender.c:176
Definition: guc.h:72
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
static void WalSndShutdown(void)
Definition: walsender.c:223
WalSnd * MyWalSnd
Definition: walsender.c:105
static void WalSndKeepaliveIfNecessary(TimestampTz now)
Definition: walsender.c:3136
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:1886
static void WalSndCheckTimeOut(TimestampTz now)
Definition: walsender.c:1928
void WalSndSetState(WalSndState state)
Definition: walsender.c:2902
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:320
static bool streamingDoneReceiving
Definition: walsender.c:173
char * application_name
Definition: guc.c:472
int errmsg(const char *fmt,...)
Definition: elog.c:797
struct Latch * MyLatch
Definition: globals.c:51
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
static bool waiting_for_ping_response
Definition: walsender.c:164
#define WL_LATCH_SET
Definition: latch.h:124
static TimestampTz last_reply_timestamp
Definition: walsender.c:161
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1502
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1533
static void WalSndPrepareWrite ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1117 of file walsender.c.

References InvalidXLogRecPtr, LogicalDecodingContext::out, pq_sendbyte(), pq_sendint64(), and resetStringInfo().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

1118 {
1119  /* can't have sync rep confused by sending the same LSN several times */
1120  if (!last_write)
1121  lsn = InvalidXLogRecPtr;
1122 
1123  resetStringInfo(ctx->out);
1124 
1125  pq_sendbyte(ctx->out, 'w');
1126  pq_sendint64(ctx->out, lsn); /* dataStart */
1127  pq_sendint64(ctx->out, lsn); /* walEnd */
1128 
1129  /*
1130  * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1131  * reserve space here.
1132  */
1133  pq_sendint64(ctx->out, 0); /* sendtime */
1134 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:94
StringInfo out
Definition: logical.h:59
void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.c:271
void WalSndRqstFileReload ( void  )

Definition at line 2746 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

2747 {
2748  int i;
2749 
2750  for (i = 0; i < max_wal_senders; i++)
2751  {
2752  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2753 
2754  if (walsnd->pid == 0)
2755  continue;
2756 
2757  SpinLockAcquire(&walsnd->mutex);
2758  walsnd->needreload = true;
2759  SpinLockRelease(&walsnd->mutex);
2760  }
2761 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:102
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
int max_wal_senders
Definition: walsender.c:114
#define SpinLockRelease(lock)
Definition: spin.h:64
bool needreload
int i
void WalSndSetState ( WalSndState  state)

Definition at line 2902 of file walsender.c.

References am_walsender, Assert, WalSnd::mutex, MyWalSnd, SpinLockAcquire, SpinLockRelease, and WalSnd::state.

Referenced by SendBaseBackup(), StartLogicalReplication(), StartReplication(), WalSndErrorCleanup(), and WalSndLoop().

2903 {
2904  WalSnd *walsnd = MyWalSnd;
2905 
2907 
2908  if (walsnd->state == state)
2909  return;
2910 
2911  SpinLockAcquire(&walsnd->mutex);
2912  walsnd->state = state;
2913  SpinLockRelease(&walsnd->mutex);
2914 }
slock_t mutex
bool am_walsender
Definition: walsender.c:108
#define SpinLockAcquire(lock)
Definition: spin.h:62
WalSndState state
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:105
#define Assert(condition)
Definition: c.h:675
Definition: regguts.h:298
void WalSndShmemInit ( void  )

Definition at line 2846 of file walsender.c.

References i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SHMQueueInit(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

2847 {
2848  bool found;
2849  int i;
2850 
2851  WalSndCtl = (WalSndCtlData *)
2852  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
2853 
2854  if (!found)
2855  {
2856  /* First time through, so initialize */
2858 
2859  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
2861 
2862  for (i = 0; i < max_wal_senders; i++)
2863  {
2864  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2865 
2866  SpinLockInit(&walsnd->mutex);
2867  }
2868  }
2869 }
Size WalSndShmemSize(void)
Definition: walsender.c:2834
#define SpinLockInit(lock)
Definition: spin.h:60
WalSndCtlData * WalSndCtl
Definition: walsender.c:102
#define MemSet(start, val, len)
Definition: c.h:857
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
int max_wal_senders
Definition: walsender.c:114
void SHMQueueInit(SHM_QUEUE *queue)
Definition: shmqueue.c:36
int i
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28
Size WalSndShmemSize ( void  )

Definition at line 2834 of file walsender.c.

References add_size(), max_wal_senders, mul_size(), and offsetof.

Referenced by CreateSharedMemoryAndSemaphores(), and WalSndShmemInit().

2835 {
2836  Size size = 0;
2837 
2838  size = offsetof(WalSndCtlData, walsnds);
2839  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
2840 
2841  return size;
2842 }
int max_wal_senders
Definition: walsender.c:114
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
size_t Size
Definition: c.h:356
#define offsetof(type, field)
Definition: c.h:555
static void WalSndShutdown ( void  )
static

Definition at line 223 of file walsender.c.

References am_cascading_walsender, CurrentResourceOwner, InitWalSenderSlot(), LagTracker, MarkPostmasterChildWalSender(), NULL, PMSIGNAL_ADVANCE_STATE_MACHINE, RecoveryInProgress(), ResourceOwnerCreate(), and SendPostmasterSignal().

Referenced by WalSndCheckTimeOut(), WalSndKeepaliveIfNecessary(), WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

252 {
254 
255  /* Create a per-walsender data structure in shared memory */
257 
258  /* Set up resource owner */
259  CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner");
260 
261  /*
262  * Let postmaster know that we're a WAL sender. Once we've declared us as
263  * a WAL sender process, postmaster will let us outlive the bgwriter and
264  * kill us last in the shutdown sequence, so we get a chance to stream all
265  * remaining WAL at shutdown, including the shutdown checkpoint. Note that
266  * there's no going back, and we mustn't write any WAL records after this.
267  */
270 
271  /* Initialize empty timestamp buffer for lag tracking. */
272  memset(&LagTracker, 0, sizeof(LagTracker));
273 }
static void InitWalSenderSlot(void)
Definition: walsender.c:2091
ResourceOwner CurrentResourceOwner
Definition: resowner.c:138
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:239
bool RecoveryInProgress(void)
Definition: xlog.c:7855
static struct @25 LagTracker
#define NULL
Definition: c.h:229
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:113
bool am_cascading_walsender
Definition: walsender.c:109
ResourceOwner ResourceOwnerCreate(ResourceOwner parent, const char *name)
Definition: resowner.c:416
static void WalSndSigHupHandler ( SIGNAL_ARGS  )
static

Definition at line 2765 of file walsender.c.

References got_SIGHUP, MyLatch, and SetLatch().

Referenced by WalSndSignals().

2766 {
2767  int save_errno = errno;
2768 
2769  got_SIGHUP = true;
2770 
2771  SetLatch(MyLatch);
2772 
2773  errno = save_errno;
2774 }
static volatile sig_atomic_t got_SIGHUP
Definition: walsender.c:179
void SetLatch(volatile Latch *latch)
Definition: latch.c:379
struct Latch * MyLatch
Definition: globals.c:51
void WalSndSignals ( void  )

Definition at line 2810 of file walsender.c.

References die(), InitializeTimeouts(), pqsignal(), quickdie(), SIG_DFL, SIG_IGN, SIGCHLD, SIGCONT, SIGHUP, SIGPIPE, SIGQUIT, SIGTTIN, SIGTTOU, SIGUSR1, SIGUSR2, SIGWINCH, WalSndLastCycleHandler(), WalSndSigHupHandler(), and WalSndXLogSendHandler().

Referenced by PostgresMain().

2811 {
2812  /* Set up signal handlers */
2813  pqsignal(SIGHUP, WalSndSigHupHandler); /* set flag to read config
2814  * file */
2815  pqsignal(SIGINT, SIG_IGN); /* not used */
2816  pqsignal(SIGTERM, die); /* request shutdown */
2817  pqsignal(SIGQUIT, quickdie); /* hard crash time */
2818  InitializeTimeouts(); /* establishes SIGALRM handler */
2820  pqsignal(SIGUSR1, WalSndXLogSendHandler); /* request WAL sending */
2821  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
2822  * shutdown */
2823 
2824  /* Reset some signals that are accepted by postmaster but not here */
2830 }
static void WalSndXLogSendHandler(SIGNAL_ARGS)
Definition: walsender.c:2778
void InitializeTimeouts(void)
Definition: timeout.c:340
#define SIGUSR1
Definition: win32.h:211
#define SIGCONT
Definition: win32.h:205
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:2789
#define SIGWINCH
Definition: win32.h:209
#define SIGTTIN
Definition: win32.h:207
#define SIGQUIT
Definition: win32.h:197
static void WalSndSigHupHandler(SIGNAL_ARGS)
Definition: walsender.c:2765
#define SIG_IGN
Definition: win32.h:193
#define SIGPIPE
Definition: win32.h:201
#define SIGHUP
Definition: win32.h:196
#define SIG_DFL
Definition: win32.h:191
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:168
#define SIGTTOU
Definition: win32.h:208
void die(SIGNAL_ARGS)
Definition: postgres.c:2619
#define SIGCHLD
Definition: win32.h:206
void quickdie(SIGNAL_ARGS)
Definition: postgres.c:2558
#define SIGUSR2
Definition: win32.h:212
static XLogRecPtr WalSndWaitForWal ( XLogRecPtr  loc)
static

Definition at line 1232 of file walsender.c.

References CHECK_FOR_INTERRUPTS, WalSnd::flush, GetCurrentTimestamp(), GetFlushRecPtr(), GetXLogReplayRecPtr(), got_SIGHUP, InvalidXLogRecPtr, MyLatch, MyProcPort, now(), NULL, PGC_SIGHUP, PostmasterIsAlive(), pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), RecoveryInProgress(), ResetLatch(), sentPtr, SetLatch(), sleeptime, Port::sock, SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_WAIT_WAL, waiting_for_ping_response, WaitLatchOrSocket(), walsender_ready_to_stop, WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepalive(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WL_LATCH_SET, WL_POSTMASTER_DEATH, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, WL_TIMEOUT, and WalSnd::write.

Referenced by logical_read_xlog_page().

1233 {
1234  int wakeEvents;
1235  static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1236 
1237 
1238  /*
1239  * Fast path to avoid acquiring the spinlock in case we already know we
1240  * have enough WAL available. This is particularly interesting if we're
1241  * far behind.
1242  */
1243  if (RecentFlushPtr != InvalidXLogRecPtr &&
1244  loc <= RecentFlushPtr)
1245  return RecentFlushPtr;
1246 
1247  /* Get a more recent flush pointer. */
1248  if (!RecoveryInProgress())
1249  RecentFlushPtr = GetFlushRecPtr();
1250  else
1251  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1252 
1253  for (;;)
1254  {
1255  long sleeptime;
1256  TimestampTz now;
1257 
1258  /*
1259  * Emergency bailout if postmaster has died. This is to avoid the
1260  * necessity for manual cleanup of all postmaster children.
1261  */
1262  if (!PostmasterIsAlive())
1263  exit(1);
1264 
1265  /* Clear any already-pending wakeups */
1267 
1269 
1270  /* Process any requests or signals received recently */
1271  if (got_SIGHUP)
1272  {
1273  got_SIGHUP = false;
1276  }
1277 
1278  /* Check for input from the client */
1280 
1281  /* Update our idea of the currently flushed position. */
1282  if (!RecoveryInProgress())
1283  RecentFlushPtr = GetFlushRecPtr();
1284  else
1285  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1286 
1287  /*
1288  * If postmaster asked us to stop, don't wait here anymore. This will
1289  * cause the xlogreader to return without reading a full record, which
1290  * is the fastest way to reach the mainloop which then can quit.
1291  *
1292  * It's important to do this check after the recomputation of
1293  * RecentFlushPtr, so we can send all remaining data before shutting
1294  * down.
1295  */
1297  break;
1298 
1299  /*
1300  * We only send regular messages to the client for full decoded
1301  * transactions, but synchronous replication and walsender shutdown
1302  * may be waiting for a later location. So we send pings
1303  * containing the flush location every now and then.
1304  */
1305  if (MyWalSnd->flush < sentPtr &&
1306  MyWalSnd->write < sentPtr &&
1308  {
1309  WalSndKeepalive(false);
1311  }
1312 
1313  /* check whether we're done */
1314  if (loc <= RecentFlushPtr)
1315  break;
1316 
1317  /* Waiting for new WAL. Since we need to wait, we're now caught up. */
1318  WalSndCaughtUp = true;
1319 
1320  /*
1321  * Try to flush pending output to the client. Also wait for the socket
1322  * becoming writable, if there's still pending output after an attempt
1323  * to flush. Otherwise we might just sit on output data while waiting
1324  * for new WAL being generated.
1325  */
1326  if (pq_flush_if_writable() != 0)
1327  WalSndShutdown();
1328 
1329  now = GetCurrentTimestamp();
1330 
1331  /* die if timeout was reached */
1332  WalSndCheckTimeOut(now);
1333 
1334  /* Send keepalive if the time has come */
1336 
1337  sleeptime = WalSndComputeSleeptime(now);
1338 
1339  wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
1341 
1342  if (pq_is_send_pending())
1343  wakeEvents |= WL_SOCKET_WRITEABLE;
1344 
1345  /* Sleep until something happens or we time out */
1346  WaitLatchOrSocket(MyLatch, wakeEvents,
1347  MyProcPort->sock, sleeptime,
1349  }
1350 
1351  /* reactivate latch so WalSndLoop knows to continue */
1352  SetLatch(MyLatch);
1353  return RecentFlushPtr;
1354 }
#define pq_is_send_pending()
Definition: libpq.h:41
XLogRecPtr write
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
struct Port * MyProcPort
Definition: globals.c:40
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1569
int64 TimestampTz
Definition: timestamp.h:39
#define WL_SOCKET_READABLE
Definition: latch.h:125
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8204
int sleeptime
Definition: pg_standby.c:40
void ResetLatch(volatile Latch *latch)
Definition: latch.c:461
bool RecoveryInProgress(void)
Definition: xlog.c:7855
pgsocket sock
Definition: libpq-be.h:118
XLogRecPtr flush
static volatile sig_atomic_t got_SIGHUP
Definition: walsender.c:179
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3117
static volatile sig_atomic_t walsender_ready_to_stop
Definition: walsender.c:180
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11037
bool PostmasterIsAlive(void)
Definition: pmsignal.c:272
#define pq_flush_if_writable()
Definition: libpq.h:40
void SyncRepInitConfig(void)
Definition: syncrep.c:377
static bool WalSndCaughtUp
Definition: walsender.c:176
Definition: guc.h:72
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
static void WalSndShutdown(void)
Definition: walsender.c:223
WalSnd * MyWalSnd
Definition: walsender.c:105
static XLogRecPtr sentPtr
Definition: walsender.c:150
static void WalSndKeepaliveIfNecessary(TimestampTz now)
Definition: walsender.c:3136
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:1886
void SetLatch(volatile Latch *latch)
Definition: latch.c:379
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static void WalSndCheckTimeOut(TimestampTz now)
Definition: walsender.c:1928
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:320
struct Latch * MyLatch
Definition: globals.c:51
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
static bool waiting_for_ping_response
Definition: walsender.c:164
#define WL_LATCH_SET
Definition: latch.h:124
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1502
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1533
void WalSndWakeup ( void  )

Definition at line 2878 of file walsender.c.

References i, WalSnd::latch, max_wal_senders, WalSnd::mutex, NULL, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive(), StartupXLOG(), and XLogWalRcvFlush().

2879 {
2880  int i;
2881 
2882  for (i = 0; i < max_wal_senders; i++)
2883  {
2884  Latch *latch;
2885  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2886 
2887  /*
2888  * Get latch pointer with spinlock held, for the unlikely case that
2889  * pointer reads aren't atomic (as they're 8 bytes).
2890  */
2891  SpinLockAcquire(&walsnd->mutex);
2892  latch = walsnd->latch;
2893  SpinLockRelease(&walsnd->mutex);
2894 
2895  if (latch != NULL)
2896  SetLatch(latch);
2897  }
2898 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:102
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch * latch
Definition: latch.h:110
int max_wal_senders
Definition: walsender.c:114
#define SpinLockRelease(lock)
Definition: spin.h:64
void SetLatch(volatile Latch *latch)
Definition: latch.c:379
#define NULL
Definition: c.h:229
int i
static void WalSndWriteData ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1144 of file walsender.c.

References CHECK_FOR_INTERRUPTS, StringInfoData::data, GetCurrentTimestamp(), got_SIGHUP, StringInfoData::len, MyLatch, MyProcPort, now(), LogicalDecodingContext::out, PGC_SIGHUP, PostmasterIsAlive(), pq_flush_if_writable, pq_is_send_pending, pq_putmessage_noblock, pq_sendint64(), ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), resetStringInfo(), SetLatch(), sleeptime, Port::sock, SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_WRITE_DATA, WaitLatchOrSocket(), WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WL_LATCH_SET, WL_POSTMASTER_DEATH, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, and WL_TIMEOUT.

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

1146 {
1147  /* output previously gathered data in a CopyData packet */
1148  pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1149 
1150  /*
1151  * Fill the send timestamp last, so that it is taken as late as possible.
1152  * This is somewhat ugly, but the protocol is set, as it has already been
1153  * used for several releases by streaming physical replication.
1154  */
1157  memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1158  tmpbuf.data, sizeof(int64));
1159 
1160  /* fast path */
1161  /* Try to flush pending output to the client */
1162  if (pq_flush_if_writable() != 0)
1163  WalSndShutdown();
1164 
1165  if (!pq_is_send_pending())
1166  return;
1167 
1168  for (;;)
1169  {
1170  int wakeEvents;
1171  long sleeptime;
1172  TimestampTz now;
1173 
1174  /*
1175  * Emergency bailout if postmaster has died. This is to avoid the
1176  * necessity for manual cleanup of all postmaster children.
1177  */
1178  if (!PostmasterIsAlive())
1179  exit(1);
1180 
1181  /* Clear any already-pending wakeups */
1183 
1185 
1186  /* Process any requests or signals received recently */
1187  if (got_SIGHUP)
1188  {
1189  got_SIGHUP = false;
1192  }
1193 
1194  /* Check for input from the client */
1196 
1197  /* Try to flush pending output to the client */
1198  if (pq_flush_if_writable() != 0)
1199  WalSndShutdown();
1200 
1201  /* If we finished clearing the buffered data, we're done here. */
1202  if (!pq_is_send_pending())
1203  break;
1204 
1205  now = GetCurrentTimestamp();
1206 
1207  /* die if timeout was reached */
1208  WalSndCheckTimeOut(now);
1209 
1210  /* Send keepalive if the time has come */
1212 
1213  sleeptime = WalSndComputeSleeptime(now);
1214 
1215  wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
1217 
1218  /* Sleep until something happens or we time out */
1219  WaitLatchOrSocket(MyLatch, wakeEvents,
1220  MyProcPort->sock, sleeptime,
1222  }
1223 
1224  /* reactivate latch so WalSndLoop knows to continue */
1225  SetLatch(MyLatch);
1226 }
#define pq_is_send_pending()
Definition: libpq.h:41
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
struct Port * MyProcPort
Definition: globals.c:40
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1569
int64 TimestampTz
Definition: timestamp.h:39
#define WL_SOCKET_READABLE
Definition: latch.h:125
int sleeptime
Definition: pg_standby.c:40
void ResetLatch(volatile Latch *latch)
Definition: latch.c:461
pgsocket sock
Definition: libpq-be.h:118
static volatile sig_atomic_t got_SIGHUP
Definition: walsender.c:179
bool PostmasterIsAlive(void)
Definition: pmsignal.c:272
#define pq_flush_if_writable()
Definition: libpq.h:40
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:94
void SyncRepInitConfig(void)
Definition: syncrep.c:377
Definition: guc.h:72
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
static void WalSndShutdown(void)
Definition: walsender.c:223
static void WalSndKeepaliveIfNecessary(TimestampTz now)
Definition: walsender.c:3136
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:1886
void SetLatch(volatile Latch *latch)
Definition: latch.c:379
static void WalSndCheckTimeOut(TimestampTz now)
Definition: walsender.c:1928
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:320
static StringInfoData tmpbuf
Definition: walsender.c:155
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
StringInfo out
Definition: logical.h:59
struct Latch * MyLatch
Definition: globals.c:51
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.c:271
#define WL_LATCH_SET
Definition: latch.h:124
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1502
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1533
static void WalSndXLogSendHandler ( SIGNAL_ARGS  )
static

Definition at line 2778 of file walsender.c.

References latch_sigusr1_handler().

Referenced by WalSndSignals().

2779 {
2780  int save_errno = errno;
2781 
2783 
2784  errno = save_errno;
2785 }
void latch_sigusr1_handler(void)
Definition: latch.c:1572
static void XLogRead ( char *  buf,
XLogRecPtr  startptr,
Size  count 
)
static

Definition at line 2180 of file walsender.c.

References am_cascading_walsender, BasicOpenFile(), buf, CheckXLogRemoved(), close, curFileTimeLine, ereport, errcode_for_file_access(), errmsg(), ERROR, MAXPGPATH, WalSnd::mutex, MyWalSnd, WalSnd::needreload, PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), read, sendFile, sendOff, sendSegNo, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, SpinLockAcquire, SpinLockRelease, ThisTimeLineID, WAIT_EVENT_WAL_READ, XLByteInSeg, XLByteToSeg, XLogFileNameP(), XLogFilePath, and XLogSegSize.

Referenced by logical_read_xlog_page(), and XLogSendPhysical().

2181 {
2182  char *p;
2183  XLogRecPtr recptr;
2184  Size nbytes;
2185  XLogSegNo segno;
2186 
2187 retry:
2188  p = buf;
2189  recptr = startptr;
2190  nbytes = count;
2191 
2192  while (nbytes > 0)
2193  {
2194  uint32 startoff;
2195  int segbytes;
2196  int readbytes;
2197 
2198  startoff = recptr % XLogSegSize;
2199 
2200  if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
2201  {
2202  char path[MAXPGPATH];
2203 
2204  /* Switch to another logfile segment */
2205  if (sendFile >= 0)
2206  close(sendFile);
2207 
2208  XLByteToSeg(recptr, sendSegNo);
2209 
2210  /*-------
2211  * When reading from a historic timeline, and there is a timeline
2212  * switch within this segment, read from the WAL segment belonging
2213  * to the new timeline.
2214  *
2215  * For example, imagine that this server is currently on timeline
2216  * 5, and we're streaming timeline 4. The switch from timeline 4
2217  * to 5 happened at 0/13002088. In pg_wal, we have these files:
2218  *
2219  * ...
2220  * 000000040000000000000012
2221  * 000000040000000000000013
2222  * 000000050000000000000013
2223  * 000000050000000000000014
2224  * ...
2225  *
2226  * In this situation, when requested to send the WAL from
2227  * segment 0x13, on timeline 4, we read the WAL from file
2228  * 000000050000000000000013. Archive recovery prefers files from
2229  * newer timelines, so if the segment was restored from the
2230  * archive on this server, the file belonging to the old timeline,
2231  * 000000040000000000000013, might not exist. Their contents are
2232  * equal up to the switchpoint, because at a timeline switch, the
2233  * used portion of the old segment is copied to the new file.
2234  *-------
2235  */
2238  {
2239  XLogSegNo endSegNo;
2240 
2242  if (sendSegNo == endSegNo)
2244  }
2245 
2247 
2248  sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
2249  if (sendFile < 0)
2250  {
2251  /*
2252  * If the file is not found, assume it's because the standby
2253  * asked for a too old WAL segment that has already been
2254  * removed or recycled.
2255  */
2256  if (errno == ENOENT)
2257  ereport(ERROR,
2259  errmsg("requested WAL segment %s has already been removed",
2261  else
2262  ereport(ERROR,
2264  errmsg("could not open file \"%s\": %m",
2265  path)));
2266  }
2267  sendOff = 0;
2268  }
2269 
2270  /* Need to seek in the file? */
2271  if (sendOff != startoff)
2272  {
2273  if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
2274  ereport(ERROR,
2276  errmsg("could not seek in log segment %s to offset %u: %m",
2278  startoff)));
2279  sendOff = startoff;
2280  }
2281 
2282  /* How many bytes are within this segment? */
2283  if (nbytes > (XLogSegSize - startoff))
2284  segbytes = XLogSegSize - startoff;
2285  else
2286  segbytes = nbytes;
2287 
2289  readbytes = read(sendFile, p, segbytes);
2291  if (readbytes <= 0)
2292  {
2293  ereport(ERROR,
2295  errmsg("could not read from log segment %s, offset %u, length %lu: %m",
2297  sendOff, (unsigned long) segbytes)));
2298  }
2299 
2300  /* Update state for read */
2301  recptr += readbytes;
2302 
2303  sendOff += readbytes;
2304  nbytes -= readbytes;
2305  p += readbytes;
2306  }
2307 
2308  /*
2309  * After reading into the buffer, check that what we read was valid. We do
2310  * this after reading, because even though the segment was present when we
2311  * opened it, it might get recycled or removed while we read it. The
2312  * read() succeeds in that case, but the data we tried to read might
2313  * already have been overwritten with new WAL records.
2314  */
2315  XLByteToSeg(startptr, segno);
2317 
2318  /*
2319  * During recovery, the currently-open WAL file might be replaced with the
2320  * file of the same name retrieved from archive. So we always need to
2321  * check what we read was valid after reading into the buffer. If it's
2322  * invalid, we try to open and read the file again.
2323  */
2325  {
2326  WalSnd *walsnd = MyWalSnd;
2327  bool reload;
2328 
2329  SpinLockAcquire(&walsnd->mutex);
2330  reload = walsnd->needreload;
2331  walsnd->needreload = false;
2332  SpinLockRelease(&walsnd->mutex);
2333 
2334  if (reload && sendFile >= 0)
2335  {
2336  close(sendFile);
2337  sendFile = -1;
2338 
2339  goto retry;
2340  }
2341  }
2342 }
#define XLogSegSize
Definition: xlog_internal.h:92
static int sendFile
Definition: walsender.c:128
#define PG_BINARY
Definition: c.h:1038
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define ERROR
Definition: elog.h:43
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:3756
#define MAXPGPATH
static TimeLineID curFileTimeLine
Definition: walsender.c:133
#define XLogFilePath(path, tli, logSegNo)
char * XLogFileNameP(TimeLineID tli, XLogSegNo segno)
Definition: xlog.c:10092
static char * buf
Definition: pg_test_fsync.c:65
uint64 XLogSegNo
Definition: xlogdefs.h:34
int errcode_for_file_access(void)
Definition: elog.c:598
unsigned int uint32
Definition: c.h:268
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1205
#define ereport(elevel, rest)
Definition: elog.h:122
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:105
TimeLineID ThisTimeLineID
Definition: xlog.c:179
#define XLByteToSeg(xlrp, logSegNo)
static TimeLineID sendTimeLine
Definition: walsender.c:141
uint64 XLogRecPtr
Definition: xlogdefs.h:21
bool needreload
size_t Size
Definition: c.h:356
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1181
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:142
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:144
static XLogSegNo sendSegNo
Definition: walsender.c:129
int errmsg(const char *fmt,...)
Definition: elog.c:797
static uint32 sendOff
Definition: walsender.c:130
#define close(a)
Definition: win32.h:17
#define XLByteInSeg(xlrp, logSegNo)
#define read(a, b, c)
Definition: win32.h:18
static bool sendTimeLineIsHistoric
Definition: walsender.c:143
bool am_cascading_walsender
Definition: walsender.c:109
int BasicOpenFile(FileName fileName, int fileFlags, int fileMode)
Definition: fd.c:899
static void XLogSendLogical ( void  )
static

Definition at line 2614 of file walsender.c.

References elog, XLogReaderState::EndRecPtr, ERROR, GetFlushRecPtr(), InvalidXLogRecPtr, logical_startptr, LogicalDecodingProcessRecord(), WalSnd::mutex, MyWalSnd, NULL, LogicalDecodingContext::reader, WalSnd::sentPtr, sentPtr, SpinLockAcquire, SpinLockRelease, WalSndCaughtUp, and XLogReadRecord().

Referenced by StartLogicalReplication().

2615 {
2616  XLogRecord *record;
2617  char *errm;
2618 
2619  /*
2620  * Don't know whether we've caught up yet. We'll set it to true in
2621  * WalSndWaitForWal, if we're actually waiting. We also set to true if
2622  * XLogReadRecord() had to stop reading but WalSndWaitForWal didn't wait -
2623  * i.e. when we're shutting down.
2624  */
2625  WalSndCaughtUp = false;
2626 
2629 
2630  /* xlog record was invalid */
2631  if (errm != NULL)
2632  elog(ERROR, "%s", errm);
2633 
2634  if (record != NULL)
2635  {
2636  /*
2637  * Note the lack of any call to LagTrackerWrite() which is the responsibility
2638  * of the logical decoding plugin. Response messages are handled normally,
2639  * so this responsibility does not extend to needing to call LagTrackerRead().
2640  */
2642 
2644  }
2645  else
2646  {
2647  /*
2648  * If the record we just wanted to read is at or beyond the flushed
2649  * point, then we're caught up.
2650  */
2652  WalSndCaughtUp = true;
2653  }
2654 
2655  /* Update shared memory status */
2656  {
2657  WalSnd *walsnd = MyWalSnd;
2658 
2659  SpinLockAcquire(&walsnd->mutex);
2660  walsnd->sentPtr = sentPtr;
2661  SpinLockRelease(&walsnd->mutex);
2662  }
2663 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8204
slock_t mutex
XLogRecord * XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
Definition: xlogreader.c:193
XLogRecPtr EndRecPtr
Definition: xlogreader.h:115
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define ERROR
Definition: elog.h:43
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:93
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:190
static XLogRecPtr logical_startptr
Definition: walsender.c:191
static bool WalSndCaughtUp
Definition: walsender.c:176
#define SpinLockRelease(lock)
Definition: spin.h:64
XLogRecPtr sentPtr
WalSnd * MyWalSnd
Definition: walsender.c:105
static XLogRecPtr sentPtr
Definition: walsender.c:150
#define NULL
Definition: c.h:229
XLogReaderState * reader
Definition: logical.h:38
#define elog
Definition: elog.h:219
static void XLogSendPhysical ( void  )
static

Definition at line 2355 of file walsender.c.

References am_cascading_walsender, Assert, close, StringInfoData::data, DEBUG1, elog, enlargeStringInfo(), GetCurrentTimestamp(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), LagTrackerWrite(), StringInfoData::len, list_free_deep(), MAX_SEND_SIZE, WalSnd::mutex, MyWalSnd, NULL, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), readTimeLineHistory(), RecoveryInProgress(), resetStringInfo(), sendFile, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, WalSnd::sentPtr, sentPtr, set_ps_display(), snprintf(), SpinLockAcquire, SpinLockRelease, streamingDoneSending, ThisTimeLineID, tliSwitchPoint(), update_process_title, WalSndCaughtUp, and XLogRead().

Referenced by StartReplication().

2356 {
2357  XLogRecPtr SendRqstPtr;
2358  XLogRecPtr startptr;
2359  XLogRecPtr endptr;
2360  Size nbytes;
2361 
2363  {
2364  WalSndCaughtUp = true;
2365  return;
2366  }
2367 
2368  /* Figure out how far we can safely send the WAL. */
2370  {
2371  /*
2372  * Streaming an old timeline that's in this server's history, but is
2373  * not the one we're currently inserting or replaying. It can be
2374  * streamed up to the point where we switched off that timeline.
2375  */
2376  SendRqstPtr = sendTimeLineValidUpto;
2377  }
2378  else if (am_cascading_walsender)
2379  {
2380  /*
2381  * Streaming the latest timeline on a standby.
2382  *
2383  * Attempt to send all WAL that has already been replayed, so that we
2384  * know it's valid. If we're receiving WAL through streaming
2385  * replication, it's also OK to send any WAL that has been received
2386  * but not replayed.
2387  *
2388  * The timeline we're recovering from can change, or we can be
2389  * promoted. In either case, the current timeline becomes historic. We
2390  * need to detect that so that we don't try to stream past the point
2391  * where we switched to another timeline. We check for promotion or
2392  * timeline switch after calculating FlushPtr, to avoid a race
2393  * condition: if the timeline becomes historic just after we checked
2394  * that it was still current, it's still OK to stream it up to the
2395  * FlushPtr that was calculated before it became historic.
2396  */
2397  bool becameHistoric = false;
2398 
2399  SendRqstPtr = GetStandbyFlushRecPtr();
2400 
2401  if (!RecoveryInProgress())
2402  {
2403  /*
2404  * We have been promoted. RecoveryInProgress() updated
2405  * ThisTimeLineID to the new current timeline.
2406  */
2407  am_cascading_walsender = false;
2408  becameHistoric = true;
2409  }
2410  else
2411  {
2412  /*
2413  * Still a cascading standby. But is the timeline we're sending
2414  * still the one recovery is recovering from? ThisTimeLineID was
2415  * updated by the GetStandbyFlushRecPtr() call above.
2416  */
2418  becameHistoric = true;
2419  }
2420 
2421  if (becameHistoric)
2422  {
2423  /*
2424  * The timeline we were sending has become historic. Read the
2425  * timeline history file of the new timeline to see where exactly
2426  * we forked off from the timeline we were sending.
2427  */
2428  List *history;
2429 
2432 
2434  list_free_deep(history);
2435 
2436  sendTimeLineIsHistoric = true;
2437 
2438  SendRqstPtr = sendTimeLineValidUpto;
2439  }
2440  }
2441  else
2442  {
2443  /*
2444  * Streaming the current timeline on a master.
2445  *
2446  * Attempt to send all data that's already been written out and
2447  * fsync'd to disk. We cannot go further than what's been written out
2448  * given the current implementation of XLogRead(). And in any case
2449  * it's unsafe to send WAL that is not securely down to disk on the
2450  * master: if the master subsequently crashes and restarts, slaves
2451  * must not have applied any WAL that gets lost on the master.
2452  */
2453  SendRqstPtr = GetFlushRecPtr();
2454  }
2455 
2456  /*
2457  * Record the current system time as an approximation of the time at which
2458  * this WAL position was written for the purposes of lag tracking.
2459  *
2460  * In theory we could make XLogFlush() record a time in shmem whenever WAL
2461  * is flushed and we could get that time as well as the LSN when we call
2462  * GetFlushRecPtr() above (and likewise for the cascading standby
2463  * equivalent), but rather than putting any new code into the hot WAL path
2464  * it seems good enough to capture the time here. We should reach this
2465  * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
2466  * may take some time, we read the WAL flush pointer and take the time
2467  * very close together here so that we'll get a later position if it
2468  * is still moving.
2469  *
2470  * Because LagTrackerWrite ignores samples when the LSN hasn't advanced,
2471  * this gives us a cheap approximation for the WAL flush time for this
2472  * LSN.
2473  *
2474  * Note that the LSN is not necessarily the LSN for the data contained in
2475  * the present message; it's the end of the WAL, which might be
2476  * further ahead. All the lag tracking machinery cares about is finding
2477  * out when that arbitrary LSN is eventually reported as written, flushed
2478  * and applied, so that it can measure the elapsed time.
2479  */
2480  LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());
2481 
2482  /*
2483  * If this is a historic timeline and we've reached the point where we
2484  * forked to the next timeline, stop streaming.
2485  *
2486  * Note: We might already have sent WAL > sendTimeLineValidUpto. The
2487  * startup process will normally replay all WAL that has been received
2488  * from the master, before promoting, but if the WAL streaming is
2489  * terminated at a WAL page boundary, the valid portion of the timeline
2490  * might end in the middle of a WAL record. We might've already sent the
2491  * first half of that partial WAL record to the cascading standby, so that
2492  * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
2493  * replay the partial WAL record either, so it can still follow our
2494  * timeline switch.
2495  */
2497  {
2498  /* close the current file. */
2499  if (sendFile >= 0)
2500  close(sendFile);
2501  sendFile = -1;
2502 
2503  /* Send CopyDone */
2504  pq_putmessage_noblock('c', NULL, 0);
2505  streamingDoneSending = true;
2506 
2507  WalSndCaughtUp = true;
2508 
2509  elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
2511  (uint32) (sentPtr >> 32), (uint32) sentPtr);
2512  return;
2513  }
2514 
2515  /* Do we have any work to do? */
2516  Assert(sentPtr <= SendRqstPtr);
2517  if (SendRqstPtr <= sentPtr)
2518  {
2519  WalSndCaughtUp = true;
2520  return;
2521  }
2522 
2523  /*
2524  * Figure out how much to send in one message. If there's no more than
2525  * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
2526  * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
2527  *
2528  * The rounding is not only for performance reasons. Walreceiver relies on
2529  * the fact that we never split a WAL record across two messages. Since a
2530  * long WAL record is split at page boundary into continuation records,
2531  * page boundary is always a safe cut-off point. We also assume that
2532  * SendRqstPtr never points to the middle of a WAL record.
2533  */
2534  startptr = sentPtr;
2535  endptr = startptr;
2536  endptr += MAX_SEND_SIZE;
2537 
2538  /* if we went beyond SendRqstPtr, back off */
2539  if (SendRqstPtr <= endptr)
2540  {
2541  endptr = SendRqstPtr;
2543  WalSndCaughtUp = false;
2544  else
2545  WalSndCaughtUp = true;
2546  }
2547  else
2548  {
2549  /* round down to page boundary. */
2550  endptr -= (endptr % XLOG_BLCKSZ);
2551  WalSndCaughtUp = false;
2552  }
2553 
2554  nbytes = endptr - startptr;
2555  Assert(nbytes <= MAX_SEND_SIZE);
2556 
2557  /*
2558  * OK to read and send the slice.
2559  */
2561  pq_sendbyte(&output_message, 'w');
2562 
2563  pq_sendint64(&output_message, startptr); /* dataStart */
2564  pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
2565  pq_sendint64(&output_message, 0); /* sendtime, filled in last */
2566 
2567  /*
2568  * Read the log directly into the output buffer to avoid extra memcpy
2569  * calls.
2570  */
2572  XLogRead(&output_message.data[output_message.len], startptr, nbytes);
2573  output_message.len += nbytes;
2575 
2576  /*
2577  * Fill the send timestamp last, so that it is taken as late as possible.
2578  */
2581  memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
2582  tmpbuf.data, sizeof(int64));
2583 
2585 
2586  sentPtr = endptr;
2587 
2588  /* Update shared memory status */
2589  {
2590  WalSnd *walsnd = MyWalSnd;
2591 
2592  SpinLockAcquire(&walsnd->mutex);
2593  walsnd->sentPtr = sentPtr;
2594  SpinLockRelease(&walsnd->mutex);
2595  }
2596 
2597  /* Report progress of XLOG streaming in PS display */
2599  {
2600  char activitymsg[50];
2601 
2602  snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
2603  (uint32) (sentPtr >> 32), (uint32) sentPtr);
2604  set_ps_display(activitymsg, false);
2605  }
2606 
2607  return;
2608 }
#define DEBUG1
Definition: elog.h:25
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
bool update_process_title
Definition: ps_status.c:35
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1569
void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition: walsender.c:3176
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:75
static int sendFile
Definition: walsender.c:128
void set_ps_display(const char *activity, bool force)
Definition: ps_status.c:326
static StringInfoData output_message
Definition: walsender.c:153
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8204
bool RecoveryInProgress(void)
Definition: xlog.c:7855
void list_free_deep(List *list)
Definition: list.c:1147
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
static bool streamingDoneSending
Definition: walsender.c:172
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:277
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:94
unsigned int uint32
Definition: c.h:268
static void XLogRead(char *buf, XLogRecPtr startptr, Size count)
Definition: walsender.c:2180
static bool WalSndCaughtUp
Definition: walsender.c:176
#define SpinLockRelease(lock)
Definition: spin.h:64
XLogRecPtr sentPtr
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:563
WalSnd * MyWalSnd
Definition: walsender.c:105
static XLogRecPtr sentPtr
Definition: walsender.c:150
TimeLineID ThisTimeLineID
Definition: xlog.c:179
#define NULL
Definition: c.h:229
static TimeLineID sendTimeLine
Definition: walsender.c:141
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:675
size_t Size
Definition: c.h:356
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:142
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:144
static StringInfoData tmpbuf
Definition: walsender.c:155
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2716
#define elog
Definition: elog.h:219
#define close(a)
Definition: win32.h:17
Definition: pg_list.h:45
void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.c:271
#define MAX_SEND_SIZE
Definition: walsender.c:99
static bool sendTimeLineIsHistoric
Definition: walsender.c:143
bool am_cascading_walsender
Definition: walsender.c:109

Variable Documentation

bool am_db_walsender = false

Definition at line 111 of file walsender.c.

Referenced by check_db(), InitPostgres(), and ProcessStartupPacket().

Definition at line 207 of file walsender.c.

Referenced by _ShowOption(), allocNewBuffer(), bitgetpage(), blbulkdelete(), blgetbitmap(), blinsert(), BloomNewBuffer(), blvacuumcleanup(), BootStrapXLOG(), brin_xlog_insert_update(), brin_xlog_samepage_update(), brin_xlog_update(), bt_metap(), bt_page_items(), bt_page_stats(), btree_xlog_delete(), btree_xlog_insert(), btree_xlog_mark_page_halfdead(), btree_xlog_newroot(), btree_xlog_split(), btree_xlog_unlink_page(), btree_xlog_vacuum(), collect_corrupt_items(), collect_visibility_data(), convertToJsonb(), copy_file(), copyFile(), CreateLockFile(), createPostingTree(), CreateSharedBackendStatus(), des_cipher(), DllRegisterServer(), ecpg_process_output(), EvalPlanQualFetch(), EvalPlanQualFetchRowMarks(), ExecCheckTIDVisible(), ExecLockRows(), ExecOnConflictUpdate(), ExecStoreTuple(), fill_hba_line(), flushCachedPage(), ForgetPrivateRefCountEntry(), GenericXLogRegisterBuffer(), GetConfigOption(), GetConfigOptionByNum(), GetConfigOptionResetString(), getObjectDescription(), getObjectIdentityParts(), getObjectTypeDescription(), GetPrivateRefCountEntry(), gets_fromFile(), GetTupleForTrigger(), ginbulkdelete(), ginFindParents(), ginHeapTupleFastInsert(), ginInsertCleanup(), GinNewBuffer(), ginRedoClearIncompleteSplit(), ginRedoCreatePTree(), ginRedoDeleteListPages(), ginRedoInsert(), ginRedoInsertListPage(), ginRedoUpdateMetapage(), ginRedoVacuumDataLeafPage(), ginRedoVacuumPage(), ginScanToDelete(), ginvacuumcleanup(), ginVacuumPostingTreeLeaves(), gistBufferingFindCorrectParent(), gistbuild(), gistbuildempty(), gistbulkdelete(), gistFindPath(), gistGetMaxLevel(), gistkillitems(), gistNewBuffer(), gistplacetopage(), gistProcessItup(), gistRedoClearFollowRight(), gistRedoCreateIndex(), gistRedoPageSplitRecord(), gistRedoPageUpdateRecord(), gistScanPage(), gistvacuumcleanup(), hash_xlog_insert(), hash_xlog_split_cleanup(), hash_xlog_vacuum_one_page(), heap_abort_speculative(), heap_delete(), heap_fetch(), heap_finish_speculative(), heap_get_latest_tid(), 
heap_hot_search(), heap_inplace_update(), heap_insert(), heap_multi_insert(), heap_update(), heap_xlog_clean(), heap_xlog_confirm(), heap_xlog_delete(), heap_xlog_freeze_page(), heap_xlog_inplace(), heap_xlog_insert(), heap_xlog_lock(), heap_xlog_lock_updated(), heap_xlog_multi_insert(), heap_xlog_visible(), heapgetpage(), helpSQL(), init(), initBloomState(), main(), NewPrivateRefCountEntry(), palloc_btree_page(), pg_timezone_abbrevs(), pg_vfprintf(), pg_visibility(), pgss_shmem_startup(), pgstat_get_crashed_backend_activity(), pgstat_heap(), pgstatginindex_internal(), pgstatindex_impl(), PGTYPESnumeric_from_double(), PLy_get_sqlerrcode(), PQunescapeBytea(), print_addr(), ReadBufferBI(), ReadControlFile(), readfile(), RecheckDataDirLockFile(), RelationAddExtraBlocks(), RelationGetBufferForTuple(), ReleaseAndReadBuffer(), ReorderBufferAllocate(), replace_variables(), report_fork_failure_to_client(), report_multiple_error_messages(), RestoreArchive(), rewind_parseTimeLineHistory(), RewriteControlFile(), rewriteVisibilityMap(), scanPostingTree(), seq_redo(), slurpFile(), smgrextend(), smgrread(), smgrwrite(), SpGistGetBuffer(), SpGistNewBuffer(), spgprocesspending(), spgRedoAddLeaf(), spgRedoAddNode(), spgRedoCreateIndex(), spgRedoMoveLeafs(), spgRedoSplitTuple(), spgRedoVacuumLeaf(), spgRedoVacuumRedirect(), spgRedoVacuumRoot(), spgvacuumpage(), spgWalk(), TidNext(), TouchSocketLockFiles(), updateControlFile(), uuid_recv(), uuid_send(), WriteControlFile(), WriteEmptyXLOG(), writeTimeLineHistory(), xlog_redo(), XLogFileCopy(), XLogReadBufferExtended(), and XLogReadRecord().

TimeLineID curFileTimeLine = 0
static

Definition at line 133 of file walsender.c.

Referenced by XLogRead().

volatile sig_atomic_t got_SIGHUP = false
static

Definition at line 179 of file walsender.c.

Referenced by WalSndLoop(), WalSndSigHupHandler(), WalSndWaitForWal(), and WalSndWriteData().

struct { ... } LagTracker
XLogRecPtr last_lsn

Definition at line 206 of file walsender.c.

Referenced by UpdateWorkerStats().

Definition at line 210 of file walsender.c.

Referenced by autovac_refresh_stats().

bool log_replication_commands = false

Definition at line 117 of file walsender.c.

Referenced by exec_replication_command().

LogicalDecodingContext* logical_decoding_ctx = NULL
static

Definition at line 190 of file walsender.c.

XLogRecPtr logical_startptr = InvalidXLogRecPtr
static

Definition at line 191 of file walsender.c.

Referenced by StartLogicalReplication(), and XLogSendLogical().

StringInfoData output_message
static

Definition at line 153 of file walsender.c.

int read_heads[NUM_SYNC_REP_WAIT_MODE]

Definition at line 209 of file walsender.c.

volatile sig_atomic_t replication_active = false
static
StringInfoData reply_message
static

Definition at line 154 of file walsender.c.

int sendFile = -1
static

Definition at line 128 of file walsender.c.

Referenced by WalSndErrorCleanup(), XLogDumpXLogRead(), XLogRead(), and XLogSendPhysical().

uint32 sendOff = 0
static

Definition at line 130 of file walsender.c.

Referenced by XLogDumpXLogRead(), and XLogRead().

XLogSegNo sendSegNo = 0
static

Definition at line 129 of file walsender.c.

Referenced by XLogDumpXLogRead(), and XLogRead().

TimeLineID sendTimeLine = 0
static