PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
walsender.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/printtup.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "access/xlogutils.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
#include "commands/defrem.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "pgstat.h"
#include "replication/basebackup.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/logicalfuncs.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/dest.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/portal.h"
#include "utils/ps_status.h"
#include "utils/resowner.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
Include dependency graph for walsender.c:

Go to the source code of this file.

Data Structures

struct  WalTimeSample
 

Macros

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)
 
#define LAG_TRACKER_BUFFER_SIZE   8192
 
#define PG_STAT_GET_WAL_SENDERS_COLS   11
 

Typedefs

typedef void(* WalSndSendDataCallback )(void)
 

Functions

static void WalSndSigHupHandler (SIGNAL_ARGS)
 
static void WalSndXLogSendHandler (SIGNAL_ARGS)
 
static void WalSndLastCycleHandler (SIGNAL_ARGS)
 
static void WalSndLoop (WalSndSendDataCallback send_data)
 
static void InitWalSenderSlot (void)
 
static void WalSndKill (int code, Datum arg)
 
static void WalSndShutdown (void)
 
void WalSndErrorCleanup (void)
 
static void IdentifySystem (void)
 
static void SendTimeLineHistory (TimeLineHistoryCmd *cmd)
 
static void StartReplication (StartReplicationCmd *cmd)
 
static int logical_read_xlog_page (XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
 
static void parseCreateReplSlotOptions (CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action)
 
static void CreateReplicationSlot (CreateReplicationSlotCmd *cmd)
 
static void DropReplicationSlot (DropReplicationSlotCmd *cmd)
 
static void StartLogicalReplication (StartReplicationCmd *cmd)
 
static void WalSndPrepareWrite (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndWriteData (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static XLogRecPtr WalSndWaitForWal (XLogRecPtr loc)
 
bool exec_replication_command (const char *cmd_string)
 
static void ProcessRepliesIfAny (void)
 
static void ProcessStandbyMessage (void)
 
static void PhysicalConfirmReceivedLocation (XLogRecPtr lsn)
 
static void ProcessStandbyReplyMessage (void)
 
static void PhysicalReplicationSlotNewXmin (TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
 
static bool TransactionIdInRecentPast (TransactionId xid, uint32 epoch)
 
static void ProcessStandbyHSFeedbackMessage (void)
 
static long WalSndComputeSleeptime (TimestampTz now)
 
static void WalSndCheckTimeOut (TimestampTz now)
 
static void XLogRead (char *buf, XLogRecPtr startptr, Size count)
 
static void XLogSendPhysical (void)
 
static void XLogSendLogical (void)
 
static void WalSndDone (WalSndSendDataCallback send_data)
 
static XLogRecPtr GetStandbyFlushRecPtr (void)
 
void WalSndRqstFileReload (void)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (void)
 
void WalSndSetState (WalSndState state)
 
static const char * WalSndGetStateString (WalSndState state)
 
static Interval * offset_to_interval (TimeOffset offset)
 
Datum pg_stat_get_wal_senders (PG_FUNCTION_ARGS)
 
static void WalSndKeepalive (bool requestReply)
 
static void WalSndKeepaliveIfNecessary (TimestampTz now)
 
void LagTrackerWrite (XLogRecPtr lsn, TimestampTz local_flush_time)
 
static TimeOffset LagTrackerRead (int head, XLogRecPtr lsn, TimestampTz now)
 

Variables

WalSndCtlData * WalSndCtl = NULL
 
WalSnd * MyWalSnd = NULL
 
bool am_walsender = false
 
bool am_cascading_walsender = false
 
bool am_db_walsender = false
 
int max_wal_senders = 0
 
int wal_sender_timeout = 60 * 1000
 
bool log_replication_commands = false
 
bool wake_wal_senders = false
 
static int sendFile = -1
 
static XLogSegNo sendSegNo = 0
 
static uint32 sendOff = 0
 
static TimeLineID curFileTimeLine = 0
 
static TimeLineID sendTimeLine = 0
 
static TimeLineID sendTimeLineNextTLI = 0
 
static bool sendTimeLineIsHistoric = false
 
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
 
static XLogRecPtr sentPtr = 0
 
static StringInfoData output_message
 
static StringInfoData reply_message
 
static StringInfoData tmpbuf
 
static TimestampTz last_reply_timestamp = 0
 
static bool waiting_for_ping_response = false
 
static bool streamingDoneSending
 
static bool streamingDoneReceiving
 
static bool WalSndCaughtUp = false
 
static volatile sig_atomic_t got_SIGHUP = false
 
static volatile sig_atomic_t walsender_ready_to_stop = false
 
static volatile sig_atomic_t replication_active = false
 
static LogicalDecodingContext * logical_decoding_ctx = NULL
 
static XLogRecPtr logical_startptr = InvalidXLogRecPtr
 
struct {
   XLogRecPtr   last_lsn
 
   WalTimeSample   buffer [LAG_TRACKER_BUFFER_SIZE]
 
   int   write_head
 
   int   read_heads [NUM_SYNC_REP_WAIT_MODE]
 
   WalTimeSample   last_read [NUM_SYNC_REP_WAIT_MODE]
 
} LagTracker
 

Macro Definition Documentation

#define LAG_TRACKER_BUFFER_SIZE   8192

Definition at line 201 of file walsender.c.

Referenced by LagTrackerRead(), and LagTrackerWrite().

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)

Definition at line 99 of file walsender.c.

Referenced by XLogSendPhysical().

#define PG_STAT_GET_WAL_SENDERS_COLS   11

Referenced by pg_stat_get_wal_senders().

Typedef Documentation

typedef void(* WalSndSendDataCallback)(void)

Definition at line 219 of file walsender.c.

Function Documentation

static void CreateReplicationSlot ( CreateReplicationSlotCmd * cmd)
static

Definition at line 832 of file walsender.c.

References Assert, begin_tup_output_tupdesc(), CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDestReceiver(), CreateInitDecodingContext(), CreateTemplateTupleDesc(), CRS_EXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, CStringGetTextDatum, ReplicationSlot::data, DecodingContextFindStartpoint(), DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errmsg(), ERROR, FirstSnapshotSet, FreeDecodingContext(), IsSubTransaction(), IsTransactionBlock(), CreateReplicationSlotCmd::kind, last_reply_timestamp, logical_read_xlog_page(), MAXFNAMELEN, MemSet, MyProc, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, NIL, NULL, parseCreateReplSlotOptions(), CreateReplicationSlotCmd::plugin, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, ReplicationSlotCreate(), ReplicationSlotMarkDirty(), ReplicationSlotPersist(), ReplicationSlotRelease(), ReplicationSlotReserveWal(), ReplicationSlotSave(), RestoreTransactionSnapshot(), RS_EPHEMERAL, RS_PERSISTENT, RS_TEMPORARY, sendTimeLine, sendTimeLineIsHistoric, CreateReplicationSlotCmd::slotname, SnapBuildExportSnapshot(), SnapBuildInitialSnapshot(), LogicalDecodingContext::snapshot_builder, snprintf(), CreateReplicationSlotCmd::temporary, TEXTOID, ThisTimeLineID, TupleDescInitBuiltinEntry(), values, WalSndPrepareWrite(), WalSndWriteData(), XACT_REPEATABLE_READ, and XactIsoLevel.

Referenced by exec_replication_command(), and main().

833 {
834  const char *snapshot_name = NULL;
835  char xpos[MAXFNAMELEN];
836  char *slot_name;
837  bool reserve_wal = false;
838  CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
839  DestReceiver *dest;
840  TupOutputState *tstate;
841  TupleDesc tupdesc;
842  Datum values[4];
843  bool nulls[4];
844 
846 
847  parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action);
848 
849  /* setup state for XLogReadPage */
850  sendTimeLineIsHistoric = false;
852 
853  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
854  {
855  ReplicationSlotCreate(cmd->slotname, false,
857  }
858  else
859  {
861 
862  /*
863  * Initially create persistent slot as ephemeral - that allows us to
864  * nicely handle errors during initialization because it'll get
865  * dropped if this transaction fails. We'll make it persistent at the
866  * end. Temporary slots can be created as temporary from beginning as
867  * they get dropped on error as well.
868  */
869  ReplicationSlotCreate(cmd->slotname, true,
871  }
872 
873  if (cmd->kind == REPLICATION_KIND_LOGICAL)
874  {
876 
877  /*
878  * Do options check early so that we can bail before calling the
879  * DecodingContextFindStartpoint which can take long time.
880  */
881  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
882  {
883  if (IsTransactionBlock())
884  ereport(ERROR,
885  (errmsg("CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT "
886  "must not be called inside a transaction")));
887  }
888  else if (snapshot_action == CRS_USE_SNAPSHOT)
889  {
890  if (!IsTransactionBlock())
891  ereport(ERROR,
892  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
893  "must be called inside a transaction")));
894 
896  ereport(ERROR,
897  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
898  "must be called in REPEATABLE READ isolation mode transaction")));
899 
900  if (FirstSnapshotSet)
901  ereport(ERROR,
902  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
903  "must be called before any query")));
904 
905  if (IsSubTransaction())
906  ereport(ERROR,
907  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
908  "must not be called in a subtransaction")));
909  }
910 
914 
915  /*
916  * Signal that we don't need the timeout mechanism. We're just
917  * creating the replication slot and don't yet accept feedback
918  * messages or send keepalives. As we possibly need to wait for
919  * further WAL the walsender would otherwise possibly be killed too
920  * soon.
921  */
923 
924  /* build initial snapshot, might take a while */
926 
927  /*
928  * Export or use the snapshot if we've been asked to do so.
929  *
930  * NB. We will convert the snapbuild.c kind of snapshot to normal
931  * snapshot when doing this.
932  */
933  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
934  {
935  snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
936  }
937  else if (snapshot_action == CRS_USE_SNAPSHOT)
938  {
939  Snapshot snap;
940 
943  }
944 
945  /* don't need the decoding context anymore */
946  FreeDecodingContext(ctx);
947 
948  if (!cmd->temporary)
950  }
951  else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
952  {
954 
956 
957  /* Write this slot to disk if it's a permanent one. */
958  if (!cmd->temporary)
960  }
961 
962  snprintf(xpos, sizeof(xpos), "%X/%X",
965 
967  MemSet(nulls, false, sizeof(nulls));
968 
969  /*----------
970  * Need a tuple descriptor representing four columns:
971  * - first field: the slot name
972  * - second field: LSN at which we became consistent
973  * - third field: exported snapshot's name
974  * - fourth field: output plugin
975  *----------
976  */
977  tupdesc = CreateTemplateTupleDesc(4, false);
978  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
979  TEXTOID, -1, 0);
980  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
981  TEXTOID, -1, 0);
982  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
983  TEXTOID, -1, 0);
984  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
985  TEXTOID, -1, 0);
986 
987  /* prepare for projection of tuples */
988  tstate = begin_tup_output_tupdesc(dest, tupdesc);
989 
990  /* slot_name */
991  slot_name = NameStr(MyReplicationSlot->data.name);
992  values[0] = CStringGetTextDatum(slot_name);
993 
994  /* consistent wal location */
995  values[1] = CStringGetTextDatum(xpos);
996 
997  /* snapshot name, or NULL if none */
998  if (snapshot_name != NULL)
999  values[2] = CStringGetTextDatum(snapshot_name);
1000  else
1001  nulls[2] = true;
1002 
1003  /* plugin, or NULL if none */
1004  if (cmd->plugin != NULL)
1005  values[3] = CStringGetTextDatum(cmd->plugin);
1006  else
1007  nulls[3] = true;
1008 
1009  /* send it to dest */
1010  do_tup_output(tstate, values, nulls);
1011  end_tup_output(tstate);
1012 
1014 }
#define NIL
Definition: pg_list.h:69
#define TEXTOID
Definition: pg_type.h:324
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action)
Definition: walsender.c:779
PGPROC * MyProc
Definition: proc.c:67
#define XACT_REPEATABLE_READ
Definition: xact.h:30
void RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
Definition: snapmgr.c:2148
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:1250
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency)
Definition: slot.c:220
#define MemSet(start, val, len)
Definition: c.h:857
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3, 4)
void ReplicationSlotSave(void)
Definition: slot.c:563
ReplicationSlotPersistentData data
Definition: slot.h:115
XLogRecPtr confirmed_flush
Definition: slot.h:76
ReplicationKind kind
Definition: replnodes.h:56
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
Definition: walsender.c:744
bool IsTransactionBlock(void)
Definition: xact.c:4304
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:587
void ReplicationSlotReserveWal(void)
Definition: slot.c:909
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:414
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:1308
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:568
#define ERROR
Definition: elog.h:43
bool FirstSnapshotSet
Definition: snapmgr.c:203
void ReplicationSlotPersist(void)
Definition: slot.c:598
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:109
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1145
unsigned int uint32
Definition: c.h:268
void ReplicationSlotRelease(void)
Definition: slot.c:375
#define ereport(elevel, rest)
Definition: elog.h:122
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc)
Definition: execTuples.c:1232
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1118
#define MAXFNAMELEN
uintptr_t Datum
Definition: postgres.h:372
TimeLineID ThisTimeLineID
Definition: xlog.c:179
struct SnapBuild * snapshot_builder
Definition: logical.h:40
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define NULL
Definition: c.h:229
static TimeLineID sendTimeLine
Definition: walsender.c:141
#define Assert(condition)
Definition: c.h:675
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:457
int XactIsoLevel
Definition: xact.c:74
TupleDesc CreateTemplateTupleDesc(int natts, bool hasoid)
Definition: tupdesc.c:41
bool IsSubTransaction(void)
Definition: xact.c:4376
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition: snapbuild.c:509
static Datum values[MAXATTR]
Definition: bootstrap.c:163
int errmsg(const char *fmt,...)
Definition: elog.c:797
CRSSnapshotAction
Definition: walsender.h:22
LogicalDecodingContext * CreateInitDecodingContext(char *plugin, List *output_plugin_options, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write)
Definition: logical.c:211
#define NameStr(name)
Definition: c.h:499
#define CStringGetTextDatum(s)
Definition: builtins.h:91
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:76
int16 AttrNumber
Definition: attnum.h:21
static TimestampTz last_reply_timestamp
Definition: walsender.c:161
static bool sendTimeLineIsHistoric
Definition: walsender.c:143
void ReplicationSlotMarkDirty(void)
Definition: slot.c:581
static void DropReplicationSlot ( DropReplicationSlotCmd * cmd)
static

Definition at line 1020 of file walsender.c.

References DestRemote, EndCommand(), ReplicationSlotDrop(), and DropReplicationSlotCmd::slotname.

Referenced by exec_replication_command(), and main().

1021 {
1023  EndCommand("DROP_REPLICATION_SLOT", DestRemote);
1024 }
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:157
void ReplicationSlotDrop(const char *name)
Definition: slot.c:440
bool exec_replication_command ( const char *  cmd_string)

Definition at line 1364 of file walsender.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate(), CHECK_FOR_INTERRUPTS, CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, DestRemote, DestRemoteSimple, DropReplicationSlot(), elog, EndCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), IdentifySystem(), initStringInfo(), InvalidOid, IsA, IsAbortedTransactionBlockState(), IsTransactionBlock(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextDelete(), MemoryContextSwitchTo(), MyDatabaseId, VariableShowStmt::name, PreventTransactionChain(), REPLICATION_KIND_PHYSICAL, replication_parse_result, replication_scanner_init(), replication_yyparse(), SendBaseBackup(), SendTimeLineHistory(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), T_BaseBackupCmd, T_CreateReplicationSlotCmd, T_DropReplicationSlotCmd, T_IdentifySystemCmd, T_SQLCmd, T_StartReplicationCmd, T_TimeLineHistoryCmd, T_VariableShowStmt, and Node::type.

Referenced by PostgresMain().

1365 {
1366  int parse_rc;
1367  Node *cmd_node;
1368  MemoryContext cmd_context;
1369  MemoryContext old_context;
1370 
1371  /*
1372  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1373  * command arrives. Clean up the old stuff if there's anything.
1374  */
1376 
1378 
1380  "Replication command context",
1382  old_context = MemoryContextSwitchTo(cmd_context);
1383 
1384  replication_scanner_init(cmd_string);
1385  parse_rc = replication_yyparse();
1386  if (parse_rc != 0)
1387  ereport(ERROR,
1388  (errcode(ERRCODE_SYNTAX_ERROR),
1389  (errmsg_internal("replication command parser returned %d",
1390  parse_rc))));
1391 
1392  cmd_node = replication_parse_result;
1393 
1394  /*
1395  * Log replication command if log_replication_commands is enabled. Even
1396  * when it's disabled, log the command with DEBUG1 level for backward
1397  * compatibility. Note that SQL commands are not logged here, and will be
1398  * logged later if log_statement is enabled.
1399  */
1400  if (cmd_node->type != T_SQLCmd)
1402  (errmsg("received replication command: %s", cmd_string)));
1403 
1404  /*
1405  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was
1406  * called outside of transaction the snapshot should be cleared here.
1407  */
1408  if (!IsTransactionBlock())
1410 
1411  /*
1412  * For aborted transactions, don't allow anything except pure SQL,
1413  * the exec_simple_query() will handle it correctly.
1414  */
1415  if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
1416  ereport(ERROR,
1417  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1418  errmsg("current transaction is aborted, "
1419  "commands ignored until end of transaction block")));
1420 
1422 
1423  /*
1424  * Allocate buffers that will be used for each outgoing and incoming
1425  * message. We do this just once per command to reduce palloc overhead.
1426  */
1430 
1431  switch (cmd_node->type)
1432  {
1433  case T_IdentifySystemCmd:
1434  IdentifySystem();
1435  break;
1436 
1437  case T_BaseBackupCmd:
1438  PreventTransactionChain(true, "BASE_BACKUP");
1439  SendBaseBackup((BaseBackupCmd *) cmd_node);
1440  break;
1441 
1444  break;
1445 
1448  break;
1449 
1450  case T_StartReplicationCmd:
1451  {
1452  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1453 
1454  PreventTransactionChain(true, "START_REPLICATION");
1455 
1456  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1457  StartReplication(cmd);
1458  else
1460  break;
1461  }
1462 
1463  case T_TimeLineHistoryCmd:
1464  PreventTransactionChain(true, "TIMELINE_HISTORY");
1466  break;
1467 
1468  case T_VariableShowStmt:
1469  {
1471  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1472 
1473  GetPGVariable(n->name, dest);
1474  }
1475  break;
1476 
1477  case T_SQLCmd:
1478  if (MyDatabaseId == InvalidOid)
1479  ereport(ERROR,
1480  (errmsg("not connected to database")));
1481 
1482  /* Tell the caller that this wasn't a WalSender command. */
1483  return false;
1484 
1485  default:
1486  elog(ERROR, "unrecognized replication command node tag: %u",
1487  cmd_node->type);
1488  }
1489 
1490  /* done */
1491  MemoryContextSwitchTo(old_context);
1492  MemoryContextDelete(cmd_context);
1493 
1494  /* Send CommandComplete message */
1495  EndCommand("SELECT", DestRemote);
1496 
1497  return true;
1498 }
#define IsA(nodeptr, _type_)
Definition: nodes.h:560
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:200
#define DEBUG1
Definition: elog.h:25
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:419
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1020
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:370
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Definition: nodes.h:509
static StringInfoData output_message
Definition: walsender.c:153
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LOG
Definition: elog.h:26
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc.c:7898
Node * replication_parse_result
bool IsTransactionBlock(void)
Definition: xact.c:4304
ReplicationKind kind
Definition: replnodes.h:81
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:157
#define ERROR
Definition: elog.h:43
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:686
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:165
NodeTag type
Definition: nodes.h:511
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:109
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:515
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
#define ereport(elevel, rest)
Definition: elog.h:122
int replication_yyparse(void)
void initStringInfo(StringInfo str)
Definition: stringinfo.c:65
static StringInfoData reply_message
Definition: walsender.c:154
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:322
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1031
Oid MyDatabaseId
Definition: globals.c:76
#define InvalidOid
Definition: postgres_ext.h:36
int errmsg_internal(const char *fmt,...)
Definition: elog.c:827
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:832
static StringInfoData tmpbuf
Definition: walsender.c:155
bool log_replication_commands
Definition: walsender.c:117
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
#define elog
Definition: elog.h:219
static void IdentifySystem(void)
Definition: walsender.c:330
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:648
void PreventTransactionChain(bool isTopLevel, const char *stmtType)
Definition: xact.c:3154
void replication_scanner_init(const char *query_string)
static XLogRecPtr GetStandbyFlushRecPtr ( void  )
static

Definition at line 2768 of file walsender.c.

References GetWalRcvWriteRecPtr(), GetXLogReplayRecPtr(), NULL, receiveTLI, result, and ThisTimeLineID.

Referenced by IdentifySystem(), StartReplication(), and XLogSendPhysical().

2769 {
2770  XLogRecPtr replayPtr;
2771  TimeLineID replayTLI;
2772  XLogRecPtr receivePtr;
2775 
2776  /*
2777  * We can safely send what's already been replayed. Also, if walreceiver
2778  * is streaming WAL from the same timeline, we can send anything that it
2779  * has streamed, but hasn't been replayed yet.
2780  */
2781 
2782  receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI);
2783  replayPtr = GetXLogReplayRecPtr(&replayTLI);
2784 
2785  ThisTimeLineID = replayTLI;
2786 
2787  result = replayPtr;
2788  if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
2789  result = receivePtr;
2790 
2791  return result;
2792 }
uint32 TimeLineID
Definition: xlogdefs.h:45
XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
return result
Definition: formatting.c:1618
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11077
static TimeLineID receiveTLI
Definition: xlog.c:201
TimeLineID ThisTimeLineID
Definition: xlog.c:179
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static void IdentifySystem ( void  )
static

Definition at line 330 of file walsender.c.

References am_cascading_walsender, begin_tup_output_tupdesc(), CommitTransactionCommand(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, cur, CurrentMemoryContext, dbname, DestRemoteSimple, do_tup_output(), end_tup_output(), get_database_name(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetSystemIdentifier(), Int32GetDatum, INT4OID, InvalidOid, MAXFNAMELEN, MemoryContextSwitchTo(), MemSet, MyDatabaseId, NULL, RecoveryInProgress(), snprintf(), StartTransactionCommand(), TEXTOID, ThisTimeLineID, TupleDescInitBuiltinEntry(), UINT64_FORMAT, and values.

Referenced by exec_replication_command().

331 {
332  char sysid[32];
333  char xpos[MAXFNAMELEN];
334  XLogRecPtr logptr;
335  char *dbname = NULL;
336  DestReceiver *dest;
337  TupOutputState *tstate;
338  TupleDesc tupdesc;
339  Datum values[4];
340  bool nulls[4];
341 
342  /*
343  * Reply with a result set with one row, four columns. First col is system
344  * ID, second is timeline ID, third is current xlog location and the
345  * fourth contains the database name if we are connected to one.
346  */
347 
348  snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
350 
353  {
354  /* this also updates ThisTimeLineID */
355  logptr = GetStandbyFlushRecPtr();
356  }
357  else
358  logptr = GetFlushRecPtr();
359 
360  snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
361 
362  if (MyDatabaseId != InvalidOid)
363  {
365 
366  /* syscache access needs a transaction env. */
368  /* make dbname live outside TX context */
372  /* CommitTransactionCommand switches to TopMemoryContext */
374  }
375 
377  MemSet(nulls, false, sizeof(nulls));
378 
379  /* need a tuple descriptor representing four columns */
380  tupdesc = CreateTemplateTupleDesc(4, false);
381  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
382  TEXTOID, -1, 0);
383  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
384  INT4OID, -1, 0);
385  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
386  TEXTOID, -1, 0);
387  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
388  TEXTOID, -1, 0);
389 
390  /* prepare for projection of tuples */
391  tstate = begin_tup_output_tupdesc(dest, tupdesc);
392 
393  /* column 1: system identifier */
394  values[0] = CStringGetTextDatum(sysid);
395 
396  /* column 2: timeline */
397  values[1] = Int32GetDatum(ThisTimeLineID);
398 
399  /* column 3: xlog position */
400  values[2] = CStringGetTextDatum(xpos);
401 
402  /* column 4: database name, or NULL if none */
403  if (dbname)
404  values[3] = CStringGetTextDatum(dbname);
405  else
406  nulls[3] = true;
407 
408  /* send it to dest */
409  do_tup_output(tstate, values, nulls);
410 
411  end_tup_output(tstate);
412 }
#define TEXTOID
Definition: pg_type.h:324
void CommitTransactionCommand(void)
Definition: xact.c:2747
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:1250
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define INT4OID
Definition: pg_type.h:316
struct cursor * cur
Definition: ecpg.c:28
#define MemSet(start, val, len)
Definition: c.h:857
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3, 4)
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8223
bool RecoveryInProgress(void)
Definition: xlog.c:7874
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:1308
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:568
char * get_database_name(Oid dbid)
Definition: dbcommands.c:2056
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:109
unsigned int uint32
Definition: c.h:268
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc)
Definition: execTuples.c:1232
#define MAXFNAMELEN
uintptr_t Datum
Definition: postgres.h:372
Oid MyDatabaseId
Definition: globals.c:76
#define InvalidOid
Definition: postgres_ext.h:36
TimeLineID ThisTimeLineID
Definition: xlog.c:179
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void StartTransactionCommand(void)
Definition: xact.c:2677
char * dbname
Definition: streamutil.c:38
TupleDesc CreateTemplateTupleDesc(int natts, bool hasoid)
Definition: tupdesc.c:41
static Datum values[MAXATTR]
Definition: bootstrap.c:163
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4691
#define Int32GetDatum(X)
Definition: postgres.h:485
#define CStringGetTextDatum(s)
Definition: builtins.h:91
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2768
int16 AttrNumber
Definition: attnum.h:21
#define UINT64_FORMAT
Definition: c.h:316
bool am_cascading_walsender
Definition: walsender.c:109
static void InitWalSenderSlot ( void  )
static

Definition at line 2143 of file walsender.c.

References WalSnd::apply, WalSnd::applyLag, Assert, ereport, errcode(), errmsg(), FATAL, WalSnd::flush, WalSnd::flushLag, i, InvalidXLogRecPtr, WalSnd::latch, max_wal_senders, WalSnd::mutex, MyProc, MyProcPid, NULL, on_shmem_exit(), WalSnd::pid, PGPROC::procLatch, WalSnd::sentPtr, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndKill(), WalSndCtlData::walsnds, WALSNDSTATE_STARTUP, WalSnd::write, and WalSnd::writeLag.

Referenced by WalSndShutdown().

2144 {
2145  int i;
2146 
2147  /*
2148  * WalSndCtl should be set up already (we inherit this by fork() or
2149  * EXEC_BACKEND mechanism from the postmaster).
2150  */
2151  Assert(WalSndCtl != NULL);
2152  Assert(MyWalSnd == NULL);
2153 
2154  /*
2155  * Find a free walsender slot and reserve it. If this fails, we must be
2156  * out of WalSnd structures.
2157  */
2158  for (i = 0; i < max_wal_senders; i++)
2159  {
2160  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2161 
2162  SpinLockAcquire(&walsnd->mutex);
2163 
2164  if (walsnd->pid != 0)
2165  {
2166  SpinLockRelease(&walsnd->mutex);
2167  continue;
2168  }
2169  else
2170  {
2171  /*
2172  * Found a free slot. Reserve it for us.
2173  */
2174  walsnd->pid = MyProcPid;
2175  walsnd->sentPtr = InvalidXLogRecPtr;
2176  walsnd->write = InvalidXLogRecPtr;
2177  walsnd->flush = InvalidXLogRecPtr;
2178  walsnd->apply = InvalidXLogRecPtr;
2179  walsnd->writeLag = -1;
2180  walsnd->flushLag = -1;
2181  walsnd->applyLag = -1;
2182  walsnd->state = WALSNDSTATE_STARTUP;
2183  walsnd->latch = &MyProc->procLatch;
2184  SpinLockRelease(&walsnd->mutex);
2185  /* don't need the lock anymore */
2186  MyWalSnd = (WalSnd *) walsnd;
2187 
2188  break;
2189  }
2190  }
2191  if (MyWalSnd == NULL)
2192  ereport(FATAL,
2193  (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
2194  errmsg("number of requested standby connections "
2195  "exceeds max_wal_senders (currently %d)",
2196  max_wal_senders)));
2197 
2198  /* Arrange to clean up at walsender exit */
2200 }
XLogRecPtr write
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
int MyProcPid
Definition: globals.c:38
PGPROC * MyProc
Definition: proc.c:67
TimeOffset flushLag
WalSndCtlData * WalSndCtl
Definition: walsender.c:102
int errcode(int sqlerrcode)
Definition: elog.c:575
static void WalSndKill(int code, Datum arg)
Definition: walsender.c:2204
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
TimeOffset writeLag
slock_t mutex
Latch procLatch
Definition: proc.h:103
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
#define FATAL
Definition: elog.h:52
Latch * latch
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:348
WalSndState state
#define ereport(elevel, rest)
Definition: elog.h:122
int max_wal_senders
Definition: walsender.c:114
#define SpinLockRelease(lock)
Definition: spin.h:64
XLogRecPtr sentPtr
WalSnd * MyWalSnd
Definition: walsender.c:105
TimeOffset applyLag
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
int errmsg(const char *fmt,...)
Definition: elog.c:797
int i
XLogRecPtr apply
static TimeOffset LagTrackerRead ( int  head,
XLogRecPtr  lsn,
TimestampTz  now 
)
static

Definition at line 3293 of file walsender.c.

References Assert, LAG_TRACKER_BUFFER_SIZE, LagTracker, WalTimeSample::lsn, next, and WalTimeSample::time.

Referenced by ProcessStandbyReplyMessage().

3294 {
3295  TimestampTz time = 0;
3296 
3297  /* Read all unread samples up to this LSN or end of buffer. */
3298  while (LagTracker.read_heads[head] != LagTracker.write_head &&
3299  LagTracker.buffer[LagTracker.read_heads[head]].lsn <= lsn)
3300  {
3301  time = LagTracker.buffer[LagTracker.read_heads[head]].time;
3302  LagTracker.last_read[head] =
3303  LagTracker.buffer[LagTracker.read_heads[head]];
3304  LagTracker.read_heads[head] =
3305  (LagTracker.read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE;
3306  }
3307 
3308  if (time > now)
3309  {
3310  /* If the clock somehow went backwards, treat as not found. */
3311  return -1;
3312  }
3313  else if (time == 0)
3314  {
3315  /*
3316  * We didn't cross a time. If there is a future sample that we
3317  * haven't reached yet, and we've already reached at least one sample,
3318  * let's interpolate the local flushed time. This is mainly useful for
3319  * reporting a completely stuck apply position as having increasing
3320  * lag, since otherwise we'd have to wait for it to eventually start
3321  * moving again and cross one of our samples before we can show the
3322  * lag increasing.
3323  */
3324  if (LagTracker.read_heads[head] != LagTracker.write_head &&
3325  LagTracker.last_read[head].time != 0)
3326  {
3327  double fraction;
3328  WalTimeSample prev = LagTracker.last_read[head];
3329  WalTimeSample next = LagTracker.buffer[LagTracker.read_heads[head]];
3330 
3331  Assert(lsn >= prev.lsn);
3332  Assert(prev.lsn < next.lsn);
3333 
3334  if (prev.time > next.time)
3335  {
3336  /* If the clock somehow went backwards, treat as not found. */
3337  return -1;
3338  }
3339 
3340  /* See how far we are between the previous and next samples. */
3341  fraction =
3342  (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
3343 
3344  /* Scale the local flush time proportionally. */
3345  time = (TimestampTz)
3346  ((double) prev.time + (next.time - prev.time) * fraction);
3347  }
3348  else
3349  {
3350  /* Couldn't interpolate due to lack of data. */
3351  return -1;
3352  }
3353  }
3354 
3355  /* Return the elapsed time since local flush time in microseconds. */
3356  Assert(time != 0);
3357  return now - time;
3358 }
static int32 next
Definition: blutils.c:210
int64 TimestampTz
Definition: timestamp.h:39
TimestampTz time
Definition: walsender.c:197
XLogRecPtr lsn
Definition: walsender.c:196
#define Assert(condition)
Definition: c.h:675
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:201
static struct @27 LagTracker
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
void LagTrackerWrite ( XLogRecPtr  lsn,
TimestampTz  local_flush_time 
)

Definition at line 3228 of file walsender.c.

References am_walsender, i, LAG_TRACKER_BUFFER_SIZE, LagTracker, and NUM_SYNC_REP_WAIT_MODE.

Referenced by XLogSendPhysical().

3229 {
3230  bool buffer_full;
3231  int new_write_head;
3232  int i;
3233 
3234  if (!am_walsender)
3235  return;
3236 
3237  /*
3238  * If the lsn hasn't advanced since last time, then do nothing. This way
3239  * we only record a new sample when new WAL has been written.
3240  */
3241  if (LagTracker.last_lsn == lsn)
3242  return;
3243  LagTracker.last_lsn = lsn;
3244 
3245  /*
3246  * If advancing the write head of the circular buffer would crash into any
3247  * of the read heads, then the buffer is full. In other words, the
3248  * slowest reader (presumably apply) is the one that controls the release
3249  * of space.
3250  */
3251  new_write_head = (LagTracker.write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
3252  buffer_full = false;
3253  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
3254  {
3255  if (new_write_head == LagTracker.read_heads[i])
3256  buffer_full = true;
3257  }
3258 
3259  /*
3260  * If the buffer is full, for now we just rewind by one slot and overwrite
3261  * the last sample, as a simple (if somewhat uneven) way to lower the
3262  * sampling rate. There may be better adaptive compaction algorithms.
3263  */
3264  if (buffer_full)
3265  {
3266  new_write_head = LagTracker.write_head;
3267  if (LagTracker.write_head > 0)
3268  LagTracker.write_head--;
3269  else
3270  LagTracker.write_head = LAG_TRACKER_BUFFER_SIZE - 1;
3271  }
3272 
3273  /* Store a sample at the current write head position. */
3274  LagTracker.buffer[LagTracker.write_head].lsn = lsn;
3275  LagTracker.buffer[LagTracker.write_head].time = local_flush_time;
3276  LagTracker.write_head = new_write_head;
3277 }
bool am_walsender
Definition: walsender.c:108
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:201
int i
static struct @27 LagTracker
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28
static int logical_read_xlog_page ( XLogReaderState state,
XLogRecPtr  targetPagePtr,
int  reqLen,
XLogRecPtr  targetRecPtr,
char *  cur_page,
TimeLineID pageTLI 
)
static

Definition at line 744 of file walsender.c.

References XLogReaderState::currTLI, XLogReaderState::currTLIValidUntil, XLogReaderState::nextTLI, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, ThisTimeLineID, WalSndWaitForWal(), XLogRead(), and XLogReadDetermineTimeline().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

746 {
747  XLogRecPtr flushptr;
748  int count;
749 
750  XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
752  sendTimeLine = state->currTLI;
754  sendTimeLineNextTLI = state->nextTLI;
755 
756  /* make sure we have enough WAL available */
757  flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
758 
759  /* more than one block available */
760  if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
761  count = XLOG_BLCKSZ;
762  /* not enough WAL synced, that can happen during shutdown */
763  else if (targetPagePtr + reqLen > flushptr)
764  return -1;
765  /* part of the page available */
766  else
767  count = flushptr - targetPagePtr;
768 
769  /* now actually read the data, we know it's there */
770  XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
771 
772  return count;
773 }
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
Definition: xlogutils.c:799
XLogRecPtr currTLIValidUntil
Definition: xlogreader.h:174
static void XLogRead(char *buf, XLogRecPtr startptr, Size count)
Definition: walsender.c:2232
TimeLineID nextTLI
Definition: xlogreader.h:179
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition: walsender.c:1233
TimeLineID ThisTimeLineID
Definition: xlog.c:179
TimeLineID currTLI
Definition: xlogreader.h:165
static TimeLineID sendTimeLine
Definition: walsender.c:141
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:142
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:144
static bool sendTimeLineIsHistoric
Definition: walsender.c:143
static Interval* offset_to_interval ( TimeOffset  offset)
static

Definition at line 2990 of file walsender.c.

References Interval::day, Interval::month, palloc(), result, and Interval::time.

Referenced by pg_stat_get_wal_senders().

2991 {
2992  Interval *result = palloc(sizeof(Interval));
2993 
2994  result->month = 0;
2995  result->day = 0;
2996  result->time = offset;
2997 
2998  return result;
2999 }
return result
Definition: formatting.c:1618
int32 day
Definition: timestamp.h:47
TimeOffset time
Definition: timestamp.h:45
int32 month
Definition: timestamp.h:48
void * palloc(Size size)
Definition: mcxt.c:849
static void parseCreateReplSlotOptions ( CreateReplicationSlotCmd cmd,
bool reserve_wal,
CRSSnapshotAction snapshot_action 
)
static

Definition at line 779 of file walsender.c.

References CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, defGetBoolean(), DefElem::defname, elog, ereport, errcode(), errmsg(), ERROR, CreateReplicationSlotCmd::kind, lfirst, CreateReplicationSlotCmd::options, REPLICATION_KIND_LOGICAL, and REPLICATION_KIND_PHYSICAL.

Referenced by CreateReplicationSlot().

782 {
783  ListCell *lc;
784  bool snapshot_action_given = false;
785  bool reserve_wal_given = false;
786 
787  /* Parse options */
788  foreach (lc, cmd->options)
789  {
790  DefElem *defel = (DefElem *) lfirst(lc);
791 
792  if (strcmp(defel->defname, "export_snapshot") == 0)
793  {
794  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
795  ereport(ERROR,
796  (errcode(ERRCODE_SYNTAX_ERROR),
797  errmsg("conflicting or redundant options")));
798 
799  snapshot_action_given = true;
800  *snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT :
802  }
803  else if (strcmp(defel->defname, "use_snapshot") == 0)
804  {
805  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
806  ereport(ERROR,
807  (errcode(ERRCODE_SYNTAX_ERROR),
808  errmsg("conflicting or redundant options")));
809 
810  snapshot_action_given = true;
811  *snapshot_action = CRS_USE_SNAPSHOT;
812  }
813  else if (strcmp(defel->defname, "reserve_wal") == 0)
814  {
815  if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
816  ereport(ERROR,
817  (errcode(ERRCODE_SYNTAX_ERROR),
818  errmsg("conflicting or redundant options")));
819 
820  reserve_wal_given = true;
821  *reserve_wal = true;
822  }
823  else
824  elog(ERROR, "unrecognized option: %s", defel->defname);
825  }
826 }
int errcode(int sqlerrcode)
Definition: elog.c:575
ReplicationKind kind
Definition: replnodes.h:56
bool defGetBoolean(DefElem *def)
Definition: define.c:111
#define ERROR
Definition: elog.h:43
#define ereport(elevel, rest)
Definition: elog.h:122
#define lfirst(lc)
Definition: pg_list.h:106
int errmsg(const char *fmt,...)
Definition: elog.c:797
char * defname
Definition: parsenodes.h:719
#define elog
Definition: elog.h:219
Datum pg_stat_get_wal_senders ( PG_FUNCTION_ARGS  )

Definition at line 3006 of file walsender.c.

References ReturnSetInfo::allowedModes, WalSnd::apply, WalSnd::applyLag, CStringGetTextDatum, ReturnSetInfo::econtext, ExprContext::ecxt_per_query_memory, elog, ereport, errcode(), errmsg(), ERROR, WalSnd::flush, WalSnd::flushLag, get_call_result_type(), i, Int32GetDatum, IntervalPGetDatum, IsA, list_member_int(), LSNGetDatum, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_wal_senders, MemoryContextSwitchTo(), MemSet, WalSnd::mutex, NULL, offset_to_interval(), PG_STAT_GET_WAL_SENDERS_COLS, WalSnd::pid, ReturnSetInfo::returnMode, WalSnd::sentPtr, sentPtr, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SFRM_Materialize, SpinLockAcquire, SpinLockRelease, WalSnd::state, superuser(), SYNC_REP_PRIORITY, WalSnd::sync_standby_priority, SyncRepConfigData::syncrep_method, SyncRepConfig, SyncRepGetSyncStandbys(), tuplestore_begin_heap(), tuplestore_donestoring, tuplestore_putvalues(), TYPEFUNC_COMPOSITE, values, WalSndGetStateString(), WalSndCtlData::walsnds, work_mem, write, WalSnd::write, WalSnd::writeLag, and XLogRecPtrIsInvalid.

3007 {
3008 #define PG_STAT_GET_WAL_SENDERS_COLS 11
3009  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3010  TupleDesc tupdesc;
3011  Tuplestorestate *tupstore;
3012  MemoryContext per_query_ctx;
3013  MemoryContext oldcontext;
3014  List *sync_standbys;
3015  int i;
3016 
3017  /* check to see if caller supports us returning a tuplestore */
3018  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
3019  ereport(ERROR,
3020  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3021  errmsg("set-valued function called in context that cannot accept a set")));
3022  if (!(rsinfo->allowedModes & SFRM_Materialize))
3023  ereport(ERROR,
3024  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3025  errmsg("materialize mode required, but it is not " \
3026  "allowed in this context")));
3027 
3028  /* Build a tuple descriptor for our result type */
3029  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
3030  elog(ERROR, "return type must be a row type");
3031 
3032  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
3033  oldcontext = MemoryContextSwitchTo(per_query_ctx);
3034 
3035  tupstore = tuplestore_begin_heap(true, false, work_mem);
3036  rsinfo->returnMode = SFRM_Materialize;
3037  rsinfo->setResult = tupstore;
3038  rsinfo->setDesc = tupdesc;
3039 
3040  MemoryContextSwitchTo(oldcontext);
3041 
3042  /*
3043  * Get the currently active synchronous standbys.
3044  */
3045  LWLockAcquire(SyncRepLock, LW_SHARED);
3046  sync_standbys = SyncRepGetSyncStandbys(NULL);
3047  LWLockRelease(SyncRepLock);
3048 
3049  for (i = 0; i < max_wal_senders; i++)
3050  {
3051  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3053  XLogRecPtr write;
3054  XLogRecPtr flush;
3055  XLogRecPtr apply;
3056  TimeOffset writeLag;
3057  TimeOffset flushLag;
3058  TimeOffset applyLag;
3059  int priority;
3062  bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
3063 
3064  if (walsnd->pid == 0)
3065  continue;
3066 
3067  SpinLockAcquire(&walsnd->mutex);
3068  sentPtr = walsnd->sentPtr;
3069  state = walsnd->state;
3070  write = walsnd->write;
3071  flush = walsnd->flush;
3072  apply = walsnd->apply;
3073  writeLag = walsnd->writeLag;
3074  flushLag = walsnd->flushLag;
3075  applyLag = walsnd->applyLag;
3076  priority = walsnd->sync_standby_priority;
3077  SpinLockRelease(&walsnd->mutex);
3078 
3079  memset(nulls, 0, sizeof(nulls));
3080  values[0] = Int32GetDatum(walsnd->pid);
3081 
3082  if (!superuser())
3083  {
3084  /*
3085  * Only superusers can see details. Other users only get the pid
3086  * value to know it's a walsender, but no details.
3087  */
3088  MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3089  }
3090  else
3091  {
3092  values[1] = CStringGetTextDatum(WalSndGetStateString(state));
3093 
3094  if (XLogRecPtrIsInvalid(sentPtr))
3095  nulls[2] = true;
3096  values[2] = LSNGetDatum(sentPtr);
3097 
3098  if (XLogRecPtrIsInvalid(write))
3099  nulls[3] = true;
3100  values[3] = LSNGetDatum(write);
3101 
3102  if (XLogRecPtrIsInvalid(flush))
3103  nulls[4] = true;
3104  values[4] = LSNGetDatum(flush);
3105 
3106  if (XLogRecPtrIsInvalid(apply))
3107  nulls[5] = true;
3108  values[5] = LSNGetDatum(apply);
3109 
3110  /*
3111  * Treat a standby such as a pg_basebackup background process
3112  * which always returns an invalid flush location, as an
3113  * asynchronous standby.
3114  */
3115  priority = XLogRecPtrIsInvalid(walsnd->flush) ? 0 : priority;
3116 
3117  if (writeLag < 0)
3118  nulls[6] = true;
3119  else
3120  values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
3121 
3122  if (flushLag < 0)
3123  nulls[7] = true;
3124  else
3125  values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
3126 
3127  if (applyLag < 0)
3128  nulls[8] = true;
3129  else
3130  values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
3131 
3132  values[9] = Int32GetDatum(priority);
3133 
3134  /*
3135  * More easily understood version of standby state. This is purely
3136  * informational.
3137  *
3138  * In quorum-based sync replication, the role of each standby
3139  * listed in synchronous_standby_names can be changing very
3140  * frequently. Any standbys considered as "sync" at one moment can
3141  * be switched to "potential" ones at the next moment. So, it's
3142  * basically useless to report "sync" or "potential" as their sync
3143  * states. We report just "quorum" for them.
3144  */
3145  if (priority == 0)
3146  values[10] = CStringGetTextDatum("async");
3147  else if (list_member_int(sync_standbys, i))
3149  CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3150  else
3151  values[10] = CStringGetTextDatum("potential");
3152  }
3153 
3154  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
3155  }
3156 
3157  /* clean up and return the tuplestore */
3158  tuplestore_donestoring(tupstore);
3159 
3160  return (Datum) 0;
3161 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
XLogRecPtr write
#define IsA(nodeptr, _type_)
Definition: nodes.h:560
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:211
#define SYNC_REP_PRIORITY
Definition: syncrep.h:36
#define write(a, b, c)
Definition: win32.h:14
uint8 syncrep_method
Definition: syncrep.h:51
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
TimeOffset flushLag
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
WalSndCtlData * WalSndCtl
Definition: walsender.c:102
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
bool superuser(void)
Definition: superuser.c:47
#define MemSet(start, val, len)
Definition: c.h:857
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
List * SyncRepGetSyncStandbys(bool *am_sync)
Definition: syncrep.c:677
TimeOffset writeLag
SyncRepConfigData * SyncRepConfig
Definition: syncrep.c:97
slock_t mutex
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1715
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
#define ERROR
Definition: elog.h:43
#define IntervalPGetDatum(X)
Definition: timestamp.h:33
bool list_member_int(const List *list, int datum)
Definition: list.c:485
WalSndState state
#define ereport(elevel, rest)
Definition: elog.h:122
int max_wal_senders
Definition: walsender.c:114
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
int64 TimeOffset
Definition: timestamp.h:40
uintptr_t Datum
Definition: postgres.h:372
static const char * WalSndGetStateString(WalSndState state)
Definition: walsender.c:2973
XLogRecPtr sentPtr
int work_mem
Definition: globals.c:112
static XLogRecPtr sentPtr
Definition: walsender.c:150
int allowedModes
Definition: execnodes.h:268
TimeOffset applyLag
SetFunctionReturnMode returnMode
Definition: execnodes.h:270
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
int sync_standby_priority
Definition: regguts.h:298
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1111
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:202
WalSndState
Tuplestorestate * setResult
Definition: execnodes.h:273
static Datum values[MAXATTR]
Definition: bootstrap.c:163
#define PG_STAT_GET_WAL_SENDERS_COLS
ExprContext * econtext
Definition: execnodes.h:266
#define Int32GetDatum(X)
Definition: postgres.h:485
TupleDesc setDesc
Definition: execnodes.h:274
int errmsg(const char *fmt,...)
Definition: elog.c:797
int i
#define CStringGetTextDatum(s)
Definition: builtins.h:91
#define elog
Definition: elog.h:219
static Interval * offset_to_interval(TimeOffset offset)
Definition: walsender.c:2990
XLogRecPtr apply
Definition: pg_list.h:45
static void PhysicalConfirmReceivedLocation ( XLogRecPtr  lsn)
static

Definition at line 1638 of file walsender.c.

References Assert, ReplicationSlot::data, InvalidXLogRecPtr, ReplicationSlot::mutex, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, SpinLockAcquire, and SpinLockRelease.

Referenced by ProcessStandbyReplyMessage().

1639 {
1640  bool changed = false;
1642 
1643  Assert(lsn != InvalidXLogRecPtr);
1644  SpinLockAcquire(&slot->mutex);
1645  if (slot->data.restart_lsn != lsn)
1646  {
1647  changed = true;
1648  slot->data.restart_lsn = lsn;
1649  }
1650  SpinLockRelease(&slot->mutex);
1651 
1652  if (changed)
1653  {
1656  }
1657 
1658  /*
1659  * One could argue that the slot should be saved to disk now, but that'd
1660  * be energy wasted - the worst lost information can do here is give us
1661  * wrong information in a statistics view - we'll just potentially be more
1662  * conservative in removing files.
1663  */
1664 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
ReplicationSlotPersistentData data
Definition: slot.h:115
#define SpinLockAcquire(lock)
Definition: spin.h:62
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:665
#define SpinLockRelease(lock)
Definition: spin.h:64
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:675
XLogRecPtr restart_lsn
Definition: slot.h:68
slock_t mutex
Definition: slot.h:88
void ReplicationSlotMarkDirty(void)
Definition: slot.c:581
static void PhysicalReplicationSlotNewXmin ( TransactionId  feedbackXmin,
TransactionId  feedbackCatalogXmin 
)
static

Definition at line 1762 of file walsender.c.

References ReplicationSlotPersistentData::catalog_xmin, ReplicationSlot::data, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, InvalidTransactionId, ReplicationSlot::mutex, MyPgXact, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredXmin(), SpinLockAcquire, SpinLockRelease, TransactionIdIsNormal, TransactionIdPrecedes(), ReplicationSlotPersistentData::xmin, and PGXACT::xmin.

Referenced by ProcessStandbyHSFeedbackMessage().

1763 {
1764  bool changed = false;
1766 
1767  SpinLockAcquire(&slot->mutex);
1769 
1770  /*
1771  * For physical replication we don't need the interlock provided by xmin
1772  * and effective_xmin since the consequences of a missed increase are
1773  * limited to query cancellations, so set both at once.
1774  */
1775  if (!TransactionIdIsNormal(slot->data.xmin) ||
1776  !TransactionIdIsNormal(feedbackXmin) ||
1777  TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
1778  {
1779  changed = true;
1780  slot->data.xmin = feedbackXmin;
1781  slot->effective_xmin = feedbackXmin;
1782  }
1783  if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
1784  !TransactionIdIsNormal(feedbackCatalogXmin) ||
1785  TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
1786  {
1787  changed = true;
1788  slot->data.catalog_xmin = feedbackCatalogXmin;
1789  slot->effective_catalog_xmin = feedbackCatalogXmin;
1790  }
1791  SpinLockRelease(&slot->mutex);
1792 
1793  if (changed)
1794  {
1797  }
1798 }
TransactionId xmin
Definition: proc.h:213
ReplicationSlotPersistentData data
Definition: slot.h:115
PGXACT * MyPgXact
Definition: proc.c:68
#define SpinLockAcquire(lock)
Definition: spin.h:62
TransactionId effective_xmin
Definition: slot.h:111
TransactionId catalog_xmin
Definition: slot.h:65
#define InvalidTransactionId
Definition: transam.h:31
TransactionId xmin
Definition: slot.h:57
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
#define SpinLockRelease(lock)
Definition: spin.h:64
TransactionId effective_catalog_xmin
Definition: slot.h:112
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
slock_t mutex
Definition: slot.h:88
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:617
void ReplicationSlotMarkDirty(void)
Definition: slot.c:581
static void ProcessRepliesIfAny ( void  )
static

Definition at line 1505 of file walsender.c.

References COMMERROR, ereport, errcode(), errmsg(), FATAL, GetCurrentTimestamp(), last_reply_timestamp, NULL, pq_endmsgread(), pq_getbyte_if_available(), pq_getmessage(), pq_putmessage_noblock, pq_startmsgread(), proc_exit(), ProcessStandbyMessage(), resetStringInfo(), streamingDoneReceiving, streamingDoneSending, and waiting_for_ping_response.

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

1506 {
1507  unsigned char firstchar;
1508  int r;
1509  bool received = false;
1510 
1511  for (;;)
1512  {
1513  pq_startmsgread();
1514  r = pq_getbyte_if_available(&firstchar);
1515  if (r < 0)
1516  {
1517  /* unexpected error or EOF */
1519  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1520  errmsg("unexpected EOF on standby connection")));
1521  proc_exit(0);
1522  }
1523  if (r == 0)
1524  {
1525  /* no data available without blocking */
1526  pq_endmsgread();
1527  break;
1528  }
1529 
1530  /* Read the message contents */
1532  if (pq_getmessage(&reply_message, 0))
1533  {
1535  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1536  errmsg("unexpected EOF on standby connection")));
1537  proc_exit(0);
1538  }
1539 
1540  /*
1541  * If we already received a CopyDone from the frontend, the frontend
1542  * should not send us anything until we've closed our end of the COPY.
1543  * XXX: In theory, the frontend could already send the next command
1544  * before receiving the CopyDone, but libpq doesn't currently allow
1545  * that.
1546  */
1547  if (streamingDoneReceiving && firstchar != 'X')
1548  ereport(FATAL,
1549  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1550  errmsg("unexpected standby message type \"%c\", after receiving CopyDone",
1551  firstchar)));
1552 
1553  /* Handle the very limited subset of commands expected in this phase */
1554  switch (firstchar)
1555  {
1556  /*
1557  * 'd' means a standby reply wrapped in a CopyData packet.
1558  */
1559  case 'd':
1561  received = true;
1562  break;
1563 
1564  /*
1565  * CopyDone means the standby requested to finish streaming.
1566  * Reply with CopyDone, if we had not sent that already.
1567  */
1568  case 'c':
1569  if (!streamingDoneSending)
1570  {
1571  pq_putmessage_noblock('c', NULL, 0);
1572  streamingDoneSending = true;
1573  }
1574 
1575  streamingDoneReceiving = true;
1576  received = true;
1577  break;
1578 
1579  /*
1580  * 'X' means that the standby is closing down the socket.
1581  */
1582  case 'X':
1583  proc_exit(0);
1584 
1585  default:
1586  ereport(FATAL,
1587  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1588  errmsg("invalid standby message type \"%c\"",
1589  firstchar)));
1590  }
1591  }
1592 
1593  /*
1594  * Save the last reply timestamp if we've received at least one reply.
1595  */
1596  if (received)
1597  {
1599  waiting_for_ping_response = false;
1600  }
1601 }
static void ProcessStandbyMessage(void)
Definition: walsender.c:1607
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
void proc_exit(int code)
Definition: ipc.c:99
int errcode(int sqlerrcode)
Definition: elog.c:575
void pq_startmsgread(void)
Definition: pqcomm.c:1191
#define FATAL
Definition: elog.h:52
int pq_getbyte_if_available(unsigned char *c)
Definition: pqcomm.c:1021
static bool streamingDoneSending
Definition: walsender.c:172
#define COMMERROR
Definition: elog.h:30
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:94
#define ereport(elevel, rest)
Definition: elog.h:122
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1253
static StringInfoData reply_message
Definition: walsender.c:154
void pq_endmsgread(void)
Definition: pqcomm.c:1215
#define NULL
Definition: c.h:229
static bool streamingDoneReceiving
Definition: walsender.c:173
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
static bool waiting_for_ping_response
Definition: walsender.c:164
static TimestampTz last_reply_timestamp
Definition: walsender.c:161
static void ProcessStandbyHSFeedbackMessage ( void  )
static

Definition at line 1839 of file walsender.c.

References DEBUG2, elog, InvalidTransactionId, MyPgXact, MyReplicationSlot, NULL, PhysicalReplicationSlotNewXmin(), pq_getmsgint(), pq_getmsgint64(), TransactionIdInRecentPast(), TransactionIdIsNormal, TransactionIdPrecedes(), and PGXACT::xmin.

Referenced by ProcessStandbyMessage().

1840 {
1841  TransactionId feedbackXmin;
1842  uint32 feedbackEpoch;
1843  TransactionId feedbackCatalogXmin;
1844  uint32 feedbackCatalogEpoch;
1845 
1846  /*
1847  * Decipher the reply message. The caller already consumed the msgtype
1848  * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
1849  * of this message.
1850  */
1851  (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
1852  feedbackXmin = pq_getmsgint(&reply_message, 4);
1853  feedbackEpoch = pq_getmsgint(&reply_message, 4);
1854  feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
1855  feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
1856 
1857  elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u",
1858  feedbackXmin,
1859  feedbackEpoch,
1860  feedbackCatalogXmin,
1861  feedbackCatalogEpoch);
1862 
1863  /*
1864  * Unset WalSender's xmins if the feedback message values are invalid.
1865  * This happens when the downstream turned hot_standby_feedback off.
1866  */
1867  if (!TransactionIdIsNormal(feedbackXmin)
1868  && !TransactionIdIsNormal(feedbackCatalogXmin))
1869  {
1871  if (MyReplicationSlot != NULL)
1872  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
1873  return;
1874  }
1875 
1876  /*
1877  * Check that the provided xmin/epoch are sane, that is, not in the future
1878  * and not so far back as to be already wrapped around. Ignore if not.
1879  */
1880  if (TransactionIdIsNormal(feedbackXmin) &&
1881  !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
1882  return;
1883 
1884  if (TransactionIdIsNormal(feedbackCatalogXmin) &&
1885  !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
1886  return;
1887 
1888  /*
1889  * Set the WalSender's xmin equal to the standby's requested xmin, so that
1890  * the xmin will be taken into account by GetOldestXmin. This will hold
1891  * back the removal of dead rows and thereby prevent the generation of
1892  * cleanup conflicts on the standby server.
1893  *
1894  * There is a small window for a race condition here: although we just
1895  * checked that feedbackXmin precedes nextXid, the nextXid could have
1896  * gotten advanced between our fetching it and applying the xmin below,
1897  * perhaps far enough to make feedbackXmin wrap around. In that case the
1898  * xmin we set here would be "in the future" and have no effect. No point
1899  * in worrying about this since it's too late to save the desired data
1900  * anyway. Assuming that the standby sends us an increasing sequence of
1901  * xmins, this could only happen during the first reply cycle, else our
1902  * own xmin would prevent nextXid from advancing so far.
1903  *
1904  * We don't bother taking the ProcArrayLock here. Setting the xmin field
1905  * is assumed atomic, and there's no real need to prevent a concurrent
1906  * GetOldestXmin. (If we're moving our xmin forward, this is obviously
1907  * safe, and if we're moving it backwards, well, the data is at risk
1908  * already since a VACUUM could have just finished calling GetOldestXmin.)
1909  *
1910  * If we're using a replication slot we reserve the xmin via that,
1911  * otherwise via the walsender's PGXACT entry. We can only track the
1912  * catalog xmin separately when using a slot, so we store the least
1913  * of the two provided when not using a slot.
1914  *
1915  * XXX: It might make sense to generalize the ephemeral slot concept and
1916  * always use the slot mechanism to handle the feedback xmin.
1917  */
1918  if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
1919  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
1920  else
1921  {
1922  if (TransactionIdIsNormal(feedbackCatalogXmin)
1923  && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
1924  MyPgXact->xmin = feedbackCatalogXmin;
1925  else
1926  MyPgXact->xmin = feedbackXmin;
1927  }
1928 }
uint32 TransactionId
Definition: c.h:397
TransactionId xmin
Definition: proc.h:213
PGXACT * MyPgXact
Definition: proc.c:68
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition: walsender.c:1811
#define DEBUG2
Definition: elog.h:24
#define InvalidTransactionId
Definition: transam.h:31
unsigned int uint32
Definition: c.h:268
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static StringInfoData reply_message
Definition: walsender.c:154
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define NULL
Definition: c.h:229
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:486
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition: walsender.c:1762
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:448
#define elog
Definition: elog.h:219
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
static void ProcessStandbyMessage ( void  )
static

Definition at line 1607 of file walsender.c.

References COMMERROR, ereport, errcode(), errmsg(), pq_getmsgbyte(), proc_exit(), ProcessStandbyHSFeedbackMessage(), and ProcessStandbyReplyMessage().

Referenced by ProcessRepliesIfAny().

1608 {
1609  char msgtype;
1610 
1611  /*
1612  * Check message type from the first byte.
1613  */
1614  msgtype = pq_getmsgbyte(&reply_message);
1615 
1616  switch (msgtype)
1617  {
1618  case 'r':
1620  break;
1621 
1622  case 'h':
1624  break;
1625 
1626  default:
1628  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1629  errmsg("unexpected message type \"%c\"", msgtype)));
1630  proc_exit(0);
1631  }
1632 }
void proc_exit(int code)
Definition: ipc.c:99
int errcode(int sqlerrcode)
Definition: elog.c:575
#define COMMERROR
Definition: elog.h:30
#define ereport(elevel, rest)
Definition: elog.h:122
static StringInfoData reply_message
Definition: walsender.c:154
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:432
static void ProcessStandbyReplyMessage(void)
Definition: walsender.c:1670
int errmsg(const char *fmt,...)
Definition: elog.c:797
static void ProcessStandbyHSFeedbackMessage(void)
Definition: walsender.c:1839
static void ProcessStandbyReplyMessage ( void  )
static

Definition at line 1670 of file walsender.c.

References am_cascading_walsender, WalSnd::apply, WalSnd::applyLag, DEBUG2, elog, WalSnd::flush, WalSnd::flushLag, GetCurrentTimestamp(), InvalidXLogRecPtr, LagTrackerRead(), LogicalConfirmReceivedLocation(), WalSnd::mutex, MyReplicationSlot, MyWalSnd, now(), PhysicalConfirmReceivedLocation(), pq_getmsgbyte(), pq_getmsgint64(), sentPtr, SlotIsLogical, SpinLockAcquire, SpinLockRelease, SYNC_REP_WAIT_APPLY, SYNC_REP_WAIT_FLUSH, SYNC_REP_WAIT_WRITE, SyncRepReleaseWaiters(), WalSndKeepalive(), WalSnd::write, and WalSnd::writeLag.

Referenced by ProcessStandbyMessage().

1671 {
1672  XLogRecPtr writePtr,
1673  flushPtr,
1674  applyPtr;
1675  bool replyRequested;
1676  TimeOffset writeLag,
1677  flushLag,
1678  applyLag;
1679  bool clearLagTimes;
1680  TimestampTz now;
1681 
1682  static bool fullyAppliedLastTime = false;
1683 
1684  /* the caller already consumed the msgtype byte */
1685  writePtr = pq_getmsgint64(&reply_message);
1686  flushPtr = pq_getmsgint64(&reply_message);
1687  applyPtr = pq_getmsgint64(&reply_message);
1688  (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
1689  replyRequested = pq_getmsgbyte(&reply_message);
1690 
1691  elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s",
1692  (uint32) (writePtr >> 32), (uint32) writePtr,
1693  (uint32) (flushPtr >> 32), (uint32) flushPtr,
1694  (uint32) (applyPtr >> 32), (uint32) applyPtr,
1695  replyRequested ? " (reply requested)" : "");
1696 
1697  /* See if we can compute the round-trip lag for these positions. */
1698  now = GetCurrentTimestamp();
1699  writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
1700  flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
1701  applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
1702 
1703  /*
1704  * If the standby reports that it has fully replayed the WAL in two
1705  * consecutive reply messages, then the second such message must result
1706  * from wal_receiver_status_interval expiring on the standby. This is a
1707  * convenient time to forget the lag times measured when it last
1708  * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
1709  * until more WAL traffic arrives.
1710  */
1711  clearLagTimes = false;
1712  if (applyPtr == sentPtr)
1713  {
1714  if (fullyAppliedLastTime)
1715  clearLagTimes = true;
1716  fullyAppliedLastTime = true;
1717  }
1718  else
1719  fullyAppliedLastTime = false;
1720 
1721  /* Send a reply if the standby requested one. */
1722  if (replyRequested)
1723  WalSndKeepalive(false);
1724 
1725  /*
1726  * Update shared state for this WalSender process based on reply data from
1727  * standby.
1728  */
1729  {
1730  WalSnd *walsnd = MyWalSnd;
1731 
1732  SpinLockAcquire(&walsnd->mutex);
1733  walsnd->write = writePtr;
1734  walsnd->flush = flushPtr;
1735  walsnd->apply = applyPtr;
1736  if (writeLag != -1 || clearLagTimes)
1737  walsnd->writeLag = writeLag;
1738  if (flushLag != -1 || clearLagTimes)
1739  walsnd->flushLag = flushLag;
1740  if (applyLag != -1 || clearLagTimes)
1741  walsnd->applyLag = applyLag;
1742  SpinLockRelease(&walsnd->mutex);
1743  }
1744 
1747 
1748  /*
1749  * Advance our local xmin horizon when the client confirmed a flush.
1750  */
1751  if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
1752  {
1755  else
1757  }
1758 }
XLogRecPtr write
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
int64 TimestampTz
Definition: timestamp.h:39
TimeOffset flushLag
TimeOffset writeLag
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3169
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: walsender.c:1638
#define SYNC_REP_WAIT_APPLY
Definition: syncrep.h:26
#define DEBUG2
Definition: elog.h:24
unsigned int uint32
Definition: c.h:268
#define SlotIsLogical(slot)
Definition: slot.h:134
#define SYNC_REP_WAIT_FLUSH
Definition: syncrep.h:25
static StringInfoData reply_message
Definition: walsender.c:154
#define SpinLockRelease(lock)
Definition: spin.h:64
int64 TimeOffset
Definition: timestamp.h:40
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition: walsender.c:3293
WalSnd * MyWalSnd
Definition: walsender.c:105
static XLogRecPtr sentPtr
Definition: walsender.c:150
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:432
TimeOffset applyLag
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:884
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:486
#define elog
Definition: elog.h:219
XLogRecPtr apply
void SyncRepReleaseWaiters(void)
Definition: syncrep.c:409
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
bool am_cascading_walsender
Definition: walsender.c:109
#define SYNC_REP_WAIT_WRITE
Definition: syncrep.h:24
static void SendTimeLineHistory ( TimeLineHistoryCmd cmd)
static

Definition at line 419 of file walsender.c.

References buf, BYTEAOID, CloseTransientFile(), ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), MAXFNAMELEN, MAXPGPATH, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), pq_beginmessage(), pq_endmessage(), pq_sendbytes(), pq_sendint(), pq_sendstring(), read, TEXTOID, TimeLineHistoryCmd::timeline, TLHistoryFileName, TLHistoryFilePath, and WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ.

Referenced by exec_replication_command().

420 {
422  char histfname[MAXFNAMELEN];
423  char path[MAXPGPATH];
424  int fd;
425  off_t histfilelen;
426  off_t bytesleft;
427  Size len;
428 
429  /*
430  * Reply with a result set with one row, and two columns. The first col is
431  * the name of the history file, 2nd is the contents.
432  */
433 
434  TLHistoryFileName(histfname, cmd->timeline);
435  TLHistoryFilePath(path, cmd->timeline);
436 
437  /* Send a RowDescription message */
438  pq_beginmessage(&buf, 'T');
439  pq_sendint(&buf, 2, 2); /* 2 fields */
440 
441  /* first field */
442  pq_sendstring(&buf, "filename"); /* col name */
443  pq_sendint(&buf, 0, 4); /* table oid */
444  pq_sendint(&buf, 0, 2); /* attnum */
445  pq_sendint(&buf, TEXTOID, 4); /* type oid */
446  pq_sendint(&buf, -1, 2); /* typlen */
447  pq_sendint(&buf, 0, 4); /* typmod */
448  pq_sendint(&buf, 0, 2); /* format code */
449 
450  /* second field */
451  pq_sendstring(&buf, "content"); /* col name */
452  pq_sendint(&buf, 0, 4); /* table oid */
453  pq_sendint(&buf, 0, 2); /* attnum */
454  pq_sendint(&buf, BYTEAOID, 4); /* type oid */
455  pq_sendint(&buf, -1, 2); /* typlen */
456  pq_sendint(&buf, 0, 4); /* typmod */
457  pq_sendint(&buf, 0, 2); /* format code */
458  pq_endmessage(&buf);
459 
460  /* Send a DataRow message */
461  pq_beginmessage(&buf, 'D');
462  pq_sendint(&buf, 2, 2); /* # of columns */
463  len = strlen(histfname);
464  pq_sendint(&buf, len, 4); /* col1 len */
465  pq_sendbytes(&buf, histfname, len);
466 
467  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0666);
468  if (fd < 0)
469  ereport(ERROR,
471  errmsg("could not open file \"%s\": %m", path)));
472 
473  /* Determine file length and send it to client */
474  histfilelen = lseek(fd, 0, SEEK_END);
475  if (histfilelen < 0)
476  ereport(ERROR,
478  errmsg("could not seek to end of file \"%s\": %m", path)));
479  if (lseek(fd, 0, SEEK_SET) != 0)
480  ereport(ERROR,
482  errmsg("could not seek to beginning of file \"%s\": %m", path)));
483 
484  pq_sendint(&buf, histfilelen, 4); /* col2 len */
485 
486  bytesleft = histfilelen;
487  while (bytesleft > 0)
488  {
489  char rbuf[BLCKSZ];
490  int nread;
491 
493  nread = read(fd, rbuf, sizeof(rbuf));
495  if (nread <= 0)
496  ereport(ERROR,
498  errmsg("could not read file \"%s\": %m",
499  path)));
500  pq_sendbytes(&buf, rbuf, nread);
501  bytesleft -= nread;
502  }
503  CloseTransientFile(fd);
504 
505  pq_endmessage(&buf);
506 }
#define TEXTOID
Definition: pg_type.h:324
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:186
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1038
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:88
TimeLineID timeline
Definition: replnodes.h:96
#define ERROR
Definition: elog.h:43
#define MAXPGPATH
#define TLHistoryFileName(fname, tli)
static char * buf
Definition: pg_test_fsync.c:66
int OpenTransientFile(FileName fileName, int fileFlags, int fileMode)
Definition: fd.c:2144
int errcode_for_file_access(void)
Definition: elog.c:598
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1232
#define ereport(elevel, rest)
Definition: elog.h:122
int CloseTransientFile(int fd)
Definition: fd.c:2305
#define MAXFNAMELEN
size_t Size
Definition: c.h:356
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1208
#define BYTEAOID
Definition: pg_type.h:292
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:115
int errmsg(const char *fmt,...)
Definition: elog.c:797
void pq_sendint(StringInfo buf, int i, int b)
Definition: pqformat.c:236
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:344
#define read(a, b, c)
Definition: win32.h:13
#define TLHistoryFilePath(path, tli)
static void StartLogicalReplication ( StartReplicationCmd cmd)
static

Definition at line 1031 of file walsender.c.

References am_cascading_walsender, Assert, buf, CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDecodingContext(), ReplicationSlot::data, DestRemote, EndCommand(), ereport, errmsg(), FreeDecodingContext(), LOG, logical_read_xlog_page(), logical_startptr, WalSnd::mutex, MyReplicationSlot, MyWalSnd, StartReplicationCmd::options, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint(), proc_exit(), RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), ReplicationSlotPersistentData::restart_lsn, WalSnd::sentPtr, sentPtr, StartReplicationCmd::slotname, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, SyncRepInitConfig(), walsender_ready_to_stop, WalSndLoop(), WalSndPrepareWrite(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WalSndWriteData(), and XLogSendLogical().

Referenced by exec_replication_command().

1032 {
1034 
1035  /* make sure that our requirements are still fulfilled */
1037 
1039 
1041 
1042  /*
1043  * Force a disconnect, so that the decoding code doesn't need to care
1044  * about an eventual switch from running in recovery, to running in a
1045  * normal environment. Client code is expected to handle reconnects.
1046  */
1048  {
1049  ereport(LOG,
1050  (errmsg("terminating walsender process after promotion")));
1051  walsender_ready_to_stop = true;
1052  }
1053 
1055 
1056  /* Send a CopyBothResponse message, and start streaming */
1057  pq_beginmessage(&buf, 'W');
1058  pq_sendbyte(&buf, 0);
1059  pq_sendint(&buf, 0, 2);
1060  pq_endmessage(&buf);
1061  pq_flush();
1062 
1063  /*
1064  * Initialize position to the last ack'ed one, then the xlog records begin
1065  * to be shipped from that position.
1066  */
1068  cmd->startpoint, cmd->options,
1071 
1072  /* Start reading WAL from the oldest required WAL. */
1074 
1075  /*
1076  * Report the location after which we'll send out further commits as the
1077  * current sentPtr.
1078  */
1080 
1081  /* Also update the sent position status in shared memory */
1082  {
1083  WalSnd *walsnd = MyWalSnd;
1084 
1085  SpinLockAcquire(&walsnd->mutex);
1087  SpinLockRelease(&walsnd->mutex);
1088  }
1089 
1090  replication_active = true;
1091 
1093 
1094  /* Main loop of walsender */
1096 
1099 
1100  replication_active = false;
1102  proc_exit(0);
1104 
1105  /* Get out of COPY mode (CommandComplete). */
1106  EndCommand("COPY 0", DestRemote);
1107 }
XLogRecPtr startpoint
Definition: replnodes.h:84
#define pq_flush()
Definition: libpq.h:39
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
void proc_exit(int code)
Definition: ipc.c:99
ReplicationSlotPersistentData data
Definition: slot.h:115
#define LOG
Definition: elog.h:26
bool RecoveryInProgress(void)
Definition: xlog.c:7874
slock_t mutex
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:88
XLogRecPtr confirmed_flush
Definition: slot.h:76
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
Definition: walsender.c:744
#define SpinLockAcquire(lock)
Definition: spin.h:62
static volatile sig_atomic_t walsender_ready_to_stop
Definition: walsender.c:180
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:157
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:190
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write)
Definition: logical.c:327
static char * buf
Definition: pg_test_fsync.c:66
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1145
static XLogRecPtr logical_startptr
Definition: walsender.c:191
void ReplicationSlotRelease(void)
Definition: slot.c:375
void SyncRepInitConfig(void)
Definition: syncrep.c:381
#define ereport(elevel, rest)
Definition: elog.h:122
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2007
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1118
#define SpinLockRelease(lock)
Definition: spin.h:64
static volatile sig_atomic_t replication_active
Definition: walsender.c:188
XLogRecPtr sentPtr
WalSnd * MyWalSnd
Definition: walsender.c:105
static XLogRecPtr sentPtr
Definition: walsender.c:150
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:675
void WalSndSetState(WalSndState state)
Definition: walsender.c:2954
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:457
static void XLogSendLogical(void)
Definition: walsender.c:2666
XLogRecPtr restart_lsn
Definition: slot.h:68
void ReplicationSlotAcquire(const char *name)
Definition: slot.c:326
int errmsg(const char *fmt,...)
Definition: elog.c:797
void pq_sendint(StringInfo buf, int i, int b)
Definition: pqformat.c:236
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:344
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:76
bool am_cascading_walsender
Definition: walsender.c:109
static void StartReplication ( StartReplicationCmd cmd)
static

Definition at line 515 of file walsender.c.

References am_cascading_walsender, Assert, begin_tup_output_tupdesc(), buf, CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errcode(), errdetail(), errmsg(), ERROR, GetFlushRecPtr(), GetStandbyFlushRecPtr(), Int64GetDatum(), INT8OID, InvalidXLogRecPtr, list_free_deep(), MemSet, WalSnd::mutex, MyReplicationSlot, MyWalSnd, pq_beginmessage(), pq_endmessage(), pq_flush, pq_puttextmessage(), pq_sendbyte(), pq_sendint(), proc_exit(), readTimeLineHistory(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, WalSnd::sentPtr, sentPtr, SlotIsLogical, StartReplicationCmd::slotname, snprintf(), SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), TEXTOID, ThisTimeLineID, StartReplicationCmd::timeline, tliSwitchPoint(), TupleDescInitBuiltinEntry(), values, walsender_ready_to_stop, WalSndLoop(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, XLogRecPtrIsInvalid, and XLogSendPhysical().

Referenced by exec_replication_command().

516 {
518  XLogRecPtr FlushPtr;
519 
520  if (ThisTimeLineID == 0)
521  ereport(ERROR,
522  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
523  errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
524 
525  /*
526  * We assume here that we're logging enough information in the WAL for
527  * log-shipping, since this is checked in PostmasterMain().
528  *
529  * NOTE: wal_level can only change at shutdown, so in most cases it is
530  * difficult for there to be WAL data that we can still see that was
531  * written at wal_level='minimal'.
532  */
533 
534  if (cmd->slotname)
535  {
538  ereport(ERROR,
539  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
540  (errmsg("cannot use a logical replication slot for physical replication"))));
541  }
542 
543  /*
544  * Select the timeline. If it was given explicitly by the client, use
545  * that. Otherwise use the timeline of the last replayed record, which is
546  * kept in ThisTimeLineID.
547  */
549  {
550  /* this also updates ThisTimeLineID */
551  FlushPtr = GetStandbyFlushRecPtr();
552  }
553  else
554  FlushPtr = GetFlushRecPtr();
555 
556  if (cmd->timeline != 0)
557  {
558  XLogRecPtr switchpoint;
559 
560  sendTimeLine = cmd->timeline;
562  {
563  sendTimeLineIsHistoric = false;
565  }
566  else
567  {
568  List *timeLineHistory;
569 
570  sendTimeLineIsHistoric = true;
571 
572  /*
573  * Check that the timeline the client requested exists, and
574  * the requested start location is on that timeline.
575  */
576  timeLineHistory = readTimeLineHistory(ThisTimeLineID);
577  switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
579  list_free_deep(timeLineHistory);
580 
581  /*
582  * Found the requested timeline in the history. Check that
583  * requested startpoint is on that timeline in our history.
584  *
585  * This is quite loose on purpose. We only check that we didn't
586  * fork off the requested timeline before the switchpoint. We
587  * don't check that we switched *to* it before the requested
588  * starting point. This is because the client can legitimately
589  * request to start replication from the beginning of the WAL
590  * segment that contains switchpoint, but on the new timeline, so
591  * that it doesn't end up with a partial segment. If you ask for
592  * too old a starting point, you'll get an error later when we fail
593  * to find the requested WAL segment in pg_wal.
594  *
595  * XXX: we could be more strict here and only allow a startpoint
596  * that's older than the switchpoint, if it's still in the same
597  * WAL segment.
598  */
599  if (!XLogRecPtrIsInvalid(switchpoint) &&
600  switchpoint < cmd->startpoint)
601  {
602  ereport(ERROR,
603  (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
604  (uint32) (cmd->startpoint >> 32),
605  (uint32) (cmd->startpoint),
606  cmd->timeline),
607  errdetail("This server's history forked from timeline %u at %X/%X.",
608  cmd->timeline,
609  (uint32) (switchpoint >> 32),
610  (uint32) (switchpoint))));
611  }
612  sendTimeLineValidUpto = switchpoint;
613  }
614  }
615  else
616  {
619  sendTimeLineIsHistoric = false;
620  }
621 
623 
624  /* If there is nothing to stream, don't even enter COPY mode */
626  {
627  /*
628  * When we first start replication the standby will be behind the
629  * primary. For some applications, for example synchronous
630  * replication, it is important to have a clear state for this initial
631  * catchup mode, so we can trigger actions when we change streaming
632  * state later. We may stay in this state for a long time, which is
633  * exactly why we want to be able to monitor whether or not we are
634  * still here.
635  */
637 
638  /* Send a CopyBothResponse message, and start streaming */
639  pq_beginmessage(&buf, 'W');
640  pq_sendbyte(&buf, 0);
641  pq_sendint(&buf, 0, 2);
642  pq_endmessage(&buf);
643  pq_flush();
644 
645  /*
646  * Don't allow a request to stream from a future point in WAL that
647  * hasn't been flushed to disk in this server yet.
648  */
649  if (FlushPtr < cmd->startpoint)
650  {
651  ereport(ERROR,
652  (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
653  (uint32) (cmd->startpoint >> 32),
654  (uint32) (cmd->startpoint),
655  (uint32) (FlushPtr >> 32),
656  (uint32) (FlushPtr))));
657  }
658 
659  /* Start streaming from the requested point */
660  sentPtr = cmd->startpoint;
661 
662  /* Initialize shared memory status, too */
663  {
664  WalSnd *walsnd = MyWalSnd;
665 
666  SpinLockAcquire(&walsnd->mutex);
667  walsnd->sentPtr = sentPtr;
668  SpinLockRelease(&walsnd->mutex);
669  }
670 
672 
673  /* Main loop of walsender */
674  replication_active = true;
675 
677 
678  replication_active = false;
680  proc_exit(0);
682 
684  }
685 
686  if (cmd->slotname)
688 
689  /*
690  * Copy is finished now. Send a single-row result set indicating the next
691  * timeline.
692  */
694  {
695  char startpos_str[8 + 1 + 8 + 1];
696  DestReceiver *dest;
697  TupOutputState *tstate;
698  TupleDesc tupdesc;
699  Datum values[2];
700  bool nulls[2];
701 
702  snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
703  (uint32) (sendTimeLineValidUpto >> 32),
705 
707  MemSet(nulls, false, sizeof(nulls));
708 
709  /*
710  * Need a tuple descriptor representing two columns.
711  * int8 may seem like a surprising data type for this, but in theory
712  * int4 would not be wide enough for this, as TimeLineID is unsigned.
713  */
714  tupdesc = CreateTemplateTupleDesc(2, false);
715  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
716  INT8OID, -1, 0);
717  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
718  TEXTOID, -1, 0);
719 
720  /* prepare for projection of tuple */
721  tstate = begin_tup_output_tupdesc(dest, tupdesc);
722 
723  values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
724  values[1] = CStringGetTextDatum(startpos_str);
725 
726  /* send it to dest */
727  do_tup_output(tstate, values, nulls);
728 
729  end_tup_output(tstate);
730  }
731 
732  /* Send CommandComplete message */
733  pq_puttextmessage('C', "START_STREAMING");
734 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr startpoint
Definition: replnodes.h:84
#define pq_flush()
Definition: libpq.h:39
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
#define TEXTOID
Definition: pg_type.h:324
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:75
static void XLogSendPhysical(void)
Definition: walsender.c:2407
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:1250
void proc_exit(int code)
Definition: ipc.c:99
int errcode(int sqlerrcode)
Definition: elog.c:575
#define MemSet(start, val, len)
Definition: c.h:857
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8223
void list_free_deep(List *list)
Definition: list.c:1147
slock_t mutex
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:88
#define SpinLockAcquire(lock)
Definition: spin.h:62
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:1308
static volatile sig_atomic_t walsender_ready_to_stop
Definition: walsender.c:180
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:568
#define ERROR
Definition: elog.h:43
static char * buf
Definition: pg_test_fsync.c:66
static bool streamingDoneSending
Definition: walsender.c:172
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:109
int errdetail(const char *fmt,...)
Definition: elog.c:873
unsigned int uint32
Definition: c.h:268
void ReplicationSlotRelease(void)
Definition: slot.c:375
#define SlotIsLogical(slot)
Definition: slot.h:134
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1791
void SyncRepInitConfig(void)
Definition: syncrep.c:381
#define ereport(elevel, rest)
Definition: elog.h:122
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc)
Definition: execTuples.c:1232
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2007
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
static volatile sig_atomic_t replication_active
Definition: walsender.c:188
uintptr_t Datum
Definition: postgres.h:372
XLogRecPtr sentPtr
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:563
WalSnd * MyWalSnd
Definition: walsender.c:105
static XLogRecPtr sentPtr
Definition: walsender.c:150
TimeLineID ThisTimeLineID
Definition: xlog.c:179
#define INT8OID
Definition: pg_type.h:304
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
static TimeLineID sendTimeLine
Definition: walsender.c:141
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:675
void WalSndSetState(WalSndState state)
Definition: walsender.c:2954
void ReplicationSlotAcquire(const char *name)
Definition: slot.c:326
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:142
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:144
TupleDesc CreateTemplateTupleDesc(int natts, bool hasoid)
Definition: tupdesc.c:41
static bool streamingDoneReceiving
Definition: walsender.c:173
static Datum values[MAXATTR]
Definition: bootstrap.c:163
int errmsg(const char *fmt,...)
Definition: elog.c:797
void pq_sendint(StringInfo buf, int i, int b)
Definition: pqformat.c:236
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:344
TimeLineID timeline
Definition: replnodes.h:83
#define CStringGetTextDatum(s)
Definition: builtins.h:91
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2768
void pq_puttextmessage(char msgtype, const char *str)
Definition: pqformat.c:400
Definition: pg_list.h:45
int16 AttrNumber
Definition: attnum.h:21
static bool sendTimeLineIsHistoric
Definition: walsender.c:143
bool am_cascading_walsender
Definition: walsender.c:109
static bool TransactionIdInRecentPast ( TransactionId  xid,
uint32  epoch 
)
static

Definition at line 1811 of file walsender.c.

References GetNextXidAndEpoch(), and TransactionIdPrecedesOrEquals().

Referenced by ProcessStandbyHSFeedbackMessage().

/*
 * TransactionIdInRecentPast -- check a standby-supplied (xid, epoch) pair
 * against the server's current next XID and epoch.
 *
 * Returns false when the pair cannot be in the recent past:
 *   - xid <= nextXid but epoch differs from the current epoch, or
 *   - xid > nextXid but epoch is not exactly the previous epoch, or
 *   - the epoch check passed but xid does not precede-or-equal nextXid
 *     according to TransactionIdPrecedesOrEquals (treated as wrapped around).
 * Returns true otherwise.
 *
 * NOTE(review): the sole caller listed here is ProcessStandbyHSFeedbackMessage;
 * presumably it uses this to reject hot-standby feedback xmins that are
 * bogus or too old to be useful -- confirm against that function.
 */
1812 {
1813  TransactionId nextXid;
1814  uint32 nextEpoch;
1815 
      /* Fetch the server's current next XID together with its epoch. */
1816  GetNextXidAndEpoch(&nextXid, &nextEpoch);
1817 
      /*
       * A numerically smaller-or-equal xid can only be recent if it belongs
       * to the current epoch; a numerically larger xid can only be recent if
       * it belongs to the immediately preceding epoch (i.e. before the XID
       * counter last wrapped).
       */
1818  if (xid <= nextXid)
1819  {
1820  if (epoch != nextEpoch)
1821  return false;
1822  }
1823  else
1824  {
1825  if (epoch + 1 != nextEpoch)
1826  return false;
1827  }
1828 
      /*
       * Even with a plausible epoch, reject the xid unless it precedes or
       * equals nextXid.  NOTE(review): this relies on the comparison
       * semantics of TransactionIdPrecedesOrEquals (transam.c), presumably
       * the modulo-2^32 XID ordering -- confirm there.
       */
1829  if (!TransactionIdPrecedesOrEquals(xid, nextXid))
1830  return false; /* epoch OK, but it's wrapped around */
1831 
1832  return true;
1833 }
uint32 TransactionId
Definition: c.h:397
void GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch)
Definition: xlog.c:8292
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:319
unsigned int uint32
Definition: c.h:268
static const unsigned __int64 epoch
Definition: gettimeofday.c:34
static void WalSndCheckTimeOut ( TimestampTz  now)
static

Definition at line 1980 of file walsender.c.

References COMMERROR, ereport, errmsg(), last_reply_timestamp, TimestampTzPlusMilliseconds, wal_sender_timeout, and WalSndShutdown().

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

1981 {
1982  TimestampTz timeout;
1983 
1984  /* don't bail out if we're doing something that doesn't require timeouts */
1985  if (last_reply_timestamp <= 0)
1986  return;
1987 
1990 
1991  if (wal_sender_timeout > 0 && now >= timeout)
1992  {
1993  /*
1994  * Since typically expiration of replication timeout means
1995  * communication problem, we don't send the error message to the
1996  * standby.
1997  */
1999  (errmsg("terminating walsender process due to replication timeout")));
2000 
2001  WalSndShutdown();
2002  }
2003 }
int wal_sender_timeout
Definition: walsender.c:115
int64 TimestampTz
Definition: timestamp.h:39
#define COMMERROR
Definition: elog.h:30
#define ereport(elevel, rest)
Definition: elog.h:122
static void WalSndShutdown(void)
Definition: walsender.c:223
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
int errmsg(const char *fmt,...)
Definition: elog.c:797
static TimestampTz last_reply_timestamp
Definition: walsender.c:161
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
static long WalSndComputeSleeptime ( TimestampTz  now)
static

Definition at line 1938 of file walsender.c.

References last_reply_timestamp, sleeptime, TimestampDifference(), TimestampTzPlusMilliseconds, waiting_for_ping_response, and wal_sender_timeout.

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

1939 {
1940  long sleeptime = 10000; /* 10 s */
1941 
1943  {
1944  TimestampTz wakeup_time;
1945  long sec_to_timeout;
1946  int microsec_to_timeout;
1947 
1948  /*
1949  * At the latest stop sleeping once wal_sender_timeout has been
1950  * reached.
1951  */
1954 
1955  /*
1956  * If no ping has been sent yet, wakeup when it's time to do so.
1957  * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
1958  * the timeout passed without a response.
1959  */
1962  wal_sender_timeout / 2);
1963 
1964  /* Compute relative time until wakeup. */
1965  TimestampDifference(now, wakeup_time,
1966  &sec_to_timeout, &microsec_to_timeout);
1967 
1968  sleeptime = sec_to_timeout * 1000 +
1969  microsec_to_timeout / 1000;
1970  }
1971 
1972  return sleeptime;
1973 }
int wal_sender_timeout
Definition: walsender.c:115
int64 TimestampTz
Definition: timestamp.h:39
int sleeptime
Definition: pg_standby.c:40
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: timestamp.c:1624
static bool waiting_for_ping_response
Definition: walsender.c:164
static TimestampTz last_reply_timestamp
Definition: walsender.c:161
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
static void WalSndDone ( WalSndSendDataCallback  send_data)
static

Definition at line 2727 of file walsender.c.

References DestRemote, EndCommand(), WalSnd::flush, pq_flush, pq_is_send_pending, proc_exit(), sentPtr, waiting_for_ping_response, WalSndCaughtUp, WalSndKeepalive(), WalSnd::write, and XLogRecPtrIsInvalid.

Referenced by WalSndLoop().

2728 {
2729  XLogRecPtr replicatedPtr;
2730 
2731  /* ... let's just be real sure we're caught up ... */
2732  send_data();
2733 
2734  /*
2735  * To figure out whether all WAL has successfully been replicated, check
2736  * flush location if valid, write otherwise. Tools like pg_receivewal
2737  * will usually (unless in synchronous mode) return an invalid flush
2738  * location.
2739  */
2740  replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
2741  MyWalSnd->write : MyWalSnd->flush;
2742 
2743  if (WalSndCaughtUp && sentPtr == replicatedPtr &&
2744  !pq_is_send_pending())
2745  {
2746  /* Inform the standby that XLOG streaming is done */
2747  EndCommand("COPY 0", DestRemote);
2748  pq_flush();
2749 
2750  proc_exit(0);
2751  }
2752  if (!waiting_for_ping_response)
2753  {
2754  WalSndKeepalive(true);
2755  waiting_for_ping_response = true;
2756  }
2757 }
#define pq_is_send_pending()
Definition: libpq.h:41
XLogRecPtr write
#define pq_flush()
Definition: libpq.h:39
void proc_exit(int code)
Definition: ipc.c:99
XLogRecPtr flush
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3169
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:157
static bool WalSndCaughtUp
Definition: walsender.c:176
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
WalSnd * MyWalSnd
Definition: walsender.c:105
static XLogRecPtr sentPtr
Definition: walsender.c:150
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool waiting_for_ping_response
Definition: walsender.c:164
void WalSndErrorCleanup ( void  )

Definition at line 284 of file walsender.c.

References close, ConditionVariableCancelSleep(), LWLockReleaseAll(), MyReplicationSlot, NULL, pgstat_report_wait_end(), proc_exit(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), sendFile, walsender_ready_to_stop, WalSndSetState(), and WALSNDSTATE_STARTUP.

Referenced by PostgresMain().

285 {
286  LWLockReleaseAll();
287  ConditionVariableCancelSleep();
288  pgstat_report_wait_end();
289 
290  if (sendFile >= 0)
291  {
292  close(sendFile);
293  sendFile = -1;
294  }
295 
296  if (MyReplicationSlot != NULL)
297  ReplicationSlotRelease();
298 
299  ReplicationSlotCleanup();
300 
301  replication_active = false;
302  if (walsender_ready_to_stop)
303  proc_exit(0);
304 
305  /* Revert back to startup state */
306  WalSndSetState(WALSNDSTATE_STARTUP);
307 }
static int sendFile
Definition: walsender.c:128
void proc_exit(int code)
Definition: ipc.c:99
void ConditionVariableCancelSleep(void)
static volatile sig_atomic_t walsender_ready_to_stop
Definition: walsender.c:180
void ReplicationSlotRelease(void)
Definition: slot.c:375
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1232
static volatile sig_atomic_t replication_active
Definition: walsender.c:188
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define NULL
Definition: c.h:229
void WalSndSetState(WalSndState state)
Definition: walsender.c:2954
void ReplicationSlotCleanup(void)
Definition: slot.c:413
void LWLockReleaseAll(void)
Definition: lwlock.c:1814
#define close(a)
Definition: win32.h:12
static const char* WalSndGetStateString ( WalSndState  state)
static

Definition at line 2973 of file walsender.c.

References WALSNDSTATE_BACKUP, WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, and WALSNDSTATE_STREAMING.

Referenced by pg_stat_get_wal_senders().

2974 {
2975  switch (state)
2976  {
2977  case WALSNDSTATE_STARTUP:
2978  return "startup";
2979  case WALSNDSTATE_BACKUP:
2980  return "backup";
2981  case WALSNDSTATE_CATCHUP:
2982  return "catchup";
2983  case WALSNDSTATE_STREAMING:
2984  return "streaming";
2985  }
2986  return "UNKNOWN";
2987 }
Definition: regguts.h:298
static void WalSndKeepalive ( bool  requestReply)
static

Definition at line 3169 of file walsender.c.

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), StringInfoData::len, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), resetStringInfo(), and sentPtr.

Referenced by ProcessStandbyReplyMessage(), WalSndDone(), WalSndKeepaliveIfNecessary(), and WalSndWaitForWal().

3170 {
3171  elog(DEBUG2, "sending replication keepalive");
3172 
3173  /* construct the message... */
3174  resetStringInfo(&output_message);
3175  pq_sendbyte(&output_message, 'k');
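3176  pq_sendint64(&output_message, sentPtr);
3177  pq_sendint64(&output_message, GetCurrentTimestamp());
3178  pq_sendbyte(&output_message, requestReply ? 1 : 0);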
3179 
3180  /* ... and send it wrapped in CopyData */
3181  pq_putmessage_noblock('d', output_message.data, output_message.len);
3182 }
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
static StringInfoData output_message
Definition: walsender.c:153
#define DEBUG2
Definition: elog.h:24
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:94
static XLogRecPtr sentPtr
Definition: walsender.c:150
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
#define elog
Definition: elog.h:219
void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.c:271
static void WalSndKeepaliveIfNecessary ( TimestampTz  now)
static

Definition at line 3188 of file walsender.c.

References last_reply_timestamp, pq_flush_if_writable, TimestampTzPlusMilliseconds, waiting_for_ping_response, wal_sender_timeout, WalSndKeepalive(), and WalSndShutdown().

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

3189 {
3190  TimestampTz ping_time;
3191 
3192  /*
3193  * Don't send keepalive messages if timeouts are globally disabled or
3194  * we're doing something not partaking in timeouts.
3195  */
3196  if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3197  return;
3198 
3199  if (waiting_for_ping_response)
3200  return;
3201 
3202  /*
3203  * If half of wal_sender_timeout has lapsed without receiving any reply
3204  * from the standby, send a keep-alive message to the standby requesting
3205  * an immediate reply.
3206  */
3207  ping_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
3208  wal_sender_timeout / 2);
3209  if (now >= ping_time)
3210  {
3211  WalSndKeepalive(true);
3212  waiting_for_ping_response = true;
3213 
3214  /* Try to flush pending output to the client */
3215  if (pq_flush_if_writable() != 0)
3216  WalSndShutdown();
3217  }
3218 }
int wal_sender_timeout
Definition: walsender.c:115
int64 TimestampTz
Definition: timestamp.h:39
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3169
#define pq_flush_if_writable()
Definition: libpq.h:40
static void WalSndShutdown(void)
Definition: walsender.c:223
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
static bool waiting_for_ping_response
Definition: walsender.c:164
static TimestampTz last_reply_timestamp
Definition: walsender.c:161
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
static void WalSndKill ( int  code,
Datum  arg 
)
static

Definition at line 2204 of file walsender.c.

References Assert, WalSnd::latch, WalSnd::mutex, MyWalSnd, NULL, WalSnd::pid, SpinLockAcquire, and SpinLockRelease.

Referenced by InitWalSenderSlot().

2205 {
2206  WalSnd *walsnd = MyWalSnd;
2207 
2208  Assert(walsnd != NULL);
2209 
2210  MyWalSnd = NULL;
2211 
2212  SpinLockAcquire(&walsnd->mutex);
2213  /* clear latch while holding the spinlock, so it can safely be read */
2214  walsnd->latch = NULL;
2215  /* Mark WalSnd struct as no longer being in use. */
2216  walsnd->pid = 0;
2217  SpinLockRelease(&walsnd->mutex);
2218 }
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch * latch
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:105
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
static void WalSndLastCycleHandler ( SIGNAL_ARGS  )
static

Definition at line 2841 of file walsender.c.

References MyLatch, MyProcPid, replication_active, SetLatch(), and walsender_ready_to_stop.

Referenced by WalSndSignals().

2842 {
2843  int save_errno = errno;
2844 
2845  /*
2846  * If replication has not yet started, die like with SIGTERM. If
2847  * replication is active, only set a flag and wake up the main loop. It
2848  * will send any outstanding WAL, wait for it to be replicated to the
2849  * standby, and then exit gracefully.
2850  */
2851  if (!replication_active)
2852  kill(MyProcPid, SIGTERM);
2853 
2854  walsender_ready_to_stop = true;
2855  SetLatch(MyLatch);
2856 
2857  errno = save_errno;
2858 }
int MyProcPid
Definition: globals.c:38
static volatile sig_atomic_t walsender_ready_to_stop
Definition: walsender.c:180
static volatile sig_atomic_t replication_active
Definition: walsender.c:188
void SetLatch(volatile Latch *latch)
Definition: latch.c:379
struct Latch * MyLatch
Definition: globals.c:51
static void WalSndLoop ( WalSndSendDataCallback  send_data)
static

Definition at line 2007 of file walsender.c.

References application_name, CHECK_FOR_INTERRUPTS, DEBUG1, ereport, errmsg(), GetCurrentTimestamp(), got_SIGHUP, last_reply_timestamp, MyLatch, MyProcPort, now(), NULL, PGC_SIGHUP, pgstat_report_activity(), PostmasterIsAlive(), pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), sleeptime, Port::sock, WalSnd::state, STATE_RUNNING, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_MAIN, waiting_for_ping_response, WaitLatchOrSocket(), walsender_ready_to_stop, WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndDone(), WalSndKeepaliveIfNecessary(), WalSndSetState(), WalSndShutdown(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STREAMING, WL_LATCH_SET, WL_POSTMASTER_DEATH, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, and WL_TIMEOUT.

Referenced by StartLogicalReplication(), and StartReplication().

2008 {
2009  /*
2010  * Initialize the last reply timestamp. That enables timeout processing
2011  * from hereon.
2012  */
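2013  last_reply_timestamp = GetCurrentTimestamp();
2014  waiting_for_ping_response = false;
2015 
2016  /* Report to pgstat that this process is running */
2017  pgstat_report_activity(STATE_RUNNING, NULL);
2018 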
2019  /*
2020  * Loop until we reach the end of this timeline or the client requests to
2021  * stop streaming.
2022  */
2023  for (;;)
2024  {
2025  TimestampTz now;
2026 
2027  /*
2028  * Emergency bailout if postmaster has died. This is to avoid the
2029  * necessity for manual cleanup of all postmaster children.
2030  */
2031  if (!PostmasterIsAlive())
2032  exit(1);
2033 
2034  /* Clear any already-pending wakeups */
2035  ResetLatch(MyLatch);
2036 
2037  CHECK_FOR_INTERRUPTS();
2038 
2039  /* Process any requests or signals received recently */
2040  if (got_SIGHUP)
2041  {
2042  got_SIGHUP = false;
2043  ProcessConfigFile(PGC_SIGHUP);
2044  SyncRepInitConfig();
2045  }
2046 
2047  /* Check for input from the client */
2048  ProcessRepliesIfAny();
2049 
2050  /*
2051  * If we have received CopyDone from the client, sent CopyDone
2052  * ourselves, and the output buffer is empty, it's time to exit
2053  * streaming.
2054  */
2055  if (!pq_is_send_pending() && streamingDoneSending && streamingDoneReceiving)
2056  break;
2057 
2058  /*
2059  * If we don't have any pending data in the output buffer, try to send
2060  * some more. If there is some, we don't bother to call send_data
2061  * again until we've flushed it ... but we'd better assume we are not
2062  * caught up.
2063  */
2064  if (!pq_is_send_pending())
2065  send_data();
2066  else
2067  WalSndCaughtUp = false;
2068 
2069  /* Try to flush pending output to the client */
2070  if (pq_flush_if_writable() != 0)
2071  WalSndShutdown();
2072 
2073  /* If nothing remains to be sent right now ... */
2074  if (WalSndCaughtUp && !pq_is_send_pending())
2075  {
2076  /*
2077  * If we're in catchup state, move to streaming. This is an
2078  * important state change for users to know about, since before
2079  * this point data loss might occur if the primary dies and we
2080  * need to failover to the standby. The state change is also
2081  * important for synchronous replication, since commits that
2082  * started to wait at that point might wait for some time.
2083  */
2084  if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
2085  {
2086  ereport(DEBUG1,
2087  (errmsg("standby \"%s\" has now caught up with primary",
2088  application_name)));
2089  WalSndSetState(WALSNDSTATE_STREAMING);
2090  }
2091 
2092  /*
2093  * When SIGUSR2 arrives, we send any outstanding logs up to the
2094  * shutdown checkpoint record (i.e., the latest record), wait for
2095  * them to be replicated to the standby, and exit. This may be a
2096  * normal termination at shutdown, or a promotion, the walsender
2097  * is not sure which.
2098  */
2099  if (walsender_ready_to_stop)
2100  WalSndDone(send_data);
2101  }
2102 
2103  now = GetCurrentTimestamp();
2104 
2105  /* Check for replication timeout. */
2106  WalSndCheckTimeOut(now);
2107 
2108  /* Send keepalive if the time has come */
2109  WalSndKeepaliveIfNecessary(now);
2110 
2111  /*
2112  * We don't block if not caught up, unless there is unsent data
2113  * pending in which case we'd better block until the socket is
2114  * write-ready. This test is only needed for the case where the
2115  * send_data callback handled a subset of the available data but then
2116  * pq_flush_if_writable flushed it all --- we should immediately try
2117  * to send more.
2118  */
2119  if ((WalSndCaughtUp && !streamingDoneSending) || pq_is_send_pending())
2120  {
2121  long sleeptime;
2122  int wakeEvents;
2123 
2124  wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT |
2125  WL_SOCKET_READABLE;
2126 
2127  sleeptime = WalSndComputeSleeptime(now);
2128 
2129  if (pq_is_send_pending())
2130  wakeEvents |= WL_SOCKET_WRITEABLE;
2131 
2132  /* Sleep until something happens or we time out */
2133  WaitLatchOrSocket(MyLatch, wakeEvents,
2134  MyProcPort->sock, sleeptime,
2135  WAIT_EVENT_WAL_SENDER_MAIN);
2136  }
2137  }
2138  return;
2139 }
#define pq_is_send_pending()
Definition: libpq.h:41
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
#define DEBUG1
Definition: elog.h:25
struct Port * MyProcPort
Definition: globals.c:40
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:2958
int64 TimestampTz
Definition: timestamp.h:39
static void WalSndDone(WalSndSendDataCallback send_data)
Definition: walsender.c:2727
#define WL_SOCKET_READABLE
Definition: latch.h:125
int sleeptime
Definition: pg_standby.c:40
void ResetLatch(volatile Latch *latch)
Definition: latch.c:461
pgsocket sock
Definition: libpq-be.h:118
static volatile sig_atomic_t got_SIGHUP
Definition: walsender.c:179
static volatile sig_atomic_t walsender_ready_to_stop
Definition: walsender.c:180
WalSndState state
static bool streamingDoneSending
Definition: walsender.c:172
bool PostmasterIsAlive(void)
Definition: pmsignal.c:272
#define pq_flush_if_writable()
Definition: libpq.h:40
void SyncRepInitConfig(void)
Definition: syncrep.c:381
#define ereport(elevel, rest)
Definition: elog.h:122
static bool WalSndCaughtUp
Definition: walsender.c:176
Definition: guc.h:72
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
static void WalSndShutdown(void)
Definition: walsender.c:223
WalSnd * MyWalSnd
Definition: walsender.c:105
static void WalSndKeepaliveIfNecessary(TimestampTz now)
Definition: walsender.c:3188
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:1938
#define NULL
Definition: c.h:229
static void WalSndCheckTimeOut(TimestampTz now)
Definition: walsender.c:1980
void WalSndSetState(WalSndState state)
Definition: walsender.c:2954
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:320
static bool streamingDoneReceiving
Definition: walsender.c:173
char * application_name
Definition: guc.c:473
int errmsg(const char *fmt,...)
Definition: elog.c:797
struct Latch * MyLatch
Definition: globals.c:51
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
static bool waiting_for_ping_response
Definition: walsender.c:164
#define WL_LATCH_SET
Definition: latch.h:124
static TimestampTz last_reply_timestamp
Definition: walsender.c:161
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1505
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
static void WalSndPrepareWrite ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1118 of file walsender.c.

References InvalidXLogRecPtr, LogicalDecodingContext::out, pq_sendbyte(), pq_sendint64(), and resetStringInfo().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

1119 {
1120  /* can't have sync rep confused by sending the same LSN several times */
1121  if (!last_write)
1122  lsn = InvalidXLogRecPtr;
1123 
1124  resetStringInfo(ctx->out);
1125 
1126  pq_sendbyte(ctx->out, 'w');
1127  pq_sendint64(ctx->out, lsn); /* dataStart */
1128  pq_sendint64(ctx->out, lsn); /* walEnd */
1129 
1130  /*
1131  * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1132  * reserve space here.
1133  */
1134  pq_sendint64(ctx->out, 0); /* sendtime */
1135 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:94
StringInfo out
Definition: logical.h:59
void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.c:271
void WalSndRqstFileReload ( void  )

Definition at line 2798 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

2799 {
2800  int i;
2801 
2802  for (i = 0; i < max_wal_senders; i++)
2803  {
2804  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2805 
2806  if (walsnd->pid == 0)
2807  continue;
2808 
2809  SpinLockAcquire(&walsnd->mutex);
2810  walsnd->needreload = true;
2811  SpinLockRelease(&walsnd->mutex);
2812  }
2813 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:102
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
int max_wal_senders
Definition: walsender.c:114
#define SpinLockRelease(lock)
Definition: spin.h:64
bool needreload
int i
void WalSndSetState ( WalSndState  state)

Definition at line 2954 of file walsender.c.

References am_walsender, Assert, WalSnd::mutex, MyWalSnd, SpinLockAcquire, SpinLockRelease, and WalSnd::state.

Referenced by SendBaseBackup(), StartLogicalReplication(), StartReplication(), WalSndErrorCleanup(), and WalSndLoop().

2955 {
2956  WalSnd *walsnd = MyWalSnd;
2957 
2958  Assert(am_walsender);
2959 
2960  if (walsnd->state == state)
2961  return;
2962 
2963  SpinLockAcquire(&walsnd->mutex);
2964  walsnd->state = state;
2965  SpinLockRelease(&walsnd->mutex);
2966 }
slock_t mutex
bool am_walsender
Definition: walsender.c:108
#define SpinLockAcquire(lock)
Definition: spin.h:62
WalSndState state
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:105
#define Assert(condition)
Definition: c.h:675
Definition: regguts.h:298
void WalSndShmemInit ( void  )

Definition at line 2898 of file walsender.c.

References i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SHMQueueInit(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

2899 {
2900  bool found;
2901  int i;
2902 
2903  WalSndCtl = (WalSndCtlData *)
2904  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
2905 
2906  if (!found)
2907  {
2908  /* First time through, so initialize */
2909  MemSet(WalSndCtl, 0, WalSndShmemSize());
2910 
2911  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
2912  SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
2913 
2914  for (i = 0; i < max_wal_senders; i++)
2915  {
2916  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2917 
2918  SpinLockInit(&walsnd->mutex);
2919  }
2920  }
2921 }
Size WalSndShmemSize(void)
Definition: walsender.c:2886
#define SpinLockInit(lock)
Definition: spin.h:60
WalSndCtlData * WalSndCtl
Definition: walsender.c:102
#define MemSet(start, val, len)
Definition: c.h:857
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
int max_wal_senders
Definition: walsender.c:114
void SHMQueueInit(SHM_QUEUE *queue)
Definition: shmqueue.c:36
int i
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28
Size WalSndShmemSize ( void  )

Definition at line 2886 of file walsender.c.

References add_size(), max_wal_senders, mul_size(), and offsetof.

Referenced by CreateSharedMemoryAndSemaphores(), and WalSndShmemInit().

2887 {
2888  Size size = 0;
2889 
2890  size = offsetof(WalSndCtlData, walsnds);
2891  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
2892 
2893  return size;
2894 }
int max_wal_senders
Definition: walsender.c:114
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
size_t Size
Definition: c.h:356
#define offsetof(type, field)
Definition: c.h:555
static void WalSndShutdown ( void  )
static

Definition at line 223 of file walsender.c.

References am_cascading_walsender, CurrentResourceOwner, InitWalSenderSlot(), LagTracker, MarkPostmasterChildWalSender(), NULL, PMSIGNAL_ADVANCE_STATE_MACHINE, RecoveryInProgress(), ResourceOwnerCreate(), and SendPostmasterSignal().

Referenced by WalSndCheckTimeOut(), WalSndKeepaliveIfNecessary(), WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

253 {
255 
256  /* Create a per-walsender data structure in shared memory */
258 
259  /* Set up resource owner */
260  CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner");
261 
262  /*
263  * Let postmaster know that we're a WAL sender. Once we've declared us as
264  * a WAL sender process, postmaster will let us outlive the bgwriter and
265  * kill us last in the shutdown sequence, so we get a chance to stream all
266  * remaining WAL at shutdown, including the shutdown checkpoint. Note that
267  * there's no going back, and we mustn't write any WAL records after this.
268  */
271 
272  /* Initialize empty timestamp buffer for lag tracking. */
273  memset(&LagTracker, 0, sizeof(LagTracker));
274 }
static void InitWalSenderSlot(void)
Definition: walsender.c:2143
ResourceOwner CurrentResourceOwner
Definition: resowner.c:138
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:239
bool RecoveryInProgress(void)
Definition: xlog.c:7874
#define NULL
Definition: c.h:229
static struct @27 LagTracker
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:113
bool am_cascading_walsender
Definition: walsender.c:109
ResourceOwner ResourceOwnerCreate(ResourceOwner parent, const char *name)
Definition: resowner.c:416
static void WalSndSigHupHandler ( SIGNAL_ARGS  )
static

Definition at line 2817 of file walsender.c.

References got_SIGHUP, MyLatch, and SetLatch().

Referenced by WalSndSignals().

2818 {
2819  int save_errno = errno;
2820 
2821  got_SIGHUP = true;
2822 
2823  SetLatch(MyLatch);
2824 
2825  errno = save_errno;
2826 }
static volatile sig_atomic_t got_SIGHUP
Definition: walsender.c:179
void SetLatch(volatile Latch *latch)
Definition: latch.c:379
struct Latch * MyLatch
Definition: globals.c:51
void WalSndSignals ( void  )

Definition at line 2862 of file walsender.c.

References die(), InitializeTimeouts(), pqsignal(), quickdie(), SIG_DFL, SIG_IGN, SIGCHLD, SIGCONT, SIGHUP, SIGPIPE, SIGQUIT, SIGTTIN, SIGTTOU, SIGUSR1, SIGUSR2, SIGWINCH, WalSndLastCycleHandler(), WalSndSigHupHandler(), and WalSndXLogSendHandler().

Referenced by PostgresMain().

2863 {
2864  /* Set up signal handlers */
2865  pqsignal(SIGHUP, WalSndSigHupHandler); /* set flag to read config
2866  * file */
2867  pqsignal(SIGINT, SIG_IGN); /* not used */
2868  pqsignal(SIGTERM, die); /* request shutdown */
2869  pqsignal(SIGQUIT, quickdie); /* hard crash time */
2870  InitializeTimeouts(); /* establishes SIGALRM handler */
2871  pqsignal(SIGPIPE, SIG_IGN);
2872  pqsignal(SIGUSR1, WalSndXLogSendHandler); /* request WAL sending */
2873  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
2874  * shutdown */
2875 
2876  /* Reset some signals that are accepted by postmaster but not here */
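2877  pqsignal(SIGCHLD, SIG_DFL);
2878  pqsignal(SIGTTIN, SIG_DFL);
2879  pqsignal(SIGTTOU, SIG_DFL);
2880  pqsignal(SIGCONT, SIG_DFL);
2881  pqsignal(SIGWINCH, SIG_DFL);
2882 }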
static void WalSndXLogSendHandler(SIGNAL_ARGS)
Definition: walsender.c:2830
void InitializeTimeouts(void)
Definition: timeout.c:340
#define SIGUSR1
Definition: win32.h:202
#define SIGCONT
Definition: win32.h:197
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:2841
#define SIGWINCH
Definition: win32.h:201
#define SIGTTIN
Definition: win32.h:199
#define SIGQUIT
Definition: win32.h:189
static void WalSndSigHupHandler(SIGNAL_ARGS)
Definition: walsender.c:2817
#define SIG_IGN
Definition: win32.h:185
#define SIGPIPE
Definition: win32.h:193
#define SIGHUP
Definition: win32.h:188
#define SIG_DFL
Definition: win32.h:183
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:168
#define SIGTTOU
Definition: win32.h:200
void die(SIGNAL_ARGS)
Definition: postgres.c:2623
#define SIGCHLD
Definition: win32.h:198
void quickdie(SIGNAL_ARGS)
Definition: postgres.c:2562
#define SIGUSR2
Definition: win32.h:203
static XLogRecPtr WalSndWaitForWal ( XLogRecPtr  loc)
static

Definition at line 1233 of file walsender.c.

References CHECK_FOR_INTERRUPTS, WalSnd::flush, GetCurrentTimestamp(), GetFlushRecPtr(), GetXLogReplayRecPtr(), got_SIGHUP, InvalidXLogRecPtr, MyLatch, MyProcPort, now(), NULL, PGC_SIGHUP, PostmasterIsAlive(), pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), RecoveryInProgress(), ResetLatch(), sentPtr, SetLatch(), sleeptime, Port::sock, SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_WAIT_WAL, waiting_for_ping_response, WaitLatchOrSocket(), walsender_ready_to_stop, WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepalive(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WL_LATCH_SET, WL_POSTMASTER_DEATH, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, WL_TIMEOUT, and WalSnd::write.

Referenced by logical_read_xlog_page().

1234 {
1235  int wakeEvents;
1236  static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1237 
1238 
1239  /*
1240  * Fast path to avoid acquiring the spinlock in case we already know we
1241  * have enough WAL available. This is particularly interesting if we're
1242  * far behind.
1243  */
1244  if (RecentFlushPtr != InvalidXLogRecPtr &&
1245  loc <= RecentFlushPtr)
1246  return RecentFlushPtr;
1247 
1248  /* Get a more recent flush pointer. */
1249  if (!RecoveryInProgress())
1250  RecentFlushPtr = GetFlushRecPtr();
1251  else
1252  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1253 
1254  for (;;)
1255  {
1256  long sleeptime;
1257  TimestampTz now;
1258 
1259  /*
1260  * Emergency bailout if postmaster has died. This is to avoid the
1261  * necessity for manual cleanup of all postmaster children.
1262  */
1263  if (!PostmasterIsAlive())
1264  exit(1);
1265 
1266  /* Clear any already-pending wakeups */
1267  ResetLatch(MyLatch);
1268 
1269  CHECK_FOR_INTERRUPTS();
1270 
1271  /* Process any requests or signals received recently */
1272  if (got_SIGHUP)
1273  {
1274  got_SIGHUP = false;
1275  ProcessConfigFile(PGC_SIGHUP);
1276  SyncRepInitConfig();
1277  }
1278 
1279  /* Check for input from the client */
1280  ProcessRepliesIfAny();
1281 
1282  /* Update our idea of the currently flushed position. */
1283  if (!RecoveryInProgress())
1284  RecentFlushPtr = GetFlushRecPtr();
1285  else
1286  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1287 
1288  /*
1289  * If postmaster asked us to stop, don't wait here anymore. This will
1290  * cause the xlogreader to return without reading a full record, which
1291  * is the fastest way to reach the mainloop which then can quit.
1292  *
1293  * It's important to do this check after the recomputation of
1294  * RecentFlushPtr, so we can send all remaining data before shutting
1295  * down.
1296  */
1297  if (walsender_ready_to_stop)
1298  break;
1299 
1300  /*
1301  * We only send regular messages to the client for full decoded
1302  * transactions, but a synchronous replication and walsender shutdown
1303  * possibly are waiting for a later location. So we send pings
1304  * containing the flush location every now and then.
1305  */
1306  if (MyWalSnd->flush < sentPtr &&
1307  MyWalSnd->write < sentPtr &&
1308  !waiting_for_ping_response)
1309  {
1310  WalSndKeepalive(false);
1311  waiting_for_ping_response = true;
1312  }
1313 
1314  /* check whether we're done */
1315  if (loc <= RecentFlushPtr)
1316  break;
1317 
1318  /* Waiting for new WAL. Since we need to wait, we're now caught up. */
1319  WalSndCaughtUp = true;
1320 
1321  /*
1322  * Try to flush pending output to the client. Also wait for the socket
1323  * becoming writable, if there's still pending output after an attempt
1324  * to flush. Otherwise we might just sit on output data while waiting
1325  * for new WAL being generated.
1326  */
1327  if (pq_flush_if_writable() != 0)
1328  WalSndShutdown();
1329 
1330  now = GetCurrentTimestamp();
1331 
1332  /* die if timeout was reached */
1333  WalSndCheckTimeOut(now);
1334 
1335  /* Send keepalive if the time has come */
1336  WalSndKeepaliveIfNecessary(now);
1337 
1338  sleeptime = WalSndComputeSleeptime(now);
1339 
1340  wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
1341  WL_SOCKET_READABLE | WL_TIMEOUT;
1342 
1343  if (pq_is_send_pending())
1344  wakeEvents |= WL_SOCKET_WRITEABLE;
1345 
1346  /* Sleep until something happens or we time out */
1347  WaitLatchOrSocket(MyLatch, wakeEvents,
1348  MyProcPort->sock, sleeptime,
1349  WAIT_EVENT_WAL_SENDER_WAIT_WAL);
1350  }
1351 
1352  /* reactivate latch so WalSndLoop knows to continue */
1353  SetLatch(MyLatch);
1354  return RecentFlushPtr;
1355 }
#define pq_is_send_pending()
Definition: libpq.h:41
XLogRecPtr write
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
struct Port * MyProcPort
Definition: globals.c:40
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
int64 TimestampTz
Definition: timestamp.h:39
#define WL_SOCKET_READABLE
Definition: latch.h:125
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8223
int sleeptime
Definition: pg_standby.c:40
void ResetLatch(volatile Latch *latch)
Definition: latch.c:461
bool RecoveryInProgress(void)
Definition: xlog.c:7874
pgsocket sock
Definition: libpq-be.h:118
XLogRecPtr flush
static volatile sig_atomic_t got_SIGHUP
Definition: walsender.c:179
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3169
static volatile sig_atomic_t walsender_ready_to_stop
Definition: walsender.c:180
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11077
bool PostmasterIsAlive(void)
Definition: pmsignal.c:272
#define pq_flush_if_writable()
Definition: libpq.h:40
void SyncRepInitConfig(void)
Definition: syncrep.c:381
static bool WalSndCaughtUp
Definition: walsender.c:176
Definition: guc.h:72
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
static void WalSndShutdown(void)
Definition: walsender.c:223
WalSnd * MyWalSnd
Definition: walsender.c:105
static XLogRecPtr sentPtr
Definition: walsender.c:150
static void WalSndKeepaliveIfNecessary(TimestampTz now)
Definition: walsender.c:3188
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:1938
void SetLatch(volatile Latch *latch)
Definition: latch.c:379
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static void WalSndCheckTimeOut(TimestampTz now)
Definition: walsender.c:1980
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:320
struct Latch * MyLatch
Definition: globals.c:51
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
static bool waiting_for_ping_response
Definition: walsender.c:164
#define WL_LATCH_SET
Definition: latch.h:124
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1505
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
void WalSndWakeup ( void  )

Definition at line 2930 of file walsender.c.

References i, WalSnd::latch, max_wal_senders, WalSnd::mutex, NULL, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive(), StartupXLOG(), and XLogWalRcvFlush().

2931 {
2932  int i;
2933 
2934  for (i = 0; i < max_wal_senders; i++)
2935  {
2936  Latch *latch;
2937  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2938 
2939  /*
2940  * Get latch pointer with spinlock held, for the unlikely case that
2941  * pointer reads aren't atomic (as they're 8 bytes).
2942  */
2943  SpinLockAcquire(&walsnd->mutex);
2944  latch = walsnd->latch;
2945  SpinLockRelease(&walsnd->mutex);
2946 
2947  if (latch != NULL)
2948  SetLatch(latch);
2949  }
2950 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:102
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch * latch
Definition: latch.h:110
int max_wal_senders
Definition: walsender.c:114
#define SpinLockRelease(lock)
Definition: spin.h:64
void SetLatch(volatile Latch *latch)
Definition: latch.c:379
#define NULL
Definition: c.h:229
int i
static void WalSndWriteData ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1145 of file walsender.c.

References CHECK_FOR_INTERRUPTS, StringInfoData::data, GetCurrentTimestamp(), got_SIGHUP, StringInfoData::len, MyLatch, MyProcPort, now(), LogicalDecodingContext::out, PGC_SIGHUP, PostmasterIsAlive(), pq_flush_if_writable, pq_is_send_pending, pq_putmessage_noblock, pq_sendint64(), ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), resetStringInfo(), SetLatch(), sleeptime, Port::sock, SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_WRITE_DATA, WaitLatchOrSocket(), WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WL_LATCH_SET, WL_POSTMASTER_DEATH, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, and WL_TIMEOUT.

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

1147 {
1148  /* output previously gathered data in a CopyData packet */
1149  pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1150 
1151  /*
1152  * Fill the send timestamp last, so that it is taken as late as possible.
1153  * This is somewhat ugly, but the protocol is set as it's already used for
1154  * several releases by streaming physical replication.
1155  */
1158  memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1159  tmpbuf.data, sizeof(int64));
1160 
1161  /* fast path */
1162  /* Try to flush pending output to the client */
1163  if (pq_flush_if_writable() != 0)
1164  WalSndShutdown();
1165 
1166  if (!pq_is_send_pending())
1167  return;
1168 
1169  for (;;)
1170  {
1171  int wakeEvents;
1172  long sleeptime;
1173  TimestampTz now;
1174 
1175  /*
1176  * Emergency bailout if postmaster has died. This is to avoid the
1177  * necessity for manual cleanup of all postmaster children.
1178  */
1179  if (!PostmasterIsAlive())
1180  exit(1);
1181 
1182  /* Clear any already-pending wakeups */
1184 
1186 
1187  /* Process any requests or signals received recently */
1188  if (got_SIGHUP)
1189  {
1190  got_SIGHUP = false;
1193  }
1194 
1195  /* Check for input from the client */
1197 
1198  /* Try to flush pending output to the client */
1199  if (pq_flush_if_writable() != 0)
1200  WalSndShutdown();
1201 
1202  /* If we finished clearing the buffered data, we're done here. */
1203  if (!pq_is_send_pending())
1204  break;
1205 
1206  now = GetCurrentTimestamp();
1207 
1208  /* die if timeout was reached */
1209  WalSndCheckTimeOut(now);
1210 
1211  /* Send keepalive if the time has come */
1213 
1214  sleeptime = WalSndComputeSleeptime(now);
1215 
1216  wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
1218 
1219  /* Sleep until something happens or we time out */
1220  WaitLatchOrSocket(MyLatch, wakeEvents,
1221  MyProcPort->sock, sleeptime,
1223  }
1224 
1225  /* reactivate latch so WalSndLoop knows to continue */
1226  SetLatch(MyLatch);
1227 }
#define pq_is_send_pending()
Definition: libpq.h:41
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
struct Port * MyProcPort
Definition: globals.c:40
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
int64 TimestampTz
Definition: timestamp.h:39
#define WL_SOCKET_READABLE
Definition: latch.h:125
int sleeptime
Definition: pg_standby.c:40
void ResetLatch(volatile Latch *latch)
Definition: latch.c:461
pgsocket sock
Definition: libpq-be.h:118
static volatile sig_atomic_t got_SIGHUP
Definition: walsender.c:179
bool PostmasterIsAlive(void)
Definition: pmsignal.c:272
#define pq_flush_if_writable()
Definition: libpq.h:40
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:94
void SyncRepInitConfig(void)
Definition: syncrep.c:381
Definition: guc.h:72
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
static void WalSndShutdown(void)
Definition: walsender.c:223
static void WalSndKeepaliveIfNecessary(TimestampTz now)
Definition: walsender.c:3188
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:1938
void SetLatch(volatile Latch *latch)
Definition: latch.c:379
static void WalSndCheckTimeOut(TimestampTz now)
Definition: walsender.c:1980
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:320
static StringInfoData tmpbuf
Definition: walsender.c:155
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
StringInfo out
Definition: logical.h:59
struct Latch * MyLatch
Definition: globals.c:51
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.c:271
#define WL_LATCH_SET
Definition: latch.h:124
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1505
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
static void WalSndXLogSendHandler ( SIGNAL_ARGS  )
static

Definition at line 2830 of file walsender.c.

References latch_sigusr1_handler().

Referenced by WalSndSignals().

2831 {
2832  int save_errno = errno;
2833 
2835 
2836  errno = save_errno;
2837 }
void latch_sigusr1_handler(void)
Definition: latch.c:1572
static void XLogRead ( char *  buf,
XLogRecPtr  startptr,
Size  count 
)
static

Definition at line 2232 of file walsender.c.

References am_cascading_walsender, BasicOpenFile(), buf, CheckXLogRemoved(), close, curFileTimeLine, ereport, errcode_for_file_access(), errmsg(), ERROR, MAXPGPATH, WalSnd::mutex, MyWalSnd, WalSnd::needreload, PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), read, sendFile, sendOff, sendSegNo, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, SpinLockAcquire, SpinLockRelease, ThisTimeLineID, WAIT_EVENT_WAL_READ, XLByteInSeg, XLByteToSeg, XLogFileNameP(), XLogFilePath, and XLogSegSize.

Referenced by logical_read_xlog_page(), and XLogSendPhysical().

2233 {
2234  char *p;
2235  XLogRecPtr recptr;
2236  Size nbytes;
2237  XLogSegNo segno;
2238 
2239 retry:
2240  p = buf;
2241  recptr = startptr;
2242  nbytes = count;
2243 
2244  while (nbytes > 0)
2245  {
2246  uint32 startoff;
2247  int segbytes;
2248  int readbytes;
2249 
2250  startoff = recptr % XLogSegSize;
2251 
2252  if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
2253  {
2254  char path[MAXPGPATH];
2255 
2256  /* Switch to another logfile segment */
2257  if (sendFile >= 0)
2258  close(sendFile);
2259 
2260  XLByteToSeg(recptr, sendSegNo);
2261 
2262  /*-------
2263  * When reading from a historic timeline, and there is a timeline
2264  * switch within this segment, read from the WAL segment belonging
2265  * to the new timeline.
2266  *
2267  * For example, imagine that this server is currently on timeline
2268  * 5, and we're streaming timeline 4. The switch from timeline 4
2269  * to 5 happened at 0/13002088. In pg_wal, we have these files:
2270  *
2271  * ...
2272  * 000000040000000000000012
2273  * 000000040000000000000013
2274  * 000000050000000000000013
2275  * 000000050000000000000014
2276  * ...
2277  *
2278  * In this situation, when requested to send the WAL from
2279  * segment 0x13, on timeline 4, we read the WAL from file
2280  * 000000050000000000000013. Archive recovery prefers files from
2281  * newer timelines, so if the segment was restored from the
2282  * archive on this server, the file belonging to the old timeline,
2283  * 000000040000000000000013, might not exist. Their contents are
2284  * equal up to the switchpoint, because at a timeline switch, the
2285  * used portion of the old segment is copied to the new file.
2286  *-------
2287  */
2290  {
2291  XLogSegNo endSegNo;
2292 
2294  if (sendSegNo == endSegNo)
2296  }
2297 
2299 
2300  sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
2301  if (sendFile < 0)
2302  {
2303  /*
2304  * If the file is not found, assume it's because the standby
2305  * asked for a too old WAL segment that has already been
2306  * removed or recycled.
2307  */
2308  if (errno == ENOENT)
2309  ereport(ERROR,
2311  errmsg("requested WAL segment %s has already been removed",
2313  else
2314  ereport(ERROR,
2316  errmsg("could not open file \"%s\": %m",
2317  path)));
2318  }
2319  sendOff = 0;
2320  }
2321 
2322  /* Need to seek in the file? */
2323  if (sendOff != startoff)
2324  {
2325  if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
2326  ereport(ERROR,
2328  errmsg("could not seek in log segment %s to offset %u: %m",
2330  startoff)));
2331  sendOff = startoff;
2332  }
2333 
2334  /* How many bytes are within this segment? */
2335  if (nbytes > (XLogSegSize - startoff))
2336  segbytes = XLogSegSize - startoff;
2337  else
2338  segbytes = nbytes;
2339 
2341  readbytes = read(sendFile, p, segbytes);
2343  if (readbytes <= 0)
2344  {
2345  ereport(ERROR,
2347  errmsg("could not read from log segment %s, offset %u, length %lu: %m",
2349  sendOff, (unsigned long) segbytes)));
2350  }
2351 
2352  /* Update state for read */
2353  recptr += readbytes;
2354 
2355  sendOff += readbytes;
2356  nbytes -= readbytes;
2357  p += readbytes;
2358  }
2359 
2360  /*
2361  * After reading into the buffer, check that what we read was valid. We do
2362  * this after reading, because even though the segment was present when we
2363  * opened it, it might get recycled or removed while we read it. The
2364  * read() succeeds in that case, but the data we tried to read might
2365  * already have been overwritten with new WAL records.
2366  */
2367  XLByteToSeg(startptr, segno);
2369 
2370  /*
2371  * During recovery, the currently-open WAL file might be replaced with the
2372  * file of the same name retrieved from archive. So we always need to
2373  * check what we read was valid after reading into the buffer. If it's
2374  * invalid, we try to open and read the file again.
2375  */
2377  {
2378  WalSnd *walsnd = MyWalSnd;
2379  bool reload;
2380 
2381  SpinLockAcquire(&walsnd->mutex);
2382  reload = walsnd->needreload;
2383  walsnd->needreload = false;
2384  SpinLockRelease(&walsnd->mutex);
2385 
2386  if (reload && sendFile >= 0)
2387  {
2388  close(sendFile);
2389  sendFile = -1;
2390 
2391  goto retry;
2392  }
2393  }
2394 }
#define XLogSegSize
Definition: xlog_internal.h:92
static int sendFile
Definition: walsender.c:128
#define PG_BINARY
Definition: c.h:1038
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define ERROR
Definition: elog.h:43
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:3766
#define MAXPGPATH
static TimeLineID curFileTimeLine
Definition: walsender.c:133
#define XLogFilePath(path, tli, logSegNo)
char * XLogFileNameP(TimeLineID tli, XLogSegNo segno)
Definition: xlog.c:10120
static char * buf
Definition: pg_test_fsync.c:66
uint64 XLogSegNo
Definition: xlogdefs.h:34
int errcode_for_file_access(void)
Definition: elog.c:598
unsigned int uint32
Definition: c.h:268
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1232
#define ereport(elevel, rest)
Definition: elog.h:122
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:105
TimeLineID ThisTimeLineID
Definition: xlog.c:179
#define XLByteToSeg(xlrp, logSegNo)
static TimeLineID sendTimeLine
Definition: walsender.c:141
uint64 XLogRecPtr
Definition: xlogdefs.h:21
bool needreload
size_t Size
Definition: c.h:356
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1208
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:142
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:144
static XLogSegNo sendSegNo
Definition: walsender.c:129
int errmsg(const char *fmt,...)
Definition: elog.c:797
static uint32 sendOff
Definition: walsender.c:130
#define close(a)
Definition: win32.h:12
#define XLByteInSeg(xlrp, logSegNo)
#define read(a, b, c)
Definition: win32.h:13
static bool sendTimeLineIsHistoric
Definition: walsender.c:143
bool am_cascading_walsender
Definition: walsender.c:109
int BasicOpenFile(FileName fileName, int fileFlags, int fileMode)
Definition: fd.c:936
static void XLogSendLogical ( void  )
static

Definition at line 2666 of file walsender.c.

References elog, XLogReaderState::EndRecPtr, ERROR, GetFlushRecPtr(), InvalidXLogRecPtr, logical_startptr, LogicalDecodingProcessRecord(), WalSnd::mutex, MyWalSnd, NULL, LogicalDecodingContext::reader, WalSnd::sentPtr, sentPtr, SpinLockAcquire, SpinLockRelease, WalSndCaughtUp, and XLogReadRecord().

Referenced by StartLogicalReplication().

2667 {
2668  XLogRecord *record;
2669  char *errm;
2670 
2671  /*
2672  * Don't know whether we've caught up yet. We'll set it to true in
2673  * WalSndWaitForWal, if we're actually waiting. We also set to true if
2674  * XLogReadRecord() had to stop reading but WalSndWaitForWal didn't wait -
2675  * i.e. when we're shutting down.
2676  */
2677  WalSndCaughtUp = false;
2678 
2681 
2682  /* xlog record was invalid */
2683  if (errm != NULL)
2684  elog(ERROR, "%s", errm);
2685 
2686  if (record != NULL)
2687  {
2688  /*
2689  * Note the lack of any call to LagTrackerWrite() which is the responsibility
2690  * of the logical decoding plugin. Response messages are handled normally,
2691  * so this responsibility does not extend to needing to call LagTrackerRead().
2692  */
2694 
2696  }
2697  else
2698  {
2699  /*
2700  * If the record we just wanted read is at or beyond the flushed
2701  * point, then we're caught up.
2702  */
2704  WalSndCaughtUp = true;
2705  }
2706 
2707  /* Update shared memory status */
2708  {
2709  WalSnd *walsnd = MyWalSnd;
2710 
2711  SpinLockAcquire(&walsnd->mutex);
2712  walsnd->sentPtr = sentPtr;
2713  SpinLockRelease(&walsnd->mutex);
2714  }
2715 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8223
slock_t mutex
XLogRecord * XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
Definition: xlogreader.c:193
XLogRecPtr EndRecPtr
Definition: xlogreader.h:115
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define ERROR
Definition: elog.h:43
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:93
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:190
static XLogRecPtr logical_startptr
Definition: walsender.c:191
static bool WalSndCaughtUp
Definition: walsender.c:176
#define SpinLockRelease(lock)
Definition: spin.h:64
XLogRecPtr sentPtr
WalSnd * MyWalSnd
Definition: walsender.c:105
static XLogRecPtr sentPtr
Definition: walsender.c:150
#define NULL
Definition: c.h:229
XLogReaderState * reader
Definition: logical.h:38
#define elog
Definition: elog.h:219
static void XLogSendPhysical ( void  )
static

Definition at line 2407 of file walsender.c.

References am_cascading_walsender, Assert, close, StringInfoData::data, DEBUG1, elog, enlargeStringInfo(), GetCurrentTimestamp(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), LagTrackerWrite(), StringInfoData::len, list_free_deep(), MAX_SEND_SIZE, WalSnd::mutex, MyWalSnd, NULL, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), readTimeLineHistory(), RecoveryInProgress(), resetStringInfo(), sendFile, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, WalSnd::sentPtr, sentPtr, set_ps_display(), snprintf(), SpinLockAcquire, SpinLockRelease, streamingDoneSending, ThisTimeLineID, tliSwitchPoint(), update_process_title, WalSndCaughtUp, and XLogRead().

Referenced by StartReplication().

2408 {
2409  XLogRecPtr SendRqstPtr;
2410  XLogRecPtr startptr;
2411  XLogRecPtr endptr;
2412  Size nbytes;
2413 
2415  {
2416  WalSndCaughtUp = true;
2417  return;
2418  }
2419 
2420  /* Figure out how far we can safely send the WAL. */
2422  {
2423  /*
2424  * Streaming an old timeline that's in this server's history, but is
2425  * not the one we're currently inserting or replaying. It can be
2426  * streamed up to the point where we switched off that timeline.
2427  */
2428  SendRqstPtr = sendTimeLineValidUpto;
2429  }
2430  else if (am_cascading_walsender)
2431  {
2432  /*
2433  * Streaming the latest timeline on a standby.
2434  *
2435  * Attempt to send all WAL that has already been replayed, so that we
2436  * know it's valid. If we're receiving WAL through streaming
2437  * replication, it's also OK to send any WAL that has been received
2438  * but not replayed.
2439  *
2440  * The timeline we're recovering from can change, or we can be
2441  * promoted. In either case, the current timeline becomes historic. We
2442  * need to detect that so that we don't try to stream past the point
2443  * where we switched to another timeline. We check for promotion or
2444  * timeline switch after calculating FlushPtr, to avoid a race
2445  * condition: if the timeline becomes historic just after we checked
2446  * that it was still current, it's still be OK to stream it up to the
2447  * FlushPtr that was calculated before it became historic.
2448  */
2449  bool becameHistoric = false;
2450 
2451  SendRqstPtr = GetStandbyFlushRecPtr();
2452 
2453  if (!RecoveryInProgress())
2454  {
2455  /*
2456  * We have been promoted. RecoveryInProgress() updated
2457  * ThisTimeLineID to the new current timeline.
2458  */
2459  am_cascading_walsender = false;
2460  becameHistoric = true;
2461  }
2462  else
2463  {
2464  /*
2465  * Still a cascading standby. But is the timeline we're sending
2466  * still the one recovery is recovering from? ThisTimeLineID was
2467  * updated by the GetStandbyFlushRecPtr() call above.
2468  */
2470  becameHistoric = true;
2471  }
2472 
2473  if (becameHistoric)
2474  {
2475  /*
2476  * The timeline we were sending has become historic. Read the
2477  * timeline history file of the new timeline to see where exactly
2478  * we forked off from the timeline we were sending.
2479  */
2480  List *history;
2481 
2484 
2486  list_free_deep(history);
2487 
2488  sendTimeLineIsHistoric = true;
2489 
2490  SendRqstPtr = sendTimeLineValidUpto;
2491  }
2492  }
2493  else
2494  {
2495  /*
2496  * Streaming the current timeline on a master.
2497  *
2498  * Attempt to send all data that's already been written out and
2499  * fsync'd to disk. We cannot go further than what's been written out
2500  * given the current implementation of XLogRead(). And in any case
2501  * it's unsafe to send WAL that is not securely down to disk on the
2502  * master: if the master subsequently crashes and restarts, slaves
2503  * must not have applied any WAL that got lost on the master.
2504  */
2505  SendRqstPtr = GetFlushRecPtr();
2506  }
2507 
2508  /*
2509  * Record the current system time as an approximation of the time at which
2510  * this WAL position was written for the purposes of lag tracking.
2511  *
2512  * In theory we could make XLogFlush() record a time in shmem whenever WAL
2513  * is flushed and we could get that time as well as the LSN when we call
2514  * GetFlushRecPtr() above (and likewise for the cascading standby
2515  * equivalent), but rather than putting any new code into the hot WAL path
2516  * it seems good enough to capture the time here. We should reach this
2517  * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
2518  * may take some time, we read the WAL flush pointer and take the time
2519  * very close to together here so that we'll get a later position if it
2520  * is still moving.
2521  *
2522  * Because LagTrackerWriter ignores samples when the LSN hasn't advanced,
2523  * this gives us a cheap approximation for the WAL flush time for this
2524  * LSN.
2525  *
2526  * Note that the LSN is not necessarily the LSN for the data contained in
2527  * the present message; it's the end of the WAL, which might be
2528  * further ahead. All the lag tracking machinery cares about is finding
2529  * out when that arbitrary LSN is eventually reported as written, flushed
2530  * and applied, so that it can measure the elapsed time.
2531  */
2532  LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());
2533 
2534  /*
2535  * If this is a historic timeline and we've reached the point where we
2536  * forked to the next timeline, stop streaming.
2537  *
2538  * Note: We might already have sent WAL > sendTimeLineValidUpto. The
2539  * startup process will normally replay all WAL that has been received
2540  * from the master, before promoting, but if the WAL streaming is
2541  * terminated at a WAL page boundary, the valid portion of the timeline
2542  * might end in the middle of a WAL record. We might've already sent the
2543  * first half of that partial WAL record to the cascading standby, so that
2544  * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
2545  * replay the partial WAL record either, so it can still follow our
2546  * timeline switch.
2547  */
2549  {
2550  /* close the current file. */
2551  if (sendFile >= 0)
2552  close(sendFile);
2553  sendFile = -1;
2554 
2555  /* Send CopyDone */
2556  pq_putmessage_noblock('c', NULL, 0);
2557  streamingDoneSending = true;
2558 
2559  WalSndCaughtUp = true;
2560 
2561  elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
2563  (uint32) (sentPtr >> 32), (uint32) sentPtr);
2564  return;
2565  }
2566 
2567  /* Do we have any work to do? */
2568  Assert(sentPtr <= SendRqstPtr);
2569  if (SendRqstPtr <= sentPtr)
2570  {
2571  WalSndCaughtUp = true;
2572  return;
2573  }
2574 
2575  /*
2576  * Figure out how much to send in one message. If there's no more than
2577  * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
2578  * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
2579  *
2580  * The rounding is not only for performance reasons. Walreceiver relies on
2581  * the fact that we never split a WAL record across two messages. Since a
2582  * long WAL record is split at page boundary into continuation records,
2583  * page boundary is always a safe cut-off point. We also assume that
2584  * SendRqstPtr never points to the middle of a WAL record.
2585  */
2586  startptr = sentPtr;
2587  endptr = startptr;
2588  endptr += MAX_SEND_SIZE;
2589 
2590  /* if we went beyond SendRqstPtr, back off */
2591  if (SendRqstPtr <= endptr)
2592  {
2593  endptr = SendRqstPtr;
2595  WalSndCaughtUp = false;
2596  else
2597  WalSndCaughtUp = true;
2598  }
2599  else
2600  {
2601  /* round down to page boundary. */
2602  endptr -= (endptr % XLOG_BLCKSZ);
2603  WalSndCaughtUp = false;
2604  }
2605 
2606  nbytes = endptr - startptr;
2607  Assert(nbytes <= MAX_SEND_SIZE);
2608 
2609  /*
2610  * OK to read and send the slice.
2611  */
2613  pq_sendbyte(&output_message, 'w');
2614 
2615  pq_sendint64(&output_message, startptr); /* dataStart */
2616  pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
2617  pq_sendint64(&output_message, 0); /* sendtime, filled in last */
2618 
2619  /*
2620  * Read the log directly into the output buffer to avoid extra memcpy
2621  * calls.
2622  */
2624  XLogRead(&output_message.data[output_message.len], startptr, nbytes);
2625  output_message.len += nbytes;
2627 
2628  /*
2629  * Fill the send timestamp last, so that it is taken as late as possible.
2630  */
2633  memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
2634  tmpbuf.data, sizeof(int64));
2635 
2637 
2638  sentPtr = endptr;
2639 
2640  /* Update shared memory status */
2641  {
2642  WalSnd *walsnd = MyWalSnd;
2643 
2644  SpinLockAcquire(&walsnd->mutex);
2645  walsnd->sentPtr = sentPtr;
2646  SpinLockRelease(&walsnd->mutex);
2647  }
2648 
2649  /* Report progress of XLOG streaming in PS display */
2651  {
2652  char activitymsg[50];
2653 
2654  snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
2655  (uint32) (sentPtr >> 32), (uint32) sentPtr);
2656  set_ps_display(activitymsg, false);
2657  }
2658 
2659  return;
2660 }
#define DEBUG1
Definition: elog.h:25
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
bool update_process_title
Definition: ps_status.c:35
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition: walsender.c:3228
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:75
static int sendFile
Definition: walsender.c:128
void set_ps_display(const char *activity, bool force)
Definition: ps_status.c:326
static StringInfoData output_message
Definition: walsender.c:153
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8223
bool RecoveryInProgress(void)
Definition: xlog.c:7874
void list_free_deep(List *list)
Definition: list.c:1147
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
static bool streamingDoneSending
Definition: walsender.c:172
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:277
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:94
unsigned int uint32
Definition: c.h:268
static void XLogRead(char *buf, XLogRecPtr startptr, Size count)
Definition: walsender.c:2232
static bool WalSndCaughtUp
Definition: walsender.c:176
#define SpinLockRelease(lock)
Definition: spin.h:64
XLogRecPtr sentPtr
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:563
WalSnd * MyWalSnd
Definition: walsender.c:105
static XLogRecPtr sentPtr
Definition: walsender.c:150
TimeLineID ThisTimeLineID
Definition: xlog.c:179
#define NULL
Definition: c.h:229
static TimeLineID sendTimeLine
Definition: walsender.c:141
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:675
size_t Size
Definition: c.h:356
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:142
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:144
static StringInfoData tmpbuf
Definition: walsender.c:155
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2768
#define elog
Definition: elog.h:219
#define close(a)
Definition: win32.h:12
Definition: pg_list.h:45
void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.c:271
#define MAX_SEND_SIZE
Definition: walsender.c:99
static bool sendTimeLineIsHistoric
Definition: walsender.c:143
bool am_cascading_walsender
Definition: walsender.c:109

Variable Documentation

bool am_db_walsender = false

Definition at line 111 of file walsender.c.

Referenced by check_db(), InitPostgres(), and ProcessStartupPacket().

Definition at line 207 of file walsender.c.

Referenced by _ShowOption(), allocNewBuffer(), bitgetpage(), blbulkdelete(), blgetbitmap(), blinsert(), BloomNewBuffer(), blvacuumcleanup(), BootStrapXLOG(), brin_xlog_desummarize_page(), brin_xlog_insert_update(), brin_xlog_samepage_update(), brin_xlog_update(), bt_metap(), bt_page_items(), bt_page_stats(), btree_xlog_delete(), btree_xlog_insert(), btree_xlog_mark_page_halfdead(), btree_xlog_newroot(), btree_xlog_split(), btree_xlog_unlink_page(), btree_xlog_vacuum(), collect_corrupt_items(), collect_visibility_data(), convertToJsonb(), copy_file(), copyFile(), CreateLockFile(), createPostingTree(), CreateSharedBackendStatus(), des_cipher(), DllRegisterServer(), ecpg_process_output(), EvalPlanQualFetch(), EvalPlanQualFetchRowMarks(), ExecCheckTIDVisible(), ExecLockRows(), ExecOnConflictUpdate(), ExecStoreTuple(), fill_hba_line(), flushCachedPage(), ForgetPrivateRefCountEntry(), GenericXLogRegisterBuffer(), GetConfigOption(), GetConfigOptionByNum(), GetConfigOptionResetString(), getObjectDescription(), getObjectIdentityParts(), getObjectTypeDescription(), GetPrivateRefCountEntry(), gets_fromFile(), GetTupleForTrigger(), ginbulkdelete(), ginFindParents(), ginHeapTupleFastInsert(), ginInsertCleanup(), GinNewBuffer(), ginRedoClearIncompleteSplit(), ginRedoCreatePTree(), ginRedoDeleteListPages(), ginRedoInsert(), ginRedoInsertListPage(), ginRedoUpdateMetapage(), ginRedoVacuumDataLeafPage(), ginRedoVacuumPage(), ginScanToDelete(), ginvacuumcleanup(), ginVacuumPostingTreeLeaves(), gistBufferingFindCorrectParent(), gistbuild(), gistbuildempty(), gistbulkdelete(), gistFindPath(), gistGetMaxLevel(), gistkillitems(), gistNewBuffer(), gistplacetopage(), gistProcessItup(), gistRedoClearFollowRight(), gistRedoCreateIndex(), gistRedoPageSplitRecord(), gistRedoPageUpdateRecord(), gistScanPage(), gistvacuumcleanup(), hash_xlog_insert(), hash_xlog_split_cleanup(), hash_xlog_vacuum_one_page(), heap_abort_speculative(), heap_delete(), heap_fetch(), heap_finish_speculative(), 
heap_get_latest_tid(), heap_hot_search(), heap_inplace_update(), heap_insert(), heap_multi_insert(), heap_update(), heap_xlog_clean(), heap_xlog_confirm(), heap_xlog_delete(), heap_xlog_freeze_page(), heap_xlog_inplace(), heap_xlog_insert(), heap_xlog_lock(), heap_xlog_lock_updated(), heap_xlog_multi_insert(), heap_xlog_visible(), heapgetpage(), helpSQL(), init(), initBloomState(), main(), NewPrivateRefCountEntry(), palloc_btree_page(), pg_timezone_abbrevs(), pg_vfprintf(), pg_visibility(), pgss_shmem_startup(), pgstat_get_crashed_backend_activity(), pgstat_heap(), pgstatginindex_internal(), pgstatindex_impl(), PGTYPESnumeric_from_double(), PLy_get_sqlerrcode(), PQunescapeBytea(), print_addr(), ReadBufferBI(), ReadControlFile(), readfile(), RecheckDataDirLockFile(), RelationAddExtraBlocks(), RelationGetBufferForTuple(), ReleaseAndReadBuffer(), ReorderBufferAllocate(), replace_variables(), report_fork_failure_to_client(), report_multiple_error_messages(), RestoreArchive(), rewind_parseTimeLineHistory(), RewriteControlFile(), rewriteVisibilityMap(), scanPostingTree(), seq_redo(), slurpFile(), smgrextend(), smgrread(), smgrwrite(), SpGistGetBuffer(), SpGistNewBuffer(), spgprocesspending(), spgRedoAddLeaf(), spgRedoAddNode(), spgRedoCreateIndex(), spgRedoMoveLeafs(), spgRedoSplitTuple(), spgRedoVacuumLeaf(), spgRedoVacuumRedirect(), spgRedoVacuumRoot(), spgvacuumpage(), spgWalk(), TidNext(), TouchSocketLockFiles(), updateControlFile(), uuid_recv(), uuid_send(), WriteControlFile(), WriteEmptyXLOG(), writeTimeLineHistory(), xlog_redo(), XLogFileCopy(), XLogReadBufferExtended(), and XLogReadRecord().

TimeLineID curFileTimeLine = 0
static

Definition at line 133 of file walsender.c.

Referenced by XLogRead().

volatile sig_atomic_t got_SIGHUP = false
static

Definition at line 179 of file walsender.c.

Referenced by WalSndLoop(), WalSndSigHupHandler(), WalSndWaitForWal(), and WalSn