PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
walsender.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * walsender.c
4  *
5  * The WAL sender process (walsender) is new as of Postgres 9.0. It takes
6  * care of sending XLOG from the primary server to a single recipient.
7  * (Note that there can be more than one walsender process concurrently.)
8  * It is started by the postmaster when the walreceiver of a standby server
9  * connects to the primary server and requests XLOG streaming replication.
10  *
11  * A walsender is similar to a regular backend, ie. there is a one-to-one
12  * relationship between a connection and a walsender process, but instead
13  * of processing SQL queries, it understands a small set of special
14  * replication-mode commands. The START_REPLICATION command begins streaming
15  * WAL to the client. While streaming, the walsender keeps reading XLOG
16  * records from the disk and sends them to the standby server over the
17  * COPY protocol, until either side ends the replication by exiting COPY
18  * mode (or until the connection is closed).
19  *
20  * Normal termination is by SIGTERM, which instructs the walsender to
21  * close the connection and exit(0) at the next convenient moment. Emergency
22  * termination is by SIGQUIT; like any backend, the walsender will simply
23  * abort and exit on SIGQUIT. A close of the connection and a FATAL error
24  * are treated as not a crash but approximately normal termination;
25  * the walsender will exit quickly without sending any more XLOG records.
26  *
27  * If the server is shut down, the checkpointer sends us
28  * PROCSIG_WALSND_INIT_STOPPING after all regular backends have exited. If
29  * the backend is idle or is running an SQL query, this causes the backend to
30  * shut down; if logical replication is in progress, all existing WAL records
31  * are processed, followed by a shutdown. Otherwise, this causes the walsender
32  * to switch to the "stopping" state. In this state, the walsender will reject
33  * any further replication commands. The checkpointer begins the shutdown
34  * checkpoint once all walsenders are confirmed as stopping. When the shutdown
35  * checkpoint finishes, the postmaster sends us SIGUSR2. This instructs
36  * walsender to send any outstanding WAL, including the shutdown checkpoint
37  * record, wait for it to be replicated to the standby, and then exit.
38  *
39  *
40  * Portions Copyright (c) 2010-2017, PostgreSQL Global Development Group
41  *
42  * IDENTIFICATION
43  * src/backend/replication/walsender.c
44  *
45  *-------------------------------------------------------------------------
46  */
47 #include "postgres.h"
48 
49 #include <signal.h>
50 #include <unistd.h>
51 
52 #include "access/printtup.h"
53 #include "access/timeline.h"
54 #include "access/transam.h"
55 #include "access/xact.h"
56 #include "access/xlog_internal.h"
57 #include "access/xlogutils.h"
58 
59 #include "catalog/pg_type.h"
60 #include "commands/dbcommands.h"
61 #include "commands/defrem.h"
62 #include "funcapi.h"
63 #include "libpq/libpq.h"
64 #include "libpq/pqformat.h"
65 #include "miscadmin.h"
66 #include "nodes/replnodes.h"
67 #include "pgstat.h"
68 #include "replication/basebackup.h"
69 #include "replication/decode.h"
70 #include "replication/logical.h"
72 #include "replication/slot.h"
73 #include "replication/snapbuild.h"
74 #include "replication/syncrep.h"
76 #include "replication/walsender.h"
79 #include "storage/fd.h"
80 #include "storage/ipc.h"
81 #include "storage/pmsignal.h"
82 #include "storage/proc.h"
83 #include "storage/procarray.h"
84 #include "tcop/dest.h"
85 #include "tcop/tcopprot.h"
86 #include "utils/builtins.h"
87 #include "utils/guc.h"
88 #include "utils/memutils.h"
89 #include "utils/pg_lsn.h"
90 #include "utils/portal.h"
91 #include "utils/ps_status.h"
92 #include "utils/resowner.h"
93 #include "utils/timeout.h"
94 #include "utils/timestamp.h"
95 
96 /*
97  * Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ.
98  *
99  * We don't have a good idea of what a good value would be; there's some
100  * overhead per message in both walsender and walreceiver, but on the other
101  * hand sending large batches makes walsender less responsive to signals
102  * because signals are checked only between messages. 128kB (with
103  * default 8k blocks) seems like a reasonable guess for now.
 *
 * It is defined as a multiple of XLOG_BLCKSZ so that the ">= XLOG_BLCKSZ"
 * requirement above holds for any configured block size.
104  */
105 #define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
106 
/*
 * NOTE(review): the embedded source numbering skips lines 108, 111, 124,
 * 140, 148-149, 151, 160-162, 168 and 179-180 in this span. Several of the
 * comments below therefore describe declarations that are absent from this
 * copy, e.g. the WalSnd array, sendTimeLine/sendTimeLineValidUpto, the
 * message buffers, the last-reply timestamp and
 * streamingDoneSending/streamingDoneReceiving. Confirm against the upstream
 * file before relying on this span.
 */
107 /* Array of WalSnds in shared memory */
109 
110 /* My slot in the shared memory array */
112 
113 /* Global state */
114 bool am_walsender = false; /* Am I a walsender process? */
115 bool am_cascading_walsender = false; /* Am I cascading WAL to another
116  * standby? */
117 bool am_db_walsender = false; /* Connected to a database? */
118 
119 /* User-settable parameters for walsender */
120 int max_wal_senders = 0; /* the maximum number of concurrent
121  * walsenders */
/* NOTE(review): 60 * 1000 suggests milliseconds (one minute) -- confirm against the GUC definition. */
122 int wal_sender_timeout = 60 * 1000; /* maximum time to send one WAL
123  * data message */
125 
126 /*
127  * State for WalSndWakeupRequest
128  */
129 bool wake_wal_senders = false;
130 
131 /*
132  * These variables are used similarly to openLogFile/SegNo/Off,
133  * but for walsender to read the XLOG.
 * sendFile is -1 while no segment file is open (WalSndErrorCleanup closes
 * it and resets it to -1).
134  */
135 static int sendFile = -1;
136 static XLogSegNo sendSegNo = 0;
137 static uint32 sendOff = 0;
138 
139 /* Timeline ID of the currently open file */
141 
142 /*
143  * These variables keep track of the state of the timeline we're currently
144  * sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric,
145  * the timeline is not the latest timeline on this server, and the server's
146  * history forked off from that timeline at sendTimeLineValidUpto.
147  */
150 static bool sendTimeLineIsHistoric = false;
152 
153 /*
154  * How far have we sent WAL already? This is also advertised in
155  * MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.)
156  */
157 static XLogRecPtr sentPtr = 0;
158 
159 /* Buffers for constructing outgoing messages and processing reply messages. */
163 
164 /*
165  * Timestamp of the last receipt of the reply from the standby. Set to 0 if
166  * wal_sender_timeout doesn't need to be active.
167  */
169 
170 /* Have we sent a heartbeat message asking for reply, since last reply? */
171 static bool waiting_for_ping_response = false;
172 
173 /*
174  * While streaming WAL in Copy mode, streamingDoneSending is set to true
175  * after we have sent CopyDone. We should not send any more CopyData messages
176  * after that. streamingDoneReceiving is set to true when we receive CopyDone
177  * from the other end. When both become true, it's time to exit Copy mode.
178  */
181 
182 /* Are we there yet? */
183 static bool WalSndCaughtUp = false;
184 
185 /* Flags set by signal handlers for later service in main loop */
186 static volatile sig_atomic_t got_SIGUSR2 = false;
187 static volatile sig_atomic_t got_STOPPING = false;
188 
189 /*
190  * This is set while we are streaming. When not set
191  * PROCSIG_WALSND_INIT_STOPPING signal will be handled like SIGTERM. When set,
192  * the main loop is responsible for checking got_STOPPING and terminating when
193  * it's set (after streaming any remaining WAL).
194  */
195 static volatile sig_atomic_t replication_active = false;
196 
199 
200 /* A sample associating a WAL location with the time it was written. */
201 typedef struct
202 {
	/* NOTE(review): the members (source lines 203-204) are missing from this copy. */
205 } WalTimeSample;
206 
207 /* The size of our buffer of time samples. */
208 #define LAG_TRACKER_BUFFER_SIZE 8192
209 
210 /* A mechanism for tracking replication lag. */
211 static struct
212 {
	/*
	 * NOTE(review): the members (source lines 213-217) are missing from this
	 * copy. InitWalSender() zeroes the whole struct with memset().
	 */
218 } LagTracker;
219 
220 /* Signal handlers */
/*
 * NOTE(review): the signal-handler prototype(s) (source line 221) and the
 * private prototypes on source lines 228, 234-235, 237, 243, 248-249 and 252
 * are missing from this copy.
 */
222 
223 /* Prototypes for private functions */
/*
 * Send routine handed to WalSndLoop()/WalSndDone(). XLogSendPhysical() and
 * XLogSendLogical() below both match this signature.
 */
224 typedef void (*WalSndSendDataCallback) (void);
225 static void WalSndLoop(WalSndSendDataCallback send_data);
226 static void InitWalSenderSlot(void);
227 static void WalSndKill(int code, Datum arg);
229 static void XLogSendPhysical(void);
230 static void XLogSendLogical(void);
231 static void WalSndDone(WalSndSendDataCallback send_data);
232 static XLogRecPtr GetStandbyFlushRecPtr(void);
233 static void IdentifySystem(void);
236 static void StartReplication(StartReplicationCmd *cmd);
238 static void ProcessStandbyMessage(void);
239 static void ProcessStandbyReplyMessage(void);
240 static void ProcessStandbyHSFeedbackMessage(void);
241 static void ProcessRepliesIfAny(void);
242 static void WalSndKeepalive(bool requestReply);
244 static void WalSndCheckTimeOut(TimestampTz now);
245 static long WalSndComputeSleeptime(TimestampTz now);
246 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
247 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
250 static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
251 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
253 
254 static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
255 
256 
257 /* Initialize walsender process before entering the main command loop */
/*
 * Visible steps: create the top-level resource owner and zero the
 * lag-tracking sample buffer. Per the comment in the body, no WAL records
 * may be written by this process after it has declared itself a walsender.
 *
 * NOTE(review): source lines 261, 264 and 276-277 are missing from this
 * copy. The comments at 263 and 269-275 describe the shared-memory slot
 * creation and the postmaster notification that those lines presumably
 * performed. Confirm against upstream.
 */
258 void
259 InitWalSender(void)
260 {
262 
263  /* Create a per-walsender data structure in shared memory */
265 
266  /* Set up resource owner */
267  CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner");
268 
269  /*
270  * Let postmaster know that we're a WAL sender. Once we've declared us as
271  * a WAL sender process, postmaster will let us outlive the bgwriter and
272  * kill us last in the shutdown sequence, so we get a chance to stream all
273  * remaining WAL at shutdown, including the shutdown checkpoint. Note that
274  * there's no going back, and we mustn't write any WAL records after this.
275  */
278 
279  /* Initialize empty timestamp buffer for lag tracking. */
280  memset(&LagTracker, 0, sizeof(LagTracker));
281 }
282 
283 /*
284  * Clean up after an error.
285  *
286  * WAL sender processes don't use transactions like regular backends do.
287  * This function does any cleanup required after an error in a WAL sender
288  * process, similar to what transaction abort does in a regular backend.
 *
 * Visible steps:
 * - close any open WAL segment file (sendFile) and mark it closed;
 * - clear replication_active;
 * - exit with status 0 if a stop request (got_STOPPING or got_SIGUSR2) is
 *   already pending.
 *
 * NOTE(review): the function-name line (source line 291) and source lines
 * 293-295, 304, 306 and 314 are missing from this copy; see the notes in
 * the body.
289  */
290 void
292 {
296 
297  if (sendFile >= 0)
298  {
299  close(sendFile);
300  sendFile = -1;
301  }
302 
	/*
	 * NOTE(review): the statement this if guards (source line 304) is
	 * missing from this copy. As shown, the condition would attach to the
	 * next surviving statement. Restore it from upstream.
	 */
303  if (MyReplicationSlot != NULL)
305 
307 
308  replication_active = false;
309 
	/* A stop/shutdown request is pending: exit instead of reverting to the startup state below. */
310  if (got_STOPPING || got_SIGUSR2)
311  proc_exit(0);
312 
313  /* Revert back to startup state */
	/* NOTE(review): the statement for this step (source line 314) is missing from this copy. */
315 }
316 
317 /*
318  * Handle a client's connection abort in an orderly manner.
 *
 * Never returns: the process exits with status 0 via proc_exit(). The
 * trailing abort() only exists so the compiler knows control cannot fall
 * off the end.
319  */
320 static void
321 WalSndShutdown(void)
322 {
323  /*
324  * Reset whereToSendOutput to prevent ereport from attempting to send any
325  * more messages to the standby.
326  */
	/* NOTE(review): the reset described above (source lines 327-328) is missing from this copy. */
329 
330  proc_exit(0);
331  abort(); /* keep the compiler quiet */
332 }
333 
334 /*
335  * Handle the IDENTIFY_SYSTEM command.
 *
 * Sends one row through `dest` using the tuple-output helpers, with four
 * columns:
 * - systemid (text)
 * - timeline (int4)
 * - xlogpos (text)
 * - dbname (text, NULL when not connected to a database)
 *
 * NOTE(review): source lines 338 (the function-name line), 357, 359-360,
 * 372, 375, 377-379, 381 and 384 are missing from this copy. In particular,
 * nothing visible assigns `dest` before it is passed to
 * begin_tup_output_tupdesc(); line 384 presumably did. The if-condition
 * that selects GetStandbyFlushRecPtr() (359-360) and the first argument of
 * the sysid snprintf() (357) are also lost. Confirm against upstream.
336  */
337 static void
339 {
340  char sysid[32];
341  char xloc[MAXFNAMELEN];
342  XLogRecPtr logptr;
343  char *dbname = NULL;
344  DestReceiver *dest;
345  TupOutputState *tstate;
346  TupleDesc tupdesc;
347  Datum values[4];
348  bool nulls[4];
349 
350  /*
351  * Reply with a result set with one row, four columns. First col is system
352  * ID, second is timeline ID, third is current xlog location and the
353  * fourth contains the database name if we are connected to one.
354  */
355 
356  snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
358 
361  {
362  /* this also updates ThisTimeLineID */
363  logptr = GetStandbyFlushRecPtr();
364  }
365  else
366  logptr = GetFlushRecPtr();
367 
	/* Render the LSN as its high and low 32-bit halves in hex: "HI/LO". */
368  snprintf(xloc, sizeof(xloc), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
369 
370  if (MyDatabaseId != InvalidOid)
371  {
373 
374  /* syscache access needs a transaction env. */
376  /* make dbname live outside TX context */
380  /* CommitTransactionCommand switches to TopMemoryContext */
382  }
383 
385  MemSet(nulls, false, sizeof(nulls));
386 
387  /* need a tuple descriptor representing four columns */
388  tupdesc = CreateTemplateTupleDesc(4, false);
389  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
390  TEXTOID, -1, 0);
391  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
392  INT4OID, -1, 0);
393  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
394  TEXTOID, -1, 0);
395  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
396  TEXTOID, -1, 0);
397 
398  /* prepare for projection of tuples */
399  tstate = begin_tup_output_tupdesc(dest, tupdesc);
400 
401  /* column 1: system identifier */
402  values[0] = CStringGetTextDatum(sysid);
403 
404  /* column 2: timeline */
405  values[1] = Int32GetDatum(ThisTimeLineID);
406 
407  /* column 3: wal location */
408  values[2] = CStringGetTextDatum(xloc);
409 
410  /* column 4: database name, or NULL if none */
411  if (dbname)
412  values[3] = CStringGetTextDatum(dbname);
413  else
414  nulls[3] = true;
415 
416  /* send it to dest */
417  do_tup_output(tstate, values, nulls);
418 
419  end_tup_output(tstate);
420 }
421 
422 
423 /*
424  * Handle TIMELINE_HISTORY command.
 *
 * Sends a RowDescription ('T') describing two columns, "filename" (text)
 * and "content" (bytea). It then sends one DataRow ('D') holding the
 * history file's name and its raw bytes. The announced content length is
 * taken from lseek(SEEK_END) on the file.
 *
 * NOTE(review): source lines 427 (the function-name line), 429, 478, 485,
 * 489, 500, 502 and 505 are missing from this copy. Line 429 presumably
 * declared the `buf` used throughout. Lines 478/485/489/505 presumably held
 * the first argument of each ereport(). Confirm against upstream.
425  */
426 static void
428 {
430  char histfname[MAXFNAMELEN];
431  char path[MAXPGPATH];
432  int fd;
433  off_t histfilelen;
434  off_t bytesleft;
435  Size len;
436 
437  /*
438  * Reply with a result set with one row, and two columns. The first col is
439  * the name of the history file, 2nd is the contents.
440  */
441 
442  TLHistoryFileName(histfname, cmd->timeline);
443  TLHistoryFilePath(path, cmd->timeline);
444 
445  /* Send a RowDescription message */
446  pq_beginmessage(&buf, 'T');
447  pq_sendint(&buf, 2, 2); /* 2 fields */
448 
449  /* first field */
450  pq_sendstring(&buf, "filename"); /* col name */
451  pq_sendint(&buf, 0, 4); /* table oid */
452  pq_sendint(&buf, 0, 2); /* attnum */
453  pq_sendint(&buf, TEXTOID, 4); /* type oid */
454  pq_sendint(&buf, -1, 2); /* typlen */
455  pq_sendint(&buf, 0, 4); /* typmod */
456  pq_sendint(&buf, 0, 2); /* format code */
457 
458  /* second field */
459  pq_sendstring(&buf, "content"); /* col name */
460  pq_sendint(&buf, 0, 4); /* table oid */
461  pq_sendint(&buf, 0, 2); /* attnum */
462  pq_sendint(&buf, BYTEAOID, 4); /* type oid */
463  pq_sendint(&buf, -1, 2); /* typlen */
464  pq_sendint(&buf, 0, 4); /* typmod */
465  pq_sendint(&buf, 0, 2); /* format code */
466  pq_endmessage(&buf);
467 
468  /* Send a DataRow message */
469  pq_beginmessage(&buf, 'D');
470  pq_sendint(&buf, 2, 2); /* # of columns */
471  len = strlen(histfname);
472  pq_sendint(&buf, len, 4); /* col1 len */
473  pq_sendbytes(&buf, histfname, len);
474 
475  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0666);
476  if (fd < 0)
477  ereport(ERROR,
479  errmsg("could not open file \"%s\": %m", path)));
480 
481  /* Determine file length and send it to client */
482  histfilelen = lseek(fd, 0, SEEK_END);
483  if (histfilelen < 0)
484  ereport(ERROR,
486  errmsg("could not seek to end of file \"%s\": %m", path)));
487  if (lseek(fd, 0, SEEK_SET) != 0)
488  ereport(ERROR,
490  errmsg("could not seek to beginning of file \"%s\": %m", path)));
491 
492  pq_sendint(&buf, histfilelen, 4); /* col2 len */
493 
494  bytesleft = histfilelen;
495  while (bytesleft > 0)
496  {
497  char rbuf[BLCKSZ];
498  int nread;
499 
501  nread = read(fd, rbuf, sizeof(rbuf));
		/*
		 * NOTE(review): two issues here.
		 * - read() returning 0 (the file is shorter than histfilelen) does
		 *   not set errno, so the %m below can report a stale, unrelated
		 *   error.
		 * - Each read asks for sizeof(rbuf) bytes regardless of bytesleft.
		 *   If the file could grow after the lseek() above, more bytes than
		 *   the announced col2 length would be sent.
		 * Consider reporting nread == 0 separately and capping the read
		 * size at bytesleft.
		 */
503  if (nread <= 0)
504  ereport(ERROR,
506  errmsg("could not read file \"%s\": %m",
507  path)));
508  pq_sendbytes(&buf, rbuf, nread);
509  bytesleft -= nread;
510  }
511  CloseTransientFile(fd);
512 
513  pq_endmessage(&buf);
514 }
515 
516 /*
517  * Handle START_REPLICATION command.
518  *
519  * At the moment, this never returns, but an ereport(ERROR) will take us back
520  * to the main loop.
 *
 * NOTE(review): as visible here, the function can return normally. Unless
 * it exits via proc_exit(0) because got_STOPPING is set, it ends by sending
 * a "START_STREAMING" CommandComplete. The "never returns" remark above
 * looks stale.
 *
 * NOTE(review): many source lines are missing from this copy: 523, 525,
 * 544-545, 556, 569, 572, 586, 625-626, 630, 633, 644, 675, 680, 685, 687,
 * 691, 697, 708 and 710. They include:
 * - the function-name line;
 * - several if-conditions whose braces and bodies remain;
 * - presumably the main-loop call after "Main loop of walsender".
 * Do not reason about control flow from this copy alone.
521  */
522 static void
524 {
526  XLogRecPtr FlushPtr;
527 
528  if (ThisTimeLineID == 0)
529  ereport(ERROR,
530  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
531  errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
532 
533  /*
534  * We assume here that we're logging enough information in the WAL for
535  * log-shipping, since this is checked in PostmasterMain().
536  *
537  * NOTE: wal_level can only change at shutdown, so in most cases it is
538  * difficult for there to be WAL data that we can still see that was
539  * written at wal_level='minimal'.
540  */
541 
542  if (cmd->slotname)
543  {
546  ereport(ERROR,
547  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
548  (errmsg("cannot use a logical replication slot for physical replication"))));
549  }
550 
551  /*
552  * Select the timeline. If it was given explicitly by the client, use
553  * that. Otherwise use the timeline of the last replayed record, which is
554  * kept in ThisTimeLineID.
555  */
557  {
558  /* this also updates ThisTimeLineID */
559  FlushPtr = GetStandbyFlushRecPtr();
560  }
561  else
562  FlushPtr = GetFlushRecPtr();
563 
564  if (cmd->timeline != 0)
565  {
566  XLogRecPtr switchpoint;
567 
568  sendTimeLine = cmd->timeline;
570  {
571  sendTimeLineIsHistoric = false;
573  }
574  else
575  {
576  List *timeLineHistory;
577 
578  sendTimeLineIsHistoric = true;
579 
580  /*
581  * Check that the timeline the client requested exists, and the
582  * requested start location is on that timeline.
583  */
584  timeLineHistory = readTimeLineHistory(ThisTimeLineID);
585  switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
587  list_free_deep(timeLineHistory);
588 
589  /*
590  * Found the requested timeline in the history. Check that
591  * requested startpoint is on that timeline in our history.
592  *
593  * This is quite loose on purpose. We only check that we didn't
594  * fork off the requested timeline before the switchpoint. We
595  * don't check that we switched *to* it before the requested
596  * starting point. This is because the client can legitimately
597  * request to start replication from the beginning of the WAL
598  * segment that contains switchpoint, but on the new timeline, so
599  * that it doesn't end up with a partial segment. If you ask for
600  * too old a starting point, you'll get an error later when we
601  * fail to find the requested WAL segment in pg_wal.
602  *
603  * XXX: we could be more strict here and only allow a startpoint
604  * that's older than the switchpoint, if it's still in the same
605  * WAL segment.
606  */
607  if (!XLogRecPtrIsInvalid(switchpoint) &&
608  switchpoint < cmd->startpoint)
609  {
610  ereport(ERROR,
611  (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
612  (uint32) (cmd->startpoint >> 32),
613  (uint32) (cmd->startpoint),
614  cmd->timeline),
615  errdetail("This server's history forked from timeline %u at %X/%X.",
616  cmd->timeline,
617  (uint32) (switchpoint >> 32),
618  (uint32) (switchpoint))));
619  }
620  sendTimeLineValidUpto = switchpoint;
621  }
622  }
623  else
624  {
627  sendTimeLineIsHistoric = false;
628  }
629 
631 
632  /* If there is nothing to stream, don't even enter COPY mode */
634  {
635  /*
636  * When we first start replication the standby will be behind the
637  * primary. For some applications, for example synchronous
638  * replication, it is important to have a clear state for this initial
639  * catchup mode, so we can trigger actions when we change streaming
640  * state later. We may stay in this state for a long time, which is
641  * exactly why we want to be able to monitor whether or not we are
642  * still here.
643  */
645 
646  /* Send a CopyBothResponse message, and start streaming */
		/*
		 * Message body per the protocol's CopyBothResponse layout: an
		 * overall-format byte of 0 followed by a 16-bit column count of 0.
		 */
647  pq_beginmessage(&buf, 'W');
648  pq_sendbyte(&buf, 0);
649  pq_sendint(&buf, 0, 2);
650  pq_endmessage(&buf);
651  pq_flush();
652 
653  /*
654  * Don't allow a request to stream from a future point in WAL that
655  * hasn't been flushed to disk in this server yet.
656  */
657  if (FlushPtr < cmd->startpoint)
658  {
659  ereport(ERROR,
660  (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
661  (uint32) (cmd->startpoint >> 32),
662  (uint32) (cmd->startpoint),
663  (uint32) (FlushPtr >> 32),
664  (uint32) (FlushPtr))));
665  }
666 
667  /* Start streaming from the requested point */
668  sentPtr = cmd->startpoint;
669 
670  /* Initialize shared memory status, too */
671  SpinLockAcquire(&MyWalSnd->mutex);
672  MyWalSnd->sentPtr = sentPtr;
673  SpinLockRelease(&MyWalSnd->mutex);
674 
676 
677  /* Main loop of walsender */
678  replication_active = true;
679 
681 
682  replication_active = false;
		/* A stop request arrived while streaming: exit rather than continue the command. */
683  if (got_STOPPING)
684  proc_exit(0);
686 
688  }
689 
690  if (cmd->slotname)
692 
693  /*
694  * Copy is finished now. Send a single-row result set indicating the next
695  * timeline.
696  */
698  {
		/* Room for "%X/%X" of two uint32 values: 8 hex digits + '/' + 8 hex digits + NUL. */
699  char startpos_str[8 + 1 + 8 + 1];
700  DestReceiver *dest;
701  TupOutputState *tstate;
702  TupleDesc tupdesc;
703  Datum values[2];
704  bool nulls[2];
705 
706  snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
707  (uint32) (sendTimeLineValidUpto >> 32),
709 
711  MemSet(nulls, false, sizeof(nulls));
712 
713  /*
714  * Need a tuple descriptor representing two columns. int8 may seem
715  * like a surprising data type for this, but in theory int4 would not
716  * be wide enough for this, as TimeLineID is unsigned.
717  */
718  tupdesc = CreateTemplateTupleDesc(2, false);
719  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
720  INT8OID, -1, 0);
721  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
722  TEXTOID, -1, 0);
723 
724  /* prepare for projection of tuple */
725  tstate = begin_tup_output_tupdesc(dest, tupdesc);
726 
727  values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
728  values[1] = CStringGetTextDatum(startpos_str);
729 
730  /* send it to dest */
731  do_tup_output(tstate, values, nulls);
732 
733  end_tup_output(tstate);
734  }
735 
736  /* Send CommandComplete message */
737  pq_puttextmessage('C', "START_STREAMING");
738 }
739 
740 /*
741  * read_page callback for logical decoding contexts, as a walsender process.
742  *
743  * Inside the walsender we can do better than logical_read_local_xlog_page,
744  * which has to do a plain sleep/busy loop, because the walsender's latch gets
745  * set every time WAL is flushed.
 *
 * Returns -1 when WalSndWaitForWal() cannot supply WAL up to
 * targetPagePtr + reqLen (per the comment in the body, this implies a
 * shutdown). Otherwise it returns the number of valid bytes at cur_page,
 * at most XLOG_BLCKSZ. A full XLOG_BLCKSZ page is always read into
 * cur_page, even when fewer bytes are reported as valid; the caller should
 * only trust the first `count` bytes.
 *
 * NOTE(review): source lines 748 (the function-name line and leading
 * parameters), 755 and 757 are missing from this copy.
746  */
747 static int
749  XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
750 {
751  XLogRecPtr flushptr;
752  int count;
753 
754  XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
756  sendTimeLine = state->currTLI;
758  sendTimeLineNextTLI = state->nextTLI;
759 
760  /* make sure we have enough WAL available */
761  flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
762 
763  /* fail if not (implies we are going to shut down) */
764  if (flushptr < targetPagePtr + reqLen)
765  return -1;
766 
767  if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
768  count = XLOG_BLCKSZ; /* more than one block available */
769  else
770  count = flushptr - targetPagePtr; /* part of the page available */
771 
772  /* now actually read the data, we know it's there */
773  XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
774 
775  return count;
776 }
777 
778 /*
779  * Process extra options given to CREATE_REPLICATION_SLOT.
 *
 * Recognized options:
 * - "export_snapshot" (boolean) and "use_snapshot": mutually exclusive,
 *   valid only for logical slots;
 * - "reserve_wal": valid only for physical slots.
 *
 * A repeated option, or one given for the wrong slot kind, raises
 * "conflicting or redundant options"; unknown names raise "unrecognized
 * option". *reserve_wal and *snapshot_action are written only when the
 * corresponding option is present, so the caller's initial values act as
 * defaults.
 *
 * NOTE(review): reporting a wrong-kind option (e.g. reserve_wal on a
 * logical slot) as "conflicting or redundant" may confuse users.
 *
 * NOTE(review): source lines 782 (the function-name line and first
 * parameter) and 804 (the false branch of the export_snapshot ternary) are
 * missing from this copy.
780  */
781 static void
783  bool *reserve_wal,
784  CRSSnapshotAction *snapshot_action)
785 {
786  ListCell *lc;
787  bool snapshot_action_given = false;
788  bool reserve_wal_given = false;
789 
790  /* Parse options */
791  foreach(lc, cmd->options)
792  {
793  DefElem *defel = (DefElem *) lfirst(lc);
794 
795  if (strcmp(defel->defname, "export_snapshot") == 0)
796  {
797  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
798  ereport(ERROR,
799  (errcode(ERRCODE_SYNTAX_ERROR),
800  errmsg("conflicting or redundant options")));
801 
802  snapshot_action_given = true;
803  *snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT :
805  }
806  else if (strcmp(defel->defname, "use_snapshot") == 0)
807  {
808  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
809  ereport(ERROR,
810  (errcode(ERRCODE_SYNTAX_ERROR),
811  errmsg("conflicting or redundant options")));
812 
813  snapshot_action_given = true;
814  *snapshot_action = CRS_USE_SNAPSHOT;
815  }
816  else if (strcmp(defel->defname, "reserve_wal") == 0)
817  {
818  if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
819  ereport(ERROR,
820  (errcode(ERRCODE_SYNTAX_ERROR),
821  errmsg("conflicting or redundant options")));
822 
823  reserve_wal_given = true;
824  *reserve_wal = true;
825  }
826  else
827  elog(ERROR, "unrecognized option: %s", defel->defname);
828  }
829 }
830 
831 /*
832  * Create a new replication slot.
 *
 * Parses the options and creates the slot (physical or logical). It then
 * replies with one row:
 * - slot_name;
 * - consistent_point;
 * - snapshot_name, NULL when no snapshot was exported;
 * - output_plugin, NULL when cmd->plugin is NULL.
 *
 * For logical slots, EXPORT_SNAPSHOT must be used outside a transaction
 * block. USE_SNAPSHOT must be used inside a transaction block, in
 * REPEATABLE READ mode, before any query, and not in a subtransaction.
 *
 * NOTE(review): many source lines are missing from this copy: 835, 848,
 * 854, 859, 863, 873, 878, 901, 920-922, 931, 934, 950-951, 958, 962, 964,
 * 968, 972-973, 975 and 1022. They include:
 * - the function-name line;
 * - presumably the declaration of ctx;
 * - the REPEATABLE READ check's condition;
 * - whatever assigns `dest` before begin_tup_output_tupdesc()
 *   (presumably line 975).
 * Confirm against upstream.
833  */
834 static void
836 {
837  const char *snapshot_name = NULL;
838  char xloc[MAXFNAMELEN];
839  char *slot_name;
840  bool reserve_wal = false;
841  CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
842  DestReceiver *dest;
843  TupOutputState *tstate;
844  TupleDesc tupdesc;
845  Datum values[4];
846  bool nulls[4];
847 
849 
850  parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action);
851 
852  /* setup state for XLogReadPage */
853  sendTimeLineIsHistoric = false;
855 
856  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
857  {
858  ReplicationSlotCreate(cmd->slotname, false,
860  }
861  else
862  {
864 
865  /*
866  * Initially create persistent slot as ephemeral - that allows us to
867  * nicely handle errors during initialization because it'll get
868  * dropped if this transaction fails. We'll make it persistent at the
869  * end. Temporary slots can be created as temporary from beginning as
870  * they get dropped on error as well.
871  */
872  ReplicationSlotCreate(cmd->slotname, true,
874  }
875 
876  if (cmd->kind == REPLICATION_KIND_LOGICAL)
877  {
879  bool need_full_snapshot = false;
880 
881  /*
882  * Do options check early so that we can bail before calling the
883  * DecodingContextFindStartpoint which can take long time.
884  */
885  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
886  {
887  if (IsTransactionBlock())
888  ereport(ERROR,
889  (errmsg("CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT "
890  "must not be called inside a transaction")));
891 
892  need_full_snapshot = true;
893  }
894  else if (snapshot_action == CRS_USE_SNAPSHOT)
895  {
896  if (!IsTransactionBlock())
897  ereport(ERROR,
898  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
899  "must be called inside a transaction")));
900 
902  ereport(ERROR,
903  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
904  "must be called in REPEATABLE READ isolation mode transaction")));
905 
906  if (FirstSnapshotSet)
907  ereport(ERROR,
908  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
909  "must be called before any query")));
910 
911  if (IsSubTransaction())
912  ereport(ERROR,
913  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
914  "must not be called in a subtransaction")));
915 
916  need_full_snapshot = true;
917  }
918 
919  ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
923 
924  /*
925  * Signal that we don't need the timeout mechanism. We're just
926  * creating the replication slot and don't yet accept feedback
927  * messages or send keepalives. As we possibly need to wait for
928  * further WAL the walsender would otherwise possibly be killed too
929  * soon.
930  */
932 
933  /* build initial snapshot, might take a while */
935 
936  /*
937  * Export or use the snapshot if we've been asked to do so.
938  *
939  * NB. We will convert the snapbuild.c kind of snapshot to normal
940  * snapshot when doing this.
941  */
942  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
943  {
944  snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
945  }
946  else if (snapshot_action == CRS_USE_SNAPSHOT)
947  {
948  Snapshot snap;
949 
952  }
953 
954  /* don't need the decoding context anymore */
955  FreeDecodingContext(ctx);
956 
957  if (!cmd->temporary)
959  }
960  else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
961  {
963 
965 
966  /* Write this slot to disk if it's a permanent one. */
967  if (!cmd->temporary)
969  }
970 
971  snprintf(xloc, sizeof(xloc), "%X/%X",
974 
976  MemSet(nulls, false, sizeof(nulls));
977 
978  /*----------
979  * Need a tuple descriptor representing four columns:
980  * - first field: the slot name
981  * - second field: LSN at which we became consistent
982  * - third field: exported snapshot's name
983  * - fourth field: output plugin
984  *----------
985  */
986  tupdesc = CreateTemplateTupleDesc(4, false);
987  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
988  TEXTOID, -1, 0);
989  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
990  TEXTOID, -1, 0);
991  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
992  TEXTOID, -1, 0);
993  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
994  TEXTOID, -1, 0);
995 
996  /* prepare for projection of tuples */
997  tstate = begin_tup_output_tupdesc(dest, tupdesc);
998 
999  /* slot_name */
1000  slot_name = NameStr(MyReplicationSlot->data.name);
1001  values[0] = CStringGetTextDatum(slot_name);
1002 
1003  /* consistent wal location */
1004  values[1] = CStringGetTextDatum(xloc);
1005 
1006  /* snapshot name, or NULL if none */
1007  if (snapshot_name != NULL)
1008  values[2] = CStringGetTextDatum(snapshot_name);
1009  else
1010  nulls[2] = true;
1011 
1012  /* plugin, or NULL if none */
1013  if (cmd->plugin != NULL)
1014  values[3] = CStringGetTextDatum(cmd->plugin);
1015  else
1016  nulls[3] = true;
1017 
1018  /* send it to dest */
1019  do_tup_output(tstate, values, nulls);
1020  end_tup_output(tstate);
1021 
1023 }
1024 
1025 /*
1026  * Get rid of a replication slot that is no longer wanted.
 *
 * The visible code finishes by sending the "DROP_REPLICATION_SLOT" command
 * tag to the client via EndCommand().
 *
 * NOTE(review): source lines 1029 (the function-name line) and 1031
 * (presumably the actual drop) are missing from this copy.
1027  */
1028 static void
1030 {
1032  EndCommand("DROP_REPLICATION_SLOT", DestRemote);
1033 }
1034 
1035 /*
1036  * Load previously initiated logical slot and prepare for sending data (via
1037  * WalSndLoop).
 *
 * Visible flow:
 * - send a CopyBothResponse;
 * - create the logical decoding context;
 * - publish the sent position to MyWalSnd under its spinlock (per the
 *   comments);
 * - stream with replication_active set, then free the decoding context.
 *
 * When the (missing) condition on source line 1056 holds -- per the log
 * message, after promotion -- got_STOPPING is set up front. The process
 * then exits via proc_exit(0) once streaming finishes. Otherwise the
 * function leaves COPY mode with a "COPY 0" CommandComplete.
 *
 * NOTE(review): source lines 1040 (the function-name line), 1042, 1045,
 * 1047, 1049, 1056, 1063, 1077-1080, 1083, 1089, 1093, 1098, 1101, 1104 and
 * 1109 are missing from this copy. They include the promotion check's
 * condition and presumably the main-loop call.
1038  */
1039 static void
1041 {
1043 
1044  /* make sure that our requirements are still fulfilled */
1046 
1048 
1050 
1051  /*
1052  * Force a disconnect, so that the decoding code doesn't need to care
1053  * about an eventual switch from running in recovery, to running in a
1054  * normal environment. Client code is expected to handle reconnects.
1055  */
1057  {
1058  ereport(LOG,
1059  (errmsg("terminating walsender process after promotion")));
1060  got_STOPPING = true;
1061  }
1062 
1064 
1065  /* Send a CopyBothResponse message, and start streaming */
1066  pq_beginmessage(&buf, 'W');
1067  pq_sendbyte(&buf, 0);
1068  pq_sendint(&buf, 0, 2);
1069  pq_endmessage(&buf);
1070  pq_flush();
1071 
1072  /*
1073  * Initialize position to the last ack'ed one, then the xlog records begin
1074  * to be shipped from that position.
1075  */
1076  logical_decoding_ctx = CreateDecodingContext(cmd->startpoint, cmd->options,
1081 
1082  /* Start reading WAL from the oldest required WAL. */
1084 
1085  /*
1086  * Report the location after which we'll send out further commits as the
1087  * current sentPtr.
1088  */
1090 
1091  /* Also update the sent position status in shared memory */
1092  SpinLockAcquire(&MyWalSnd->mutex);
1094  SpinLockRelease(&MyWalSnd->mutex);
1095 
1096  replication_active = true;
1097 
1099 
1100  /* Main loop of walsender */
1102 
1103  FreeDecodingContext(logical_decoding_ctx);
1105 
1106  replication_active = false;
	/* A stop request (possibly the promotion case above) ends the process here. */
1107  if (got_STOPPING)
1108  proc_exit(0);
1110 
1111  /* Get out of COPY mode (CommandComplete). */
1112  EndCommand("COPY 0", DestRemote);
1113 }
1114 
1115 /*
1116  * LogicalDecodingContext 'prepare_write' callback.
1117  *
1118  * Prepare a write into a StringInfo.
1119  *
1120  * Don't do anything lasting in here, it's quite possible that nothing will be done
1121  * with the data.
 *
 * Resets ctx->out and writes the header of a 'w' message:
 * - the type byte;
 * - dataStart and walEnd, both set to lsn (or to InvalidXLogRecPtr when
 *   this is not the last write);
 * - an 8-byte sendtime placeholder of 0.
 *
 * WalSndWriteData() later overwrites that placeholder in place, at offset
 * 1 + 2 * sizeof(int64). The payload is presumably appended after this
 * header by the decoding code before the write callback runs.
 *
 * NOTE(review): the function-name line (source line 1124) is missing from
 * this copy. The prototype near the top of the file gives the signature
 * WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn,
 * TransactionId xid, bool last_write).
1122  */
1123 static void
1125 {
1126  /* can't have sync rep confused by sending the same LSN several times */
1127  if (!last_write)
1128  lsn = InvalidXLogRecPtr;
1129 
1130  resetStringInfo(ctx->out);
1131 
1132  pq_sendbyte(ctx->out, 'w');
1133  pq_sendint64(ctx->out, lsn); /* dataStart */
1134  pq_sendint64(ctx->out, lsn); /* walEnd */
1135 
1136  /*
1137  * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1138  * reserve space here.
1139  */
1140  pq_sendint64(ctx->out, 0); /* sendtime */
1141 }
1142 
1143 /*
1144  * LogicalDecodingContext 'write' callback.
1145  *
1146  * Actually write data previously prepared by WalSndPrepareWrite out to
1147  * the network. Take as long as needed, but process replies from the other
1148  * side and check timeouts during that.
1149  */
1150 static void
1152  bool last_write)
1153 {
1154  /* output previously gathered data in a CopyData packet */
1155  pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1156 
1157  /*
1158  * Fill the send timestamp last, so that it is taken as late as possible.
1159  * This is somewhat ugly, but the protocol is fixed because streaming
1160  * physical replication has already used it for several releases.
1161  */
1162  resetStringInfo(&tmpbuf);
1163  pq_sendint64(&tmpbuf, GetCurrentTimestamp());
1164  memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1165  tmpbuf.data, sizeof(int64));
1166 
1167  /* fast path */
1168  /* Try to flush pending output to the client */
1169  if (pq_flush_if_writable() != 0)
1170  WalSndShutdown();
1171 
1172  if (!pq_is_send_pending())
1173  return;
1174 
1175  for (;;)
1176  {
1177  int wakeEvents;
1178  long sleeptime;
1179  TimestampTz now;
1180 
1181  /*
1182  * Emergency bailout if postmaster has died. This is to avoid the
1183  * necessity for manual cleanup of all postmaster children.
1184  */
1185  if (!PostmasterIsAlive())
1186  exit(1);
1187 
1188  /* Clear any already-pending wakeups */
1190 
1192 
1193  /* Process any requests or signals received recently */
1194  if (ConfigReloadPending)
1195  {
1196  ConfigReloadPending = false;
1199  }
1200 
1201  /* Check for input from the client */
1203 
1204  /* Try to flush pending output to the client */
1205  if (pq_flush_if_writable() != 0)
1206  WalSndShutdown();
1207 
1208  /* If we finished clearing the buffered data, we're done here. */
1209  if (!pq_is_send_pending())
1210  break;
1211 
1212  now = GetCurrentTimestamp();
1213 
1214  /* die if timeout was reached */
1215  WalSndCheckTimeOut(now);
1216 
1217  /* Send keepalive if the time has come */
1219 
1220  sleeptime = WalSndComputeSleeptime(now);
1221 
1222  wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
1224 
1225  /* Sleep until something happens or we time out */
1226  WaitLatchOrSocket(MyLatch, wakeEvents,
1227  MyProcPort->sock, sleeptime,
1229  }
1230 
1231  /* reactivate latch so WalSndLoop knows to continue */
1232  SetLatch(MyLatch);
1233 }
1234 
1235 /*
1236  * LogicalDecodingContext 'progress_update' callback.
1237  *
1238  * Write the current position to the log tracker (see XLogSendPhysical).
1239  */
1240 static void
1242 {
1243  static TimestampTz sendTime = 0;
1245 
1246  /*
1247  * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1248  * avoid flooding the lag tracker when we commit frequently.
1249  */
1250 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1251  if (!TimestampDifferenceExceeds(sendTime, now,
1253  return;
1254 
1255  LagTrackerWrite(lsn, now);
1256  sendTime = now;
1257 }
1258 
1259 /*
1260  * Wait till WAL < loc is flushed to disk so it can be safely sent to client.
1261  *
1262  * Returns end LSN of flushed WAL. Normally this will be >= loc, but
1263  * if we detect a shutdown request (either from postmaster or client)
1264  * we will return early, so caller must always check.
1265  */
1266 static XLogRecPtr
1268 {
1269  int wakeEvents;
1270  static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1271 
1272 
1273  /*
1274  * Fast path to avoid acquiring the spinlock in case we already know we
1275  * have enough WAL available. This is particularly interesting if we're
1276  * far behind.
1277  */
1278  if (RecentFlushPtr != InvalidXLogRecPtr &&
1279  loc <= RecentFlushPtr)
1280  return RecentFlushPtr;
1281 
1282  /* Get a more recent flush pointer. */
1283  if (!RecoveryInProgress())
1284  RecentFlushPtr = GetFlushRecPtr();
1285  else
1286  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1287 
1288  for (;;)
1289  {
1290  long sleeptime;
1291  TimestampTz now;
1292 
1293  /*
1294  * Emergency bailout if postmaster has died. This is to avoid the
1295  * necessity for manual cleanup of all postmaster children.
1296  */
1297  if (!PostmasterIsAlive())
1298  exit(1);
1299 
1300  /* Clear any already-pending wakeups */
1302 
1304 
1305  /* Process any requests or signals received recently */
1306  if (ConfigReloadPending)
1307  {
1308  ConfigReloadPending = false;
1311  }
1312 
1313  /* Check for input from the client */
1315 
1316  /*
1317  * If we're shutting down, trigger pending WAL to be written out,
1318  * otherwise we'd possibly end up waiting for WAL that never gets
1319  * written, because walwriter has shut down already.
1320  */
1321  if (got_STOPPING)
1323 
1324  /* Update our idea of the currently flushed position. */
1325  if (!RecoveryInProgress())
1326  RecentFlushPtr = GetFlushRecPtr();
1327  else
1328  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1329 
1330  /*
1331  * If postmaster asked us to stop, don't wait anymore.
1332  *
1333  * It's important to do this check after the recomputation of
1334  * RecentFlushPtr, so we can send all remaining data before shutting
1335  * down.
1336  */
1337  if (got_STOPPING)
1338  break;
1339 
1340  /*
1341  * We only send regular messages to the client for fully decoded
1342  * transactions, but synchronous replication and walsender shutdown
1343  * may be waiting for a later location. So we send pings
1344  * containing the flush location every now and then.
1345  */
1346  if (MyWalSnd->flush < sentPtr &&
1347  MyWalSnd->write < sentPtr &&
1349  {
1350  WalSndKeepalive(false);
1352  }
1353 
1354  /* check whether we're done */
1355  if (loc <= RecentFlushPtr)
1356  break;
1357 
1358  /* Waiting for new WAL. Since we need to wait, we're now caught up. */
1359  WalSndCaughtUp = true;
1360 
1361  /*
1362  * Try to flush any pending output to the client.
1363  */
1364  if (pq_flush_if_writable() != 0)
1365  WalSndShutdown();
1366 
1367  /*
1368  * If we have received CopyDone from the client, sent CopyDone
1369  * ourselves, and the output buffer is empty, it's time to exit
1370  * streaming, so fail the current WAL fetch request.
1371  */
1373  !pq_is_send_pending())
1374  break;
1375 
1376  now = GetCurrentTimestamp();
1377 
1378  /* die if timeout was reached */
1379  WalSndCheckTimeOut(now);
1380 
1381  /* Send keepalive if the time has come */
1383 
1384  /*
1385  * Sleep until something happens or we time out. Also wait for the
1386  * socket becoming writable, if there's still pending output.
1387  * Otherwise we might sit on sendable output data while waiting for
1388  * new WAL to be generated. (But if we have nothing to send, we don't
1389  * want to wake on socket-writable.)
1390  */
1391  sleeptime = WalSndComputeSleeptime(now);
1392 
1393  wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
1395 
1396  if (pq_is_send_pending())
1397  wakeEvents |= WL_SOCKET_WRITEABLE;
1398 
1399  WaitLatchOrSocket(MyLatch, wakeEvents,
1400  MyProcPort->sock, sleeptime,
1402  }
1403 
1404  /* reactivate latch so WalSndLoop knows to continue */
1405  SetLatch(MyLatch);
1406  return RecentFlushPtr;
1407 }
1408 
1409 /*
1410  * Execute an incoming replication command.
1411  *
1412  * Returns true if the cmd_string was recognized as a WalSender command, false
1413  * if not.
1414  */
1415 bool
1416 exec_replication_command(const char *cmd_string)
1417 {
1418  int parse_rc;
1419  Node *cmd_node;
1420  MemoryContext cmd_context;
1421  MemoryContext old_context;
1422 
1423  /*
1424  * If WAL sender has been told that shutdown is getting close, switch its
1425  * status accordingly to handle the next replication commands correctly.
1426  */
1427  if (got_STOPPING)
1429 
1430  /*
1431  * Throw error if in stopping mode. We need to prevent commands that could
1432  * generate WAL while the shutdown checkpoint is being written. To be
1433  * safe, we just prohibit all new commands.
1434  */
1435  if (MyWalSnd->state == WALSNDSTATE_STOPPING)
1436  ereport(ERROR,
1437  (errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1438 
1439  /*
1440  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1441  * command arrives. Clean up the old stuff if there's anything.
1442  */
1444 
1446 
1448  "Replication command context",
1450  old_context = MemoryContextSwitchTo(cmd_context);
1451 
1452  replication_scanner_init(cmd_string);
1453  parse_rc = replication_yyparse();
1454  if (parse_rc != 0)
1455  ereport(ERROR,
1456  (errcode(ERRCODE_SYNTAX_ERROR),
1457  (errmsg_internal("replication command parser returned %d",
1458  parse_rc))));
1459 
1460  cmd_node = replication_parse_result;
1461 
1462  /*
1463  * Log replication command if log_replication_commands is enabled. Even
1464  * when it's disabled, log the command with DEBUG1 level for backward
1465  * compatibility. Note that SQL commands are not logged here, and will be
1466  * logged later if log_statement is enabled.
1467  */
1468  if (cmd_node->type != T_SQLCmd)
1470  (errmsg("received replication command: %s", cmd_string)));
1471 
1472  /*
1473  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was
1474  * called outside of a transaction, the snapshot should be cleared here.
1475  */
1476  if (!IsTransactionBlock())
1478 
1479  /*
1480  * For aborted transactions, don't allow anything except pure SQL, the
1481  * exec_simple_query() will handle it correctly.
1482  */
1483  if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
1484  ereport(ERROR,
1485  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1486  errmsg("current transaction is aborted, "
1487  "commands ignored until end of transaction block")));
1488 
1490 
1491  /*
1492  * Allocate buffers that will be used for each outgoing and incoming
1493  * message. We do this just once per command to reduce palloc overhead.
1494  */
1495  initStringInfo(&output_message);
1496  initStringInfo(&reply_message);
1497  initStringInfo(&tmpbuf);
1498 
1499  switch (cmd_node->type)
1500  {
1501  case T_IdentifySystemCmd:
1502  IdentifySystem();
1503  break;
1504 
1505  case T_BaseBackupCmd:
1506  PreventTransactionChain(true, "BASE_BACKUP");
1507  SendBaseBackup((BaseBackupCmd *) cmd_node);
1508  break;
1509 
1512  break;
1513 
1516  break;
1517 
1518  case T_StartReplicationCmd:
1519  {
1520  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1521 
1522  PreventTransactionChain(true, "START_REPLICATION");
1523 
1524  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1525  StartReplication(cmd);
1526  else
1528  break;
1529  }
1530 
1531  case T_TimeLineHistoryCmd:
1532  PreventTransactionChain(true, "TIMELINE_HISTORY");
1534  break;
1535 
1536  case T_VariableShowStmt:
1537  {
1539  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1540 
1541  GetPGVariable(n->name, dest);
1542  }
1543  break;
1544 
1545  case T_SQLCmd:
1546  if (MyDatabaseId == InvalidOid)
1547  ereport(ERROR,
1548  (errmsg("not connected to database")));
1549 
1550  /* Tell the caller that this wasn't a WalSender command. */
1551  return false;
1552 
1553  default:
1554  elog(ERROR, "unrecognized replication command node tag: %u",
1555  cmd_node->type);
1556  }
1557 
1558  /* done */
1559  MemoryContextSwitchTo(old_context);
1560  MemoryContextDelete(cmd_context);
1561 
1562  /* Send CommandComplete message */
1563  EndCommand("SELECT", DestRemote);
1564 
1565  return true;
1566 }
1567 
1568 /*
1569  * Process any incoming messages while streaming. Also checks if the remote
1570  * end has closed the connection.
1571  */
1572 static void
1574 {
1575  unsigned char firstchar;
1576  int r;
1577  bool received = false;
1578 
1579  for (;;)
1580  {
1581  pq_startmsgread();
1582  r = pq_getbyte_if_available(&firstchar);
1583  if (r < 0)
1584  {
1585  /* unexpected error or EOF */
1587  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1588  errmsg("unexpected EOF on standby connection")));
1589  proc_exit(0);
1590  }
1591  if (r == 0)
1592  {
1593  /* no data available without blocking */
1594  pq_endmsgread();
1595  break;
1596  }
1597 
1598  /* Read the message contents */
1599  resetStringInfo(&reply_message);
1600  if (pq_getmessage(&reply_message, 0))
1601  {
1603  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1604  errmsg("unexpected EOF on standby connection")));
1605  proc_exit(0);
1606  }
1607 
1608  /*
1609  * If we already received a CopyDone from the frontend, the frontend
1610  * should not send us anything until we've closed our end of the COPY.
1611  * XXX: In theory, the frontend could already send the next command
1612  * before receiving the CopyDone, but libpq doesn't currently allow
1613  * that.
1614  */
1615  if (streamingDoneReceiving && firstchar != 'X')
1616  ereport(FATAL,
1617  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1618  errmsg("unexpected standby message type \"%c\", after receiving CopyDone",
1619  firstchar)));
1620 
1621  /* Handle the very limited subset of commands expected in this phase */
1622  switch (firstchar)
1623  {
1624  /*
1625  * 'd' means a standby reply wrapped in a CopyData packet.
1626  */
1627  case 'd':
1629  received = true;
1630  break;
1631 
1632  /*
1633  * CopyDone means the standby requested to finish streaming.
1634  * Reply with CopyDone, if we had not sent that already.
1635  */
1636  case 'c':
1637  if (!streamingDoneSending)
1638  {
1639  pq_putmessage_noblock('c', NULL, 0);
1640  streamingDoneSending = true;
1641  }
1642 
1643  streamingDoneReceiving = true;
1644  received = true;
1645  break;
1646 
1647  /*
1648  * 'X' means that the standby is closing down the socket.
1649  */
1650  case 'X':
1651  proc_exit(0);
1652 
1653  default:
1654  ereport(FATAL,
1655  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1656  errmsg("invalid standby message type \"%c\"",
1657  firstchar)));
1658  }
1659  }
1660 
1661  /*
1662  * Save the last reply timestamp if we've received at least one reply.
1663  */
1664  if (received)
1665  {
1667  waiting_for_ping_response = false;
1668  }
1669 }
1670 
1671 /*
1672  * Process a status update message received from standby.
1673  */
1674 static void
1676 {
1677  char msgtype;
1678 
1679  /*
1680  * Check message type from the first byte.
1681  */
1682  msgtype = pq_getmsgbyte(&reply_message);
1683 
1684  switch (msgtype)
1685  {
1686  case 'r':
1688  break;
1689 
1690  case 'h':
1692  break;
1693 
1694  default:
1696  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1697  errmsg("unexpected message type \"%c\"", msgtype)));
1698  proc_exit(0);
1699  }
1700 }
1701 
1702 /*
1703  * Remember that a walreceiver just confirmed receipt of lsn `lsn`.
1704  */
1705 static void
1707 {
1708  bool changed = false;
1710 
1711  Assert(lsn != InvalidXLogRecPtr);
1712  SpinLockAcquire(&slot->mutex);
1713  if (slot->data.restart_lsn != lsn)
1714  {
1715  changed = true;
1716  slot->data.restart_lsn = lsn;
1717  }
1718  SpinLockRelease(&slot->mutex);
1719 
1720  if (changed)
1721  {
1724  }
1725 
1726  /*
1727  * One could argue that the slot should be saved to disk now, but that'd
1728  * be wasted effort - the worst that lost information can do here is give
1729  * us wrong information in a statistics view - we'll just potentially be
1730  * more conservative in removing files.
1731  */
1732 }
1733 
1734 /*
1735  * Regular reply from standby advising of WAL locations on standby server.
1736  */
1737 static void
1739 {
1740  XLogRecPtr writePtr,
1741  flushPtr,
1742  applyPtr;
1743  bool replyRequested;
1744  TimeOffset writeLag,
1745  flushLag,
1746  applyLag;
1747  bool clearLagTimes;
1748  TimestampTz now;
1749 
1750  static bool fullyAppliedLastTime = false;
1751 
1752  /* the caller already consumed the msgtype byte */
1753  writePtr = pq_getmsgint64(&reply_message);
1754  flushPtr = pq_getmsgint64(&reply_message);
1755  applyPtr = pq_getmsgint64(&reply_message);
1756  (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
1757  replyRequested = pq_getmsgbyte(&reply_message);
1758 
1759  elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s",
1760  (uint32) (writePtr >> 32), (uint32) writePtr,
1761  (uint32) (flushPtr >> 32), (uint32) flushPtr,
1762  (uint32) (applyPtr >> 32), (uint32) applyPtr,
1763  replyRequested ? " (reply requested)" : "");
1764 
1765  /* See if we can compute the round-trip lag for these positions. */
1766  now = GetCurrentTimestamp();
1767  writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
1768  flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
1769  applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
1770 
1771  /*
1772  * If the standby reports that it has fully replayed the WAL in two
1773  * consecutive reply messages, then the second such message must result
1774  * from wal_receiver_status_interval expiring on the standby. This is a
1775  * convenient time to forget the lag times measured when it last
1776  * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
1777  * until more WAL traffic arrives.
1778  */
1779  clearLagTimes = false;
1780  if (applyPtr == sentPtr)
1781  {
1782  if (fullyAppliedLastTime)
1783  clearLagTimes = true;
1784  fullyAppliedLastTime = true;
1785  }
1786  else
1787  fullyAppliedLastTime = false;
1788 
1789  /* Send a reply if the standby requested one. */
1790  if (replyRequested)
1791  WalSndKeepalive(false);
1792 
1793  /*
1794  * Update shared state for this WalSender process based on reply data from
1795  * standby.
1796  */
1797  {
1798  WalSnd *walsnd = MyWalSnd;
1799 
1800  SpinLockAcquire(&walsnd->mutex);
1801  walsnd->write = writePtr;
1802  walsnd->flush = flushPtr;
1803  walsnd->apply = applyPtr;
1804  if (writeLag != -1 || clearLagTimes)
1805  walsnd->writeLag = writeLag;
1806  if (flushLag != -1 || clearLagTimes)
1807  walsnd->flushLag = flushLag;
1808  if (applyLag != -1 || clearLagTimes)
1809  walsnd->applyLag = applyLag;
1810  SpinLockRelease(&walsnd->mutex);
1811  }
1812 
1815 
1816  /*
1817  * Advance our local xmin horizon when the client confirmed a flush.
1818  */
1819  if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
1820  {
1823  else
1825  }
1826 }
1827 
1828 /* compute new replication slot xmin horizon if needed */
1829 static void
1831 {
1832  bool changed = false;
1834 
1835  SpinLockAcquire(&slot->mutex);
1837 
1838  /*
1839  * For physical replication we don't need the interlock provided by xmin
1840  * and effective_xmin since the consequences of a missed increase are
1841  * limited to query cancellations, so set both at once.
1842  */
1843  if (!TransactionIdIsNormal(slot->data.xmin) ||
1844  !TransactionIdIsNormal(feedbackXmin) ||
1845  TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
1846  {
1847  changed = true;
1848  slot->data.xmin = feedbackXmin;
1849  slot->effective_xmin = feedbackXmin;
1850  }
1851  if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
1852  !TransactionIdIsNormal(feedbackCatalogXmin) ||
1853  TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
1854  {
1855  changed = true;
1856  slot->data.catalog_xmin = feedbackCatalogXmin;
1857  slot->effective_catalog_xmin = feedbackCatalogXmin;
1858  }
1859  SpinLockRelease(&slot->mutex);
1860 
1861  if (changed)
1862  {
1865  }
1866 }
1867 
1868 /*
1869  * Check that the provided xmin/epoch are sane, that is, not in the future
1870  * and not so far back as to be already wrapped around.
1871  *
1872  * The epoch of nextXid should be the same as the standby's, or, if the
1873  * counter has wrapped, one greater than the standby's.
1874  *
1875  * This check doesn't care about whether clog exists for these xids
1876  * at all.
1877  */
1878 static bool
1880 {
1881  TransactionId nextXid;
1882  uint32 nextEpoch;
1883 
1884  GetNextXidAndEpoch(&nextXid, &nextEpoch);
1885 
1886  if (xid <= nextXid)
1887  {
1888  if (epoch != nextEpoch)
1889  return false;
1890  }
1891  else
1892  {
1893  if (epoch + 1 != nextEpoch)
1894  return false;
1895  }
1896 
1897  if (!TransactionIdPrecedesOrEquals(xid, nextXid))
1898  return false; /* epoch OK, but it's wrapped around */
1899 
1900  return true;
1901 }
1902 
1903 /*
1904  * Hot Standby feedback
1905  */
1906 static void
1908 {
1909  TransactionId feedbackXmin;
1910  uint32 feedbackEpoch;
1911  TransactionId feedbackCatalogXmin;
1912  uint32 feedbackCatalogEpoch;
1913 
1914  /*
1915  * Decipher the reply message. The caller already consumed the msgtype
1916  * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
1917  * of this message.
1918  */
1919  (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
1920  feedbackXmin = pq_getmsgint(&reply_message, 4);
1921  feedbackEpoch = pq_getmsgint(&reply_message, 4);
1922  feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
1923  feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
1924 
1925  elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u",
1926  feedbackXmin,
1927  feedbackEpoch,
1928  feedbackCatalogXmin,
1929  feedbackCatalogEpoch);
1930 
1931  /*
1932  * Unset WalSender's xmins if the feedback message values are invalid.
1933  * This happens when the downstream turned hot_standby_feedback off.
1934  */
1935  if (!TransactionIdIsNormal(feedbackXmin)
1936  && !TransactionIdIsNormal(feedbackCatalogXmin))
1937  {
1939  if (MyReplicationSlot != NULL)
1940  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
1941  return;
1942  }
1943 
1944  /*
1945  * Check that the provided xmin/epoch are sane, that is, not in the future
1946  * and not so far back as to be already wrapped around. Ignore if not.
1947  */
1948  if (TransactionIdIsNormal(feedbackXmin) &&
1949  !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
1950  return;
1951 
1952  if (TransactionIdIsNormal(feedbackCatalogXmin) &&
1953  !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
1954  return;
1955 
1956  /*
1957  * Set the WalSender's xmin equal to the standby's requested xmin, so that
1958  * the xmin will be taken into account by GetOldestXmin. This will hold
1959  * back the removal of dead rows and thereby prevent the generation of
1960  * cleanup conflicts on the standby server.
1961  *
1962  * There is a small window for a race condition here: although we just
1963  * checked that feedbackXmin precedes nextXid, the nextXid could have
1964  * gotten advanced between our fetching it and applying the xmin below,
1965  * perhaps far enough to make feedbackXmin wrap around. In that case the
1966  * xmin we set here would be "in the future" and have no effect. No point
1967  * in worrying about this since it's too late to save the desired data
1968  * anyway. Assuming that the standby sends us an increasing sequence of
1969  * xmins, this could only happen during the first reply cycle, else our
1970  * own xmin would prevent nextXid from advancing so far.
1971  *
1972  * We don't bother taking the ProcArrayLock here. Setting the xmin field
1973  * is assumed atomic, and there's no real need to prevent a concurrent
1974  * GetOldestXmin. (If we're moving our xmin forward, this is obviously
1975  * safe, and if we're moving it backwards, well, the data is at risk
1976  * already since a VACUUM could have just finished calling GetOldestXmin.)
1977  *
1978  * If we're using a replication slot we reserve the xmin via that,
1979  * otherwise via the walsender's PGXACT entry. We can only track the
1980  * catalog xmin separately when using a slot, so we store the least of the
1981  * two provided when not using a slot.
1982  *
1983  * XXX: It might make sense to generalize the ephemeral slot concept and
1984  * always use the slot mechanism to handle the feedback xmin.
1985  */
1986  if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
1987  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
1988  else
1989  {
1990  if (TransactionIdIsNormal(feedbackCatalogXmin)
1991  && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
1992  MyPgXact->xmin = feedbackCatalogXmin;
1993  else
1994  MyPgXact->xmin = feedbackXmin;
1995  }
1996 }
1997 
1998 /*
1999  * Compute how long send/receive loops should sleep.
2000  *
2001  * If wal_sender_timeout is enabled we want to wake up in time to send
2002  * keepalives and to abort the connection if wal_sender_timeout has been
2003  * reached.
2004  */
2005 static long
2007 {
2008  long sleeptime = 10000; /* 10 s */
2009 
2011  {
2012  TimestampTz wakeup_time;
2013  long sec_to_timeout;
2014  int microsec_to_timeout;
2015 
2016  /*
2017  * At the latest stop sleeping once wal_sender_timeout has been
2018  * reached.
2019  */
2022 
2023  /*
2024  * If no ping has been sent yet, wakeup when it's time to do so.
2025  * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2026  * the timeout passed without a response.
2027  */
2030  wal_sender_timeout / 2);
2031 
2032  /* Compute relative time until wakeup. */
2033  TimestampDifference(now, wakeup_time,
2034  &sec_to_timeout, &microsec_to_timeout);
2035 
2036  sleeptime = sec_to_timeout * 1000 +
2037  microsec_to_timeout / 1000;
2038  }
2039 
2040  return sleeptime;
2041 }
2042 
2043 /*
2044  * Check whether there have been responses by the client within
2045  * wal_sender_timeout and shutdown if not.
2046  */
2047 static void
2049 {
2050  TimestampTz timeout;
2051 
2052  /* don't bail out if we're doing something that doesn't require timeouts */
2053  if (last_reply_timestamp <= 0)
2054  return;
2055 
2058 
2059  if (wal_sender_timeout > 0 && now >= timeout)
2060  {
2061  /*
2062  * Since typically expiration of replication timeout means
2063  * communication problem, we don't send the error message to the
2064  * standby.
2065  */
2067  (errmsg("terminating walsender process due to replication timeout")));
2068 
2069  WalSndShutdown();
2070  }
2071 }
2072 
2073 /* Main loop of walsender process that streams the WAL over Copy messages. */
2074 static void
2076 {
2077  /*
2078  * Initialize the last reply timestamp. That enables timeout processing
2079  * from hereon.
2080  */
2082  waiting_for_ping_response = false;
2083 
2084  /* Report to pgstat that this process is running */
2086 
2087  /*
2088  * Loop until we reach the end of this timeline or the client requests to
2089  * stop streaming.
2090  */
2091  for (;;)
2092  {
2093  TimestampTz now;
2094 
2095  /*
2096  * Emergency bailout if postmaster has died. This is to avoid the
2097  * necessity for manual cleanup of all postmaster children.
2098  */
2099  if (!PostmasterIsAlive())
2100  exit(1);
2101 
2102  /* Clear any already-pending wakeups */
2104 
2106 
2107  /* Process any requests or signals received recently */
2108  if (ConfigReloadPending)
2109  {
2110  ConfigReloadPending = false;
2113  }
2114 
2115  /* Check for input from the client */
2117 
2118  /*
2119  * If we have received CopyDone from the client, sent CopyDone
2120  * ourselves, and the output buffer is empty, it's time to exit
2121  * streaming.
2122  */
2124  !pq_is_send_pending())
2125  break;
2126 
2127  /*
2128  * If we don't have any pending data in the output buffer, try to send
2129  * some more. If there is some, we don't bother to call send_data
2130  * again until we've flushed it ... but we'd better assume we are not
2131  * caught up.
2132  */
2133  if (!pq_is_send_pending())
2134  send_data();
2135  else
2136  WalSndCaughtUp = false;
2137 
2138  /* Try to flush pending output to the client */
2139  if (pq_flush_if_writable() != 0)
2140  WalSndShutdown();
2141 
2142  /* If nothing remains to be sent right now ... */
2144  {
2145  /*
2146  * If we're in catchup state, move to streaming. This is an
2147  * important state change for users to know about, since before
2148  * this point data loss might occur if the primary dies and we
2149  * need to failover to the standby. The state change is also
2150  * important for synchronous replication, since commits that
2151  * started to wait at that point might wait for some time.
2152  */
2153  if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
2154  {
2155  ereport(DEBUG1,
2156  (errmsg("standby \"%s\" has now caught up with primary",
2157  application_name)));
2159  }
2160 
2161  /*
2162  * When SIGUSR2 arrives, we send any outstanding logs up to the
2163  * shutdown checkpoint record (i.e., the latest record), wait for
2164  * them to be replicated to the standby, and exit. This may be a
2165  * normal termination at shutdown, or a promotion, the walsender
2166  * is not sure which.
2167  */
2168  if (got_SIGUSR2)
2169  WalSndDone(send_data);
2170  }
2171 
2172  now = GetCurrentTimestamp();
2173 
2174  /* Check for replication timeout. */
2175  WalSndCheckTimeOut(now);
2176 
2177  /* Send keepalive if the time has come */
2179 
2180  /*
2181  * We don't block if not caught up, unless there is unsent data
2182  * pending in which case we'd better block until the socket is
2183  * write-ready. This test is only needed for the case where the
2184  * send_data callback handled a subset of the available data but then
2185  * pq_flush_if_writable flushed it all --- we should immediately try
2186  * to send more.
2187  */
2189  {
2190  long sleeptime;
2191  int wakeEvents;
2192 
2193  wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT |
2195 
2196  sleeptime = WalSndComputeSleeptime(now);
2197 
2198  if (pq_is_send_pending())
2199  wakeEvents |= WL_SOCKET_WRITEABLE;
2200 
2201  /* Sleep until something happens or we time out */
2202  WaitLatchOrSocket(MyLatch, wakeEvents,
2203  MyProcPort->sock, sleeptime,
2205  }
2206  }
2207  return;
2208 }
2209 
2210 /* Initialize a per-walsender data structure for this walsender process */
2211 static void
2213 {
2214  int i;
2215 
2216  /*
2217  * WalSndCtl should be set up already (we inherit this by fork() or
2218  * EXEC_BACKEND mechanism from the postmaster).
2219  */
2220  Assert(WalSndCtl != NULL);
2221  Assert(MyWalSnd == NULL);
2222 
2223  /*
2224  * Find a free walsender slot and reserve it. If this fails, we must be
2225  * out of WalSnd structures.
2226  */
2227  for (i = 0; i < max_wal_senders; i++)
2228  {
2229  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2230 
2231  SpinLockAcquire(&walsnd->mutex);
2232 
2233  if (walsnd->pid != 0)
2234  {
2235  SpinLockRelease(&walsnd->mutex);
2236  continue;
2237  }
2238  else
2239  {
2240  /*
2241  * Found a free slot. Reserve it for us.
2242  */
2243  walsnd->pid = MyProcPid;
2244  walsnd->sentPtr = InvalidXLogRecPtr;
2245  walsnd->write = InvalidXLogRecPtr;
2246  walsnd->flush = InvalidXLogRecPtr;
2247  walsnd->apply = InvalidXLogRecPtr;
2248  walsnd->writeLag = -1;
2249  walsnd->flushLag = -1;
2250  walsnd->applyLag = -1;
2251  walsnd->state = WALSNDSTATE_STARTUP;
2252  walsnd->latch = &MyProc->procLatch;
2253  SpinLockRelease(&walsnd->mutex);
2254  /* don't need the lock anymore */
2255  MyWalSnd = (WalSnd *) walsnd;
2256 
2257  break;
2258  }
2259  }
2260  if (MyWalSnd == NULL)
2261  ereport(FATAL,
2262  (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
2263  errmsg("number of requested standby connections "
2264  "exceeds max_wal_senders (currently %d)",
2265  max_wal_senders)));
2266 
2267  /* Arrange to clean up at walsender exit */
2269 }
2270 
2271 /*
 * Destroy the per-walsender data structure for this walsender process.
 *
 * Releases the slot claimed at startup. First MyWalSnd is cleared. Then,
 * under the slot's spinlock, its latch is set to NULL, so that code reading
 * walsnd->latch no longer sets it, and its pid is set to 0, so that the
 * slot-claiming loop can reuse it.
 * NOTE(review): the signature line (source line 2273) is absent from this
 * listing; presumably this is the exit callback arranged at slot setup.
 */
2272 static void
2274 {
2275  WalSnd *walsnd = MyWalSnd;
2276 
2277  Assert(walsnd != NULL);
2278 
2279  MyWalSnd = NULL;
2280 
2281  SpinLockAcquire(&walsnd->mutex);
2282  /* clear latch while holding the spinlock, so it can safely be read */
2283  walsnd->latch = NULL;
2284  /* Mark WalSnd struct as no longer being in use. */
2285  walsnd->pid = 0;
2286  SpinLockRelease(&walsnd->mutex);
2287 }
2288 
2289 /*
2290  * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
2291  *
2292  * XXX probably this should be improved to suck data directly from the
2293  * WAL buffers when possible.
2294  *
2295  * Will open, and keep open, one WAL segment stored in the global file
2296  * descriptor sendFile. This means if XLogRead is used once, there will
2297  * always be one descriptor left open until the process ends, but never
2298  * more than one.
 *
 * buf:      destination; the caller must provide room for 'count' bytes
 *           (nothing is NUL-terminated here).
 * startptr: WAL location of the first byte to read.
 * count:    number of bytes to read; the range may span several segments.
 *
 * Failures to open, seek or read are reported with ereport(ERROR). After the
 * read, MyWalSnd->needreload is fetched and cleared. If it was set and a file
 * is open, the file is closed and the whole read is redone from scratch.
 *
 * NOTE(review): several source lines are absent from this listing (among them
 * 2357-2358, 2362, 2364, 2367, 2437 and 2445, plus some errcode/argument
 * lines of the ereport calls), so the timeline selection, the post-read
 * validity check, and the guard of the reload check cannot be verified here.
2299  */
2300 static void
2301 XLogRead(char *buf, XLogRecPtr startptr, Size count)
2302 {
2303  char *p;
2304  XLogRecPtr recptr;
2305  Size nbytes;
2306  XLogSegNo segno;
2307 
/* Restart point: a pending reload request (checked at the end) jumps back here. */
2308 retry:
2309  p = buf;
2310  recptr = startptr;
2311  nbytes = count;
2312 
2313  while (nbytes > 0)
2314  {
2315  uint32 startoff;
2316  int segbytes;
2317  int readbytes;
2318 
/* Byte offset of recptr within its WAL segment. */
2319  startoff = recptr % XLogSegSize;
2320 
/* (Re)open a segment file if none is open or recptr lies in a different segment. */
2321  if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
2322  {
2323  char path[MAXPGPATH];
2324 
2325  /* Switch to another logfile segment */
2326  if (sendFile >= 0)
2327  close(sendFile);
2328 
2329  XLByteToSeg(recptr, sendSegNo);
2330 
2331  /*-------
2332  * When reading from a historic timeline, and there is a timeline
2333  * switch within this segment, read from the WAL segment belonging
2334  * to the new timeline.
2335  *
2336  * For example, imagine that this server is currently on timeline
2337  * 5, and we're streaming timeline 4. The switch from timeline 4
2338  * to 5 happened at 0/13002088. In pg_wal, we have these files:
2339  *
2340  * ...
2341  * 000000040000000000000012
2342  * 000000040000000000000013
2343  * 000000050000000000000013
2344  * 000000050000000000000014
2345  * ...
2346  *
2347  * In this situation, when requested to send the WAL from
2348  * segment 0x13, on timeline 4, we read the WAL from file
2349  * 000000050000000000000013. Archive recovery prefers files from
2350  * newer timelines, so if the segment was restored from the
2351  * archive on this server, the file belonging to the old timeline,
2352  * 000000040000000000000013, might not exist. Their contents are
2353  * equal up to the switchpoint, because at a timeline switch, the
2354  * used portion of the old segment is copied to the new file.
2355  *-------
2356  */
2359  {
2360  XLogSegNo endSegNo;
2361 
2363  if (sendSegNo == endSegNo)
2365  }
2366 
2368 
2369  sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
2370  if (sendFile < 0)
2371  {
2372  /*
2373  * If the file is not found, assume it's because the standby
2374  * asked for a too old WAL segment that has already been
2375  * removed or recycled.
2376  */
2377  if (errno == ENOENT)
2378  ereport(ERROR,
2380  errmsg("requested WAL segment %s has already been removed",
2382  else
2383  ereport(ERROR,
2385  errmsg("could not open file \"%s\": %m",
2386  path)));
2387  }
/* A freshly opened file is positioned at offset 0. */
2388  sendOff = 0;
2389  }
2390 
2391  /* Need to seek in the file? */
2392  if (sendOff != startoff)
2393  {
2394  if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
2395  ereport(ERROR,
2397  errmsg("could not seek in log segment %s to offset %u: %m",
2399  startoff)));
2400  sendOff = startoff;
2401  }
2402 
2403  /* How many bytes are within this segment? */
2404  if (nbytes > (XLogSegSize - startoff))
2405  segbytes = XLogSegSize - startoff;
2406  else
2407  segbytes = nbytes;
2408 
2410  readbytes = read(sendFile, p, segbytes);
/*
 * A short read (0 < readbytes < segbytes) is tolerated: the loop advances by
 * readbytes and reads the remainder on the next iteration.
 * NOTE(review): read() returning 0 (end of file) does not set errno, so the
 * %m in the message below may report a stale errno in that case.
 */
2412  if (readbytes <= 0)
2413  {
2414  ereport(ERROR,
2416  errmsg("could not read from log segment %s, offset %u, length %lu: %m",
2418  sendOff, (unsigned long) segbytes)));
2419  }
2420 
2421  /* Update state for read */
2422  recptr += readbytes;
2423 
2424  sendOff += readbytes;
2425  nbytes -= readbytes;
2426  p += readbytes;
2427  }
2428 
2429  /*
2430  * After reading into the buffer, check that what we read was valid. We do
2431  * this after reading, because even though the segment was present when we
2432  * opened it, it might get recycled or removed while we read it. The
2433  * read() succeeds in that case, but the data we tried to read might
2434  * already have been overwritten with new WAL records.
2435  */
2436  XLByteToSeg(startptr, segno);
2438 
2439  /*
2440  * During recovery, the currently-open WAL file might be replaced with the
2441  * file of the same name retrieved from archive. So we always need to
2442  * check what we read was valid after reading into the buffer. If it's
2443  * invalid, we try to open and read the file again.
2444  */
2446  {
2447  WalSnd *walsnd = MyWalSnd;
2448  bool reload;
2449 
/* Fetch and clear the reload request in one critical section. */
2450  SpinLockAcquire(&walsnd->mutex);
2451  reload = walsnd->needreload;
2452  walsnd->needreload = false;
2453  SpinLockRelease(&walsnd->mutex);
2454 
2455  if (reload && sendFile >= 0)
2456  {
2457  close(sendFile);
2458  sendFile = -1;
2459 
2460  goto retry;
2461  }
2462  }
2463 }
2464 
2465 /*
2466  * Send out the WAL in its normal physical/stored form.
2467  *
2468  * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
2469  * but not yet sent to the client, and buffer it in the libpq output
2470  * buffer.
2471  *
2472  * If there is no unsent WAL remaining, WalSndCaughtUp is set to true,
2473  * otherwise WalSndCaughtUp is set to false.
 *
 * The data is queued as a CopyData ('d') message whose payload is a 'w'
 * message: dataStart (int64), walEnd (int64), sendTime (int64, filled in just
 * before queueing), followed by the WAL bytes. When a historic timeline has
 * been sent up to its switch point, a CopyDone ('c') is queued instead and
 * streamingDoneSending is set. After sending data, sentPtr and
 * MyWalSnd->sentPtr advance to the end of the data sent.
 *
 * NOTE(review): this listing omits several source lines, including the
 * signature (2476), the statement under "if (got_STOPPING)" (2485), and the
 * conditions on lines 2487, 2494, 2542, 2621, 2667 and 2723. The exact guards
 * of those branches cannot be checked here.
2474  */
2475 static void
2477 {
2478  XLogRecPtr SendRqstPtr;
2479  XLogRecPtr startptr;
2480  XLogRecPtr endptr;
2481  Size nbytes;
2482 
2483  /* If requested switch the WAL sender to the stopping state. */
2484  if (got_STOPPING)
2486 
2488  {
2489  WalSndCaughtUp = true;
2490  return;
2491  }
2492 
2493  /* Figure out how far we can safely send the WAL. */
2495  {
2496  /*
2497  * Streaming an old timeline that's in this server's history, but is
2498  * not the one we're currently inserting or replaying. It can be
2499  * streamed up to the point where we switched off that timeline.
2500  */
2501  SendRqstPtr = sendTimeLineValidUpto;
2502  }
2503  else if (am_cascading_walsender)
2504  {
2505  /*
2506  * Streaming the latest timeline on a standby.
2507  *
2508  * Attempt to send all WAL that has already been replayed, so that we
2509  * know it's valid. If we're receiving WAL through streaming
2510  * replication, it's also OK to send any WAL that has been received
2511  * but not replayed.
2512  *
2513  * The timeline we're recovering from can change, or we can be
2514  * promoted. In either case, the current timeline becomes historic. We
2515  * need to detect that so that we don't try to stream past the point
2516  * where we switched to another timeline. We check for promotion or
2517  * timeline switch after calculating FlushPtr, to avoid a race
2518  * condition: if the timeline becomes historic just after we checked
2519  * that it was still current, it is still OK to stream it up to the
2520  * FlushPtr that was calculated before it became historic.
2521  */
2522  bool becameHistoric = false;
2523 
2524  SendRqstPtr = GetStandbyFlushRecPtr();
2525 
2526  if (!RecoveryInProgress())
2527  {
2528  /*
2529  * We have been promoted. RecoveryInProgress() updated
2530  * ThisTimeLineID to the new current timeline.
2531  */
2532  am_cascading_walsender = false;
2533  becameHistoric = true;
2534  }
2535  else
2536  {
2537  /*
2538  * Still a cascading standby. But is the timeline we're sending
2539  * still the one recovery is recovering from? ThisTimeLineID was
2540  * updated by the GetStandbyFlushRecPtr() call above.
2541  */
2543  becameHistoric = true;
2544  }
2545 
2546  if (becameHistoric)
2547  {
2548  /*
2549  * The timeline we were sending has become historic. Read the
2550  * timeline history file of the new timeline to see where exactly
2551  * we forked off from the timeline we were sending.
2552  */
2553  List *history;
2554 
2557 
2559  list_free_deep(history);
2560 
2561  sendTimeLineIsHistoric = true;
2562 
2563  SendRqstPtr = sendTimeLineValidUpto;
2564  }
2565  }
2566  else
2567  {
2568  /*
2569  * Streaming the current timeline on a master.
2570  *
2571  * Attempt to send all data that's already been written out and
2572  * fsync'd to disk. We cannot go further than what's been written out
2573  * given the current implementation of XLogRead(). And in any case
2574  * it's unsafe to send WAL that is not securely down to disk on the
2575  * master: if the master subsequently crashes and restarts, slaves
2576  * must not have applied any WAL that got lost on the master.
2577  */
2578  SendRqstPtr = GetFlushRecPtr();
2579  }
2580 
2581  /*
2582  * Record the current system time as an approximation of the time at which
2583  * this WAL location was written for the purposes of lag tracking.
2584  *
2585  * In theory we could make XLogFlush() record a time in shmem whenever WAL
2586  * is flushed and we could get that time as well as the LSN when we call
2587  * GetFlushRecPtr() above (and likewise for the cascading standby
2588  * equivalent), but rather than putting any new code into the hot WAL path
2589  * it seems good enough to capture the time here. We should reach this
2590  * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
2591  * may take some time, we read the WAL flush pointer and take the time
2592  * very close together here so that we'll get a later position if it is
2593  * still moving.
2594  *
2595  * Because LagTrackerWriter ignores samples when the LSN hasn't advanced,
2596  * this gives us a cheap approximation for the WAL flush time for this
2597  * LSN.
2598  *
2599  * Note that the LSN is not necessarily the LSN for the data contained in
2600  * the present message; it's the end of the WAL, which might be further
2601  * ahead. All the lag tracking machinery cares about is finding out when
2602  * that arbitrary LSN is eventually reported as written, flushed and
2603  * applied, so that it can measure the elapsed time.
2604  */
2605  LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());
2606 
2607  /*
2608  * If this is a historic timeline and we've reached the point where we
2609  * forked to the next timeline, stop streaming.
2610  *
2611  * Note: We might already have sent WAL > sendTimeLineValidUpto. The
2612  * startup process will normally replay all WAL that has been received
2613  * from the master, before promoting, but if the WAL streaming is
2614  * terminated at a WAL page boundary, the valid portion of the timeline
2615  * might end in the middle of a WAL record. We might've already sent the
2616  * first half of that partial WAL record to the cascading standby, so that
2617  * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
2618  * replay the partial WAL record either, so it can still follow our
2619  * timeline switch.
2620  */
2622  {
2623  /* close the current file. */
2624  if (sendFile >= 0)
2625  close(sendFile);
2626  sendFile = -1;
2627 
2628  /* Send CopyDone */
2629  pq_putmessage_noblock('c', NULL, 0);
2630  streamingDoneSending = true;
2631 
2632  WalSndCaughtUp = true;
2633 
2634  elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
2636  (uint32) (sentPtr >> 32), (uint32) sentPtr);
2637  return;
2638  }
2639 
2640  /* Do we have any work to do? */
2641  Assert(sentPtr <= SendRqstPtr);
2642  if (SendRqstPtr <= sentPtr)
2643  {
2644  WalSndCaughtUp = true;
2645  return;
2646  }
2647 
2648  /*
2649  * Figure out how much to send in one message. If there's no more than
2650  * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
2651  * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
2652  *
2653  * The rounding is not only for performance reasons. Walreceiver relies on
2654  * the fact that we never split a WAL record across two messages. Since a
2655  * long WAL record is split at page boundary into continuation records,
2656  * page boundary is always a safe cut-off point. We also assume that
2657  * SendRqstPtr never points to the middle of a WAL record.
2658  */
2659  startptr = sentPtr;
2660  endptr = startptr;
2661  endptr += MAX_SEND_SIZE;
2662 
2663  /* if we went beyond SendRqstPtr, back off */
2664  if (SendRqstPtr <= endptr)
2665  {
2666  endptr = SendRqstPtr;
2668  WalSndCaughtUp = false;
2669  else
2670  WalSndCaughtUp = true;
2671  }
2672  else
2673  {
2674  /* round down to page boundary. */
2675  endptr -= (endptr % XLOG_BLCKSZ);
2676  WalSndCaughtUp = false;
2677  }
2678 
2679  nbytes = endptr - startptr;
2680  Assert(nbytes <= MAX_SEND_SIZE);
2681 
2682  /*
2683  * OK to read and send the slice.
2684  */
2685  resetStringInfo(&output_message);
2686  pq_sendbyte(&output_message, 'w');
2687 
2688  pq_sendint64(&output_message, startptr); /* dataStart */
2689  pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
2690  pq_sendint64(&output_message, 0); /* sendtime, filled in last */
2691 
2692  /*
2693  * Read the log directly into the output buffer to avoid extra memcpy
2694  * calls.
2695  */
2696  enlargeStringInfo(&output_message, nbytes);
2697  XLogRead(&output_message.data[output_message.len], startptr, nbytes);
2698  output_message.len += nbytes;
/* Keep the buffer NUL-terminated after the raw WAL bytes. */
2699  output_message.data[output_message.len] = '\0';
2700 
2701  /*
2702  * Fill the send timestamp last, so that it is taken as late as possible.
2703  */
2704  resetStringInfo(&tmpbuf);
2705  pq_sendint64(&tmpbuf, GetCurrentTimestamp());
/* Offset 1 + 2 * sizeof(int64) skips the 'w' byte, dataStart and walEnd. */
2706  memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
2707  tmpbuf.data, sizeof(int64));
2708 
2709  pq_putmessage_noblock('d', output_message.data, output_message.len);
2710 
2711  sentPtr = endptr;
2712 
2713  /* Update shared memory status */
2714  {
2715  WalSnd *walsnd = MyWalSnd;
2716 
2717  SpinLockAcquire(&walsnd->mutex);
2718  walsnd->sentPtr = sentPtr;
2719  SpinLockRelease(&walsnd->mutex);
2720  }
2721 
2722  /* Report progress of XLOG streaming in PS display */
2724  {
2725  char activitymsg[50];
2726 
2727  snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
2728  (uint32) (sentPtr >> 32), (uint32) sentPtr);
2729  set_ps_display(activitymsg, false);
2730  }
2731 
2732  return;
2733 }
2734 
2735 /*
2736  * Stream out logically decoded data.
 *
 * Reads one WAL record by calling XLogReadRecord() on
 * logical_decoding_ctx->reader with logical_startptr. A reader error message
 * is raised as ERROR. A returned record is passed to
 * LogicalDecodingProcessRecord(), and sentPtr advances to the reader's
 * EndRecPtr.
 *
 * If no record came back and EndRecPtr has reached GetFlushRecPtr(),
 * WalSndCaughtUp is set. If got_STOPPING is also set, got_SIGUSR2 is set so
 * that the connection is terminated after pending data is written. Finally,
 * sentPtr is published to MyWalSnd->sentPtr under the slot's spinlock.
 *
 * NOTE(review): the signature (source line 2739) and the line right after the
 * XLogReadRecord() call (2753) are absent from this listing.
2737  */
2738 static void
2740 {
2741  XLogRecord *record;
2742  char *errm;
2743 
2744  /*
2745  * Don't know whether we've caught up yet. We'll set it to true in
2746  * WalSndWaitForWal, if we're actually waiting. We also set it to true if
2747  * XLogReadRecord() had to stop reading but WalSndWaitForWal didn't wait -
2748  * i.e. when we're shutting down.
2749  */
2750  WalSndCaughtUp = false;
2751 
2752  record = XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &errm);
2754 
2755  /* xlog record was invalid */
2756  if (errm != NULL)
2757  elog(ERROR, "%s", errm);
2758 
2759  if (record != NULL)
2760  {
2761  /*
2762  * Note the lack of any call to LagTrackerWrite() which is handled by
2763  * WalSndUpdateProgress which is called by output plugin through
2764  * logical decoding write api.
2765  */
2766  LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
2767 
2768  sentPtr = logical_decoding_ctx->reader->EndRecPtr;
2769  }
2770  else
2771  {
2772  /*
2773  * If the record we just wanted to read is at or beyond the flushed
2774  * point, then we're caught up.
2775  */
2776  if (logical_decoding_ctx->reader->EndRecPtr >= GetFlushRecPtr())
2777  {
2778  WalSndCaughtUp = true;
2779 
2780  /*
2781  * Have WalSndLoop() terminate the connection in an orderly
2782  * manner, after writing out all the pending data.
2783  */
2784  if (got_STOPPING)
2785  got_SIGUSR2 = true;
2786  }
2787  }
2788 
2789  /* Update shared memory status */
2790  {
2791  WalSnd *walsnd = MyWalSnd;
2792 
2793  SpinLockAcquire(&walsnd->mutex);
2794  walsnd->sentPtr = sentPtr;
2795  SpinLockRelease(&walsnd->mutex);
2796  }
2797 }
2798 
2799 /*
2800  * Shutdown if the sender is caught up.
2801  *
2802  * NB: This should only be called when the shutdown signal has been received
2803  * from postmaster.
2804  *
2805  * Note that if we determine that there's still more data to send, this
2806  * function will return control to the caller.
 *
 * The process exits via proc_exit(0) only when all of the following hold:
 *   - WalSndCaughtUp is set;
 *   - sentPtr equals the standby's reported flush position, or its write
 *     position when flush is invalid;
 *   - no output is pending in libpq.
 * Before exiting it sends a "COPY 0" command-completion message and flushes.
 *
 * Otherwise a keepalive requesting a reply may be sent.
 * NOTE(review): the condition guarding that keepalive (source line 2833), the
 * line after it (2836), and the signature (2809) are absent from this listing.
2807  */
2808 static void
2810 {
2811  XLogRecPtr replicatedPtr;
2812 
2813  /* ... let's just be real sure we're caught up ... */
2814  send_data();
2815 
2816  /*
2817  * To figure out whether all WAL has successfully been replicated, check
2818  * flush location if valid, write otherwise. Tools like pg_receivewal will
2819  * usually (unless in synchronous mode) return an invalid flush location.
2820  */
/* NOTE(review): read without the slot spinlock; presumably safe because only this walsender updates its own write/flush fields. TODO confirm. */
2821  replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
2822  MyWalSnd->write : MyWalSnd->flush;
2823 
2824  if (WalSndCaughtUp && sentPtr == replicatedPtr &&
2825  !pq_is_send_pending())
2826  {
2827  /* Inform the standby that XLOG streaming is done */
2828  EndCommand("COPY 0", DestRemote);
2829  pq_flush();
2830 
2831  proc_exit(0);
2832  }
2834  {
2835  WalSndKeepalive(true);
2837  }
2838 }
2839 
2840 /*
2841  * Returns the latest point in WAL that has been safely flushed to disk, and
2842  * can be sent to the standby. This should only be called when in recovery,
2843  * ie. we're streaming to a cascaded standby.
2844  *
2845  * As a side-effect, ThisTimeLineID is updated to the TLI of the last
2846  * replayed WAL record.
 *
 * NOTE(review): the signature (source line 2849) and the declarations on
 * lines 2854-2855 (receiveTLI and result) are absent from this listing.
2847  */
2848 static XLogRecPtr
2850 {
2851  XLogRecPtr replayPtr;
2852  TimeLineID replayTLI;
2853  XLogRecPtr receivePtr;
2856 
2857  /*
2858  * We can safely send what's already been replayed. Also, if walreceiver
2859  * is streaming WAL from the same timeline, we can send anything that it
2860  * has streamed, but hasn't been replayed yet.
2861  */
2862 
2863  receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI);
2864  replayPtr = GetXLogReplayRecPtr(&replayTLI);
2865 
2866  ThisTimeLineID = replayTLI;
2867 
/* Use the receive position only if it is ahead and on the replayed timeline. */
2868  result = replayPtr;
2869  if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
2870  result = receivePtr;
2871 
2872  return result;
2873 }
2874 
2875 /*
2876  * Request walsenders to reload the currently-open WAL file
 *
 * Sets needreload on every in-use slot (pid != 0) among the first
 * max_wal_senders entries, each under that slot's spinlock. The owning
 * walsender fetches and clears the flag after its next WAL read, and may then
 * close its file and redo the read.
 * NOTE(review): the signature line (source line 2879) is absent from this
 * listing.
2877  */
2878 void
2880 {
2881  int i;
2882 
2883  for (i = 0; i < max_wal_senders; i++)
2884  {
2885  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2886 
2887  SpinLockAcquire(&walsnd->mutex);
/* Skip unused slots. */
2888  if (walsnd->pid == 0)
2889  {
2890  SpinLockRelease(&walsnd->mutex);
2891  continue;
2892  }
2893  walsnd->needreload = true;
2894  SpinLockRelease(&walsnd->mutex);
2895  }
2896 }
2897 
2898 /*
2899  * Handle PROCSIG_WALSND_INIT_STOPPING signal.
 *
 * Either sends SIGTERM to this process (replication not yet active) or just
 * sets got_STOPPING (replication active). Nothing else is done here.
 * NOTE(review): the signature (source line 2902) and line 2904 are absent
 * from this listing.
2900  */
2901 void
2903 {
2905 
2906  /*
2907  * If replication has not yet started, die like with SIGTERM. If
2908  * replication is active, only set a flag and wake up the main loop. It
2909  * will send any outstanding WAL, wait for it to be replicated to the
2910  * standby, and then exit gracefully.
2911  */
2912  if (!replication_active)
2913  kill(MyProcPid, SIGTERM);
2914  else
2915  got_STOPPING = true;
2916 }
2917 
2918 /*
2919  * SIGUSR2: set flag to do a last cycle and shut down afterwards. The WAL
2920  * sender should already have been switched to WALSNDSTATE_STOPPING at
2921  * this point.
 *
 * Sets got_SIGUSR2 and sets MyLatch to wake the main loop. errno is saved and
 * restored around this work so that the interrupted code does not observe an
 * errno changed by the handler.
 * NOTE(review): the signature line (source line 2924) is absent from this
 * listing.
2922  */
2923 static void
2925 {
2926  int save_errno = errno;
2927 
2928  got_SIGUSR2 = true;
2929  SetLatch(MyLatch);
2930 
2931  errno = save_errno;
2932 }
2933 
2934 /*
 * Set up signal handlers
 *
 * Installs the walsender's handlers for SIGHUP, SIGINT, SIGTERM, SIGQUIT and
 * SIGUSR2, and lets InitializeTimeouts() establish the SIGALRM handler.
 * NOTE(review): source lines 2936 (signature), 2945-2946 and 2951-2955 (the
 * signal resets announced below) are absent from this listing.
 */
2935 void
2937 {
2938  /* Set up signal handlers */
2939  pqsignal(SIGHUP, PostgresSigHupHandler); /* set flag to read config
2940  * file */
2941  pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
2942  pqsignal(SIGTERM, die); /* request shutdown */
2943  pqsignal(SIGQUIT, quickdie); /* hard crash time */
2944  InitializeTimeouts(); /* establishes SIGALRM handler */
2947  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
2948  * shutdown */
2949 
2950  /* Reset some signals that are accepted by postmaster but not here */
2956 }
2957 
2958 /*
 * Report shared-memory space needed by WalSndShmemInit
 *
 * The size is the fixed header of WalSndCtlData (everything before the
 * walsnds array) plus max_wal_senders WalSnd entries. add_size()/mul_size()
 * are used instead of plain arithmetic; presumably they guard against
 * overflow. TODO confirm.
 * NOTE(review): the signature line (source line 2960) is absent from this
 * listing.
 */
2959 Size
2961 {
2962  Size size = 0;
2963 
2964  size = offsetof(WalSndCtlData, walsnds);
2965  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
2966 
2967  return size;
2968 }
2969 
2970 /*
 * Allocate and initialize walsender-related shared memory
 *
 * Attaches to (or creates) the "Wal Sender Ctl" shared struct. Only when it
 * was newly created (found == false):
 *   - the whole area is zeroed, which leaves every slot with pid 0 (unused);
 *   - each of the NUM_SYNC_REP_WAIT_MODE SyncRepQueue lists is initialized;
 *   - each slot's spinlock is initialized.
 * NOTE(review): the signature line (source line 2972) is absent from this
 * listing.
 */
2971 void
2973 {
2974  bool found;
2975  int i;
2976 
2977  WalSndCtl = (WalSndCtlData *)
2978  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
2979 
2980  if (!found)
2981  {
2982  /* First time through, so initialize */
2983  MemSet(WalSndCtl, 0, WalSndShmemSize());
2984 
2985  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
2986  SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
2987 
2988  for (i = 0; i < max_wal_senders; i++)
2989  {
2990  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2991 
2992  SpinLockInit(&walsnd->mutex);
2993  }
2994  }
2995 }
2996 
2997 /*
2998  * Wake up all walsenders
2999  *
3000  * This will be called inside critical sections, so throwing an error is not
3001  * advisable.
 *
 * For each of the first max_wal_senders slots, the latch pointer is copied
 * under the slot spinlock and, if non-NULL (slot in use), set after the lock
 * is released.
 * NOTE(review): the signature line (source line 3004) is absent from this
 * listing.
3002  */
3003 void
3005 {
3006  int i;
3007 
3008  for (i = 0; i < max_wal_senders; i++)
3009  {
3010  Latch *latch;
3011  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3012 
3013  /*
3014  * Get latch pointer with spinlock held, for the unlikely case that
3015  * pointer reads aren't atomic (as they're 8 bytes).
3016  */
3017  SpinLockAcquire(&walsnd->mutex);
3018  latch = walsnd->latch;
3019  SpinLockRelease(&walsnd->mutex);
3020 
3021  if (latch != NULL)
3022  SetLatch(latch);
3023  }
3024 }
3025 
3026 /*
3027  * Signal all walsenders to move to stopping state.
3028  *
3029  * This will trigger walsenders to move to a state where no further WAL can be
3030  * generated. See this file's header for details.
 *
 * Each slot's pid is read under its spinlock, and unused slots (pid 0) are
 * skipped.
 * NOTE(review): the signature (source line 3033) and the per-walsender
 * signalling call (3049) are absent from this listing.
3031  */
3032 void
3034 {
3035  int i;
3036 
3037  for (i = 0; i < max_wal_senders; i++)
3038  {
3039  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3040  pid_t pid;
3041 
3042  SpinLockAcquire(&walsnd->mutex);
3043  pid = walsnd->pid;
3044  SpinLockRelease(&walsnd->mutex);
3045 
3046  if (pid == 0)
3047  continue;
3048 
3050  }
3051 }
3052 
3053 /*
3054  * Wait until all the WAL senders have quit or reached the stopping state. This
3055  * is used by the checkpointer to control when the shutdown checkpoint can
3056  * safely be performed.
 *
 * Polls every 10 ms. A pass succeeds when every in-use slot (pid != 0) is in
 * WALSNDSTATE_STOPPING; a pass stops at the first slot that is not. There is
 * no timeout.
 * NOTE(review): the signature line (source line 3059) is absent from this
 * listing.
3057  */
3058 void
3060 {
3061  for (;;)
3062  {
3063  int i;
3064  bool all_stopped = true;
3065 
3066  for (i = 0; i < max_wal_senders; i++)
3067  {
3068  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3069 
3070  SpinLockAcquire(&walsnd->mutex);
3071 
3072  if (walsnd->pid == 0)
3073  {
3074  SpinLockRelease(&walsnd->mutex);
3075  continue;
3076  }
3077 
3078  if (walsnd->state != WALSNDSTATE_STOPPING)
3079  {
3080  all_stopped = false;
3081  SpinLockRelease(&walsnd->mutex);
3082  break;
3083  }
3084  SpinLockRelease(&walsnd->mutex);
3085  }
3086 
3087  /* safe to leave if confirmation is done for all WAL senders */
3088  if (all_stopped)
3089  return;
3090 
3091  pg_usleep(10000L); /* wait for 10 msec */
3092  }
3093 }
3094 
3095 /*
 * Set state for current walsender (only called in walsender)
 *
 * A no-op if MyWalSnd is already in the requested state. Otherwise the state
 * is stored under the slot spinlock.
 * NOTE(review): the signature (source line 3097) and line 3101 are absent
 * from this listing.
 */
3096 void
3098 {
3099  WalSnd *walsnd = MyWalSnd;
3100 
/* Unlocked read; presumably fine because only the owning walsender writes its own state. TODO confirm. */
3103  if (walsnd->state == state)
3104  return;
3105 
3106  SpinLockAcquire(&walsnd->mutex);
3107  walsnd->state = state;
3108  SpinLockRelease(&walsnd->mutex);
3109 }
3110 
3111 /*
3112  * Return a string constant representing the state. This is used
3113  * in system views, and should *not* be translated.
 *
 * Any value not covered by the switch yields "UNKNOWN".
 * NOTE(review): the signature line (source line 3116) is absent from this
 * listing.
3114  */
3115 static const char *
3117 {
3118  switch (state)
3119  {
3120  case WALSNDSTATE_STARTUP:
3121  return "startup";
3122  case WALSNDSTATE_BACKUP:
3123  return "backup";
3124  case WALSNDSTATE_CATCHUP:
3125  return "catchup";
3126  case WALSNDSTATE_STREAMING:
3127  return "streaming";
3128  case WALSNDSTATE_STOPPING:
3129  return "stopping";
3130  }
3131  return "UNKNOWN";
3132 }
3133 
/*
 * Convert a TimeOffset (as stored in the walsender lag fields) into a
 * palloc'd Interval. month and day are zero, and the offset is stored
 * unchanged in the time field.
 * NOTE(review): the signature line (source line 3135) is absent from this
 * listing.
 */
3134 static Interval *
3136 {
3137  Interval *result = palloc(sizeof(Interval));
3138 
3139  result->month = 0;
3140  result->day = 0;
3141  result->time = offset;
3142 
3143  return result;
3144 }
3145 
3146 /*
3147  * Returns activity of walsenders, including pids and xlog locations sent to
3148  * standby servers.
 *
 * Materializes one row per in-use slot (pid != 0) into a tuplestore. Each
 * slot's fields are snapshotted under its spinlock. Column order:
 *    0 pid
 *    1 state string
 *    2 sentPtr
 *    3 write
 *    4 flush
 *    5 apply
 *    6 writeLag
 *    7 flushLag
 *    8 applyLag
 *    9 sync priority
 *   10 sync state
 * Invalid LSNs and negative lags become NULL. Non-superusers see only the
 * pid; every other column is NULL. The list of currently synchronous standbys
 * is fetched once, under SyncRepLock in shared mode, before the loop.
 *
 * NOTE(review): source lines 3151 (signature), 3197, 3206-3207 (declarations
 * used below, e.g. sentPtr, state, values) and 3297 are absent from this
 * listing.
3149  */
3150 Datum
3152 {
3153 #define PG_STAT_GET_WAL_SENDERS_COLS 11
3154  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3155  TupleDesc tupdesc;
3156  Tuplestorestate *tupstore;
3157  MemoryContext per_query_ctx;
3158  MemoryContext oldcontext;
3159  List *sync_standbys;
3160  int i;
3161 
3162  /* check to see if caller supports us returning a tuplestore */
3163  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
3164  ereport(ERROR,
3165  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3166  errmsg("set-valued function called in context that cannot accept a set")));
3167  if (!(rsinfo->allowedModes & SFRM_Materialize))
3168  ereport(ERROR,
3169  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3170  errmsg("materialize mode required, but it is not " \
3171  "allowed in this context")));
3172 
3173  /* Build a tuple descriptor for our result type */
3174  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
3175  elog(ERROR, "return type must be a row type");
3176 
/* The tuplestore must outlive this call, so create it in the per-query context. */
3177  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
3178  oldcontext = MemoryContextSwitchTo(per_query_ctx);
3179 
3180  tupstore = tuplestore_begin_heap(true, false, work_mem);
3181  rsinfo->returnMode = SFRM_Materialize;
3182  rsinfo->setResult = tupstore;
3183  rsinfo->setDesc = tupdesc;
3184 
3185  MemoryContextSwitchTo(oldcontext);
3186 
3187  /*
3188  * Get the currently active synchronous standbys.
3189  */
3190  LWLockAcquire(SyncRepLock, LW_SHARED);
3191  sync_standbys = SyncRepGetSyncStandbys(NULL);
3192  LWLockRelease(SyncRepLock);
3193 
3194  for (i = 0; i < max_wal_senders; i++)
3195  {
3196  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3198  XLogRecPtr write;
3199  XLogRecPtr flush;
3200  XLogRecPtr apply;
3201  TimeOffset writeLag;
3202  TimeOffset flushLag;
3203  TimeOffset applyLag;
3204  int priority;
3205  int pid;
3208  bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
3209 
/* Snapshot the slot under its spinlock; skip unused slots. */
3210  SpinLockAcquire(&walsnd->mutex);
3211  if (walsnd->pid == 0)
3212  {
3213  SpinLockRelease(&walsnd->mutex);
3214  continue;
3215  }
3216  pid = walsnd->pid;
3217  sentPtr = walsnd->sentPtr;
3218  state = walsnd->state;
3219  write = walsnd->write;
3220  flush = walsnd->flush;
3221  apply = walsnd->apply;
3222  writeLag = walsnd->writeLag;
3223  flushLag = walsnd->flushLag;
3224  applyLag = walsnd->applyLag;
3225  priority = walsnd->sync_standby_priority;
3226  SpinLockRelease(&walsnd->mutex);
3227 
3228  memset(nulls, 0, sizeof(nulls));
3229  values[0] = Int32GetDatum(pid);
3230 
3231  if (!superuser())
3232  {
3233  /*
3234  * Only superusers can see details. Other users only get the pid
3235  * value to know it's a walsender, but no details.
3236  */
3237  MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3238  }
3239  else
3240  {
3241  values[1] = CStringGetTextDatum(WalSndGetStateString(state));
3242 
/* For columns 2-5 the value is filled in even when NULL; it is then simply ignored. */
3243  if (XLogRecPtrIsInvalid(sentPtr))
3244  nulls[2] = true;
3245  values[2] = LSNGetDatum(sentPtr);
3246 
3247  if (XLogRecPtrIsInvalid(write))
3248  nulls[3] = true;
3249  values[3] = LSNGetDatum(write);
3250 
3251  if (XLogRecPtrIsInvalid(flush))
3252  nulls[4] = true;
3253  values[4] = LSNGetDatum(flush);
3254 
3255  if (XLogRecPtrIsInvalid(apply))
3256  nulls[5] = true;
3257  values[5] = LSNGetDatum(apply);
3258 
3259  /*
3260  * Treat a standby such as a pg_basebackup background process
3261  * which always returns an invalid flush location, as an
3262  * asynchronous standby.
3263  */
3264  priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
3265 
3266  if (writeLag < 0)
3267  nulls[6] = true;
3268  else
3269  values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
3270 
3271  if (flushLag < 0)
3272  nulls[7] = true;
3273  else
3274  values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
3275 
3276  if (applyLag < 0)
3277  nulls[8] = true;
3278  else
3279  values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
3280 
3281  values[9] = Int32GetDatum(priority);
3282 
3283  /*
3284  * More easily understood version of standby state. This is purely
3285  * informational.
3286  *
3287  * In quorum-based sync replication, the role of each standby
3288  * listed in synchronous_standby_names can be changing very
3289  * frequently. Any standbys considered as "sync" at one moment can
3290  * be switched to "potential" ones at the next moment. So, it's
3291  * basically useless to report "sync" or "potential" as their sync
3292  * states. We report just "quorum" for them.
3293  */
3294  if (priority == 0)
3295  values[10] = CStringGetTextDatum("async");
3296  else if (list_member_int(sync_standbys, i))
3298  CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3299  else
3300  values[10] = CStringGetTextDatum("potential");
3301  }
3302 
3303  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
3304  }
3305 
3306  /* clean up and return the tuplestore */
3307  tuplestore_donestoring(tupstore);
3308 
3309  return (Datum) 0;
3310 }
3311 
3312 /*
3313  * This function is used to send a keepalive message to standby.
3314  * If requestReply is set, sets a flag in the message requesting the standby
3315  * to send a message back to us, for heartbeat purposes.
3316  */
3317 static void
3318 WalSndKeepalive(bool requestReply)
3319 {
3320  elog(DEBUG2, "sending replication keepalive");
3321 
3322  /* construct the message... */
3323  resetStringInfo(&output_message);
3324  pq_sendbyte(&output_message, 'k');
3325  pq_sendint64(&output_message, sentPtr);
3326  pq_sendint64(&output_message, GetCurrentTimestamp());
3327  pq_sendbyte(&output_message, requestReply ? 1 : 0);
3328 
3329  /* ... and send it wrapped in CopyData */
3330  pq_putmessage_noblock('d', output_message.data, output_message.len);
3331 }
3332 
3333 /*
3334  * Send keepalive message if too much time has elapsed.
 *
 * Does nothing if wal_sender_timeout <= 0 or last_reply_timestamp <= 0.
 * Otherwise, once 'now' reaches last_reply_timestamp + wal_sender_timeout / 2,
 * it sends a keepalive that requests a reply and tries to flush the output.
 * If that flush fails, WalSndShutdown() is called.
 * NOTE(review): source lines 3337 (signature, which presumably supplies
 * 'now'), 3348, 3356 and 3361 are absent from this listing.
3335  */
3336 static void
3338 {
3339  TimestampTz ping_time;
3340 
3341  /*
3342  * Don't send keepalive messages if timeouts are globally disabled or
3343  * we're doing something not partaking in timeouts.
3344  */
3345  if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3346  return;
3347 
3349  return;
3350 
3351  /*
3352  * If half of wal_sender_timeout has elapsed without receiving any reply
3353  * from the standby, send a keep-alive message to the standby requesting
3354  * an immediate reply.
3355  */
3357  wal_sender_timeout / 2);
3358  if (now >= ping_time)
3359  {
3360  WalSndKeepalive(true);
3362 
3363  /* Try to flush pending output to the client */
3364  if (pq_flush_if_writable() != 0)
3365  WalSndShutdown();
3366  }
3367 }
3368 
3369 /*
3370  * Record the end of the WAL and the time it was flushed locally, so that
3371  * LagTrackerRead can compute the elapsed time (lag) when this WAL location is
3372  * eventually reported to have been written, flushed and applied by the
3373  * standby in a reply message.
 *
 * A no-op outside a walsender, and also when lsn equals the last LSN
 * recorded. Samples go into the circular LagTracker.buffer of
 * LAG_TRACKER_BUFFER_SIZE entries.
 * NOTE(review): the signature line (source line 3376) is absent from this
 * listing; 'lsn' and 'local_flush_time' are presumably its parameters.
3374  */
3375 static void
3377 {
3378  bool buffer_full;
3379  int new_write_head;
3380  int i;
3381 
3382  if (!am_walsender)
3383  return;
3384 
3385  /*
3386  * If the lsn hasn't advanced since last time, then do nothing. This way
3387  * we only record a new sample when new WAL has been written.
3388  */
3389  if (LagTracker.last_lsn == lsn)
3390  return;
3391  LagTracker.last_lsn = lsn;
3392 
3393  /*
3394  * If advancing the write head of the circular buffer would crash into any
3395  * of the read heads, then the buffer is full. In other words, the
3396  * slowest reader (presumably apply) is the one that controls the release
3397  * of space.
3398  */
3399  new_write_head = (LagTracker.write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
3400  buffer_full = false;
3401  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
3402  {
3403  if (new_write_head == LagTracker.read_heads[i])
3404  buffer_full = true;
3405  }
3406 
3407  /*
3408  * If the buffer is full, for now we just rewind by one slot and overwrite
3409  * the last sample, as a simple (if somewhat uneven) way to lower the
3410  * sampling rate. There may be better adaptive compaction algorithms.
3411  */
/*
 * In that case new_write_head keeps the current head and write_head steps
 * back one slot (wrapping around), so the store below overwrites the most
 * recent sample and write_head ends up unchanged.
 */
3412  if (buffer_full)
3413  {
3414  new_write_head = LagTracker.write_head;
3415  if (LagTracker.write_head > 0)
3416  LagTracker.write_head--;
3417  else
3418  LagTracker.write_head = LAG_TRACKER_BUFFER_SIZE - 1;
3419  }
3420 
3421  /* Store a sample at the current write head position. */
3422  LagTracker.buffer[LagTracker.write_head].lsn = lsn;
3423  LagTracker.buffer[LagTracker.write_head].time = local_flush_time;
3424  LagTracker.write_head = new_write_head;
3425 }
3426 
3427 /*
3428  * Find out how much time has elapsed between the moment WAL location 'lsn'
3429  * (or the highest known earlier LSN) was flushed locally and the time 'now'.
3430  * We have a separate read head for each of the reported LSN locations we
3431  * receive in replies from standby; 'head' controls which read head is
3432  * used. Whenever a read head crosses an LSN which was written into the
3433  * lag buffer with LagTrackerWrite, we can use the associated timestamp to
3434  * find out the time this LSN (or an earlier one) was flushed locally, and
3435  * therefore compute the lag.
3436  *
3437  * Return -1 if no new sample data is available, and otherwise the elapsed
3438  * time in microseconds.
3439  */
3440 static TimeOffset
3442 {
3443  TimestampTz time = 0;
3444 
3445  /* Read all unread samples up to this LSN or end of buffer. */
3446  while (LagTracker.read_heads[head] != LagTracker.write_head &&
3447  LagTracker.buffer[LagTracker.read_heads[head]].lsn <= lsn)
3448  {
3449  time = LagTracker.buffer[LagTracker.read_heads[head]].time;
3450  LagTracker.last_read[head] =
3451  LagTracker.buffer[LagTracker.read_heads[head]];
3452  LagTracker.read_heads[head] =
3453  (LagTracker.read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE;
3454  }
3455 
3456  /*
3457  * If the lag tracker is empty, that means the standby has processed
3458  * everything we've ever sent so we should now clear 'last_read'. If we
3459  * didn't do that, we'd risk using a stale and irrelevant sample for
3460  * interpolation at the beginning of the next burst of WAL after a period
3461  * of idleness.
3462  */
3463  if (LagTracker.read_heads[head] == LagTracker.write_head)
3464  LagTracker.last_read[head].time = 0;
3465 
3466  if (time > now)
3467  {
3468  /* If the clock somehow went backwards, treat as not found. */
3469  return -1;
3470  }
3471  else if (time == 0)
3472  {
3473  /*
3474  * We didn't cross a time. If there is a future sample that we
3475  * haven't reached yet, and we've already reached at least one sample,
3476  * let's interpolate the local flushed time. This is mainly useful
3477  * for reporting a completely stuck apply position as having
3478  * increasing lag, since otherwise we'd have to wait for it to
3479  * eventually start moving again and cross one of our samples before
3480  * we can show the lag increasing.
3481  */
3482  if (LagTracker.read_heads[head] == LagTracker.write_head)
3483  {
3484  /* There are no future samples, so we can't interpolate. */
3485  return -1;
3486  }
3487  else if (LagTracker.last_read[head].time != 0)
3488  {
3489  /* We can interpolate between last_read and the next sample. */
3490  double fraction;
3491  WalTimeSample prev = LagTracker.last_read[head];
3492  WalTimeSample next = LagTracker.buffer[LagTracker.read_heads[head]];
3493 
3494  if (lsn < prev.lsn)
3495  {
3496  /*
3497  * Reported LSNs shouldn't normally go backwards, but it's
3498  * possible when there is a timeline change. Treat as not
3499  * found.
3500  */
3501  return -1;
3502  }
3503 
3504  Assert(prev.lsn < next.lsn);
3505 
3506  if (prev.time > next.time)
3507  {
3508  /* If the clock somehow went backwards, treat as not found. */
3509  return -1;
3510  }
3511 
3512  /* See how far we are between the previous and next samples. */
3513  fraction =
3514  (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
3515 
3516  /* Scale the local flush time proportionally. */
3517  time = (TimestampTz)
3518  ((double) prev.time + (next.time - prev.time) * fraction);
3519  }
3520  else
3521  {
3522  /*
3523  * We have only a future sample, implying that we were entirely
3524  * caught up but and now there is a new burst of WAL and the
3525  * standby hasn't processed the first sample yet. Until the
3526  * standby reaches the future sample the best we can do is report
3527  * the hypothetical lag if that sample were to be replayed now.
3528  */
3529  time = LagTracker.buffer[LagTracker.read_heads[head]].time;
3530  }
3531  }
3532 
3533  /* Return the elapsed time since local flush time in microseconds. */
3534  Assert(time != 0);
3535  return now - time;
3536 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
static void InitWalSenderSlot(void)
Definition: walsender.c:2212
void InitializeTimeouts(void)
Definition: timeout.c:340
#define pq_is_send_pending()
Definition: libpq.h:41
#define NIL
Definition: pg_list.h:69
#define XLogSegSize
Definition: xlog_internal.h:92
XLogRecPtr write
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
static void ProcessStandbyMessage(void)
Definition: walsender.c:1675
#define SIGUSR1
Definition: win32.h:202
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define IsA(nodeptr, _type_)
Definition: nodes.h:560
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:200
XLogRecPtr startpoint
Definition: replnodes.h:84
#define DEBUG1
Definition: elog.h:25
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:211
int MyProcPid
Definition: globals.c:39
int wal_sender_timeout
Definition: walsender.c:122
#define SIGCONT
Definition: win32.h:197
uint32 TimeLineID
Definition: xlogdefs.h:45
#define pq_flush()
Definition: libpq.h:39
static int32 next
Definition: blutils.c:210
XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
struct Port * MyProcPort
Definition: globals.c:41
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
uint32 TransactionId
Definition: c.h:397
bool wake_wal_senders
Definition: walsender.c:129
#define SYNC_REP_PRIORITY
Definition: syncrep.h:36
bool exec_replication_command(const char *cmd_string)
Definition: walsender.c:1416
bool update_process_title
Definition: ps_status.c:35
#define write(a, b, c)
Definition: win32.h:14
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
TransactionId xmin
Definition: proc.h:213
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:2924
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:2994
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:427
#define TEXTOID
Definition: pg_type.h:324
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action)
Definition: walsender.c:782
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1029
PGPROC * MyProc
Definition: proc.c:67
int64 TimestampTz
Definition: timestamp.h:39
#define SIGWINCH
Definition: win32.h:201
ResourceOwner CurrentResourceOwner
Definition: resowner.c:138
Size WalSndShmemSize(void)
Definition: walsender.c:2960
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:239
void CommitTransactionCommand(void)
Definition: xact.c:2748
#define XACT_REPEATABLE_READ
Definition: xact.h:30
#define SpinLockInit(lock)
Definition: spin.h:60
void RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
Definition: snapmgr.c:2185
uint8 syncrep_method
Definition: syncrep.h:51
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:75
static int sendFile
Definition: walsender.c:135
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:370
static void XLogSendPhysical(void)
Definition: walsender.c:2476
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:1250
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
TimeOffset flushLag
#define SIGTTIN
Definition: win32.h:199
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency)
Definition: slot.c:220
void set_ps_display(const char *activity, bool force)
Definition: ps_status.c:326
#define INT4OID
Definition: pg_type.h:316
Definition: nodes.h:509
static StringInfoData output_message
Definition: walsender.c:160
WalSndCtlData * WalSndCtl
Definition: walsender.c:108
struct cursor * cur
Definition: ecpg.c:28
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
void proc_exit(int code)
Definition: ipc.c:99
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
bool superuser(void)
Definition: superuser.c:47
#define MemSet(start, val, len)
Definition: c.h:857
static void WalSndKill(int code, Datum arg)
Definition: walsender.c:2273
return result
Definition: formatting.c:1633
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
static void WalSndDone(WalSndSendDataCallback send_data)
Definition: walsender.c:2809
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
#define WL_SOCKET_READABLE
Definition: latch.h:125
void ReplicationSlotSave(void)
Definition: slot.c:577
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8227
List * SyncRepGetSyncStandbys(bool *am_sync)
Definition: syncrep.c:680
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:186
LogicalDecodingContext * CreateInitDecodingContext(char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:216
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:217
TimestampTz time
Definition: walsender.c:204
int sleeptime
Definition: pg_standby.c:40
ReplicationSlotPersistentData data
Definition: slot.h:115
TimeOffset writeLag
void ResetLatch(volatile Latch *latch)
Definition: latch.c:498
#define LOG
Definition: elog.h:26
bool RecoveryInProgress(void)
Definition: xlog.c:7878
SyncRepConfigData * SyncRepConfig
Definition: syncrep.c:97
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1649
XLogRecPtr last_lsn
Definition: walsender.c:213
void list_free_deep(List *list)
Definition: list.c:1147
static int fd(const char *x, int i)
Definition: preproc-init.c:105
pgsocket sock
Definition: libpq-be.h:118
#define PG_BINARY
Definition: c.h:1038
#define SIGQUIT
Definition: win32.h:189
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc.c:7894
slock_t mutex
Node * replication_parse_result
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:88
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:2648
Latch procLatch
Definition: proc.h:103
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
Definition: xlogutils.c:799
XLogRecord * XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
Definition: xlogreader.c:193
XLogRecPtr confirmed_flush
Definition: slot.h:76
PGXACT * MyPgXact
Definition: proc.c:68
XLogRecPtr EndRecPtr
Definition: xlogreader.h:115
int32 day
Definition: timestamp.h:47
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition: walsender.c:1879
TimeLineID timeline
Definition: replnodes.h:96
ReplicationKind kind
Definition: replnodes.h:56
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1715
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
Definition: walsender.c:748
bool IsTransactionBlock(void)
Definition: xact.c:4305
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:635
bool am_walsender
Definition: walsender.c:114
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pg_usleep(long microsec)
Definition: signal.c:53
void HandleWalSndInitStopping(void)
Definition: walsender.c:2902
void ReplicationSlotReserveWal(void)
Definition: slot.c:924
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:180
XLogRecPtr flush
bool defGetBoolean(DefElem *def)
Definition: define.c:111
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:432
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:680
void GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch)
Definition: xlog.c:8296
ReplicationKind kind
Definition: replnodes.h:81
void WalSndRqstFileReload(void)
Definition: walsender.c:2879
#define SIG_IGN
Definition: win32.h:185
void ConditionVariableCancelSleep(void)
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:319
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3318
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:1308
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:157
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:568
#define ERROR
Definition: elog.h:43
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:93
void pq_startmsgread(void)
Definition: pqcomm.c:1210
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: walsender.c:1706
#define IntervalPGetDatum(X)
Definition: timestamp.h:33
#define SYNC_REP_WAIT_APPLY
Definition: syncrep.h:26
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:3765
bool FirstSnapshotSet
Definition: snapmgr.c:203
static struct @27 LagTracker
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
void WalSndSignals(void)
Definition: walsender.c:2936
#define FATAL
Definition: elog.h:52
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:686
Latch * latch
#define MAXPGPATH
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11094
void ReplicationSlotPersist(void)
Definition: slot.c:612
TransactionId effective_xmin
Definition: slot.h:111
int pq_getbyte_if_available(unsigned char *c)
Definition: pqcomm.c:1040
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:197
static TimeLineID curFileTimeLine
Definition: walsender.c:140
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:165
#define XLogFilePath(path, tli, logSegNo)
char * get_database_name(Oid dbid)
Definition: dbcommands.c:2056
#define DEBUG2
Definition: elog.h:24
bool XLogBackgroundFlush(void)
Definition: xlog.c:2946
bool list_member_int(const List *list, int datum)
Definition: list.c:485
Definition: latch.h:110
Definition: dest.h:88
#define TLHistoryFileName(fname, tli)
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:348
WalSndState state
NodeTag type
Definition: nodes.h:511
char * XLogFileNameP(TimeLineID tli, XLogSegNo segno)
Definition: xlog.c:10136
static char * buf
Definition: pg_test_fsync.c:66
void WalSndWaitStopping(void)
Definition: walsender.c:3059
static bool streamingDoneSending
Definition: walsender.c:179
uint64 XLogSegNo
Definition: xlogdefs.h:34
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:109
bool PostmasterIsAlive(void)
Definition: pmsignal.c:272
int OpenTransientFile(FileName fileName, int fileFlags, int fileMode)
Definition: fd.c:2144
int errdetail(const char *fmt,...)
Definition: elog.c:873
#define COMMERROR
Definition: elog.h:30
int errcode_for_file_access(void)
Definition: elog.c:598
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:523
TransactionId catalog_xmin
Definition: slot.h:65
#define pq_flush_if_writable()
Definition: libpq.h:40
TimeOffset time
Definition: timestamp.h:45
XLogRecPtr currTLIValidUntil
Definition: xlogreader.h:175
#define InvalidTransactionId
Definition: transam.h:31
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:245
void PostgresSigHupHandler(SIGNAL_ARGS)
Definition: postgres.c:2688
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1151
static XLogRecPtr logical_startptr
Definition: walsender.c:198
unsigned int uint32
Definition: c.h:268
void ReplicationSlotRelease(void)
Definition: slot.c:373
static void XLogRead(char *buf, XLogRecPtr startptr, Size count)
Definition: walsender.c:2301
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1232
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
TransactionId xmin
Definition: slot.h:57
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
void InitWalSender(void)
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:186
#define SlotIsLogical(slot)
Definition: slot.h:134
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1791
void SyncRepInitConfig(void)
Definition: syncrep.c:384
#define ereport(elevel, rest)
Definition: elog.h:122
static bool WalSndCaughtUp
Definition: walsender.c:183
#define SYNC_REP_WAIT_FLUSH
Definition: syncrep.h:25
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition: walsender.c:3376
XLogRecPtr lsn
Definition: walsender.c:203
int replication_yyparse(void)
Definition: guc.h:72
int32 month
Definition: timestamp.h:48
volatile sig_atomic_t ConfigReloadPending
Definition: globals.c:34
int max_wal_senders
Definition: walsender.c:120
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc)
Definition: execTuples.c:1232
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2075
int CloseTransientFile(int fd)
Definition: fd.c:2305
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1124
void initStringInfo(StringInfo str)
Definition: stringinfo.c:46
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1272
static StringInfoData reply_message
Definition: walsender.c:161
#define MAXFNAMELEN
TimeLineID nextTLI
Definition: xlogreader.h:181
#define SpinLockRelease(lock)
Definition: spin.h:64
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
int64 TimeOffset
Definition: timestamp.h:40
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition: walsender.c:1267
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
#define InvalidBackendId
Definition: backendid.h:23
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition: walsender.c:3441
void WalSndErrorCleanup(void)
Definition: walsender.c:291
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:322
void(* WalSndSendDataCallback)(void)
Definition: walsender.c:224
static volatile sig_atomic_t replication_active
Definition: walsender.c:195
uintptr_t Datum
Definition: postgres.h:372
static const char * WalSndGetStateString(WalSndState state)
Definition: walsender.c:3116
XLogRecPtr sentPtr
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
bool am_db_walsender
Definition: walsender.c:117
TransactionId effective_catalog_xmin
Definition: slot.h:112
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1040
Oid MyDatabaseId
Definition: globals.c:77
static void WalSndShutdown(void)
Definition: walsender.c:228
static TimeLineID receiveTLI
Definition: xlog.c:201
int work_mem
Definition: globals.c:113
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:563
WalSnd * MyWalSnd
Definition: walsender.c:111
static XLogRecPtr sentPtr
Definition: walsender.c:157
#define SIGPIPE
Definition: win32.h:193
#define SIGHUP
Definition: win32.h:188
void ReplicationSlotDrop(const char *name)
Definition: slot.c:454
void pq_endmsgread(void)
Definition: pqcomm.c:1234
#define InvalidOid
Definition: postgres_ext.h:36
TimeLineID ThisTimeLineID
Definition: xlog.c:179
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:216
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
#define SIG_DFL
Definition: win32.h:183
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:343
int allowedModes
Definition: execnodes.h:268
void WalSndInitStopping(void)
Definition: walsender.c:3033
TimeLineID currTLI
Definition: xlogreader.h:165
#define INT8OID
Definition: pg_type.h:304
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:168
static void WalSndKeepaliveIfNecessary(TimestampTz now)
Definition: walsender.c:3337
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:432
TimeOffset applyLag
struct SnapBuild * snapshot_builder
Definition: logical.h:46
int errmsg_internal(const char *fmt,...)
Definition: elog.c:827
SetFunctionReturnMode returnMode
Definition: execnodes.h:270
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define XLByteToSeg(xlrp, logSegNo)
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2006
void SetLatch(volatile Latch *latch)
Definition: latch.c:415
#define SIGNAL_ARGS
Definition: c.h:1079
#define NULL
Definition: c.h:229
static TimeLineID sendTimeLine
Definition: walsender.c:148
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:675
static void WalSndCheckTimeOut(TimestampTz now)
Definition: walsender.c:2048
#define lfirst(lc)
Definition: pg_list.h:106
void WalSndSetState(WalSndState state)
Definition: walsender.c:3097
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:356
int sync_standby_priority
Definition: regguts.h:298
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:475
static void XLogSendLogical(void)
Definition: walsender.c:2739
XLogRecPtr restart_lsn
Definition: slot.h:68
void ReplicationSlotAcquire(const char *name)
Definition: slot.c:326
void StartTransactionCommand(void)
Definition: xact.c:2678
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:214
bool needreload
static void ProcessStandbyReplyMessage(void)
Definition: walsender.c:1738
size_t Size
Definition: c.h:356
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1208
char * dbname
Definition: streamutil.c:39
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:208
int XactIsoLevel
Definition: xact.c:74
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1111
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:149
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:151
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:914
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:202
TupleDesc CreateTemplateTupleDesc(int natts, bool hasoid)
Definition: tupdesc.c:41
WalSndState
static XLogSegNo sendSegNo
Definition: walsender.c:136
Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
Definition: walsender.c:3151
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:835
static bool streamingDoneReceiving
Definition: walsender.c:180
#define BYTEAOID
Definition: pg_type.h:292
Tuplestorestate * setResult
Definition: execnodes.h:273
#define pg_attribute_noreturn()
Definition: c.h:653
static StringInfoData tmpbuf
Definition: walsender.c:162
#define SIGTTOU
Definition: win32.h:200
bool IsSubTransaction(void)
Definition: xact.c:4377
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition: snapbuild.c:545
static Datum values[MAXATTR]
Definition: bootstrap.c:163
#define PG_STAT_GET_WAL_SENDERS_COLS
ExprContext * econtext
Definition: execnodes.h:266
bool log_replication_commands
Definition: walsender.c:124
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4695
#define Int32GetDatum(X)
Definition: postgres.h:485
char * application_name
Definition: guc.c:469
TupleDesc setDesc
Definition: execnodes.h:274
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:115
void * palloc(Size size)
Definition: mcxt.c:849
int errmsg(const char *fmt,...)
Definition: elog.c:797
XLogReaderState * reader
Definition: logical.h:44
void SHMQueueInit(SHM_QUEUE *queue)
Definition: shmqueue.c:36
void pq_sendint(StringInfo buf, int i, int b)
Definition: pqformat.c:236
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:344
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
void die(SIGNAL_ARGS)
Definition: postgres.c:2617
CRSSnapshotAction
Definition: walsender.h:22
StringInfo out
Definition: logical.h:66
static const unsigned __int64 epoch
Definition: gettimeofday.c:34
int i
void WalSndShmemInit(void)
Definition: walsender.c:2972
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:486
#define NameStr(name)
Definition: c.h:499
TimeLineID timeline
Definition: replnodes.h:83
#define CStringGetTextDatum(s)
Definition: builtins.h:91
void * arg
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition: walsender.c:1830
struct Latch * MyLatch
Definition: globals.c:52
void ReplicationSlotCleanup(void)
Definition: slot.c:427
#define PG_FUNCTION_ARGS
Definition: fmgr.h:158
char * defname
Definition: parsenodes.h:719
static uint32 sendOff
Definition: walsender.c:137
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:448
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:98
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2849
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:113
#define elog
Definition: elog.h:219
void LWLockReleaseAll(void)
Definition: lwlock.c:1814
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28
static Interval * offset_to_interval(TimeOffset offset)
Definition: walsender.c:3135
slock_t mutex
Definition: slot.h:88
#define close(a)
Definition: win32.h:12
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:261
CommandDest whereToSendOutput
Definition: postgres.c:88
#define SIGCHLD
Definition: win32.h:198
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: timestamp.c:1624
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
#define XLByteInSeg(xlrp, logSegNo)
static bool waiting_for_ping_response
Definition: walsender.c:171
void pq_puttextmessage(char msgtype, const char *str)
Definition: pqformat.c:400
XLogRecPtr apply
static void IdentifySystem(void)
Definition: walsender.c:338
Definition: pg_list.h:45
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:76
void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.c:271
#define WL_LATCH_SET
Definition: latch.h:124
int16 AttrNumber
Definition: attnum.h:21
void quickdie(SIGNAL_ARGS)
Definition: postgres.c:2556
#define UINT64_FORMAT
Definition: c.h:316
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:634
void SyncRepReleaseWaiters(void)
Definition: syncrep.c:412
static TimestampTz last_reply_timestamp
Definition: walsender.c:168
int write_head
Definition: walsender.c:215
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1573
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
#define MAX_SEND_SIZE
Definition: walsender.c:105
#define read(a, b, c)
Definition: win32.h:13
#define SIGUSR2
Definition: win32.h:203
static bool sendTimeLineIsHistoric
Definition: walsender.c:150
#define offsetof(type, field)
Definition: c.h:555
void WalSndWakeup(void)
Definition: walsender.c:3004
void ReplicationSlotMarkDirty(void)
Definition: slot.c:595
bool am_cascading_walsender
Definition: walsender.c:115
static void ProcessStandbyHSFeedbackMessage(void)
Definition: walsender.c:1907
#define SYNC_REP_WAIT_WRITE
Definition: syncrep.h:24
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:696
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
Definition: walsender.c:1241
void PreventTransactionChain(bool isTopLevel, const char *stmtType)
Definition: xact.c:3155
int BasicOpenFile(FileName fileName, int fileFlags, int fileMode)
Definition: fd.c:936
void replication_scanner_init(const char *query_string)
ResourceOwner ResourceOwnerCreate(ResourceOwner parent, const char *name)
Definition: resowner.c:416
#define TLHistoryFilePath(path, tli)