PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
walsender.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/printtup.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "access/xlogutils.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
#include "commands/defrem.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "pgstat.h"
#include "replication/basebackup.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/logicalfuncs.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/dest.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/portal.h"
#include "utils/ps_status.h"
#include "utils/resowner.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
Include dependency graph for walsender.c:

Go to the source code of this file.

Data Structures

struct  WalTimeSample
 

Macros

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)
 
#define LAG_TRACKER_BUFFER_SIZE   8192
 
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000
 
#define PG_STAT_GET_WAL_SENDERS_COLS   11
 

Typedefs

typedef void(* WalSndSendDataCallback )(void)
 

Functions

static void WalSndSigHupHandler (SIGNAL_ARGS)
 
static void WalSndXLogSendHandler (SIGNAL_ARGS)
 
static void WalSndSwitchStopping (SIGNAL_ARGS)
 
static void WalSndLastCycleHandler (SIGNAL_ARGS)
 
static void WalSndLoop (WalSndSendDataCallback send_data)
 
static void InitWalSenderSlot (void)
 
static void WalSndKill (int code, Datum arg)
 
static void WalSndShutdown (void)
 
void WalSndErrorCleanup (void)
 
static void IdentifySystem (void)
 
static void SendTimeLineHistory (TimeLineHistoryCmd *cmd)
 
static void StartReplication (StartReplicationCmd *cmd)
 
static int logical_read_xlog_page (XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
 
static void parseCreateReplSlotOptions (CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action)
 
static void CreateReplicationSlot (CreateReplicationSlotCmd *cmd)
 
static void DropReplicationSlot (DropReplicationSlotCmd *cmd)
 
static void StartLogicalReplication (StartReplicationCmd *cmd)
 
static void WalSndPrepareWrite (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndWriteData (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndUpdateProgress (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
 
static XLogRecPtr WalSndWaitForWal (XLogRecPtr loc)
 
bool exec_replication_command (const char *cmd_string)
 
static void ProcessRepliesIfAny (void)
 
static void ProcessStandbyMessage (void)
 
static void PhysicalConfirmReceivedLocation (XLogRecPtr lsn)
 
static void ProcessStandbyReplyMessage (void)
 
static void PhysicalReplicationSlotNewXmin (TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
 
static bool TransactionIdInRecentPast (TransactionId xid, uint32 epoch)
 
static void ProcessStandbyHSFeedbackMessage (void)
 
static long WalSndComputeSleeptime (TimestampTz now)
 
static void WalSndCheckTimeOut (TimestampTz now)
 
static void XLogRead (char *buf, XLogRecPtr startptr, Size count)
 
static void XLogSendPhysical (void)
 
static void XLogSendLogical (void)
 
static void WalSndDone (WalSndSendDataCallback send_data)
 
static XLogRecPtr GetStandbyFlushRecPtr (void)
 
void WalSndRqstFileReload (void)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (void)
 
void WalSndWaitStopping (void)
 
void WalSndSetState (WalSndState state)
 
static const char * WalSndGetStateString (WalSndState state)
 
static Interval * offset_to_interval (TimeOffset offset)
 
Datum pg_stat_get_wal_senders (PG_FUNCTION_ARGS)
 
static void WalSndKeepalive (bool requestReply)
 
static void WalSndKeepaliveIfNecessary (TimestampTz now)
 
static void LagTrackerWrite (XLogRecPtr lsn, TimestampTz local_flush_time)
 
static TimeOffset LagTrackerRead (int head, XLogRecPtr lsn, TimestampTz now)
 

Variables

WalSndCtlData * WalSndCtl = NULL
 
WalSnd * MyWalSnd = NULL
 
bool am_walsender = false
 
bool am_cascading_walsender = false
 
bool am_db_walsender = false
 
int max_wal_senders = 0
 
int wal_sender_timeout = 60 * 1000
 
bool log_replication_commands = false
 
bool wake_wal_senders = false
 
static int sendFile = -1
 
static XLogSegNo sendSegNo = 0
 
static uint32 sendOff = 0
 
static TimeLineID curFileTimeLine = 0
 
static TimeLineID sendTimeLine = 0
 
static TimeLineID sendTimeLineNextTLI = 0
 
static bool sendTimeLineIsHistoric = false
 
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
 
static XLogRecPtr sentPtr = 0
 
static StringInfoData output_message
 
static StringInfoData reply_message
 
static StringInfoData tmpbuf
 
static TimestampTz last_reply_timestamp = 0
 
static bool waiting_for_ping_response = false
 
static bool streamingDoneSending
 
static bool streamingDoneReceiving
 
static bool WalSndCaughtUp = false
 
static volatile sig_atomic_t got_SIGHUP = false
 
static volatile sig_atomic_t got_SIGINT = false
 
static volatile sig_atomic_t got_SIGUSR2 = false
 
static volatile sig_atomic_t replication_active = false
 
static LogicalDecodingContext * logical_decoding_ctx = NULL
 
static XLogRecPtr logical_startptr = InvalidXLogRecPtr
 
struct {
   XLogRecPtr   last_lsn
 
   WalTimeSample   buffer [LAG_TRACKER_BUFFER_SIZE]
 
   int   write_head
 
   int   read_heads [NUM_SYNC_REP_WAIT_MODE]
 
   WalTimeSample   last_read [NUM_SYNC_REP_WAIT_MODE]
 
} LagTracker
 

Macro Definition Documentation

#define LAG_TRACKER_BUFFER_SIZE   8192

Definition at line 205 of file walsender.c.

Referenced by LagTrackerRead(), and LagTrackerWrite().

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)

Definition at line 102 of file walsender.c.

Referenced by XLogSendPhysical().

#define PG_STAT_GET_WAL_SENDERS_COLS   11

Referenced by pg_stat_get_wal_senders().

#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000

Referenced by WalSndUpdateProgress().

Typedef Documentation

typedef void(* WalSndSendDataCallback)(void)

Definition at line 224 of file walsender.c.

Function Documentation

static void CreateReplicationSlot ( CreateReplicationSlotCmd * cmd)
static

Definition at line 842 of file walsender.c.

References Assert, begin_tup_output_tupdesc(), CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDestReceiver(), CreateInitDecodingContext(), CreateTemplateTupleDesc(), CRS_EXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, CStringGetTextDatum, ReplicationSlot::data, DecodingContextFindStartpoint(), DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errmsg(), ERROR, FirstSnapshotSet, FreeDecodingContext(), IsSubTransaction(), IsTransactionBlock(), CreateReplicationSlotCmd::kind, last_reply_timestamp, logical_read_xlog_page(), MAXFNAMELEN, MemSet, MyProc, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, NIL, NULL, parseCreateReplSlotOptions(), CreateReplicationSlotCmd::plugin, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, ReplicationSlotCreate(), ReplicationSlotMarkDirty(), ReplicationSlotPersist(), ReplicationSlotRelease(), ReplicationSlotReserveWal(), ReplicationSlotSave(), RestoreTransactionSnapshot(), RS_EPHEMERAL, RS_PERSISTENT, RS_TEMPORARY, sendTimeLine, sendTimeLineIsHistoric, CreateReplicationSlotCmd::slotname, SnapBuildExportSnapshot(), SnapBuildInitialSnapshot(), LogicalDecodingContext::snapshot_builder, snprintf(), CreateReplicationSlotCmd::temporary, TEXTOID, ThisTimeLineID, TupleDescInitBuiltinEntry(), values, WalSndPrepareWrite(), WalSndUpdateProgress(), WalSndWriteData(), XACT_REPEATABLE_READ, and XactIsoLevel.

Referenced by exec_replication_command(), and main().

/*
 * CreateReplicationSlot -- handle the CREATE_REPLICATION_SLOT replication
 * command.
 *
 * As far as the visible lines show, it:
 *  - parses the command options into reserve_wal and snapshot_action;
 *  - creates a physical slot, or a logical slot that (per the comment
 *    below) starts out ephemeral/temporary and is made persistent later;
 *  - for logical slots, rejects EXPORT_SNAPSHOT inside a transaction block,
 *    and rejects USE_SNAPSHOT outside a transaction block, outside
 *    REPEATABLE READ, after a query has run, or in a subtransaction; then
 *    builds a decoding context and exports or installs the snapshot;
 *  - sends one result row: slot_name, consistent_point (formatted "%X/%X"),
 *    snapshot_name (NULL if none) and output_plugin (NULL if none).
 *
 * NOTE(review): this is a Doxygen rendering, not compilable source.
 * Source lines 855, 861, 866, 870, 880, 885, 908, 927-929, 938, 941,
 * 957-958, 965, 969, 971, 975, 979-980, 982 and 1029 were dropped by the
 * extraction.  As a result, several statements (e.g. the declaration of ctx
 * and the assignment of dest) are not visible here; consult walsender.c
 * itself.
 */
843 {
844  const char *snapshot_name = NULL;
845  char xloc[MAXFNAMELEN];
846  char *slot_name;
847  bool reserve_wal = false;
848  CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
849  DestReceiver *dest;
850  TupOutputState *tstate;
851  TupleDesc tupdesc;
852  Datum values[4];
853  bool nulls[4];
854 
856 
857  parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action);
858 
859  /* setup state for XLogReadPage */
860  sendTimeLineIsHistoric = false;
862 
863  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
864  {
865  ReplicationSlotCreate(cmd->slotname, false,
867  }
868  else
869  {
871 
872  /*
873  * Initially create persistent slot as ephemeral - that allows us to
874  * nicely handle errors during initialization because it'll get
875  * dropped if this transaction fails. We'll make it persistent at the
876  * end. Temporary slots can be created as temporary from beginning as
877  * they get dropped on error as well.
878  */
879  ReplicationSlotCreate(cmd->slotname, true,
881  }
882 
883  if (cmd->kind == REPLICATION_KIND_LOGICAL)
884  {
886  bool need_full_snapshot = false;
887 
888  /*
889  * Do options check early so that we can bail before calling the
890  * DecodingContextFindStartpoint which can take long time.
891  */
892  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
893  {
894  if (IsTransactionBlock())
895  ereport(ERROR,
896  (errmsg("CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT "
897  "must not be called inside a transaction")));
898 
899  need_full_snapshot = true;
900  }
901  else if (snapshot_action == CRS_USE_SNAPSHOT)
902  {
903  if (!IsTransactionBlock())
904  ereport(ERROR,
905  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
906  "must be called inside a transaction")));
907 
/*
 * NOTE(review): the guarding condition (line 908) is missing from this
 * listing.  The error text and the References list (XactIsoLevel,
 * XACT_REPEATABLE_READ) suggest it tests the isolation level; confirm.
 */
909  ereport(ERROR,
910  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
911  "must be called in REPEATABLE READ isolation mode transaction")));
912 
913  if (FirstSnapshotSet)
914  ereport(ERROR,
915  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
916  "must be called before any query")));
917 
918  if (IsSubTransaction())
919  ereport(ERROR,
920  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
921  "must not be called in a subtransaction")));
922 
923  need_full_snapshot = true;
924  }
925 
926  ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
/*
 * NOTE(review): the remaining arguments (lines 927-929) are missing.  The
 * References list names logical_read_xlog_page, WalSndPrepareWrite,
 * WalSndWriteData and WalSndUpdateProgress, matching the callback
 * parameters of CreateInitDecodingContext().
 */
930 
931  /*
932  * Signal that we don't need the timeout mechanism. We're just
933  * creating the replication slot and don't yet accept feedback
934  * messages or send keepalives. As we possibly need to wait for
935  * further WAL the walsender would otherwise possibly be killed too
936  * soon.
937  */
939 
940  /* build initial snapshot, might take a while */
942 
943  /*
944  * Export or use the snapshot if we've been asked to do so.
945  *
946  * NB. We will convert the snapbuild.c kind of snapshot to normal
947  * snapshot when doing this.
948  */
949  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
950  {
951  snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
952  }
953  else if (snapshot_action == CRS_USE_SNAPSHOT)
954  {
955  Snapshot snap;
956 
959  }
960 
961  /* don't need the decoding context anymore */
962  FreeDecodingContext(ctx);
963 
964  if (!cmd->temporary)
966  }
967  else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
968  {
970 
972 
973  /* Write this slot to disk if it's a permanent one. */
974  if (!cmd->temporary)
976  }
977 
978  snprintf(xloc, sizeof(xloc), "%X/%X",
981 
983  MemSet(nulls, false, sizeof(nulls));
984 
985  /*----------
986  * Need a tuple descriptor representing four columns:
987  * - first field: the slot name
988  * - second field: LSN at which we became consistent
989  * - third field: exported snapshot's name
990  * - fourth field: output plugin
991  *----------
992  */
993  tupdesc = CreateTemplateTupleDesc(4, false);
994  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
995  TEXTOID, -1, 0);
996  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
997  TEXTOID, -1, 0);
998  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
999  TEXTOID, -1, 0);
1000  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1001  TEXTOID, -1, 0);
1002 
1003  /* prepare for projection of tuples */
/*
 * NOTE(review): dest is never assigned in the visible lines.  Presumably
 * the dropped line 982 sets it (the References list includes
 * CreateDestReceiver() and DestRemoteSimple); confirm.
 */
1004  tstate = begin_tup_output_tupdesc(dest, tupdesc);
1005 
1006  /* slot_name */
1007  slot_name = NameStr(MyReplicationSlot->data.name);
1008  values[0] = CStringGetTextDatum(slot_name);
1009 
1010  /* consistent wal location */
1011  values[1] = CStringGetTextDatum(xloc);
1012 
1013  /* snapshot name, or NULL if none */
1014  if (snapshot_name != NULL)
1015  values[2] = CStringGetTextDatum(snapshot_name);
1016  else
1017  nulls[2] = true;
1018 
1019  /* plugin, or NULL if none */
1020  if (cmd->plugin != NULL)
1021  values[3] = CStringGetTextDatum(cmd->plugin);
1022  else
1023  nulls[3] = true;
1024 
1025  /* send it to dest */
1026  do_tup_output(tstate, values, nulls);
1027  end_tup_output(tstate);
1028 
/*
 * NOTE(review): line 1029 is missing.  ReplicationSlotRelease() appears in
 * the References list; confirm whether it is called here.
 */
1030 }
#define NIL
Definition: pg_list.h:69
#define TEXTOID
Definition: pg_type.h:324
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action)
Definition: walsender.c:789
PGPROC * MyProc
Definition: proc.c:67
#define XACT_REPEATABLE_READ
Definition: xact.h:30
void RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
Definition: snapmgr.c:2142
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:1250
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency)
Definition: slot.c:220
#define MemSet(start, val, len)
Definition: c.h:857
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3, 4)
void ReplicationSlotSave(void)
Definition: slot.c:579
LogicalDecodingContext * CreateInitDecodingContext(char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:216
ReplicationSlotPersistentData data
Definition: slot.h:115
XLogRecPtr confirmed_flush
Definition: slot.h:76
ReplicationKind kind
Definition: replnodes.h:56
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
Definition: walsender.c:754
bool IsTransactionBlock(void)
Definition: xact.c:4306
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:635
void ReplicationSlotReserveWal(void)
Definition: slot.c:926
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:432
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:1308
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:568
#define ERROR
Definition: elog.h:43
bool FirstSnapshotSet
Definition: snapmgr.c:203
void ReplicationSlotPersist(void)
Definition: slot.c:614
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:109
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1162
unsigned int uint32
Definition: c.h:268
void ReplicationSlotRelease(void)
Definition: slot.c:375
#define ereport(elevel, rest)
Definition: elog.h:122
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc)
Definition: execTuples.c:1232
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1135
#define MAXFNAMELEN
uintptr_t Datum
Definition: postgres.h:372
TimeLineID ThisTimeLineID
Definition: xlog.c:179
struct SnapBuild * snapshot_builder
Definition: logical.h:46
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define NULL
Definition: c.h:229
static TimeLineID sendTimeLine
Definition: walsender.c:144
#define Assert(condition)
Definition: c.h:675
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:475
int XactIsoLevel
Definition: xact.c:74
TupleDesc CreateTemplateTupleDesc(int natts, bool hasoid)
Definition: tupdesc.c:41
bool IsSubTransaction(void)
Definition: xact.c:4378
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition: snapbuild.c:545
static Datum values[MAXATTR]
Definition: bootstrap.c:163
int errmsg(const char *fmt,...)
Definition: elog.c:797
CRSSnapshotAction
Definition: walsender.h:22
#define NameStr(name)
Definition: c.h:499
#define CStringGetTextDatum(s)
Definition: builtins.h:91
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:76
int16 AttrNumber
Definition: attnum.h:21
static TimestampTz last_reply_timestamp
Definition: walsender.c:164
static bool sendTimeLineIsHistoric
Definition: walsender.c:146
void ReplicationSlotMarkDirty(void)
Definition: slot.c:597
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
Definition: walsender.c:1252
static void DropReplicationSlot ( DropReplicationSlotCmd * cmd)
static

Definition at line 1036 of file walsender.c.

References DestRemote, EndCommand(), ReplicationSlotDrop(), and DropReplicationSlotCmd::slotname.

Referenced by exec_replication_command(), and main().

/*
 * DropReplicationSlot -- handle DROP_REPLICATION_SLOT, then report
 * completion to the client with EndCommand("DROP_REPLICATION_SLOT",
 * DestRemote).
 *
 * NOTE(review): source line 1038 is missing from this Doxygen listing.  The
 * References list (ReplicationSlotDrop(), DropReplicationSlotCmd::slotname)
 * suggests it drops the slot named by cmd->slotname; confirm in walsender.c.
 */
1037 {
1039  EndCommand("DROP_REPLICATION_SLOT", DestRemote);
1040 }
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:157
void ReplicationSlotDrop(const char *name)
Definition: slot.c:456
bool exec_replication_command ( const char *  cmd_string)

Definition at line 1413 of file walsender.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate(), CHECK_FOR_INTERRUPTS, CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, DestRemote, DestRemoteSimple, DropReplicationSlot(), elog, EndCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), got_SIGUSR2, IdentifySystem(), initStringInfo(), InvalidOid, IsA, IsAbortedTransactionBlockState(), IsTransactionBlock(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextDelete(), MemoryContextSwitchTo(), MyDatabaseId, VariableShowStmt::name, PreventTransactionChain(), REPLICATION_KIND_PHYSICAL, replication_parse_result, replication_scanner_init(), replication_yyparse(), SendBaseBackup(), SendTimeLineHistory(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), WalSnd::state, T_BaseBackupCmd, T_CreateReplicationSlotCmd, T_DropReplicationSlotCmd, T_IdentifySystemCmd, T_SQLCmd, T_StartReplicationCmd, T_TimeLineHistoryCmd, T_VariableShowStmt, Node::type, WalSndSetState(), and WALSNDSTATE_STOPPING.

Referenced by PostgresMain().

/*
 * exec_replication_command -- parse and execute one replication command.
 *
 * Returns true after executing a replication command and sending a
 * "SELECT" CommandComplete.  Returns false for a plain SQL command
 * (T_SQLCmd) so that the caller runs it instead.
 *
 * Raises ERROR:
 *  - in stopping mode;
 *  - when the parser fails;
 *  - when the transaction is aborted and the command is not SQL;
 *  - for an SQL command without a database connection;
 *  - for an unrecognized node tag.
 *
 * NOTE(review): this is a Doxygen listing with lines 1425, 1432, 1440,
 * 1442, 1444, 1446, 1466, 1474, 1486, 1492-1494, 1507-1508, 1511-1512,
 * 1524, 1530 and 1535 missing.  Among them are presumably the creation of
 * cmd_context and the CreateReplicationSlot/DropReplicationSlot cases (see
 * the References list); consult walsender.c.
 */
1414 {
1415  int parse_rc;
1416  Node *cmd_node;
1417  MemoryContext cmd_context;
1418  MemoryContext old_context;
1419 
1420  /*
1421  * If WAL sender has been told that shutdown is getting close, switch its
1422  * status accordingly to handle the next replication commands correctly.
1423  */
1424  if (got_SIGUSR2)
1426 
1427  /*
1428  * Throw error if in stopping mode. We need prevent commands that could
1429  * generate WAL while the shutdown checkpoint is being written. To be
1430  * safe, we just prohibit all new commands.
1431  */
1433  ereport(ERROR,
1434  (errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1435 
1436  /*
1437  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1438  * command arrives. Clean up the old stuff if there's anything.
1439  */
1441 
1443 
1445  "Replication command context",
1447  old_context = MemoryContextSwitchTo(cmd_context);
1448 
1449  replication_scanner_init(cmd_string);
1450  parse_rc = replication_yyparse();
1451  if (parse_rc != 0)
1452  ereport(ERROR,
1453  (errcode(ERRCODE_SYNTAX_ERROR),
1454  (errmsg_internal("replication command parser returned %d",
1455  parse_rc))));
1456 
1457  cmd_node = replication_parse_result;
1458 
1459  /*
1460  * Log replication command if log_replication_commands is enabled. Even
1461  * when it's disabled, log the command with DEBUG1 level for backward
1462  * compatibility. Note that SQL commands are not logged here, and will be
1463  * logged later if log_statement is enabled.
1464  */
1465  if (cmd_node->type != T_SQLCmd)
1467  (errmsg("received replication command: %s", cmd_string)));
1468 
1469  /*
1470  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was
1471  * called outside of transaction the snapshot should be cleared here.
1472  */
1473  if (!IsTransactionBlock())
1475 
1476  /*
1477  * For aborted transactions, don't allow anything except pure SQL, the
1478  * exec_simple_query() will handle it correctly.
1479  */
1480  if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
1481  ereport(ERROR,
1482  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1483  errmsg("current transaction is aborted, "
1484  "commands ignored until end of transaction block")));
1485 
1487 
1488  /*
1489  * Allocate buffers that will be used for each outgoing and incoming
1490  * message. We do this just once per command to reduce palloc overhead.
1491  */
1495 
1496  switch (cmd_node->type)
1497  {
1498  case T_IdentifySystemCmd:
1499  IdentifySystem();
1500  break;
1501 
1502  case T_BaseBackupCmd:
1503  PreventTransactionChain(true, "BASE_BACKUP");
1504  SendBaseBackup((BaseBackupCmd *) cmd_node);
1505  break;
1506 
/* NOTE(review): lines 1507-1508 missing; presumably the T_CreateReplicationSlotCmd case. */
1509  break;
1510 
/* NOTE(review): lines 1511-1512 missing; presumably the T_DropReplicationSlotCmd case. */
1513  break;
1514 
1515  case T_StartReplicationCmd:
1516  {
1517  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1518 
1519  PreventTransactionChain(true, "START_REPLICATION");
1520 
1521  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1522  StartReplication(cmd);
1523  else
/*
 * NOTE(review): the else-branch statement (line 1524) is missing, so the
 * break below is NOT the else body.  The References list includes
 * StartLogicalReplication().
 */
1525  break;
1526  }
1527 
1528  case T_TimeLineHistoryCmd:
1529  PreventTransactionChain(true, "TIMELINE_HISTORY");
1531  break;
1532 
1533  case T_VariableShowStmt:
1534  {
/* NOTE(review): line 1535 (declaration of dest used below) is missing. */
1536  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1537 
1538  GetPGVariable(n->name, dest);
1539  }
1540  break;
1541 
1542  case T_SQLCmd:
1543  if (MyDatabaseId == InvalidOid)
1544  ereport(ERROR,
1545  (errmsg("not connected to database")));
1546 
1547  /* Tell the caller that this wasn't a WalSender command. */
/*
 * NOTE(review): this early return leaves CurrentMemoryContext set to
 * cmd_context (switched at line 1447) and never deletes cmd_context, unlike
 * the normal exit at lines 1556-1557.  Check whether the caller compensates
 * or whether the context leaks per SQL command.
 */
1548  return false;
1549 
1550  default:
1551  elog(ERROR, "unrecognized replication command node tag: %u",
1552  cmd_node->type);
1553  }
1554 
1555  /* done */
1556  MemoryContextSwitchTo(old_context);
1557  MemoryContextDelete(cmd_context);
1558 
1559  /* Send CommandComplete message */
1560  EndCommand("SELECT", DestRemote);
1561 
1562  return true;
1563 }
#define IsA(nodeptr, _type_)
Definition: nodes.h:560
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:200
#define DEBUG1
Definition: elog.h:25
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:429
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1036
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:370
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Definition: nodes.h:509
static StringInfoData output_message
Definition: walsender.c:156
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LOG
Definition: elog.h:26
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc.c:7894
Node * replication_parse_result
bool IsTransactionBlock(void)
Definition: xact.c:4306
ReplicationKind kind
Definition: replnodes.h:81
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:157
#define ERROR
Definition: elog.h:43
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:686
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:165
WalSndState state
NodeTag type
Definition: nodes.h:511
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:109
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:525
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:184
#define ereport(elevel, rest)
Definition: elog.h:122
int replication_yyparse(void)
void initStringInfo(StringInfo str)
Definition: stringinfo.c:46
static StringInfoData reply_message
Definition: walsender.c:157
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:322
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1047
Oid MyDatabaseId
Definition: globals.c:76
WalSnd * MyWalSnd
Definition: walsender.c:108
#define InvalidOid
Definition: postgres_ext.h:36
int errmsg_internal(const char *fmt,...)
Definition: elog.c:827
void WalSndSetState(WalSndState state)
Definition: walsender.c:3085
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:842
static StringInfoData tmpbuf
Definition: walsender.c:158
bool log_replication_commands
Definition: walsender.c:120
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
#define elog
Definition: elog.h:219
static void IdentifySystem(void)
Definition: walsender.c:340
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:696
void PreventTransactionChain(bool isTopLevel, const char *stmtType)
Definition: xact.c:3156
void replication_scanner_init(const char *query_string)
static XLogRecPtr GetStandbyFlushRecPtr ( void  )
static

Definition at line 2839 of file walsender.c.

References GetWalRcvWriteRecPtr(), GetXLogReplayRecPtr(), NULL, receiveTLI, result, and ThisTimeLineID.

Referenced by IdentifySystem(), StartReplication(), and XLogSendPhysical().

/*
 * GetStandbyFlushRecPtr -- the WAL position a cascading sender may send up
 * to while in recovery.
 *
 * Returns the replay position.  If the walreceiver's write position is
 * further ahead and was received on the replay timeline, that position is
 * returned instead.  Side effect: sets ThisTimeLineID to the replay
 * timeline.
 *
 * NOTE(review): lines 2844-2845 are missing from this Doxygen listing.
 * receiveTLI and result are used below, but their declarations are not
 * visible here.
 */
2840 {
2841  XLogRecPtr replayPtr;
2842  TimeLineID replayTLI;
2843  XLogRecPtr receivePtr;
2846 
2847  /*
2848  * We can safely send what's already been replayed. Also, if walreceiver
2849  * is streaming WAL from the same timeline, we can send anything that it
2850  * has streamed, but hasn't been replayed yet.
2851  */
2852 
2853  receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI);
2854  replayPtr = GetXLogReplayRecPtr(&replayTLI);
2855 
2856  ThisTimeLineID = replayTLI;
2857 
2858  result = replayPtr;
2859  if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
2860  result = receivePtr;
2861 
2862  return result;
2863 }
uint32 TimeLineID
Definition: xlogdefs.h:45
XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
return result
Definition: formatting.c:1632
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11084
static TimeLineID receiveTLI
Definition: xlog.c:201
TimeLineID ThisTimeLineID
Definition: xlog.c:179
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static void IdentifySystem ( void  )
static

Definition at line 340 of file walsender.c.

References am_cascading_walsender, begin_tup_output_tupdesc(), CommitTransactionCommand(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, cur, CurrentMemoryContext, dbname, DestRemoteSimple, do_tup_output(), end_tup_output(), get_database_name(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetSystemIdentifier(), Int32GetDatum, INT4OID, InvalidOid, MAXFNAMELEN, MemoryContextSwitchTo(), MemSet, MyDatabaseId, NULL, RecoveryInProgress(), snprintf(), StartTransactionCommand(), TEXTOID, ThisTimeLineID, TupleDescInitBuiltinEntry(), UINT64_FORMAT, and values.

Referenced by exec_replication_command().

/*
 * IdentifySystem -- handle IDENTIFY_SYSTEM.
 *
 * Sends a single row with four columns:
 *  - systemid (text);
 *  - timeline (int4);
 *  - xlogpos (text, "%X/%X");
 *  - dbname (text, NULL when not connected to a database).
 *
 * The WAL position comes from GetStandbyFlushRecPtr() in the branch
 * commented as updating ThisTimeLineID, and from GetFlushRecPtr()
 * otherwise.
 *
 * NOTE(review): lines 359, 361-362, 374, 377, 379-381, 383 and 386 are
 * missing from this Doxygen listing.  In particular, the condition choosing
 * between the two branches and the assignments of dest and dbname are not
 * visible here.
 */
341 {
342  char sysid[32];
343  char xloc[MAXFNAMELEN];
344  XLogRecPtr logptr;
345  char *dbname = NULL;
346  DestReceiver *dest;
347  TupOutputState *tstate;
348  TupleDesc tupdesc;
349  Datum values[4];
350  bool nulls[4];
351 
352  /*
353  * Reply with a result set with one row, four columns. First col is system
354  * ID, second is timeline ID, third is current xlog location and the
355  * fourth contains the database name if we are connected to one.
356  */
357 
358  snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
360 
/*
 * NOTE(review): the if-condition (lines 361-362) is missing.  The References
 * list includes RecoveryInProgress() and am_cascading_walsender.
 */
363  {
364  /* this also updates ThisTimeLineID */
365  logptr = GetStandbyFlushRecPtr();
366  }
367  else
368  logptr = GetFlushRecPtr();
369 
370  snprintf(xloc, sizeof(xloc), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
371 
372  if (MyDatabaseId != InvalidOid)
373  {
375 
376  /* syscache access needs a transaction env. */
378  /* make dbname live outside TX context */
382  /* CommitTransactionCommand switches to TopMemoryContext */
384  }
385 
387  MemSet(nulls, false, sizeof(nulls));
388 
389  /* need a tuple descriptor representing four columns */
390  tupdesc = CreateTemplateTupleDesc(4, false);
391  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
392  TEXTOID, -1, 0);
393  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
394  INT4OID, -1, 0);
395  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
396  TEXTOID, -1, 0);
397  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
398  TEXTOID, -1, 0);
399 
400  /* prepare for projection of tuples */
401  tstate = begin_tup_output_tupdesc(dest, tupdesc);
402 
403  /* column 1: system identifier */
404  values[0] = CStringGetTextDatum(sysid);
405 
406  /* column 2: timeline */
407  values[1] = Int32GetDatum(ThisTimeLineID);
408 
409  /* column 3: wal location */
410  values[2] = CStringGetTextDatum(xloc);
411 
412  /* column 4: database name, or NULL if none */
413  if (dbname)
414  values[3] = CStringGetTextDatum(dbname);
415  else
416  nulls[3] = true;
417 
418  /* send it to dest */
419  do_tup_output(tstate, values, nulls);
420 
421  end_tup_output(tstate);
422 }
#define TEXTOID
Definition: pg_type.h:324
void CommitTransactionCommand(void)
Definition: xact.c:2749
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:1250
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define INT4OID
Definition: pg_type.h:316
struct cursor * cur
Definition: ecpg.c:28
#define MemSet(start, val, len)
Definition: c.h:857
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3, 4)
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8222
bool RecoveryInProgress(void)
Definition: xlog.c:7873
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:1308
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:568
char * get_database_name(Oid dbid)
Definition: dbcommands.c:2056
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:109
unsigned int uint32
Definition: c.h:268
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc)
Definition: execTuples.c:1232
#define MAXFNAMELEN
uintptr_t Datum
Definition: postgres.h:372
Oid MyDatabaseId
Definition: globals.c:76
#define InvalidOid
Definition: postgres_ext.h:36
TimeLineID ThisTimeLineID
Definition: xlog.c:179
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void StartTransactionCommand(void)
Definition: xact.c:2679
char * dbname
Definition: streamutil.c:38
TupleDesc CreateTemplateTupleDesc(int natts, bool hasoid)
Definition: tupdesc.c:41
static Datum values[MAXATTR]
Definition: bootstrap.c:163
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4690
#define Int32GetDatum(X)
Definition: postgres.h:485
#define CStringGetTextDatum(s)
Definition: builtins.h:91
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2839
int16 AttrNumber
Definition: attnum.h:21
#define UINT64_FORMAT
Definition: c.h:316
bool am_cascading_walsender
Definition: walsender.c:112
static void InitWalSenderSlot ( void  )
static

Definition at line 2215 of file walsender.c.

References WalSnd::apply, WalSnd::applyLag, Assert, ereport, errcode(), errmsg(), FATAL, WalSnd::flush, WalSnd::flushLag, i, InvalidXLogRecPtr, WalSnd::latch, max_wal_senders, WalSnd::mutex, MyProc, MyProcPid, NULL, on_shmem_exit(), WalSnd::pid, PGPROC::procLatch, WalSnd::sentPtr, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndKill(), WalSndCtlData::walsnds, WALSNDSTATE_STARTUP, WalSnd::write, and WalSnd::writeLag.

Referenced by WalSndShutdown().

2216 {
2217  int i;
2218 
2219  /*
2220  * WalSndCtl should be set up already (we inherit this by fork() or
2221  * EXEC_BACKEND mechanism from the postmaster).
2222  */
2223  Assert(WalSndCtl != NULL);
2224  Assert(MyWalSnd == NULL);
2225 
2226  /*
2227  * Find a free walsender slot and reserve it. If this fails, we must be
2228  * out of WalSnd structures.
2229  */
2230  for (i = 0; i < max_wal_senders; i++)
2231  {
2232  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2233 
2234  SpinLockAcquire(&walsnd->mutex);
2235 
2236  if (walsnd->pid != 0)
2237  {
2238  SpinLockRelease(&walsnd->mutex);
2239  continue;
2240  }
2241  else
2242  {
2243  /*
2244  * Found a free slot. Reserve it for us.
2245  */
2246  walsnd->pid = MyProcPid;
2247  walsnd->sentPtr = InvalidXLogRecPtr;
2248  walsnd->write = InvalidXLogRecPtr;
2249  walsnd->flush = InvalidXLogRecPtr;
2250  walsnd->apply = InvalidXLogRecPtr;
2251  walsnd->writeLag = -1;
2252  walsnd->flushLag = -1;
2253  walsnd->applyLag = -1;
2254  walsnd->state = WALSNDSTATE_STARTUP;
2255  walsnd->latch = &MyProc->procLatch;
2256  SpinLockRelease(&walsnd->mutex);
2257  /* don't need the lock anymore */
2258  MyWalSnd = (WalSnd *) walsnd;
2259 
2260  break;
2261  }
2262  }
2263  if (MyWalSnd == NULL)
2264  ereport(FATAL,
2265  (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
2266  errmsg("number of requested standby connections "
2267  "exceeds max_wal_senders (currently %d)",
2268  max_wal_senders)));
2269 
2270  /* Arrange to clean up at walsender exit */
2271  on_shmem_exit(WalSndKill, 0);
2272 }
XLogRecPtr write
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
int MyProcPid
Definition: globals.c:38
PGPROC * MyProc
Definition: proc.c:67
TimeOffset flushLag
WalSndCtlData * WalSndCtl
Definition: walsender.c:105
int errcode(int sqlerrcode)
Definition: elog.c:575
static void WalSndKill(int code, Datum arg)
Definition: walsender.c:2276
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
TimeOffset writeLag
slock_t mutex
Latch procLatch
Definition: proc.h:103
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
#define FATAL
Definition: elog.h:52
Latch * latch
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:348
WalSndState state
#define ereport(elevel, rest)
Definition: elog.h:122
int max_wal_senders
Definition: walsender.c:117
#define SpinLockRelease(lock)
Definition: spin.h:64
XLogRecPtr sentPtr
WalSnd * MyWalSnd
Definition: walsender.c:108
TimeOffset applyLag
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
int errmsg(const char *fmt,...)
Definition: elog.c:797
int i
XLogRecPtr apply
static TimeOffset LagTrackerRead ( int  head,
XLogRecPtr  lsn,
TimestampTz  now 
)
static

Definition at line 3425 of file walsender.c.

References Assert, LAG_TRACKER_BUFFER_SIZE, LagTracker, WalTimeSample::lsn, next, and WalTimeSample::time.

Referenced by ProcessStandbyReplyMessage().

3426 {
3427  TimestampTz time = 0;
3428 
3429  /* Read all unread samples up to this LSN or end of buffer. */
3430  while (LagTracker.read_heads[head] != LagTracker.write_head &&
3431  LagTracker.buffer[LagTracker.read_heads[head]].lsn <= lsn)
3432  {
3433  time = LagTracker.buffer[LagTracker.read_heads[head]].time;
3434  LagTracker.last_read[head] =
3435  LagTracker.buffer[LagTracker.read_heads[head]];
3436  LagTracker.read_heads[head] =
3437  (LagTracker.read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE;
3438  }
3439 
3440  if (time > now)
3441  {
3442  /* If the clock somehow went backwards, treat as not found. */
3443  return -1;
3444  }
3445  else if (time == 0)
3446  {
3447  /*
3448  * We didn't cross a time. If there is a future sample that we
3449  * haven't reached yet, and we've already reached at least one sample,
3450  * let's interpolate the local flushed time. This is mainly useful
3451  * for reporting a completely stuck apply position as having
3452  * increasing lag, since otherwise we'd have to wait for it to
3453  * eventually start moving again and cross one of our samples before
3454  * we can show the lag increasing.
3455  */
3456  if (LagTracker.read_heads[head] != LagTracker.write_head &&
3457  LagTracker.last_read[head].time != 0)
3458  {
3459  double fraction;
3460  WalTimeSample prev = LagTracker.last_read[head];
3461  WalTimeSample next = LagTracker.buffer[LagTracker.read_heads[head]];
3462 
3463  if (lsn < prev.lsn)
3464  {
3465  /*
3466  * Reported LSNs shouldn't normally go backwards, but it's
3467  * possible when there is a timeline change. Treat as not
3468  * found.
3469  */
3470  return -1;
3471  }
3472 
3473  Assert(prev.lsn < next.lsn);
3474 
3475  if (prev.time > next.time)
3476  {
3477  /* If the clock somehow went backwards, treat as not found. */
3478  return -1;
3479  }
3480 
3481  /* See how far we are between the previous and next samples. */
3482  fraction =
3483  (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
3484 
3485  /* Scale the local flush time proportionally. */
3486  time = (TimestampTz)
3487  ((double) prev.time + (next.time - prev.time) * fraction);
3488  }
3489  else
3490  {
3491  /* Couldn't interpolate due to lack of data. */
3492  return -1;
3493  }
3494  }
3495 
3496  /* Return the elapsed time since local flush time in microseconds. */
3497  Assert(time != 0);
3498  return now - time;
3499 }
static int32 next
Definition: blutils.c:210
int64 TimestampTz
Definition: timestamp.h:39
TimestampTz time
Definition: walsender.c:201
static struct @27 LagTracker
XLogRecPtr lsn
Definition: walsender.c:200
#define Assert(condition)
Definition: c.h:675
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:205
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
static void LagTrackerWrite ( XLogRecPtr  lsn,
TimestampTz  local_flush_time 
)
static

Definition at line 3360 of file walsender.c.

References am_walsender, i, LAG_TRACKER_BUFFER_SIZE, LagTracker, and NUM_SYNC_REP_WAIT_MODE.

Referenced by WalSndUpdateProgress(), and XLogSendPhysical().

3361 {
3362  bool buffer_full;
3363  int new_write_head;
3364  int i;
3365 
3366  if (!am_walsender)
3367  return;
3368 
3369  /*
3370  * If the lsn hasn't advanced since last time, then do nothing. This way
3371  * we only record a new sample when new WAL has been written.
3372  */
3373  if (LagTracker.last_lsn == lsn)
3374  return;
3375  LagTracker.last_lsn = lsn;
3376 
3377  /*
3378  * If advancing the write head of the circular buffer would crash into any
3379  * of the read heads, then the buffer is full. In other words, the
3380  * slowest reader (presumably apply) is the one that controls the release
3381  * of space.
3382  */
3383  new_write_head = (LagTracker.write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
3384  buffer_full = false;
3385  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
3386  {
3387  if (new_write_head == LagTracker.read_heads[i])
3388  buffer_full = true;
3389  }
3390 
3391  /*
3392  * If the buffer is full, for now we just rewind by one slot and overwrite
3393  * the last sample, as a simple (if somewhat uneven) way to lower the
3394  * sampling rate. There may be better adaptive compaction algorithms.
3395  */
3396  if (buffer_full)
3397  {
3398  new_write_head = LagTracker.write_head;
3399  if (LagTracker.write_head > 0)
3400  LagTracker.write_head--;
3401  else
3402  LagTracker.write_head = LAG_TRACKER_BUFFER_SIZE - 1;
3403  }
3404 
3405  /* Store a sample at the current write head position. */
3406  LagTracker.buffer[LagTracker.write_head].lsn = lsn;
3407  LagTracker.buffer[LagTracker.write_head].time = local_flush_time;
3408  LagTracker.write_head = new_write_head;
3409 }
bool am_walsender
Definition: walsender.c:111
static struct @27 LagTracker
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:205
int i
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28
static int logical_read_xlog_page ( XLogReaderState state,
XLogRecPtr  targetPagePtr,
int  reqLen,
XLogRecPtr  targetRecPtr,
char *  cur_page,
TimeLineID pageTLI 
)
static

Definition at line 754 of file walsender.c.

References XLogReaderState::currTLI, XLogReaderState::currTLIValidUntil, XLogReaderState::nextTLI, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, ThisTimeLineID, WalSndWaitForWal(), XLogRead(), and XLogReadDetermineTimeline().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

756 {
757  XLogRecPtr flushptr;
758  int count;
759 
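760  XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
761  sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID);
762  sendTimeLine = state->currTLI;
763  sendTimeLineValidUpto = state->currTLIValidUntil;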
764  sendTimeLineNextTLI = state->nextTLI;
765 
766  /* make sure we have enough WAL available */
767  flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
768 
769  /* more than one block available */
770  if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
771  count = XLOG_BLCKSZ;
772  /* not enough WAL synced, that can happen during shutdown */
773  else if (targetPagePtr + reqLen > flushptr)
774  return -1;
775  /* part of the page available */
776  else
777  count = flushptr - targetPagePtr;
778 
779  /* now actually read the data, we know it's there */
780  XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
781 
782  return count;
783 }
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
Definition: xlogutils.c:799
XLogRecPtr currTLIValidUntil
Definition: xlogreader.h:175
static void XLogRead(char *buf, XLogRecPtr startptr, Size count)
Definition: walsender.c:2304
TimeLineID nextTLI
Definition: xlogreader.h:181
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition: walsender.c:1274
TimeLineID ThisTimeLineID
Definition: xlog.c:179
TimeLineID currTLI
Definition: xlogreader.h:165
static TimeLineID sendTimeLine
Definition: walsender.c:144
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:145
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:147
static bool sendTimeLineIsHistoric
Definition: walsender.c:146
static Interval* offset_to_interval ( TimeOffset  offset)
static

Definition at line 3123 of file walsender.c.

References Interval::day, Interval::month, palloc(), result, and Interval::time.

Referenced by pg_stat_get_wal_senders().

3124 {
3125  Interval *result = palloc(sizeof(Interval));
3126 
3127  result->month = 0;
3128  result->day = 0;
3129  result->time = offset;
3130 
3131  return result;
3132 }
return result
Definition: formatting.c:1632
int32 day
Definition: timestamp.h:47
TimeOffset time
Definition: timestamp.h:45
int32 month
Definition: timestamp.h:48
void * palloc(Size size)
Definition: mcxt.c:849
static void parseCreateReplSlotOptions ( CreateReplicationSlotCmd cmd,
bool reserve_wal,
CRSSnapshotAction snapshot_action 
)
static

Definition at line 789 of file walsender.c.

References CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, defGetBoolean(), DefElem::defname, elog, ereport, errcode(), errmsg(), ERROR, CreateReplicationSlotCmd::kind, lfirst, CreateReplicationSlotCmd::options, REPLICATION_KIND_LOGICAL, and REPLICATION_KIND_PHYSICAL.

Referenced by CreateReplicationSlot().

792 {
793  ListCell *lc;
794  bool snapshot_action_given = false;
795  bool reserve_wal_given = false;
796 
797  /* Parse options */
798  foreach(lc, cmd->options)
799  {
800  DefElem *defel = (DefElem *) lfirst(lc);
801 
802  if (strcmp(defel->defname, "export_snapshot") == 0)
803  {
804  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
805  ereport(ERROR,
806  (errcode(ERRCODE_SYNTAX_ERROR),
807  errmsg("conflicting or redundant options")));
808 
809  snapshot_action_given = true;
810  *snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT :
811  CRS_NOEXPORT_SNAPSHOT;
812  }
813  else if (strcmp(defel->defname, "use_snapshot") == 0)
814  {
815  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
816  ereport(ERROR,
817  (errcode(ERRCODE_SYNTAX_ERROR),
818  errmsg("conflicting or redundant options")));
819 
820  snapshot_action_given = true;
821  *snapshot_action = CRS_USE_SNAPSHOT;
822  }
823  else if (strcmp(defel->defname, "reserve_wal") == 0)
824  {
825  if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
826  ereport(ERROR,
827  (errcode(ERRCODE_SYNTAX_ERROR),
828  errmsg("conflicting or redundant options")));
829 
830  reserve_wal_given = true;
831  *reserve_wal = true;
832  }
833  else
834  elog(ERROR, "unrecognized option: %s", defel->defname);
835  }
836 }
int errcode(int sqlerrcode)
Definition: elog.c:575
ReplicationKind kind
Definition: replnodes.h:56
bool defGetBoolean(DefElem *def)
Definition: define.c:111
#define ERROR
Definition: elog.h:43
#define ereport(elevel, rest)
Definition: elog.h:122
#define lfirst(lc)
Definition: pg_list.h:106
int errmsg(const char *fmt,...)
Definition: elog.c:797
char * defname
Definition: parsenodes.h:720
#define elog
Definition: elog.h:219
Datum pg_stat_get_wal_senders ( PG_FUNCTION_ARGS  )

Definition at line 3139 of file walsender.c.

References ReturnSetInfo::allowedModes, WalSnd::apply, WalSnd::applyLag, CStringGetTextDatum, ReturnSetInfo::econtext, ExprContext::ecxt_per_query_memory, elog, ereport, errcode(), errmsg(), ERROR, WalSnd::flush, WalSnd::flushLag, get_call_result_type(), i, Int32GetDatum, IntervalPGetDatum, IsA, list_member_int(), LSNGetDatum, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_wal_senders, MemoryContextSwitchTo(), MemSet, WalSnd::mutex, NULL, offset_to_interval(), PG_STAT_GET_WAL_SENDERS_COLS, WalSnd::pid, ReturnSetInfo::returnMode, WalSnd::sentPtr, sentPtr, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SFRM_Materialize, SpinLockAcquire, SpinLockRelease, WalSnd::state, superuser(), SYNC_REP_PRIORITY, WalSnd::sync_standby_priority, SyncRepConfigData::syncrep_method, SyncRepConfig, SyncRepGetSyncStandbys(), tuplestore_begin_heap(), tuplestore_donestoring, tuplestore_putvalues(), TYPEFUNC_COMPOSITE, values, WalSndGetStateString(), WalSndCtlData::walsnds, work_mem, write, WalSnd::write, WalSnd::writeLag, and XLogRecPtrIsInvalid.

3140 {
3141 #define PG_STAT_GET_WAL_SENDERS_COLS 11
3142  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3143  TupleDesc tupdesc;
3144  Tuplestorestate *tupstore;
3145  MemoryContext per_query_ctx;
3146  MemoryContext oldcontext;
3147  List *sync_standbys;
3148  int i;
3149 
3150  /* check to see if caller supports us returning a tuplestore */
3151  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
3152  ereport(ERROR,
3153  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3154  errmsg("set-valued function called in context that cannot accept a set")));
3155  if (!(rsinfo->allowedModes & SFRM_Materialize))
3156  ereport(ERROR,
3157  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3158  errmsg("materialize mode required, but it is not " \
3159  "allowed in this context")));
3160 
3161  /* Build a tuple descriptor for our result type */
3162  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
3163  elog(ERROR, "return type must be a row type");
3164 
3165  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
3166  oldcontext = MemoryContextSwitchTo(per_query_ctx);
3167 
3168  tupstore = tuplestore_begin_heap(true, false, work_mem);
3169  rsinfo->returnMode = SFRM_Materialize;
3170  rsinfo->setResult = tupstore;
3171  rsinfo->setDesc = tupdesc;
3172 
3173  MemoryContextSwitchTo(oldcontext);
3174 
3175  /*
3176  * Get the currently active synchronous standbys.
3177  */
3178  LWLockAcquire(SyncRepLock, LW_SHARED);
3179  sync_standbys = SyncRepGetSyncStandbys(NULL);
3180  LWLockRelease(SyncRepLock);
3181 
3182  for (i = 0; i < max_wal_senders; i++)
3183  {
3184  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3185  XLogRecPtr sentPtr;
3186  XLogRecPtr write;
3187  XLogRecPtr flush;
3188  XLogRecPtr apply;
3189  TimeOffset writeLag;
3190  TimeOffset flushLag;
3191  TimeOffset applyLag;
3192  int priority;
3193  WalSndState state;
3194  Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
3195  bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
3196 
3197  if (walsnd->pid == 0)
3198  continue;
3199 
3200  SpinLockAcquire(&walsnd->mutex);
3201  sentPtr = walsnd->sentPtr;
3202  state = walsnd->state;
3203  write = walsnd->write;
3204  flush = walsnd->flush;
3205  apply = walsnd->apply;
3206  writeLag = walsnd->writeLag;
3207  flushLag = walsnd->flushLag;
3208  applyLag = walsnd->applyLag;
3209  priority = walsnd->sync_standby_priority;
3210  SpinLockRelease(&walsnd->mutex);
3211 
3212  memset(nulls, 0, sizeof(nulls));
3213  values[0] = Int32GetDatum(walsnd->pid);
3214 
3215  if (!superuser())
3216  {
3217  /*
3218  * Only superusers can see details. Other users only get the pid
3219  * value to know it's a walsender, but no details.
3220  */
3221  MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3222  }
3223  else
3224  {
3225  values[1] = CStringGetTextDatum(WalSndGetStateString(state));
3226 
3227  if (XLogRecPtrIsInvalid(sentPtr))
3228  nulls[2] = true;
3229  values[2] = LSNGetDatum(sentPtr);
3230 
3231  if (XLogRecPtrIsInvalid(write))
3232  nulls[3] = true;
3233  values[3] = LSNGetDatum(write);
3234 
3235  if (XLogRecPtrIsInvalid(flush))
3236  nulls[4] = true;
3237  values[4] = LSNGetDatum(flush);
3238 
3239  if (XLogRecPtrIsInvalid(apply))
3240  nulls[5] = true;
3241  values[5] = LSNGetDatum(apply);
3242 
3243  /*
3244  * Treat a standby such as a pg_basebackup background process
3245  * which always returns an invalid flush location, as an
3246  * asynchronous standby.
3247  */
3248  priority = XLogRecPtrIsInvalid(walsnd->flush) ? 0 : priority;
3249 
3250  if (writeLag < 0)
3251  nulls[6] = true;
3252  else
3253  values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
3254 
3255  if (flushLag < 0)
3256  nulls[7] = true;
3257  else
3258  values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
3259 
3260  if (applyLag < 0)
3261  nulls[8] = true;
3262  else
3263  values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
3264 
3265  values[9] = Int32GetDatum(priority);
3266 
3267  /*
3268  * More easily understood version of standby state. This is purely
3269  * informational.
3270  *
3271  * In quorum-based sync replication, the role of each standby
3272  * listed in synchronous_standby_names can be changing very
3273  * frequently. Any standbys considered as "sync" at one moment can
3274  * be switched to "potential" ones at the next moment. So, it's
3275  * basically useless to report "sync" or "potential" as their sync
3276  * states. We report just "quorum" for them.
3277  */
3278  if (priority == 0)
3279  values[10] = CStringGetTextDatum("async");
3280  else if (list_member_int(sync_standbys, i))
3281  values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
3282  CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3283  else
3284  values[10] = CStringGetTextDatum("potential");
3285  }
3286 
3287  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
3288  }
3289 
3290  /* clean up and return the tuplestore */
3291  tuplestore_donestoring(tupstore);
3292 
3293  return (Datum) 0;
3294 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
XLogRecPtr write
#define IsA(nodeptr, _type_)
Definition: nodes.h:560
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:211
#define SYNC_REP_PRIORITY
Definition: syncrep.h:36
#define write(a, b, c)
Definition: win32.h:14
uint8 syncrep_method
Definition: syncrep.h:51
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
TimeOffset flushLag
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
WalSndCtlData * WalSndCtl
Definition: walsender.c:105
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
bool superuser(void)
Definition: superuser.c:47
#define MemSet(start, val, len)
Definition: c.h:857
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
List * SyncRepGetSyncStandbys(bool *am_sync)
Definition: syncrep.c:677
TimeOffset writeLag
SyncRepConfigData * SyncRepConfig
Definition: syncrep.c:97
slock_t mutex
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1715
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
#define ERROR
Definition: elog.h:43
#define IntervalPGetDatum(X)
Definition: timestamp.h:33
bool list_member_int(const List *list, int datum)
Definition: list.c:485
WalSndState state
#define ereport(elevel, rest)
Definition: elog.h:122
int max_wal_senders
Definition: walsender.c:117
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
int64 TimeOffset
Definition: timestamp.h:40
uintptr_t Datum
Definition: postgres.h:372
static const char * WalSndGetStateString(WalSndState state)
Definition: walsender.c:3104
XLogRecPtr sentPtr
int work_mem
Definition: globals.c:112
static XLogRecPtr sentPtr
Definition: walsender.c:153
int allowedModes
Definition: execnodes.h:268
TimeOffset applyLag
SetFunctionReturnMode returnMode
Definition: execnodes.h:270
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
int sync_standby_priority
Definition: regguts.h:298
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1111
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:202
WalSndState
Tuplestorestate * setResult
Definition: execnodes.h:273
static Datum values[MAXATTR]
Definition: bootstrap.c:163
#define PG_STAT_GET_WAL_SENDERS_COLS
ExprContext * econtext
Definition: execnodes.h:266
#define Int32GetDatum(X)
Definition: postgres.h:485
TupleDesc setDesc
Definition: execnodes.h:274
int errmsg(const char *fmt,...)
Definition: elog.c:797
int i
#define CStringGetTextDatum(s)
Definition: builtins.h:91
#define elog
Definition: elog.h:219
static Interval * offset_to_interval(TimeOffset offset)
Definition: walsender.c:3123
XLogRecPtr apply
Definition: pg_list.h:45
static void PhysicalConfirmReceivedLocation ( XLogRecPtr  lsn)
static

Definition at line 1703 of file walsender.c.

References Assert, ReplicationSlot::data, InvalidXLogRecPtr, ReplicationSlot::mutex, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, SpinLockAcquire, and SpinLockRelease.

Referenced by ProcessStandbyReplyMessage().

1704 {
1705  bool changed = false;
1706  ReplicationSlot *slot = MyReplicationSlot;
1707 
1708  Assert(lsn != InvalidXLogRecPtr);
1709  SpinLockAcquire(&slot->mutex);
1710  if (slot->data.restart_lsn != lsn)
1711  {
1712  changed = true;
1713  slot->data.restart_lsn = lsn;
1714  }
1715  SpinLockRelease(&slot->mutex);
1716 
1717  if (changed)
1718  {
1719  ReplicationSlotMarkDirty();
1720  ReplicationSlotsComputeRequiredLSN();
1721  }
1722 
1723  /*
1724  * One could argue that the slot should be saved to disk now, but that'd
1725  * be energy wasted - the worst lost information can do here is give us
1726  * wrong information in a statistics view - we'll just potentially be more
1727  * conservative in removing files.
1728  */
1729 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
ReplicationSlotPersistentData data
Definition: slot.h:115
#define SpinLockAcquire(lock)
Definition: spin.h:62
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:682
#define SpinLockRelease(lock)
Definition: spin.h:64
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:675
XLogRecPtr restart_lsn
Definition: slot.h:68
slock_t mutex
Definition: slot.h:88
void ReplicationSlotMarkDirty(void)
Definition: slot.c:597
static void PhysicalReplicationSlotNewXmin ( TransactionId  feedbackXmin,
TransactionId  feedbackCatalogXmin 
)
static

Definition at line 1827 of file walsender.c.

References ReplicationSlotPersistentData::catalog_xmin, ReplicationSlot::data, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, InvalidTransactionId, ReplicationSlot::mutex, MyPgXact, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredXmin(), SpinLockAcquire, SpinLockRelease, TransactionIdIsNormal, TransactionIdPrecedes(), ReplicationSlotPersistentData::xmin, and PGXACT::xmin.

Referenced by ProcessStandbyHSFeedbackMessage().

1828 {
1829  bool changed = false;
1830  ReplicationSlot *slot = MyReplicationSlot;
1831 
1832  SpinLockAcquire(&slot->mutex);
1833  MyPgXact->xmin = InvalidTransactionId;
1834 
1835  /*
1836  * For physical replication we don't need the interlock provided by xmin
1837  * and effective_xmin since the consequences of a missed increase are
1838  * limited to query cancellations, so set both at once.
1839  */
1840  if (!TransactionIdIsNormal(slot->data.xmin) ||
1841  !TransactionIdIsNormal(feedbackXmin) ||
1842  TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
1843  {
1844  changed = true;
1845  slot->data.xmin = feedbackXmin;
1846  slot->effective_xmin = feedbackXmin;
1847  }
1848  if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
1849  !TransactionIdIsNormal(feedbackCatalogXmin) ||
1850  TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
1851  {
1852  changed = true;
1853  slot->data.catalog_xmin = feedbackCatalogXmin;
1854  slot->effective_catalog_xmin = feedbackCatalogXmin;
1855  }
1856  SpinLockRelease(&slot->mutex);
1857 
1858  if (changed)
1859  {
1860  ReplicationSlotMarkDirty();
1861  ReplicationSlotsComputeRequiredXmin(false);
1862  }
1863 }
TransactionId xmin
Definition: proc.h:213
ReplicationSlotPersistentData data
Definition: slot.h:115
PGXACT * MyPgXact
Definition: proc.c:68
#define SpinLockAcquire(lock)
Definition: spin.h:62
TransactionId effective_xmin
Definition: slot.h:111
TransactionId catalog_xmin
Definition: slot.h:65
#define InvalidTransactionId
Definition: transam.h:31
TransactionId xmin
Definition: slot.h:57
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
#define SpinLockRelease(lock)
Definition: spin.h:64
TransactionId effective_catalog_xmin
Definition: slot.h:112
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
slock_t mutex
Definition: slot.h:88
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:636
void ReplicationSlotMarkDirty(void)
Definition: slot.c:597
static void ProcessRepliesIfAny ( void  )
static

Definition at line 1570 of file walsender.c.

References COMMERROR, ereport, errcode(), errmsg(), FATAL, GetCurrentTimestamp(), last_reply_timestamp, NULL, pq_endmsgread(), pq_getbyte_if_available(), pq_getmessage(), pq_putmessage_noblock, pq_startmsgread(), proc_exit(), ProcessStandbyMessage(), resetStringInfo(), streamingDoneReceiving, streamingDoneSending, and waiting_for_ping_response.

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

1571 {
1572  unsigned char firstchar;
1573  int r;
1574  bool received = false;
1575 
1576  for (;;)
1577  {
1578  pq_startmsgread();
1579  r = pq_getbyte_if_available(&firstchar);
1580  if (r < 0)
1581  {
1582  /* unexpected error or EOF */
1583  ereport(COMMERROR,
1584  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1585  errmsg("unexpected EOF on standby connection")));
1586  proc_exit(0);
1587  }
1588  if (r == 0)
1589  {
1590  /* no data available without blocking */
1591  pq_endmsgread();
1592  break;
1593  }
1594 
1595  /* Read the message contents */
1596  resetStringInfo(&reply_message);
1597  if (pq_getmessage(&reply_message, 0))
1598  {
1599  ereport(COMMERROR,
1600  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1601  errmsg("unexpected EOF on standby connection")));
1602  proc_exit(0);
1603  }
1604 
1605  /*
1606  * If we already received a CopyDone from the frontend, the frontend
1607  * should not send us anything until we've closed our end of the COPY.
1608  * XXX: In theory, the frontend could already send the next command
1609  * before receiving the CopyDone, but libpq doesn't currently allow
1610  * that.
1611  */
1612  if (streamingDoneReceiving && firstchar != 'X')
1613  ereport(FATAL,
1614  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1615  errmsg("unexpected standby message type \"%c\", after receiving CopyDone",
1616  firstchar)));
1617 
1618  /* Handle the very limited subset of commands expected in this phase */
1619  switch (firstchar)
1620  {
1621  /*
1622  * 'd' means a standby reply wrapped in a CopyData packet.
1623  */
1624  case 'd':
1625  ProcessStandbyMessage();
1626  received = true;
1627  break;
1628 
1629  /*
1630  * CopyDone means the standby requested to finish streaming.
1631  * Reply with CopyDone, if we had not sent that already.
1632  */
1633  case 'c':
1634  if (!streamingDoneSending)
1635  {
1636  pq_putmessage_noblock('c', NULL, 0);
1637  streamingDoneSending = true;
1638  }
1639 
1640  streamingDoneReceiving = true;
1641  received = true;
1642  break;
1643 
1644  /*
1645  * 'X' means that the standby is closing down the socket.
1646  */
1647  case 'X':
1648  proc_exit(0);
1649 
1650  default:
1651  ereport(FATAL,
1652  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1653  errmsg("invalid standby message type \"%c\"",
1654  firstchar)));
1655  }
1656  }
1657 
1658  /*
1659  * Save the last reply timestamp if we've received at least one reply.
1660  */
1661  if (received)
1662  {
1663  last_reply_timestamp = GetCurrentTimestamp();
1664  waiting_for_ping_response = false;
1665  }
1666 }
static void ProcessStandbyMessage(void)
Definition: walsender.c:1672
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
void proc_exit(int code)
Definition: ipc.c:99
int errcode(int sqlerrcode)
Definition: elog.c:575
void pq_startmsgread(void)
Definition: pqcomm.c:1191
#define FATAL
Definition: elog.h:52
int pq_getbyte_if_available(unsigned char *c)
Definition: pqcomm.c:1021
static bool streamingDoneSending
Definition: walsender.c:175
#define COMMERROR
Definition: elog.h:30
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
#define ereport(elevel, rest)
Definition: elog.h:122
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1253
static StringInfoData reply_message
Definition: walsender.c:157
void pq_endmsgread(void)
Definition: pqcomm.c:1215
#define NULL
Definition: c.h:229
static bool streamingDoneReceiving
Definition: walsender.c:176
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
static bool waiting_for_ping_response
Definition: walsender.c:167
static TimestampTz last_reply_timestamp
Definition: walsender.c:164
static void ProcessStandbyHSFeedbackMessage ( void  )
static

Definition at line 1904 of file walsender.c.

References DEBUG2, elog, InvalidTransactionId, MyPgXact, MyReplicationSlot, NULL, PhysicalReplicationSlotNewXmin(), pq_getmsgint(), pq_getmsgint64(), TransactionIdInRecentPast(), TransactionIdIsNormal, TransactionIdPrecedes(), and PGXACT::xmin.

Referenced by ProcessStandbyMessage().

1905 {
1906  TransactionId feedbackXmin;
1907  uint32 feedbackEpoch;
1908  TransactionId feedbackCatalogXmin;
1909  uint32 feedbackCatalogEpoch;
1910 
1911  /*
1912  * Decipher the reply message. The caller already consumed the msgtype
1913  * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
1914  * of this message.
1915  */
1916  (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
1917  feedbackXmin = pq_getmsgint(&reply_message, 4);
1918  feedbackEpoch = pq_getmsgint(&reply_message, 4);
1919  feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
1920  feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
1921 
1922  elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u",
1923  feedbackXmin,
1924  feedbackEpoch,
1925  feedbackCatalogXmin,
1926  feedbackCatalogEpoch);
1927 
1928  /*
1929  * Unset WalSender's xmins if the feedback message values are invalid.
1930  * This happens when the downstream turned hot_standby_feedback off.
1931  */
1932  if (!TransactionIdIsNormal(feedbackXmin)
1933  && !TransactionIdIsNormal(feedbackCatalogXmin))
1934  {
1936  if (MyReplicationSlot != NULL)
1937  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
1938  return;
1939  }
1940 
1941  /*
1942  * Check that the provided xmin/epoch are sane, that is, not in the future
1943  * and not so far back as to be already wrapped around. Ignore if not.
1944  */
1945  if (TransactionIdIsNormal(feedbackXmin) &&
1946  !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
1947  return;
1948 
1949  if (TransactionIdIsNormal(feedbackCatalogXmin) &&
1950  !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
1951  return;
1952 
1953  /*
1954  * Set the WalSender's xmin equal to the standby's requested xmin, so that
1955  * the xmin will be taken into account by GetOldestXmin. This will hold
1956  * back the removal of dead rows and thereby prevent the generation of
1957  * cleanup conflicts on the standby server.
1958  *
1959  * There is a small window for a race condition here: although we just
1960  * checked that feedbackXmin precedes nextXid, the nextXid could have
1961  * gotten advanced between our fetching it and applying the xmin below,
1962  * perhaps far enough to make feedbackXmin wrap around. In that case the
1963  * xmin we set here would be "in the future" and have no effect. No point
1964  * in worrying about this since it's too late to save the desired data
1965  * anyway. Assuming that the standby sends us an increasing sequence of
1966  * xmins, this could only happen during the first reply cycle, else our
1967  * own xmin would prevent nextXid from advancing so far.
1968  *
1969  * We don't bother taking the ProcArrayLock here. Setting the xmin field
1970  * is assumed atomic, and there's no real need to prevent a concurrent
1971  * GetOldestXmin. (If we're moving our xmin forward, this is obviously
1972  * safe, and if we're moving it backwards, well, the data is at risk
1973  * already since a VACUUM could have just finished calling GetOldestXmin.)
1974  *
1975  * If we're using a replication slot we reserve the xmin via that,
1976  * otherwise via the walsender's PGXACT entry. We can only track the
1977  * catalog xmin separately when using a slot, so we store the least of the
1978  * two provided when not using a slot.
1979  *
1980  * XXX: It might make sense to generalize the ephemeral slot concept and
1981  * always use the slot mechanism to handle the feedback xmin.
1982  */
1983  if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
1984  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
1985  else
1986  {
1987  if (TransactionIdIsNormal(feedbackCatalogXmin)
1988  && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
1989  MyPgXact->xmin = feedbackCatalogXmin;
1990  else
1991  MyPgXact->xmin = feedbackXmin;
1992  }
1993 }
uint32 TransactionId
Definition: c.h:397
TransactionId xmin
Definition: proc.h:213
PGXACT * MyPgXact
Definition: proc.c:68
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition: walsender.c:1876
#define DEBUG2
Definition: elog.h:24
#define InvalidTransactionId
Definition: transam.h:31
unsigned int uint32
Definition: c.h:268
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static StringInfoData reply_message
Definition: walsender.c:157
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define NULL
Definition: c.h:229
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:486
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition: walsender.c:1827
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:448
#define elog
Definition: elog.h:219
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
static void ProcessStandbyMessage ( void  )
static

Definition at line 1672 of file walsender.c.

References COMMERROR, ereport, errcode(), errmsg(), pq_getmsgbyte(), proc_exit(), ProcessStandbyHSFeedbackMessage(), and ProcessStandbyReplyMessage().

Referenced by ProcessRepliesIfAny().

1673 {
1674  char msgtype;
1675 
1676  /*
1677  * Check message type from the first byte.
1678  */
1679  msgtype = pq_getmsgbyte(&reply_message);
1680 
1681  switch (msgtype)
1682  {
1683  case 'r':
1685  break;
1686 
1687  case 'h':
1689  break;
1690 
1691  default:
1693  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1694  errmsg("unexpected message type \"%c\"", msgtype)));
1695  proc_exit(0);
1696  }
1697 }
void proc_exit(int code)
Definition: ipc.c:99
int errcode(int sqlerrcode)
Definition: elog.c:575
#define COMMERROR
Definition: elog.h:30
#define ereport(elevel, rest)
Definition: elog.h:122
static StringInfoData reply_message
Definition: walsender.c:157
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:432
static void ProcessStandbyReplyMessage(void)
Definition: walsender.c:1735
int errmsg(const char *fmt,...)
Definition: elog.c:797
static void ProcessStandbyHSFeedbackMessage(void)
Definition: walsender.c:1904
static void ProcessStandbyReplyMessage ( void  )
static

Definition at line 1735 of file walsender.c.

References am_cascading_walsender, WalSnd::apply, WalSnd::applyLag, DEBUG2, elog, WalSnd::flush, WalSnd::flushLag, GetCurrentTimestamp(), InvalidXLogRecPtr, LagTrackerRead(), LogicalConfirmReceivedLocation(), WalSnd::mutex, MyReplicationSlot, MyWalSnd, now(), PhysicalConfirmReceivedLocation(), pq_getmsgbyte(), pq_getmsgint64(), sentPtr, SlotIsLogical, SpinLockAcquire, SpinLockRelease, SYNC_REP_WAIT_APPLY, SYNC_REP_WAIT_FLUSH, SYNC_REP_WAIT_WRITE, SyncRepReleaseWaiters(), WalSndKeepalive(), WalSnd::write, and WalSnd::writeLag.

Referenced by ProcessStandbyMessage().

1736 {
1737  XLogRecPtr writePtr,
1738  flushPtr,
1739  applyPtr;
1740  bool replyRequested;
1741  TimeOffset writeLag,
1742  flushLag,
1743  applyLag;
1744  bool clearLagTimes;
1745  TimestampTz now;
1746 
1747  static bool fullyAppliedLastTime = false;
1748 
1749  /* the caller already consumed the msgtype byte */
1750  writePtr = pq_getmsgint64(&reply_message);
1751  flushPtr = pq_getmsgint64(&reply_message);
1752  applyPtr = pq_getmsgint64(&reply_message);
1753  (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
1754  replyRequested = pq_getmsgbyte(&reply_message);
1755 
1756  elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s",
1757  (uint32) (writePtr >> 32), (uint32) writePtr,
1758  (uint32) (flushPtr >> 32), (uint32) flushPtr,
1759  (uint32) (applyPtr >> 32), (uint32) applyPtr,
1760  replyRequested ? " (reply requested)" : "");
1761 
1762  /* See if we can compute the round-trip lag for these positions. */
1763  now = GetCurrentTimestamp();
1764  writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
1765  flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
1766  applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
1767 
1768  /*
1769  * If the standby reports that it has fully replayed the WAL in two
1770  * consecutive reply messages, then the second such message must result
1771  * from wal_receiver_status_interval expiring on the standby. This is a
1772  * convenient time to forget the lag times measured when it last
1773  * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
1774  * until more WAL traffic arrives.
1775  */
1776  clearLagTimes = false;
1777  if (applyPtr == sentPtr)
1778  {
1779  if (fullyAppliedLastTime)
1780  clearLagTimes = true;
1781  fullyAppliedLastTime = true;
1782  }
1783  else
1784  fullyAppliedLastTime = false;
1785 
1786  /* Send a reply if the standby requested one. */
1787  if (replyRequested)
1788  WalSndKeepalive(false);
1789 
1790  /*
1791  * Update shared state for this WalSender process based on reply data from
1792  * standby.
1793  */
1794  {
1795  WalSnd *walsnd = MyWalSnd;
1796 
1797  SpinLockAcquire(&walsnd->mutex);
1798  walsnd->write = writePtr;
1799  walsnd->flush = flushPtr;
1800  walsnd->apply = applyPtr;
1801  if (writeLag != -1 || clearLagTimes)
1802  walsnd->writeLag = writeLag;
1803  if (flushLag != -1 || clearLagTimes)
1804  walsnd->flushLag = flushLag;
1805  if (applyLag != -1 || clearLagTimes)
1806  walsnd->applyLag = applyLag;
1807  SpinLockRelease(&walsnd->mutex);
1808  }
1809 
1812 
1813  /*
1814  * Advance our local xmin horizon when the client confirmed a flush.
1815  */
1816  if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
1817  {
1820  else
1822  }
1823 }
XLogRecPtr write
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
int64 TimestampTz
Definition: timestamp.h:39
TimeOffset flushLag
TimeOffset writeLag
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3302
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: walsender.c:1703
#define SYNC_REP_WAIT_APPLY
Definition: syncrep.h:26
#define DEBUG2
Definition: elog.h:24
unsigned int uint32
Definition: c.h:268
#define SlotIsLogical(slot)
Definition: slot.h:134
#define SYNC_REP_WAIT_FLUSH
Definition: syncrep.h:25
static StringInfoData reply_message
Definition: walsender.c:157
#define SpinLockRelease(lock)
Definition: spin.h:64
int64 TimeOffset
Definition: timestamp.h:40
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition: walsender.c:3425
WalSnd * MyWalSnd
Definition: walsender.c:108
static XLogRecPtr sentPtr
Definition: walsender.c:153
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:432
TimeOffset applyLag
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:914
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:486
#define elog
Definition: elog.h:219
XLogRecPtr apply
void SyncRepReleaseWaiters(void)
Definition: syncrep.c:409
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
bool am_cascading_walsender
Definition: walsender.c:112
#define SYNC_REP_WAIT_WRITE
Definition: syncrep.h:24
/*
 * Handle the TIMELINE_HISTORY command.
 *
 * Replies with a result set of one row and two columns: the history file's
 * name (text) and its complete contents (bytea). The column length is taken
 * from the file size before reading, so the loop below reads exactly that
 * many bytes; a premature end of file is reported as an error instead of
 * producing a malformed DataRow.
 */
static void
SendTimeLineHistory(TimeLineHistoryCmd *cmd)
{
	StringInfoData buf;
	char		histfname[MAXFNAMELEN];
	char		path[MAXPGPATH];
	int			fd;
	off_t		histfilelen;
	off_t		bytesleft;
	Size		len;

	/*
	 * Reply with a result set with one row, and two columns. The first col is
	 * the name of the history file, 2nd is the contents.
	 */

	TLHistoryFileName(histfname, cmd->timeline);
	TLHistoryFilePath(path, cmd->timeline);

	/* Send a RowDescription message */
	pq_beginmessage(&buf, 'T');
	pq_sendint(&buf, 2, 2);		/* 2 fields */

	/* first field */
	pq_sendstring(&buf, "filename");	/* col name */
	pq_sendint(&buf, 0, 4);		/* table oid */
	pq_sendint(&buf, 0, 2);		/* attnum */
	pq_sendint(&buf, TEXTOID, 4);	/* type oid */
	pq_sendint(&buf, -1, 2);	/* typlen */
	pq_sendint(&buf, 0, 4);		/* typmod */
	pq_sendint(&buf, 0, 2);		/* format code */

	/* second field */
	pq_sendstring(&buf, "content"); /* col name */
	pq_sendint(&buf, 0, 4);		/* table oid */
	pq_sendint(&buf, 0, 2);		/* attnum */
	pq_sendint(&buf, BYTEAOID, 4);	/* type oid */
	pq_sendint(&buf, -1, 2);	/* typlen */
	pq_sendint(&buf, 0, 4);		/* typmod */
	pq_sendint(&buf, 0, 2);		/* format code */
	pq_endmessage(&buf);

	/* Send a DataRow message */
	pq_beginmessage(&buf, 'D');
	pq_sendint(&buf, 2, 2);		/* # of columns */
	len = strlen(histfname);
	pq_sendint(&buf, len, 4);	/* col1 len */
	pq_sendbytes(&buf, histfname, len);

	fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0666);
	if (fd < 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not open file \"%s\": %m", path)));

	/* Determine file length and send it to client */
	histfilelen = lseek(fd, 0, SEEK_END);
	if (histfilelen < 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not seek to end of file \"%s\": %m", path)));
	if (lseek(fd, 0, SEEK_SET) != 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not seek to beginning of file \"%s\": %m", path)));

	pq_sendint(&buf, histfilelen, 4);	/* col2 len */

	bytesleft = histfilelen;
	while (bytesleft > 0)
	{
		char		rbuf[BLCKSZ];
		size_t		toread;
		int			nread;

		/*
		 * Never read past the length already announced in the column header;
		 * sending more bytes than that would corrupt the DataRow message.
		 */
		toread = (bytesleft < (off_t) sizeof(rbuf)) ?
			(size_t) bytesleft : sizeof(rbuf);

		pgstat_report_wait_start(WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ);
		nread = read(fd, rbuf, toread);
		pgstat_report_wait_end();
		if (nread < 0)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not read file \"%s\": %m",
							path)));
		else if (nread == 0)
		{
			/* EOF before histfilelen bytes: errno is not meaningful here */
			ereport(ERROR,
					(errcode(ERRCODE_DATA_CORRUPTED),
					 errmsg("could not read file \"%s\": unexpected end of file",
							path)));
		}
		pq_sendbytes(&buf, rbuf, nread);
		bytesleft -= nread;
	}
	CloseTransientFile(fd);

	pq_endmessage(&buf);
}
#define TEXTOID
Definition: pg_type.h:324
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:186
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1038
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:88
TimeLineID timeline
Definition: replnodes.h:96
#define ERROR
Definition: elog.h:43
#define MAXPGPATH
#define TLHistoryFileName(fname, tli)
static char * buf
Definition: pg_test_fsync.c:66
int OpenTransientFile(FileName fileName, int fileFlags, int fileMode)
Definition: fd.c:2144
int errcode_for_file_access(void)
Definition: elog.c:598
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1232
#define ereport(elevel, rest)
Definition: elog.h:122
int CloseTransientFile(int fd)
Definition: fd.c:2305
#define MAXFNAMELEN
size_t Size
Definition: c.h:356
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1208
#define BYTEAOID
Definition: pg_type.h:292
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:115
int errmsg(const char *fmt,...)
Definition: elog.c:797
void pq_sendint(StringInfo buf, int i, int b)
Definition: pqformat.c:236
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:344
#define read(a, b, c)
Definition: win32.h:13
#define TLHistoryFilePath(path, tli)
/*
 * Handle START_REPLICATION ... LOGICAL command.
 *
 * Acquires the named slot, enters COPY-both mode, sets up a logical decoding
 * context starting at cmd->startpoint, and runs the walsender main loop with
 * XLogSendLogical(). On return from the loop the decoding context and slot
 * are released, and COPY mode is ended with a CommandComplete.
 */
static void
StartLogicalReplication(StartReplicationCmd *cmd)
{
	StringInfoData buf;

	/* make sure that our requirements are still fulfilled */
	CheckLogicalDecodingRequirements();

	Assert(!MyReplicationSlot);

	ReplicationSlotAcquire(cmd->slotname);

	/*
	 * Force a disconnect, so that the decoding code doesn't need to care
	 * about an eventual switch from running in recovery, to running in a
	 * normal environment. Client code is expected to handle reconnects.
	 */
	if (am_cascading_walsender && !RecoveryInProgress())
	{
		ereport(LOG,
				(errmsg("terminating walsender process after promotion")));
		got_SIGINT = true;
	}

	WalSndSetState(WALSNDSTATE_CATCHUP);

	/* Send a CopyBothResponse message, and start streaming */
	pq_beginmessage(&buf, 'W');
	pq_sendbyte(&buf, 0);
	pq_sendint(&buf, 0, 2);
	pq_endmessage(&buf);
	pq_flush();

	/*
	 * Initialize position to the last ack'ed one, then the xlog records begin
	 * to be shipped from that position.
	 */
	logical_decoding_ctx = CreateDecodingContext(cmd->startpoint, cmd->options,
												 logical_read_xlog_page,
												 WalSndPrepareWrite,
												 WalSndWriteData,
												 WalSndUpdateProgress);

	/* Start reading WAL from the oldest required WAL. */
	logical_startptr = MyReplicationSlot->data.restart_lsn;

	/*
	 * Report the location after which we'll send out further commits as the
	 * current sentPtr.
	 */
	sentPtr = MyReplicationSlot->data.confirmed_flush;

	/* Also update the sent position status in shared memory */
	{
		WalSnd	   *walsnd = MyWalSnd;

		SpinLockAcquire(&walsnd->mutex);
		walsnd->sentPtr = MyReplicationSlot->data.restart_lsn;
		SpinLockRelease(&walsnd->mutex);
	}

	replication_active = true;

	SyncRepInitConfig();

	/* Main loop of walsender */
	WalSndLoop(XLogSendLogical);

	FreeDecodingContext(logical_decoding_ctx);
	ReplicationSlotRelease();

	replication_active = false;
	if (got_SIGINT)
		proc_exit(0);
	WalSndSetState(WALSNDSTATE_STARTUP);

	/* Get out of COPY mode (CommandComplete). */
	EndCommand("COPY 0", DestRemote);
}
XLogRecPtr startpoint
Definition: replnodes.h:84
#define pq_flush()
Definition: libpq.h:39
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
void proc_exit(int code)
Definition: ipc.c:99
ReplicationSlotPersistentData data
Definition: slot.h:115
#define LOG
Definition: elog.h:26
bool RecoveryInProgress(void)
Definition: xlog.c:7873
slock_t mutex
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:88
XLogRecPtr confirmed_flush
Definition: slot.h:76
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
Definition: walsender.c:754
#define SpinLockAcquire(lock)
Definition: spin.h:62
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:157
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:194
static char * buf
Definition: pg_test_fsync.c:66
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1162
static XLogRecPtr logical_startptr
Definition: walsender.c:195
void ReplicationSlotRelease(void)
Definition: slot.c:375
void SyncRepInitConfig(void)
Definition: syncrep.c:381
#define ereport(elevel, rest)
Definition: elog.h:122
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2072
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1135
#define SpinLockRelease(lock)
Definition: spin.h:64
static volatile sig_atomic_t replication_active
Definition: walsender.c:192
XLogRecPtr sentPtr
WalSnd * MyWalSnd
Definition: walsender.c:108
static XLogRecPtr sentPtr
Definition: walsender.c:153
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:343
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:675
void WalSndSetState(WalSndState state)
Definition: walsender.c:3085
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:475
static void XLogSendLogical(void)
Definition: walsender.c:2738
XLogRecPtr restart_lsn
Definition: slot.h:68
void ReplicationSlotAcquire(const char *name)
Definition: slot.c:326
int errmsg(const char *fmt,...)
Definition: elog.c:797
void pq_sendint(StringInfo buf, int i, int b)
Definition: pqformat.c:236
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:344
static volatile sig_atomic_t got_SIGINT
Definition: walsender.c:183
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:76
bool am_cascading_walsender
Definition: walsender.c:112
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
Definition: walsender.c:1252
/*
 * Handle START_REPLICATION (physical) command.
 *
 * Requires that IDENTIFY_SYSTEM has set ThisTimeLineID. Optionally acquires
 * a physical replication slot, then selects the timeline to stream. This is
 * either the one requested by the client (validated against our timeline
 * history) or the current one.
 *
 * If there is WAL to stream, it enters COPY-both mode and runs the main
 * loop with XLogSendPhysical(). When a historic timeline was streamed, it
 * then sends a single-row result set (next_tli, next_tli_startpos). It
 * always finishes with a CommandComplete.
 */
static void
StartReplication(StartReplicationCmd *cmd)
{
	StringInfoData buf;
	XLogRecPtr	FlushPtr;

	if (ThisTimeLineID == 0)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));

	/*
	 * We assume here that we're logging enough information in the WAL for
	 * log-shipping, since this is checked in PostmasterMain().
	 *
	 * NOTE: wal_level can only change at shutdown, so in most cases it is
	 * difficult for there to be WAL data that we can still see that was
	 * written at wal_level='minimal'.
	 */

	if (cmd->slotname)
	{
		ReplicationSlotAcquire(cmd->slotname);
		if (SlotIsLogical(MyReplicationSlot))
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 (errmsg("cannot use a logical replication slot for physical replication"))));
	}

	/*
	 * Select the timeline. If it was given explicitly by the client, use
	 * that. Otherwise use the timeline of the last replayed record, which is
	 * kept in ThisTimeLineID.
	 */
	if (am_cascading_walsender)
	{
		/* this also updates ThisTimeLineID */
		FlushPtr = GetStandbyFlushRecPtr();
	}
	else
		FlushPtr = GetFlushRecPtr();

	if (cmd->timeline != 0)
	{
		XLogRecPtr	switchpoint;

		sendTimeLine = cmd->timeline;
		if (sendTimeLine == ThisTimeLineID)
		{
			sendTimeLineIsHistoric = false;
			sendTimeLineValidUpto = InvalidXLogRecPtr;
		}
		else
		{
			List	   *timeLineHistory;

			sendTimeLineIsHistoric = true;

			/*
			 * Check that the timeline the client requested exists, and the
			 * requested start location is on that timeline.
			 */
			timeLineHistory = readTimeLineHistory(ThisTimeLineID);
			switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
										 &sendTimeLineNextTLI);
			list_free_deep(timeLineHistory);

			/*
			 * Found the requested timeline in the history. Check that
			 * requested startpoint is on that timeline in our history.
			 *
			 * This is quite loose on purpose. We only check that we didn't
			 * fork off the requested timeline before the switchpoint. We
			 * don't check that we switched *to* it before the requested
			 * starting point. This is because the client can legitimately
			 * request to start replication from the beginning of the WAL
			 * segment that contains switchpoint, but on the new timeline, so
			 * that it doesn't end up with a partial segment. If you ask for
			 * too old a starting point, you'll get an error later when we
			 * fail to find the requested WAL segment in pg_wal.
			 *
			 * XXX: we could be more strict here and only allow a startpoint
			 * that's older than the switchpoint, if it's still in the same
			 * WAL segment.
			 */
			if (!XLogRecPtrIsInvalid(switchpoint) &&
				switchpoint < cmd->startpoint)
			{
				ereport(ERROR,
						(errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
								(uint32) (cmd->startpoint >> 32),
								(uint32) (cmd->startpoint),
								cmd->timeline),
						 errdetail("This server's history forked from timeline %u at %X/%X.",
								   cmd->timeline,
								   (uint32) (switchpoint >> 32),
								   (uint32) (switchpoint))));
			}
			sendTimeLineValidUpto = switchpoint;
		}
	}
	else
	{
		sendTimeLine = ThisTimeLineID;
		sendTimeLineValidUpto = InvalidXLogRecPtr;
		sendTimeLineIsHistoric = false;
	}

	streamingDoneSending = streamingDoneReceiving = false;

	/* If there is nothing to stream, don't even enter COPY mode */
	if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
	{
		/*
		 * When we first start replication the standby will be behind the
		 * primary. For some applications, for example synchronous
		 * replication, it is important to have a clear state for this initial
		 * catchup mode, so we can trigger actions when we change streaming
		 * state later. We may stay in this state for a long time, which is
		 * exactly why we want to be able to monitor whether or not we are
		 * still here.
		 */
		WalSndSetState(WALSNDSTATE_CATCHUP);

		/* Send a CopyBothResponse message, and start streaming */
		pq_beginmessage(&buf, 'W');
		pq_sendbyte(&buf, 0);
		pq_sendint(&buf, 0, 2);
		pq_endmessage(&buf);
		pq_flush();

		/*
		 * Don't allow a request to stream from a future point in WAL that
		 * hasn't been flushed to disk in this server yet.
		 */
		if (FlushPtr < cmd->startpoint)
		{
			ereport(ERROR,
					(errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
							(uint32) (cmd->startpoint >> 32),
							(uint32) (cmd->startpoint),
							(uint32) (FlushPtr >> 32),
							(uint32) (FlushPtr))));
		}

		/* Start streaming from the requested point */
		sentPtr = cmd->startpoint;

		/* Initialize shared memory status, too */
		{
			WalSnd	   *walsnd = MyWalSnd;

			SpinLockAcquire(&walsnd->mutex);
			walsnd->sentPtr = sentPtr;
			SpinLockRelease(&walsnd->mutex);
		}

		SyncRepInitConfig();

		/* Main loop of walsender */
		replication_active = true;

		WalSndLoop(XLogSendPhysical);

		replication_active = false;
		if (got_SIGINT)
			proc_exit(0);
		WalSndSetState(WALSNDSTATE_STARTUP);

		/* Leaving the loop normally means both COPY directions are done */
		Assert(streamingDoneSending && streamingDoneReceiving);
	}

	if (cmd->slotname)
		ReplicationSlotRelease();

	/*
	 * Copy is finished now. Send a single-row result set indicating the next
	 * timeline.
	 */
	if (sendTimeLineIsHistoric)
	{
		char		startpos_str[8 + 1 + 8 + 1];
		DestReceiver *dest;
		TupOutputState *tstate;
		TupleDesc	tupdesc;
		Datum		values[2];
		bool		nulls[2];

		snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
				 (uint32) (sendTimeLineValidUpto >> 32),
				 (uint32) sendTimeLineValidUpto);

		dest = CreateDestReceiver(DestRemoteSimple);
		MemSet(nulls, false, sizeof(nulls));

		/*
		 * Need a tuple descriptor representing two columns. int8 may seem
		 * like a surprising data type for this, but in theory int4 would not
		 * be wide enough for this, as TimeLineID is unsigned.
		 */
		tupdesc = CreateTemplateTupleDesc(2, false);
		TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
								  INT8OID, -1, 0);
		TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
								  TEXTOID, -1, 0);

		/* prepare for projection of tuple */
		tstate = begin_tup_output_tupdesc(dest, tupdesc);

		values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
		values[1] = CStringGetTextDatum(startpos_str);

		/* send it to dest */
		do_tup_output(tstate, values, nulls);

		end_tup_output(tstate);
	}

	/* Send CommandComplete message */
	pq_puttextmessage('C', "START_STREAMING");
}
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr startpoint
Definition: replnodes.h:84
#define pq_flush()
Definition: libpq.h:39
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
#define TEXTOID
Definition: pg_type.h:324
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:75
static void XLogSendPhysical(void)
Definition: walsender.c:2479
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:1250
void proc_exit(int code)
Definition: ipc.c:99
int errcode(int sqlerrcode)
Definition: elog.c:575
#define MemSet(start, val, len)
Definition: c.h:857
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8222
void list_free_deep(List *list)
Definition: list.c:1147
slock_t mutex
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:88
#define SpinLockAcquire(lock)
Definition: spin.h:62
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:1308
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:568
#define ERROR
Definition: elog.h:43
static char * buf
Definition: pg_test_fsync.c:66
static bool streamingDoneSending
Definition: walsender.c:175
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:109
int errdetail(const char *fmt,...)
Definition: elog.c:873
unsigned int uint32
Definition: c.h:268
void ReplicationSlotRelease(void)
Definition: slot.c:375
#define SlotIsLogical(slot)
Definition: slot.h:134
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1791
void SyncRepInitConfig(void)
Definition: syncrep.c:381
#define ereport(elevel, rest)
Definition: elog.h:122
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc)
Definition: execTuples.c:1232
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2072
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
static volatile sig_atomic_t replication_active
Definition: walsender.c:192
uintptr_t Datum
Definition: postgres.h:372
XLogRecPtr sentPtr
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:563
WalSnd * MyWalSnd
Definition: walsender.c:108
static XLogRecPtr sentPtr
Definition: walsender.c:153
TimeLineID ThisTimeLineID
Definition: xlog.c:179
#define INT8OID
Definition: pg_type.h:304
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
static TimeLineID sendTimeLine
Definition: walsender.c:144
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:675
void WalSndSetState(WalSndState state)
Definition: walsender.c:3085
void ReplicationSlotAcquire(const char *name)
Definition: slot.c:326
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:145
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:147
TupleDesc CreateTemplateTupleDesc(int natts, bool hasoid)
Definition: tupdesc.c:41
static bool streamingDoneReceiving
Definition: walsender.c:176
static Datum values[MAXATTR]
Definition: bootstrap.c:163
int errmsg(const char *fmt,...)
Definition: elog.c:797
void pq_sendint(StringInfo buf, int i, int b)
Definition: pqformat.c:236
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:344
TimeLineID timeline
Definition: replnodes.h:83
static volatile sig_atomic_t got_SIGINT
Definition: walsender.c:183
#define CStringGetTextDatum(s)
Definition: builtins.h:91
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2839
void pq_puttextmessage(char msgtype, const char *str)
Definition: pqformat.c:400
Definition: pg_list.h:45
int16 AttrNumber
Definition: attnum.h:21
static bool sendTimeLineIsHistoric
Definition: walsender.c:146
bool am_cascading_walsender
Definition: walsender.c:112
static bool TransactionIdInRecentPast ( TransactionId  xid,
uint32  epoch 
)
static

Definition at line 1876 of file walsender.c.

References GetNextXidAndEpoch(), and TransactionIdPrecedesOrEquals().

Referenced by ProcessStandbyHSFeedbackMessage().

1877 {
1878  TransactionId nextXid;
1879  uint32 nextEpoch;
1880 
1881  GetNextXidAndEpoch(&nextXid, &nextEpoch);
1882 
1883  if (xid <= nextXid)
1884  {
1885  if (epoch != nextEpoch)
1886  return false;
1887  }
1888  else
1889  {
1890  if (epoch + 1 != nextEpoch)
1891  return false;
1892  }
1893 
1894  if (!TransactionIdPrecedesOrEquals(xid, nextXid))
1895  return false; /* epoch OK, but it's wrapped around */
1896 
1897  return true;
1898 }
uint32 TransactionId
Definition: c.h:397
void GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch)
Definition: xlog.c:8291
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:319
unsigned int uint32
Definition: c.h:268
static const unsigned __int64 epoch
Definition: gettimeofday.c:34
static void WalSndCheckTimeOut ( TimestampTz  now)
static

Definition at line 2045 of file walsender.c.

References COMMERROR, ereport, errmsg(), last_reply_timestamp, TimestampTzPlusMilliseconds, wal_sender_timeout, and WalSndShutdown().

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

2046 {
2047  TimestampTz timeout;
2048 
2049  /* don't bail out if we're doing something that doesn't require timeouts */
2050  if (last_reply_timestamp <= 0)
2051  return;
2052 
2055 
2056  if (wal_sender_timeout > 0 && now >= timeout)
2057  {
2058  /*
2059  * Since typically expiration of replication timeout means
2060  * communication problem, we don't send the error message to the
2061  * standby.
2062  */
2064  (errmsg("terminating walsender process due to replication timeout")));
2065 
2066  WalSndShutdown();
2067  }
2068 }
int wal_sender_timeout
Definition: walsender.c:118
int64 TimestampTz
Definition: timestamp.h:39
#define COMMERROR
Definition: elog.h:30
#define ereport(elevel, rest)
Definition: elog.h:122
static void WalSndShutdown(void)
Definition: walsender.c:228
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
int errmsg(const char *fmt,...)
Definition: elog.c:797
static TimestampTz last_reply_timestamp
Definition: walsender.c:164
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
static long WalSndComputeSleeptime ( TimestampTz  now)
static

Definition at line 2003 of file walsender.c.

References last_reply_timestamp, sleeptime, TimestampDifference(), TimestampTzPlusMilliseconds, waiting_for_ping_response, and wal_sender_timeout.

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

2004 {
2005  long sleeptime = 10000; /* 10 s */
2006 
2008  {
2009  TimestampTz wakeup_time;
2010  long sec_to_timeout;
2011  int microsec_to_timeout;
2012 
2013  /*
2014  * At the latest stop sleeping once wal_sender_timeout has been
2015  * reached.
2016  */
2019 
2020  /*
2021  * If no ping has been sent yet, wakeup when it's time to do so.
2022  * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2023  * the timeout passed without a response.
2024  */
2027  wal_sender_timeout / 2);
2028 
2029  /* Compute relative time until wakeup. */
2030  TimestampDifference(now, wakeup_time,
2031  &sec_to_timeout, &microsec_to_timeout);
2032 
2033  sleeptime = sec_to_timeout * 1000 +
2034  microsec_to_timeout / 1000;
2035  }
2036 
2037  return sleeptime;
2038 }
int wal_sender_timeout
Definition: walsender.c:118
int64 TimestampTz
Definition: timestamp.h:39
int sleeptime
Definition: pg_standby.c:40
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: timestamp.c:1624
static bool waiting_for_ping_response
Definition: walsender.c:167
static TimestampTz last_reply_timestamp
Definition: walsender.c:164
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
static void WalSndDone ( WalSndSendDataCallback  send_data)
static

Definition at line 2799 of file walsender.c.

References DestRemote, EndCommand(), WalSnd::flush, pq_flush, pq_is_send_pending, proc_exit(), sentPtr, waiting_for_ping_response, WalSndCaughtUp, WalSndKeepalive(), WalSnd::write, and XLogRecPtrIsInvalid.

Referenced by WalSndLoop().

2800 {
2801  XLogRecPtr replicatedPtr;
2802 
2803  /* ... let's just be real sure we're caught up ... */
2804  send_data();
2805 
2806  /*
2807  * To figure out whether all WAL has successfully been replicated, check
2808  * flush location if valid, write otherwise. Tools like pg_receivewal will
2809  * usually (unless in synchronous mode) return an invalid flush location.
2810  */
2811  replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
2813 
2814  if (WalSndCaughtUp && sentPtr == replicatedPtr &&
2815  !pq_is_send_pending())
2816  {
2817  /* Inform the standby that XLOG streaming is done */
2818  EndCommand("COPY 0", DestRemote);
2819  pq_flush();
2820 
2821  proc_exit(0);
2822  }
2824  {
2825  WalSndKeepalive(true);
2827  }
2828 }
#define pq_is_send_pending()
Definition: libpq.h:41
XLogRecPtr write
#define pq_flush()
Definition: libpq.h:39
void proc_exit(int code)
Definition: ipc.c:99
XLogRecPtr flush
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3302
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:157
static bool WalSndCaughtUp
Definition: walsender.c:179
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
WalSnd * MyWalSnd
Definition: walsender.c:108
static XLogRecPtr sentPtr
Definition: walsender.c:153
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool waiting_for_ping_response
Definition: walsender.c:167
void WalSndErrorCleanup ( void  )

Definition at line 291 of file walsender.c.

References close, ConditionVariableCancelSleep(), got_SIGINT, got_SIGUSR2, LWLockReleaseAll(), MyReplicationSlot, NULL, pgstat_report_wait_end(), proc_exit(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), sendFile, WalSndSetState(), WALSNDSTATE_STARTUP, and WALSNDSTATE_STOPPING.

Referenced by PostgresMain().

292 {
296 
297  if (sendFile >= 0)
298  {
299  close(sendFile);
300  sendFile = -1;
301  }
302 
303  if (MyReplicationSlot != NULL)
305 
307 
308  replication_active = false;
309  if (got_SIGINT)
310  proc_exit(0);
311 
312  /* Revert back to startup state */
314 
315  if (got_SIGUSR2)
317 }
static int sendFile
Definition: walsender.c:131
void proc_exit(int code)
Definition: ipc.c:99
void ConditionVariableCancelSleep(void)
void ReplicationSlotRelease(void)
Definition: slot.c:375
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1232
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:184
static volatile sig_atomic_t replication_active
Definition: walsender.c:192
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define NULL
Definition: c.h:229
void WalSndSetState(WalSndState state)
Definition: walsender.c:3085
static volatile sig_atomic_t got_SIGINT
Definition: walsender.c:183
void ReplicationSlotCleanup(void)
Definition: slot.c:429
void LWLockReleaseAll(void)
Definition: lwlock.c:1814
#define close(a)
Definition: win32.h:12
static const char* WalSndGetStateString ( WalSndState  state)
static

Definition at line 3104 of file walsender.c.

References WALSNDSTATE_BACKUP, WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WALSNDSTATE_STOPPING, and WALSNDSTATE_STREAMING.

Referenced by pg_stat_get_wal_senders().

3105 {
3106  switch (state)
3107  {
3108  case WALSNDSTATE_STARTUP:
3109  return "startup";
3110  case WALSNDSTATE_BACKUP:
3111  return "backup";
3112  case WALSNDSTATE_CATCHUP:
3113  return "catchup";
3114  case WALSNDSTATE_STREAMING:
3115  return "streaming";
3116  case WALSNDSTATE_STOPPING:
3117  return "stopping";
3118  }
3119  return "UNKNOWN";
3120 }
Definition: regguts.h:298
static void WalSndKeepalive ( bool  requestReply)
static

Definition at line 3302 of file walsender.c.

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), StringInfoData::len, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), resetStringInfo(), and sentPtr.

Referenced by ProcessStandbyReplyMessage(), WalSndDone(), WalSndKeepaliveIfNecessary(), and WalSndWaitForWal().

3303 {
3304  elog(DEBUG2, "sending replication keepalive");
3305 
3306  /* construct the message... */
3308  pq_sendbyte(&output_message, 'k');
3311  pq_sendbyte(&output_message, requestReply ? 1 : 0);
3312 
3313  /* ... and send it wrapped in CopyData */
3315 }
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
static StringInfoData output_message
Definition: walsender.c:156
#define DEBUG2
Definition: elog.h:24
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
static XLogRecPtr sentPtr
Definition: walsender.c:153
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
#define elog
Definition: elog.h:219
void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.c:271
static void WalSndKeepaliveIfNecessary ( TimestampTz  now)
static

Definition at line 3321 of file walsender.c.

References last_reply_timestamp, pq_flush_if_writable, TimestampTzPlusMilliseconds, waiting_for_ping_response, wal_sender_timeout, WalSndKeepalive(), and WalSndShutdown().

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

3322 {
3323  TimestampTz ping_time;
3324 
3325  /*
3326  * Don't send keepalive messages if timeouts are globally disabled or
3327  * we're doing something not partaking in timeouts.
3328  */
3329  if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3330  return;
3331 
3333  return;
3334 
3335  /*
3336  * If half of wal_sender_timeout has lapsed without receiving any reply
3337  * from the standby, send a keep-alive message to the standby requesting
3338  * an immediate reply.
3339  */
3341  wal_sender_timeout / 2);
3342  if (now >= ping_time)
3343  {
3344  WalSndKeepalive(true);
3346 
3347  /* Try to flush pending output to the client */
3348  if (pq_flush_if_writable() != 0)
3349  WalSndShutdown();
3350  }
3351 }
int wal_sender_timeout
Definition: walsender.c:118
int64 TimestampTz
Definition: timestamp.h:39
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3302
#define pq_flush_if_writable()
Definition: libpq.h:40
static void WalSndShutdown(void)
Definition: walsender.c:228
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
static bool waiting_for_ping_response
Definition: walsender.c:167
static TimestampTz last_reply_timestamp
Definition: walsender.c:164
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
static void WalSndKill ( int  code,
Datum  arg 
)
static

Definition at line 2276 of file walsender.c.

References Assert, WalSnd::latch, WalSnd::mutex, MyWalSnd, NULL, WalSnd::pid, SpinLockAcquire, and SpinLockRelease.

Referenced by InitWalSenderSlot().

2277 {
2278  WalSnd *walsnd = MyWalSnd;
2279 
2280  Assert(walsnd != NULL);
2281 
2282  MyWalSnd = NULL;
2283 
2284  SpinLockAcquire(&walsnd->mutex);
2285  /* clear latch while holding the spinlock, so it can safely be read */
2286  walsnd->latch = NULL;
2287  /* Mark WalSnd struct as no longer being in use. */
2288  walsnd->pid = 0;
2289  SpinLockRelease(&walsnd->mutex);
2290 }
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch * latch
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:108
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
static void WalSndLastCycleHandler ( SIGNAL_ARGS  )
static

Definition at line 2928 of file walsender.c.

References got_SIGINT, MyLatch, MyProcPid, replication_active, and SetLatch().

Referenced by WalSndSignals().

2929 {
2930  int save_errno = errno;
2931 
2932  /*
2933  * If replication has not yet started, die like with SIGTERM. If
2934  * replication is active, only set a flag and wake up the main loop. It
2935  * will send any outstanding WAL, wait for it to be replicated to the
2936  * standby, and then exit gracefully.
2937  */
2938  if (!replication_active)
2939  kill(MyProcPid, SIGTERM);
2940 
2941  got_SIGINT = true;
2942  SetLatch(MyLatch);
2943 
2944  errno = save_errno;
2945 }
int MyProcPid
Definition: globals.c:38
static volatile sig_atomic_t replication_active
Definition: walsender.c:192
void SetLatch(volatile Latch *latch)
Definition: latch.c:415
static volatile sig_atomic_t got_SIGINT
Definition: walsender.c:183
struct Latch * MyLatch
Definition: globals.c:51
static void WalSndLoop ( WalSndSendDataCallback  send_data)
static

Definition at line 2072 of file walsender.c.

References application_name, CHECK_FOR_INTERRUPTS, DEBUG1, ereport, errmsg(), GetCurrentTimestamp(), got_SIGHUP, got_SIGINT, got_SIGUSR2, last_reply_timestamp, MyLatch, MyProcPort, now(), NULL, PGC_SIGHUP, pgstat_report_activity(), PostmasterIsAlive(), pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), sleeptime, Port::sock, WalSnd::state, STATE_RUNNING, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_MAIN, waiting_for_ping_response, WaitLatchOrSocket(), WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndDone(), WalSndKeepaliveIfNecessary(), WalSndSetState(), WalSndShutdown(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STOPPING, WALSNDSTATE_STREAMING, WL_LATCH_SET, WL_POSTMASTER_DEATH, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, and WL_TIMEOUT.

Referenced by StartLogicalReplication(), and StartReplication().

2073 {
2074  /*
2075  * Initialize the last reply timestamp. That enables timeout processing
2076  * from hereon.
2077  */
2079  waiting_for_ping_response = false;
2080 
2081  /* Report to pgstat that this process is running */
2083 
2084  /*
2085  * Loop until we reach the end of this timeline or the client requests to
2086  * stop streaming.
2087  */
2088  for (;;)
2089  {
2090  TimestampTz now;
2091 
2092  /*
2093  * Emergency bailout if postmaster has died. This is to avoid the
2094  * necessity for manual cleanup of all postmaster children.
2095  */
2096  if (!PostmasterIsAlive())
2097  exit(1);
2098 
2099  /* Clear any already-pending wakeups */
2101 
2103 
2104  /* Process any requests or signals received recently */
2105  if (got_SIGHUP)
2106  {
2107  got_SIGHUP = false;
2110  }
2111 
2112  /* Check for input from the client */
2114 
2115  /*
2116  * If we have received CopyDone from the client, sent CopyDone
2117  * ourselves, and the output buffer is empty, it's time to exit
2118  * streaming.
2119  */
2121  break;
2122 
2123  /*
2124  * If we don't have any pending data in the output buffer, try to send
2125  * some more. If there is some, we don't bother to call send_data
2126  * again until we've flushed it ... but we'd better assume we are not
2127  * caught up.
2128  */
2129  if (!pq_is_send_pending())
2130  send_data();
2131  else
2132  WalSndCaughtUp = false;
2133 
2134  /* Try to flush pending output to the client */
2135  if (pq_flush_if_writable() != 0)
2136  WalSndShutdown();
2137 
2138  /* If nothing remains to be sent right now ... */
2140  {
2141  /*
2142  * If we're in catchup state, move to streaming. This is an
2143  * important state change for users to know about, since before
2144  * this point data loss might occur if the primary dies and we
2145  * need to failover to the standby. The state change is also
2146  * important for synchronous replication, since commits that
2147  * started to wait at that point might wait for some time.
2148  */
2150  {
2151  ereport(DEBUG1,
2152  (errmsg("standby \"%s\" has now caught up with primary",
2153  application_name)));
2155  }
2156 
2157  /*
2158  * At the reception of SIGUSR2, switch the WAL sender to the
2159  * stopping state.
2160  */
2161  if (got_SIGUSR2)
2163 
2164  /*
2165  * When SIGINT arrives, we send any outstanding logs up to the
2166  * shutdown checkpoint record (i.e., the latest record), wait for
2167  * them to be replicated to the standby, and exit. This may be a
2168  * normal termination at shutdown, or a promotion, the walsender
2169  * is not sure which.
2170  */
2171  if (got_SIGINT)
2172  WalSndDone(send_data);
2173  }
2174 
2175  now = GetCurrentTimestamp();
2176 
2177  /* Check for replication timeout. */
2178  WalSndCheckTimeOut(now);
2179 
2180  /* Send keepalive if the time has come */
2182 
2183  /*
2184  * We don't block if not caught up, unless there is unsent data
2185  * pending in which case we'd better block until the socket is
2186  * write-ready. This test is only needed for the case where the
2187  * send_data callback handled a subset of the available data but then
2188  * pq_flush_if_writable flushed it all --- we should immediately try
2189  * to send more.
2190  */
2192  {
2193  long sleeptime;
2194  int wakeEvents;
2195 
2196  wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT |
2198 
2199  sleeptime = WalSndComputeSleeptime(now);
2200 
2201  if (pq_is_send_pending())
2202  wakeEvents |= WL_SOCKET_WRITEABLE;
2203 
2204  /* Sleep until something happens or we time out */
2205  WaitLatchOrSocket(MyLatch, wakeEvents,
2206  MyProcPort->sock, sleeptime,
2208  }
2209  }
2210  return;
2211 }
#define pq_is_send_pending()
Definition: libpq.h:41
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
#define DEBUG1
Definition: elog.h:25
struct Port * MyProcPort
Definition: globals.c:40
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:2994
int64 TimestampTz
Definition: timestamp.h:39
static void WalSndDone(WalSndSendDataCallback send_data)
Definition: walsender.c:2799
#define WL_SOCKET_READABLE
Definition: latch.h:125
int sleeptime
Definition: pg_standby.c:40
void ResetLatch(volatile Latch *latch)
Definition: latch.c:498
pgsocket sock
Definition: libpq-be.h:118
static volatile sig_atomic_t got_SIGHUP
Definition: walsender.c:182
WalSndState state
static bool streamingDoneSending
Definition: walsender.c:175
bool PostmasterIsAlive(void)
Definition: pmsignal.c:272
#define pq_flush_if_writable()
Definition: libpq.h:40
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:184
void SyncRepInitConfig(void)
Definition: syncrep.c:381
#define ereport(elevel, rest)
Definition: elog.h:122
static bool WalSndCaughtUp
Definition: walsender.c:179
Definition: guc.h:72
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
static void WalSndShutdown(void)
Definition: walsender.c:228
WalSnd * MyWalSnd
Definition: walsender.c:108
static void WalSndKeepaliveIfNecessary(TimestampTz now)
Definition: walsender.c:3321
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2003
#define NULL
Definition: c.h:229
static void WalSndCheckTimeOut(TimestampTz now)
Definition: walsender.c:2045
void WalSndSetState(WalSndState state)
Definition: walsender.c:3085
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:356
static bool streamingDoneReceiving
Definition: walsender.c:176
char * application_name
Definition: guc.c:469
int errmsg(const char *fmt,...)
Definition: elog.c:797
static volatile sig_atomic_t got_SIGINT
Definition: walsender.c:183
struct Latch * MyLatch
Definition: globals.c:51
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
static bool waiting_for_ping_response
Definition: walsender.c:167
#define WL_LATCH_SET
Definition: latch.h:124
static TimestampTz last_reply_timestamp
Definition: walsender.c:164
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1570
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
static void WalSndPrepareWrite ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1135 of file walsender.c.

References InvalidXLogRecPtr, LogicalDecodingContext::out, pq_sendbyte(), pq_sendint64(), and resetStringInfo().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

1136 {
1137  /* can't have sync rep confused by sending the same LSN several times */
1138  if (!last_write)
1139  lsn = InvalidXLogRecPtr;
1140 
1141  resetStringInfo(ctx->out);
1142 
1143  pq_sendbyte(ctx->out, 'w');
1144  pq_sendint64(ctx->out, lsn); /* dataStart */
1145  pq_sendint64(ctx->out, lsn); /* walEnd */
1146 
1147  /*
1148  * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1149  * reserve space here.
1150  */
1151  pq_sendint64(ctx->out, 0); /* sendtime */
1152 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
StringInfo out
Definition: logical.h:66
void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.c:271
void WalSndRqstFileReload ( void  )

Definition at line 2869 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

2870 {
2871  int i;
2872 
2873  for (i = 0; i < max_wal_senders; i++)
2874  {
2875  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2876 
2877  if (walsnd->pid == 0)
2878  continue;
2879 
2880  SpinLockAcquire(&walsnd->mutex);
2881  walsnd->needreload = true;
2882  SpinLockRelease(&walsnd->mutex);
2883  }
2884 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:105
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
int max_wal_senders
Definition: walsender.c:117
#define SpinLockRelease(lock)
Definition: spin.h:64
bool needreload
int i
void WalSndSetState ( WalSndState  state)

Definition at line 3085 of file walsender.c.

References am_walsender, Assert, WalSnd::mutex, MyWalSnd, SpinLockAcquire, SpinLockRelease, and WalSnd::state.

Referenced by exec_replication_command(), SendBaseBackup(), StartLogicalReplication(), StartReplication(), WalSndErrorCleanup(), WalSndLoop(), and WalSndWaitForWal().

3086 {
3087  WalSnd *walsnd = MyWalSnd;
3088 
3090 
3091  if (walsnd->state == state)
3092  return;
3093 
3094  SpinLockAcquire(&walsnd->mutex);
3095  walsnd->state = state;
3096  SpinLockRelease(&walsnd->mutex);
3097 }
slock_t mutex
bool am_walsender
Definition: walsender.c:111
#define SpinLockAcquire(lock)
Definition: spin.h:62
WalSndState state
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:108
#define Assert(condition)
Definition: c.h:675
Definition: regguts.h:298
void WalSndShmemInit ( void  )

Definition at line 2985 of file walsender.c.

References i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SHMQueueInit(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

2986 {
2987  bool found;
2988  int i;
2989 
2990  WalSndCtl = (WalSndCtlData *)
2991  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
2992 
2993  if (!found)
2994  {
2995  /* First time through, so initialize */
2997 
2998  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3000 
3001  for (i = 0; i < max_wal_senders; i++)
3002  {
3003  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3004 
3005  SpinLockInit(&walsnd->mutex);
3006  }
3007  }
3008 }
Size WalSndShmemSize(void)
Definition: walsender.c:2973
#define SpinLockInit(lock)
Definition: spin.h:60
WalSndCtlData * WalSndCtl
Definition: walsender.c:105
#define MemSet(start, val, len)
Definition: c.h:857
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
int max_wal_senders
Definition: walsender.c:117
void SHMQueueInit(SHM_QUEUE *queue)
Definition: shmqueue.c:36
int i
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28
Size WalSndShmemSize ( void  )

Definition at line 2973 of file walsender.c.

References add_size(), max_wal_senders, mul_size(), and offsetof.

Referenced by CreateSharedMemoryAndSemaphores(), and WalSndShmemInit().

2974 {
2975  Size size = 0;
2976 
2977  size = offsetof(WalSndCtlData, walsnds);
2978  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
2979 
2980  return size;
2981 }
int max_wal_senders
Definition: walsender.c:117
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
size_t Size
Definition: c.h:356
#define offsetof(type, field)
Definition: c.h:555
static void WalSndShutdown ( void  )
static

Definition at line 228 of file walsender.c.

References am_cascading_walsender, CurrentResourceOwner, InitWalSenderSlot(), LagTracker, MarkPostmasterChildWalSender(), NULL, PMSIGNAL_ADVANCE_STATE_MACHINE, RecoveryInProgress(), ResourceOwnerCreate(), and SendPostmasterSignal().

Referenced by WalSndCheckTimeOut(), WalSndKeepaliveIfNecessary(), WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

260 {
262 
263  /* Create a per-walsender data structure in shared memory */
265 
266  /* Set up resource owner */
267  CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner");
268 
269  /*
270  * Let postmaster know that we're a WAL sender. Once we've declared us as
271  * a WAL sender process, postmaster will let us outlive the bgwriter and
272  * kill us last in the shutdown sequence, so we get a chance to stream all
273  * remaining WAL at shutdown, including the shutdown checkpoint. Note that
274  * there's no going back, and we mustn't write any WAL records after this.
275  */
278 
279  /* Initialize empty timestamp buffer for lag tracking. */
280  memset(&LagTracker, 0, sizeof(LagTracker));
281 }
static void InitWalSenderSlot(void)
Definition: walsender.c:2215
ResourceOwner CurrentResourceOwner
Definition: resowner.c:138
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:239
bool RecoveryInProgress(void)
Definition: xlog.c:7873
static struct @27 LagTracker
#define NULL
Definition: c.h:229
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:113
bool am_cascading_walsender
Definition: walsender.c:112
ResourceOwner ResourceOwnerCreate(ResourceOwner parent, const char *name)
Definition: resowner.c:416
static void WalSndSigHupHandler ( SIGNAL_ARGS  )
static

Definition at line 2888 of file walsender.c.

References got_SIGHUP, MyLatch, and SetLatch().

Referenced by WalSndSignals().

2889 {
2890  int save_errno = errno;
2891 
2892  got_SIGHUP = true;
2893 
2894  SetLatch(MyLatch);
2895 
2896  errno = save_errno;
2897 }
static volatile sig_atomic_t got_SIGHUP
Definition: walsender.c:182
void SetLatch(volatile Latch *latch)
Definition: latch.c:415
struct Latch * MyLatch
Definition: globals.c:51
void WalSndSignals ( void  )

Definition at line 2949 of file walsender.c.

References die(), InitializeTimeouts(), pqsignal(), quickdie(), SIG_DFL, SIG_IGN, SIGCHLD, SIGCONT, SIGHUP, SIGPIPE, SIGQUIT, SIGTTIN, SIGTTOU, SIGUSR1, SIGUSR2, SIGWINCH, WalSndLastCycleHandler(), WalSndSigHupHandler(), WalSndSwitchStopping(), and WalSndXLogSendHandler().

Referenced by PostgresMain().

2950 {
2951  /* Set up signal handlers */
2952  pqsignal(SIGHUP, WalSndSigHupHandler); /* set flag to read config
2953  * file */
2954  pqsignal(SIGINT, WalSndLastCycleHandler); /* request a last cycle and
2955  * shutdown */
2956  pqsignal(SIGTERM, die); /* request shutdown */
2957  pqsignal(SIGQUIT, quickdie); /* hard crash time */
2958  InitializeTimeouts(); /* establishes SIGALRM handler */
2960  pqsignal(SIGUSR1, WalSndXLogSendHandler); /* request WAL sending */
2961  pqsignal(SIGUSR2, WalSndSwitchStopping); /* switch to stopping state */
2962 
2963  /* Reset some signals that are accepted by postmaster but not here */
2969 }
static void WalSndXLogSendHandler(SIGNAL_ARGS)
Definition: walsender.c:2901
void InitializeTimeouts(void)
Definition: timeout.c:340
#define SIGUSR1
Definition: win32.h:202
#define SIGCONT
Definition: win32.h:197
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:2928
#define SIGWINCH
Definition: win32.h:201
#define SIGTTIN
Definition: win32.h:199
#define SIGQUIT
Definition: win32.h:189
static void WalSndSigHupHandler(SIGNAL_ARGS)
Definition: walsender.c:2888
#define SIG_IGN
Definition: win32.h:185
#define SIGPIPE
Definition: win32.h:193
#define SIGHUP
Definition: win32.h:188
#define SIG_DFL
Definition: win32.h:183
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:168
#define SIGTTOU
Definition: win32.h:200
static void WalSndSwitchStopping(SIGNAL_ARGS)
Definition: walsender.c:2912
void die(SIGNAL_ARGS)
Definition: postgres.c:2623
#define SIGCHLD
Definition: win32.h:198
void quickdie(SIGNAL_ARGS)
Definition: postgres.c:2562
#define SIGUSR2
Definition: win32.h:203
static void WalSndSwitchStopping ( SIGNAL_ARGS  )
static

Definition at line 2912 of file walsender.c.

References got_SIGUSR2, MyLatch, and SetLatch().

Referenced by WalSndSignals().

2913 {
2914  int save_errno = errno;
2915 
2916  got_SIGUSR2 = true;
2917  SetLatch(MyLatch);
2918 
2919  errno = save_errno;
2920 }
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:184
void SetLatch(volatile Latch *latch)
Definition: latch.c:415
struct Latch * MyLatch
Definition: globals.c:51
static void WalSndUpdateProgress ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid 
)
static

Definition at line 1252 of file walsender.c.

References GetCurrentTimestamp(), LagTrackerWrite(), now(), TimestampDifferenceExceeds(), and WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS.

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

1253 {
1254  static TimestampTz sendTime = 0;
1256 
1257  /*
1258  * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1259  * avoid flooding the lag tracker when we commit frequently.
1260  */
1261 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1262  if (!TimestampDifferenceExceeds(sendTime, now,
1264  return;
1265 
1266  LagTrackerWrite(lsn, now);
1267  sendTime = now;
1268 }
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
int64 TimestampTz
Definition: timestamp.h:39
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1649
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition: walsender.c:3360
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
static XLogRecPtr WalSndWaitForWal ( XLogRecPtr  loc)
static

Definition at line 1274 of file walsender.c.

References CHECK_FOR_INTERRUPTS, WalSnd::flush, GetCurrentTimestamp(), GetFlushRecPtr(), GetXLogReplayRecPtr(), got_SIGHUP, got_SIGINT, got_SIGUSR2, InvalidXLogRecPtr, MyLatch, MyProcPort, now(), NULL, PGC_SIGHUP, PostmasterIsAlive(), pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), RecoveryInProgress(), ResetLatch(), sentPtr, SetLatch(), sleeptime, Port::sock, SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_WAIT_WAL, waiting_for_ping_response, WaitLatchOrSocket(), WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepalive(), WalSndKeepaliveIfNecessary(), WalSndSetState(), WalSndShutdown(), WALSNDSTATE_STOPPING, WL_LATCH_SET, WL_POSTMASTER_DEATH, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, WL_TIMEOUT, and WalSnd::write.

Referenced by logical_read_xlog_page().

1275 {
1276  int wakeEvents;
1277  static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1278 
1279 
1280  /*
1281  * Fast path to avoid acquiring the spinlock in case we already know we
1282  * have enough WAL available. This is particularly interesting if we're
1283  * far behind.
1284  */
1285  if (RecentFlushPtr != InvalidXLogRecPtr &&
1286  loc <= RecentFlushPtr)
1287  return RecentFlushPtr;
1288 
1289  /* Get a more recent flush pointer. */
1290  if (!RecoveryInProgress())
1291  RecentFlushPtr = GetFlushRecPtr();
1292  else
1293  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1294 
1295  for (;;)
1296  {
1297  long sleeptime;
1298  TimestampTz now;
1299 
1300  /*
1301  * Emergency bailout if postmaster has died. This is to avoid the
1302  * necessity for manual cleanup of all postmaster children.
1303  */
1304  if (!PostmasterIsAlive())
1305  exit(1);
1306 
1307  /* Clear any already-pending wakeups */
1308  ResetLatch(MyLatch);
1309
1310  CHECK_FOR_INTERRUPTS();
1311
1312  /* Process any requests or signals received recently */
1313  if (got_SIGHUP)
1314  {
1315  got_SIGHUP = false;
1316  ProcessConfigFile(PGC_SIGHUP);
1317  SyncRepInitConfig();
1318  }
1319 
1320  /* Check for input from the client */
1321  ProcessRepliesIfAny();
1322
1323  /* Update our idea of the currently flushed position. */
1324  if (!RecoveryInProgress())
1325  RecentFlushPtr = GetFlushRecPtr();
1326  else
1327  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1328 
1329  /*
1330  * If postmaster asked us to switch to the stopping state, do so.
1331  * Shutdown is in progress and this will allow the checkpointer to
1332  * move on with the shutdown checkpoint.
1333  */
1334  if (got_SIGUSR2)
1335  WalSndSetState(WALSNDSTATE_STOPPING);
1336
1337  /*
1338  * If postmaster asked us to stop, don't wait here anymore. This will
1339  * cause the xlogreader to return without reading a full record, which
1340  * is the fastest way to reach the mainloop which then can quit.
1341  *
1342  * It's important to do this check after the recomputation of
1343  * RecentFlushPtr, so we can send all remaining data before shutting
1344  * down.
1345  */
1346  if (got_SIGINT)
1347  break;
1348 
1349  /*
1350  * We only send regular messages to the client for full decoded
1351  * transactions, but a synchronous replication and walsender shutdown
1352  * possibly are waiting for a later location. So we send pings
1353  * containing the flush location every now and then.
1354  */
1355  if (MyWalSnd->flush < sentPtr &&
1356  MyWalSnd->write < sentPtr &&
1357  !waiting_for_ping_response)
1358  {
1359  WalSndKeepalive(false);
1360  waiting_for_ping_response = true;
1361  }
1362 
1363  /* check whether we're done */
1364  if (loc <= RecentFlushPtr)
1365  break;
1366 
1367  /* Waiting for new WAL. Since we need to wait, we're now caught up. */
1368  WalSndCaughtUp = true;
1369 
1370  /*
1371  * Try to flush pending output to the client. Also wait for the socket
1372  * becoming writable, if there's still pending output after an attempt
1373  * to flush. Otherwise we might just sit on output data while waiting
1374  * for new WAL being generated.
1375  */
1376  if (pq_flush_if_writable() != 0)
1377  WalSndShutdown();
1378 
1379  now = GetCurrentTimestamp();
1380 
1381  /* die if timeout was reached */
1382  WalSndCheckTimeOut(now);
1383 
1384  /* Send keepalive if the time has come */
1385  WalSndKeepaliveIfNecessary(now);
1386
1387  sleeptime = WalSndComputeSleeptime(now);
1388 
1389  wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
1390  WL_SOCKET_READABLE | WL_TIMEOUT;
1391
1392  if (pq_is_send_pending())
1393  wakeEvents |= WL_SOCKET_WRITEABLE;
1394 
1395  /* Sleep until something happens or we time out */
1396  WaitLatchOrSocket(MyLatch, wakeEvents,
1397  MyProcPort->sock, sleeptime,
1398  WAIT_EVENT_WAL_SENDER_WAIT_WAL);
1399  }
1400 
1401  /* reactivate latch so WalSndLoop knows to continue */
1402  SetLatch(MyLatch);
1403  return RecentFlushPtr;
1404 }
#define pq_is_send_pending()
Definition: libpq.h:41
XLogRecPtr write
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
struct Port * MyProcPort
Definition: globals.c:40
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
int64 TimestampTz
Definition: timestamp.h:39
#define WL_SOCKET_READABLE
Definition: latch.h:125
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8222
int sleeptime
Definition: pg_standby.c:40
void ResetLatch(volatile Latch *latch)
Definition: latch.c:498
bool RecoveryInProgress(void)
Definition: xlog.c:7873
pgsocket sock
Definition: libpq-be.h:118
XLogRecPtr flush
static volatile sig_atomic_t got_SIGHUP
Definition: walsender.c:182
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3302
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11084
bool PostmasterIsAlive(void)
Definition: pmsignal.c:272
#define pq_flush_if_writable()
Definition: libpq.h:40
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:184
void SyncRepInitConfig(void)
Definition: syncrep.c:381
static bool WalSndCaughtUp
Definition: walsender.c:179
Definition: guc.h:72
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
static void WalSndShutdown(void)
Definition: walsender.c:228
WalSnd * MyWalSnd
Definition: walsender.c:108
static XLogRecPtr sentPtr
Definition: walsender.c:153
static void WalSndKeepaliveIfNecessary(TimestampTz now)
Definition: walsender.c:3321
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2003
void SetLatch(volatile Latch *latch)
Definition: latch.c:415
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static void WalSndCheckTimeOut(TimestampTz now)
Definition: walsender.c:2045
void WalSndSetState(WalSndState state)
Definition: walsender.c:3085
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:356
static volatile sig_atomic_t got_SIGINT
Definition: walsender.c:183
struct Latch * MyLatch
Definition: globals.c:51
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
static bool waiting_for_ping_response
Definition: walsender.c:167
#define WL_LATCH_SET
Definition: latch.h:124
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1570
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
void WalSndWaitStopping ( void  )

Definition at line 3045 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, pg_usleep(), WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

3046 {
3047  for (;;)
3048  {
3049  int i;
3050  bool all_stopped = true;
3051 
3052  for (i = 0; i < max_wal_senders; i++)
3053  {
3054  WalSndState state;
3055  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3056 
3057  SpinLockAcquire(&walsnd->mutex);
3058 
3059  if (walsnd->pid == 0)
3060  {
3061  SpinLockRelease(&walsnd->mutex);
3062  continue;
3063  }
3064 
3065  state = walsnd->state;
3066  SpinLockRelease(&walsnd->mutex);
3067 
3068  if (state != WALSNDSTATE_STOPPING)
3069  {
3070  all_stopped = false;
3071  break;
3072  }
3073  }
3074 
3075  /* safe to leave if confirmation is done for all WAL senders */
3076  if (all_stopped)
3077  return;
3078 
3079  pg_usleep(10000L); /* wait for 10 msec */
3080  }
3081 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:105
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pg_usleep(long microsec)
Definition: signal.c:53
WalSndState state
int max_wal_senders
Definition: walsender.c:117
#define SpinLockRelease(lock)
Definition: spin.h:64
Definition: regguts.h:298
WalSndState
int i
void WalSndWakeup ( void  )

Definition at line 3017 of file walsender.c.

References i, WalSnd::latch, max_wal_senders, WalSnd::mutex, NULL, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive(), StartupXLOG(), and XLogWalRcvFlush().

3018 {
3019  int i;
3020 
3021  for (i = 0; i < max_wal_senders; i++)
3022  {
3023  Latch *latch;
3024  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3025 
3026  /*
3027  * Get latch pointer with spinlock held, for the unlikely case that
3028  * pointer reads aren't atomic (as they're 8 bytes).
3029  */
3030  SpinLockAcquire(&walsnd->mutex);
3031  latch = walsnd->latch;
3032  SpinLockRelease(&walsnd->mutex);
3033 
3034  if (latch != NULL)
3035  SetLatch(latch);
3036  }
3037 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:105
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch * latch
Definition: latch.h:110
int max_wal_senders
Definition: walsender.c:117
#define SpinLockRelease(lock)
Definition: spin.h:64
void SetLatch(volatile Latch *latch)
Definition: latch.c:415
#define NULL
Definition: c.h:229
int i
static void WalSndWriteData ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1162 of file walsender.c.

References CHECK_FOR_INTERRUPTS, StringInfoData::data, GetCurrentTimestamp(), got_SIGHUP, StringInfoData::len, MyLatch, MyProcPort, now(), LogicalDecodingContext::out, PGC_SIGHUP, PostmasterIsAlive(), pq_flush_if_writable, pq_is_send_pending, pq_putmessage_noblock, pq_sendint64(), ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), resetStringInfo(), SetLatch(), sleeptime, Port::sock, SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_WRITE_DATA, WaitLatchOrSocket(), WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WL_LATCH_SET, WL_POSTMASTER_DEATH, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, and WL_TIMEOUT.

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

1164 {
1165  /* output previously gathered data in a CopyData packet */
1166  pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1167 
1168  /*
1169  * Fill the send timestamp last, so that it is taken as late as possible.
1170  * This is somewhat ugly, but the protocol is set as it's already used for
1171  * several releases by streaming physical replication.
1172  */
1173  resetStringInfo(&tmpbuf);
1174  pq_sendint64(&tmpbuf, GetCurrentTimestamp());
1175  memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1176  tmpbuf.data, sizeof(int64));
1177 
1178  /* fast path */
1179  /* Try to flush pending output to the client */
1180  if (pq_flush_if_writable() != 0)
1181  WalSndShutdown();
1182 
1183  if (!pq_is_send_pending())
1184  return;
1185 
1186  for (;;)
1187  {
1188  int wakeEvents;
1189  long sleeptime;
1190  TimestampTz now;
1191 
1192  /*
1193  * Emergency bailout if postmaster has died. This is to avoid the
1194  * necessity for manual cleanup of all postmaster children.
1195  */
1196  if (!PostmasterIsAlive())
1197  exit(1);
1198 
1199  /* Clear any already-pending wakeups */
1200  ResetLatch(MyLatch);
1201
1202  CHECK_FOR_INTERRUPTS();
1203
1204  /* Process any requests or signals received recently */
1205  if (got_SIGHUP)
1206  {
1207  got_SIGHUP = false;
1208  ProcessConfigFile(PGC_SIGHUP);
1209  SyncRepInitConfig();
1210  }
1211 
1212  /* Check for input from the client */
1213  ProcessRepliesIfAny();
1214
1215  /* Try to flush pending output to the client */
1216  if (pq_flush_if_writable() != 0)
1217  WalSndShutdown();
1218 
1219  /* If we finished clearing the buffered data, we're done here. */
1220  if (!pq_is_send_pending())
1221  break;
1222 
1223  now = GetCurrentTimestamp();
1224 
1225  /* die if timeout was reached */
1226  WalSndCheckTimeOut(now);
1227 
1228  /* Send keepalive if the time has come */
1229  WalSndKeepaliveIfNecessary(now);
1230
1231  sleeptime = WalSndComputeSleeptime(now);
1232 
1233  wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
1234  WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT;
1235
1236  /* Sleep until something happens or we time out */
1237  WaitLatchOrSocket(MyLatch, wakeEvents,
1238  MyProcPort->sock, sleeptime,
1239  WAIT_EVENT_WAL_SENDER_WRITE_DATA);
1240  }
1241 
1242  /* reactivate latch so WalSndLoop knows to continue */
1243  SetLatch(MyLatch);
1244 }
#define pq_is_send_pending()
Definition: libpq.h:41
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
struct Port * MyProcPort
Definition: globals.c:40
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
int64 TimestampTz
Definition: timestamp.h:39
#define WL_SOCKET_READABLE
Definition: latch.h:125
int sleeptime
Definition: pg_standby.c:40
void ResetLatch(volatile Latch *latch)
Definition: latch.c:498
pgsocket sock
Definition: libpq-be.h:118
static volatile sig_atomic_t got_SIGHUP
Definition: walsender.c:182
bool PostmasterIsAlive(void)
Definition: pmsignal.c:272
#define pq_flush_if_writable()
Definition: libpq.h:40
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
void SyncRepInitConfig(void)
Definition: syncrep.c:381
Definition: guc.h:72
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
static void WalSndShutdown(void)
Definition: walsender.c:228
static void WalSndKeepaliveIfNecessary(TimestampTz now)
Definition: walsender.c:3321
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2003
void SetLatch(volatile Latch *latch)
Definition: latch.c:415
static void WalSndCheckTimeOut(TimestampTz now)
Definition: walsender.c:2045
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:356
static StringInfoData tmpbuf
Definition: walsender.c:158
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
StringInfo out
Definition: logical.h:66
struct Latch * MyLatch
Definition: globals.c:51
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.c:271
#define WL_LATCH_SET
Definition: latch.h:124
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1570
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
static void WalSndXLogSendHandler ( SIGNAL_ARGS  )
static

Definition at line 2901 of file walsender.c.

References latch_sigusr1_handler().

Referenced by WalSndSignals().

2902 {
2903  int save_errno = errno;
2904 
2905  latch_sigusr1_handler();
2906
2907  errno = save_errno;
2908 }
void latch_sigusr1_handler(void)
Definition: latch.c:1467
static void XLogRead ( char *  buf,
XLogRecPtr  startptr,
Size  count 
)
static

Definition at line 2304 of file walsender.c.

References am_cascading_walsender, BasicOpenFile(), buf, CheckXLogRemoved(), close, curFileTimeLine, ereport, errcode_for_file_access(), errmsg(), ERROR, MAXPGPATH, WalSnd::mutex, MyWalSnd, WalSnd::needreload, PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), read, sendFile, sendOff, sendSegNo, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, SpinLockAcquire, SpinLockRelease, ThisTimeLineID, WAIT_EVENT_WAL_READ, XLByteInSeg, XLByteToSeg, XLogFileNameP(), XLogFilePath, and XLogSegSize.

Referenced by logical_read_xlog_page(), and XLogSendPhysical().

2305 {
2306  char *p;
2307  XLogRecPtr recptr;
2308  Size nbytes;
2309  XLogSegNo segno;
2310 
2311 retry:
2312  p = buf;
2313  recptr = startptr;
2314  nbytes = count;
2315 
2316  while (nbytes > 0)
2317  {
2318  uint32 startoff;
2319  int segbytes;
2320  int readbytes;
2321 
2322  startoff = recptr % XLogSegSize;
2323 
2324  if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
2325  {
2326  char path[MAXPGPATH];
2327 
2328  /* Switch to another logfile segment */
2329  if (sendFile >= 0)
2330  close(sendFile);
2331 
2332  XLByteToSeg(recptr, sendSegNo);
2333 
2334  /*-------
2335  * When reading from a historic timeline, and there is a timeline
2336  * switch within this segment, read from the WAL segment belonging
2337  * to the new timeline.
2338  *
2339  * For example, imagine that this server is currently on timeline
2340  * 5, and we're streaming timeline 4. The switch from timeline 4
2341  * to 5 happened at 0/13002088. In pg_wal, we have these files:
2342  *
2343  * ...
2344  * 000000040000000000000012
2345  * 000000040000000000000013
2346  * 000000050000000000000013
2347  * 000000050000000000000014
2348  * ...
2349  *
2350  * In this situation, when requested to send the WAL from
2351  * segment 0x13, on timeline 4, we read the WAL from file
2352  * 000000050000000000000013. Archive recovery prefers files from
2353  * newer timelines, so if the segment was restored from the
2354  * archive on this server, the file belonging to the old timeline,
2355  * 000000040000000000000013, might not exist. Their contents are
2356  * equal up to the switchpoint, because at a timeline switch, the
2357  * used portion of the old segment is copied to the new file.
2358  *-------
2359  */
2360  curFileTimeLine = sendTimeLine;
2361  if (sendTimeLineIsHistoric)
2362  {
2363  XLogSegNo endSegNo;
2364 
2365  XLByteToSeg(sendTimeLineValidUpto, endSegNo);
2366  if (sendSegNo == endSegNo)
2367  curFileTimeLine = sendTimeLineNextTLI;
2368  }
2369 
2370  XLogFilePath(path, curFileTimeLine, sendSegNo);
2371
2372  sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
2373  if (sendFile < 0)
2374  {
2375  /*
2376  * If the file is not found, assume it's because the standby
2377  * asked for a too old WAL segment that has already been
2378  * removed or recycled.
2379  */
2380  if (errno == ENOENT)
2381  ereport(ERROR,
2382  (errcode_for_file_access(),
2383  errmsg("requested WAL segment %s has already been removed",
2384  XLogFileNameP(curFileTimeLine, sendSegNo))));
2385  else
2386  ereport(ERROR,
2387  (errcode_for_file_access(),
2388  errmsg("could not open file \"%s\": %m",
2389  path)));
2390  }
2391  sendOff = 0;
2392  }
2393 
2394  /* Need to seek in the file? */
2395  if (sendOff != startoff)
2396  {
2397  if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
2398  ereport(ERROR,
2399  (errcode_for_file_access(),
2400  errmsg("could not seek in log segment %s to offset %u: %m",
2401  XLogFileNameP(curFileTimeLine, sendSegNo),
2402  startoff)));
2403  sendOff = startoff;
2404  }
2405 
2406  /* How many bytes are within this segment? */
2407  if (nbytes > (XLogSegSize - startoff))
2408  segbytes = XLogSegSize - startoff;
2409  else
2410  segbytes = nbytes;
2411 
2412  pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
2413  readbytes = read(sendFile, p, segbytes);
2414  pgstat_report_wait_end();
2415  if (readbytes <= 0)
2416  {
2417  ereport(ERROR,
2418  (errcode_for_file_access(),
2419  errmsg("could not read from log segment %s, offset %u, length %lu: %m",
2420  XLogFileNameP(curFileTimeLine, sendSegNo),
2421  sendOff, (unsigned long) segbytes)));
2422  }
2423 
2424  /* Update state for read */
2425  recptr += readbytes;
2426 
2427  sendOff += readbytes;
2428  nbytes -= readbytes;
2429  p += readbytes;
2430  }
2431 
2432  /*
2433  * After reading into the buffer, check that what we read was valid. We do
2434  * this after reading, because even though the segment was present when we
2435  * opened it, it might get recycled or removed while we read it. The
2436  * read() succeeds in that case, but the data we tried to read might
2437  * already have been overwritten with new WAL records.
2438  */
2439  XLByteToSeg(startptr, segno);
2440  CheckXLogRemoved(segno, ThisTimeLineID);
2441
2442  /*
2443  * During recovery, the currently-open WAL file might be replaced with the
2444  * file of the same name retrieved from archive. So we always need to
2445  * check what we read was valid after reading into the buffer. If it's
2446  * invalid, we try to open and read the file again.
2447  */
2448  if (am_cascading_walsender)
2449  {
2450  WalSnd *walsnd = MyWalSnd;
2451  bool reload;
2452 
2453  SpinLockAcquire(&walsnd->mutex);
2454  reload = walsnd->needreload;
2455  walsnd->needreload = false;
2456  SpinLockRelease(&walsnd->mutex);
2457 
2458  if (reload && sendFile >= 0)
2459  {
2460  close(sendFile);
2461  sendFile = -1;
2462 
2463  goto retry;
2464  }
2465  }
2466 }
#define XLogSegSize
Definition: xlog_internal.h:92
static int sendFile
Definition: walsender.c:131
#define PG_BINARY
Definition: c.h:1038
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define ERROR
Definition: elog.h:43
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:3766
#define MAXPGPATH
static TimeLineID curFileTimeLine
Definition: walsender.c:136
#define XLogFilePath(path, tli, logSegNo)
char * XLogFileNameP(TimeLineID tli, XLogSegNo segno)
Definition: xlog.c:10126
static char * buf
Definition: pg_test_fsync.c:66
uint64 XLogSegNo
Definition: xlogdefs.h:34
int errcode_for_file_access(void)
Definition: elog.c:598
unsigned int uint32
Definition: c.h:268
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1232
#define ereport(elevel, rest)
Definition: elog.h:122
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:108
TimeLineID ThisTimeLineID
Definition: xlog.c:179
#define XLByteToSeg(xlrp, logSegNo)
static TimeLineID sendTimeLine
Definition: walsender.c:144
uint64 XLogRecPtr
Definition: xlogdefs.h:21
bool needreload
size_t Size
Definition: c.h:356
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1208
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:145
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:147
static XLogSegNo sendSegNo
Definition: walsender.c:132
int errmsg(const char *fmt,...)
Definition: elog.c:797
static uint32 sendOff
Definition: walsender.c:133
#define close(a)
Definition: win32.h:12
#define XLByteInSeg(xlrp, logSegNo)
#define read(a, b, c)
Definition: win32.h:13
static bool sendTimeLineIsHistoric
Definition: walsender.c:146
bool am_cascading_walsender
Definition: walsender.c:112
int BasicOpenFile(FileName fileName, int fileFlags, int fileMode)
Definition: fd.c:936
static void XLogSendLogical ( void  )
static

Definition at line 2738 of file walsender.c.

References elog, XLogReaderState::EndRecPtr, ERROR, GetFlushRecPtr(), InvalidXLogRecPtr, logical_startptr, LogicalDecodingProcessRecord(), WalSnd::mutex, MyWalSnd, NULL, LogicalDecodingContext::reader, WalSnd::sentPtr, sentPtr, SpinLockAcquire, SpinLockRelease, WalSndCaughtUp, and XLogReadRecord().

Referenced by StartLogicalReplication().

2739 {
2740  XLogRecord *record;
2741  char *errm;
2742 
2743  /*
2744  * Don't know whether we've caught up yet. We'll set it to true in
2745  * WalSndWaitForWal, if we're actually waiting. We also set to true if
2746  * XLogReadRecord() had to stop reading but WalSndWaitForWal didn't wait -
2747  * i.e. when we're shutting down.
2748  */
2749  WalSndCaughtUp = false;
2750 
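2751  record = XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &errm);
2752  logical_startptr = InvalidXLogRecPtr;
2753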
2754  /* xlog record was invalid */
2755  if (errm != NULL)
2756  elog(ERROR, "%s", errm);
2757 
2758  if (record != NULL)
2759  {
2760  /*
2761  * Note the lack of any call to LagTrackerWrite() which is handled by
2762  * WalSndUpdateProgress which is called by output plugin through
2763  * logical decoding write api.
2764  */
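2765  LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
2766
2767  sentPtr = logical_decoding_ctx->reader->EndRecPtr;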
2768  }
2769  else
2770  {
2771  /*
2772  * If the record we just wanted read is at or beyond the flushed
2773  * point, then we're caught up.
2774  */
2775  if (logical_decoding_ctx->reader->EndRecPtr >= GetFlushRecPtr())
2776  WalSndCaughtUp = true;
2777  }
2778 
2779  /* Update shared memory status */
2780  {
2781  WalSnd *walsnd = MyWalSnd;
2782 
2783  SpinLockAcquire(&walsnd->mutex);
2784  walsnd->sentPtr = sentPtr;
2785  SpinLockRelease(&walsnd->mutex);
2786  }
2787 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8222
slock_t mutex
XLogRecord * XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
Definition: xlogreader.c:193
XLogRecPtr EndRecPtr
Definition: xlogreader.h:115
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define ERROR
Definition: elog.h:43
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:93
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:194
static XLogRecPtr logical_startptr
Definition: walsender.c:195
static bool WalSndCaughtUp
Definition: walsender.c:179
#define SpinLockRelease(lock)
Definition: spin.h:64
XLogRecPtr sentPtr
WalSnd * MyWalSnd
Definition: walsender.c:108
static XLogRecPtr sentPtr
Definition: walsender.c:153
#define NULL
Definition: c.h:229
XLogReaderState * reader
Definition: logical.h:44
#define elog
Definition: elog.h:219
static void XLogSendPhysical ( void  )
static

Definition at line 2479 of file walsender.c.

References am_cascading_walsender, Assert, close, StringInfoData::data, DEBUG1, elog, enlargeStringInfo(), GetCurrentTimestamp(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), LagTrackerWrite(), StringInfoData::len, list_free_deep(), MAX_SEND_SIZE, WalSnd::mutex, MyWalSnd, NULL, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), readTimeLineHistory(), RecoveryInProgress(), resetStringInfo(), sendFile, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, WalSnd::sentPtr, sentPtr, set_ps_display(), snprintf(), SpinLockAcquire, SpinLockRelease, streamingDoneSending, ThisTimeLineID, tliSwitchPoint(), update_process_title, WalSndCaughtUp, and XLogRead().

Referenced by StartReplication().

2480 {
2481  XLogRecPtr SendRqstPtr;
2482  XLogRecPtr startptr;
2483  XLogRecPtr endptr;
2484  Size nbytes;
2485 
2487  {
2488  WalSndCaughtUp = true;
2489  return;
2490  }
2491 
2492  /* Figure out how far we can safely send the WAL. */
2494  {
2495  /*
2496  * Streaming an old timeline that's in this server's history, but is
2497  * not the one we're currently inserting or replaying. It can be
2498  * streamed up to the point where we switched off that timeline.
2499  */
2500  SendRqstPtr = sendTimeLineValidUpto;
2501  }
2502  else if (am_cascading_walsender)
2503  {
2504  /*
2505  * Streaming the latest timeline on a standby.
2506  *
2507  * Attempt to send all WAL that has already been replayed, so that we
2508  * know it's valid. If we're receiving WAL through streaming
2509  * replication, it's also OK to send any WAL that has been received
2510  * but not replayed.
2511  *
2512  * The timeline we're recovering from can change, or we can be
2513  * promoted. In either case, the current timeline becomes historic. We
2514  * need to detect that so that we don't try to stream past the point
2515  * where we switched to another timeline. We check for promotion or
2516  * timeline switch after calculating FlushPtr, to avoid a race
2517  * condition: if the timeline becomes historic just after we checked
2518  * that it was still current, it's still OK to stream it up to the
2519  * FlushPtr that was calculated before it became historic.
2520  */
2521  bool becameHistoric = false;
2522 
2523  SendRqstPtr = GetStandbyFlushRecPtr();
2524 
2525  if (!RecoveryInProgress())
2526  {
2527  /*
2528  * We have been promoted. RecoveryInProgress() updated
2529  * ThisTimeLineID to the new current timeline.
2530  */
2531  am_cascading_walsender = false;
2532  becameHistoric = true;
2533  }
2534  else
2535  {
2536  /*
2537  * Still a cascading standby. But is the timeline we're sending
2538  * still the one recovery is recovering from? ThisTimeLineID was
2539  * updated by the GetStandbyFlushRecPtr() call above.
2540  */
2542  becameHistoric = true;
2543  }
2544 
2545  if (becameHistoric)
2546  {
2547  /*
2548  * The timeline we were sending has become historic. Read the
2549  * timeline history file of the new timeline to see where exactly
2550  * we forked off from the timeline we were sending.
2551  */
2552  List *history;
2553 
2556 
2558  list_free_deep(history);
2559 
2560  sendTimeLineIsHistoric = true;
2561 
2562  SendRqstPtr = sendTimeLineValidUpto;
2563  }
2564  }
2565  else
2566  {
2567  /*
2568  * Streaming the current timeline on a master.
2569  *
2570  * Attempt to send all data that's already been written out and
2571  * fsync'd to disk. We cannot go further than what's been written out
2572  * given the current implementation of XLogRead(). And in any case
2573  * it's unsafe to send WAL that is not securely down to disk on the
2574  * master: if the master subsequently crashes and restarts, slaves
2575  * must not have applied any WAL that got lost on the master.
2576  */
2577  SendRqstPtr = GetFlushRecPtr();
2578  }
2579 
2580  /*
2581  * Record the current system time as an approximation of the time at which
2582  * this WAL location was written for the purposes of lag tracking.
2583  *
2584  * In theory we could make XLogFlush() record a time in shmem whenever WAL
2585  * is flushed and we could get that time as well as the LSN when we call
2586  * GetFlushRecPtr() above (and likewise for the cascading standby
2587  * equivalent), but rather than putting any new code into the hot WAL path
2588  * it seems good enough to capture the time here. We should reach this
2589  * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
2590  * may take some time, we read the WAL flush pointer and take the time
2591  * very close together here so that we'll get a later position if it is
2592  * still moving.
2593  *
2594  * Because LagTrackerWrite ignores samples when the LSN hasn't advanced,
2595  * this gives us a cheap approximation for the WAL flush time for this
2596  * LSN.
2597  *
2598  * Note that the LSN is not necessarily the LSN for the data contained in
2599  * the present message; it's the end of the WAL, which might be further
2600  * ahead. All the lag tracking machinery cares about is finding out when
2601  * that arbitrary LSN is eventually reported as written, flushed and
2602  * applied, so that it can measure the elapsed time.
2603  */
2604  LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());
2605 
2606  /*
2607  * If this is a historic timeline and we've reached the point where we
2608  * forked to the next timeline, stop streaming.
2609  *
2610  * Note: We might already have sent WAL > sendTimeLineValidUpto. The
2611  * startup process will normally replay all WAL that has been received
2612  * from the master, before promoting, but if the WAL streaming is
2613  * terminated at a WAL page boundary, the valid portion of the timeline
2614  * might end in the middle of a WAL record. We might've already sent the
2615  * first half of that partial WAL record to the cascading standby, so that
2616  * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
2617  * replay the partial WAL record either, so it can still follow our
2618  * timeline switch.
2619  */
2621  {
2622  /* close the current file. */
2623  if (sendFile >= 0)
2624  close(sendFile);
2625  sendFile = -1;
2626 
2627  /* Send CopyDone */
2628  pq_putmessage_noblock('c', NULL, 0);
2629  streamingDoneSending = true;
2630 
2631  WalSndCaughtUp = true;
2632 
2633  elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
2635  (uint32) (sentPtr >> 32), (uint32) sentPtr);
2636  return;
2637  }
2638 
2639  /* Do we have any work to do? */
2640  Assert(sentPtr <= SendRqstPtr);
2641  if (SendRqstPtr <= sentPtr)
2642  {
2643  WalSndCaughtUp = true;
2644  return;
2645  }
2646 
2647  /*
2648  * Figure out how much to send in one message. If there's no more than
2649  * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
2650  * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
2651  *
2652  * The rounding is not only for performance reasons. Walreceiver relies on
2653  * the fact that we never split a WAL record across two messages. Since a
2654  * long WAL record is split at page boundary into continuation records,
2655  * page boundary is always a safe cut-off point. We also assume that
2656  * SendRqstPtr never points to the middle of a WAL record.
2657  */
2658  startptr = sentPtr;
2659  endptr = startptr;
2660  endptr += MAX_SEND_SIZE;
2661 
2662  /* if we went beyond SendRqstPtr, back off */
2663  if (SendRqstPtr <= endptr)
2664  {
2665  endptr = SendRqstPtr;
2667  WalSndCaughtUp = false;
2668  else
2669  WalSndCaughtUp = true;
2670  }
2671  else
2672  {
2673  /* round down to page boundary. */
2674  endptr -= (endptr % XLOG_BLCKSZ);
2675  WalSndCaughtUp = false;
2676  }
2677 
2678  nbytes = endptr - startptr;
2679  Assert(nbytes <= MAX_SEND_SIZE);
2680 
2681  /*
2682  * OK to read and send the slice.
2683  */
2685  pq_sendbyte(&output_message, 'w');
2686 
2687  pq_sendint64(&output_message, startptr); /* dataStart */
2688  pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
2689  pq_sendint64(&output_message, 0); /* sendtime, filled in last */
2690 
2691  /*
2692  * Read the log directly into the output buffer to avoid extra memcpy
2693  * calls.
2694  */
2696  XLogRead(&output_message.data[output_message.len], startptr, nbytes);
2697  output_message.len += nbytes;
2699 
2700  /*
2701  * Fill the send timest