PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
walsender.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/printtup.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "pgstat.h"
#include "replication/basebackup.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/logicalfuncs.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/dest.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/portal.h"
#include "utils/ps_status.h"
#include "utils/resowner.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
Include dependency graph for walsender.c:

Go to the source code of this file.

Macros

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)
 
#define PG_STAT_GET_WAL_SENDERS_COLS   8
 

Typedefs

typedef void(* WalSndSendDataCallback )(void)
 

Functions

static void WalSndSigHupHandler (SIGNAL_ARGS)
 
static void WalSndXLogSendHandler (SIGNAL_ARGS)
 
static void WalSndLastCycleHandler (SIGNAL_ARGS)
 
static void WalSndLoop (WalSndSendDataCallback send_data)
 
static void InitWalSenderSlot (void)
 
static void WalSndKill (int code, Datum arg)
 
static void WalSndShutdown (void)
 
void WalSndErrorCleanup (void)
 
static void IdentifySystem (void)
 
static void SendTimeLineHistory (TimeLineHistoryCmd *cmd)
 
static void StartReplication (StartReplicationCmd *cmd)
 
static int logical_read_xlog_page (XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
 
static void CreateReplicationSlot (CreateReplicationSlotCmd *cmd)
 
static void DropReplicationSlot (DropReplicationSlotCmd *cmd)
 
static void StartLogicalReplication (StartReplicationCmd *cmd)
 
static void WalSndPrepareWrite (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndWriteData (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static XLogRecPtr WalSndWaitForWal (XLogRecPtr loc)
 
void exec_replication_command (const char *cmd_string)
 
static void ProcessRepliesIfAny (void)
 
static void ProcessStandbyMessage (void)
 
static void PhysicalConfirmReceivedLocation (XLogRecPtr lsn)
 
static void ProcessStandbyReplyMessage (void)
 
static void PhysicalReplicationSlotNewXmin (TransactionId feedbackXmin)
 
static void ProcessStandbyHSFeedbackMessage (void)
 
static long WalSndComputeSleeptime (TimestampTz now)
 
static void WalSndCheckTimeOut (TimestampTz now)
 
static void XLogRead (char *buf, XLogRecPtr startptr, Size count)
 
static void XLogSendPhysical (void)
 
static void XLogSendLogical (void)
 
static void WalSndDone (WalSndSendDataCallback send_data)
 
static XLogRecPtr GetStandbyFlushRecPtr (void)
 
void WalSndRqstFileReload (void)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (void)
 
void WalSndSetState (WalSndState state)
 
static const char * WalSndGetStateString (WalSndState state)
 
Datum pg_stat_get_wal_senders (PG_FUNCTION_ARGS)
 
static void WalSndKeepalive (bool requestReply)
 
static void WalSndKeepaliveIfNecessary (TimestampTz now)
 

Variables

WalSndCtlData *WalSndCtl = NULL
 
WalSnd *MyWalSnd = NULL
 
bool am_walsender = false
 
bool am_cascading_walsender = false
 
bool am_db_walsender = false
 
int max_wal_senders = 0
 
int wal_sender_timeout = 60 * 1000
 
bool log_replication_commands = false
 
bool wake_wal_senders = false
 
static int sendFile = -1
 
static XLogSegNo sendSegNo = 0
 
static uint32 sendOff = 0
 
static TimeLineID curFileTimeLine = 0
 
static TimeLineID sendTimeLine = 0
 
static TimeLineID sendTimeLineNextTLI = 0
 
static bool sendTimeLineIsHistoric = false
 
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
 
static XLogRecPtr sentPtr = 0
 
static StringInfoData output_message
 
static StringInfoData reply_message
 
static StringInfoData tmpbuf
 
static TimestampTz last_reply_timestamp = 0
 
static bool waiting_for_ping_response = false
 
static bool streamingDoneSending
 
static bool streamingDoneReceiving
 
static bool WalSndCaughtUp = false
 
static volatile sig_atomic_t got_SIGHUP = false
 
static volatile sig_atomic_t walsender_ready_to_stop = false
 
static volatile sig_atomic_t replication_active = false
 
static LogicalDecodingContext *logical_decoding_ctx = NULL
 
static XLogRecPtr logical_startptr = InvalidXLogRecPtr
 

Macro Definition Documentation

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)

Definition at line 97 of file walsender.c.

Referenced by XLogSendPhysical().

#define PG_STAT_GET_WAL_SENDERS_COLS   8

Referenced by pg_stat_get_wal_senders().

Typedef Documentation

typedef void(* WalSndSendDataCallback)(void)

Definition at line 197 of file walsender.c.

Function Documentation

static void CreateReplicationSlot ( CreateReplicationSlotCmd *cmd)
static

Definition at line 739 of file walsender.c.

References Assert, begin_tup_output_tupdesc(), CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDestReceiver(), CreateInitDecodingContext(), CreateTemplateTupleDesc(), CStringGetTextDatum, ReplicationSlot::data, DecodingContextFindStartpoint(), DestRemoteSimple, do_tup_output(), end_tup_output(), FreeDecodingContext(), CreateReplicationSlotCmd::kind, last_reply_timestamp, logical_read_xlog_page(), MAXFNAMELEN, MemSet, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, NIL, NULL, CreateReplicationSlotCmd::plugin, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, ReplicationSlotCreate(), ReplicationSlotMarkDirty(), ReplicationSlotPersist(), ReplicationSlotRelease(), ReplicationSlotReserveWal(), ReplicationSlotSave(), CreateReplicationSlotCmd::reserve_wal, RS_EPHEMERAL, RS_PERSISTENT, RS_TEMPORARY, sendTimeLine, sendTimeLineIsHistoric, CreateReplicationSlotCmd::slotname, SnapBuildExportSnapshot(), LogicalDecodingContext::snapshot_builder, snprintf(), CreateReplicationSlotCmd::temporary, TEXTOID, ThisTimeLineID, TupleDescInitBuiltinEntry(), values, WalSndPrepareWrite(), and WalSndWriteData().

Referenced by exec_replication_command(), and main().

740 {
741  const char *snapshot_name = NULL;
742  char xpos[MAXFNAMELEN];
743  char *slot_name;
744  DestReceiver *dest;
745  TupOutputState *tstate;
746  TupleDesc tupdesc;
747  Datum values[4];
748  bool nulls[4];
749 
751 
752  /* setup state for XLogReadPage */
753  sendTimeLineIsHistoric = false;
755 
756  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
757  {
758  ReplicationSlotCreate(cmd->slotname, false,
760  }
761  else
762  {
764 
765  /*
766  * Initially create persistent slot as ephemeral - that allows us to
767  * nicely handle errors during initialization because it'll get
768  * dropped if this transaction fails. We'll make it persistent at the
769  * end. Temporary slots can be created as temporary from beginning as
770  * they get dropped on error as well.
771  */
772  ReplicationSlotCreate(cmd->slotname, true,
774  }
775 
776  if (cmd->kind == REPLICATION_KIND_LOGICAL)
777  {
779 
783 
784  /*
785  * Signal that we don't need the timeout mechanism. We're just
786  * creating the replication slot and don't yet accept feedback
787  * messages or send keepalives. As we possibly need to wait for
788  * further WAL the walsender would otherwise possibly be killed too
789  * soon.
790  */
792 
793  /* build initial snapshot, might take a while */
795 
796  /*
797  * Export a plain (not of the snapbuild.c type) snapshot to the user
798  * that can be imported into another session.
799  */
800  snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
801 
802  /* don't need the decoding context anymore */
803  FreeDecodingContext(ctx);
804 
805  if (!cmd->temporary)
807  }
808  else if (cmd->kind == REPLICATION_KIND_PHYSICAL && cmd->reserve_wal)
809  {
811 
813 
814  /* Write this slot to disk if it's permanent one. */
815  if (!cmd->temporary)
817  }
818 
819  snprintf(xpos, sizeof(xpos), "%X/%X",
822 
824  MemSet(nulls, false, sizeof(nulls));
825 
826  /*----------
827  * Need a tuple descriptor representing four columns:
828  * - first field: the slot name
829  * - second field: LSN at which we became consistent
830  * - third field: exported snapshot's name
831  * - fourth field: output plugin
832  *----------
833  */
834  tupdesc = CreateTemplateTupleDesc(4, false);
835  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
836  TEXTOID, -1, 0);
837  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
838  TEXTOID, -1, 0);
839  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
840  TEXTOID, -1, 0);
841  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
842  TEXTOID, -1, 0);
843 
844  /* prepare for projection of tuples */
845  tstate = begin_tup_output_tupdesc(dest, tupdesc);
846 
847  /* slot_name */
848  slot_name = NameStr(MyReplicationSlot->data.name);
849  values[0] = CStringGetTextDatum(slot_name);
850 
851  /* consistent wal location */
852  values[1] = CStringGetTextDatum(xpos);
853 
854  /* snapshot name, or NULL if none */
855  if (snapshot_name != NULL)
856  values[2] = CStringGetTextDatum(snapshot_name);
857  else
858  nulls[2] = true;
859 
860  /* plugin, or NULL if none */
861  if (cmd->plugin != NULL)
862  values[3] = CStringGetTextDatum(cmd->plugin);
863  else
864  nulls[3] = true;
865 
866  /* send it to dest */
867  do_tup_output(tstate, values, nulls);
868  end_tup_output(tstate);
869 
871 }
#define NIL
Definition: pg_list.h:69
#define TEXTOID
Definition: pg_type.h:324
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:1250
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency)
Definition: slot.c:219
#define MemSet(start, val, len)
Definition: c.h:853
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3, 4)
void ReplicationSlotSave(void)
Definition: slot.c:562
ReplicationSlotPersistentData data
Definition: slot.h:115
XLogRecPtr confirmed_flush
Definition: slot.h:76
ReplicationKind kind
Definition: replnodes.h:56
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
Definition: walsender.c:710
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:511
void ReplicationSlotReserveWal(void)
Definition: slot.c:824
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:414
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:1308
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:563
void ReplicationSlotPersist(void)
Definition: slot.c:597
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:109
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1006
unsigned int uint32
Definition: c.h:265
void ReplicationSlotRelease(void)
Definition: slot.c:374
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc)
Definition: execTuples.c:1232
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:979
#define MAXFNAMELEN
uintptr_t Datum
Definition: postgres.h:374
TimeLineID ThisTimeLineID
Definition: xlog.c:178
struct SnapBuild * snapshot_builder
Definition: logical.h:38
ReplicationSlot * MyReplicationSlot
Definition: slot.c:95
#define NULL
Definition: c.h:226
static TimeLineID sendTimeLine
Definition: walsender.c:139
#define Assert(condition)
Definition: c.h:671
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:457
TupleDesc CreateTemplateTupleDesc(int natts, bool hasoid)
Definition: tupdesc.c:41
static Datum values[MAXATTR]
Definition: bootstrap.c:162
LogicalDecodingContext * CreateInitDecodingContext(char *plugin, List *output_plugin_options, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write)
Definition: logical.c:211
#define NameStr(name)
Definition: c.h:495
#define CStringGetTextDatum(s)
Definition: builtins.h:90
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:76
int16 AttrNumber
Definition: attnum.h:21
static TimestampTz last_reply_timestamp
Definition: walsender.c:159
static bool sendTimeLineIsHistoric
Definition: walsender.c:141
void ReplicationSlotMarkDirty(void)
Definition: slot.c:580
static void DropReplicationSlot ( DropReplicationSlotCmd *cmd)
static

Definition at line 877 of file walsender.c.

References DestRemote, EndCommand(), ReplicationSlotDrop(), and DropReplicationSlotCmd::slotname.

Referenced by exec_replication_command(), and main().

878 {
880  EndCommand("DROP_REPLICATION_SLOT", DestRemote);
881 }
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:157
void ReplicationSlotDrop(const char *name)
Definition: slot.c:439
void exec_replication_command ( const char *  cmd_string)

Definition at line 1222 of file walsender.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate(), CHECK_FOR_INTERRUPTS, CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, DestRemote, DestRemoteSimple, DropReplicationSlot(), elog, EndCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), IdentifySystem(), initStringInfo(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextDelete(), MemoryContextSwitchTo(), VariableShowStmt::name, REPLICATION_KIND_PHYSICAL, replication_parse_result, replication_scanner_init(), replication_yyparse(), SendBaseBackup(), SendTimeLineHistory(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), T_BaseBackupCmd, T_CreateReplicationSlotCmd, T_DropReplicationSlotCmd, T_IdentifySystemCmd, T_StartReplicationCmd, T_TimeLineHistoryCmd, T_VariableShowStmt, and Node::type.

Referenced by PostgresMain().

1223 {
1224  int parse_rc;
1225  Node *cmd_node;
1226  MemoryContext cmd_context;
1227  MemoryContext old_context;
1228 
1229  /*
1230  * Log replication command if log_replication_commands is enabled. Even
1231  * when it's disabled, log the command with DEBUG1 level for backward
1232  * compatibility.
1233  */
1235  (errmsg("received replication command: %s", cmd_string)));
1236 
1237  /*
1238  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1239  * command arrives. Clean up the old stuff if there's anything.
1240  */
1242 
1244 
1246  "Replication command context",
1248  old_context = MemoryContextSwitchTo(cmd_context);
1249 
1250  replication_scanner_init(cmd_string);
1251  parse_rc = replication_yyparse();
1252  if (parse_rc != 0)
1253  ereport(ERROR,
1254  (errcode(ERRCODE_SYNTAX_ERROR),
1255  (errmsg_internal("replication command parser returned %d",
1256  parse_rc))));
1257 
1258  cmd_node = replication_parse_result;
1259 
1260  /*
1261  * Allocate buffers that will be used for each outgoing and incoming
1262  * message. We do this just once per command to reduce palloc overhead.
1263  */
1267 
1268  switch (cmd_node->type)
1269  {
1270  case T_IdentifySystemCmd:
1271  IdentifySystem();
1272  break;
1273 
1274  case T_BaseBackupCmd:
1275  SendBaseBackup((BaseBackupCmd *) cmd_node);
1276  break;
1277 
1280  break;
1281 
1284  break;
1285 
1286  case T_StartReplicationCmd:
1287  {
1288  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1289 
1290  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1291  StartReplication(cmd);
1292  else
1294  break;
1295  }
1296 
1297  case T_TimeLineHistoryCmd:
1299  break;
1300 
1301  case T_VariableShowStmt:
1302  {
1304  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1305 
1306  GetPGVariable(n->name, dest);
1307  }
1308  break;
1309 
1310  default:
1311  elog(ERROR, "unrecognized replication command node tag: %u",
1312  cmd_node->type);
1313  }
1314 
1315  /* done */
1316  MemoryContextSwitchTo(old_context);
1317  MemoryContextDelete(cmd_context);
1318 
1319  /* Send CommandComplete message */
1320  EndCommand("SELECT", DestRemote);
1321 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:200
#define DEBUG1
Definition: elog.h:25
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:392
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:877
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Definition: nodes.h:508
static StringInfoData output_message
Definition: walsender.c:151
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LOG
Definition: elog.h:26
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc.c:7848
Node * replication_parse_result
ReplicationKind kind
Definition: replnodes.h:81
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:157
#define ERROR
Definition: elog.h:43
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:682
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:145
NodeTag type
Definition: nodes.h:510
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:109
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:486
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
#define ereport(elevel, rest)
Definition: elog.h:122
int replication_yyparse(void)
void initStringInfo(StringInfo str)
Definition: stringinfo.c:65
static StringInfoData reply_message
Definition: walsender.c:152
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:440
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:888
int errmsg_internal(const char *fmt,...)
Definition: elog.c:827
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:739
static StringInfoData tmpbuf
Definition: walsender.c:153
bool log_replication_commands
Definition: walsender.c:115
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
#define elog
Definition: elog.h:219
static void IdentifySystem(void)
Definition: walsender.c:303
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:629
void replication_scanner_init(const char *query_string)
static XLogRecPtr GetStandbyFlushRecPtr ( void  )
static

Definition at line 2469 of file walsender.c.

References GetWalRcvWriteRecPtr(), GetXLogReplayRecPtr(), NULL, receiveTLI, and ThisTimeLineID.

Referenced by IdentifySystem(), StartReplication(), and XLogSendPhysical().

2470 {
2471  XLogRecPtr replayPtr;
2472  TimeLineID replayTLI;
2473  XLogRecPtr receivePtr;
2475  XLogRecPtr result;
2476 
2477  /*
2478  * We can safely send what's already been replayed. Also, if walreceiver
2479  * is streaming WAL from the same timeline, we can send anything that it
2480  * has streamed, but hasn't been replayed yet.
2481  */
2482 
2483  receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI);
2484  replayPtr = GetXLogReplayRecPtr(&replayTLI);
2485 
2486  ThisTimeLineID = replayTLI;
2487 
2488  result = replayPtr;
2489  if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
2490  result = receivePtr;
2491 
2492  return result;
2493 }
uint32 TimeLineID
Definition: xlogdefs.h:45
XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:10984
static TimeLineID receiveTLI
Definition: xlog.c:200
TimeLineID ThisTimeLineID
Definition: xlog.c:178
#define NULL
Definition: c.h:226
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static void IdentifySystem ( void  )
static

Definition at line 303 of file walsender.c.

References am_cascading_walsender, begin_tup_output_tupdesc(), CommitTransactionCommand(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, cur, CurrentMemoryContext, dbname, DestRemoteSimple, do_tup_output(), end_tup_output(), get_database_name(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetSystemIdentifier(), Int32GetDatum, INT4OID, InvalidOid, MAXFNAMELEN, MemoryContextSwitchTo(), MemSet, MyDatabaseId, NULL, RecoveryInProgress(), snprintf(), StartTransactionCommand(), TEXTOID, ThisTimeLineID, TupleDescInitBuiltinEntry(), UINT64_FORMAT, and values.

Referenced by exec_replication_command().

304 {
305  char sysid[32];
306  char xpos[MAXFNAMELEN];
307  XLogRecPtr logptr;
308  char *dbname = NULL;
309  DestReceiver *dest;
310  TupOutputState *tstate;
311  TupleDesc tupdesc;
312  Datum values[4];
313  bool nulls[4];
314 
315  /*
316  * Reply with a result set with one row, four columns. First col is system
317  * ID, second is timeline ID, third is current xlog location and the
318  * fourth contains the database name if we are connected to one.
319  */
320 
321  snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
323 
326  {
327  /* this also updates ThisTimeLineID */
328  logptr = GetStandbyFlushRecPtr();
329  }
330  else
331  logptr = GetFlushRecPtr();
332 
333  snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
334 
335  if (MyDatabaseId != InvalidOid)
336  {
338 
339  /* syscache access needs a transaction env. */
341  /* make dbname live outside TX context */
345  /* CommitTransactionCommand switches to TopMemoryContext */
347  }
348 
350  MemSet(nulls, false, sizeof(nulls));
351 
352  /* need a tuple descriptor representing four columns */
353  tupdesc = CreateTemplateTupleDesc(4, false);
354  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
355  TEXTOID, -1, 0);
356  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
357  INT4OID, -1, 0);
358  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
359  TEXTOID, -1, 0);
360  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
361  TEXTOID, -1, 0);
362 
363  /* prepare for projection of tuples */
364  tstate = begin_tup_output_tupdesc(dest, tupdesc);
365 
366  /* column 1: system identifier */
367  values[0] = CStringGetTextDatum(sysid);
368 
369  /* column 2: timeline */
370  values[1] = Int32GetDatum(ThisTimeLineID);
371 
372  /* column 3: xlog position */
373  values[2] = CStringGetTextDatum(xpos);
374 
375  /* column 4: database name, or NULL if none */
376  if (dbname)
377  values[3] = CStringGetTextDatum(dbname);
378  else
379  nulls[3] = true;
380 
381  /* send it to dest */
382  do_tup_output(tstate, values, nulls);
383 
384  end_tup_output(tstate);
385 }
#define TEXTOID
Definition: pg_type.h:324
void CommitTransactionCommand(void)
Definition: xact.c:2745
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:1250
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define INT4OID
Definition: pg_type.h:316
struct cursor * cur
Definition: ecpg.c:28
#define MemSet(start, val, len)
Definition: c.h:853
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3, 4)
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8154
bool RecoveryInProgress(void)
Definition: xlog.c:7805
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:1308
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:563
char * get_database_name(Oid dbid)
Definition: dbcommands.c:2048
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:109
unsigned int uint32
Definition: c.h:265
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc)
Definition: execTuples.c:1232
#define MAXFNAMELEN
uintptr_t Datum
Definition: postgres.h:374
Oid MyDatabaseId
Definition: globals.c:76
#define InvalidOid
Definition: postgres_ext.h:36
TimeLineID ThisTimeLineID
Definition: xlog.c:178
#define NULL
Definition: c.h:226
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void StartTransactionCommand(void)
Definition: xact.c:2675
char * dbname
Definition: streamutil.c:38
TupleDesc CreateTemplateTupleDesc(int natts, bool hasoid)
Definition: tupdesc.c:41
static Datum values[MAXATTR]
Definition: bootstrap.c:162
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4661
#define Int32GetDatum(X)
Definition: postgres.h:487
#define CStringGetTextDatum(s)
Definition: builtins.h:90
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2469
int16 AttrNumber
Definition: attnum.h:21
#define UINT64_FORMAT
Definition: c.h:313
bool am_cascading_walsender
Definition: walsender.c:107
static void InitWalSenderSlot ( void  )
static

Definition at line 1880 of file walsender.c.

References WalSnd::apply, Assert, ereport, errcode(), errmsg(), FATAL, WalSnd::flush, i, InvalidXLogRecPtr, WalSnd::latch, max_wal_senders, WalSnd::mutex, MyProc, MyProcPid, NULL, on_shmem_exit(), WalSnd::pid, PGPROC::procLatch, WalSnd::sentPtr, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndKill(), WalSndCtlData::walsnds, WALSNDSTATE_STARTUP, and WalSnd::write.

Referenced by WalSndShutdown().

1881 {
1882  int i;
1883 
1884  /*
1885  * WalSndCtl should be set up already (we inherit this by fork() or
1886  * EXEC_BACKEND mechanism from the postmaster).
1887  */
1888  Assert(WalSndCtl != NULL);
1889  Assert(MyWalSnd == NULL);
1890 
1891  /*
1892  * Find a free walsender slot and reserve it. If this fails, we must be
1893  * out of WalSnd structures.
1894  */
1895  for (i = 0; i < max_wal_senders; i++)
1896  {
1897  WalSnd *walsnd = &WalSndCtl->walsnds[i];
1898 
1899  SpinLockAcquire(&walsnd->mutex);
1900 
1901  if (walsnd->pid != 0)
1902  {
1903  SpinLockRelease(&walsnd->mutex);
1904  continue;
1905  }
1906  else
1907  {
1908  /*
1909  * Found a free slot. Reserve it for us.
1910  */
1911  walsnd->pid = MyProcPid;
1912  walsnd->sentPtr = InvalidXLogRecPtr;
1913  walsnd->write = InvalidXLogRecPtr;
1914  walsnd->flush = InvalidXLogRecPtr;
1915  walsnd->apply = InvalidXLogRecPtr;
1916  walsnd->state = WALSNDSTATE_STARTUP;
1917  walsnd->latch = &MyProc->procLatch;
1918  SpinLockRelease(&walsnd->mutex);
1919  /* don't need the lock anymore */
1920  MyWalSnd = (WalSnd *) walsnd;
1921 
1922  break;
1923  }
1924  }
1925  if (MyWalSnd == NULL)
1926  ereport(FATAL,
1927  (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
1928  errmsg("number of requested standby connections "
1929  "exceeds max_wal_senders (currently %d)",
1930  max_wal_senders)));
1931 
1932  /* Arrange to clean up at walsender exit */
1934 }
XLogRecPtr write
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
int MyProcPid
Definition: globals.c:38
PGPROC * MyProc
Definition: proc.c:67
WalSndCtlData * WalSndCtl
Definition: walsender.c:100
int errcode(int sqlerrcode)
Definition: elog.c:575
static void WalSndKill(int code, Datum arg)
Definition: walsender.c:1938
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
Latch procLatch
Definition: proc.h:93
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
#define FATAL
Definition: elog.h:52
Latch * latch
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:348
WalSndState state
#define ereport(elevel, rest)
Definition: elog.h:122
int max_wal_senders
Definition: walsender.c:112
#define SpinLockRelease(lock)
Definition: spin.h:64
XLogRecPtr sentPtr
WalSnd * MyWalSnd
Definition: walsender.c:103
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
int errmsg(const char *fmt,...)
Definition: elog.c:797
int i
XLogRecPtr apply
static int logical_read_xlog_page ( XLogReaderState *state,
XLogRecPtr  targetPagePtr,
int  reqLen,
XLogRecPtr  targetRecPtr,
char *  cur_page,
TimeLineID *pageTLI 
)
static

Definition at line 710 of file walsender.c.

References WalSndWaitForWal(), and XLogRead().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

712 {
713  XLogRecPtr flushptr;
714  int count;
715 
716  /* make sure we have enough WAL available */
717  flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
718 
719  /* more than one block available */
720  if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
721  count = XLOG_BLCKSZ;
722  /* not enough WAL synced, that can happen during shutdown */
723  else if (targetPagePtr + reqLen > flushptr)
724  return -1;
725  /* part of the page available */
726  else
727  count = flushptr - targetPagePtr;
728 
729  /* now actually read the data, we know it's there */
730  XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
731 
732  return count;
733 }
static void XLogRead(char *buf, XLogRecPtr startptr, Size count)
Definition: walsender.c:1966
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition: walsender.c:1094
uint64 XLogRecPtr
Definition: xlogdefs.h:21
Datum pg_stat_get_wal_senders ( PG_FUNCTION_ARGS  )

Definition at line 2696 of file walsender.c.

References ReturnSetInfo::allowedModes, WalSnd::apply, CStringGetTextDatum, ReturnSetInfo::econtext, ExprContext::ecxt_per_query_memory, elog, ereport, errcode(), errmsg(), ERROR, WalSnd::flush, get_call_result_type(), i, Int32GetDatum, IsA, list_member_int(), LSNGetDatum, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_wal_senders, MemoryContextSwitchTo(), MemSet, WalSnd::mutex, NULL, PG_STAT_GET_WAL_SENDERS_COLS, WalSnd::pid, ReturnSetInfo::returnMode, WalSnd::sentPtr, sentPtr, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SFRM_Materialize, SpinLockAcquire, SpinLockRelease, WalSnd::state, superuser(), SYNC_REP_PRIORITY, WalSnd::sync_standby_priority, SyncRepConfigData::syncrep_method, SyncRepConfig, SyncRepGetSyncStandbys(), tuplestore_begin_heap(), tuplestore_donestoring, tuplestore_putvalues(), TYPEFUNC_COMPOSITE, values, WalSndGetStateString(), WalSndCtlData::walsnds, work_mem, write, WalSnd::write, and XLogRecPtrIsInvalid.

2697 {
2698 #define PG_STAT_GET_WAL_SENDERS_COLS 8
2699  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
2700  TupleDesc tupdesc;
2701  Tuplestorestate *tupstore;
2702  MemoryContext per_query_ctx;
2703  MemoryContext oldcontext;
2704  List *sync_standbys;
2705  int i;
2706 
2707  /* check to see if caller supports us returning a tuplestore */
2708  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
2709  ereport(ERROR,
2710  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2711  errmsg("set-valued function called in context that cannot accept a set")));
2712  if (!(rsinfo->allowedModes & SFRM_Materialize))
2713  ereport(ERROR,
2714  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2715  errmsg("materialize mode required, but it is not " \
2716  "allowed in this context")));
2717 
2718  /* Build a tuple descriptor for our result type */
2719  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
2720  elog(ERROR, "return type must be a row type");
2721 
2722  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
2723  oldcontext = MemoryContextSwitchTo(per_query_ctx);
2724 
2725  tupstore = tuplestore_begin_heap(true, false, work_mem);
2726  rsinfo->returnMode = SFRM_Materialize;
2727  rsinfo->setResult = tupstore;
2728  rsinfo->setDesc = tupdesc;
2729 
2730  MemoryContextSwitchTo(oldcontext);
2731 
2732  /*
2733  * Get the currently active synchronous standbys.
2734  */
2735  LWLockAcquire(SyncRepLock, LW_SHARED);
2736  sync_standbys = SyncRepGetSyncStandbys(NULL);
2737  LWLockRelease(SyncRepLock);
2738 
2739  for (i = 0; i < max_wal_senders; i++)
2740  {
2741  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2742  XLogRecPtr sentPtr;
2743  XLogRecPtr write;
2744  XLogRecPtr flush;
2745  XLogRecPtr apply;
2746  int priority;
2747  WalSndState state;
2748  Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
2749  bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
2750 
2751  if (walsnd->pid == 0)
2752  continue;
2753 
2754  SpinLockAcquire(&walsnd->mutex);
2755  sentPtr = walsnd->sentPtr;
2756  state = walsnd->state;
2757  write = walsnd->write;
2758  flush = walsnd->flush;
2759  apply = walsnd->apply;
2760  priority = walsnd->sync_standby_priority;
2761  SpinLockRelease(&walsnd->mutex);
2762 
2763  memset(nulls, 0, sizeof(nulls));
2764  values[0] = Int32GetDatum(walsnd->pid);
2765 
2766  if (!superuser())
2767  {
2768  /*
2769  * Only superusers can see details. Other users only get the pid
2770  * value to know it's a walsender, but no details.
2771  */
2772  MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
2773  }
2774  else
2775  {
2776  values[1] = CStringGetTextDatum(WalSndGetStateString(state));
2777 
2778  if (XLogRecPtrIsInvalid(sentPtr))
2779  nulls[2] = true;
2780  values[2] = LSNGetDatum(sentPtr);
2781 
2782  if (XLogRecPtrIsInvalid(write))
2783  nulls[3] = true;
2784  values[3] = LSNGetDatum(write);
2785 
2786  if (XLogRecPtrIsInvalid(flush))
2787  nulls[4] = true;
2788  values[4] = LSNGetDatum(flush);
2789 
2790  if (XLogRecPtrIsInvalid(apply))
2791  nulls[5] = true;
2792  values[5] = LSNGetDatum(apply);
2793 
2794  /*
2795  * Treat a standby such as a pg_basebackup background process
2796  * which always returns an invalid flush location, as an
2797  * asynchronous standby.
2798  */
2799  priority = XLogRecPtrIsInvalid(walsnd->flush) ? 0 : priority;
2800 
2801  values[6] = Int32GetDatum(priority);
2802 
2803  /*
2804  * More easily understood version of standby state. This is purely
2805  * informational.
2806  *
2807  * In quorum-based sync replication, the role of each standby
2808  * listed in synchronous_standby_names can be changing very
2809  * frequently. Any standbys considered as "sync" at one moment can
2810  * be switched to "potential" ones at the next moment. So, it's
2811  * basically useless to report "sync" or "potential" as their sync
2812  * states. We report just "quorum" for them.
2813  */
2814  if (priority == 0)
2815  values[7] = CStringGetTextDatum("async");
2816  else if (list_member_int(sync_standbys, i))
2817  values[7] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
2818  CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
2819  else
2820  values[7] = CStringGetTextDatum("potential");
2821  }
2822 
2823  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
2824  }
2825 
2826  /* clean up and return the tuplestore */
2827  tuplestore_donestoring(tupstore);
2828 
2829  return (Datum) 0;
2830 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:735
XLogRecPtr write
#define IsA(nodeptr, _type_)
Definition: nodes.h:559
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:211
#define SYNC_REP_PRIORITY
Definition: syncrep.h:36
#define write(a, b, c)
Definition: win32.h:19
uint8 syncrep_method
Definition: syncrep.h:51
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
WalSndCtlData * WalSndCtl
Definition: walsender.c:100
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
bool superuser(void)
Definition: superuser.c:47
#define MemSet(start, val, len)
Definition: c.h:853
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
List * SyncRepGetSyncStandbys(bool *am_sync)
Definition: syncrep.c:672
SyncRepConfigData * SyncRepConfig
Definition: syncrep.c:93
slock_t mutex
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1714
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
#define ERROR
Definition: elog.h:43
bool list_member_int(const List *list, int datum)
Definition: list.c:485
WalSndState state
#define ereport(elevel, rest)
Definition: elog.h:122
int max_wal_senders
Definition: walsender.c:112
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:316
uintptr_t Datum
Definition: postgres.h:374
static const char * WalSndGetStateString(WalSndState state)
Definition: walsender.c:2674
XLogRecPtr sentPtr
int work_mem
Definition: globals.c:112
static XLogRecPtr sentPtr
Definition: walsender.c:148
int allowedModes
Definition: execnodes.h:199
SetFunctionReturnMode returnMode
Definition: execnodes.h:201
#define NULL
Definition: c.h:226
uint64 XLogRecPtr
Definition: xlogdefs.h:21
int sync_standby_priority
Definition: regguts.h:298
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1110
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:133
WalSndState
Tuplestorestate * setResult
Definition: execnodes.h:204
static Datum values[MAXATTR]
Definition: bootstrap.c:162
#define PG_STAT_GET_WAL_SENDERS_COLS
ExprContext * econtext
Definition: execnodes.h:197
#define Int32GetDatum(X)
Definition: postgres.h:487
TupleDesc setDesc
Definition: execnodes.h:205
int errmsg(const char *fmt,...)
Definition: elog.c:797
int i
#define CStringGetTextDatum(s)
Definition: builtins.h:90
#define elog
Definition: elog.h:219
XLogRecPtr apply
Definition: pg_list.h:45
static void PhysicalConfirmReceivedLocation ( XLogRecPtr  lsn)
static

Definition at line 1461 of file walsender.c.

References Assert, ReplicationSlot::data, InvalidXLogRecPtr, ReplicationSlot::mutex, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, SpinLockAcquire, and SpinLockRelease.

Referenced by ProcessStandbyReplyMessage().

1462 {
1463  bool changed = false;
1464  ReplicationSlot *slot = MyReplicationSlot;
1465 
1466  Assert(lsn != InvalidXLogRecPtr);
1467  SpinLockAcquire(&slot->mutex);
1468  if (slot->data.restart_lsn != lsn)
1469  {
1470  changed = true;
1471  slot->data.restart_lsn = lsn;
1472  }
1473  SpinLockRelease(&slot->mutex);
1474 
1475  if (changed)
1476  {
1477  ReplicationSlotMarkDirty();
1478  ReplicationSlotsComputeRequiredLSN();
1479  }
1480 
1481  /*
1482  * One could argue that the slot should be saved to disk now, but that'd
1483  * be energy wasted - the worst lost information can do here is give us
1484  * wrong information in a statistics view - we'll just potentially be more
1485  * conservative in removing files.
1486  */
1487 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
ReplicationSlotPersistentData data
Definition: slot.h:115
#define SpinLockAcquire(lock)
Definition: spin.h:62
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:664
#define SpinLockRelease(lock)
Definition: spin.h:64
ReplicationSlot * MyReplicationSlot
Definition: slot.c:95
#define Assert(condition)
Definition: c.h:671
XLogRecPtr restart_lsn
Definition: slot.h:68
slock_t mutex
Definition: slot.h:88
void ReplicationSlotMarkDirty(void)
Definition: slot.c:580
static void PhysicalReplicationSlotNewXmin ( TransactionId  feedbackXmin)
static

Definition at line 1548 of file walsender.c.

References ReplicationSlot::data, ReplicationSlot::effective_xmin, InvalidTransactionId, ReplicationSlot::mutex, MyPgXact, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredXmin(), SpinLockAcquire, SpinLockRelease, TransactionIdIsNormal, TransactionIdPrecedes(), ReplicationSlotPersistentData::xmin, and PGXACT::xmin.

Referenced by ProcessStandbyHSFeedbackMessage().

1549 {
1550  bool changed = false;
1551  ReplicationSlot *slot = MyReplicationSlot;
1552 
1553  SpinLockAcquire(&slot->mutex);
1554  MyPgXact->xmin = InvalidTransactionId;
1555 
1556  /*
1557  * For physical replication we don't need the interlock provided by xmin
1558  * and effective_xmin since the consequences of a missed increase are
1559  * limited to query cancellations, so set both at once.
1560  */
1561  if (!TransactionIdIsNormal(slot->data.xmin) ||
1562  !TransactionIdIsNormal(feedbackXmin) ||
1563  TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
1564  {
1565  changed = true;
1566  slot->data.xmin = feedbackXmin;
1567  slot->effective_xmin = feedbackXmin;
1568  }
1569  SpinLockRelease(&slot->mutex);
1570 
1571  if (changed)
1572  {
1573  ReplicationSlotMarkDirty();
1574  ReplicationSlotsComputeRequiredXmin(false);
1575  }
1576 }
TransactionId xmin
Definition: proc.h:203
ReplicationSlotPersistentData data
Definition: slot.h:115
PGXACT * MyPgXact
Definition: proc.c:68
#define SpinLockAcquire(lock)
Definition: spin.h:62
TransactionId effective_xmin
Definition: slot.h:111
#define InvalidTransactionId
Definition: transam.h:31
TransactionId xmin
Definition: slot.h:57
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
#define SpinLockRelease(lock)
Definition: spin.h:64
ReplicationSlot * MyReplicationSlot
Definition: slot.c:95
slock_t mutex
Definition: slot.h:88
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:616
void ReplicationSlotMarkDirty(void)
Definition: slot.c:580
static void ProcessRepliesIfAny ( void  )
static

Definition at line 1328 of file walsender.c.

References COMMERROR, ereport, errcode(), errmsg(), FATAL, GetCurrentTimestamp(), last_reply_timestamp, NULL, pq_endmsgread(), pq_getbyte_if_available(), pq_getmessage(), pq_putmessage_noblock, pq_startmsgread(), proc_exit(), ProcessStandbyMessage(), resetStringInfo(), streamingDoneReceiving, streamingDoneSending, and waiting_for_ping_response.

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

1329 {
1330  unsigned char firstchar;
1331  int r;
1332  bool received = false;
1333 
1334  for (;;)
1335  {
1336  pq_startmsgread();
1337  r = pq_getbyte_if_available(&firstchar);
1338  if (r < 0)
1339  {
1340  /* unexpected error or EOF */
1341  ereport(COMMERROR,
1342  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1343  errmsg("unexpected EOF on standby connection")));
1344  proc_exit(0);
1345  }
1346  if (r == 0)
1347  {
1348  /* no data available without blocking */
1349  pq_endmsgread();
1350  break;
1351  }
1352 
1353  /* Read the message contents */
1354  resetStringInfo(&reply_message);
1355  if (pq_getmessage(&reply_message, 0))
1356  {
1357  ereport(COMMERROR,
1358  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1359  errmsg("unexpected EOF on standby connection")));
1360  proc_exit(0);
1361  }
1362 
1363  /*
1364  * If we already received a CopyDone from the frontend, the frontend
1365  * should not send us anything until we've closed our end of the COPY.
1366  * XXX: In theory, the frontend could already send the next command
1367  * before receiving the CopyDone, but libpq doesn't currently allow
1368  * that.
1369  */
1370  if (streamingDoneReceiving && firstchar != 'X')
1371  ereport(FATAL,
1372  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1373  errmsg("unexpected standby message type \"%c\", after receiving CopyDone",
1374  firstchar)));
1375 
1376  /* Handle the very limited subset of commands expected in this phase */
1377  switch (firstchar)
1378  {
1379  /*
1380  * 'd' means a standby reply wrapped in a CopyData packet.
1381  */
1382  case 'd':
1383  ProcessStandbyMessage();
1384  received = true;
1385  break;
1386 
1387  /*
1388  * CopyDone means the standby requested to finish streaming.
1389  * Reply with CopyDone, if we had not sent that already.
1390  */
1391  case 'c':
1392  if (!streamingDoneSending)
1393  {
1394  pq_putmessage_noblock('c', NULL, 0);
1395  streamingDoneSending = true;
1396  }
1397 
1398  streamingDoneReceiving = true;
1399  received = true;
1400  break;
1401 
1402  /*
1403  * 'X' means that the standby is closing down the socket.
1404  */
1405  case 'X':
1406  proc_exit(0);
1407 
1408  default:
1409  ereport(FATAL,
1410  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1411  errmsg("invalid standby message type \"%c\"",
1412  firstchar)));
1413  }
1414  }
1415 
1416  /*
1417  * Save the last reply timestamp if we've received at least one reply.
1418  */
1419  if (received)
1420  {
1421  last_reply_timestamp = GetCurrentTimestamp();
1422  waiting_for_ping_response = false;
1423  }
1424 }
static void ProcessStandbyMessage(void)
Definition: walsender.c:1430
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1569
void proc_exit(int code)
Definition: ipc.c:99
int errcode(int sqlerrcode)
Definition: elog.c:575
void pq_startmsgread(void)
Definition: pqcomm.c:1157
#define FATAL
Definition: elog.h:52
int pq_getbyte_if_available(unsigned char *c)
Definition: pqcomm.c:987
static bool streamingDoneSending
Definition: walsender.c:170
#define COMMERROR
Definition: elog.h:30
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:94
#define ereport(elevel, rest)
Definition: elog.h:122
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1219
static StringInfoData reply_message
Definition: walsender.c:152
void pq_endmsgread(void)
Definition: pqcomm.c:1181
#define NULL
Definition: c.h:226
static bool streamingDoneReceiving
Definition: walsender.c:171
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
static bool waiting_for_ping_response
Definition: walsender.c:162
static TimestampTz last_reply_timestamp
Definition: walsender.c:159
static void ProcessStandbyHSFeedbackMessage ( void  )
static

Definition at line 1582 of file walsender.c.

References DEBUG2, elog, GetNextXidAndEpoch(), InvalidTransactionId, MyPgXact, MyReplicationSlot, NULL, PhysicalReplicationSlotNewXmin(), pq_getmsgint(), pq_getmsgint64(), TransactionIdIsNormal, TransactionIdPrecedesOrEquals(), and PGXACT::xmin.

Referenced by ProcessStandbyMessage().

1583 {
1584  TransactionId nextXid;
1585  uint32 nextEpoch;
1586  TransactionId feedbackXmin;
1587  uint32 feedbackEpoch;
1588 
1589  /*
1590  * Decipher the reply message. The caller already consumed the msgtype
1591  * byte.
1592  */
1593  (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
1594  feedbackXmin = pq_getmsgint(&reply_message, 4);
1595  feedbackEpoch = pq_getmsgint(&reply_message, 4);
1596 
1597  elog(DEBUG2, "hot standby feedback xmin %u epoch %u",
1598  feedbackXmin,
1599  feedbackEpoch);
1600 
1601  /* Unset WalSender's xmin if the feedback message value is invalid */
1602  if (!TransactionIdIsNormal(feedbackXmin))
1603  {
1604  MyPgXact->xmin = InvalidTransactionId;
1605  if (MyReplicationSlot != NULL)
1606  PhysicalReplicationSlotNewXmin(feedbackXmin);
1607  return;
1608  }
1609 
1610  /*
1611  * Check that the provided xmin/epoch are sane, that is, not in the future
1612  * and not so far back as to be already wrapped around. Ignore if not.
1613  *
1614  * Epoch of nextXid should be same as standby, or if the counter has
1615  * wrapped, then one greater than standby.
1616  */
1617  GetNextXidAndEpoch(&nextXid, &nextEpoch);
1618 
1619  if (feedbackXmin <= nextXid)
1620  {
1621  if (feedbackEpoch != nextEpoch)
1622  return;
1623  }
1624  else
1625  {
1626  if (feedbackEpoch + 1 != nextEpoch)
1627  return;
1628  }
1629 
1630  if (!TransactionIdPrecedesOrEquals(feedbackXmin, nextXid))
1631  return; /* epoch OK, but it's wrapped around */
1632 
1633  /*
1634  * Set the WalSender's xmin equal to the standby's requested xmin, so that
1635  * the xmin will be taken into account by GetOldestXmin. This will hold
1636  * back the removal of dead rows and thereby prevent the generation of
1637  * cleanup conflicts on the standby server.
1638  *
1639  * There is a small window for a race condition here: although we just
1640  * checked that feedbackXmin precedes nextXid, the nextXid could have
1641  * gotten advanced between our fetching it and applying the xmin below,
1642  * perhaps far enough to make feedbackXmin wrap around. In that case the
1643  * xmin we set here would be "in the future" and have no effect. No point
1644  * in worrying about this since it's too late to save the desired data
1645  * anyway. Assuming that the standby sends us an increasing sequence of
1646  * xmins, this could only happen during the first reply cycle, else our
1647  * own xmin would prevent nextXid from advancing so far.
1648  *
1649  * We don't bother taking the ProcArrayLock here. Setting the xmin field
1650  * is assumed atomic, and there's no real need to prevent a concurrent
1651  * GetOldestXmin. (If we're moving our xmin forward, this is obviously
1652  * safe, and if we're moving it backwards, well, the data is at risk
1653  * already since a VACUUM could have just finished calling GetOldestXmin.)
1654  *
1655  * If we're using a replication slot we reserve the xmin via that,
1656  * otherwise via the walsender's PGXACT entry.
1657  *
1658  * XXX: It might make sense to generalize the ephemeral slot concept and
1659  * always use the slot mechanism to handle the feedback xmin.
1660  */
1661  if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
1662  PhysicalReplicationSlotNewXmin(feedbackXmin);
1663  else
1664  MyPgXact->xmin = feedbackXmin;
1665 }
uint32 TransactionId
Definition: c.h:394
TransactionId xmin
Definition: proc.h:203
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
Definition: walsender.c:1548
PGXACT * MyPgXact
Definition: proc.c:68
void GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch)
Definition: xlog.c:8223
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:319
#define DEBUG2
Definition: elog.h:24
#define InvalidTransactionId
Definition: transam.h:31
unsigned int uint32
Definition: c.h:265
static StringInfoData reply_message
Definition: walsender.c:152
ReplicationSlot * MyReplicationSlot
Definition: slot.c:95
#define NULL
Definition: c.h:226
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:486
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:448
#define elog
Definition: elog.h:219
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
static void ProcessStandbyMessage ( void  )
static

Definition at line 1430 of file walsender.c.

References COMMERROR, ereport, errcode(), errmsg(), pq_getmsgbyte(), proc_exit(), ProcessStandbyHSFeedbackMessage(), and ProcessStandbyReplyMessage().

Referenced by ProcessRepliesIfAny().

1431 {
1432  char msgtype;
1433 
1434  /*
1435  * Check message type from the first byte.
1436  */
1437  msgtype = pq_getmsgbyte(&reply_message);
1438 
1439  switch (msgtype)
1440  {
1441  case 'r':
1442  ProcessStandbyReplyMessage();
1443  break;
1444 
1445  case 'h':
1446  ProcessStandbyHSFeedbackMessage();
1447  break;
1448 
1449  default:
1450  ereport(COMMERROR,
1451  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1452  errmsg("unexpected message type \"%c\"", msgtype)));
1453  proc_exit(0);
1454  }
1455 }
void proc_exit(int code)
Definition: ipc.c:99
int errcode(int sqlerrcode)
Definition: elog.c:575
#define COMMERROR
Definition: elog.h:30
#define ereport(elevel, rest)
Definition: elog.h:122
static StringInfoData reply_message
Definition: walsender.c:152
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:432
static void ProcessStandbyReplyMessage(void)
Definition: walsender.c:1493
int errmsg(const char *fmt,...)
Definition: elog.c:797
static void ProcessStandbyHSFeedbackMessage(void)
Definition: walsender.c:1582
static void ProcessStandbyReplyMessage ( void  )
static

Definition at line 1493 of file walsender.c.

References am_cascading_walsender, WalSnd::apply, DEBUG2, elog, WalSnd::flush, InvalidXLogRecPtr, LogicalConfirmReceivedLocation(), WalSnd::mutex, MyReplicationSlot, MyWalSnd, PhysicalConfirmReceivedLocation(), pq_getmsgbyte(), pq_getmsgint64(), SlotIsLogical, SpinLockAcquire, SpinLockRelease, SyncRepReleaseWaiters(), WalSndKeepalive(), and WalSnd::write.

Referenced by ProcessStandbyMessage().

1494 {
1495  XLogRecPtr writePtr,
1496  flushPtr,
1497  applyPtr;
1498  bool replyRequested;
1499 
1500  /* the caller already consumed the msgtype byte */
1501  writePtr = pq_getmsgint64(&reply_message);
1502  flushPtr = pq_getmsgint64(&reply_message);
1503  applyPtr = pq_getmsgint64(&reply_message);
1504  (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
1505  replyRequested = pq_getmsgbyte(&reply_message);
1506 
1507  elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s",
1508  (uint32) (writePtr >> 32), (uint32) writePtr,
1509  (uint32) (flushPtr >> 32), (uint32) flushPtr,
1510  (uint32) (applyPtr >> 32), (uint32) applyPtr,
1511  replyRequested ? " (reply requested)" : "");
1512 
1513  /* Send a reply if the standby requested one. */
1514  if (replyRequested)
1515  WalSndKeepalive(false);
1516 
1517  /*
1518  * Update shared state for this WalSender process based on reply data from
1519  * standby.
1520  */
1521  {
1522  WalSnd *walsnd = MyWalSnd;
1523 
1524  SpinLockAcquire(&walsnd->mutex);
1525  walsnd->write = writePtr;
1526  walsnd->flush = flushPtr;
1527  walsnd->apply = applyPtr;
1528  SpinLockRelease(&walsnd->mutex);
1529  }
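1530 
1531  if (!am_cascading_walsender)
1532  SyncRepReleaseWaiters();
1533 
1534  /*
1535  * Advance our local xmin horizon when the client confirmed a flush.
1536  */
1537  if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
1538  {
1539  if (SlotIsLogical(MyReplicationSlot))
1540  LogicalConfirmReceivedLocation(flushPtr);
1541  else
1542  PhysicalConfirmReceivedLocation(flushPtr);
1543  }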
1544 }
XLogRecPtr write
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:2838
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: walsender.c:1461
#define DEBUG2
Definition: elog.h:24
unsigned int uint32
Definition: c.h:265
#define SlotIsLogical(slot)
Definition: slot.h:134
static StringInfoData reply_message
Definition: walsender.c:152
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:103
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:432
ReplicationSlot * MyReplicationSlot
Definition: slot.c:95
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:884
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:486
#define elog
Definition: elog.h:219
XLogRecPtr apply
void SyncRepReleaseWaiters(void)
Definition: syncrep.c:405
bool am_cascading_walsender
Definition: walsender.c:107
static void SendTimeLineHistory ( TimeLineHistoryCmd *  cmd)
static

Definition at line 392 of file walsender.c.

References buf, BYTEAOID, CloseTransientFile(), ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), MAXFNAMELEN, MAXPGPATH, OpenTransientFile(), PG_BINARY, pq_beginmessage(), pq_endmessage(), pq_sendbytes(), pq_sendint(), pq_sendstring(), read, TEXTOID, TimeLineHistoryCmd::timeline, TLHistoryFileName, and TLHistoryFilePath.

Referenced by exec_replication_command().

393 {
394  StringInfoData buf;
395  char histfname[MAXFNAMELEN];
396  char path[MAXPGPATH];
397  int fd;
398  off_t histfilelen;
399  off_t bytesleft;
400  Size len;
401 
402  /*
403  * Reply with a result set with one row, and two columns. The first col is
404  * the name of the history file, 2nd is the contents.
405  */
406 
407  TLHistoryFileName(histfname, cmd->timeline);
408  TLHistoryFilePath(path, cmd->timeline);
409 
410  /* Send a RowDescription message */
411  pq_beginmessage(&buf, 'T');
412  pq_sendint(&buf, 2, 2); /* 2 fields */
413 
414  /* first field */
415  pq_sendstring(&buf, "filename"); /* col name */
416  pq_sendint(&buf, 0, 4); /* table oid */
417  pq_sendint(&buf, 0, 2); /* attnum */
418  pq_sendint(&buf, TEXTOID, 4); /* type oid */
419  pq_sendint(&buf, -1, 2); /* typlen */
420  pq_sendint(&buf, 0, 4); /* typmod */
421  pq_sendint(&buf, 0, 2); /* format code */
422 
423  /* second field */
424  pq_sendstring(&buf, "content"); /* col name */
425  pq_sendint(&buf, 0, 4); /* table oid */
426  pq_sendint(&buf, 0, 2); /* attnum */
427  pq_sendint(&buf, BYTEAOID, 4); /* type oid */
428  pq_sendint(&buf, -1, 2); /* typlen */
429  pq_sendint(&buf, 0, 4); /* typmod */
430  pq_sendint(&buf, 0, 2); /* format code */
431  pq_endmessage(&buf);
432 
433  /* Send a DataRow message */
434  pq_beginmessage(&buf, 'D');
435  pq_sendint(&buf, 2, 2); /* # of columns */
436  len = strlen(histfname);
437  pq_sendint(&buf, len, 4); /* col1 len */
438  pq_sendbytes(&buf, histfname, len);
439 
440  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0666);
441  if (fd < 0)
442  ereport(ERROR,
443  (errcode_for_file_access(),
444  errmsg("could not open file \"%s\": %m", path)));
445 
446  /* Determine file length and send it to client */
447  histfilelen = lseek(fd, 0, SEEK_END);
448  if (histfilelen < 0)
449  ereport(ERROR,
450  (errcode_for_file_access(),
451  errmsg("could not seek to end of file \"%s\": %m", path)));
452  if (lseek(fd, 0, SEEK_SET) != 0)
453  ereport(ERROR,
454  (errcode_for_file_access(),
455  errmsg("could not seek to beginning of file \"%s\": %m", path)));
456 
457  pq_sendint(&buf, histfilelen, 4); /* col2 len */
458 
459  bytesleft = histfilelen;
460  while (bytesleft > 0)
461  {
462  char rbuf[BLCKSZ];
463  int nread;
464 
465  nread = read(fd, rbuf, sizeof(rbuf));
466  if (nread <= 0)
467  ereport(ERROR,
468  (errcode_for_file_access(),
469  errmsg("could not read file \"%s\": %m",
470  path)));
471  pq_sendbytes(&buf, rbuf, nread);
472  bytesleft -= nread;
473  }
474  CloseTransientFile(fd);
475 
476  pq_endmessage(&buf);
477 }
#define TEXTOID
Definition: pg_type.h:324
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:186
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1038
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:88
TimeLineID timeline
Definition: replnodes.h:96
#define ERROR
Definition: elog.h:43
#define MAXPGPATH
#define TLHistoryFileName(fname, tli)
static char * buf
Definition: pg_test_fsync.c:65
int OpenTransientFile(FileName fileName, int fileFlags, int fileMode)
Definition: fd.c:2093
int errcode_for_file_access(void)
Definition: elog.c:598
#define ereport(elevel, rest)
Definition: elog.h:122
int CloseTransientFile(int fd)
Definition: fd.c:2254
#define MAXFNAMELEN
size_t Size
Definition: c.h:353
#define BYTEAOID
Definition: pg_type.h:292
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:115
int errmsg(const char *fmt,...)
Definition: elog.c:797
void pq_sendint(StringInfo buf, int i, int b)
Definition: pqformat.c:236
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:344
#define read(a, b, c)
Definition: win32.h:18
#define TLHistoryFilePath(path, tli)
static void StartLogicalReplication ( StartReplicationCmd *  cmd)
static

Definition at line 888 of file walsender.c.

References am_cascading_walsender, Assert, buf, CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDecodingContext(), ReplicationSlot::data, DestRemote, EndCommand(), ereport, errmsg(), FreeDecodingContext(), LOG, logical_read_xlog_page(), logical_startptr, WalSnd::mutex, MyReplicationSlot, MyWalSnd, StartReplicationCmd::options, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint(), proc_exit(), RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), ReplicationSlotPersistentData::restart_lsn, sendTimeLine, sendTimeLineIsHistoric, WalSnd::sentPtr, sentPtr, StartReplicationCmd::slotname, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, SyncRepInitConfig(), ThisTimeLineID, walsender_ready_to_stop, WalSndLoop(), WalSndPrepareWrite(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WalSndWriteData(), and XLogSendLogical().

Referenced by exec_replication_command().

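889 {
890  StringInfoData buf;
891 
892  /* make sure that our requirements are still fulfilled */
893  CheckLogicalDecodingRequirements();
894 
895  Assert(!MyReplicationSlot);
896 
897  ReplicationSlotAcquire(cmd->slotname);
898 
899  /*
900  * Force a disconnect, so that the decoding code doesn't need to care
901  * about an eventual switch from running in recovery, to running in a
902  * normal environment. Client code is expected to handle reconnects.
903  */
904  if (am_cascading_walsender && !RecoveryInProgress())
905  {
906  ereport(LOG,
907  (errmsg("terminating walsender process after promotion")));
908  walsender_ready_to_stop = true;
909  }
910 
911  WalSndSetState(WALSNDSTATE_CATCHUP);
912 
913  /* Send a CopyBothResponse message, and start streaming */
914  pq_beginmessage(&buf, 'W');
915  pq_sendbyte(&buf, 0);
916  pq_sendint(&buf, 0, 2);
917  pq_endmessage(&buf);
918  pq_flush();
919 
920  /* setup state for XLogReadPage */
921  sendTimeLineIsHistoric = false;
922  sendTimeLine = ThisTimeLineID;
923 
924  /*
925  * Initialize position to the last ack'ed one, then the xlog records begin
926  * to be shipped from that position.
927  */
928  logical_decoding_ctx = CreateDecodingContext(
929  cmd->startpoint, cmd->options,
930  logical_read_xlog_page,
931  WalSndPrepareWrite, WalSndWriteData);
932 
933  /* Start reading WAL from the oldest required WAL. */
934  logical_startptr = MyReplicationSlot->data.restart_lsn;
935 
936  /*
937  * Report the location after which we'll send out further commits as the
938  * current sentPtr.
939  */
940  sentPtr = MyReplicationSlot->data.confirmed_flush;
941 
942  /* Also update the sent position status in shared memory */
943  {
944  WalSnd *walsnd = MyWalSnd;
945 
946  SpinLockAcquire(&walsnd->mutex);
947  walsnd->sentPtr = MyReplicationSlot->data.restart_lsn;
948  SpinLockRelease(&walsnd->mutex);
949  }
950 
951  replication_active = true;
952 
953  SyncRepInitConfig();
954 
955  /* Main loop of walsender */
956  WalSndLoop(XLogSendLogical);
957 
958  FreeDecodingContext(logical_decoding_ctx);
959  ReplicationSlotRelease();
960 
961  replication_active = false;
962  if (walsender_ready_to_stop)
963  proc_exit(0);
964  WalSndSetState(WALSNDSTATE_STARTUP);
965 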
966  /* Get out of COPY mode (CommandComplete). */
967  EndCommand("COPY 0", DestRemote);
968 }
XLogRecPtr startpoint
Definition: replnodes.h:84
#define pq_flush()
Definition: libpq.h:39
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
void proc_exit(int code)
Definition: ipc.c:99
ReplicationSlotPersistentData data
Definition: slot.h:115
#define LOG
Definition: elog.h:26
bool RecoveryInProgress(void)
Definition: xlog.c:7805
slock_t mutex
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:88
XLogRecPtr confirmed_flush
Definition: slot.h:76
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
Definition: walsender.c:710
#define SpinLockAcquire(lock)
Definition: spin.h:62
static volatile sig_atomic_t walsender_ready_to_stop
Definition: walsender.c:178
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:157
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:188
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write)
Definition: logical.c:327
static char * buf
Definition: pg_test_fsync.c:65
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1006
static XLogRecPtr logical_startptr
Definition: walsender.c:189
void ReplicationSlotRelease(void)
Definition: slot.c:374
void SyncRepInitConfig(void)
Definition: syncrep.c:377
#define ereport(elevel, rest)
Definition: elog.h:122
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:1744
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:979
#define SpinLockRelease(lock)
Definition: spin.h:64
static volatile sig_atomic_t replication_active
Definition: walsender.c:186
XLogRecPtr sentPtr
WalSnd * MyWalSnd
Definition: walsender.c:103
static XLogRecPtr sentPtr
Definition: walsender.c:148
TimeLineID ThisTimeLineID
Definition: xlog.c:178
ReplicationSlot * MyReplicationSlot
Definition: slot.c:95
static TimeLineID sendTimeLine
Definition: walsender.c:139
#define Assert(condition)
Definition: c.h:671
void WalSndSetState(WalSndState state)
Definition: walsender.c:2655
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:457
static void XLogSendLogical(void)
Definition: walsender.c:2372
XLogRecPtr restart_lsn
Definition: slot.h:68
void ReplicationSlotAcquire(const char *name)
Definition: slot.c:325
int errmsg(const char *fmt,...)
Definition: elog.c:797
void pq_sendint(StringInfo buf, int i, int b)
Definition: pqformat.c:236
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:344
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:76
static bool sendTimeLineIsHistoric
Definition: walsender.c:141
bool am_cascading_walsender
Definition: walsender.c:107
static void StartReplication ( StartReplicationCmd *  cmd)
static

Definition at line 486 of file walsender.c.

References am_cascading_walsender, Assert, begin_tup_output_tupdesc(), buf, CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errcode(), errdetail(), errmsg(), ERROR, GetFlushRecPtr(), GetStandbyFlushRecPtr(), Int64GetDatum(), INT8OID, InvalidXLogRecPtr, list_free_deep(), MemSet, WalSnd::mutex, MyReplicationSlot, MyWalSnd, pq_beginmessage(), pq_endmessage(), pq_flush, pq_puttextmessage(), pq_sendbyte(), pq_sendint(), proc_exit(), readTimeLineHistory(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, WalSnd::sentPtr, sentPtr, SlotIsLogical, StartReplicationCmd::slotname, snprintf(), SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), TEXTOID, ThisTimeLineID, StartReplicationCmd::timeline, tliSwitchPoint(), TupleDescInitBuiltinEntry(), values, walsender_ready_to_stop, WalSndLoop(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, XLogRecPtrIsInvalid, and XLogSendPhysical().

Referenced by exec_replication_command().

487 {
489  XLogRecPtr FlushPtr;
490 
491  /*
492  * We assume here that we're logging enough information in the WAL for
493  * log-shipping, since this is checked in PostmasterMain().
494  *
495  * NOTE: wal_level can only change at shutdown, so in most cases it is
496  * difficult for there to be WAL data that we can still see that was
497  * written at wal_level='minimal'.
498  */
499 
500  if (cmd->slotname)
501  {
504  ereport(ERROR,
505  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
506  (errmsg("cannot use a logical replication slot for physical replication"))));
507  }
508 
509  /*
510  * Select the timeline. If it was given explicitly by the client, use
511  * that. Otherwise use the timeline of the last replayed record, which is
512  * kept in ThisTimeLineID.
513  */
515  {
516  /* this also updates ThisTimeLineID */
517  FlushPtr = GetStandbyFlushRecPtr();
518  }
519  else
520  FlushPtr = GetFlushRecPtr();
521 
522  if (cmd->timeline != 0)
523  {
524  XLogRecPtr switchpoint;
525 
526  sendTimeLine = cmd->timeline;
528  {
529  sendTimeLineIsHistoric = false;
531  }
532  else
533  {
534  List *timeLineHistory;
535 
536  sendTimeLineIsHistoric = true;
537 
538  /*
539  * Check that the timeline the client requested for exists, and
540  * the requested start location is on that timeline.
541  */
542  timeLineHistory = readTimeLineHistory(ThisTimeLineID);
543  switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
545  list_free_deep(timeLineHistory);
546 
547  /*
548  * Found the requested timeline in the history. Check that
549  * requested startpoint is on that timeline in our history.
550  *
551  * This is quite loose on purpose. We only check that we didn't
552  * fork off the requested timeline before the switchpoint. We
553  * don't check that we switched *to* it before the requested
554  * starting point. This is because the client can legitimately
555  * request to start replication from the beginning of the WAL
556  * segment that contains switchpoint, but on the new timeline, so
557  * that it doesn't end up with a partial segment. If you ask for a
558  * too old starting point, you'll get an error later when we fail
559  * to find the requested WAL segment in pg_wal.
560  *
561  * XXX: we could be more strict here and only allow a startpoint
562  * that's older than the switchpoint, if it's still in the same
563  * WAL segment.
564  */
565  if (!XLogRecPtrIsInvalid(switchpoint) &&
566  switchpoint < cmd->startpoint)
567  {
568  ereport(ERROR,
569  (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
570  (uint32) (cmd->startpoint >> 32),
571  (uint32) (cmd->startpoint),
572  cmd->timeline),
573  errdetail("This server's history forked from timeline %u at %X/%X.",
574  cmd->timeline,
575  (uint32) (switchpoint >> 32),
576  (uint32) (switchpoint))));
577  }
578  sendTimeLineValidUpto = switchpoint;
579  }
580  }
581  else
582  {
585  sendTimeLineIsHistoric = false;
586  }
587 
589 
590  /* If there is nothing to stream, don't even enter COPY mode */
592  {
593  /*
594  * When we first start replication the standby will be behind the
595  * primary. For some applications, for example, synchronous
596  * replication, it is important to have a clear state for this initial
597  * catchup mode, so we can trigger actions when we change streaming
598  * state later. We may stay in this state for a long time, which is
599  * exactly why we want to be able to monitor whether or not we are
600  * still here.
601  */
603 
604  /* Send a CopyBothResponse message, and start streaming */
605  pq_beginmessage(&buf, 'W');
606  pq_sendbyte(&buf, 0);
607  pq_sendint(&buf, 0, 2);
608  pq_endmessage(&buf);
609  pq_flush();
610 
611  /*
612  * Don't allow a request to stream from a future point in WAL that
613  * hasn't been flushed to disk in this server yet.
614  */
615  if (FlushPtr < cmd->startpoint)
616  {
617  ereport(ERROR,
618  (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
619  (uint32) (cmd->startpoint >> 32),
620  (uint32) (cmd->startpoint),
621  (uint32) (FlushPtr >> 32),
622  (uint32) (FlushPtr))));
623  }
624 
625  /* Start streaming from the requested point */
626  sentPtr = cmd->startpoint;
627 
628  /* Initialize shared memory status, too */
629  {
630  WalSnd *walsnd = MyWalSnd;
631 
632  SpinLockAcquire(&walsnd->mutex);
633  walsnd->sentPtr = sentPtr;
634  SpinLockRelease(&walsnd->mutex);
635  }
636 
638 
639  /* Main loop of walsender */
640  replication_active = true;
641 
643 
644  replication_active = false;
646  proc_exit(0);
648 
650  }
651 
652  if (cmd->slotname)
654 
655  /*
656  * Copy is finished now. Send a single-row result set indicating the next
657  * timeline.
658  */
660  {
661  char startpos_str[8 + 1 + 8 + 1];
662  DestReceiver *dest;
663  TupOutputState *tstate;
664  TupleDesc tupdesc;
665  Datum values[2];
666  bool nulls[2];
667 
668  snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
669  (uint32) (sendTimeLineValidUpto >> 32),
671 
673  MemSet(nulls, false, sizeof(nulls));
674 
675  /*
676  * Need a tuple descriptor representing two columns.
677  * int8 may seem like a surprising data type for this, but in theory
678  * int4 would not be wide enough for this, as TimeLineID is unsigned.
679  */
680  tupdesc = CreateTemplateTupleDesc(2, false);
681  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
682  INT8OID, -1, 0);
683  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
684  TEXTOID, -1, 0);
685 
686  /* prepare for projection of tuple */
687  tstate = begin_tup_output_tupdesc(dest, tupdesc);
688 
689  values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
690  values[1] = CStringGetTextDatum(startpos_str);
691 
692  /* send it to dest */
693  do_tup_output(tstate, values, nulls);
694 
695  end_tup_output(tstate);
696  }
697 
698  /* Send CommandComplete message */
699  pq_puttextmessage('C', "START_STREAMING");
700 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr startpoint
Definition: replnodes.h:84
#define pq_flush()
Definition: libpq.h:39
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
#define TEXTOID
Definition: pg_type.h:324
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:74
static void XLogSendPhysical(void)
Definition: walsender.c:2139
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:1250
void proc_exit(int code)
Definition: ipc.c:99
int errcode(int sqlerrcode)
Definition: elog.c:575
#define MemSet(start, val, len)
Definition: c.h:853
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3, 4)
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8154
void list_free_deep(List *list)
Definition: list.c:1147
slock_t mutex
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:88
#define SpinLockAcquire(lock)
Definition: spin.h:62
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:1308
static volatile sig_atomic_t walsender_ready_to_stop
Definition: walsender.c:178
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:563
#define ERROR
Definition: elog.h:43
static char * buf
Definition: pg_test_fsync.c:65
static bool streamingDoneSending
Definition: walsender.c:170
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:109
int errdetail(const char *fmt,...)
Definition: elog.c:873
unsigned int uint32
Definition: c.h:265
void ReplicationSlotRelease(void)
Definition: slot.c:374
#define SlotIsLogical(slot)
Definition: slot.h:134
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:2102
void SyncRepInitConfig(void)
Definition: syncrep.c:377
#define ereport(elevel, rest)
Definition: elog.h:122
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc)
Definition: execTuples.c:1232
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:1744
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
static volatile sig_atomic_t replication_active
Definition: walsender.c:186
uintptr_t Datum
Definition: postgres.h:374
XLogRecPtr sentPtr
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:552
WalSnd * MyWalSnd
Definition: walsender.c:103
static XLogRecPtr sentPtr
Definition: walsender.c:148
TimeLineID ThisTimeLineID
Definition: xlog.c:178
#define INT8OID
Definition: pg_type.h:304
ReplicationSlot * MyReplicationSlot
Definition: slot.c:95
static TimeLineID sendTimeLine
Definition: walsender.c:139
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:671
void WalSndSetState(WalSndState state)
Definition: walsender.c:2655
void ReplicationSlotAcquire(const char *name)
Definition: slot.c:325
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:140
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:142
TupleDesc CreateTemplateTupleDesc(int natts, bool hasoid)
Definition: tupdesc.c:41
static bool streamingDoneReceiving
Definition: walsender.c:171
static Datum values[MAXATTR]
Definition: bootstrap.c:162
int errmsg(const char *fmt,...)
Definition: elog.c:797
void pq_sendint(StringInfo buf, int i, int b)
Definition: pqformat.c:236
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:344
TimeLineID timeline
Definition: replnodes.h:83
#define CStringGetTextDatum(s)
Definition: builtins.h:90
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2469
void pq_puttextmessage(char msgtype, const char *str)
Definition: pqformat.c:400
Definition: pg_list.h:45
int16 AttrNumber
Definition: attnum.h:21
static bool sendTimeLineIsHistoric
Definition: walsender.c:141
bool am_cascading_walsender
Definition: walsender.c:107
static void WalSndCheckTimeOut ( TimestampTz  now)
static

Definition at line 1717 of file walsender.c.

References COMMERROR, ereport, errmsg(), last_reply_timestamp, TimestampTzPlusMilliseconds, wal_sender_timeout, and WalSndShutdown().

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

1718 {
1719  TimestampTz timeout;
1720 
1721  /* don't bail out if we're doing something that doesn't require timeouts */
1722  if (last_reply_timestamp <= 0)
1723  return;
1724 
1727 
1728  if (wal_sender_timeout > 0 && now >= timeout)
1729  {
1730  /*
1731  * Since typically expiration of replication timeout means
1732  * communication problem, we don't send the error message to the
1733  * standby.
1734  */
1736  (errmsg("terminating walsender process due to replication timeout")));
1737 
1738  WalSndShutdown();
1739  }
1740 }
int wal_sender_timeout
Definition: walsender.c:113
int64 TimestampTz
Definition: timestamp.h:39
#define COMMERROR
Definition: elog.h:30
#define ereport(elevel, rest)
Definition: elog.h:122
static void WalSndShutdown(void)
Definition: walsender.c:201
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
int errmsg(const char *fmt,...)
Definition: elog.c:797
static TimestampTz last_reply_timestamp
Definition: walsender.c:159
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1533
static long WalSndComputeSleeptime ( TimestampTz  now)
static

Definition at line 1675 of file walsender.c.

References last_reply_timestamp, sleeptime, TimestampDifference(), TimestampTzPlusMilliseconds, waiting_for_ping_response, and wal_sender_timeout.

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

1676 {
1677  long sleeptime = 10000; /* 10 s */
1678 
1680  {
1681  TimestampTz wakeup_time;
1682  long sec_to_timeout;
1683  int microsec_to_timeout;
1684 
1685  /*
1686  * At the latest stop sleeping once wal_sender_timeout has been
1687  * reached.
1688  */
1691 
1692  /*
1693  * If no ping has been sent yet, wakeup when it's time to do so.
1694  * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
1695  * the timeout passed without a response.
1696  */
1699  wal_sender_timeout / 2);
1700 
1701  /* Compute relative time until wakeup. */
1702  TimestampDifference(now, wakeup_time,
1703  &sec_to_timeout, &microsec_to_timeout);
1704 
1705  sleeptime = sec_to_timeout * 1000 +
1706  microsec_to_timeout / 1000;
1707  }
1708 
1709  return sleeptime;
1710 }
int wal_sender_timeout
Definition: walsender.c:113
int64 TimestampTz
Definition: timestamp.h:39
int sleeptime
Definition: pg_standby.c:40
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: timestamp.c:1623
static bool waiting_for_ping_response
Definition: walsender.c:162
static TimestampTz last_reply_timestamp
Definition: walsender.c:159
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1533
static void WalSndDone ( WalSndSendDataCallback  send_data)
static

Definition at line 2428 of file walsender.c.

References DestRemote, EndCommand(), WalSnd::flush, pq_flush, pq_is_send_pending, proc_exit(), sentPtr, waiting_for_ping_response, WalSndCaughtUp, WalSndKeepalive(), WalSnd::write, and XLogRecPtrIsInvalid.

Referenced by WalSndLoop().

2429 {
2430  XLogRecPtr replicatedPtr;
2431 
2432  /* ... let's just be real sure we're caught up ... */
2433  send_data();
2434 
2435  /*
2436  * To figure out whether all WAL has successfully been replicated, check
2437  * flush location if valid, write otherwise. Tools like pg_receivewal
2438  * will usually (unless in synchronous mode) return an invalid flush
2439  * location.
2440  */
2441  replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
2443 
2444  if (WalSndCaughtUp && sentPtr == replicatedPtr &&
2445  !pq_is_send_pending())
2446  {
2447  /* Inform the standby that XLOG streaming is done */
2448  EndCommand("COPY 0", DestRemote);
2449  pq_flush();
2450 
2451  proc_exit(0);
2452  }
2454  {
2455  WalSndKeepalive(true);
2457  }
2458 }
#define pq_is_send_pending()
Definition: libpq.h:41
XLogRecPtr write
#define pq_flush()
Definition: libpq.h:39
void proc_exit(int code)
Definition: ipc.c:99
XLogRecPtr flush
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:2838
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:157
static bool WalSndCaughtUp
Definition: walsender.c:174
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
WalSnd * MyWalSnd
Definition: walsender.c:103
static XLogRecPtr sentPtr
Definition: walsender.c:148
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool waiting_for_ping_response
Definition: walsender.c:162
void WalSndErrorCleanup ( void  )

Definition at line 257 of file walsender.c.

References close, ConditionVariableCancelSleep(), LWLockReleaseAll(), MyReplicationSlot, NULL, pgstat_report_wait_end(), proc_exit(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), sendFile, walsender_ready_to_stop, WalSndSetState(), and WALSNDSTATE_STARTUP.

Referenced by PostgresMain().

258 {
262 
263  if (sendFile >= 0)
264  {
265  close(sendFile);
266  sendFile = -1;
267  }
268 
269  if (MyReplicationSlot != NULL)
271 
273 
274  replication_active = false;
276  proc_exit(0);
277 
278  /* Revert back to startup state */
280 }
static int sendFile
Definition: walsender.c:126
void proc_exit(int code)
Definition: ipc.c:99
void ReplicationSlotCleanup()
Definition: slot.c:412
void ConditionVariableCancelSleep(void)
static volatile sig_atomic_t walsender_ready_to_stop
Definition: walsender.c:178
void ReplicationSlotRelease(void)
Definition: slot.c:374
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1124
static volatile sig_atomic_t replication_active
Definition: walsender.c:186
ReplicationSlot * MyReplicationSlot
Definition: slot.c:95
#define NULL
Definition: c.h:226
void WalSndSetState(WalSndState state)
Definition: walsender.c:2655
void LWLockReleaseAll(void)
Definition: lwlock.c:1813
#define close(a)
Definition: win32.h:17
static const char* WalSndGetStateString ( WalSndState  state)
static

Definition at line 2674 of file walsender.c.

References WALSNDSTATE_BACKUP, WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, and WALSNDSTATE_STREAMING.

Referenced by pg_stat_get_wal_senders().

2675 {
2676  switch (state)
2677  {
2678  case WALSNDSTATE_STARTUP:
2679  return "startup";
2680  case WALSNDSTATE_BACKUP:
2681  return "backup";
2682  case WALSNDSTATE_CATCHUP:
2683  return "catchup";
2684  case WALSNDSTATE_STREAMING:
2685  return "streaming";
2686  }
2687  return "UNKNOWN";
2688 }
Definition: regguts.h:298
static void WalSndKeepalive ( bool  requestReply)
static

Definition at line 2838 of file walsender.c.

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), StringInfoData::len, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), resetStringInfo(), and sentPtr.

Referenced by ProcessStandbyReplyMessage(), WalSndDone(), WalSndKeepaliveIfNecessary(), and WalSndWaitForWal().

2839 {
2840  elog(DEBUG2, "sending replication keepalive");
2841 
2842  /* construct the message... */
2844  pq_sendbyte(&output_message, 'k');
2847  pq_sendbyte(&output_message, requestReply ? 1 : 0);
2848 
2849  /* ... and send it wrapped in CopyData */
2851 }
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1569
static StringInfoData output_message
Definition: walsender.c:151
#define DEBUG2
Definition: elog.h:24
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:94
static XLogRecPtr sentPtr
Definition: walsender.c:148
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
#define elog
Definition: elog.h:219
void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.c:271
static void WalSndKeepaliveIfNecessary ( TimestampTz  now)
static

Definition at line 2857 of file walsender.c.

References last_reply_timestamp, pq_flush_if_writable, TimestampTzPlusMilliseconds, waiting_for_ping_response, wal_sender_timeout, WalSndKeepalive(), and WalSndShutdown().

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

2858 {
2859  TimestampTz ping_time;
2860 
2861  /*
2862  * Don't send keepalive messages if timeouts are globally disabled or
2863  * we're doing something not partaking in timeouts.
2864  */
2865  if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
2866  return;
2867 
2869  return;
2870 
2871  /*
2872  * If half of wal_sender_timeout has lapsed without receiving any reply
2873  * from the standby, send a keep-alive message to the standby requesting
2874  * an immediate reply.
2875  */
2877  wal_sender_timeout / 2);
2878  if (now >= ping_time)
2879  {
2880  WalSndKeepalive(true);
2882 
2883  /* Try to flush pending output to the client */
2884  if (pq_flush_if_writable() != 0)
2885  WalSndShutdown();
2886  }
2887 }
int wal_sender_timeout
Definition: walsender.c:113
int64 TimestampTz
Definition: timestamp.h:39
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:2838
#define pq_flush_if_writable()
Definition: libpq.h:40
static void WalSndShutdown(void)
Definition: walsender.c:201
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
static bool waiting_for_ping_response
Definition: walsender.c:162
static TimestampTz last_reply_timestamp
Definition: walsender.c:159
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1533
static void WalSndKill ( int  code,
Datum  arg 
)
static

Definition at line 1938 of file walsender.c.

References Assert, WalSnd::latch, WalSnd::mutex, MyWalSnd, NULL, WalSnd::pid, SpinLockAcquire, and SpinLockRelease.

Referenced by InitWalSenderSlot().

1939 {
1940  WalSnd *walsnd = MyWalSnd;
1941 
1942  Assert(walsnd != NULL);
1943 
1944  MyWalSnd = NULL;
1945 
1946  SpinLockAcquire(&walsnd->mutex);
1947  /* clear latch while holding the spinlock, so it can safely be read */
1948  walsnd->latch = NULL;
1949  /* Mark WalSnd struct as no longer being in use. */
1950  walsnd->pid = 0;
1951  SpinLockRelease(&walsnd->mutex);
1952 }
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch * latch
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:103
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
static void WalSndLastCycleHandler ( SIGNAL_ARGS  )
static

Definition at line 2542 of file walsender.c.

References MyLatch, MyProcPid, replication_active, SetLatch(), and walsender_ready_to_stop.

Referenced by WalSndSignals().

2543 {
2544  int save_errno = errno;
2545 
2546  /*
2547  * If replication has not yet started, die like with SIGTERM. If
2548  * replication is active, only set a flag and wake up the main loop. It
2549  * will send any outstanding WAL, wait for it to be replicated to the
2550  * standby, and then exit gracefully.
2551  */
2552  if (!replication_active)
2553  kill(MyProcPid, SIGTERM);
2554 
2555  walsender_ready_to_stop = true;
2556  SetLatch(MyLatch);
2557 
2558  errno = save_errno;
2559 }
int MyProcPid
Definition: globals.c:38
static volatile sig_atomic_t walsender_ready_to_stop
Definition: walsender.c:178
static volatile sig_atomic_t replication_active
Definition: walsender.c:186
void SetLatch(volatile Latch *latch)
Definition: latch.c:379
struct Latch * MyLatch
Definition: globals.c:51
static void WalSndLoop ( WalSndSendDataCallback  send_data)
static

Definition at line 1744 of file walsender.c.

References application_name, CHECK_FOR_INTERRUPTS, DEBUG1, ereport, errmsg(), GetCurrentTimestamp(), got_SIGHUP, last_reply_timestamp, MyLatch, MyProcPort, now(), PGC_SIGHUP, pgstat_report_activity(), PostmasterIsAlive(), pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), sleeptime, Port::sock, WalSnd::state, STATE_RUNNING, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_MAIN, waiting_for_ping_response, WaitLatchOrSocket(), walsender_ready_to_stop, WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndDone(), WalSndKeepaliveIfNecessary(), WalSndSetState(), WalSndShutdown(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STREAMING, WL_LATCH_SET, WL_POSTMASTER_DEATH, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, and WL_TIMEOUT.

Referenced by StartLogicalReplication(), and StartReplication().

1745 {
1746  /*
1747  * Initialize the last reply timestamp. That enables timeout processing
1748  * from hereon.
1749  */
1751  waiting_for_ping_response = false;
1752 
1753  /* Report to pgstat that this process is a WAL sender */
1754  pgstat_report_activity(STATE_RUNNING, "walsender");
1755 
1756  /*
1757  * Loop until we reach the end of this timeline or the client requests to
1758  * stop streaming.
1759  */
1760  for (;;)
1761  {
1762  TimestampTz now;
1763 
1764  /*
1765  * Emergency bailout if postmaster has died. This is to avoid the
1766  * necessity for manual cleanup of all postmaster children.
1767  */
1768  if (!PostmasterIsAlive())
1769  exit(1);
1770 
1771  /* Clear any already-pending wakeups */
1773 
1775 
1776  /* Process any requests or signals received recently */
1777  if (got_SIGHUP)
1778  {
1779  got_SIGHUP = false;
1782  }
1783 
1784  /* Check for input from the client */
1786 
1787  /*
1788  * If we have received CopyDone from the client, sent CopyDone
1789  * ourselves, and the output buffer is empty, it's time to exit
1790  * streaming.
1791  */
1793  break;
1794 
1795  /*
1796  * If we don't have any pending data in the output buffer, try to send
1797  * some more. If there is some, we don't bother to call send_data
1798  * again until we've flushed it ... but we'd better assume we are not
1799  * caught up.
1800  */
1801  if (!pq_is_send_pending())
1802  send_data();
1803  else
1804  WalSndCaughtUp = false;
1805 
1806  /* Try to flush pending output to the client */
1807  if (pq_flush_if_writable() != 0)
1808  WalSndShutdown();
1809 
1810  /* If nothing remains to be sent right now ... */
1812  {
1813  /*
1814  * If we're in catchup state, move to streaming. This is an
1815  * important state change for users to know about, since before
1816  * this point data loss might occur if the primary dies and we
1817  * need to failover to the standby. The state change is also
1818  * important for synchronous replication, since commits that
1819  * started to wait at that point might wait for some time.
1820  */
1822  {
1823  ereport(DEBUG1,
1824  (errmsg("standby \"%s\" has now caught up with primary",
1825  application_name)));
1827  }
1828 
1829  /*
1830  * When SIGUSR2 arrives, we send any outstanding logs up to the
1831  * shutdown checkpoint record (i.e., the latest record), wait for
1832  * them to be replicated to the standby, and exit. This may be a
1833  * normal termination at shutdown, or a promotion, the walsender
1834  * is not sure which.
1835  */
1837  WalSndDone(send_data);
1838  }
1839 
1840  now = GetCurrentTimestamp();
1841 
1842  /* Check for replication timeout. */
1843  WalSndCheckTimeOut(now);
1844 
1845  /* Send keepalive if the time has come */
1847 
1848  /*
1849  * We don't block if not caught up, unless there is unsent data
1850  * pending in which case we'd better block until the socket is
1851  * write-ready. This test is only needed for the case where the
1852  * send_data callback handled a subset of the available data but then
1853  * pq_flush_if_writable flushed it all --- we should immediately try
1854  * to send more.
1855  */
1857  {
1858  long sleeptime;
1859  int wakeEvents;
1860 
1861  wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT |
1863 
1864  sleeptime = WalSndComputeSleeptime(now);
1865 
1866  if (pq_is_send_pending())
1867  wakeEvents |= WL_SOCKET_WRITEABLE;
1868 
1869  /* Sleep until something happens or we time out */
1870  WaitLatchOrSocket(MyLatch, wakeEvents,
1871  MyProcPort->sock, sleeptime,
1873  }
1874  }
1875  return;
1876 }
#define pq_is_send_pending()
Definition: libpq.h:41
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
#define DEBUG1
Definition: elog.h:25
struct Port * MyProcPort
Definition: globals.c:40
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1569
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:2805
int64 TimestampTz
Definition: timestamp.h:39
static void WalSndDone(WalSndSendDataCallback send_data)
Definition: walsender.c:2428
#define WL_SOCKET_READABLE
Definition: latch.h:125
int sleeptime
Definition: pg_standby.c:40
void ResetLatch(volatile Latch *latch)
Definition: latch.c:461
pgsocket sock
Definition: libpq-be.h:118
static volatile sig_atomic_t got_SIGHUP
Definition: walsender.c:177
static volatile sig_atomic_t walsender_ready_to_stop
Definition: walsender.c:178
WalSndState state
static bool streamingDoneSending
Definition: walsender.c:170
bool PostmasterIsAlive(void)
Definition: pmsignal.c:272
#define pq_flush_if_writable()
Definition: libpq.h:40
void SyncRepInitConfig(void)
Definition: syncrep.c:377
#define ereport(elevel, rest)
Definition: elog.h:122
static bool WalSndCaughtUp
Definition: walsender.c:174
Definition: guc.h:72
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
static void WalSndShutdown(void)
Definition: walsender.c:201
WalSnd * MyWalSnd
Definition: walsender.c:103
static void WalSndKeepaliveIfNecessary(TimestampTz now)
Definition: walsender.c:2857
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:1675
static void WalSndCheckTimeOut(TimestampTz now)
Definition: walsender.c:1717
void WalSndSetState(WalSndState state)
Definition: walsender.c:2655
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:320
static bool streamingDoneReceiving
Definition: walsender.c:171
char * application_name
Definition: guc.c:471
int errmsg(const char *fmt,...)
Definition: elog.c:797
struct Latch * MyLatch
Definition: globals.c:51
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
static bool waiting_for_ping_response
Definition: walsender.c:162
#define WL_LATCH_SET
Definition: latch.h:124
static TimestampTz last_reply_timestamp
Definition: walsender.c:159
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1328
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1533
static void WalSndPrepareWrite ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 979 of file walsender.c.

References InvalidXLogRecPtr, LogicalDecodingContext::out, pq_sendbyte(), pq_sendint64(), and resetStringInfo().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

980 {
981  /* can't have sync rep confused by sending the same LSN several times */
982  if (!last_write)
983  lsn = InvalidXLogRecPtr;
984 
985  resetStringInfo(ctx->out);
986 
987  pq_sendbyte(ctx->out, 'w');
988  pq_sendint64(ctx->out, lsn); /* dataStart */
989  pq_sendint64(ctx->out, lsn); /* walEnd */
990 
991  /*
992  * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
993  * reserve space here.
994  */
995  pq_sendint64(ctx->out, 0); /* sendtime */
996 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:94
StringInfo out
Definition: logical.h:57
void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.c:271
void WalSndRqstFileReload ( void  )

Definition at line 2499 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

2500 {
2501  int i;
2502 
2503  for (i = 0; i < max_wal_senders; i++)
2504  {
2505  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2506 
2507  if (walsnd->pid == 0)
2508  continue;
2509 
2510  SpinLockAcquire(&walsnd->mutex);
2511  walsnd->needreload = true;
2512  SpinLockRelease(&walsnd->mutex);
2513  }
2514 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:100
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
int max_wal_senders
Definition: walsender.c:112
#define SpinLockRelease(lock)
Definition: spin.h:64
bool needreload
int i
void WalSndSetState ( WalSndState  state)

Definition at line 2655 of file walsender.c.

References am_walsender, Assert, WalSnd::mutex, MyWalSnd, SpinLockAcquire, SpinLockRelease, and WalSnd::state.

Referenced by SendBaseBackup(), StartLogicalReplication(), StartReplication(), WalSndErrorCleanup(), and WalSndLoop().

2656 {
2657  WalSnd *walsnd = MyWalSnd;
2658 
2659  Assert(am_walsender);
2660 
2661  if (walsnd->state == state)
2662  return;
2663 
2664  SpinLockAcquire(&walsnd->mutex);
2665  walsnd->state = state;
2666  SpinLockRelease(&walsnd->mutex);
2667 }
slock_t mutex
bool am_walsender
Definition: walsender.c:106
#define SpinLockAcquire(lock)
Definition: spin.h:62
WalSndState state
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:103
#define Assert(condition)
Definition: c.h:671
Definition: regguts.h:298
void WalSndShmemInit ( void  )

Definition at line 2599 of file walsender.c.

References i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SHMQueueInit(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

2600 {
2601  bool found;
2602  int i;
2603 
2604  WalSndCtl = (WalSndCtlData *)
2605  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
2606 
2607  if (!found)
2608  {
2609  /* First time through, so initialize */
2610  MemSet(WalSndCtl, 0, WalSndShmemSize());
2611 
2612  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
2613  SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
2614 
2615  for (i = 0; i < max_wal_senders; i++)
2616  {
2617  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2618 
2619  SpinLockInit(&walsnd->mutex);
2620  }
2621  }
2622 }
Size WalSndShmemSize(void)
Definition: walsender.c:2587
#define SpinLockInit(lock)
Definition: spin.h:60
WalSndCtlData * WalSndCtl
Definition: walsender.c:100
#define MemSet(start, val, len)
Definition: c.h:853
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
int max_wal_senders
Definition: walsender.c:112
void SHMQueueInit(SHM_QUEUE *queue)
Definition: shmqueue.c:36
int i
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28
Size WalSndShmemSize ( void  )

Definition at line 2587 of file walsender.c.

References add_size(), max_wal_senders, mul_size(), and offsetof.

Referenced by CreateSharedMemoryAndSemaphores(), and WalSndShmemInit().

2588 {
2589  Size size = 0;
2590 
2591  size = offsetof(WalSndCtlData, walsnds);
2592  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
2593 
2594  return size;
2595 }
int max_wal_senders
Definition: walsender.c:112
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
size_t Size
Definition: c.h:353
#define offsetof(type, field)
Definition: c.h:551
static void WalSndShutdown ( void  )
static

Definition at line 201 of file walsender.c.

References am_cascading_walsender, CurrentResourceOwner, InitWalSenderSlot(), MarkPostmasterChildWalSender(), NULL, PMSIGNAL_ADVANCE_STATE_MACHINE, RecoveryInProgress(), ResourceOwnerCreate(), and SendPostmasterSignal().

Referenced by WalSndCheckTimeOut(), WalSndKeepaliveIfNecessary(), WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

229 {
230  am_cascading_walsender = RecoveryInProgress();
231 
232  /* Create a per-walsender data structure in shared memory */
233  InitWalSenderSlot();
234 
235  /* Set up resource owner */
236  CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner");
237 
238  /*
239  * Let postmaster know that we're a WAL sender. Once we've declared us as
240  * a WAL sender process, postmaster will let us outlive the bgwriter and
241  * kill us last in the shutdown sequence, so we get a chance to stream all
242  * remaining WAL at shutdown, including the shutdown checkpoint. Note that
243  * there's no going back, and we mustn't write any WAL records after this.
244  */
245  MarkPostmasterChildWalSender();
246  SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
247 }
static void InitWalSenderSlot(void)
Definition: walsender.c:1880
ResourceOwner CurrentResourceOwner
Definition: resowner.c:138
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:239
bool RecoveryInProgress(void)
Definition: xlog.c:7805
#define NULL
Definition: c.h:226
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:113
bool am_cascading_walsender
Definition: walsender.c:107
ResourceOwner ResourceOwnerCreate(ResourceOwner parent, const char *name)
Definition: resowner.c:416
static void WalSndSigHupHandler ( SIGNAL_ARGS  )
static

Definition at line 2518 of file walsender.c.

References got_SIGHUP, MyLatch, and SetLatch().

Referenced by WalSndSignals().

2519 {
2520  int save_errno = errno;
2521 
2522  got_SIGHUP = true;
2523 
2524  SetLatch(MyLatch);
2525 
2526  errno = save_errno;
2527 }
static volatile sig_atomic_t got_SIGHUP
Definition: walsender.c:177
void SetLatch(volatile Latch *latch)
Definition: latch.c:379
struct Latch * MyLatch
Definition: globals.c:51
void WalSndSignals ( void  )

Definition at line 2563 of file walsender.c.

References die(), InitializeTimeouts(), pqsignal(), quickdie(), SIG_DFL, SIG_IGN, SIGCHLD, SIGCONT, SIGHUP, SIGPIPE, SIGQUIT, SIGTTIN, SIGTTOU, SIGUSR1, SIGUSR2, SIGWINCH, WalSndLastCycleHandler(), WalSndSigHupHandler(), and WalSndXLogSendHandler().

Referenced by PostgresMain().

2564 {
2565  /* Set up signal handlers */
2566  pqsignal(SIGHUP, WalSndSigHupHandler); /* set flag to read config
2567  * file */
2568  pqsignal(SIGINT, SIG_IGN); /* not used */
2569  pqsignal(SIGTERM, die); /* request shutdown */
2570  pqsignal(SIGQUIT, quickdie); /* hard crash time */
2571  InitializeTimeouts(); /* establishes SIGALRM handler */
2572  pqsignal(SIGPIPE, SIG_IGN);
2573  pqsignal(SIGUSR1, WalSndXLogSendHandler); /* request WAL sending */
2574  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
2575  * shutdown */
2576 
2577  /* Reset some signals that are accepted by postmaster but not here */
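2578  pqsignal(SIGCHLD, SIG_DFL);
2579  pqsignal(SIGTTIN, SIG_DFL);
2580  pqsignal(SIGTTOU, SIG_DFL);
2581  pqsignal(SIGCONT, SIG_DFL);
2582  pqsignal(SIGWINCH, SIG_DFL);
2583 }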
static void WalSndXLogSendHandler(SIGNAL_ARGS)
Definition: walsender.c:2531
void InitializeTimeouts(void)
Definition: timeout.c:340
#define SIGUSR1
Definition: win32.h:211
#define SIGCONT
Definition: win32.h:205
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:2542
#define SIGWINCH
Definition: win32.h:209
#define SIGTTIN
Definition: win32.h:207
#define SIGQUIT
Definition: win32.h:197
static void WalSndSigHupHandler(SIGNAL_ARGS)
Definition: walsender.c:2518
#define SIG_IGN
Definition: win32.h:193
#define SIGPIPE
Definition: win32.h:201
#define SIGHUP
Definition: win32.h:196
#define SIG_DFL
Definition: win32.h:191
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:168
#define SIGTTOU
Definition: win32.h:208
void die(SIGNAL_ARGS)
Definition: postgres.c:2617
#define SIGCHLD
Definition: win32.h:206
void quickdie(SIGNAL_ARGS)
Definition: postgres.c:2556
#define SIGUSR2
Definition: win32.h:212
static XLogRecPtr WalSndWaitForWal ( XLogRecPtr  loc)
static

Definition at line 1094 of file walsender.c.

References CHECK_FOR_INTERRUPTS, WalSnd::flush, GetCurrentTimestamp(), GetFlushRecPtr(), GetXLogReplayRecPtr(), got_SIGHUP, InvalidXLogRecPtr, MyLatch, MyProcPort, now(), NULL, PGC_SIGHUP, PostmasterIsAlive(), pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), RecoveryInProgress(), ResetLatch(), sentPtr, SetLatch(), sleeptime, Port::sock, SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_WAIT_WAL, waiting_for_ping_response, WaitLatchOrSocket(), walsender_ready_to_stop, WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepalive(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WL_LATCH_SET, WL_POSTMASTER_DEATH, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, WL_TIMEOUT, and WalSnd::write.

Referenced by logical_read_xlog_page().

1095 {
1096  int wakeEvents;
1097  static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1098 
1099 
1100  /*
1101  * Fast path to avoid acquiring the spinlock in case we already know we
1102  * have enough WAL available. This is particularly interesting if we're
1103  * far behind.
1104  */
1105  if (RecentFlushPtr != InvalidXLogRecPtr &&
1106  loc <= RecentFlushPtr)
1107  return RecentFlushPtr;
1108 
1109  /* Get a more recent flush pointer. */
1110  if (!RecoveryInProgress())
1111  RecentFlushPtr = GetFlushRecPtr();
1112  else
1113  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1114 
1115  for (;;)
1116  {
1117  long sleeptime;
1118  TimestampTz now;
1119 
1120  /*
1121  * Emergency bailout if postmaster has died. This is to avoid the
1122  * necessity for manual cleanup of all postmaster children.
1123  */
1124  if (!PostmasterIsAlive())
1125  exit(1);
1126 
1127  /* Clear any already-pending wakeups */
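1128  ResetLatch(MyLatch);
1129 
1130  CHECK_FOR_INTERRUPTS();
1131 
1132  /* Process any requests or signals received recently */
1133  if (got_SIGHUP)
1134  {
1135  got_SIGHUP = false;
1136  ProcessConfigFile(PGC_SIGHUP);
1137  SyncRepInitConfig();
1138  }
1139 
1140  /* Check for input from the client */
1141  ProcessRepliesIfAny();
1142 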
1143  /* Update our idea of the currently flushed position. */
1144  if (!RecoveryInProgress())
1145  RecentFlushPtr = GetFlushRecPtr();
1146  else
1147  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1148 
1149  /*
1150  * If postmaster asked us to stop, don't wait here anymore. This will
1151  * cause the xlogreader to return without reading a full record, which
1152  * is the fastest way to reach the mainloop which then can quit.
1153  *
1154  * It's important to do this check after the recomputation of
1155  * RecentFlushPtr, so we can send all remaining data before shutting
1156  * down.
1157  */
1158  if (walsender_ready_to_stop)
1159  break;
1160 
1161  /*
1162  * We only send regular messages to the client for full decoded
1163  * transactions, but a synchronous replication and walsender shutdown
1164  * possibly are waiting for a later location. So we send pings
1165  * containing the flush location every now and then.
1166  */
1167  if (MyWalSnd->flush < sentPtr &&
1168  MyWalSnd->write < sentPtr &&
1169  !waiting_for_ping_response)
1170  {
1171  WalSndKeepalive(false);
1172  waiting_for_ping_response = true;
1173  }
1174 
1175  /* check whether we're done */
1176  if (loc <= RecentFlushPtr)
1177  break;
1178 
1179  /* Waiting for new WAL. Since we need to wait, we're now caught up. */
1180  WalSndCaughtUp = true;
1181 
1182  /*
1183  * Try to flush pending output to the client. Also wait for the socket
1184  * becoming writable, if there's still pending output after an attempt
1185  * to flush. Otherwise we might just sit on output data while waiting
1186  * for new WAL being generated.
1187  */
1188  if (pq_flush_if_writable() != 0)
1189  WalSndShutdown();
1190 
1191  now = GetCurrentTimestamp();
1192 
1193  /* die if timeout was reached */
1194  WalSndCheckTimeOut(now);
1195 
1196  /* Send keepalive if the time has come */
1197  WalSndKeepaliveIfNecessary(now);
1198 
1199  sleeptime = WalSndComputeSleeptime(now);
1200 
1201  wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
1202  WL_SOCKET_READABLE | WL_TIMEOUT;
1203 
1204  if (pq_is_send_pending())
1205  wakeEvents |= WL_SOCKET_WRITEABLE;
1206 
1207  /* Sleep until something happens or we time out */
1208  WaitLatchOrSocket(MyLatch, wakeEvents,
1209  MyProcPort->sock, sleeptime,
1210  WAIT_EVENT_WAL_SENDER_WAIT_WAL);
1211  }
1212 
1213  /* reactivate latch so WalSndLoop knows to continue */
1214  SetLatch(MyLatch);
1215  return RecentFlushPtr;
1216 }
#define pq_is_send_pending()
Definition: libpq.h:41
XLogRecPtr write
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
struct Port * MyProcPort
Definition: globals.c:40
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1569
int64 TimestampTz
Definition: timestamp.h:39
#define WL_SOCKET_READABLE
Definition: latch.h:125
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8154
int sleeptime
Definition: pg_standby.c:40
void ResetLatch(volatile Latch *latch)
Definition: latch.c:461
bool RecoveryInProgress(void)
Definition: xlog.c:7805
pgsocket sock
Definition: libpq-be.h:118
XLogRecPtr flush
static volatile sig_atomic_t got_SIGHUP
Definition: walsender.c:177
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:2838
static volatile sig_atomic_t walsender_ready_to_stop
Definition: walsender.c:178
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:10984
bool PostmasterIsAlive(void)
Definition: pmsignal.c:272
#define pq_flush_if_writable()
Definition: libpq.h:40
void SyncRepInitConfig(void)
Definition: syncrep.c:377
static bool WalSndCaughtUp
Definition: walsender.c:174
Definition: guc.h:72
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
static void WalSndShutdown(void)
Definition: walsender.c:201
WalSnd * MyWalSnd
Definition: walsender.c:103
static XLogRecPtr sentPtr
Definition: walsender.c:148
static void WalSndKeepaliveIfNecessary(TimestampTz now)
Definition: walsender.c:2857
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:1675
void SetLatch(volatile Latch *latch)
Definition: latch.c:379
#define NULL
Definition: c.h:226
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static void WalSndCheckTimeOut(TimestampTz now)
Definition: walsender.c:1717
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:320
struct Latch * MyLatch
Definition: globals.c:51
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
static bool waiting_for_ping_response
Definition: walsender.c:162
#define WL_LATCH_SET
Definition: latch.h:124
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1328
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1533
void WalSndWakeup ( void  )

Definition at line 2631 of file walsender.c.

References i, WalSnd::latch, max_wal_senders, WalSnd::mutex, NULL, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive(), StartupXLOG(), and XLogWalRcvFlush().

2632 {
2633  int i;
2634 
2635  for (i = 0; i < max_wal_senders; i++)
2636  {
2637  Latch *latch;
2638  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2639 
2640  /*
2641  * Get latch pointer with spinlock held, for the unlikely case that
2642  * pointer reads aren't atomic (as they're 8 bytes).
2643  */
2644  SpinLockAcquire(&walsnd->mutex);
2645  latch = walsnd->latch;
2646  SpinLockRelease(&walsnd->mutex);
2647 
2648  if (latch != NULL)
2649  SetLatch(latch);
2650  }
2651 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:100
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch * latch
Definition: latch.h:110
int max_wal_senders
Definition: walsender.c:112
#define SpinLockRelease(lock)
Definition: spin.h:64
void SetLatch(volatile Latch *latch)
Definition: latch.c:379
#define NULL
Definition: c.h:226
int i
static void WalSndWriteData ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1006 of file walsender.c.

References CHECK_FOR_INTERRUPTS, StringInfoData::data, GetCurrentTimestamp(), got_SIGHUP, StringInfoData::len, MyLatch, MyProcPort, now(), LogicalDecodingContext::out, PGC_SIGHUP, PostmasterIsAlive(), pq_flush_if_writable, pq_is_send_pending, pq_putmessage_noblock, pq_sendint64(), ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), resetStringInfo(), SetLatch(), sleeptime, Port::sock, SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_WRITE_DATA, WaitLatchOrSocket(), WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WL_LATCH_SET, WL_POSTMASTER_DEATH, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, and WL_TIMEOUT.

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

1008 {
1009  /* output previously gathered data in a CopyData packet */
1010  pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1011 
1012  /*
1013  * Fill the send timestamp last, so that it is taken as late as possible.
1014  * This is somewhat ugly, but the protocol's set as it's already used for
1015  * several releases by streaming physical replication.
1016  */
1017  resetStringInfo(&tmpbuf);
1018  pq_sendint64(&tmpbuf, GetCurrentTimestamp());
1019  memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1020  tmpbuf.data, sizeof(int64));
1021 
1022  /* fast path */
1023  /* Try to flush pending output to the client */
1024  if (pq_flush_if_writable() != 0)
1025  WalSndShutdown();
1026 
1027  if (!pq_is_send_pending())
1028  return;
1029 
1030  for (;;)
1031  {
1032  int wakeEvents;
1033  long sleeptime;
1034  TimestampTz now;
1035 
1036  /*
1037  * Emergency bailout if postmaster has died. This is to avoid the
1038  * necessity for manual cleanup of all postmaster children.
1039  */
1040  if (!PostmasterIsAlive())
1041  exit(1);
1042 
1043  /* Clear any already-pending wakeups */
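1044  ResetLatch(MyLatch);
1045 
1046  CHECK_FOR_INTERRUPTS();
1047 
1048  /* Process any requests or signals received recently */
1049  if (got_SIGHUP)
1050  {
1051  got_SIGHUP = false;
1052  ProcessConfigFile(PGC_SIGHUP);
1053  SyncRepInitConfig();
1054  }
1055 
1056  /* Check for input from the client */
1057  ProcessRepliesIfAny();
1058 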
1059  /* Try to flush pending output to the client */
1060  if (pq_flush_if_writable() != 0)
1061  WalSndShutdown();
1062 
1063  /* If we finished clearing the buffered data, we're done here. */
1064  if (!pq_is_send_pending())
1065  break;
1066 
1067  now = GetCurrentTimestamp();
1068 
1069  /* die if timeout was reached */
1070  WalSndCheckTimeOut(now);
1071 
1072  /* Send keepalive if the time has come */
1073  WalSndKeepaliveIfNecessary(now);
1074 
1075  sleeptime = WalSndComputeSleeptime(now);
1076 
1077  wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
1078  WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT;
1079 
1080  /* Sleep until something happens or we time out */
1081  WaitLatchOrSocket(MyLatch, wakeEvents,
1082  MyProcPort->sock, sleeptime,
1083  WAIT_EVENT_WAL_SENDER_WRITE_DATA);
1084  }
1085 
1086  /* reactivate latch so WalSndLoop knows to continue */
1087  SetLatch(MyLatch);
1088 }
#define pq_is_send_pending()
Definition: libpq.h:41
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
struct Port * MyProcPort
Definition: globals.c:40
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1569
int64 TimestampTz
Definition: timestamp.h:39
#define WL_SOCKET_READABLE
Definition: latch.h:125
int sleeptime
Definition: pg_standby.c:40
void ResetLatch(volatile Latch *latch)
Definition: latch.c:461
pgsocket sock
Definition: libpq-be.h:118
static volatile sig_atomic_t got_SIGHUP
Definition: walsender.c:177
bool PostmasterIsAlive(void)
Definition: pmsignal.c:272
#define pq_flush_if_writable()
Definition: libpq.h:40
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:94
void SyncRepInitConfig(void)
Definition: syncrep.c:377
Definition: guc.h:72
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
static void WalSndShutdown(void)
Definition: walsender.c:201
static void WalSndKeepaliveIfNecessary(TimestampTz now)
Definition: walsender.c:2857
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:1675
void SetLatch(volatile Latch *latch)
Definition: latch.c:379
static void WalSndCheckTimeOut(TimestampTz now)
Definition: walsender.c:1717
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:320
static StringInfoData tmpbuf
Definition: walsender.c:153
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
StringInfo out
Definition: logical.h:57
struct Latch * MyLatch
Definition: globals.c:51
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.c:271
#define WL_LATCH_SET
Definition: latch.h:124
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1328
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1533
static void WalSndXLogSendHandler ( SIGNAL_ARGS  )
static

Definition at line 2531 of file walsender.c.

References latch_sigusr1_handler().

Referenced by WalSndSignals().

2532 {
2533  int save_errno = errno;
2534 
2535  latch_sigusr1_handler();
2536 
2537  errno = save_errno;
2538 }
void latch_sigusr1_handler(void)
Definition: latch.c:1540
static void XLogRead ( char *  buf,
XLogRecPtr  startptr,
Size  count 
)
static

Definition at line 1966 of file walsender.c.

References am_cascading_walsender, BasicOpenFile(), buf, CheckXLogRemoved(), close, curFileTimeLine, ereport, errcode_for_file_access(), errmsg(), ERROR, MAXPGPATH, WalSnd::mutex, MyWalSnd, WalSnd::needreload, PG_BINARY, read, sendFile, sendOff, sendSegNo, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, SpinLockAcquire, SpinLockRelease, ThisTimeLineID, XLByteInSeg, XLByteToSeg, XLogFileNameP(), XLogFilePath, and XLogSegSize.

Referenced by logical_read_xlog_page(), and XLogSendPhysical().

1967 {
1968  char *p;
1969  XLogRecPtr recptr;
1970  Size nbytes;
1971  XLogSegNo segno;
1972 
1973 retry:
1974  p = buf;
1975  recptr = startptr;
1976  nbytes = count;
1977 
1978  while (nbytes > 0)
1979  {
1980  uint32 startoff;
1981  int segbytes;
1982  int readbytes;
1983 
1984  startoff = recptr % XLogSegSize;
1985 
1986  if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
1987  {
1988  char path[MAXPGPATH];
1989 
1990  /* Switch to another logfile segment */
1991  if (sendFile >= 0)
1992  close(sendFile);
1993 
1994  XLByteToSeg(recptr, sendSegNo);
1995 
1996  /*-------
1997  * When reading from a historic timeline, and there is a timeline
1998  * switch within this segment, read from the WAL segment belonging
1999  * to the new timeline.
2000  *
2001  * For example, imagine that this server is currently on timeline
2002  * 5, and we're streaming timeline 4. The switch from timeline 4
2003  * to 5 happened at 0/13002088. In pg_wal, we have these files:
2004  *
2005  * ...
2006  * 000000040000000000000012
2007  * 000000040000000000000013
2008  * 000000050000000000000013
2009  * 000000050000000000000014
2010  * ...
2011  *
2012  * In this situation, when requested to send the WAL from
2013  * segment 0x13, on timeline 4, we read the WAL from file
2014  * 000000050000000000000013. Archive recovery prefers files from
2015  * newer timelines, so if the segment was restored from the
2016  * archive on this server, the file belonging to the old timeline,
2017  * 000000040000000000000013, might not exist. Their contents are
2018  * equal up to the switchpoint, because at a timeline switch, the
2019  * used portion of the old segment is copied to the new file.
2020  *-------
2021  */
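2022  curFileTimeLine = sendTimeLine;
2023  if (sendTimeLineIsHistoric)
2024  {
2025  XLogSegNo endSegNo;
2026 
2027  XLByteToSeg(sendTimeLineValidUpto, endSegNo);
2028  if (sendSegNo == endSegNo)
2029  curFileTimeLine = sendTimeLineNextTLI;
2030  }
2031 
2032  XLogFilePath(path, curFileTimeLine, sendSegNo);
2033 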
2034  sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
2035  if (sendFile < 0)
2036  {
2037  /*
2038  * If the file is not found, assume it's because the standby
2039  * asked for a too old WAL segment that has already been
2040  * removed or recycled.
2041  */
2042  if (errno == ENOENT)
2043  ereport(ERROR,
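2044  (errcode_for_file_access(),
2045  errmsg("requested WAL segment %s has already been removed",
2046  XLogFileNameP(curFileTimeLine, sendSegNo))));
2047  else
2048  ereport(ERROR,
2049  (errcode_for_file_access(),
2050  errmsg("could not open file \"%s\": %m",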
2051  path)));
2052  }
2053  sendOff = 0;
2054  }
2055 
2056  /* Need to seek in the file? */
2057  if (sendOff != startoff)
2058  {
2059  if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
2060  ereport(ERROR,
2061  (errcode_for_file_access(),
2062  errmsg("could not seek in log segment %s to offset %u: %m",
2063  XLogFileNameP(curFileTimeLine, sendSegNo),
2064  startoff)));
2065  sendOff = startoff;
2066  }
2067 
2068  /* How many bytes are within this segment? */
2069  if (nbytes > (XLogSegSize - startoff))
2070  segbytes = XLogSegSize - startoff;
2071  else
2072  segbytes = nbytes;
2073 
2074  readbytes = read(sendFile, p, segbytes);
2075  if (readbytes <= 0)
2076  {
2077  ereport(ERROR,
2078  (errcode_for_file_access(),
2079  errmsg("could not read from log segment %s, offset %u, length %lu: %m",
2080  XLogFileNameP(curFileTimeLine, sendSegNo),
2081  sendOff, (unsigned long) segbytes)));
2082  }
2083 
2084  /* Update state for read */
2085  recptr += readbytes;
2086 
2087  sendOff += readbytes;
2088  nbytes -= readbytes;
2089  p += readbytes;
2090  }
2091 
2092  /*
2093  * After reading into the buffer, check that what we read was valid. We do
2094  * this after reading, because even though the segment was present when we
2095  * opened it, it might get recycled or removed while we read it. The
2096  * read() succeeds in that case, but the data we tried to read might
2097  * already have been overwritten with new WAL records.
2098  */
2099  XLByteToSeg(startptr, segno);
2100  CheckXLogRemoved(segno, ThisTimeLineID);
2101 
2102  /*
2103  * During recovery, the currently-open WAL file might be replaced with the
2104  * file of the same name retrieved from archive. So we always need to
2105  * check what we read was valid after reading into the buffer. If it's
2106  * invalid, we try to open and read the file again.
2107  */
2108  if (am_cascading_walsender)
2109  {
2110  WalSnd *walsnd = MyWalSnd;
2111  bool reload;
2112 
2113  SpinLockAcquire(&walsnd->mutex);
2114  reload = walsnd->needreload;
2115  walsnd->needreload = false;
2116  SpinLockRelease(&walsnd->mutex);
2117 
2118  if (reload && sendFile >= 0)
2119  {
2120  close(sendFile);
2121  sendFile = -1;
2122 
2123  goto retry;
2124  }
2125  }
2126 }
#define XLogSegSize
Definition: xlog_internal.h:92
static int sendFile
Definition: walsender.c:126
#define PG_BINARY
Definition: c.h:1038
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define ERROR
Definition: elog.h:43
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:3743
#define MAXPGPATH
static TimeLineID curFileTimeLine
Definition: walsender.c:131
#define XLogFilePath(path, tli, logSegNo)
char * XLogFileNameP(TimeLineID tli, XLogSegNo segno)
Definition: xlog.c:10040
static char * buf
Definition: pg_test_fsync.c:65
uint64 XLogSegNo
Definition: xlogdefs.h:34
int errcode_for_file_access(void)
Definition: elog.c:598
unsigned int uint32
Definition: c.h:265
#define ereport(elevel, rest)
Definition: elog.h:122
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:103
TimeLineID ThisTimeLineID
Definition: xlog.c:178
#define XLByteToSeg(xlrp, logSegNo)
static TimeLineID sendTimeLine
Definition: walsender.c:139
uint64 XLogRecPtr
Definition: xlogdefs.h:21
bool needreload
size_t Size
Definition: c.h:353
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:140
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:142
static XLogSegNo sendSegNo
Definition: walsender.c:127
int errmsg(const char *fmt,...)
Definition: elog.c:797
static uint32 sendOff
Definition: walsender.c:128
#define close(a)
Definition: win32.h:17
#define XLByteInSeg(xlrp, logSegNo)
#define read(a, b, c)
Definition: win32.h:18
static bool sendTimeLineIsHistoric
Definition: walsender.c:141
bool am_cascading_walsender
Definition: walsender.c:107
int BasicOpenFile(FileName fileName, int fileFlags, int fileMode)
Definition: fd.c:899
static void XLogSendLogical ( void  )
static

Definition at line 2372 of file walsender.c.

References elog, XLogReaderState::EndRecPtr, ERROR, GetFlushRecPtr(), InvalidXLogRecPtr, logical_startptr, LogicalDecodingProcessRecord(), WalSnd::mutex, MyWalSnd, NULL, LogicalDecodingContext::reader, WalSnd::sentPtr, sentPtr, SpinLockAcquire, SpinLockRelease, WalSndCaughtUp, and XLogReadRecord().

Referenced by StartLogicalReplication().

2373 {
2374  XLogRecord *record;
2375  char *errm;
2376 
2377  /*
2378  * Don't know whether we've caught up yet. We'll set it to true in
2379  * WalSndWaitForWal, if we're actually waiting. We also set to true if
2380  * XLogReadRecord() had to stop reading but WalSndWaitForWal didn't wait -
2381  * i.e. when we're shutting down.
2382  */
2383  WalSndCaughtUp = false;
2384 
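2385  record = XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &errm);
2386  logical_startptr = InvalidXLogRecPtr;
2387 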
2388  /* xlog record was invalid */
2389  if (errm != NULL)
2390  elog(ERROR, "%s", errm);
2391 
2392  if (record != NULL)
2393  {
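2394  LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
2395 
2396  sentPtr = logical_decoding_ctx->reader->EndRecPtr;
2397  }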
2398  else
2399  {
2400  /*
2401  * If the record we just wanted read is at or beyond the flushed
2402  * point, then we're caught up.
2403  */
2404  if (logical_decoding_ctx->reader->EndRecPtr >= GetFlushRecPtr())
2405  WalSndCaughtUp = true;
2406  }
2407 
2408  /* Update shared memory status */
2409  {
2410  WalSnd *walsnd = MyWalSnd;
2411 
2412  SpinLockAcquire(&walsnd->mutex);
2413  walsnd->sentPtr = sentPtr;
2414  SpinLockRelease(&walsnd->mutex);
2415  }
2416 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8154
slock_t mutex
XLogRecord * XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
Definition: xlogreader.c:193
XLogRecPtr EndRecPtr
Definition: xlogreader.h:115
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define ERROR
Definition: elog.h:43
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:93
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:188
static XLogRecPtr logical_startptr
Definition: walsender.c:189
static bool WalSndCaughtUp
Definition: walsender.c:174
#define SpinLockRelease(lock)
Definition: spin.h:64
XLogRecPtr sentPtr
WalSnd * MyWalSnd
Definition: walsender.c:103
static XLogRecPtr sentPtr
Definition: walsender.c:148
#define NULL
Definition: c.h:226
XLogReaderState * reader
Definition: logical.h:35
#define elog
Definition: elog.h:219
static void XLogSendPhysical ( void  )
static

Definition at line 2139 of file walsender.c.

References am_cascading_walsender, Assert, close, StringInfoData::data, DEBUG1, elog, enlargeStringInfo(), GetCurrentTimestamp(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), StringInfoData::len, list_free_deep(), MAX_SEND_SIZE, WalSnd::mutex, MyWalSnd, NULL, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), readTimeLineHistory(), RecoveryInProgress(), resetStringInfo(), sendFile, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, WalSnd::sentPtr, sentPtr, set_ps_display(), snprintf(), SpinLockAcquire, SpinLockRelease, streamingDoneSending, ThisTimeLineID, tliSwitchPoint(), update_process_title, WalSndCaughtUp, and XLogRead().

Referenced by StartReplication().

2140 {
2141  XLogRecPtr SendRqstPtr;
2142  XLogRecPtr startptr;
2143  XLogRecPtr endptr;
2144  Size nbytes;
2145 
2146  if (streamingDoneSending)
2147  {
2148  WalSndCaughtUp = true;
2149  return;
2150  }
2151 
2152  /* Figure out how far we can safely send the WAL. */
2153  if (sendTimeLineIsHistoric)
2154  {
2155  /*
2156  * Streaming an old timeline that's in this server's history, but is
2157  * not the one we're currently inserting or replaying. It can be
2158  * streamed up to the point where we switched off that timeline.
2159  */
2160  SendRqstPtr = sendTimeLineValidUpto;
2161  }
2162  else if (am_cascading_walsender)
2163  {
2164  /*
2165  * Streaming the latest timeline on a standby.
2166  *
2167  * Attempt to send all WAL that has already been replayed, so that we
2168  * know it's valid. If we're receiving WAL through streaming
2169  * replication, it's also OK to send any WAL that has been received
2170  * but not replayed.
2171  *
2172  * The timeline we're recovering from can change, or we can be
2173  * promoted. In either case, the current timeline becomes historic. We
2174  * need to detect that so that we don't try to stream past the point
2175  * where we switched to another timeline. We check for promotion or
2176  * timeline switch after calculating FlushPtr, to avoid a race
2177  * condition: if the timeline becomes historic just after we checked
2178  * that it was still current, it's still be OK to stream it up to the
2179  * FlushPtr that was calculated before it became historic.
2180  */
2181  bool becameHistoric = false;
2182 
2183  SendRqstPtr = GetStandbyFlushRecPtr();
2184 
2185  if (!RecoveryInProgress())
2186  {
2187  /*
2188  * We have been promoted. RecoveryInProgress() updated
2189  * ThisTimeLineID to the new current timeline.
2190  */
2191  am_cascading_walsender = false;
2192  becameHistoric = true;
2193  }
2194  else
2195  {
2196  /*
2197  * Still a cascading standby. But is the timeline we're sending
2198  * still the one recovery is recovering from? ThisTimeLineID was
2199  * updated by the GetStandbyFlushRecPtr() call above.
2200  */
2201  if (sendTimeLine != ThisTimeLineID)
2202  becameHistoric = true;
2203  }
2204 
2205  if (becameHistoric)
2206  {
2207  /*
2208  * The timeline we were sending has become historic. Read the
2209  * timeline history file of the new timeline to see where exactly
2210  * we forked off from the timeline we were sending.
2211  */
2212  List *history;
2213 
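2214  history = readTimeLineHistory(ThisTimeLineID);
2215  sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI);
2216 
2217  Assert(sendTimeLine < sendTimeLineNextTLI);
2218  list_free_deep(history);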
2219 
2220  sendTimeLineIsHistoric = true;
2221 
2222  SendRqstPtr = sendTimeLineValidUpto;
2223  }
2224  }
2225  else
2226  {
2227  /*
2228  * Streaming the current timeline on a master.
2229  *
2230  * Attempt to send all data that's already been written out and
2231  * fsync'd to disk. We cannot go further than what's been written out
2232  * given the current implementation of XLogRead(). And in any case
2233  * it's unsafe to send WAL that is not securely down to disk on the
2234  * master: if the master subsequently crashes and restarts, slaves
2235  * must not have applied any WAL that gets lost on the master.
2236  */
2237  SendRqstPtr = GetFlushRecPtr();
2238  }
2239 
2240  /*
2241  * If this is a historic timeline and we've reached the point where we
2242  * forked to the next timeline, stop streaming.
2243  *
2244  * Note: We might already have sent WAL > sendTimeLineValidUpto. The
2245  * startup process will normally replay all WAL that has been received
2246  * from the master, before promoting, but if the WAL streaming is
2247  * terminated at a WAL page boundary, the valid portion of the timeline
2248  * might end in the middle of a WAL record. We might've already sent the
2249  * first half of that partial WAL record to the cascading standby, so that
2250  * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
2251  * replay the partial WAL record either, so it can still follow our
2252  * timeline switch.
2253  */
2254  if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
2255  {
2256  /* close the current file. */
2257  if (sendFile >= 0)
2258  close(sendFile);
2259  sendFile = -1;
2260 
2261  /* Send CopyDone */
2262  pq_putmessage_noblock('c', NULL, 0);
2263  streamingDoneSending = true;
2264 
2265  WalSndCaughtUp = true;
2266 
2267  elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
2268  (uint32) (sendTimeLineValidUpto >> 32), (uint32) sendTimeLineValidUpto,
2269  (uint32) (sentPtr >> 32), (uint32) sentPtr);
2270  return;
2271  }
2272 
2273  /* Do we have any work to do? */
2274  Assert(sentPtr <= SendRqstPtr);
2275  if (SendRqstPtr <= sentPtr)
2276  {
2277  WalSndCaughtUp = true;
2278  return;
2279  }
2280 
2281  /*
2282  * Figure out how much to send in one message. If there's no more than
2283  * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
2284  * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
2285  *
2286  * The rounding is not only for performance reasons. Walreceiver relies on
2287  * the fact that we never split a WAL record across two messages. Since a
2288  * long WAL record is split at page boundary into continuation records,
2289  * page boundary is always a safe cut-off point. We also assume that
2290  * SendRqstPtr never points to the middle of a WAL record.
2291  */
2292  startptr = sentPtr;
2293  endptr = startptr;
2294  endptr += MAX_SEND_SIZE;
2295 
2296  /* if we went beyond SendRqstPtr, back off */
2297  if (SendRqstPtr <= endptr)
2298  {
2299  endptr = SendRqstPtr;
2300  if (sendTimeLineIsHistoric)
2301  WalSndCaughtUp = false;
2302  else
2303  WalSndCaughtUp = true;
2304  }
2305  else
2306  {
2307  /* round down to page boundary. */
2308  endptr -= (endptr % XLOG_BLCKSZ);
2309  WalSndCaughtUp = false;
2310  }
2311 
2312  nbytes = endptr - startptr;
2313  Assert(nbytes <= MAX_SEND_SIZE);
2314 
2315  /*
2316  * OK to read and send the slice.
2317  */
2318  resetStringInfo(&output_message);
2319  pq_sendbyte(&output_message, 'w');
2320 
2321  pq_sendint64(&output_message, startptr); /* dataStart */
2322  pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
2323  pq_sendint64(&output_message, 0); /* sendtime, filled in last */
2324 
2325  /*
2326  * Read the log directly into the output buffer to avoid extra memcpy
2327  * calls.
2328  */
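2329  enlargeStringInfo(&output_message, nbytes);
2330  XLogRead(&output_message.data[output_message.len], startptr, nbytes);
2331  output_message.len += nbytes;
2332  output_message.data[output_message.len] = '\0';
2333 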
2334  /*
2335  * Fill the send timestamp last, so that it is taken as late as possible.
2336  */
2337  resetStringInfo(&tmpbuf);
2338  pq_sendint64(&tmpbuf, GetCurrentTimestamp());
2339  memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
2340  tmpbuf.data, sizeof(int64));
2341 
2342  pq_putmessage_noblock('d', output_message.data, output_message.len);
2343 
2344  sentPtr = endptr;
2345 
2346  /* Update shared memory status */
2347  {
2348  WalSnd *walsnd = MyWalSnd;
2349 
2350  SpinLockAcquire(&walsnd->mutex);
2351  walsnd->sentPtr = sentPtr;
2352  SpinLockRelease(&walsnd->mutex);
2353  }
2354 
2355  /* Report progress of XLOG streaming in PS display */
2356  if (update_process_title)
2357  {
2358  char activitymsg[50];
2359 
2360  snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
2361  (uint32) (sentPtr >> 32), (uint32) sentPtr);
2362  set_ps_display(activitymsg, false);
2363  }
2364 
2365  return;
2366 }
#define DEBUG1
Definition: elog.h:25
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
bool update_process_title
Definition: ps_status.c:35
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1569
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:74
static int sendFile
Definition: walsender.c:126
void set_ps_display(const char *activity, bool force)
Definition: ps_status.c:326
static StringInfoData output_message
Definition: walsender.c:151
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8154
bool RecoveryInProgress(void)
Definition: xlog.c:7805
void list_free_deep(List *list)
Definition: list.c:1147
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
static bool streamingDoneSending
Definition: walsender.c:170
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:277
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:94
unsigned int uint32
Definition: c.h:265
static void XLogRead(char *buf, XLogRecPtr startptr, Size count)
Definition: walsender.c:1966
static bool WalSndCaughtUp
Definition: walsender.c:174
#define SpinLockRelease(lock)
Definition: spin.h:64
XLogRecPtr sentPtr
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:552
WalSnd * MyWalSnd
Definition: walsender.c:103
static XLogRecPtr sentPtr
Definition: walsender.c:148
TimeLineID ThisTimeLineID
Definition: xlog.c:178
#define NULL
Definition: c.h:226
static TimeLineID sendTimeLine
Definition: walsender.c:139
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:671
size_t Size
Definition: c.h:353
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:140
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:142
static StringInfoData tmpbuf
Definition: walsender.c:153
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2469
#define elog
Definition: elog.h:219
#define close(a)
Definition: win32.h:17
Definition: pg_list.h:45
void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.c:271
#define MAX_SEND_SIZE
Definition: walsender.c:97
static bool sendTimeLineIsHistoric
Definition: walsender.c:141
bool am_cascading_walsender
Definition: walsender.c:107

Variable Documentation

bool am_db_walsender = false

Definition at line 109 of file walsender.c.

Referenced by InitPostgres(), and ProcessStartupPacket().

TimeLineID curFileTimeLine = 0
static

Definition at line 131 of file walsender.c.

Referenced by XLogRead().

volatile sig_atomic_t got_SIGHUP = false
static

Definition at line 177 of file walsender.c.

Referenced by WalSndLoop(), WalSndSigHupHandler(), WalSndWaitForWal(), and WalSndWriteData().

bool log_replication_commands = false

Definition at line 115 of file walsender.c.

Referenced by exec_replication_command().

LogicalDecodingContext* logical_decoding_ctx = NULL
static

Definition at line 188 of file walsender.c.

XLogRecPtr logical_startptr = InvalidXLogRecPtr
static

Definition at line 189 of file walsender.c.

Referenced by StartLogicalReplication(), and XLogSendLogical().

StringInfoData output_message
static

Definition at line 151 of file walsender.c.

volatile sig_atomic_t replication_active = false
static
StringInfoData reply_message
static

Definition at line 152 of file walsender.c.

int sendFile = -1
static

Definition at line 126 of file walsender.c.

Referenced by WalSndErrorCleanup(), XLogDumpXLogRead(), XLogRead(), and XLogSendPhysical().

uint32 sendOff = 0
static

Definition at line 128 of file walsender.c.

Referenced by XLogDumpXLogRead(), and XLogRead().

XLogSegNo sendSegNo = 0
static

Definition at line 127 of file walsender.c.

Referenced by XLogDumpXLogRead(), and XLogRead().

TimeLineID sendTimeLine = 0
static

Definition at line 139 of file walsender.c.

bool sendTimeLineIsHistoric = false
static

Definition at line 141 of file walsender.c.

TimeLineID sendTimeLineNextTLI = 0
static

Definition at line 140 of file walsender.c.

Referenced by StartReplication(), XLogRead(), and XLogSendPhysical().

XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
static

Definition at line 142 of file walsender.c.

Referenced by StartReplication(), XLogRead(), and XLogSendPhysical().

bool streamingDoneReceiving
static

Definition at line 171 of file walsender.c.

Referenced by ProcessRepliesIfAny(), StartReplication(), and WalSndLoop().

bool streamingDoneSending
static

Definition at line 170 of file walsender.c.

Referenced by ProcessRepliesIfAny(), StartReplication(), WalSndLoop(), and XLogSendPhysical().

bool waiting_for_ping_response = false
static
bool wake_wal_senders = false

Definition at line 120 of file walsender.c.

int wal_sender_timeout = 60 * 1000
volatile sig_atomic_t walsender_ready_to_stop = false
static
bool WalSndCaughtUp = false
static