PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
walsender.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/printtup.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "access/xlogutils.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
#include "commands/defrem.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "pgstat.h"
#include "replication/basebackup.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/logicalfuncs.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/dest.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/portal.h"
#include "utils/ps_status.h"
#include "utils/resowner.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
Include dependency graph for walsender.c:

Go to the source code of this file.

Data Structures

struct  WalTimeSample
 

Macros

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)
 
#define LAG_TRACKER_BUFFER_SIZE   8192
 
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000
 
#define PG_STAT_GET_WAL_SENDERS_COLS   11
 

Typedefs

typedef void(* WalSndSendDataCallback )(void)
 

Functions

static void WalSndLastCycleHandler (SIGNAL_ARGS)
 
static void WalSndLoop (WalSndSendDataCallback send_data)
 
static void InitWalSenderSlot (void)
 
static void WalSndKill (int code, Datum arg)
 
static void WalSndShutdown (void)
 
void WalSndErrorCleanup (void)
 
static void IdentifySystem (void)
 
static void SendTimeLineHistory (TimeLineHistoryCmd *cmd)
 
static void StartReplication (StartReplicationCmd *cmd)
 
static int logical_read_xlog_page (XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
 
static void parseCreateReplSlotOptions (CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action)
 
static void CreateReplicationSlot (CreateReplicationSlotCmd *cmd)
 
static void DropReplicationSlot (DropReplicationSlotCmd *cmd)
 
static void StartLogicalReplication (StartReplicationCmd *cmd)
 
static void WalSndPrepareWrite (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndWriteData (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndUpdateProgress (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
 
static XLogRecPtr WalSndWaitForWal (XLogRecPtr loc)
 
bool exec_replication_command (const char *cmd_string)
 
static void ProcessRepliesIfAny (void)
 
static void ProcessStandbyMessage (void)
 
static void PhysicalConfirmReceivedLocation (XLogRecPtr lsn)
 
static void ProcessStandbyReplyMessage (void)
 
static void PhysicalReplicationSlotNewXmin (TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
 
static bool TransactionIdInRecentPast (TransactionId xid, uint32 epoch)
 
static void ProcessStandbyHSFeedbackMessage (void)
 
static long WalSndComputeSleeptime (TimestampTz now)
 
static void WalSndCheckTimeOut (TimestampTz now)
 
static void XLogRead (char *buf, XLogRecPtr startptr, Size count)
 
static void XLogSendPhysical (void)
 
static void XLogSendLogical (void)
 
static void WalSndDone (WalSndSendDataCallback send_data)
 
static XLogRecPtr GetStandbyFlushRecPtr (void)
 
void WalSndRqstFileReload (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (void)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void WalSndSetState (WalSndState state)
 
static const char * WalSndGetStateString (WalSndState state)
 
static Interval * offset_to_interval (TimeOffset offset)
 
Datum pg_stat_get_wal_senders (PG_FUNCTION_ARGS)
 
static void WalSndKeepalive (bool requestReply)
 
static void WalSndKeepaliveIfNecessary (TimestampTz now)
 
static void LagTrackerWrite (XLogRecPtr lsn, TimestampTz local_flush_time)
 
static TimeOffset LagTrackerRead (int head, XLogRecPtr lsn, TimestampTz now)
 

Variables

WalSndCtlData * WalSndCtl = NULL
 
WalSnd * MyWalSnd = NULL
 
bool am_walsender = false
 
bool am_cascading_walsender = false
 
bool am_db_walsender = false
 
int max_wal_senders = 0
 
int wal_sender_timeout = 60 * 1000
 
bool log_replication_commands = false
 
bool wake_wal_senders = false
 
static int sendFile = -1
 
static XLogSegNo sendSegNo = 0
 
static uint32 sendOff = 0
 
static TimeLineID curFileTimeLine = 0
 
static TimeLineID sendTimeLine = 0
 
static TimeLineID sendTimeLineNextTLI = 0
 
static bool sendTimeLineIsHistoric = false
 
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
 
static XLogRecPtr sentPtr = 0
 
static StringInfoData output_message
 
static StringInfoData reply_message
 
static StringInfoData tmpbuf
 
static TimestampTz last_reply_timestamp = 0
 
static bool waiting_for_ping_response = false
 
static bool streamingDoneSending
 
static bool streamingDoneReceiving
 
static bool WalSndCaughtUp = false
 
static volatile sig_atomic_t got_SIGUSR2 = false
 
static volatile sig_atomic_t got_STOPPING = false
 
static volatile sig_atomic_t replication_active = false
 
static LogicalDecodingContext * logical_decoding_ctx = NULL
 
static XLogRecPtr logical_startptr = InvalidXLogRecPtr
 
struct {
   XLogRecPtr   last_lsn
 
   WalTimeSample   buffer [LAG_TRACKER_BUFFER_SIZE]
 
   int   write_head
 
   int   read_heads [NUM_SYNC_REP_WAIT_MODE]
 
   WalTimeSample   last_read [NUM_SYNC_REP_WAIT_MODE]
 
} LagTracker
 

Macro Definition Documentation

#define LAG_TRACKER_BUFFER_SIZE   8192

Definition at line 208 of file walsender.c.

Referenced by LagTrackerRead(), and LagTrackerWrite().

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)

Definition at line 105 of file walsender.c.

Referenced by XLogSendPhysical().

#define PG_STAT_GET_WAL_SENDERS_COLS   11

Referenced by pg_stat_get_wal_senders().

#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000

Referenced by WalSndUpdateProgress().

Typedef Documentation

typedef void(* WalSndSendDataCallback)(void)

Definition at line 224 of file walsender.c.

Function Documentation

static void CreateReplicationSlot ( CreateReplicationSlotCmd * cmd)
static

Definition at line 840 of file walsender.c.

References Assert, begin_tup_output_tupdesc(), CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDestReceiver(), CreateInitDecodingContext(), CreateTemplateTupleDesc(), CRS_EXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, CStringGetTextDatum, ReplicationSlot::data, DecodingContextFindStartpoint(), DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errmsg(), ERROR, FirstSnapshotSet, FreeDecodingContext(), IsSubTransaction(), IsTransactionBlock(), CreateReplicationSlotCmd::kind, last_reply_timestamp, logical_read_xlog_page(), MAXFNAMELEN, MemSet, MyProc, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, NIL, NULL, parseCreateReplSlotOptions(), CreateReplicationSlotCmd::plugin, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, ReplicationSlotCreate(), ReplicationSlotMarkDirty(), ReplicationSlotPersist(), ReplicationSlotRelease(), ReplicationSlotReserveWal(), ReplicationSlotSave(), RestoreTransactionSnapshot(), RS_EPHEMERAL, RS_PERSISTENT, RS_TEMPORARY, sendTimeLine, sendTimeLineIsHistoric, CreateReplicationSlotCmd::slotname, SnapBuildExportSnapshot(), SnapBuildInitialSnapshot(), LogicalDecodingContext::snapshot_builder, snprintf(), CreateReplicationSlotCmd::temporary, TEXTOID, ThisTimeLineID, TupleDescInitBuiltinEntry(), values, WalSndPrepareWrite(), WalSndUpdateProgress(), WalSndWriteData(), XACT_REPEATABLE_READ, and XactIsoLevel.

Referenced by exec_replication_command(), and main().

841 {
842  const char *snapshot_name = NULL;
843  char xloc[MAXFNAMELEN];
844  char *slot_name;
845  bool reserve_wal = false;
846  CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
847  DestReceiver *dest;
848  TupOutputState *tstate;
849  TupleDesc tupdesc;
850  Datum values[4];
851  bool nulls[4];
852 
854 
855  parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action);
856 
857  /* setup state for XLogReadPage */
858  sendTimeLineIsHistoric = false;
860 
861  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
862  {
863  ReplicationSlotCreate(cmd->slotname, false,
865  }
866  else
867  {
869 
870  /*
871  * Initially create persistent slot as ephemeral - that allows us to
872  * nicely handle errors during initialization because it'll get
873  * dropped if this transaction fails. We'll make it persistent at the
874  * end. Temporary slots can be created as temporary from the beginning as
875  * they get dropped on error as well.
876  */
877  ReplicationSlotCreate(cmd->slotname, true,
879  }
880 
881  if (cmd->kind == REPLICATION_KIND_LOGICAL)
882  {
884  bool need_full_snapshot = false;
885 
886  /*
887  * Do options check early so that we can bail before calling the
888  * DecodingContextFindStartpoint which can take a long time.
889  */
890  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
891  {
892  if (IsTransactionBlock())
893  ereport(ERROR,
894  (errmsg("CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT "
895  "must not be called inside a transaction")));
896 
897  need_full_snapshot = true;
898  }
899  else if (snapshot_action == CRS_USE_SNAPSHOT)
900  {
901  if (!IsTransactionBlock())
902  ereport(ERROR,
903  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
904  "must be called inside a transaction")));
905 
907  ereport(ERROR,
908  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
909  "must be called in REPEATABLE READ isolation mode transaction")));
910 
911  if (FirstSnapshotSet)
912  ereport(ERROR,
913  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
914  "must be called before any query")));
915 
916  if (IsSubTransaction())
917  ereport(ERROR,
918  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
919  "must not be called in a subtransaction")));
920 
921  need_full_snapshot = true;
922  }
923 
924  ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
928 
929  /*
930  * Signal that we don't need the timeout mechanism. We're just
931  * creating the replication slot and don't yet accept feedback
932  * messages or send keepalives. As we possibly need to wait for
933  * further WAL the walsender would otherwise possibly be killed too
934  * soon.
935  */
937 
938  /* build initial snapshot, might take a while */
940 
941  /*
942  * Export or use the snapshot if we've been asked to do so.
943  *
944  * NB. We will convert the snapbuild.c kind of snapshot to normal
945  * snapshot when doing this.
946  */
947  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
948  {
949  snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
950  }
951  else if (snapshot_action == CRS_USE_SNAPSHOT)
952  {
953  Snapshot snap;
954 
957  }
958 
959  /* don't need the decoding context anymore */
960  FreeDecodingContext(ctx);
961 
962  if (!cmd->temporary)
964  }
965  else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
966  {
968 
970 
971  /* Write this slot to disk if it's a permanent one. */
972  if (!cmd->temporary)
974  }
975 
976  snprintf(xloc, sizeof(xloc), "%X/%X",
979 
981  MemSet(nulls, false, sizeof(nulls));
982 
983  /*----------
984  * Need a tuple descriptor representing four columns:
985  * - first field: the slot name
986  * - second field: LSN at which we became consistent
987  * - third field: exported snapshot's name
988  * - fourth field: output plugin
989  *----------
990  */
991  tupdesc = CreateTemplateTupleDesc(4, false);
992  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
993  TEXTOID, -1, 0);
994  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
995  TEXTOID, -1, 0);
996  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
997  TEXTOID, -1, 0);
998  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
999  TEXTOID, -1, 0);
1000 
1001  /* prepare for projection of tuples */
1002  tstate = begin_tup_output_tupdesc(dest, tupdesc);
1003 
1004  /* slot_name */
1005  slot_name = NameStr(MyReplicationSlot->data.name);
1006  values[0] = CStringGetTextDatum(slot_name);
1007 
1008  /* consistent wal location */
1009  values[1] = CStringGetTextDatum(xloc);
1010 
1011  /* snapshot name, or NULL if none */
1012  if (snapshot_name != NULL)
1013  values[2] = CStringGetTextDatum(snapshot_name);
1014  else
1015  nulls[2] = true;
1016 
1017  /* plugin, or NULL if none */
1018  if (cmd->plugin != NULL)
1019  values[3] = CStringGetTextDatum(cmd->plugin);
1020  else
1021  nulls[3] = true;
1022 
1023  /* send it to dest */
1024  do_tup_output(tstate, values, nulls);
1025  end_tup_output(tstate);
1026 
1028 }
#define NIL
Definition: pg_list.h:69
#define TEXTOID
Definition: pg_type.h:324
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action)
Definition: walsender.c:787
PGPROC * MyProc
Definition: proc.c:67
#define XACT_REPEATABLE_READ
Definition: xact.h:30
void RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
Definition: snapmgr.c:2185
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:1250
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency)
Definition: slot.c:220
#define MemSet(start, val, len)
Definition: c.h:857
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3, 4)
void ReplicationSlotSave(void)
Definition: slot.c:577
LogicalDecodingContext * CreateInitDecodingContext(char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:216
ReplicationSlotPersistentData data
Definition: slot.h:115
XLogRecPtr confirmed_flush
Definition: slot.h:76
ReplicationKind kind
Definition: replnodes.h:56
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
Definition: walsender.c:752
bool IsTransactionBlock(void)
Definition: xact.c:4305
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:635
void ReplicationSlotReserveWal(void)
Definition: slot.c:924
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:432
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:1308
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:568
#define ERROR
Definition: elog.h:43
bool FirstSnapshotSet
Definition: snapmgr.c:203
void ReplicationSlotPersist(void)
Definition: slot.c:612
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:109
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1160
unsigned int uint32
Definition: c.h:268
void ReplicationSlotRelease(void)
Definition: slot.c:373
#define ereport(elevel, rest)
Definition: elog.h:122
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc)
Definition: execTuples.c:1232
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1133
#define MAXFNAMELEN
uintptr_t Datum
Definition: postgres.h:372
TimeLineID ThisTimeLineID
Definition: xlog.c:179
struct SnapBuild * snapshot_builder
Definition: logical.h:46
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define NULL
Definition: c.h:229
static TimeLineID sendTimeLine
Definition: walsender.c:148
#define Assert(condition)
Definition: c.h:675
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:475
int XactIsoLevel
Definition: xact.c:74
TupleDesc CreateTemplateTupleDesc(int natts, bool hasoid)
Definition: tupdesc.c:41
bool IsSubTransaction(void)
Definition: xact.c:4377
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition: snapbuild.c:545
static Datum values[MAXATTR]
Definition: bootstrap.c:163
int errmsg(const char *fmt,...)
Definition: elog.c:797
CRSSnapshotAction
Definition: walsender.h:22
#define NameStr(name)
Definition: c.h:499
#define CStringGetTextDatum(s)
Definition: builtins.h:91
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:76
int16 AttrNumber
Definition: attnum.h:21
static TimestampTz last_reply_timestamp
Definition: walsender.c:168
static bool sendTimeLineIsHistoric
Definition: walsender.c:150
void ReplicationSlotMarkDirty(void)
Definition: slot.c:595
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
Definition: walsender.c:1250
static void DropReplicationSlot ( DropReplicationSlotCmd * cmd)
static

Definition at line 1034 of file walsender.c.

References DestRemote, EndCommand(), ReplicationSlotDrop(), and DropReplicationSlotCmd::slotname.

Referenced by exec_replication_command(), and main().

1035 {
1037  EndCommand("DROP_REPLICATION_SLOT", DestRemote);
1038 }
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:157
void ReplicationSlotDrop(const char *name)
Definition: slot.c:454
bool exec_replication_command ( const char *  cmd_string)

Definition at line 1411 of file walsender.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate(), CHECK_FOR_INTERRUPTS, CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, DestRemote, DestRemoteSimple, DropReplicationSlot(), elog, EndCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsA, IsAbortedTransactionBlockState(), IsTransactionBlock(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextDelete(), MemoryContextSwitchTo(), MyDatabaseId, VariableShowStmt::name, PreventTransactionChain(), REPLICATION_KIND_PHYSICAL, replication_parse_result, replication_scanner_init(), replication_yyparse(), SendBaseBackup(), SendTimeLineHistory(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), WalSnd::state, T_BaseBackupCmd, T_CreateReplicationSlotCmd, T_DropReplicationSlotCmd, T_IdentifySystemCmd, T_SQLCmd, T_StartReplicationCmd, T_TimeLineHistoryCmd, T_VariableShowStmt, Node::type, WalSndSetState(), and WALSNDSTATE_STOPPING.

Referenced by PostgresMain().

1412 {
1413  int parse_rc;
1414  Node *cmd_node;
1415  MemoryContext cmd_context;
1416  MemoryContext old_context;
1417 
1418  /*
1419  * If WAL sender has been told that shutdown is getting close, switch its
1420  * status accordingly to handle the next replication commands correctly.
1421  */
1422  if (got_STOPPING)
1424 
1425  /*
1426  * Throw error if in stopping mode. We need to prevent commands that could
1427  * generate WAL while the shutdown checkpoint is being written. To be
1428  * safe, we just prohibit all new commands.
1429  */
1431  ereport(ERROR,
1432  (errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1433 
1434  /*
1435  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1436  * command arrives. Clean up the old stuff if there's anything.
1437  */
1439 
1441 
1443  "Replication command context",
1445  old_context = MemoryContextSwitchTo(cmd_context);
1446 
1447  replication_scanner_init(cmd_string);
1448  parse_rc = replication_yyparse();
1449  if (parse_rc != 0)
1450  ereport(ERROR,
1451  (errcode(ERRCODE_SYNTAX_ERROR),
1452  (errmsg_internal("replication command parser returned %d",
1453  parse_rc))));
1454 
1455  cmd_node = replication_parse_result;
1456 
1457  /*
1458  * Log replication command if log_replication_commands is enabled. Even
1459  * when it's disabled, log the command with DEBUG1 level for backward
1460  * compatibility. Note that SQL commands are not logged here, and will be
1461  * logged later if log_statement is enabled.
1462  */
1463  if (cmd_node->type != T_SQLCmd)
1465  (errmsg("received replication command: %s", cmd_string)));
1466 
1467  /*
1468  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was
1469  * called outside of transaction the snapshot should be cleared here.
1470  */
1471  if (!IsTransactionBlock())
1473 
1474  /*
1475  * For aborted transactions, don't allow anything except pure SQL, the
1476  * exec_simple_query() will handle it correctly.
1477  */
1478  if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
1479  ereport(ERROR,
1480  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1481  errmsg("current transaction is aborted, "
1482  "commands ignored until end of transaction block")));
1483 
1485 
1486  /*
1487  * Allocate buffers that will be used for each outgoing and incoming
1488  * message. We do this just once per command to reduce palloc overhead.
1489  */
1493 
1494  switch (cmd_node->type)
1495  {
1496  case T_IdentifySystemCmd:
1497  IdentifySystem();
1498  break;
1499 
1500  case T_BaseBackupCmd:
1501  PreventTransactionChain(true, "BASE_BACKUP");
1502  SendBaseBackup((BaseBackupCmd *) cmd_node);
1503  break;
1504 
1507  break;
1508 
1511  break;
1512 
1513  case T_StartReplicationCmd:
1514  {
1515  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1516 
1517  PreventTransactionChain(true, "START_REPLICATION");
1518 
1519  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1520  StartReplication(cmd);
1521  else
1523  break;
1524  }
1525 
1526  case T_TimeLineHistoryCmd:
1527  PreventTransactionChain(true, "TIMELINE_HISTORY");
1529  break;
1530 
1531  case T_VariableShowStmt:
1532  {
1534  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1535 
1536  GetPGVariable(n->name, dest);
1537  }
1538  break;
1539 
1540  case T_SQLCmd:
1541  if (MyDatabaseId == InvalidOid)
1542  ereport(ERROR,
1543  (errmsg("not connected to database")));
1544 
1545  /* Tell the caller that this wasn't a WalSender command. */
1546  return false;
1547 
1548  default:
1549  elog(ERROR, "unrecognized replication command node tag: %u",
1550  cmd_node->type);
1551  }
1552 
1553  /* done */
1554  MemoryContextSwitchTo(old_context);
1555  MemoryContextDelete(cmd_context);
1556 
1557  /* Send CommandComplete message */
1558  EndCommand("SELECT", DestRemote);
1559 
1560  return true;
1561 }
#define IsA(nodeptr, _type_)
Definition: nodes.h:560
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:200
#define DEBUG1
Definition: elog.h:25
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:427
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1034
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:370
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Definition: nodes.h:509
static StringInfoData output_message
Definition: walsender.c:160
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LOG
Definition: elog.h:26
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc.c:7894
Node * replication_parse_result
bool IsTransactionBlock(void)
Definition: xact.c:4305
ReplicationKind kind
Definition: replnodes.h:81
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:157
#define ERROR
Definition: elog.h:43
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:686
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:165
WalSndState state
NodeTag type
Definition: nodes.h:511
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:109
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:523
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
#define ereport(elevel, rest)
Definition: elog.h:122
int replication_yyparse(void)
void initStringInfo(StringInfo str)
Definition: stringinfo.c:46
static StringInfoData reply_message
Definition: walsender.c:161
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:322
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1045
Oid MyDatabaseId
Definition: globals.c:77
WalSnd * MyWalSnd
Definition: walsender.c:111
#define InvalidOid
Definition: postgres_ext.h:36
int errmsg_internal(const char *fmt,...)
Definition: elog.c:827
void WalSndSetState(WalSndState state)
Definition: walsender.c:3091
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:840
static StringInfoData tmpbuf
Definition: walsender.c:162
bool log_replication_commands
Definition: walsender.c:124
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:100
#define elog
Definition: elog.h:219
static void IdentifySystem(void)
Definition: walsender.c:338
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:696
void PreventTransactionChain(bool isTopLevel, const char *stmtType)
Definition: xact.c:3155
void replication_scanner_init(const char *query_string)
static XLogRecPtr GetStandbyFlushRecPtr ( void  )
static

Definition at line 2843 of file walsender.c.

References GetWalRcvWriteRecPtr(), GetXLogReplayRecPtr(), NULL, receiveTLI, result, and ThisTimeLineID.

Referenced by IdentifySystem(), StartReplication(), and XLogSendPhysical().

2844 {
2845  XLogRecPtr replayPtr;
2846  TimeLineID replayTLI;
2847  XLogRecPtr receivePtr;
2850 
2851  /*
2852  * We can safely send what's already been replayed. Also, if walreceiver
2853  * is streaming WAL from the same timeline, we can send anything that it
2854  * has streamed, but hasn't been replayed yet.
2855  */
2856 
2857  receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI);
2858  replayPtr = GetXLogReplayRecPtr(&replayTLI);
2859 
2860  ThisTimeLineID = replayTLI;
2861 
2862  result = replayPtr;
2863  if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
2864  result = receivePtr;
2865 
2866  return result;
2867 }
uint32 TimeLineID
Definition: xlogdefs.h:45
XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
return result
Definition: formatting.c:1633
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11088
static TimeLineID receiveTLI
Definition: xlog.c:201
TimeLineID ThisTimeLineID
Definition: xlog.c:179
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void HandleWalSndInitStopping ( void  )

Definition at line 2894 of file walsender.c.

References am_walsender, Assert, got_STOPPING, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

2895 {
2897 
2898  /*
2899  * If replication has not yet started, die like with SIGTERM. If
2900  * replication is active, only set a flag and wake up the main loop. It
2901  * will send any outstanding WAL, wait for it to be replicated to the
2902  * standby, and then exit gracefully.
2903  */
2904  if (!replication_active)
2905  kill(MyProcPid, SIGTERM);
2906  else
2907  got_STOPPING = true;
2908 }
int MyProcPid
Definition: globals.c:39
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
bool am_walsender
Definition: walsender.c:114
static volatile sig_atomic_t replication_active
Definition: walsender.c:195
#define Assert(condition)
Definition: c.h:675
static void IdentifySystem ( void  )
static

Definition at line 338 of file walsender.c.

References am_cascading_walsender, begin_tup_output_tupdesc(), CommitTransactionCommand(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, cur, CurrentMemoryContext, dbname, DestRemoteSimple, do_tup_output(), end_tup_output(), get_database_name(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetSystemIdentifier(), Int32GetDatum, INT4OID, InvalidOid, MAXFNAMELEN, MemoryContextSwitchTo(), MemSet, MyDatabaseId, NULL, RecoveryInProgress(), snprintf(), StartTransactionCommand(), TEXTOID, ThisTimeLineID, TupleDescInitBuiltinEntry(), UINT64_FORMAT, and values.

Referenced by exec_replication_command().

339 {
340  char sysid[32];
341  char xloc[MAXFNAMELEN];
342  XLogRecPtr logptr;
343  char *dbname = NULL;
344  DestReceiver *dest;
345  TupOutputState *tstate;
346  TupleDesc tupdesc;
347  Datum values[4];
348  bool nulls[4];
349 
350  /*
351  * Reply with a result set with one row, four columns. First col is system
352  * ID, second is timeline ID, third is current xlog location and the
353  * fourth contains the database name if we are connected to one.
354  */
355 
356  snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
358 
361  {
362  /* this also updates ThisTimeLineID */
363  logptr = GetStandbyFlushRecPtr();
364  }
365  else
366  logptr = GetFlushRecPtr();
367 
368  snprintf(xloc, sizeof(xloc), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
369 
370  if (MyDatabaseId != InvalidOid)
371  {
373 
374  /* syscache access needs a transaction env. */
376  /* make dbname live outside TX context */
380  /* CommitTransactionCommand switches to TopMemoryContext */
382  }
383 
385  MemSet(nulls, false, sizeof(nulls));
386 
387  /* need a tuple descriptor representing four columns */
388  tupdesc = CreateTemplateTupleDesc(4, false);
389  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
390  TEXTOID, -1, 0);
391  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
392  INT4OID, -1, 0);
393  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
394  TEXTOID, -1, 0);
395  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
396  TEXTOID, -1, 0);
397 
398  /* prepare for projection of tuples */
399  tstate = begin_tup_output_tupdesc(dest, tupdesc);
400 
401  /* column 1: system identifier */
402  values[0] = CStringGetTextDatum(sysid);
403 
404  /* column 2: timeline */
405  values[1] = Int32GetDatum(ThisTimeLineID);
406 
407  /* column 3: wal location */
408  values[2] = CStringGetTextDatum(xloc);
409 
410  /* column 4: database name, or NULL if none */
411  if (dbname)
412  values[3] = CStringGetTextDatum(dbname);
413  else
414  nulls[3] = true;
415 
416  /* send it to dest */
417  do_tup_output(tstate, values, nulls);
418 
419  end_tup_output(tstate);
420 }
#define TEXTOID
Definition: pg_type.h:324
void CommitTransactionCommand(void)
Definition: xact.c:2748
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:1250
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define INT4OID
Definition: pg_type.h:316
struct cursor * cur
Definition: ecpg.c:28
#define MemSet(start, val, len)
Definition: c.h:857
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3, 4)
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8221
bool RecoveryInProgress(void)
Definition: xlog.c:7872
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:1308
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:568
char * get_database_name(Oid dbid)
Definition: dbcommands.c:2056
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:109
unsigned int uint32
Definition: c.h:268
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc)
Definition: execTuples.c:1232
#define MAXFNAMELEN
uintptr_t Datum
Definition: postgres.h:372
Oid MyDatabaseId
Definition: globals.c:77
#define InvalidOid
Definition: postgres_ext.h:36
TimeLineID ThisTimeLineID
Definition: xlog.c:179
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void StartTransactionCommand(void)
Definition: xact.c:2678
char * dbname
Definition: streamutil.c:38
TupleDesc CreateTemplateTupleDesc(int natts, bool hasoid)
Definition: tupdesc.c:41
static Datum values[MAXATTR]
Definition: bootstrap.c:163
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4689
#define Int32GetDatum(X)
Definition: postgres.h:485
#define CStringGetTextDatum(s)
Definition: builtins.h:91
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2843
int16 AttrNumber
Definition: attnum.h:21
#define UINT64_FORMAT
Definition: c.h:316
bool am_cascading_walsender
Definition: walsender.c:115
static void InitWalSenderSlot ( void  )
static

Definition at line 2206 of file walsender.c.

References WalSnd::apply, WalSnd::applyLag, Assert, ereport, errcode(), errmsg(), FATAL, WalSnd::flush, WalSnd::flushLag, i, InvalidXLogRecPtr, WalSnd::latch, max_wal_senders, WalSnd::mutex, MyProc, MyProcPid, NULL, on_shmem_exit(), WalSnd::pid, PGPROC::procLatch, WalSnd::sentPtr, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndKill(), WalSndCtlData::walsnds, WALSNDSTATE_STARTUP, WalSnd::write, and WalSnd::writeLag.

Referenced by WalSndShutdown().

2207 {
2208  int i;
2209 
2210  /*
2211  * WalSndCtl should be set up already (we inherit this by fork() or
2212  * EXEC_BACKEND mechanism from the postmaster).
2213  */
2214  Assert(WalSndCtl != NULL);
2215  Assert(MyWalSnd == NULL);
2216 
2217  /*
2218  * Find a free walsender slot and reserve it. If this fails, we must be
2219  * out of WalSnd structures.
2220  */
2221  for (i = 0; i < max_wal_senders; i++)
2222  {
2223  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2224 
2225  SpinLockAcquire(&walsnd->mutex);
2226 
2227  if (walsnd->pid != 0)
2228  {
2229  SpinLockRelease(&walsnd->mutex);
2230  continue;
2231  }
2232  else
2233  {
2234  /*
2235  * Found a free slot. Reserve it for us.
2236  */
2237  walsnd->pid = MyProcPid;
2238  walsnd->sentPtr = InvalidXLogRecPtr;
2239  walsnd->write = InvalidXLogRecPtr;
2240  walsnd->flush = InvalidXLogRecPtr;
2241  walsnd->apply = InvalidXLogRecPtr;
2242  walsnd->writeLag = -1;
2243  walsnd->flushLag = -1;
2244  walsnd->applyLag = -1;
2245  walsnd->state = WALSNDSTATE_STARTUP;
2246  walsnd->latch = &MyProc->procLatch;
2247  SpinLockRelease(&walsnd->mutex);
2248  /* don't need the lock anymore */
2249  MyWalSnd = (WalSnd *) walsnd;
2250 
2251  break;
2252  }
2253  }
2254  if (MyWalSnd == NULL)
2255  ereport(FATAL,
2256  (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
2257  errmsg("number of requested standby connections "
2258  "exceeds max_wal_senders (currently %d)",
2259  max_wal_senders)));
2260 
2261  /* Arrange to clean up at walsender exit */
2262  on_shmem_exit(WalSndKill, 0);
2263 }
XLogRecPtr write
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
int MyProcPid
Definition: globals.c:39
PGPROC * MyProc
Definition: proc.c:67
TimeOffset flushLag
WalSndCtlData * WalSndCtl
Definition: walsender.c:108
int errcode(int sqlerrcode)
Definition: elog.c:575
static void WalSndKill(int code, Datum arg)
Definition: walsender.c:2267
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
TimeOffset writeLag
slock_t mutex
Latch procLatch
Definition: proc.h:103
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
#define FATAL
Definition: elog.h:52
Latch * latch
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:348
WalSndState state
#define ereport(elevel, rest)
Definition: elog.h:122
int max_wal_senders
Definition: walsender.c:120
#define SpinLockRelease(lock)
Definition: spin.h:64
XLogRecPtr sentPtr
WalSnd * MyWalSnd
Definition: walsender.c:111
TimeOffset applyLag
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
int errmsg(const char *fmt,...)
Definition: elog.c:797
int i
XLogRecPtr apply
static TimeOffset LagTrackerRead ( int  head,
XLogRecPtr  lsn,
TimestampTz  now 
)
static

Definition at line 3431 of file walsender.c.

References Assert, LAG_TRACKER_BUFFER_SIZE, LagTracker, WalTimeSample::lsn, next, and WalTimeSample::time.

Referenced by ProcessStandbyReplyMessage().

3432 {
3433  TimestampTz time = 0;
3434 
3435  /* Read all unread samples up to this LSN or end of buffer. */
3436  while (LagTracker.read_heads[head] != LagTracker.write_head &&
3437  LagTracker.buffer[LagTracker.read_heads[head]].lsn <= lsn)
3438  {
3439  time = LagTracker.buffer[LagTracker.read_heads[head]].time;
3440  LagTracker.last_read[head] =
3441  LagTracker.buffer[LagTracker.read_heads[head]];
3442  LagTracker.read_heads[head] =
3443  (LagTracker.read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE;
3444  }
3445 
3446  /*
3447  * If the lag tracker is empty, that means the standby has processed
3448  * everything we've ever sent so we should now clear 'last_read'. If we
3449  * didn't do that, we'd risk using a stale and irrelevant sample for
3450  * interpolation at the beginning of the next burst of WAL after a period
3451  * of idleness.
3452  */
3453  if (LagTracker.read_heads[head] == LagTracker.write_head)
3454  LagTracker.last_read[head].time = 0;
3455 
3456  if (time > now)
3457  {
3458  /* If the clock somehow went backwards, treat as not found. */
3459  return -1;
3460  }
3461  else if (time == 0)
3462  {
3463  /*
3464  * We didn't cross a time. If there is a future sample that we
3465  * haven't reached yet, and we've already reached at least one sample,
3466  * let's interpolate the local flushed time. This is mainly useful
3467  * for reporting a completely stuck apply position as having
3468  * increasing lag, since otherwise we'd have to wait for it to
3469  * eventually start moving again and cross one of our samples before
3470  * we can show the lag increasing.
3471  */
3472  if (LagTracker.read_heads[head] == LagTracker.write_head)
3473  {
3474  /* There are no future samples, so we can't interpolate. */
3475  return -1;
3476  }
3477  else if (LagTracker.last_read[head].time != 0)
3478  {
3479  /* We can interpolate between last_read and the next sample. */
3480  double fraction;
3481  WalTimeSample prev = LagTracker.last_read[head];
3482  WalTimeSample next = LagTracker.buffer[LagTracker.read_heads[head]];
3483 
3484  if (lsn < prev.lsn)
3485  {
3486  /*
3487  * Reported LSNs shouldn't normally go backwards, but it's
3488  * possible when there is a timeline change. Treat as not
3489  * found.
3490  */
3491  return -1;
3492  }
3493 
3494  Assert(prev.lsn < next.lsn);
3495 
3496  if (prev.time > next.time)
3497  {
3498  /* If the clock somehow went backwards, treat as not found. */
3499  return -1;
3500  }
3501 
3502  /* See how far we are between the previous and next samples. */
3503  fraction =
3504  (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
3505 
3506  /* Scale the local flush time proportionally. */
3507  time = (TimestampTz)
3508  ((double) prev.time + (next.time - prev.time) * fraction);
3509  }
3510  else
3511  {
3512  /*
3513  * We have only a future sample, implying that we were entirely
3514  * caught up, but now there is a new burst of WAL and the
3515  * standby hasn't processed the first sample yet. Until the
3516  * standby reaches the future sample the best we can do is report
3517  * the hypothetical lag if that sample were to be replayed now.
3518  */
3519  time = LagTracker.buffer[LagTracker.read_heads[head]].time;
3520  }
3521  }
3522 
3523  /* Return the elapsed time since local flush time in microseconds. */
3524  Assert(time != 0);
3525  return now - time;
3526 }
static int32 next
Definition: blutils.c:210
int64 TimestampTz
Definition: timestamp.h:39
TimestampTz time
Definition: walsender.c:204
static struct @27 LagTracker
XLogRecPtr lsn
Definition: walsender.c:203
#define Assert(condition)
Definition: c.h:675
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:208
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
static void LagTrackerWrite ( XLogRecPtr  lsn,
TimestampTz  local_flush_time 
)
static

Definition at line 3366 of file walsender.c.

References am_walsender, i, LAG_TRACKER_BUFFER_SIZE, LagTracker, and NUM_SYNC_REP_WAIT_MODE.

Referenced by WalSndUpdateProgress(), and XLogSendPhysical().

3367 {
3368  bool buffer_full;
3369  int new_write_head;
3370  int i;
3371 
3372  if (!am_walsender)
3373  return;
3374 
3375  /*
3376  * If the lsn hasn't advanced since last time, then do nothing. This way
3377  * we only record a new sample when new WAL has been written.
3378  */
3379  if (LagTracker.last_lsn == lsn)
3380  return;
3381  LagTracker.last_lsn = lsn;
3382 
3383  /*
3384  * If advancing the write head of the circular buffer would crash into any
3385  * of the read heads, then the buffer is full. In other words, the
3386  * slowest reader (presumably apply) is the one that controls the release
3387  * of space.
3388  */
3389  new_write_head = (LagTracker.write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
3390  buffer_full = false;
3391  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
3392  {
3393  if (new_write_head == LagTracker.read_heads[i])
3394  buffer_full = true;
3395  }
3396 
3397  /*
3398  * If the buffer is full, for now we just rewind by one slot and overwrite
3399  * the last sample, as a simple (if somewhat uneven) way to lower the
3400  * sampling rate. There may be better adaptive compaction algorithms.
3401  */
3402  if (buffer_full)
3403  {
3404  new_write_head = LagTracker.write_head;
3405  if (LagTracker.write_head > 0)
3406  LagTracker.write_head--;
3407  else
3408  LagTracker.write_head = LAG_TRACKER_BUFFER_SIZE - 1;
3409  }
3410 
3411  /* Store a sample at the current write head position. */
3412  LagTracker.buffer[LagTracker.write_head].lsn = lsn;
3413  LagTracker.buffer[LagTracker.write_head].time = local_flush_time;
3414  LagTracker.write_head = new_write_head;
3415 }
bool am_walsender
Definition: walsender.c:114
static struct @27 LagTracker
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:208
int i
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28
static int logical_read_xlog_page ( XLogReaderState state,
XLogRecPtr  targetPagePtr,
int  reqLen,
XLogRecPtr  targetRecPtr,
char *  cur_page,
TimeLineID pageTLI 
)
static

Definition at line 752 of file walsender.c.

References XLogReaderState::currTLI, XLogReaderState::currTLIValidUntil, XLogReaderState::nextTLI, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, ThisTimeLineID, WalSndWaitForWal(), XLogRead(), and XLogReadDetermineTimeline().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

754 {
755  XLogRecPtr flushptr;
756  int count;
757 
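758  XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
759  sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID);
760  sendTimeLine = state->currTLI;
761  sendTimeLineValidUpto = state->currTLIValidUntil;
762  sendTimeLineNextTLI = state->nextTLI;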
763 
764  /* make sure we have enough WAL available */
765  flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
766 
767  /* more than one block available */
768  if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
769  count = XLOG_BLCKSZ;
770  /* not enough WAL synced, that can happen during shutdown */
771  else if (targetPagePtr + reqLen > flushptr)
772  return -1;
773  /* part of the page available */
774  else
775  count = flushptr - targetPagePtr;
776 
777  /* now actually read the data, we know it's there */
778  XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
779 
780  return count;
781 }
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
Definition: xlogutils.c:799
XLogRecPtr currTLIValidUntil
Definition: xlogreader.h:175
static void XLogRead(char *buf, XLogRecPtr startptr, Size count)
Definition: walsender.c:2295
TimeLineID nextTLI
Definition: xlogreader.h:181
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition: walsender.c:1272
TimeLineID ThisTimeLineID
Definition: xlog.c:179
TimeLineID currTLI
Definition: xlogreader.h:165
static TimeLineID sendTimeLine
Definition: walsender.c:148
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:149
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:151
static bool sendTimeLineIsHistoric
Definition: walsender.c:150
static Interval* offset_to_interval ( TimeOffset  offset)
static

Definition at line 3129 of file walsender.c.

References Interval::day, Interval::month, palloc(), result, and Interval::time.

Referenced by pg_stat_get_wal_senders().

3130 {
3131  Interval *result = palloc(sizeof(Interval));
3132 
3133  result->month = 0;
3134  result->day = 0;
3135  result->time = offset;
3136 
3137  return result;
3138 }
return result
Definition: formatting.c:1633
int32 day
Definition: timestamp.h:47
TimeOffset time
Definition: timestamp.h:45
int32 month
Definition: timestamp.h:48
void * palloc(Size size)
Definition: mcxt.c:849
static void parseCreateReplSlotOptions ( CreateReplicationSlotCmd cmd,
bool reserve_wal,
CRSSnapshotAction snapshot_action 
)
static

Definition at line 787 of file walsender.c.

References CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, defGetBoolean(), DefElem::defname, elog, ereport, errcode(), errmsg(), ERROR, CreateReplicationSlotCmd::kind, lfirst, CreateReplicationSlotCmd::options, REPLICATION_KIND_LOGICAL, and REPLICATION_KIND_PHYSICAL.

Referenced by CreateReplicationSlot().

790 {
791  ListCell *lc;
792  bool snapshot_action_given = false;
793  bool reserve_wal_given = false;
794 
795  /* Parse options */
796  foreach(lc, cmd->options)
797  {
798  DefElem *defel = (DefElem *) lfirst(lc);
799 
800  if (strcmp(defel->defname, "export_snapshot") == 0)
801  {
802  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
803  ereport(ERROR,
804  (errcode(ERRCODE_SYNTAX_ERROR),
805  errmsg("conflicting or redundant options")));
806 
807  snapshot_action_given = true;
808  *snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT :
809  CRS_NOEXPORT_SNAPSHOT;
810  }
811  else if (strcmp(defel->defname, "use_snapshot") == 0)
812  {
813  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
814  ereport(ERROR,
815  (errcode(ERRCODE_SYNTAX_ERROR),
816  errmsg("conflicting or redundant options")));
817 
818  snapshot_action_given = true;
819  *snapshot_action = CRS_USE_SNAPSHOT;
820  }
821  else if (strcmp(defel->defname, "reserve_wal") == 0)
822  {
823  if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
824  ereport(ERROR,
825  (errcode(ERRCODE_SYNTAX_ERROR),
826  errmsg("conflicting or redundant options")));
827 
828  reserve_wal_given = true;
829  *reserve_wal = true;
830  }
831  else
832  elog(ERROR, "unrecognized option: %s", defel->defname);
833  }
834 }
int errcode(int sqlerrcode)
Definition: elog.c:575
ReplicationKind kind
Definition: replnodes.h:56
bool defGetBoolean(DefElem *def)
Definition: define.c:111
#define ERROR
Definition: elog.h:43
#define ereport(elevel, rest)
Definition: elog.h:122
#define lfirst(lc)
Definition: pg_list.h:106
int errmsg(const char *fmt,...)
Definition: elog.c:797
char * defname
Definition: parsenodes.h:719
#define elog
Definition: elog.h:219
Datum pg_stat_get_wal_senders ( PG_FUNCTION_ARGS  )

Definition at line 3145 of file walsender.c.

References ReturnSetInfo::allowedModes, WalSnd::apply, WalSnd::applyLag, CStringGetTextDatum, ReturnSetInfo::econtext, ExprContext::ecxt_per_query_memory, elog, ereport, errcode(), errmsg(), ERROR, WalSnd::flush, WalSnd::flushLag, get_call_result_type(), i, Int32GetDatum, IntervalPGetDatum, IsA, list_member_int(), LSNGetDatum, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_wal_senders, MemoryContextSwitchTo(), MemSet, WalSnd::mutex, NULL, offset_to_interval(), PG_STAT_GET_WAL_SENDERS_COLS, WalSnd::pid, ReturnSetInfo::returnMode, WalSnd::sentPtr, sentPtr, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SFRM_Materialize, SpinLockAcquire, SpinLockRelease, WalSnd::state, superuser(), SYNC_REP_PRIORITY, WalSnd::sync_standby_priority, SyncRepConfigData::syncrep_method, SyncRepConfig, SyncRepGetSyncStandbys(), tuplestore_begin_heap(), tuplestore_donestoring, tuplestore_putvalues(), TYPEFUNC_COMPOSITE, values, WalSndGetStateString(), WalSndCtlData::walsnds, work_mem, write, WalSnd::write, WalSnd::writeLag, and XLogRecPtrIsInvalid.

3146 {
3147 #define PG_STAT_GET_WAL_SENDERS_COLS 11
3148  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3149  TupleDesc tupdesc;
3150  Tuplestorestate *tupstore;
3151  MemoryContext per_query_ctx;
3152  MemoryContext oldcontext;
3153  List *sync_standbys;
3154  int i;
3155 
3156  /* check to see if caller supports us returning a tuplestore */
3157  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
3158  ereport(ERROR,
3159  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3160  errmsg("set-valued function called in context that cannot accept a set")));
3161  if (!(rsinfo->allowedModes & SFRM_Materialize))
3162  ereport(ERROR,
3163  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3164  errmsg("materialize mode required, but it is not " \
3165  "allowed in this context")));
3166 
3167  /* Build a tuple descriptor for our result type */
3168  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
3169  elog(ERROR, "return type must be a row type");
3170 
3171  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
3172  oldcontext = MemoryContextSwitchTo(per_query_ctx);
3173 
3174  tupstore = tuplestore_begin_heap(true, false, work_mem);
3175  rsinfo->returnMode = SFRM_Materialize;
3176  rsinfo->setResult = tupstore;
3177  rsinfo->setDesc = tupdesc;
3178 
3179  MemoryContextSwitchTo(oldcontext);
3180 
3181  /*
3182  * Get the currently active synchronous standbys.
3183  */
3184  LWLockAcquire(SyncRepLock, LW_SHARED);
3185  sync_standbys = SyncRepGetSyncStandbys(NULL);
3186  LWLockRelease(SyncRepLock);
3187 
3188  for (i = 0; i < max_wal_senders; i++)
3189  {
3190  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3191  XLogRecPtr sentPtr;
3192  XLogRecPtr write;
3193  XLogRecPtr flush;
3194  XLogRecPtr apply;
3195  TimeOffset writeLag;
3196  TimeOffset flushLag;
3197  TimeOffset applyLag;
3198  int priority;
3199  WalSndState state;
3200  Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
3201  bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
3202 
3203  if (walsnd->pid == 0)
3204  continue;
3205 
3206  SpinLockAcquire(&walsnd->mutex);
3207  sentPtr = walsnd->sentPtr;
3208  state = walsnd->state;
3209  write = walsnd->write;
3210  flush = walsnd->flush;
3211  apply = walsnd->apply;
3212  writeLag = walsnd->writeLag;
3213  flushLag = walsnd->flushLag;
3214  applyLag = walsnd->applyLag;
3215  priority = walsnd->sync_standby_priority;
3216  SpinLockRelease(&walsnd->mutex);
3217 
3218  memset(nulls, 0, sizeof(nulls));
3219  values[0] = Int32GetDatum(walsnd->pid);
3220 
3221  if (!superuser())
3222  {
3223  /*
3224  * Only superusers can see details. Other users only get the pid
3225  * value to know it's a walsender, but no details.
3226  */
3227  MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3228  }
3229  else
3230  {
3231  values[1] = CStringGetTextDatum(WalSndGetStateString(state));
3232 
3233  if (XLogRecPtrIsInvalid(sentPtr))
3234  nulls[2] = true;
3235  values[2] = LSNGetDatum(sentPtr);
3236 
3237  if (XLogRecPtrIsInvalid(write))
3238  nulls[3] = true;
3239  values[3] = LSNGetDatum(write);
3240 
3241  if (XLogRecPtrIsInvalid(flush))
3242  nulls[4] = true;
3243  values[4] = LSNGetDatum(flush);
3244 
3245  if (XLogRecPtrIsInvalid(apply))
3246  nulls[5] = true;
3247  values[5] = LSNGetDatum(apply);
3248 
3249  /*
3250  * Treat a standby such as a pg_basebackup background process
3251  * which always returns an invalid flush location, as an
3252  * asynchronous standby.
3253  */
3254  priority = XLogRecPtrIsInvalid(walsnd->flush) ? 0 : priority;
3255 
3256  if (writeLag < 0)
3257  nulls[6] = true;
3258  else
3259  values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
3260 
3261  if (flushLag < 0)
3262  nulls[7] = true;
3263  else
3264  values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
3265 
3266  if (applyLag < 0)
3267  nulls[8] = true;
3268  else
3269  values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
3270 
3271  values[9] = Int32GetDatum(priority);
3272 
3273  /*
3274  * More easily understood version of standby state. This is purely
3275  * informational.
3276  *
3277  * In quorum-based sync replication, the role of each standby
3278  * listed in synchronous_standby_names can be changing very
3279  * frequently. Any standbys considered as "sync" at one moment can
3280  * be switched to "potential" ones at the next moment. So, it's
3281  * basically useless to report "sync" or "potential" as their sync
3282  * states. We report just "quorum" for them.
3283  */
3284  if (priority == 0)
3285  values[10] = CStringGetTextDatum("async");
3286  else if (list_member_int(sync_standbys, i))
3287  values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
3288  CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3289  else
3290  values[10] = CStringGetTextDatum("potential");
3291  }
3292 
3293  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
3294  }
3295 
3296  /* clean up and return the tuplestore */
3297  tuplestore_donestoring(tupstore);
3298 
3299  return (Datum) 0;
3300 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
XLogRecPtr write
#define IsA(nodeptr, _type_)
Definition: nodes.h:560
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:211
#define SYNC_REP_PRIORITY
Definition: syncrep.h:36
#define write(a, b, c)
Definition: win32.h:14
uint8 syncrep_method
Definition: syncrep.h:51
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
TimeOffset flushLag
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
WalSndCtlData * WalSndCtl
Definition: walsender.c:108
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
bool superuser(void)
Definition: superuser.c:47
#define MemSet(start, val, len)
Definition: c.h:857
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
List * SyncRepGetSyncStandbys(bool *am_sync)
Definition: syncrep.c:677
TimeOffset writeLag
SyncRepConfigData * SyncRepConfig
Definition: syncrep.c:97
slock_t mutex
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1715
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
#define ERROR
Definition: elog.h:43
#define IntervalPGetDatum(X)
Definition: timestamp.h:33
bool list_member_int(const List *list, int datum)
Definition: list.c:485
WalSndState state
#define ereport(elevel, rest)
Definition: elog.h:122
int max_wal_senders
Definition: walsender.c:120
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
int64 TimeOffset
Definition: timestamp.h:40
uintptr_t Datum
Definition: postgres.h:372
static const char * WalSndGetStateString(WalSndState state)
Definition: walsender.c:3110
XLogRecPtr sentPtr
int work_mem
Definition: globals.c:113
static XLogRecPtr sentPtr
Definition: walsender.c:157
int allowedModes
Definition: execnodes.h:268
TimeOffset applyLag
SetFunctionReturnMode returnMode
Definition: execnodes.h:270
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
int sync_standby_priority
Definition: regguts.h:298
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1111
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:202
WalSndState
Tuplestorestate * setResult
Definition: execnodes.h:273
static Datum values[MAXATTR]
Definition: bootstrap.c:163
#define PG_STAT_GET_WAL_SENDERS_COLS
ExprContext * econtext
Definition: execnodes.h:266
#define Int32GetDatum(X)
Definition: postgres.h:485
TupleDesc setDesc
Definition: execnodes.h:274
int errmsg(const char *fmt,...)
Definition: elog.c:797
int i
#define CStringGetTextDatum(s)
Definition: builtins.h:91
#define elog
Definition: elog.h:219
static Interval * offset_to_interval(TimeOffset offset)
Definition: walsender.c:3129
XLogRecPtr apply
Definition: pg_list.h:45
static void PhysicalConfirmReceivedLocation ( XLogRecPtr  lsn)
static

Definition at line 1701 of file walsender.c.

References Assert, ReplicationSlot::data, InvalidXLogRecPtr, ReplicationSlot::mutex, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, SpinLockAcquire, and SpinLockRelease.

Referenced by ProcessStandbyReplyMessage().

1702 {
1703  bool changed = false;
1704  ReplicationSlot *slot = MyReplicationSlot;
1705 
1706  Assert(lsn != InvalidXLogRecPtr);
1707  SpinLockAcquire(&slot->mutex);
1708  if (slot->data.restart_lsn != lsn)
1709  {
1710  changed = true;
1711  slot->data.restart_lsn = lsn;
1712  }
1713  SpinLockRelease(&slot->mutex);
1714 
1715  if (changed)
1716  {
1717  ReplicationSlotMarkDirty();
1718  ReplicationSlotsComputeRequiredLSN();
1719  }
1720 
1721  /*
1722  * One could argue that the slot should be saved to disk now, but that'd
1723  * be energy wasted - the worst lost information can do here is give us
1724  * wrong information in a statistics view - we'll just potentially be more
1725  * conservative in removing files.
1726  */
1727 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
ReplicationSlotPersistentData data
Definition: slot.h:115
#define SpinLockAcquire(lock)
Definition: spin.h:62
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:680
#define SpinLockRelease(lock)
Definition: spin.h:64
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:675
XLogRecPtr restart_lsn
Definition: slot.h:68
slock_t mutex
Definition: slot.h:88
void ReplicationSlotMarkDirty(void)
Definition: slot.c:595
static void PhysicalReplicationSlotNewXmin ( TransactionId  feedbackXmin,
TransactionId  feedbackCatalogXmin 
)
static

Definition at line 1825 of file walsender.c.

References ReplicationSlotPersistentData::catalog_xmin, ReplicationSlot::data, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, InvalidTransactionId, ReplicationSlot::mutex, MyPgXact, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredXmin(), SpinLockAcquire, SpinLockRelease, TransactionIdIsNormal, TransactionIdPrecedes(), ReplicationSlotPersistentData::xmin, and PGXACT::xmin.

Referenced by ProcessStandbyHSFeedbackMessage().

1826 {
1827  bool changed = false;
1828  ReplicationSlot *slot = MyReplicationSlot;
1829 
1830  SpinLockAcquire(&slot->mutex);
1831  MyPgXact->xmin = InvalidTransactionId;
1832 
1833  /*
1834  * For physical replication we don't need the interlock provided by xmin
1835  * and effective_xmin since the consequences of a missed increase are
1836  * limited to query cancellations, so set both at once.
1837  */
1838  if (!TransactionIdIsNormal(slot->data.xmin) ||
1839  !TransactionIdIsNormal(feedbackXmin) ||
1840  TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
1841  {
1842  changed = true;
1843  slot->data.xmin = feedbackXmin;
1844  slot->effective_xmin = feedbackXmin;
1845  }
1846  if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
1847  !TransactionIdIsNormal(feedbackCatalogXmin) ||
1848  TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
1849  {
1850  changed = true;
1851  slot->data.catalog_xmin = feedbackCatalogXmin;
1852  slot->effective_catalog_xmin = feedbackCatalogXmin;
1853  }
1854  SpinLockRelease(&slot->mutex);
1855 
1856  if (changed)
1857  {
1858  ReplicationSlotMarkDirty();
1859  ReplicationSlotsComputeRequiredXmin(false);
1860  }
1861 }
TransactionId xmin
Definition: proc.h:213
ReplicationSlotPersistentData data
Definition: slot.h:115
PGXACT * MyPgXact
Definition: proc.c:68
#define SpinLockAcquire(lock)
Definition: spin.h:62
TransactionId effective_xmin
Definition: slot.h:111
TransactionId catalog_xmin
Definition: slot.h:65
#define InvalidTransactionId
Definition: transam.h:31
TransactionId xmin
Definition: slot.h:57
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
#define SpinLockRelease(lock)
Definition: spin.h:64
TransactionId effective_catalog_xmin
Definition: slot.h:112
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
slock_t mutex
Definition: slot.h:88
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:634
void ReplicationSlotMarkDirty(void)
Definition: slot.c:595
static void ProcessRepliesIfAny ( void  )
static

Definition at line 1568 of file walsender.c.

References COMMERROR, ereport, errcode(), errmsg(), FATAL, GetCurrentTimestamp(), last_reply_timestamp, NULL, pq_endmsgread(), pq_getbyte_if_available(), pq_getmessage(), pq_putmessage_noblock, pq_startmsgread(), proc_exit(), ProcessStandbyMessage(), resetStringInfo(), streamingDoneReceiving, streamingDoneSending, and waiting_for_ping_response.

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

1569 {
1570  unsigned char firstchar;
1571  int r;
1572  bool received = false;
1573 
1574  for (;;)
1575  {
1576  pq_startmsgread();
1577  r = pq_getbyte_if_available(&firstchar);
1578  if (r < 0)
1579  {
1580  /* unexpected error or EOF */
1581  ereport(COMMERROR,
1582  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1583  errmsg("unexpected EOF on standby connection")));
1584  proc_exit(0);
1585  }
1586  if (r == 0)
1587  {
1588  /* no data available without blocking */
1589  pq_endmsgread();
1590  break;
1591  }
1592 
1593  /* Read the message contents */
1594  resetStringInfo(&reply_message);
1595  if (pq_getmessage(&reply_message, 0))
1596  {
1597  ereport(COMMERROR,
1598  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1599  errmsg("unexpected EOF on standby connection")));
1600  proc_exit(0);
1601  }
1602 
1603  /*
1604  * If we already received a CopyDone from the frontend, the frontend
1605  * should not send us anything until we've closed our end of the COPY.
1606  * XXX: In theory, the frontend could already send the next command
1607  * before receiving the CopyDone, but libpq doesn't currently allow
1608  * that.
1609  */
1610  if (streamingDoneReceiving && firstchar != 'X')
1611  ereport(FATAL,
1612  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1613  errmsg("unexpected standby message type \"%c\", after receiving CopyDone",
1614  firstchar)));
1615 
1616  /* Handle the very limited subset of commands expected in this phase */
1617  switch (firstchar)
1618  {
1619  /*
1620  * 'd' means a standby reply wrapped in a CopyData packet.
1621  */
1622  case 'd':
1623  ProcessStandbyMessage();
1624  received = true;
1625  break;
1626 
1627  /*
1628  * CopyDone means the standby requested to finish streaming.
1629  * Reply with CopyDone, if we had not sent that already.
1630  */
1631  case 'c':
1632  if (!streamingDoneSending)
1633  {
1634  pq_putmessage_noblock('c', NULL, 0);
1635  streamingDoneSending = true;
1636  }
1637 
1638  streamingDoneReceiving = true;
1639  received = true;
1640  break;
1641 
1642  /*
1643  * 'X' means that the standby is closing down the socket.
1644  */
1645  case 'X':
1646  proc_exit(0);
1647 
1648  default:
1649  ereport(FATAL,
1650  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1651  errmsg("invalid standby message type \"%c\"",
1652  firstchar)));
1653  }
1654  }
1655 
1656  /*
1657  * Save the last reply timestamp if we've received at least one reply.
1658  */
1659  if (received)
1660  {
1661  last_reply_timestamp = GetCurrentTimestamp();
1662  waiting_for_ping_response = false;
1663  }
1664 }
static void ProcessStandbyMessage(void)
Definition: walsender.c:1670
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
void proc_exit(int code)
Definition: ipc.c:99
int errcode(int sqlerrcode)
Definition: elog.c:575
void pq_startmsgread(void)
Definition: pqcomm.c:1191
#define FATAL
Definition: elog.h:52
int pq_getbyte_if_available(unsigned char *c)
Definition: pqcomm.c:1021
static bool streamingDoneSending
Definition: walsender.c:179
#define COMMERROR
Definition: elog.h:30
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
#define ereport(elevel, rest)
Definition: elog.h:122
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1253
static StringInfoData reply_message
Definition: walsender.c:161
void pq_endmsgread(void)
Definition: pqcomm.c:1215
#define NULL
Definition: c.h:229
static bool streamingDoneReceiving
Definition: walsender.c:180
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
static bool waiting_for_ping_response
Definition: walsender.c:171
static TimestampTz last_reply_timestamp
Definition: walsender.c:168
static void ProcessStandbyHSFeedbackMessage ( void  )
static

Definition at line 1902 of file walsender.c.

References DEBUG2, elog, InvalidTransactionId, MyPgXact, MyReplicationSlot, NULL, PhysicalReplicationSlotNewXmin(), pq_getmsgint(), pq_getmsgint64(), TransactionIdInRecentPast(), TransactionIdIsNormal, TransactionIdPrecedes(), and PGXACT::xmin.

Referenced by ProcessStandbyMessage().

1903 {
1904  TransactionId feedbackXmin;
1905  uint32 feedbackEpoch;
1906  TransactionId feedbackCatalogXmin;
1907  uint32 feedbackCatalogEpoch;
1908 
1909  /*
1910  * Decipher the reply message. The caller already consumed the msgtype
1911  * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
1912  * of this message.
1913  */
1914  (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
1915  feedbackXmin = pq_getmsgint(&reply_message, 4);
1916  feedbackEpoch = pq_getmsgint(&reply_message, 4);
1917  feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
1918  feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
1919 
1920  elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u",
1921  feedbackXmin,
1922  feedbackEpoch,
1923  feedbackCatalogXmin,
1924  feedbackCatalogEpoch);
1925 
1926  /*
1927  * Unset WalSender's xmins if the feedback message values are invalid.
1928  * This happens when the downstream turned hot_standby_feedback off.
1929  */
1930  if (!TransactionIdIsNormal(feedbackXmin)
1931  && !TransactionIdIsNormal(feedbackCatalogXmin))
1932  {
1933  MyPgXact->xmin = InvalidTransactionId;
1934  if (MyReplicationSlot != NULL)
1935  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
1936  return;
1937  }
1938 
1939  /*
1940  * Check that the provided xmin/epoch are sane, that is, not in the future
1941  * and not so far back as to be already wrapped around. Ignore if not.
1942  */
1943  if (TransactionIdIsNormal(feedbackXmin) &&
1944  !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
1945  return;
1946 
1947  if (TransactionIdIsNormal(feedbackCatalogXmin) &&
1948  !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
1949  return;
1950 
1951  /*
1952  * Set the WalSender's xmin equal to the standby's requested xmin, so that
1953  * the xmin will be taken into account by GetOldestXmin. This will hold
1954  * back the removal of dead rows and thereby prevent the generation of
1955  * cleanup conflicts on the standby server.
1956  *
1957  * There is a small window for a race condition here: although we just
1958  * checked that feedbackXmin precedes nextXid, the nextXid could have
1959  * gotten advanced between our fetching it and applying the xmin below,
1960  * perhaps far enough to make feedbackXmin wrap around. In that case the
1961  * xmin we set here would be "in the future" and have no effect. No point
1962  * in worrying about this since it's too late to save the desired data
1963  * anyway. Assuming that the standby sends us an increasing sequence of
1964  * xmins, this could only happen during the first reply cycle, else our
1965  * own xmin would prevent nextXid from advancing so far.
1966  *
1967  * We don't bother taking the ProcArrayLock here. Setting the xmin field
1968  * is assumed atomic, and there's no real need to prevent a concurrent
1969  * GetOldestXmin. (If we're moving our xmin forward, this is obviously
1970  * safe, and if we're moving it backwards, well, the data is at risk
1971  * already since a VACUUM could have just finished calling GetOldestXmin.)
1972  *
1973  * If we're using a replication slot we reserve the xmin via that,
1974  * otherwise via the walsender's PGXACT entry. We can only track the
1975  * catalog xmin separately when using a slot, so we store the least of the
1976  * two provided when not using a slot.
1977  *
1978  * XXX: It might make sense to generalize the ephemeral slot concept and
1979  * always use the slot mechanism to handle the feedback xmin.
1980  */
1981  if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
1982  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
1983  else
1984  {
1985  if (TransactionIdIsNormal(feedbackCatalogXmin)
1986  && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
1987  MyPgXact->xmin = feedbackCatalogXmin;
1988  else
1989  MyPgXact->xmin = feedbackXmin;
1990  }
1991 }
uint32 TransactionId
Definition: c.h:397
TransactionId xmin
Definition: proc.h:213
PGXACT * MyPgXact
Definition: proc.c:68
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition: walsender.c:1874
#define DEBUG2
Definition: elog.h:24
#define InvalidTransactionId
Definition: transam.h:31
unsigned int uint32
Definition: c.h:268
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static StringInfoData reply_message
Definition: walsender.c:161
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define NULL
Definition: c.h:229
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:486
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition: walsender.c:1825
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:448
#define elog
Definition: elog.h:219
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
static void ProcessStandbyMessage ( void  )
static

Definition at line 1670 of file walsender.c.

References COMMERROR, ereport, errcode(), errmsg(), pq_getmsgbyte(), proc_exit(), ProcessStandbyHSFeedbackMessage(), and ProcessStandbyReplyMessage().

Referenced by ProcessRepliesIfAny().

1671 {
1672  char msgtype;
1673 
1674  /*
1675  * Check message type from the first byte.
1676  */
1677  msgtype = pq_getmsgbyte(&reply_message);
1678 
1679  switch (msgtype)
1680  {
1681  case 'r':
1682  ProcessStandbyReplyMessage();
1683  break;
1684 
1685  case 'h':
1686  ProcessStandbyHSFeedbackMessage();
1687  break;
1688 
1689  default:
1690  ereport(COMMERROR,
1691  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1692  errmsg("unexpected message type \"%c\"", msgtype)));
1693  proc_exit(0);
1694  }
1695 }
void proc_exit(int code)
Definition: ipc.c:99
int errcode(int sqlerrcode)
Definition: elog.c:575
#define COMMERROR
Definition: elog.h:30
#define ereport(elevel, rest)
Definition: elog.h:122
static StringInfoData reply_message
Definition: walsender.c:161
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:432
static void ProcessStandbyReplyMessage(void)
Definition: walsender.c:1733
int errmsg(const char *fmt,...)
Definition: elog.c:797
static void ProcessStandbyHSFeedbackMessage(void)
Definition: walsender.c:1902
static void ProcessStandbyReplyMessage ( void  )
static

Definition at line 1733 of file walsender.c.

References am_cascading_walsender, WalSnd::apply, WalSnd::applyLag, DEBUG2, elog, WalSnd::flush, WalSnd::flushLag, GetCurrentTimestamp(), InvalidXLogRecPtr, LagTrackerRead(), LogicalConfirmReceivedLocation(), WalSnd::mutex, MyReplicationSlot, MyWalSnd, now(), PhysicalConfirmReceivedLocation(), pq_getmsgbyte(), pq_getmsgint64(), sentPtr, SlotIsLogical, SpinLockAcquire, SpinLockRelease, SYNC_REP_WAIT_APPLY, SYNC_REP_WAIT_FLUSH, SYNC_REP_WAIT_WRITE, SyncRepReleaseWaiters(), WalSndKeepalive(), WalSnd::write, and WalSnd::writeLag.

Referenced by ProcessStandbyMessage().

1734 {
1735  XLogRecPtr writePtr,
1736  flushPtr,
1737  applyPtr;
1738  bool replyRequested;
1739  TimeOffset writeLag,
1740  flushLag,
1741  applyLag;
1742  bool clearLagTimes;
1743  TimestampTz now;
1744 
1745  static bool fullyAppliedLastTime = false;
1746 
1747  /* the caller already consumed the msgtype byte */
1748  writePtr = pq_getmsgint64(&reply_message);
1749  flushPtr = pq_getmsgint64(&reply_message);
1750  applyPtr = pq_getmsgint64(&reply_message);
1751  (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
1752  replyRequested = pq_getmsgbyte(&reply_message);
1753 
1754  elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s",
1755  (uint32) (writePtr >> 32), (uint32) writePtr,
1756  (uint32) (flushPtr >> 32), (uint32) flushPtr,
1757  (uint32) (applyPtr >> 32), (uint32) applyPtr,
1758  replyRequested ? " (reply requested)" : "");
1759 
1760  /* See if we can compute the round-trip lag for these positions. */
1761  now = GetCurrentTimestamp();
1762  writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
1763  flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
1764  applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
1765 
1766  /*
1767  * If the standby reports that it has fully replayed the WAL in two
1768  * consecutive reply messages, then the second such message must result
1769  * from wal_receiver_status_interval expiring on the standby. This is a
1770  * convenient time to forget the lag times measured when it last
1771  * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
1772  * until more WAL traffic arrives.
1773  */
1774  clearLagTimes = false;
1775  if (applyPtr == sentPtr)
1776  {
1777  if (fullyAppliedLastTime)
1778  clearLagTimes = true;
1779  fullyAppliedLastTime = true;
1780  }
1781  else
1782  fullyAppliedLastTime = false;
1783 
1784  /* Send a reply if the standby requested one. */
1785  if (replyRequested)
1786  WalSndKeepalive(false);
1787 
1788  /*
1789  * Update shared state for this WalSender process based on reply data from
1790  * standby.
1791  */
1792  {
1793  WalSnd *walsnd = MyWalSnd;
1794 
1795  SpinLockAcquire(&walsnd->mutex);
1796  walsnd->write = writePtr;
1797  walsnd->flush = flushPtr;
1798  walsnd->apply = applyPtr;
1799  if (writeLag != -1 || clearLagTimes)
1800  walsnd->writeLag = writeLag;
1801  if (flushLag != -1 || clearLagTimes)
1802  walsnd->flushLag = flushLag;
1803  if (applyLag != -1 || clearLagTimes)
1804  walsnd->applyLag = applyLag;
1805  SpinLockRelease(&walsnd->mutex);
1806  }
1807 
1808  if (!am_cascading_walsender)
1809  SyncRepReleaseWaiters();
1810 
1811  /*
1812  * Advance our local xmin horizon when the client confirmed a flush.
1813  */
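1814  if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
1815  {
1816  if (SlotIsLogical(MyReplicationSlot))
1817  LogicalConfirmReceivedLocation(flushPtr);
1818  else
1819  PhysicalConfirmReceivedLocation(flushPtr);
1820  }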
1821 }
XLogRecPtr write
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
int64 TimestampTz
Definition: timestamp.h:39
TimeOffset flushLag
TimeOffset writeLag
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3308
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: walsender.c:1701
#define SYNC_REP_WAIT_APPLY
Definition: syncrep.h:26
#define DEBUG2
Definition: elog.h:24
unsigned int uint32
Definition: c.h:268
#define SlotIsLogical(slot)
Definition: slot.h:134
#define SYNC_REP_WAIT_FLUSH
Definition: syncrep.h:25
static StringInfoData reply_message
Definition: walsender.c:161
#define SpinLockRelease(lock)
Definition: spin.h:64
int64 TimeOffset
Definition: timestamp.h:40
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition: walsender.c:3431
WalSnd * MyWalSnd
Definition: walsender.c:111
static XLogRecPtr sentPtr
Definition: walsender.c:157
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:432
TimeOffset applyLag
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:914
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:486
#define elog
Definition: elog.h:219
XLogRecPtr apply
void SyncRepReleaseWaiters(void)
Definition: syncrep.c:409
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
bool am_cascading_walsender
Definition: walsender.c:115
#define SYNC_REP_WAIT_WRITE
Definition: syncrep.h:24
static void SendTimeLineHistory ( TimeLineHistoryCmd cmd)
static

Definition at line 427 of file walsender.c.

References buf, BYTEAOID, CloseTransientFile(), ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), MAXFNAMELEN, MAXPGPATH, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), pq_beginmessage(), pq_endmessage(), pq_sendbytes(), pq_sendint(), pq_sendstring(), read, TEXTOID, TimeLineHistoryCmd::timeline, TLHistoryFileName, TLHistoryFilePath, and WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ.

Referenced by exec_replication_command().

428 {
429  StringInfoData buf;
430  char histfname[MAXFNAMELEN];
431  char path[MAXPGPATH];
432  int fd;
433  off_t histfilelen;
434  off_t bytesleft;
435  Size len;
436 
437  /*
438  * Reply with a result set with one row, and two columns. The first col is
439  * the name of the history file, 2nd is the contents.
440  */
441 
442  TLHistoryFileName(histfname, cmd->timeline);
443  TLHistoryFilePath(path, cmd->timeline);
444 
445  /* Send a RowDescription message */
446  pq_beginmessage(&buf, 'T');
447  pq_sendint(&buf, 2, 2); /* 2 fields */
448 
449  /* first field */
450  pq_sendstring(&buf, "filename"); /* col name */
451  pq_sendint(&buf, 0, 4); /* table oid */
452  pq_sendint(&buf, 0, 2); /* attnum */
453  pq_sendint(&buf, TEXTOID, 4); /* type oid */
454  pq_sendint(&buf, -1, 2); /* typlen */
455  pq_sendint(&buf, 0, 4); /* typmod */
456  pq_sendint(&buf, 0, 2); /* format code */
457 
458  /* second field */
459  pq_sendstring(&buf, "content"); /* col name */
460  pq_sendint(&buf, 0, 4); /* table oid */
461  pq_sendint(&buf, 0, 2); /* attnum */
462  pq_sendint(&buf, BYTEAOID, 4); /* type oid */
463  pq_sendint(&buf, -1, 2); /* typlen */
464  pq_sendint(&buf, 0, 4); /* typmod */
465  pq_sendint(&buf, 0, 2); /* format code */
466  pq_endmessage(&buf);
467 
468  /* Send a DataRow message */
469  pq_beginmessage(&buf, 'D');
470  pq_sendint(&buf, 2, 2); /* # of columns */
471  len = strlen(histfname);
472  pq_sendint(&buf, len, 4); /* col1 len */
473  pq_sendbytes(&buf, histfname, len);
474 
475  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0666);
476  if (fd < 0)
477  ereport(ERROR,
478  (errcode_for_file_access(),
479  errmsg("could not open file \"%s\": %m", path)));
480 
481  /* Determine file length and send it to client */
482  histfilelen = lseek(fd, 0, SEEK_END);
483  if (histfilelen < 0)
484  ereport(ERROR,
485  (errcode_for_file_access(),
486  errmsg("could not seek to end of file \"%s\": %m", path)));
487  if (lseek(fd, 0, SEEK_SET) != 0)
488  ereport(ERROR,
489  (errcode_for_file_access(),
490  errmsg("could not seek to beginning of file \"%s\": %m", path)));
491 
492  pq_sendint(&buf, histfilelen, 4); /* col2 len */
493 
494  bytesleft = histfilelen;
495  while (bytesleft > 0)
496  {
497  char rbuf[BLCKSZ];
498  int nread;
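499 
500  pgstat_report_wait_start(WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ);
501  nread = read(fd, rbuf, sizeof(rbuf));
502  pgstat_report_wait_end();
503  if (nread <= 0)
504  ereport(ERROR,
505  (errcode_for_file_access(),
506  errmsg("could not read file \"%s\": %m",
507  path)));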
508  pq_sendbytes(&buf, rbuf, nread);
509  bytesleft -= nread;
510  }
511  CloseTransientFile(fd);
512 
513  pq_endmessage(&buf);
514 }
#define TEXTOID
Definition: pg_type.h:324
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:186
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1038
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:88
TimeLineID timeline
Definition: replnodes.h:96
#define ERROR
Definition: elog.h:43
#define MAXPGPATH
#define TLHistoryFileName(fname, tli)
static char * buf
Definition: pg_test_fsync.c:66
int OpenTransientFile(FileName fileName, int fileFlags, int fileMode)
Definition: fd.c:2144
int errcode_for_file_access(void)
Definition: elog.c:598
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1232
#define ereport(elevel, rest)
Definition: elog.h:122
int CloseTransientFile(int fd)
Definition: fd.c:2305
#define MAXFNAMELEN
size_t Size
Definition: c.h:356
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1208
#define BYTEAOID
Definition: pg_type.h:292
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:115
int errmsg(const char *fmt,...)
Definition: elog.c:797
void pq_sendint(StringInfo buf, int i, int b)
Definition: pqformat.c:236
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:344
#define read(a, b, c)
Definition: win32.h:13
#define TLHistoryFilePath(path, tli)
static void StartLogicalReplication ( StartReplicationCmd cmd)
static

Definition at line 1045 of file walsender.c.

References am_cascading_walsender, Assert, buf, CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDecodingContext(), ReplicationSlot::data, DestRemote, EndCommand(), ereport, errmsg(), FreeDecodingContext(), got_STOPPING, LOG, logical_read_xlog_page(), logical_startptr, WalSnd::mutex, MyReplicationSlot, MyWalSnd, StartReplicationCmd::options, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint(), proc_exit(), RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), ReplicationSlotPersistentData::restart_lsn, WalSnd::sentPtr, sentPtr, StartReplicationCmd::slotname, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, SyncRepInitConfig(), WalSndLoop(), WalSndPrepareWrite(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WalSndUpdateProgress(), WalSndWriteData(), and XLogSendLogical().

Referenced by exec_replication_command().

1046 {
1048 
1049  /* make sure that our requirements are still fulfilled */
1051 
1053 
1055 
1056  /*
1057  * Force a disconnect, so that the decoding code doesn't need to care
1058  * about an eventual switch from running in recovery, to running in a
1059  * normal environment. Client code is expected to handle reconnects.
1060  */
1062  {
1063  ereport(LOG,
1064  (errmsg("terminating walsender process after promotion")));
1065  got_STOPPING = true;
1066  }
1067 
1069 
1070  /* Send a CopyBothResponse message, and start streaming */
1071  pq_beginmessage(&buf, 'W');
1072  pq_sendbyte(&buf, 0);
1073  pq_sendint(&buf, 0, 2);
1074  pq_endmessage(&buf);
1075  pq_flush();
1076 
1077  /*
1078  * Initialize position to the last ack'ed one, then the xlog records begin
1079  * to be shipped from that position.
1080  */
1086 
1087  /* Start reading WAL from the oldest required WAL. */
1089 
1090  /*
1091  * Report the location after which we'll send out further commits as the
1092  * current sentPtr.
1093  */
1095 
1096  /* Also update the sent position status in shared memory */
1097  {
1098  WalSnd *walsnd = MyWalSnd;
1099 
1100  SpinLockAcquire(&walsnd->mutex);
1102  SpinLockRelease(&walsnd->mutex);
1103  }
1104 
1105  replication_active = true;
1106 
1108 
1109  /* Main loop of walsender */
1111 
1114 
1115  replication_active = false;
1116  if (got_STOPPING)
1117  proc_exit(0);
1119 
1120  /* Get out of COPY mode (CommandComplete). */
1121  EndCommand("COPY 0", DestRemote);
1122 }
XLogRecPtr startpoint
Definition: replnodes.h:84
#define pq_flush()
Definition: libpq.h:39
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
void proc_exit(int code)
Definition: ipc.c:99
ReplicationSlotPersistentData data
Definition: slot.h:115
#define LOG
Definition: elog.h:26
bool RecoveryInProgress(void)
Definition: xlog.c:7872
slock_t mutex
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:88
XLogRecPtr confirmed_flush
Definition: slot.h:76
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
Definition: walsender.c:752
#define SpinLockAcquire(lock)
Definition: spin.h:62
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:157
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:197
static char * buf
Definition: pg_test_fsync.c:66
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1160
static XLogRecPtr logical_startptr
Definition: walsender.c:198
void ReplicationSlotRelease(void)
Definition: slot.c:373
void SyncRepInitConfig(void)
Definition: syncrep.c:381
#define ereport(elevel, rest)
Definition: elog.h:122
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2070
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1133
#define SpinLockRelease(lock)
Definition: spin.h:64
static volatile sig_atomic_t replication_active
Definition: walsender.c:195
XLogRecPtr sentPtr
WalSnd * MyWalSnd
Definition: walsender.c:111
static XLogRecPtr sentPtr
Definition: walsender.c:157
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:343
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:675
void WalSndSetState(WalSndState state)
Definition: walsender.c:3091
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:475
static void XLogSendLogical(void)
Definition: walsender.c:2733
XLogRecPtr restart_lsn
Definition: slot.h:68
void ReplicationSlotAcquire(const char *name)
Definition: slot.c:326
int errmsg(const char *fmt,...)
Definition: elog.c:797
void pq_sendint(StringInfo buf, int i, int b)
Definition: pqformat.c:236
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:344
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:76
bool am_cascading_walsender
Definition: walsender.c:115
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
Definition: walsender.c:1250
static void StartReplication ( StartReplicationCmd cmd)
static

Definition at line 523 of file walsender.c.

References am_cascading_walsender, Assert, begin_tup_output_tupdesc(), buf, CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errcode(), errdetail(), errmsg(), ERROR, GetFlushRecPtr(), GetStandbyFlushRecPtr(), got_STOPPING, Int64GetDatum(), INT8OID, InvalidXLogRecPtr, list_free_deep(), MemSet, WalSnd::mutex, MyReplicationSlot, MyWalSnd, pq_beginmessage(), pq_endmessage(), pq_flush, pq_puttextmessage(), pq_sendbyte(), pq_sendint(), proc_exit(), readTimeLineHistory(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, WalSnd::sentPtr, sentPtr, SlotIsLogical, StartReplicationCmd::slotname, snprintf(), SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), TEXTOID, ThisTimeLineID, StartReplicationCmd::timeline, tliSwitchPoint(), TupleDescInitBuiltinEntry(), values, WalSndLoop(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, XLogRecPtrIsInvalid, and XLogSendPhysical().

Referenced by exec_replication_command().

524 {
526  XLogRecPtr FlushPtr;
527 
528  if (ThisTimeLineID == 0)
529  ereport(ERROR,
530  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
531  errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
532 
533  /*
534  * We assume here that we're logging enough information in the WAL for
535  * log-shipping, since this is checked in PostmasterMain().
536  *
537  * NOTE: wal_level can only change at shutdown, so in most cases it is
538  * difficult for there to be WAL data that we can still see that was
539  * written at wal_level='minimal'.
540  */
541 
542  if (cmd->slotname)
543  {
546  ereport(ERROR,
547  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
548  (errmsg("cannot use a logical replication slot for physical replication"))));
549  }
550 
551  /*
552  * Select the timeline. If it was given explicitly by the client, use
553  * that. Otherwise use the timeline of the last replayed record, which is
554  * kept in ThisTimeLineID.
555  */
557  {
558  /* this also updates ThisTimeLineID */
559  FlushPtr = GetStandbyFlushRecPtr();
560  }
561  else
562  FlushPtr = GetFlushRecPtr();
563 
564  if (cmd->timeline != 0)
565  {
566  XLogRecPtr switchpoint;
567 
568  sendTimeLine = cmd->timeline;
570  {
571  sendTimeLineIsHistoric = false;
573  }
574  else
575  {
576  List *timeLineHistory;
577 
578  sendTimeLineIsHistoric = true;
579 
580  /*
581  * Check that the timeline the client requested exists, and the
582  * requested start location is on that timeline.
583  */
584  timeLineHistory = readTimeLineHistory(ThisTimeLineID);
585  switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
587  list_free_deep(timeLineHistory);
588 
589  /*
590  * Found the requested timeline in the history. Check that
591  * requested startpoint is on that timeline in our history.
592  *
593  * This is quite loose on purpose. We only check that we didn't
594  * fork off the requested timeline before the switchpoint. We
595  * don't check that we switched *to* it before the requested
596  * starting point. This is because the client can legitimately
597  * request to start replication from the beginning of the WAL
598  * segment that contains switchpoint, but on the new timeline, so
599  * that it doesn't end up with a partial segment. If you ask for
600  * too old a starting point, you'll get an error later when we
601  * fail to find the requested WAL segment in pg_wal.
602  *
603  * XXX: we could be more strict here and only allow a startpoint
604  * that's older than the switchpoint, if it's still in the same
605  * WAL segment.
606  */
607  if (!XLogRecPtrIsInvalid(switchpoint) &&
608  switchpoint < cmd->startpoint)
609  {
610  ereport(ERROR,
611  (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
612  (uint32) (cmd->startpoint >> 32),
613  (uint32) (cmd->startpoint),
614  cmd->timeline),
615  errdetail("This server's history forked from timeline %u at %X/%X.",
616  cmd->timeline,
617  (uint32) (switchpoint >> 32),
618  (uint32) (switchpoint))));
619  }
620  sendTimeLineValidUpto = switchpoint;
621  }
622  }
623  else
624  {
627  sendTimeLineIsHistoric = false;
628  }
629 
631 
632  /* If there is nothing to stream, don't even enter COPY mode */
634  {
635  /*
636  * When we first start replication the standby will be behind the
637  * primary. For some applications, for example synchronous
638  * replication, it is important to have a clear state for this initial
639  * catchup mode, so we can trigger actions when we change streaming
640  * state later. We may stay in this state for a long time, which is
641  * exactly why we want to be able to monitor whether or not we are
642  * still here.
643  */
645 
646  /* Send a CopyBothResponse message, and start streaming */
647  pq_beginmessage(&buf, 'W');
648  pq_sendbyte(&buf, 0);
649  pq_sendint(&buf, 0, 2);
650  pq_endmessage(&buf);
651  pq_flush();
652 
653  /*
654  * Don't allow a request to stream from a future point in WAL that
655  * hasn't been flushed to disk in this server yet.
656  */
657  if (FlushPtr < cmd->startpoint)
658  {
659  ereport(ERROR,
660  (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
661  (uint32) (cmd->startpoint >> 32),
662  (uint32) (cmd->startpoint),
663  (uint32) (FlushPtr >> 32),
664  (uint32) (FlushPtr))));
665  }
666 
667  /* Start streaming from the requested point */
668  sentPtr = cmd->startpoint;
669 
670  /* Initialize shared memory status, too */
671  {
672  WalSnd *walsnd = MyWalSnd;
673 
674  SpinLockAcquire(&walsnd->mutex);
675  walsnd->sentPtr = sentPtr;
676  SpinLockRelease(&walsnd->mutex);
677  }
678 
680 
681  /* Main loop of walsender */
682  replication_active = true;
683 
685 
686  replication_active = false;
687  if (got_STOPPING)
688  proc_exit(0);
690 
692  }
693 
694  if (cmd->slotname)
696 
697  /*
698  * Copy is finished now. Send a single-row result set indicating the next
699  * timeline.
700  */
702  {
703  char startpos_str[8 + 1 + 8 + 1];
704  DestReceiver *dest;
705  TupOutputState *tstate;
706  TupleDesc tupdesc;
707  Datum values[2];
708  bool nulls[2];
709 
710  snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
711  (uint32) (sendTimeLineValidUpto >> 32),
713 
715  MemSet(nulls, false, sizeof(nulls));
716 
717  /*
718  * Need a tuple descriptor representing two columns. int8 may seem
719  * like a surprising data type for this, but in theory int4 would not
720  * be wide enough for this, as TimeLineID is unsigned.
721  */
722  tupdesc = CreateTemplateTupleDesc(2, false);
723  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
724  INT8OID, -1, 0);
725  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
726  TEXTOID, -1, 0);
727 
728  /* prepare for projection of tuple */
729  tstate = begin_tup_output_tupdesc(dest, tupdesc);
730 
731  values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
732  values[1] = CStringGetTextDatum(startpos_str);
733 
734  /* send it to dest */
735  do_tup_output(tstate, values, nulls);
736 
737  end_tup_output(tstate);
738  }
739 
740  /* Send CommandComplete message */
741  pq_puttextmessage('C', "START_STREAMING");
742 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr startpoint
Definition: replnodes.h:84
#define pq_flush()
Definition: libpq.h:39
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
#define TEXTOID
Definition: pg_type.h:324
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:75
static void XLogSendPhysical(void)
Definition: walsender.c:2470
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:1250
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
void proc_exit(int code)
Definition: ipc.c:99
int errcode(int sqlerrcode)
Definition: elog.c:575
#define MemSet(start, val, len)
Definition: c.h:857
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8221
void list_free_deep(List *list)
Definition: list.c:1147
slock_t mutex
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:88
#define SpinLockAcquire(lock)
Definition: spin.h:62
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:1308
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:568
#define ERROR
Definition: elog.h:43
static char * buf
Definition: pg_test_fsync.c:66
static bool streamingDoneSending
Definition: walsender.c:179
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:109
int errdetail(const char *fmt,...)
Definition: elog.c:873
unsigned int uint32
Definition: c.h:268
void ReplicationSlotRelease(void)
Definition: slot.c:373
#define SlotIsLogical(slot)
Definition: slot.h:134
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1791
void SyncRepInitConfig(void)
Definition: syncrep.c:381
#define ereport(elevel, rest)
Definition: elog.h:122
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc)
Definition: execTuples.c:1232
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2070
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
static volatile sig_atomic_t replication_active
Definition: walsender.c:195
uintptr_t Datum
Definition: postgres.h:372
XLogRecPtr sentPtr
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:563
WalSnd * MyWalSnd
Definition: walsender.c:111
static XLogRecPtr sentPtr
Definition: walsender.c:157
TimeLineID ThisTimeLineID
Definition: xlog.c:179
#define INT8OID
Definition: pg_type.h:304
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
static TimeLineID sendTimeLine
Definition: walsender.c:148
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:675
void WalSndSetState(WalSndState state)
Definition: walsender.c:3091
void ReplicationSlotAcquire(const char *name)
Definition: slot.c:326
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:149
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:151
TupleDesc CreateTemplateTupleDesc(int natts, bool hasoid)
Definition: tupdesc.c:41
static bool streamingDoneReceiving
Definition: walsender.c:180
static Datum values[MAXATTR]
Definition: bootstrap.c:163
int errmsg(const char *fmt,...)
Definition: elog.c:797
void pq_sendint(StringInfo buf, int i, int b)
Definition: pqformat.c:236
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:344
TimeLineID timeline
Definition: replnodes.h:83
#define CStringGetTextDatum(s)
Definition: builtins.h:91
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2843
void pq_puttextmessage(char msgtype, const char *str)
Definition: pqformat.c:400
Definition: pg_list.h:45
int16 AttrNumber
Definition: attnum.h:21
static bool sendTimeLineIsHistoric
Definition: walsender.c:150
bool am_cascading_walsender
Definition: walsender.c:115
static bool TransactionIdInRecentPast ( TransactionId  xid,
uint32  epoch 
)
static

Definition at line 1874 of file walsender.c.

References GetNextXidAndEpoch(), and TransactionIdPrecedesOrEquals().

Referenced by ProcessStandbyHSFeedbackMessage().

1875 {
1876  TransactionId nextXid;
1877  uint32 nextEpoch;
1878 
1879  GetNextXidAndEpoch(&nextXid, &nextEpoch);
1880 
1881  if (xid <= nextXid)
1882  {
1883  if (epoch != nextEpoch)
1884  return false;
1885  }
1886  else
1887  {
1888  if (epoch + 1 != nextEpoch)
1889  return false;
1890  }
1891 
1892  if (!TransactionIdPrecedesOrEquals(xid, nextXid))
1893  return false; /* epoch OK, but it's wrapped around */
1894 
1895  return true;
1896 }
uint32 TransactionId
Definition: c.h:397
void GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch)
Definition: xlog.c:8290
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:319
unsigned int uint32
Definition: c.h:268
static const unsigned __int64 epoch
Definition: gettimeofday.c:34
static void WalSndCheckTimeOut ( TimestampTz  now)
static

Definition at line 2043 of file walsender.c.

References COMMERROR, ereport, errmsg(), last_reply_timestamp, TimestampTzPlusMilliseconds, wal_sender_timeout, and WalSndShutdown().

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

2044 {
2045  TimestampTz timeout;
2046 
2047  /* don't bail out if we're doing something that doesn't require timeouts */
2048  if (last_reply_timestamp <= 0)
2049  return;
2050 
2053 
2054  if (wal_sender_timeout > 0 && now >= timeout)
2055  {
2056  /*
2057  * Since typically expiration of replication timeout means
2058  * communication problem, we don't send the error message to the
2059  * standby.
2060  */
2062  (errmsg("terminating walsender process due to replication timeout")));
2063 
2064  WalSndShutdown();
2065  }
2066 }
int wal_sender_timeout
Definition: walsender.c:122
int64 TimestampTz
Definition: timestamp.h:39
#define COMMERROR
Definition: elog.h:30
#define ereport(elevel, rest)
Definition: elog.h:122
static void WalSndShutdown(void)
Definition: walsender.c:228
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
int errmsg(const char *fmt,...)
Definition: elog.c:797
static TimestampTz last_reply_timestamp
Definition: walsender.c:168
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
static long WalSndComputeSleeptime ( TimestampTz  now)
static

Definition at line 2001 of file walsender.c.

References last_reply_timestamp, sleeptime, TimestampDifference(), TimestampTzPlusMilliseconds, waiting_for_ping_response, and wal_sender_timeout.

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

2002 {
2003  long sleeptime = 10000; /* 10 s */
2004 
2006  {
2007  TimestampTz wakeup_time;
2008  long sec_to_timeout;
2009  int microsec_to_timeout;
2010 
2011  /*
2012  * At the latest stop sleeping once wal_sender_timeout has been
2013  * reached.
2014  */
2017 
2018  /*
2019  * If no ping has been sent yet, wakeup when it's time to do so.
2020  * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2021  * the timeout passed without a response.
2022  */
2025  wal_sender_timeout / 2);
2026 
2027  /* Compute relative time until wakeup. */
2028  TimestampDifference(now, wakeup_time,
2029  &sec_to_timeout, &microsec_to_timeout);
2030 
2031  sleeptime = sec_to_timeout * 1000 +
2032  microsec_to_timeout / 1000;
2033  }
2034 
2035  return sleeptime;
2036 }
int wal_sender_timeout
Definition: walsender.c:122
int64 TimestampTz
Definition: timestamp.h:39
int sleeptime
Definition: pg_standby.c:40
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: timestamp.c:1624
static bool waiting_for_ping_response
Definition: walsender.c:171
static TimestampTz last_reply_timestamp
Definition: walsender.c:168
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
static void WalSndDone ( WalSndSendDataCallback  send_data)
static

Definition at line 2803 of file walsender.c.

References DestRemote, EndCommand(), WalSnd::flush, pq_flush, pq_is_send_pending, proc_exit(), sentPtr, waiting_for_ping_response, WalSndCaughtUp, WalSndKeepalive(), WalSnd::write, and XLogRecPtrIsInvalid.

Referenced by WalSndLoop().

2804 {
2805  XLogRecPtr replicatedPtr;
2806 
2807  /* ... let's just be real sure we're caught up ... */
2808  send_data();
2809 
2810  /*
2811  * To figure out whether all WAL has successfully been replicated, check
2812  * flush location if valid, write otherwise. Tools like pg_receivewal will
2813  * usually (unless in synchronous mode) return an invalid flush location.
2814  */
2815  replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
2817 
2818  if (WalSndCaughtUp && sentPtr == replicatedPtr &&
2819  !pq_is_send_pending())
2820  {
2821  /* Inform the standby that XLOG streaming is done */
2822  EndCommand("COPY 0", DestRemote);
2823  pq_flush();
2824 
2825  proc_exit(0);
2826  }
2828  {
2829  WalSndKeepalive(true);
2831  }
2832 }
#define pq_is_send_pending()
Definition: libpq.h:41
XLogRecPtr write
#define pq_flush()
Definition: libpq.h:39
void proc_exit(int code)
Definition: ipc.c:99
XLogRecPtr flush
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3308
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:157
static bool WalSndCaughtUp
Definition: walsender.c:183
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
WalSnd * MyWalSnd
Definition: walsender.c:111
static XLogRecPtr sentPtr
Definition: walsender.c:157
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool waiting_for_ping_response
Definition: walsender.c:171
void WalSndErrorCleanup ( void  )

Definition at line 291 of file walsender.c.

References close, ConditionVariableCancelSleep(), got_SIGUSR2, got_STOPPING, LWLockReleaseAll(), MyReplicationSlot, NULL, pgstat_report_wait_end(), proc_exit(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), sendFile, WalSndSetState(), and WALSNDSTATE_STARTUP.

Referenced by PostgresMain().

292 {
296 
297  if (sendFile >= 0)
298  {
299  close(sendFile);
300  sendFile = -1;
301  }
302 
303  if (MyReplicationSlot != NULL)
305 
307 
308  replication_active = false;
309 
310  if (got_STOPPING || got_SIGUSR2)
311  proc_exit(0);
312 
313  /* Revert back to startup state */
315 }
static int sendFile
Definition: walsender.c:135
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
void proc_exit(int code)
Definition: ipc.c:99
void ConditionVariableCancelSleep(void)
void ReplicationSlotRelease(void)
Definition: slot.c:373
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1232
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:186
static volatile sig_atomic_t replication_active
Definition: walsender.c:195
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define NULL
Definition: c.h:229
void WalSndSetState(WalSndState state)
Definition: walsender.c:3091
void ReplicationSlotCleanup(void)
Definition: slot.c:427
void LWLockReleaseAll(void)
Definition: lwlock.c:1814
#define close(a)
Definition: win32.h:12
static const char* WalSndGetStateString ( WalSndState  state)
static

Definition at line 3110 of file walsender.c.

References WALSNDSTATE_BACKUP, WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WALSNDSTATE_STOPPING, and WALSNDSTATE_STREAMING.

Referenced by pg_stat_get_wal_senders().

3111 {
3112  switch (state)
3113  {
3114  case WALSNDSTATE_STARTUP:
3115  return "startup";
3116  case WALSNDSTATE_BACKUP:
3117  return "backup";
3118  case WALSNDSTATE_CATCHUP:
3119  return "catchup";
3120  case WALSNDSTATE_STREAMING:
3121  return "streaming";
3122  case WALSNDSTATE_STOPPING:
3123  return "stopping";
3124  }
3125  return "UNKNOWN";
3126 }
Definition: regguts.h:298
void WalSndInitStopping ( void  )

Definition at line 3025 of file walsender.c.

References i, InvalidBackendId, max_wal_senders, WalSnd::mutex, WalSnd::pid, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

3026 {
3027  int i;
3028 
3029  for (i = 0; i < max_wal_senders; i++)
3030  {
3031  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3032  pid_t pid;
3033 
3034  SpinLockAcquire(&walsnd->mutex);
3035  pid = walsnd->pid;
3036  SpinLockRelease(&walsnd->mutex);
3037 
3038  if (pid == 0)
3039  continue;
3040 
3042  }
3043 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:108
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:180
int max_wal_senders
Definition: walsender.c:120
#define SpinLockRelease(lock)
Definition: spin.h:64
#define InvalidBackendId
Definition: backendid.h:23
int i
static void WalSndKeepalive ( bool  requestReply)
static

Definition at line 3308 of file walsender.c.

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), StringInfoData::len, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), resetStringInfo(), and sentPtr.

Referenced by ProcessStandbyReplyMessage(), WalSndDone(), WalSndKeepaliveIfNecessary(), and WalSndWaitForWal().

3309 {
3310  elog(DEBUG2, "sending replication keepalive");
3311 
3312  /* construct the message... */
3314  pq_sendbyte(&output_message, 'k');
3317  pq_sendbyte(&output_message, requestReply ? 1 : 0);
3318 
3319  /* ... and send it wrapped in CopyData */
3321 }
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
static StringInfoData output_message
Definition: walsender.c:160
#define DEBUG2
Definition: elog.h:24
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
static XLogRecPtr sentPtr
Definition: walsender.c:157
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
#define elog
Definition: elog.h:219
void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.c:271
static void WalSndKeepaliveIfNecessary ( TimestampTz  now)
static

Definition at line 3327 of file walsender.c.

References last_reply_timestamp, pq_flush_if_writable, TimestampTzPlusMilliseconds, waiting_for_ping_response, wal_sender_timeout, WalSndKeepalive(), and WalSndShutdown().

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

3328 {
3329  TimestampTz ping_time;
3330 
3331  /*
3332  * Don't send keepalive messages if timeouts are globally disabled or
3333  * we're doing something not partaking in timeouts.
3334  */
3335  if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3336  return;
3337 
3339  return;
3340 
3341  /*
3342  * If half of wal_sender_timeout has lapsed without receiving any reply
3343  * from the standby, send a keep-alive message to the standby requesting
3344  * an immediate reply.
3345  */
3347  wal_sender_timeout / 2);
3348  if (now >= ping_time)
3349  {
3350  WalSndKeepalive(true);
3352 
3353  /* Try to flush pending output to the client */
3354  if (pq_flush_if_writable() != 0)
3355  WalSndShutdown();
3356  }
3357 }
int wal_sender_timeout
Definition: walsender.c:122
int64 TimestampTz
Definition: timestamp.h:39
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3308
#define pq_flush_if_writable()
Definition: libpq.h:40
static void WalSndShutdown(void)
Definition: walsender.c:228
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
static bool waiting_for_ping_response
Definition: walsender.c:171
static TimestampTz last_reply_timestamp
Definition: walsender.c:168
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
static void WalSndKill ( int  code,
Datum  arg 
)
static

Definition at line 2267 of file walsender.c.

References Assert, WalSnd::latch, WalSnd::mutex, MyWalSnd, NULL, WalSnd::pid, SpinLockAcquire, and SpinLockRelease.

Referenced by InitWalSenderSlot().

2268 {
2269  WalSnd *walsnd = MyWalSnd;
2270 
2271  Assert(walsnd != NULL);
2272 
2273  MyWalSnd = NULL;
2274 
2275  SpinLockAcquire(&walsnd->mutex);
2276  /* clear latch while holding the spinlock, so it can safely be read */
2277  walsnd->latch = NULL;
2278  /* Mark WalSnd struct as no longer being in use. */
2279  walsnd->pid = 0;
2280  SpinLockRelease(&walsnd->mutex);
2281 }
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch * latch
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:111
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
static void WalSndLastCycleHandler ( SIGNAL_ARGS  )
static

Definition at line 2916 of file walsender.c.

References got_SIGUSR2, MyLatch, and SetLatch().

Referenced by WalSndSignals().

2917 {
2918  int save_errno = errno;
2919 
2920  got_SIGUSR2 = true;
2921  SetLatch(MyLatch);
2922 
2923  errno = save_errno;
2924 }
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:186
void SetLatch(volatile Latch *latch)
Definition: latch.c:415
struct Latch * MyLatch
Definition: globals.c:52
static void WalSndLoop ( WalSndSendDataCallback  send_data)
static

Definition at line 2070 of file walsender.c.

References application_name, CHECK_FOR_INTERRUPTS, ConfigReloadPending, DEBUG1, ereport, errmsg(), GetCurrentTimestamp(), got_SIGUSR2, last_reply_timestamp, MyLatch, MyProcPort, now(), NULL, PGC_SIGHUP, pgstat_report_activity(), PostmasterIsAlive(), pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), sleeptime, Port::sock, WalSnd::state, STATE_RUNNING, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_MAIN, waiting_for_ping_response, WaitLatchOrSocket(), WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndDone(), WalSndKeepaliveIfNecessary(), WalSndSetState(), WalSndShutdown(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STREAMING, WL_LATCH_SET, WL_POSTMASTER_DEATH, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, and WL_TIMEOUT.

Referenced by StartLogicalReplication(), and StartReplication().

2071 {
2072  /*
2073  * Initialize the last reply timestamp. That enables timeout processing
2074  * from hereon.
2075  */
2077  waiting_for_ping_response = false;
2078 
2079  /* Report to pgstat that this process is running */
2081 
2082  /*
2083  * Loop until we reach the end of this timeline or the client requests to
2084  * stop streaming.
2085  */
2086  for (;;)
2087  {
2088  TimestampTz now;
2089 
2090  /*
2091  * Emergency bailout if postmaster has died. This is to avoid the
2092  * necessity for manual cleanup of all postmaster children.
2093  */
2094  if (!PostmasterIsAlive())
2095  exit(1);
2096 
2097  /* Clear any already-pending wakeups */
2099 
2101 
2102  /* Process any requests or signals received recently */
2103  if (ConfigReloadPending)
2104  {
2105  ConfigReloadPending = false;
2108  }
2109 
2110  /* Check for input from the client */
2112 
2113  /*
2114  * If we have received CopyDone from the client, sent CopyDone
2115  * ourselves, and the output buffer is empty, it's time to exit
2116  * streaming.
2117  */
2119  break;
2120 
2121  /*
2122  * If we don't have any pending data in the output buffer, try to send
2123  * some more. If there is some, we don't bother to call send_data
2124  * again until we've flushed it ... but we'd better assume we are not
2125  * caught up.
2126  */
2127  if (!pq_is_send_pending())
2128  send_data();
2129  else
2130  WalSndCaughtUp = false;
2131 
2132  /* Try to flush pending output to the client */
2133  if (pq_flush_if_writable() != 0)
2134  WalSndShutdown();
2135 
2136  /* If nothing remains to be sent right now ... */
2138  {
2139  /*
2140  * If we're in catchup state, move to streaming. This is an
2141  * important state change for users to know about, since before
2142  * this point data loss might occur if the primary dies and we
2143  * need to failover to the standby. The state change is also
2144  * important for synchronous replication, since commits that
2145  * started to wait at that point might wait for some time.
2146  */
2148  {
2149  ereport(DEBUG1,
2150  (errmsg("standby \"%s\" has now caught up with primary",
2151  application_name)));
2153  }
2154 
2155  /*
2156  * When SIGUSR2 arrives, we send any outstanding logs up to the
2157  * shutdown checkpoint record (i.e., the latest record), wait for
2158  * them to be replicated to the standby, and exit. This may be a
2159  * normal termination at shutdown, or a promotion, the walsender
2160  * is not sure which.
2161  */
2162  if (got_SIGUSR2)
2163  WalSndDone(send_data);
2164  }
2165 
2166  now = GetCurrentTimestamp();
2167 
2168  /* Check for replication timeout. */
2169  WalSndCheckTimeOut(now);
2170 
2171  /* Send keepalive if the time has come */
2173 
2174  /*
2175  * We don't block if not caught up, unless there is unsent data
2176  * pending in which case we'd better block until the socket is
2177  * write-ready. This test is only needed for the case where the
2178  * send_data callback handled a subset of the available data but then
2179  * pq_flush_if_writable flushed it all --- we should immediately try
2180  * to send more.
2181  */
2183  {
2184  long sleeptime;
2185  int wakeEvents;
2186 
2187  wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT |
2189 
2190  sleeptime = WalSndComputeSleeptime(now);
2191 
2192  if (pq_is_send_pending())
2193  wakeEvents |= WL_SOCKET_WRITEABLE;
2194 
2195  /* Sleep until something happens or we time out */
2196  WaitLatchOrSocket(MyLatch, wakeEvents,
2197  MyProcPort->sock, sleeptime,
2199  }
2200  }
2201  return;
2202 }
#define pq_is_send_pending()
Definition: libpq.h:41
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
#define DEBUG1
Definition: elog.h:25
struct Port * MyProcPort
Definition: globals.c:41
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:2994
int64 TimestampTz
Definition: timestamp.h:39
static void WalSndDone(WalSndSendDataCallback send_data)
Definition: walsender.c:2803
#define WL_SOCKET_READABLE
Definition: latch.h:125
int sleeptime
Definition: pg_standby.c:40
void ResetLatch(volatile Latch *latch)
Definition: latch.c:498
pgsocket sock
Definition: libpq-be.h:118
WalSndState state
static bool streamingDoneSending
Definition: walsender.c:179
bool PostmasterIsAlive(void)
Definition: pmsignal.c:272
#define pq_flush_if_writable()
Definition: libpq.h:40
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:186
void SyncRepInitConfig(void)
Definition: syncrep.c:381
#define ereport(elevel, rest)
Definition: elog.h:122
static bool WalSndCaughtUp
Definition: walsender.c:183
Definition: guc.h:72
volatile sig_atomic_t ConfigReloadPending
Definition: globals.c:34
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
static void WalSndShutdown(void)
Definition: walsender.c:228
WalSnd * MyWalSnd
Definition: walsender.c:111
static void WalSndKeepaliveIfNecessary(TimestampTz now)
Definition: walsender.c:3327
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2001
#define NULL
Definition: c.h:229
static void WalSndCheckTimeOut(TimestampTz now)
Definition: walsender.c:2043
void WalSndSetState(WalSndState state)
Definition: walsender.c:3091
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:356
static bool streamingDoneReceiving
Definition: walsender.c:180
char * application_name
Definition: guc.c:469
int errmsg(const char *fmt,...)
Definition: elog.c:797
struct Latch * MyLatch
Definition: globals.c:52
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:100
static bool waiting_for_ping_response
Definition: walsender.c:171
#define WL_LATCH_SET
Definition: latch.h:124
static TimestampTz last_reply_timestamp
Definition: walsender.c:168
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1568
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
static void WalSndPrepareWrite ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1133 of file walsender.c.

References InvalidXLogRecPtr, LogicalDecodingContext::out, pq_sendbyte(), pq_sendint64(), and resetStringInfo().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

1134 {
1135  /* can't have sync rep confused by sending the same LSN several times */
1136  if (!last_write)
1137  lsn = InvalidXLogRecPtr;
1138 
1139  resetStringInfo(ctx->out);
1140 
1141  pq_sendbyte(ctx->out, 'w');
1142  pq_sendint64(ctx->out, lsn); /* dataStart */
1143  pq_sendint64(ctx->out, lsn); /* walEnd */
1144 
1145  /*
1146  * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1147  * reserve space here.
1148  */
1149  pq_sendint64(ctx->out, 0); /* sendtime */
1150 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
StringInfo out
Definition: logical.h:66
void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.c:271
void WalSndRqstFileReload ( void  )

Definition at line 2873 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

2874 {
2875  int i;
2876 
2877  for (i = 0; i < max_wal_senders; i++)
2878  {
2879  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2880 
2881  if (walsnd->pid == 0)
2882  continue;
2883 
2884  SpinLockAcquire(&walsnd->mutex);
2885  walsnd->needreload = true;
2886  SpinLockRelease(&walsnd->mutex);
2887  }
2888 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:108
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
int max_wal_senders
Definition: walsender.c:120
#define SpinLockRelease(lock)
Definition: spin.h:64
bool needreload
int i
void WalSndSetState ( WalSndState  state)

Definition at line 3091 of file walsender.c.

References am_walsender, Assert, WalSnd::mutex, MyWalSnd, SpinLockAcquire, SpinLockRelease, and WalSnd::state.

Referenced by exec_replication_command(), SendBaseBackup(), StartLogicalReplication(), StartReplication(), WalSndErrorCleanup(), WalSndLoop(), and XLogSendPhysical().

3092 {
3093  WalSnd *walsnd = MyWalSnd;
3094 
3096 
3097  if (walsnd->state == state)
3098  return;
3099 
3100  SpinLockAcquire(&walsnd->mutex);
3101  walsnd->state = state;
3102  SpinLockRelease(&walsnd->mutex);
3103 }
slock_t mutex
bool am_walsender
Definition: walsender.c:114
#define SpinLockAcquire(lock)
Definition: spin.h:62
WalSndState state
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:111
#define Assert(condition)
Definition: c.h:675
Definition: regguts.h:298
void WalSndShmemInit ( void  )

Definition at line 2964 of file walsender.c.

References i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SHMQueueInit(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

2965 {
2966  bool found;
2967  int i;
2968 
2969  WalSndCtl = (WalSndCtlData *)
2970  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
2971 
2972  if (!found)
2973  {
2974  /* First time through, so initialize */
2976 
2977  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
2979 
2980  for (i = 0; i < max_wal_senders; i++)
2981  {
2982  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2983 
2984  SpinLockInit(&walsnd->mutex);
2985  }
2986  }
2987 }
Size WalSndShmemSize(void)
Definition: walsender.c:2952
#define SpinLockInit(lock)
Definition: spin.h:60
WalSndCtlData * WalSndCtl
Definition: walsender.c:108
#define MemSet(start, val, len)
Definition: c.h:857
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
int max_wal_senders
Definition: walsender.c:120
void SHMQueueInit(SHM_QUEUE *queue)
Definition: shmqueue.c:36
int i
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28
Size WalSndShmemSize ( void  )

Definition at line 2952 of file walsender.c.

References add_size(), max_wal_senders, mul_size(), and offsetof.

Referenced by CreateSharedMemoryAndSemaphores(), and WalSndShmemInit().

2953 {
2954  Size size = 0;
2955 
2956  size = offsetof(WalSndCtlData, walsnds);
2957  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
2958 
2959  return size;
2960 }
int max_wal_senders
Definition: walsender.c:120
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
size_t Size
Definition: c.h:356
#define offsetof(type, field)
Definition: c.h:555
static void WalSndShutdown ( void  )
static

Definition at line 228 of file walsender.c.

References am_cascading_walsender, CurrentResourceOwner, InitWalSenderSlot(), LagTracker, MarkPostmasterChildWalSender(), NULL, PMSIGNAL_ADVANCE_STATE_MACHINE, RecoveryInProgress(), ResourceOwnerCreate(), and SendPostmasterSignal().

Referenced by WalSndCheckTimeOut(), WalSndKeepaliveIfNecessary(), WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

260 {
262 
263  /* Create a per-walsender data structure in shared memory */
265 
266  /* Set up resource owner */
267  CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner");
268 
269  /*
270  * Let postmaster know that we're a WAL sender. Once we've declared us as
271  * a WAL sender process, postmaster will let us outlive the bgwriter and
272  * kill us last in the shutdown sequence, so we get a chance to stream all
273  * remaining WAL at shutdown, including the shutdown checkpoint. Note that
274  * there's no going back, and we mustn't write any WAL records after this.
275  */
278 
279  /* Initialize empty timestamp buffer for lag tracking. */
280  memset(&LagTracker, 0, sizeof(LagTracker));
281 }
static void InitWalSenderSlot(void)
Definition: walsender.c:2206
ResourceOwner CurrentResourceOwner
Definition: resowner.c:138
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:239
bool RecoveryInProgress(void)
Definition: xlog.c:7872
static struct @27 LagTracker
#define NULL
Definition: c.h:229
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:113
bool am_cascading_walsender
Definition: walsender.c:115
ResourceOwner ResourceOwnerCreate(ResourceOwner parent, const char *name)
Definition: resowner.c:416
void WalSndSignals ( void  )

Definition at line 2928 of file walsender.c.

References die(), InitializeTimeouts(), PostgresSigHupHandler(), pqsignal(), procsignal_sigusr1_handler(), quickdie(), SIG_DFL, SIG_IGN, SIGCHLD, SIGCONT, SIGHUP, SIGPIPE, SIGQUIT, SIGTTIN, SIGTTOU, SIGUSR1, SIGUSR2, SIGWINCH, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

2929 {
2930  /* Set up signal handlers */
2931  pqsignal(SIGHUP, PostgresSigHupHandler); /* set flag to read config
2932  * file */
2933  pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
2934  pqsignal(SIGTERM, die); /* request shutdown */
2935  pqsignal(SIGQUIT, quickdie); /* hard crash time */
2936  InitializeTimeouts(); /* establishes SIGALRM handler */
2939  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
2940  * shutdown */
2941 
2942  /* Reset some signals that are accepted by postmaster but not here */
2948 }
void InitializeTimeouts(void)
Definition: timeout.c:340
#define SIGUSR1
Definition: win32.h:202
#define SIGCONT
Definition: win32.h:197
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:2916
#define SIGWINCH
Definition: win32.h:201
#define SIGTTIN
Definition: win32.h:199
#define SIGQUIT
Definition: win32.h:189
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:2648
#define SIG_IGN
Definition: win32.h:185
void PostgresSigHupHandler(SIGNAL_ARGS)
Definition: postgres.c:2688
#define SIGPIPE
Definition: win32.h:193
#define SIGHUP
Definition: win32.h:188
#define SIG_DFL
Definition: win32.h:183
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:168
#define SIGTTOU
Definition: win32.h:200
void die(SIGNAL_ARGS)
Definition: postgres.c:2617
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:261
#define SIGCHLD
Definition: win32.h:198
void quickdie(SIGNAL_ARGS)
Definition: postgres.c:2556
#define SIGUSR2
Definition: win32.h:203
static void WalSndUpdateProgress ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid 
)
static

Definition at line 1250 of file walsender.c.

References GetCurrentTimestamp(), LagTrackerWrite(), now(), TimestampDifferenceExceeds(), and WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS.

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

1251 {
1252  static TimestampTz sendTime = 0;
1254 
1255  /*
1256  * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1257  * avoid flooding the lag tracker when we commit frequently.
1258  */
1259 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1260  if (!TimestampDifferenceExceeds(sendTime, now,
1262  return;
1263 
1264  LagTrackerWrite(lsn, now);
1265  sendTime = now;
1266 }
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
int64 TimestampTz
Definition: timestamp.h:39
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1649
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition: walsender.c:3366
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
static XLogRecPtr WalSndWaitForWal ( XLogRecPtr  loc)
static

Definition at line 1272 of file walsender.c.

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, WalSnd::flush, GetCurrentTimestamp(), GetFlushRecPtr(), GetXLogReplayRecPtr(), got_STOPPING, InvalidXLogRecPtr, MyLatch, MyProcPort, now(), NULL, PGC_SIGHUP, PostmasterIsAlive(), pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), RecoveryInProgress(), ResetLatch(), sentPtr, SetLatch(), sleeptime, Port::sock, SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_WAIT_WAL, waiting_for_ping_response, WaitLatchOrSocket(), WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepalive(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WL_LATCH_SET, WL_POSTMASTER_DEATH, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, WL_TIMEOUT, WalSnd::write, and XLogBackgroundFlush().

Referenced by logical_read_xlog_page().

1273 {
1274  int wakeEvents;
1275  static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1276 
1277 
1278  /*
1279  * Fast path to avoid acquiring the spinlock in case we already know we
1280  * have enough WAL available. This is particularly interesting if we're
1281  * far behind.
1282  */
1283  if (RecentFlushPtr != InvalidXLogRecPtr &&
1284  loc <= RecentFlushPtr)
1285  return RecentFlushPtr;
1286 
1287  /* Get a more recent flush pointer. */
1288  if (!RecoveryInProgress())
1289  RecentFlushPtr = GetFlushRecPtr();
1290  else
1291  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1292 
1293  for (;;)
1294  {
1295  long sleeptime;
1296  TimestampTz now;
1297 
1298  /*
1299  * Emergency bailout if postmaster has died. This is to avoid the
1300  * necessity for manual cleanup of all postmaster children.
1301  */
1302  if (!PostmasterIsAlive())
1303  exit(1);
1304 
1305  /* Clear any already-pending wakeups */
1306  ResetLatch(MyLatch);
1307 
1308  CHECK_FOR_INTERRUPTS();
1309 
1310  /* Process any requests or signals received recently */
1311  if (ConfigReloadPending)
1312  {
1313  ConfigReloadPending = false;
1314  ProcessConfigFile(PGC_SIGHUP);
1315  SyncRepInitConfig();
1316  }
1317 
1318  /* Check for input from the client */
1319  ProcessRepliesIfAny();
1320 
1321  /*
1322  * If we're shutting down, trigger pending WAL to be written out,
1323  * otherwise we'd possibly end up waiting for WAL that never gets
1324  * written, because walwriter has shut down already.
1325  */
1326  if (got_STOPPING)
1327  XLogBackgroundFlush();
1328 
1329  /* Update our idea of the currently flushed position. */
1330  if (!RecoveryInProgress())
1331  RecentFlushPtr = GetFlushRecPtr();
1332  else
1333  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1334 
1335  /*
1336  * If postmaster asked us to stop, don't wait here anymore. This will
1337  * cause the xlogreader to return without reading a full record, which
1338  * is the fastest way to reach the mainloop which then can quit.
1339  *
1340  * It's important to do this check after the recomputation of
1341  * RecentFlushPtr, so we can send all remaining data before shutting
1342  * down.
1343  */
1344  if (got_STOPPING)
1345  break;
1346 
1347  /*
1348  * We only send regular messages to the client for full decoded
1349  * transactions, but a synchronous replication and walsender shutdown
1350  * possibly are waiting for a later location. So we send pings
1351  * containing the flush location every now and then.
1352  */
1353  if (MyWalSnd->flush < sentPtr &&
1354  MyWalSnd->write < sentPtr &&
1355  !waiting_for_ping_response)
1356  {
1357  WalSndKeepalive(false);
1358  waiting_for_ping_response = true;
1359  }
1360 
1361  /* check whether we're done */
1362  if (loc <= RecentFlushPtr)
1363  break;
1364 
1365  /* Waiting for new WAL. Since we need to wait, we're now caught up. */
1366  WalSndCaughtUp = true;
1367 
1368  /*
1369  * Try to flush pending output to the client. Also wait for the socket
1370  * becoming writable, if there's still pending output after an attempt
1371  * to flush. Otherwise we might just sit on output data while waiting
1372  * for new WAL being generated.
1373  */
1374  if (pq_flush_if_writable() != 0)
1375  WalSndShutdown();
1376 
1377  now = GetCurrentTimestamp();
1378 
1379  /* die if timeout was reached */
1380  WalSndCheckTimeOut(now);
1381 
1382  /* Send keepalive if the time has come */
1383  WalSndKeepaliveIfNecessary(now);
1384 
1385  sleeptime = WalSndComputeSleeptime(now);
1386 
1387  wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
1388  WL_SOCKET_READABLE | WL_TIMEOUT;
1389 
1390  if (pq_is_send_pending())
1391  wakeEvents |= WL_SOCKET_WRITEABLE;
1392 
1393  /* Sleep until something happens or we time out */
1394  WaitLatchOrSocket(MyLatch, wakeEvents,
1395  MyProcPort->sock, sleeptime,
1396  WAIT_EVENT_WAL_SENDER_WAIT_WAL);
1397  }
1398 
1399  /* reactivate latch so WalSndLoop knows to continue */
1400  SetLatch(MyLatch);
1401  return RecentFlushPtr;
1402 }
#define pq_is_send_pending()
Definition: libpq.h:41
XLogRecPtr write
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
struct Port * MyProcPort
Definition: globals.c:41
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
int64 TimestampTz
Definition: timestamp.h:39
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
#define WL_SOCKET_READABLE
Definition: latch.h:125
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8221
int sleeptime
Definition: pg_standby.c:40
void ResetLatch(volatile Latch *latch)
Definition: latch.c:498
bool RecoveryInProgress(void)
Definition: xlog.c:7872
pgsocket sock
Definition: libpq-be.h:118
XLogRecPtr flush
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3308
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11088
bool XLogBackgroundFlush(void)
Definition: xlog.c:2946
bool PostmasterIsAlive(void)
Definition: pmsignal.c:272
#define pq_flush_if_writable()
Definition: libpq.h:40
void SyncRepInitConfig(void)
Definition: syncrep.c:381
static bool WalSndCaughtUp
Definition: walsender.c:183
PGC_SIGHUP
Definition: guc.h:72
volatile sig_atomic_t ConfigReloadPending
Definition: globals.c:34
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
static void WalSndShutdown(void)
Definition: walsender.c:228
WalSnd * MyWalSnd
Definition: walsender.c:111
static XLogRecPtr sentPtr
Definition: walsender.c:157
static void WalSndKeepaliveIfNecessary(TimestampTz now)
Definition: walsender.c:3327
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2001
void SetLatch(volatile Latch *latch)
Definition: latch.c:415
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static void WalSndCheckTimeOut(TimestampTz now)
Definition: walsender.c:2043
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:356
struct Latch * MyLatch
Definition: globals.c:52
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:100
static bool waiting_for_ping_response
Definition: walsender.c:171
#define WL_LATCH_SET
Definition: latch.h:124
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1568
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
void WalSndWaitStopping ( void  )

Definition at line 3051 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, pg_usleep(), WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

3052 {
3053  for (;;)
3054  {
3055  int i;
3056  bool all_stopped = true;
3057 
3058  for (i = 0; i < max_wal_senders; i++)
3059  {
3060  WalSndState state;
3061  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3062 
3063  SpinLockAcquire(&walsnd->mutex);
3064 
3065  if (walsnd->pid == 0)
3066  {
3067  SpinLockRelease(&walsnd->mutex);
3068  continue;
3069  }
3070 
3071  state = walsnd->state;
3072  SpinLockRelease(&walsnd->mutex);
3073 
3074  if (state != WALSNDSTATE_STOPPING)
3075  {
3076  all_stopped = false;
3077  break;
3078  }
3079  }
3080 
3081  /* safe to leave if confirmation is done for all WAL senders */
3082  if (all_stopped)
3083  return;
3084 
3085  pg_usleep(10000L); /* wait for 10 msec */
3086  }
3087 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:108
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pg_usleep(long microsec)
Definition: signal.c:53
WalSndState state
int max_wal_senders
Definition: walsender.c:120
#define SpinLockRelease(lock)
Definition: spin.h:64
Definition: regguts.h:298
WalSndState
int i
void WalSndWakeup ( void  )

Definition at line 2996 of file walsender.c.

References i, WalSnd::latch, max_wal_senders, WalSnd::mutex, NULL, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive(), StartupXLOG(), and XLogWalRcvFlush().

2997 {
2998  int i;
2999 
3000  for (i = 0; i < max_wal_senders; i++)
3001  {
3002  Latch *latch;
3003  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3004 
3005  /*
3006  * Get latch pointer with spinlock held, for the unlikely case that
3007  * pointer reads aren't atomic (as they're 8 bytes).
3008  */
3009  SpinLockAcquire(&walsnd->mutex);
3010  latch = walsnd->latch;
3011  SpinLockRelease(&walsnd->mutex);
3012 
3013  if (latch != NULL)
3014  SetLatch(latch);
3015  }
3016 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:108
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch * latch
Definition: latch.h:110
int max_wal_senders
Definition: walsender.c:120
#define SpinLockRelease(lock)
Definition: spin.h:64
void SetLatch(volatile Latch *latch)
Definition: latch.c:415
#define NULL
Definition: c.h:229
int i
static void WalSndWriteData ( LogicalDecodingContext * ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1160 of file walsender.c.

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, StringInfoData::data, GetCurrentTimestamp(), StringInfoData::len, MyLatch, MyProcPort, now(), LogicalDecodingContext::out, PGC_SIGHUP, PostmasterIsAlive(), pq_flush_if_writable, pq_is_send_pending, pq_putmessage_noblock, pq_sendint64(), ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), resetStringInfo(), SetLatch(), sleeptime, Port::sock, SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_WRITE_DATA, WaitLatchOrSocket(), WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WL_LATCH_SET, WL_POSTMASTER_DEATH, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, and WL_TIMEOUT.

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

1162 {
1163  /* output previously gathered data in a CopyData packet */
1164  pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1165 
1166  /*
1167  * Fill the send timestamp last, so that it is taken as late as possible.
1168  * This is somewhat ugly, but the protocol is set as it's already used for
1169  * several releases by streaming physical replication.
1170  */
1171  resetStringInfo(&tmpbuf);
1172  pq_sendint64(&tmpbuf, GetCurrentTimestamp());
1173  memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1174  tmpbuf.data, sizeof(int64));
1175 
1176  /* fast path */
1177  /* Try to flush pending output to the client */
1178  if (pq_flush_if_writable() != 0)
1179  WalSndShutdown();
1180 
1181  if (!pq_is_send_pending())
1182  return;
1183 
1184  for (;;)
1185  {
1186  int wakeEvents;
1187  long sleeptime;
1188  TimestampTz now;
1189 
1190  /*
1191  * Emergency bailout if postmaster has died. This is to avoid the
1192  * necessity for manual cleanup of all postmaster children.
1193  */
1194  if (!PostmasterIsAlive())
1195  exit(1);
1196 
1197  /* Clear any already-pending wakeups */
1198  ResetLatch(MyLatch);
1199 
1200  CHECK_FOR_INTERRUPTS();
1201 
1202  /* Process any requests or signals received recently */
1203  if (ConfigReloadPending)
1204  {
1205  ConfigReloadPending = false;
1206  ProcessConfigFile(PGC_SIGHUP);
1207  SyncRepInitConfig();
1208  }
1209 
1210  /* Check for input from the client */
1211  ProcessRepliesIfAny();
1212 
1213  /* Try to flush pending output to the client */
1214  if (pq_flush_if_writable() != 0)
1215  WalSndShutdown();
1216 
1217  /* If we finished clearing the buffered data, we're done here. */
1218  if (!pq_is_send_pending())
1219  break;
1220 
1221  now = GetCurrentTimestamp();
1222 
1223  /* die if timeout was reached */
1224  WalSndCheckTimeOut(now);
1225 
1226  /* Send keepalive if the time has come */
1227  WalSndKeepaliveIfNecessary(now);
1228 
1229  sleeptime = WalSndComputeSleeptime(now);
1230 
1231  wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
1232  WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT;
1233 
1234  /* Sleep until something happens or we time out */
1235  WaitLatchOrSocket(MyLatch, wakeEvents,
1236  MyProcPort->sock, sleeptime,
1237  WAIT_EVENT_WAL_SENDER_WRITE_DATA);
1238  }
1239 
1240  /* reactivate latch so WalSndLoop knows to continue */
1241  SetLatch(MyLatch);
1242 }
#define pq_is_send_pending()
Definition: libpq.h:41
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
struct Port * MyProcPort
Definition: globals.c:41
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
int64 TimestampTz
Definition: timestamp.h:39
#define WL_SOCKET_READABLE
Definition: latch.h:125
int sleeptime
Definition: pg_standby.c:40
void ResetLatch(volatile Latch *latch)
Definition: latch.c:498
pgsocket sock
Definition: libpq-be.h:118
bool PostmasterIsAlive(void)
Definition: pmsignal.c:272
#define pq_flush_if_writable()
Definition: libpq.h:40
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
void SyncRepInitConfig(void)
Definition: syncrep.c:381
PGC_SIGHUP
Definition: guc.h:72
volatile sig_atomic_t ConfigReloadPending
Definition: globals.c:34
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
static void WalSndShutdown(void)
Definition: walsender.c:228
static void WalSndKeepaliveIfNecessary(TimestampTz now)
Definition: walsender.c:3327
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2001
void SetLatch(volatile Latch *latch)
Definition: latch.c:415
static void WalSndCheckTimeOut(TimestampTz now)
Definition: walsender.c:2043
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:356
static StringInfoData tmpbuf
Definition: walsender.c:162
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
StringInfo out
Definition: logical.h:66
struct Latch * MyLatch
Definition: globals.c:52
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:100
void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.c:271
#define WL_LATCH_SET
Definition: latch.h:124
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1568
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
static void XLogRead ( char *  buf,
XLogRecPtr  startptr,
Size  count 
)
static

Definition at line 2295 of file walsender.c.

References am_cascading_walsender, BasicOpenFile(), buf, CheckXLogRemoved(), close, curFileTimeLine, ereport, errcode_for_file_access(), errmsg(), ERROR, MAXPGPATH, WalSnd::mutex, MyWalSnd, WalSnd::needreload, PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), read, sendFile, sendOff, sendSegNo, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, SpinLockAcquire, SpinLockRelease, ThisTimeLineID, WAIT_EVENT_WAL_READ, XLByteInSeg, XLByteToSeg, XLogFileNameP(), XLogFilePath, and XLogSegSize.

Referenced by logical_read_xlog_page(), and XLogSendPhysical().

2296 {
2297  char *p;
2298  XLogRecPtr recptr;
2299  Size nbytes;
2300  XLogSegNo segno;
2301 
2302 retry:
2303  p = buf;
2304  recptr = startptr;
2305  nbytes = count;
2306 
2307  while (nbytes > 0)
2308  {
2309  uint32 startoff;
2310  int segbytes;
2311  int readbytes;
2312 
2313  startoff = recptr % XLogSegSize;
2314 
2315  if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
2316  {
2317  char path[MAXPGPATH];
2318 
2319  /* Switch to another logfile segment */
2320  if (sendFile >= 0)
2321  close(sendFile);
2322 
2323  XLByteToSeg(recptr, sendSegNo);
2324 
2325  /*-------
2326  * When reading from a historic timeline, and there is a timeline
2327  * switch within this segment, read from the WAL segment belonging
2328  * to the new timeline.
2329  *
2330  * For example, imagine that this server is currently on timeline
2331  * 5, and we're streaming timeline 4. The switch from timeline 4
2332  * to 5 happened at 0/13002088. In pg_wal, we have these files:
2333  *
2334  * ...
2335  * 000000040000000000000012
2336  * 000000040000000000000013
2337  * 000000050000000000000013
2338  * 000000050000000000000014
2339  * ...
2340  *
2341  * In this situation, when requested to send the WAL from
2342  * segment 0x13, on timeline 4, we read the WAL from file
2343  * 000000050000000000000013. Archive recovery prefers files from
2344  * newer timelines, so if the segment was restored from the
2345  * archive on this server, the file belonging to the old timeline,
2346  * 000000040000000000000013, might not exist. Their contents are
2347  * equal up to the switchpoint, because at a timeline switch, the
2348  * used portion of the old segment is copied to the new file.
2349  *-------
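2350  */
2351  curFileTimeLine = sendTimeLine;
2352  if (sendTimeLineIsHistoric)
2353  {
2354  XLogSegNo endSegNo;
2355 
2356  XLByteToSeg(sendTimeLineValidUpto, endSegNo);
2357  if (sendSegNo == endSegNo)
2358  curFileTimeLine = sendTimeLineNextTLI;
2359  }
2360 
2361  XLogFilePath(path, curFileTimeLine, sendSegNo);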
2362 
2363  sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
2364  if (sendFile < 0)
2365  {
2366  /*
2367  * If the file is not found, assume it's because the standby
2368  * asked for a too old WAL segment that has already been
2369  * removed or recycled.
2370  */
2371  if (errno == ENOENT)
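2372  ereport(ERROR,
2373  (errcode_for_file_access(),
2374  errmsg("requested WAL segment %s has already been removed",
2375  XLogFileNameP(curFileTimeLine, sendSegNo))));
2376  else
2377  ereport(ERROR,
2378  (errcode_for_file_access(),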
2379  errmsg("could not open file \"%s\": %m",
2380  path)));
2381  }
2382  sendOff = 0;
2383  }
2384 
2385  /* Need to seek in the file? */
2386  if (sendOff != startoff)
2387  {
2388  if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
2389  ereport(ERROR,
2390  (errcode_for_file_access(),
2391  errmsg("could not seek in log segment %s to offset %u: %m",
2392  XLogFileNameP(curFileTimeLine, sendSegNo),
2393  startoff)));
2394  sendOff = startoff;
2395  }
2396 
2397  /* How many bytes are within this segment? */
2398  if (nbytes > (XLogSegSize - startoff))
2399  segbytes = XLogSegSize - startoff;
2400  else
2401  segbytes = nbytes;
2402 
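2402 
2403  pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
2404  readbytes = read(sendFile, p, segbytes);
2405  pgstat_report_wait_end();
2406  if (readbytes <= 0)
2407  {
2408  ereport(ERROR,
2409  (errcode_for_file_access(),
2410  errmsg("could not read from log segment %s, offset %u, length %lu: %m",
2411  XLogFileNameP(curFileTimeLine, sendSegNo),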
2412  sendOff, (unsigned long) segbytes)));
2413  }
2414 
2415  /* Update state for read */
2416  recptr += readbytes;
2417 
2418  sendOff += readbytes;
2419  nbytes -= readbytes;
2420  p += readbytes;
2421  }
2422 
2423  /*
2424  * After reading into the buffer, check that what we read was valid. We do
2425  * this after reading, because even though the segment was present when we
2426  * opened it, it might get recycled or removed while we read it. The
2427  * read() succeeds in that case, but the data we tried to read might
2428  * already have been overwritten with new WAL records.
2429  */
2430  XLByteToSeg(startptr, segno);
2431  CheckXLogRemoved(segno, ThisTimeLineID);
2432 
2433  /*
2434  * During recovery, the currently-open WAL file might be replaced with the
2435  * file of the same name retrieved from archive. So we always need to
2436  * check what we read was valid after reading into the buffer. If it's
2437  * invalid, we try to open and read the file again.
2438  */
2439  if (am_cascading_walsender)
2440  {
2441  WalSnd *walsnd = MyWalSnd;
2442  bool reload;
2443 
2444  SpinLockAcquire(&walsnd->mutex);
2445  reload = walsnd->needreload;
2446  walsnd->needreload = false;
2447  SpinLockRelease(&walsnd->mutex);
2448 
2449  if (reload && sendFile >= 0)
2450  {
2451  close(sendFile);
2452  sendFile = -1;
2453 
2454  goto retry;
2455  }
2456  }
2457 }
#define XLogSegSize
Definition: xlog_internal.h:92
static int sendFile
Definition: walsender.c:135
#define PG_BINARY
Definition: c.h:1038
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define ERROR
Definition: elog.h:43
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:3765
#define MAXPGPATH
static TimeLineID curFileTimeLine
Definition: walsender.c:140
#define XLogFilePath(path, tli, logSegNo)
char * XLogFileNameP(TimeLineID tli, XLogSegNo segno)
Definition: xlog.c:10130
static char * buf
Definition: pg_test_fsync.c:66
uint64 XLogSegNo
Definition: xlogdefs.h:34
int errcode_for_file_access(void)
Definition: elog.c:598
unsigned int uint32
Definition: c.h:268
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1232
#define ereport(elevel, rest)
Definition: elog.h:122
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:111
TimeLineID ThisTimeLineID
Definition: xlog.c:179
#define XLByteToSeg(xlrp, logSegNo)
static TimeLineID sendTimeLine
Definition: walsender.c:148
uint64 XLogRecPtr
Definition: xlogdefs.h:21
bool needreload
size_t Size
Definition: c.h:356
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1208
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:149
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:151
static XLogSegNo sendSegNo
Definition: walsender.c:136
int errmsg(const char *fmt,...)
Definition: elog.c:797
static uint32 sendOff
Definition: walsender.c:137
#define close(a)
Definition: win32.h:12
#define XLByteInSeg(xlrp, logSegNo)
#define read(a, b, c)
Definition: win32.h:13
static bool sendTimeLineIsHistoric
Definition: walsender.c:150
bool am_cascading_walsender
Definition: walsender.c:115
int BasicOpenFile(FileName fileName, int fileFlags, int fileMode)
Definition: fd.c:936
static void XLogSendLogical ( void  )
static

Definition at line 2733 of file walsender.c.

References elog, XLogReaderState::EndRecPtr, ERROR, GetFlushRecPtr(), got_SIGUSR2, got_STOPPING, InvalidXLogRecPtr, logical_startptr, LogicalDecodingProcessRecord(), WalSnd::mutex, MyWalSnd, NULL, LogicalDecodingContext::reader, WalSnd::sentPtr, sentPtr, SpinLockAcquire, SpinLockRelease, WalSndCaughtUp, and XLogReadRecord().

Referenced by StartLogicalReplication().

2734 {
2735  XLogRecord *record;
2736  char *errm;
2737 
2738  /*
2739  * Don't know whether we've caught up yet. We'll set it to true in
2740  * WalSndWaitForWal, if we're actually waiting. We also set to true if
2741  * XLogReadRecord() had to stop reading but WalSndWaitForWal didn't wait -
2742  * i.e. when we're shutting down.
2743  */
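2744  WalSndCaughtUp = false;
2745 
2746  record = XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &errm);
2747  logical_startptr = InvalidXLogRecPtr;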
2748 
2749  /* xlog record was invalid */
2750  if (errm != NULL)
2751  elog(ERROR, "%s", errm);
2752 
2753  if (record != NULL)
2754  {
2755  /*
2756  * Note the lack of any call to LagTrackerWrite() which is handled by
2757  * WalSndUpdateProgress which is called by output plugin through
2758  * logical decoding write api.
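2759  */
2760  LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
2761 
2762  sentPtr = logical_decoding_ctx->reader->EndRecPtr;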
2763  }
2764  else
2765  {
2766  /*
2767  * If the record we just wanted read is at or beyond the flushed
2768  * point, then we're caught up.
2769  */
2770  if (logical_decoding_ctx->reader->EndRecPtr >= GetFlushRecPtr())
2771  {
2772  WalSndCaughtUp = true;
2773 
2774  /*
2775  * Have WalSndLoop() terminate the connection in an orderly
2776  * manner, after writing out all the pending data.
2777  */
2778  if (got_STOPPING)
2779  got_SIGUSR2 = true;
2780  }
2781  }
2782 
2783  /* Update shared memory status */
2784  {
2785  WalSnd *walsnd = MyWalSnd;
2786 
2787  SpinLockAcquire(&walsnd->mutex);
2788  walsnd->sentPtr = sentPtr;
2789  SpinLockRelease(&walsnd->mutex);
2790  }
2791 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8221
slock_t mutex
XLogRecord * XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
Definition: xlogreader.c:193
XLogRecPtr EndRecPtr
Definition: xlogreader.h:115
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define ERROR
Definition: elog.h:43
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:93
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:197
static XLogRecPtr logical_startptr
Definition: walsender.c:198
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:186
static bool WalSndCaughtUp
Definition: walsender.c:183
#define SpinLockRelease(lock)
Definition: spin.h:64
XLogRecPtr sentPtr
WalSnd * MyWalSnd
Definition: walsender.c:111
static XLogRecPtr sentPtr
Definition: walsender.c:157
#define NULL
Definition: c.h:229
XLogReaderState * reader
Definition: logical.h:44
#define elog
Definition: elog.h:219
static void XLogSendPhysical ( void  )
static

Definition at line 2470 of file walsender.c.

References am_cascading_walsender, Assert, close, StringInfoData::data, DEBUG1, elog, enlargeStringInfo(), GetCurrentTimestamp(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), got_STOPPING, LagTrackerWrite(), StringInfoData::len, list_free_deep(), MAX_SEND_SIZE, WalSnd::mutex, MyWalSnd, NULL, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), readTimeLineHistory(), RecoveryInProgress(), resetStringInfo(), sendFile, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, WalSnd::sentPtr, sentPtr, set_ps_display(), snprintf(), SpinLockAcquire, SpinLockRelease, streamingDoneSending, ThisTimeLineID, tliSwitchPoint(), update_process_title, WalSndCaughtUp, WalSndSetState(), WALSNDSTATE_STOPPING, and XLogRead().

Referenced by StartReplication().

2471 {
2472  XLogRecPtr SendRqstPtr;
2473  XLogRecPtr startptr;
2474  XLogRecPtr endptr;
2475  Size nbytes;
2476 
2477  /* If requested switch the WAL sender to the stopping state. */
2478  if (got_STOPPING)
2480 
2482  {
2483  WalSndCaughtUp = true;
2484  return;
2485  }
2486 
2487  /* Figure out how far we can safely send the WAL. */
2489  {
2490  /*
2491  * Streaming an old timeline that's in this server's history, but is
2492  * not the one we're currently inserting or replaying. It can be
2493  * streamed up to the point where we switched off that timeline.
2494  */
2495  SendRqstPtr = sendTimeLineValidUpto;
2496  }
2497  else if (am_cascading_walsender)
2498  {
2499  /*
2500  * Streaming the latest timeline on a standby.
2501  *
2502  * Attempt to send all WAL that has already been replayed, so that we
2503  * know it's valid. If we're receiving WAL through streaming
2504  * replication, it's also OK to send any WAL that has been received
2505  * but not replayed.
2506  *
2507  * The timeline we're recovering from can change, or we can be
2508  * promoted. In either case, the current timeline becomes historic. We
2509  * need to detect that so that we don't try to stream past the point
2510  * where we switched to another timeline. We check for promotion or
2511  * timeline switch after calculating FlushPtr, to avoid a race
2512  * condition: if the timeline becomes historic just after we checked
2513  * that it was still current, it's still OK to stream it up to the
2514  * FlushPtr that was calculated before it became historic.
2515  */
2516  bool becameHistoric = false;
2517 
2518  SendRqstPtr = GetStandbyFlushRecPtr();
2519 
2520  if (!RecoveryInProgress())
2521  {
2522  /*
2523  * We have been promoted. RecoveryInProgress() updated
2524  * ThisTimeLineID to the new current timeline.
2525  */
2526  am_cascading_walsender = false;
2527  becameHistoric = true;
2528  }
2529  else
2530  {
2531  /*
2532  * Still a cascading standby. But is the timeline we're sending
2533  * still the one recovery is recovering from? ThisTimeLineID was
2534  * updated by the GetStandbyFlushRecPtr() call above.
2535  */
2537  becameHistoric = true;
2538  }
2539 
2540  if (becameHistoric)
2541  {
2542  /*
2543  * The timeline we were sending has become historic. Read the
2544  * timeline history file of the new timeline to see where exactly
2545  * we forked off from the timeline we were sending.
2546  */
2547  List *history;
2548 
2551 
2553  list_free_deep(history);
2554 
2555  sendTimeLineIsHistoric = true;
2556 
2557  SendRqstPtr = sendTimeLineValidUpto;
2558  }
2559  }
2560  else
2561  {
2562  /*
2563  * Streaming the current timeline on a master.
2564  *
2565  * Attempt to send all data that's already been written out and
2566  * fsync'd to disk. We cannot go further than what's been written out
2567  * given the current implementation of XLogRead(). And in any case
2568  * it's unsafe to send WAL that is not securely down to disk on the
2569  * master: if the master subsequently crashes and restarts, slaves
2570  * must not have applied any WAL that got lost on the master.
2571  */
2572  SendRqstPtr = GetFlushRecPtr();
2573  }
2574 
2575  /*
2576  * Record the current system time as an approximation of the time at which
2577  * this WAL location was written for the purposes of lag tracking.
2578  *
2579  * In theory we could make XLogFlush() record a time in shmem whenever WAL
2580  * is flushed and we could get that time as well as the LSN when we call
2581  * GetFlushRecPtr() above (and likewise for the cascading standby
2582  * equivalent), but rather than putting any new code into the hot WAL path
2583  * it seems good enough to capture the time here. We should reach this
2584  * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
2585  * may take some time, we read the WAL flush pointer and take the time
2586  * very close together here so that we'll get a later position if it is
2587  * still moving.
2588  *
2589  * Because LagTrackerWrite() ignores samples when the LSN hasn't advanced,
2590  * this gives us a cheap approximation for the WAL flush time for this
2591  * LSN.
2592  *
2593  * Note that the LSN is not necessarily the LSN for the data contained in
2594  * the present message; it's the end of the WAL, which might be further
2595  * ahead. All the lag tracking machinery cares about is finding out when
2596  * that arbitrary LSN is eventually reported as written, flushed and
2597  * applied, so that it can measure the elapsed time.
2598  */
2599  LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());
2600 
2601  /*
2602  * If this is a historic timeline and we've reached the point where we
2603  * forked to the next timeline, stop streaming.
2604  *
2605  * Note: We might already have sent WAL > sendTimeLineValidUpto. The
2606  * startup process will normally replay all WAL that has been received
2607  * from the master, before promoting, but if the WAL streaming is
2608  * terminated at a WAL page boundary, the valid portion of the timeline
2609  * might end in the middle of a WAL record. We might've already sent the
2610  * first half of that partial WAL record to the cascading standby, so that
2611  * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
2612  * replay the partial WAL record either, so it can still follow our
2613  * timeline switch.
2614  */
2616  {
2617  /* close the current file. */
2618  if (sendFile >= 0)
2619  close(sendFile);
2620  sendFile = -1;
2621 
2622  /* Send CopyDone */
2623  pq_putmessage_noblock('c', NULL, 0);
2624  streamingDoneSending = true;
2625 
2626  WalSndCaughtUp = true;
2627 
2628  elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
2630  (uint32) (sentPtr >> 32), (uint32) sentPtr);
2631  return;
2632  }
2633 
2634  /* Do we have any work to do? */
2635  Assert(sentPtr <= SendRqstPtr);
2636  if (SendRqstPtr <= sentPtr)
2637  {
2638  WalSndCaughtUp = true;
2639  return;
2640  }
2641 
2642  /*
2643  * Figure out how much to send in one message. If there's no more than
2644  * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
2645  * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
2646  *
2647  * The rounding is not only for performance reasons. Walreceiver relies on
2648  * the fact that we never split a WAL record across two messages. Since a
2649  * long WAL record is split at page boundary into continuation records,
2650  * page boundary is always a safe cut-off point. We also assume that
2651  * SendRqstPtr never points to the middle of a WAL record.
2652  */
2653  startptr = sentPtr;
2654  endptr = startptr;
2655  endptr += MAX_SEND_SIZE;
2656 
2657  /* if we went beyond SendRqstPtr, back off */
2658  if (SendRqstPtr <= endptr)
2659  {
2660  endptr = SendRqstPtr;
2662  WalSndCaughtUp = false;
2663  else
2664  WalSndCaughtUp = true;
2665  }
2666  else
2667  {
2668  /* round down to page boundary. */
2669  endptr -= (endptr % XLOG_BLCKSZ);
2670  WalSndCaughtUp = false;
2671  }
2672 
2673  nbytes = endptr - startptr;
2674  Assert(nbytes <= MAX_SEND_SIZE);
2675 
2676  /*
2677  * OK to read and send the slice.