PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
walsender.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * walsender.c
4  *
5  * The WAL sender process (walsender) is new as of Postgres 9.0. It takes
6  * care of sending XLOG from the primary server to a single recipient.
7  * (Note that there can be more than one walsender process concurrently.)
8  * It is started by the postmaster when the walreceiver of a standby server
9  * connects to the primary server and requests XLOG streaming replication.
10  *
11  * A walsender is similar to a regular backend, ie. there is a one-to-one
12  * relationship between a connection and a walsender process, but instead
13  * of processing SQL queries, it understands a small set of special
14  * replication-mode commands. The START_REPLICATION command begins streaming
15  * WAL to the client. While streaming, the walsender keeps reading XLOG
16  * records from the disk and sends them to the standby server over the
17  * COPY protocol, until either side ends the replication by exiting COPY
18  * mode (or until the connection is closed).
19  *
20  * Normal termination is by SIGTERM, which instructs the walsender to
21  * close the connection and exit(0) at next convenient moment. Emergency
22  * termination is by SIGQUIT; like any backend, the walsender will simply
23  * abort and exit on SIGQUIT. A close of the connection and a FATAL error
24  * are treated as not a crash but approximately normal termination;
25  * the walsender will exit quickly without sending any more XLOG records.
26  *
27  * If the server is shut down, postmaster sends us SIGUSR2 after all
28  * regular backends have exited and the shutdown checkpoint has been written.
29  * This instructs walsender to send any outstanding WAL, including the
30  * shutdown checkpoint record, wait for it to be replicated to the standby,
31  * and then exit.
32  *
33  *
34  * Portions Copyright (c) 2010-2017, PostgreSQL Global Development Group
35  *
36  * IDENTIFICATION
37  * src/backend/replication/walsender.c
38  *
39  *-------------------------------------------------------------------------
40  */
41 #include "postgres.h"
42 
43 #include <signal.h>
44 #include <unistd.h>
45 
46 #include "access/printtup.h"
47 #include "access/timeline.h"
48 #include "access/transam.h"
49 #include "access/xact.h"
50 #include "access/xlog_internal.h"
51 #include "access/xlogutils.h"
52 
53 #include "catalog/pg_type.h"
54 #include "commands/dbcommands.h"
55 #include "commands/defrem.h"
56 #include "funcapi.h"
57 #include "libpq/libpq.h"
58 #include "libpq/pqformat.h"
59 #include "miscadmin.h"
60 #include "nodes/replnodes.h"
61 #include "pgstat.h"
62 #include "replication/basebackup.h"
63 #include "replication/decode.h"
64 #include "replication/logical.h"
66 #include "replication/slot.h"
67 #include "replication/snapbuild.h"
68 #include "replication/syncrep.h"
70 #include "replication/walsender.h"
73 #include "storage/fd.h"
74 #include "storage/ipc.h"
75 #include "storage/pmsignal.h"
76 #include "storage/proc.h"
77 #include "storage/procarray.h"
78 #include "tcop/dest.h"
79 #include "tcop/tcopprot.h"
80 #include "utils/builtins.h"
81 #include "utils/guc.h"
82 #include "utils/memutils.h"
83 #include "utils/pg_lsn.h"
84 #include "utils/portal.h"
85 #include "utils/ps_status.h"
86 #include "utils/resowner.h"
87 #include "utils/timeout.h"
88 #include "utils/timestamp.h"
89 
90 /*
91  * Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ.
92  *
93  * We don't have a good idea of what a good value would be; there's some
94  * overhead per message in both walsender and walreceiver, but on the other
95  * hand sending large batches makes walsender less responsive to signals
96  * because signals are checked only between messages. 128kB (with
97  * default 8k blocks) seems like a reasonable guess for now.
98  */
99 #define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
100 
/*
 * NOTE(review): this listing omits several declarations in this region
 * (there are gaps in the embedded line numbers). For example, the
 * declarations that should follow the "Array of WalSnds", "My slot",
 * "Timeline ID of the currently open file", "Buffers for constructing
 * outgoing messages" and "Timestamp of the last receipt" comments are not
 * shown. Those comments currently describe code that is absent here.
 */
101 /* Array of WalSnds in shared memory */
103 
104 /* My slot in the shared memory array */
106 
107 /* Global state */
108 bool am_walsender = false; /* Am I a walsender process? */
109 bool am_cascading_walsender = false; /* Am I cascading WAL to
110  * another standby? */
111 bool am_db_walsender = false; /* Connected to a database? */
112 
113 /* User-settable parameters for walsender */
114 int max_wal_senders = 0; /* the maximum number of concurrent walsenders */
115 int wal_sender_timeout = 60 * 1000; /* maximum time to send one
116  * WAL data message; presumably in
 * milliseconds (60 s default) --
 * confirm against the GUC definition */
118 
119 /*
120  * State for WalSndWakeupRequest
121  */
122 bool wake_wal_senders = false;
123 
124 /*
125  * These variables are used similarly to openLogFile/SegNo/Off,
126  * but for walsender to read the XLOG.
 * sendFile is -1 when no segment file is open; WalSndErrorCleanup()
 * closes it and resets it to -1.
127  */
128 static int sendFile = -1;
129 static XLogSegNo sendSegNo = 0;
130 static uint32 sendOff = 0;
131 
132 /* Timeline ID of the currently open file */
134 
135 /*
136  * These variables keep track of the state of the timeline we're currently
137  * sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric,
138  * the timeline is not the latest timeline on this server, and the server's
139  * history forked off from that timeline at sendTimeLineValidUpto.
140  */
143 static bool sendTimeLineIsHistoric = false;
145 
146 /*
147  * How far have we sent WAL already? This is also advertised in
148  * MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.)
149  */
150 static XLogRecPtr sentPtr = 0;
151 
152 /* Buffers for constructing outgoing messages and processing reply messages. */
156 
157 /*
158  * Timestamp of the last receipt of the reply from the standby. Set to 0 if
159  * wal_sender_timeout doesn't need to be active.
160  */
162 
163 /* Have we sent a heartbeat message asking for reply, since last reply? */
164 static bool waiting_for_ping_response = false;
165 
166 /*
167  * While streaming WAL in Copy mode, streamingDoneSending is set to true
168  * after we have sent CopyDone. We should not send any more CopyData messages
169  * after that. streamingDoneReceiving is set to true when we receive CopyDone
170  * from the other end. When both become true, it's time to exit Copy mode.
171  */
174 
175 /* Are we there yet? (presumably: has all currently available WAL been sent) */
176 static bool WalSndCaughtUp = false;
177 
178 /* Flags set by signal handlers for later service in main loop */
179 static volatile sig_atomic_t got_SIGHUP = false;
180 static volatile sig_atomic_t walsender_ready_to_stop = false;
181 
182 /*
183  * This is set while we are streaming. When not set, SIGUSR2 signal will be
184  * handled like SIGTERM. When set, the main loop is responsible for checking
185  * walsender_ready_to_stop and terminating when it's set (after streaming any
186  * remaining WAL).
187  */
188 static volatile sig_atomic_t replication_active = false;
189 
192 
193 /* A sample associating a log position with the time it was written. */
194 typedef struct
195 {
198 } WalTimeSample;
199 
200 /* The size of our buffer of time samples. */
201 #define LAG_TRACKER_BUFFER_SIZE 8192
202 
203 /* A mechanism for tracking replication lag. Zeroed by InitWalSender(). */
204 static struct
205 {
211 } LagTracker;
212 
213 /* Signal handlers */
214 static void WalSndSigHupHandler(SIGNAL_ARGS);
217 
218 /* Prototypes for private functions */
219 typedef void (*WalSndSendDataCallback) (void);
220 static void WalSndLoop(WalSndSendDataCallback send_data);
221 static void InitWalSenderSlot(void);
222 static void WalSndKill(int code, Datum arg);
224 static void XLogSendPhysical(void);
225 static void XLogSendLogical(void);
226 static void WalSndDone(WalSndSendDataCallback send_data);
227 static XLogRecPtr GetStandbyFlushRecPtr(void);
228 static void IdentifySystem(void);
231 static void StartReplication(StartReplicationCmd *cmd);
233 static void ProcessStandbyMessage(void);
234 static void ProcessStandbyReplyMessage(void);
235 static void ProcessStandbyHSFeedbackMessage(void);
236 static void ProcessRepliesIfAny(void);
237 static void WalSndKeepalive(bool requestReply);
239 static void WalSndCheckTimeOut(TimestampTz now);
240 static long WalSndComputeSleeptime(TimestampTz now);
241 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
242 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
244 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
245 
246 static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
247 
248 
249 /*
 * Initialize walsender process before entering the main command loop.
 *
 * The statements visible here install a new top-level ResourceOwner as
 * CurrentResourceOwner and zero the LagTracker sample buffer.
 *
 * NOTE(review): the statements described by the "Create a per-walsender
 * data structure" and "Let postmaster know that we're a WAL sender"
 * comments are absent from this listing (gaps in the embedded line
 * numbers); confirm against the full source.
 */
250 void
251 InitWalSender(void)
252 {
254 
255  /* Create a per-walsender data structure in shared memory */
257 
258  /* Set up resource owner */
259  CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner");
260 
261  /*
262  * Let postmaster know that we're a WAL sender. Once we've declared ourselves
263  * as a WAL sender process, postmaster will let us outlive the bgwriter and
264  * kill us last in the shutdown sequence, so we get a chance to stream all
265  * remaining WAL at shutdown, including the shutdown checkpoint. Note that
266  * there's no going back, and we mustn't write any WAL records after this.
267  */
270 
271  /* Initialize empty timestamp buffer for lag tracking. */
272  memset(&LagTracker, 0, sizeof(LagTracker));
273 }
274 
275 /*
276  * Clean up after an error.
277  *
278  * WAL sender processes don't use transactions like regular backends do.
279  * This function does any cleanup required after an error in a WAL sender
280  * process, similar to what transaction abort does in a regular backend.
 *
 * Visible cleanup steps:
 * - close the XLOG file opened for reading (sendFile), if any;
 * - clear replication_active.
 * The process may then exit via proc_exit(0).
 *
 * NOTE(review): this listing is missing the function-name line and several
 * statements. In particular, the statement guarded by
 * "if (MyReplicationSlot != NULL)" and the condition guarding
 * "proc_exit(0)" are not shown. The control flow as printed here is
 * therefore not the real one; confirm against the full source.
281  */
282 void
284 {
288 
 /* Close the XLOG segment file we were reading from, if one is open. */
289  if (sendFile >= 0)
290  {
291  close(sendFile);
292  sendFile = -1;
293  }
294 
295  if (MyReplicationSlot != NULL)
297 
299 
300  replication_active = false;
302  proc_exit(0);
303 
304  /* Revert back to startup state */
306 }
307 
308 /*
309  * Handle a client's connection abort in an orderly manner.
 *
 * Terminates the walsender via proc_exit(0). Control is not expected to
 * come back here; the trailing abort() exists only to keep the compiler
 * quiet.
 *
 * NOTE(review): the statement(s) that reset whereToSendOutput, as described
 * by the comment below, are absent from this listing.
310  */
311 static void
312 WalSndShutdown(void)
313 {
314  /*
315  * Reset whereToSendOutput to prevent ereport from attempting to send any
316  * more messages to the standby.
317  */
320 
321  proc_exit(0);
322  abort(); /* keep the compiler quiet */
323 }
324 
325 /*
326  * Handle the IDENTIFY_SYSTEM command.
 *
 * Sends a one-row result set with these columns:
 * - systemid (text);
 * - timeline (int4);
 * - xlogpos (text, "%X/%X" built from the high and low 32 bits of the flush
 *   position);
 * - dbname (text, NULL when not connected to a database).
 *
 * On the branch that calls GetStandbyFlushRecPtr(), ThisTimeLineID is also
 * updated (per the inline comment).
 *
 * NOTE(review): "timeline" is declared INT4OID and sent with Int32GetDatum.
 * StartReplication instead reports next_tli as INT8 because TimeLineID is
 * unsigned, so a very large timeline ID would appear negative here.
 * Several lines are also absent from this listing: the function-name line,
 * the condition choosing GetStandbyFlushRecPtr(), the dbname lookup body,
 * and the assignment of dest.
327  */
328 static void
330 {
331  char sysid[32];
332  char xpos[MAXFNAMELEN];
333  XLogRecPtr logptr;
334  char *dbname = NULL;
335  DestReceiver *dest;
336  TupOutputState *tstate;
337  TupleDesc tupdesc;
338  Datum values[4];
339  bool nulls[4];
340 
341  /*
342  * Reply with a result set with one row, four columns. First col is system
343  * ID, second is timeline ID, third is current xlog location and the
344  * fourth contains the database name if we are connected to one.
345  */
346 
347  snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
349 
352  {
353  /* this also updates ThisTimeLineID */
354  logptr = GetStandbyFlushRecPtr();
355  }
356  else
357  logptr = GetFlushRecPtr();
358 
 /* Format the 64-bit position as two 32-bit hex halves, "hi/lo". */
359  snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
360 
361  if (MyDatabaseId != InvalidOid)
362  {
364 
365  /* syscache access needs a transaction env. */
367  /* make dbname live outside TX context */
371  /* CommitTransactionCommand switches to TopMemoryContext */
373  }
374 
376  MemSet(nulls, false, sizeof(nulls));
377 
378  /* need a tuple descriptor representing four columns */
379  tupdesc = CreateTemplateTupleDesc(4, false);
380  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
381  TEXTOID, -1, 0);
382  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
383  INT4OID, -1, 0);
384  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
385  TEXTOID, -1, 0);
386  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
387  TEXTOID, -1, 0);
388 
389  /* prepare for projection of tuples */
390  tstate = begin_tup_output_tupdesc(dest, tupdesc);
391 
392  /* column 1: system identifier */
393  values[0] = CStringGetTextDatum(sysid);
394 
395  /* column 2: timeline */
396  values[1] = Int32GetDatum(ThisTimeLineID);
397 
398  /* column 3: xlog position */
399  values[2] = CStringGetTextDatum(xpos);
400 
401  /* column 4: database name, or NULL if none */
402  if (dbname)
403  values[3] = CStringGetTextDatum(dbname);
404  else
405  nulls[3] = true;
406 
407  /* send it to dest */
408  do_tup_output(tstate, values, nulls);
409 
410  end_tup_output(tstate);
411 }
412 
413 
414 /*
415  * Handle TIMELINE_HISTORY command.
 *
 * Sends a RowDescription ('T') for two columns:
 * - filename (text);
 * - content (bytea).
 * This is followed by a single DataRow ('D') containing the history file's
 * name and its full contents, read in BLCKSZ-sized chunks.
 *
 * Raises ERROR in any of these cases:
 * - the file cannot be opened;
 * - its length cannot be determined;
 * - it cannot be rewound;
 * - read() returns <= 0 before all bytes have been read.
 *
 * NOTE(review): the content length is sent as a 4-byte integer, which
 * presumably relies on history files being small. Each read() also asks
 * for a full BLCKSZ regardless of bytesleft. If the file grew after the
 * lseek(), more bytes could be appended than the advertised column length.
 * Confirm whether a history file can change while being sent. The
 * function-name line, the StringInfoData declaration and the errcode lines
 * are absent from this listing.
416  */
417 static void
419 {
421  char histfname[MAXFNAMELEN];
422  char path[MAXPGPATH];
423  int fd;
424  off_t histfilelen;
425  off_t bytesleft;
426  Size len;
427 
428  /*
429  * Reply with a result set with one row, and two columns. The first col is
430  * the name of the history file, 2nd is the contents.
431  */
432 
433  TLHistoryFileName(histfname, cmd->timeline);
434  TLHistoryFilePath(path, cmd->timeline);
435 
436  /* Send a RowDescription message */
437  pq_beginmessage(&buf, 'T');
438  pq_sendint(&buf, 2, 2); /* 2 fields */
439 
440  /* first field */
441  pq_sendstring(&buf, "filename"); /* col name */
442  pq_sendint(&buf, 0, 4); /* table oid */
443  pq_sendint(&buf, 0, 2); /* attnum */
444  pq_sendint(&buf, TEXTOID, 4); /* type oid */
445  pq_sendint(&buf, -1, 2); /* typlen */
446  pq_sendint(&buf, 0, 4); /* typmod */
447  pq_sendint(&buf, 0, 2); /* format code */
448 
449  /* second field */
450  pq_sendstring(&buf, "content"); /* col name */
451  pq_sendint(&buf, 0, 4); /* table oid */
452  pq_sendint(&buf, 0, 2); /* attnum */
453  pq_sendint(&buf, BYTEAOID, 4); /* type oid */
454  pq_sendint(&buf, -1, 2); /* typlen */
455  pq_sendint(&buf, 0, 4); /* typmod */
456  pq_sendint(&buf, 0, 2); /* format code */
457  pq_endmessage(&buf);
458 
459  /* Send a DataRow message */
460  pq_beginmessage(&buf, 'D');
461  pq_sendint(&buf, 2, 2); /* # of columns */
462  len = strlen(histfname);
463  pq_sendint(&buf, len, 4); /* col1 len */
464  pq_sendbytes(&buf, histfname, len);
465 
466  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0666);
467  if (fd < 0)
468  ereport(ERROR,
470  errmsg("could not open file \"%s\": %m", path)));
471 
472  /* Determine file length and send it to client */
473  histfilelen = lseek(fd, 0, SEEK_END);
474  if (histfilelen < 0)
475  ereport(ERROR,
477  errmsg("could not seek to end of file \"%s\": %m", path)));
478  if (lseek(fd, 0, SEEK_SET) != 0)
479  ereport(ERROR,
481  errmsg("could not seek to beginning of file \"%s\": %m", path)));
482 
483  pq_sendint(&buf, histfilelen, 4); /* col2 len */
484 
 /* Copy the file into the DataRow; any short read (<= 0) is an error. */
485  bytesleft = histfilelen;
486  while (bytesleft > 0)
487  {
488  char rbuf[BLCKSZ];
489  int nread;
490 
492  nread = read(fd, rbuf, sizeof(rbuf));
494  if (nread <= 0)
495  ereport(ERROR,
497  errmsg("could not read file \"%s\": %m",
498  path)));
499  pq_sendbytes(&buf, rbuf, nread);
500  bytesleft -= nread;
501  }
502  CloseTransientFile(fd);
503 
504  pq_endmessage(&buf);
505 }
506 
507 /*
508  * Handle START_REPLICATION command.
509  *
510  * Requires that IDENTIFY_SYSTEM has already run (ThisTimeLineID != 0) and
511  * rejects logical replication slots. It then chooses the timeline to send
 * (sendTimeLine, sendTimeLineIsHistoric, sendTimeLineValidUpto). If there
 * is anything to stream, it:
 * - sends a CopyBothResponse;
 * - checks that the requested start point is not beyond this server's
 *   flush position;
 * - runs the walsender main loop.
 *
 * The function can return. After the streaming loop the process may exit
 * via proc_exit(0); otherwise execution continues. It may then send a
 * one-row result set (next_tli, next_tli_startpos) describing the next
 * timeline, and it finishes with CommandComplete "START_STREAMING". Errors
 * are raised with ereport(ERROR), which takes us back to the main loop.
 *
 * NOTE(review): several lines are absent from this listing. These include
 * the function-name line and the conditions guarding the COPY block, the
 * proc_exit(0) call and the result-set block.
512  */
513 static void
515 {
517  XLogRecPtr FlushPtr;
518 
519  if (ThisTimeLineID == 0)
520  ereport(ERROR,
521  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
522  errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
523 
524  /*
525  * We assume here that we're logging enough information in the WAL for
526  * log-shipping, since this is checked in PostmasterMain().
527  *
528  * NOTE: wal_level can only change at shutdown, so in most cases it is
529  * difficult for there to be WAL data that we can still see that was
530  * written at wal_level='minimal'.
531  */
532 
533  if (cmd->slotname)
534  {
537  ereport(ERROR,
538  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
539  (errmsg("cannot use a logical replication slot for physical replication"))));
540  }
541 
542  /*
543  * Select the timeline. If it was given explicitly by the client, use
544  * that. Otherwise use the timeline of the last replayed record, which is
545  * kept in ThisTimeLineID.
546  */
548  {
549  /* this also updates ThisTimeLineID */
550  FlushPtr = GetStandbyFlushRecPtr();
551  }
552  else
553  FlushPtr = GetFlushRecPtr();
554 
555  if (cmd->timeline != 0)
556  {
557  XLogRecPtr switchpoint;
558 
559  sendTimeLine = cmd->timeline;
561  {
562  sendTimeLineIsHistoric = false;
564  }
565  else
566  {
567  List *timeLineHistory;
568 
569  sendTimeLineIsHistoric = true;
570 
571  /*
572  * Check that the timeline the client requested exists, and
573  * the requested start location is on that timeline.
574  */
575  timeLineHistory = readTimeLineHistory(ThisTimeLineID);
576  switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
578  list_free_deep(timeLineHistory);
579 
580  /*
581  * Found the requested timeline in the history. Check that
582  * requested startpoint is on that timeline in our history.
583  *
584  * This is quite loose on purpose. We only check that we didn't
585  * fork off the requested timeline before the switchpoint. We
586  * don't check that we switched *to* it before the requested
587  * starting point. This is because the client can legitimately
588  * request to start replication from the beginning of the WAL
589  * segment that contains switchpoint, but on the new timeline, so
590  * that it doesn't end up with a partial segment. If you ask for a
591  * too old starting point, you'll get an error later when we fail
592  * to find the requested WAL segment in pg_wal.
593  *
594  * XXX: we could be more strict here and only allow a startpoint
595  * that's older than the switchpoint, if it's still in the same
596  * WAL segment.
597  */
598  if (!XLogRecPtrIsInvalid(switchpoint) &&
599  switchpoint < cmd->startpoint)
600  {
601  ereport(ERROR,
602  (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
603  (uint32) (cmd->startpoint >> 32),
604  (uint32) (cmd->startpoint),
605  cmd->timeline),
606  errdetail("This server's history forked from timeline %u at %X/%X.",
607  cmd->timeline,
608  (uint32) (switchpoint >> 32),
609  (uint32) (switchpoint))));
610  }
611  sendTimeLineValidUpto = switchpoint;
612  }
613  }
614  else
615  {
618  sendTimeLineIsHistoric = false;
619  }
620 
622 
623  /* If there is nothing to stream, don't even enter COPY mode */
625  {
626  /*
627  * When we first start replication the standby will be behind the
628  * primary. For some applications, for example, synchronous
629  * replication, it is important to have a clear state for this initial
630  * catchup mode, so we can trigger actions when we change streaming
631  * state later. We may stay in this state for a long time, which is
632  * exactly why we want to be able to monitor whether or not we are
633  * still here.
634  */
636 
637  /* Send a CopyBothResponse message, and start streaming */
638  pq_beginmessage(&buf, 'W');
639  pq_sendbyte(&buf, 0);
640  pq_sendint(&buf, 0, 2);
641  pq_endmessage(&buf);
642  pq_flush();
643 
644  /*
645  * Don't allow a request to stream from a future point in WAL that
646  * hasn't been flushed to disk in this server yet.
 * Note that this check runs after the CopyBothResponse above has
 * already been sent.
647  */
648  if (FlushPtr < cmd->startpoint)
649  {
650  ereport(ERROR,
651  (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
652  (uint32) (cmd->startpoint >> 32),
653  (uint32) (cmd->startpoint),
654  (uint32) (FlushPtr >> 32),
655  (uint32) (FlushPtr))));
656  }
657 
658  /* Start streaming from the requested point */
659  sentPtr = cmd->startpoint;
660 
661  /* Initialize shared memory status, too */
662  {
663  WalSnd *walsnd = MyWalSnd;
664 
665  SpinLockAcquire(&walsnd->mutex);
666  walsnd->sentPtr = sentPtr;
667  SpinLockRelease(&walsnd->mutex);
668  }
669 
671 
672  /* Main loop of walsender */
673  replication_active = true;
674 
676 
677  replication_active = false;
679  proc_exit(0);
681 
683  }
684 
685  if (cmd->slotname)
687 
688  /*
689  * Copy is finished now. Send a single-row result set indicating the next
690  * timeline.
691  */
693  {
 /* "%X/%X": up to 8 hex digits, '/', up to 8 hex digits, NUL */
694  char startpos_str[8 + 1 + 8 + 1];
695  DestReceiver *dest;
696  TupOutputState *tstate;
697  TupleDesc tupdesc;
698  Datum values[2];
699  bool nulls[2];
700 
701  snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
702  (uint32) (sendTimeLineValidUpto >> 32),
704 
706  MemSet(nulls, false, sizeof(nulls));
707 
708  /*
709  * Need a tuple descriptor representing two columns.
710  * int8 may seem like a surprising data type for this, but in theory
711  * int4 would not be wide enough for this, as TimeLineID is unsigned.
712  */
713  tupdesc = CreateTemplateTupleDesc(2, false);
714  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
715  INT8OID, -1, 0);
716  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
717  TEXTOID, -1, 0);
718 
719  /* prepare for projection of tuple */
720  tstate = begin_tup_output_tupdesc(dest, tupdesc);
721 
722  values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
723  values[1] = CStringGetTextDatum(startpos_str);
724 
725  /* send it to dest */
726  do_tup_output(tstate, values, nulls);
727 
728  end_tup_output(tstate);
729  }
730 
731  /* Send CommandComplete message */
732  pq_puttextmessage('C', "START_STREAMING");
733 }
734 
735 /*
736  * read_page callback for logical decoding contexts, as a walsender process.
737  *
738  * Inside the walsender we can do better than logical_read_local_xlog_page,
739  * which has to do a plain sleep/busy loop, because the walsender's latch gets
740  * set every time WAL is flushed.
 *
 * Waits (via WalSndWaitForWal) until WAL up to targetPagePtr + reqLen is
 * available, then reads the page at targetPagePtr into cur_page. It also
 * records the timeline chosen by XLogReadDetermineTimeline in sendTimeLine
 * and sendTimeLineNextTLI.
 *
 * Returns the number of valid bytes in cur_page:
 * - XLOG_BLCKSZ if the whole page has been flushed;
 * - otherwise flushptr - targetPagePtr;
 * - -1 if, after waiting, fewer than reqLen bytes are available (per the
 *   inline comment, this can happen during shutdown).
 *
 * NOTE(review): the function-name line and two assignments following
 * XLogReadDetermineTimeline are absent from this listing.
741  */
742 static int
744  XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
745 {
746  XLogRecPtr flushptr;
747  int count;
748 
749  XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
751  sendTimeLine = state->currTLI;
753  sendTimeLineNextTLI = state->nextTLI;
754 
755  /* make sure we have enough WAL available */
756  flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
757 
758  /* the whole page is available */
759  if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
760  count = XLOG_BLCKSZ;
761  /* not enough WAL synced, that can happen during shutdown */
762  else if (targetPagePtr + reqLen > flushptr)
763  return -1;
764  /* part of the page available */
765  else
766  count = flushptr - targetPagePtr;
767 
768  /*
 * Now actually read the data; at least `count` bytes are known to be
 * there. A full XLOG_BLCKSZ page is read regardless, so bytes past
 * `count` may not be valid. Callers must rely on the returned count.
 */
769  XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
770 
771  return count;
772 }
773 
774 /*
775  * Process extra options given to CREATE_REPLICATION_SLOT.
 *
 * Recognized options:
 *   export_snapshot - boolean; logical slots only. Sets *snapshot_action to
 *                     CRS_EXPORT_SNAPSHOT when true. The value used when it
 *                     is false is on a line absent from this listing.
 *   use_snapshot    - logical slots only; sets *snapshot_action to
 *                     CRS_USE_SNAPSHOT.
 *   reserve_wal     - physical slots only; sets *reserve_wal to true.
 *
 * export_snapshot and use_snapshot share one "given" flag, so at most one
 * of them may appear. Repeating an option, or using one that does not
 * apply to cmd->kind, raises ERROR "conflicting or redundant options". An
 * unknown option raises ERROR. Outputs are left untouched for options that
 * are not given, so callers must pre-initialize *reserve_wal and
 * *snapshot_action.
 *
 * NOTE(review): the function-name line is absent from this listing.
776  */
777 static void
779  bool *reserve_wal,
780  CRSSnapshotAction *snapshot_action)
781 {
782  ListCell *lc;
783  bool snapshot_action_given = false;
784  bool reserve_wal_given = false;
785 
786  /* Parse options */
787  foreach (lc, cmd->options)
788  {
789  DefElem *defel = (DefElem *) lfirst(lc);
790 
791  if (strcmp(defel->defname, "export_snapshot") == 0)
792  {
793  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
794  ereport(ERROR,
795  (errcode(ERRCODE_SYNTAX_ERROR),
796  errmsg("conflicting or redundant options")));
797 
798  snapshot_action_given = true;
799  *snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT :
801  }
802  else if (strcmp(defel->defname, "use_snapshot") == 0)
803  {
804  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
805  ereport(ERROR,
806  (errcode(ERRCODE_SYNTAX_ERROR),
807  errmsg("conflicting or redundant options")));
808 
809  snapshot_action_given = true;
810  *snapshot_action = CRS_USE_SNAPSHOT;
811  }
812  else if (strcmp(defel->defname, "reserve_wal") == 0)
813  {
814  if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
815  ereport(ERROR,
816  (errcode(ERRCODE_SYNTAX_ERROR),
817  errmsg("conflicting or redundant options")));
818 
819  reserve_wal_given = true;
820  *reserve_wal = true;
821  }
822  else
823  elog(ERROR, "unrecognized option: %s", defel->defname);
824  }
825 }
826 
827 /*
828  * Create a new replication slot.
 *
 * Parses the command's options (parseCreateReplSlotOptions) and creates the
 * slot with ReplicationSlotCreate. For logical slots it then:
 * - checks the requested snapshot action against the current transaction
 *   state;
 * - builds the initial snapshot, which may take a while;
 * - exports or uses that snapshot as requested.
 * For physical slots with reserve_wal, extra setup is performed; those
 * statements are absent from this listing.
 *
 * Finally it sends a one-row result set:
 * - slot_name;
 * - consistent_point ("%X/%X");
 * - snapshot_name (NULL unless one was exported);
 * - output_plugin (NULL if none).
 *
 * NOTE(review): many lines are absent from this listing. These include the
 * function-name line, some ReplicationSlotCreate arguments and the decoding
 * context setup. Confirm details against the full source.
829  */
830 static void
832 {
833  const char *snapshot_name = NULL;
834  char xpos[MAXFNAMELEN];
835  char *slot_name;
836  bool reserve_wal = false;
837  CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
838  DestReceiver *dest;
839  TupOutputState *tstate;
840  TupleDesc tupdesc;
841  Datum values[4];
842  bool nulls[4];
843 
845 
846  parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action);
847 
848  /* setup state for XLogReadPage */
849  sendTimeLineIsHistoric = false;
851 
852  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
853  {
854  ReplicationSlotCreate(cmd->slotname, false,
856  }
857  else
858  {
860 
861  /*
862  * Initially create persistent slot as ephemeral - that allows us to
863  * nicely handle errors during initialization because it'll get
864  * dropped if this transaction fails. We'll make it persistent at the
865  * end. Temporary slots can be created as temporary from the beginning as
866  * they get dropped on error as well.
867  */
868  ReplicationSlotCreate(cmd->slotname, true,
870  }
871 
872  if (cmd->kind == REPLICATION_KIND_LOGICAL)
873  {
875 
876  /*
877  * Do options check early so that we can bail before calling the
878  * DecodingContextFindStartpoint which can take a long time.
879  */
880  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
881  {
882  if (IsTransactionBlock())
883  ereport(ERROR,
884  (errmsg("CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT "
885  "must not be called inside a transaction")));
886  }
887  else if (snapshot_action == CRS_USE_SNAPSHOT)
888  {
889  if (!IsTransactionBlock())
890  ereport(ERROR,
891  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
892  "must be called inside a transaction")));
893 
895  ereport(ERROR,
896  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
897  "must be called in REPEATABLE READ isolation mode transaction")));
898 
899  if (FirstSnapshotSet)
900  ereport(ERROR,
901  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
902  "must be called before any query")));
903 
904  if (IsSubTransaction())
905  ereport(ERROR,
906  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
907  "must not be called in a subtransaction")));
908  }
909 
913 
914  /*
915  * Signal that we don't need the timeout mechanism. We're just
916  * creating the replication slot and don't yet accept feedback
917  * messages or send keepalives. As we possibly need to wait for
918  * further WAL the walsender would otherwise possibly be killed too
919  * soon.
920  */
922 
923  /* build initial snapshot, might take a while */
925 
926  /*
927  * Export or use the snapshot if we've been asked to do so.
928  *
929  * NB. We will convert the snapbuild.c kind of snapshot to normal
930  * snapshot when doing this.
931  */
932  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
933  {
934  snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
935  }
936  else if (snapshot_action == CRS_USE_SNAPSHOT)
937  {
938  Snapshot snap;
939 
942  }
943 
944  /* don't need the decoding context anymore */
945  FreeDecodingContext(ctx);
946 
947  if (!cmd->temporary)
949  }
950  else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
951  {
953 
955 
956  /* Write this slot to disk if it's a permanent one. */
957  if (!cmd->temporary)
959  }
960 
961  snprintf(xpos, sizeof(xpos), "%X/%X",
964 
966  MemSet(nulls, false, sizeof(nulls));
967 
968  /*----------
969  * Need a tuple descriptor representing four columns:
970  * - first field: the slot name
971  * - second field: LSN at which we became consistent
972  * - third field: exported snapshot's name
973  * - fourth field: output plugin
974  *----------
975  */
976  tupdesc = CreateTemplateTupleDesc(4, false);
977  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
978  TEXTOID, -1, 0);
979  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
980  TEXTOID, -1, 0);
981  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
982  TEXTOID, -1, 0);
983  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
984  TEXTOID, -1, 0);
985 
986  /* prepare for projection of tuples */
987  tstate = begin_tup_output_tupdesc(dest, tupdesc);
988 
989  /* slot_name */
990  slot_name = NameStr(MyReplicationSlot->data.name);
991  values[0] = CStringGetTextDatum(slot_name);
992 
993  /* consistent wal location */
994  values[1] = CStringGetTextDatum(xpos);
995 
996  /* snapshot name, or NULL if none */
997  if (snapshot_name != NULL)
998  values[2] = CStringGetTextDatum(snapshot_name);
999  else
1000  nulls[2] = true;
1001 
1002  /* plugin, or NULL if none */
1003  if (cmd->plugin != NULL)
1004  values[3] = CStringGetTextDatum(cmd->plugin);
1005  else
1006  nulls[3] = true;
1007 
1008  /* send it to dest */
1009  do_tup_output(tstate, values, nulls);
1010  end_tup_output(tstate);
1011 
1013 }
1014 
1015 /*
1016  * Get rid of a replication slot that is no longer wanted.
 *
 * Replies to the client with CommandComplete "DROP_REPLICATION_SLOT".
 *
 * NOTE(review): the function-name line and the statement that actually
 * drops the slot are absent from this listing.
1017  */
1018 static void
1020 {
1022  EndCommand("DROP_REPLICATION_SLOT", DestRemote);
1023 }
1024 
1025 /*
1026  * Load previously initiated logical slot and prepare for sending data (via
1027  * WalSndLoop).
 *
 * Steps visible here:
 * - if the promotion check below fires, log a message and set
 *   walsender_ready_to_stop;
 * - send a CopyBothResponse;
 * - create the logical decoding context from cmd->startpoint and
 *   cmd->options;
 * - update the shared-memory sent position under walsnd->mutex;
 * - run the main loop with replication_active set, then free the decoding
 *   context.
 * The process may then exit via proc_exit(0), whose guarding condition is
 * not shown. Otherwise it leaves COPY mode with CommandComplete "COPY 0".
 *
 * NOTE(review): numerous lines are absent from this listing. These include
 * the function-name line, the slot acquisition, the promotion condition,
 * the value stored into sentPtr and the WalSndLoop call.
1028  */
1029 static void
1031 {
1033 
1034  /* make sure that our requirements are still fulfilled */
1036 
1038 
1040 
1041  /*
1042  * Force a disconnect, so that the decoding code doesn't need to care
1043  * about a possible switch from running in recovery to running in a
1044  * normal environment. Client code is expected to handle reconnects.
1045  */
1047  {
1048  ereport(LOG,
1049  (errmsg("terminating walsender process after promotion")));
1050  walsender_ready_to_stop = true;
1051  }
1052 
1054 
1055  /* Send a CopyBothResponse message, and start streaming */
1056  pq_beginmessage(&buf, 'W');
1057  pq_sendbyte(&buf, 0);
1058  pq_sendint(&buf, 0, 2);
1059  pq_endmessage(&buf);
1060  pq_flush();
1061 
1062  /*
1063  * Initialize position to the last ack'ed one, then the xlog records begin
1064  * to be shipped from that position.
1065  */
1066  logical_decoding_ctx = CreateDecodingContext(
1067  cmd->startpoint, cmd->options,
1070 
1071  /* Start reading WAL from the oldest required WAL. */
1073 
1074  /*
1075  * Report the location after which we'll send out further commits as the
1076  * current sentPtr.
1077  */
1079 
1080  /* Also update the sent position status in shared memory */
1081  {
1082  WalSnd *walsnd = MyWalSnd;
1083 
1084  SpinLockAcquire(&walsnd->mutex);
1086  SpinLockRelease(&walsnd->mutex);
1087  }
1088 
1089  replication_active = true;
1090 
1092 
1093  /* Main loop of walsender */
1095 
1096  FreeDecodingContext(logical_decoding_ctx);
1098 
1099  replication_active = false;
1101  proc_exit(0);
1103 
1104  /* Get out of COPY mode (CommandComplete). */
1105  EndCommand("COPY 0", DestRemote);
1106 }
1107 
1108 /*
1109  * LogicalDecodingContext 'prepare_write' callback.
1110  *
1111  * Prepare a write into a StringInfo.
 *
 * Resets ctx->out and writes the header of a 'w' message:
 * - the type byte;
 * - dataStart and walEnd, both set to lsn (or to InvalidXLogRecPtr unless
 *   last_write);
 * - an 8-byte sendtime placeholder of 0.
 * The header is 1 + 8 + 8 + 8 bytes. The 'write' callback overwrites the
 * sendtime field at offset 1 + 8 + 8 just before sending.
1112  *
1113  * Don't do anything lasting in here, it's quite possible that nothing will be done
1114  * with the data.
 *
 * NOTE(review): the function-name line is absent from this listing. Its
 * prototype near the top of the file gives the parameters as
 * (ctx, lsn, xid, last_write).
1115  */
1116 static void
1118 {
1119  /* can't have sync rep confused by sending the same LSN several times */
1120  if (!last_write)
1121  lsn = InvalidXLogRecPtr;
1122 
1123  resetStringInfo(ctx->out);
1124 
1125  pq_sendbyte(ctx->out, 'w');
1126  pq_sendint64(ctx->out, lsn); /* dataStart */
1127  pq_sendint64(ctx->out, lsn); /* walEnd */
1128 
1129  /*
1130  * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1131  * reserve space here.
1132  */
1133  pq_sendint64(ctx->out, 0); /* sendtime */
1134 }
1135 
1136 /*
1137  * LogicalDecodingContext 'write' callback.
1138  *
1139  * Actually write out data previously prepared by WalSndPrepareWrite out to
1140  * the network. Take as long as needed, but process replies from the other
1141  * side and check timeouts during that.
 *
 * Queues ctx->out as a CopyData ('d') message without blocking, then writes
 * the current time into the sendtime slot of ctx->out. If output is still
 * pending after a flush attempt, it loops until the output buffer is empty.
 * Each iteration checks for postmaster death, handles SIGHUP, checks client
 * input, retries the flush, and enforces wal_sender_timeout. It then sleeps
 * on the latch/socket. A failed flush calls WalSndShutdown().
 *
 * NOTE(review): the timestamp is written into ctx->out *after*
 * pq_putmessage_noblock() has already been handed ctx->out->data. If that
 * call copies the message into libpq's output buffer right away (which
 * "noblock" suggests -- confirm), the client receives the zero placeholder
 * from WalSndPrepareWrite rather than the real send time. Filling in the
 * timestamp before calling pq_putmessage_noblock() would avoid that.
1142  */
1143 static void
1145  bool last_write)
1146 {
1147  /* output previously gathered data in a CopyData packet */
1148  pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1149 
1150  /*
1151  * Fill the send timestamp last, so that it is taken as late as possible.
1152  * This is somewhat ugly, but the protocol is set as it's already used for
1153  * several releases by streaming physical replication.
1154  */
1155  resetStringInfo(&tmpbuf);
1156  pq_sendint64(&tmpbuf, GetCurrentTimestamp());
/* skip the 'w' type byte plus the dataStart and walEnd fields */
1157  memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1158  tmpbuf.data, sizeof(int64));
1159 
1160  /* fast path */
1161  /* Try to flush pending output to the client */
1162  if (pq_flush_if_writable() != 0)
1163  WalSndShutdown();
1164 
1165  if (!pq_is_send_pending())
1166  return;
1167 
1168  for (;;)
1169  {
1170  int wakeEvents;
1171  long sleeptime;
1172  TimestampTz now;
1173 
1174  /*
1175  * Emergency bailout if postmaster has died. This is to avoid the
1176  * necessity for manual cleanup of all postmaster children.
1177  */
1178  if (!PostmasterIsAlive())
1179  exit(1);
1180 
1181  /* Clear any already-pending wakeups */
1183 
1185 
1186  /* Process any requests or signals received recently */
1187  if (got_SIGHUP)
1188  {
1189  got_SIGHUP = false;
1192  }
1193 
1194  /* Check for input from the client */
1196 
1197  /* Try to flush pending output to the client */
1198  if (pq_flush_if_writable() != 0)
1199  WalSndShutdown();
1200 
1201  /* If we finished clearing the buffered data, we're done here. */
1202  if (!pq_is_send_pending())
1203  break;
1204 
1205  now = GetCurrentTimestamp();
1206 
1207  /* die if timeout was reached */
1208  WalSndCheckTimeOut(now);
1209 
1210  /* Send keepalive if the time has come */
1212 
1213  sleeptime = WalSndComputeSleeptime(now);
1214 
1215  wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
1217 
1218  /* Sleep until something happens or we time out */
1219  WaitLatchOrSocket(MyLatch, wakeEvents,
1220  MyProcPort->sock, sleeptime,
1222  }
1223 
1224  /* reactivate latch so WalSndLoop knows to continue */
1225  SetLatch(MyLatch);
1226 }
1227 
1228 /*
1229  * Wait till WAL < loc is flushed to disk so it can be safely read.
 *
 * Returns the most recently observed flush position. During recovery this
 * is the replay position instead. The result can still be < loc if the loop
 * was left early because a stop was requested.
 *
 * RecentFlushPtr is a function-level static. A position seen by an earlier
 * call therefore lets the fast path return without re-reading shared state.
 * While waiting, the loop sets WalSndCaughtUp, services SIGHUP and client
 * input, sends keepalives, and enforces wal_sender_timeout.
1230  */
1231 static XLogRecPtr
1233 {
1234  int wakeEvents;
1235  static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1236 
1237 
1238  /*
1239  * Fast path to avoid acquiring the spinlock in case we already know we
1240  * have enough WAL available. This is particularly interesting if we're
1241  * far behind.
1242  */
1243  if (RecentFlushPtr != InvalidXLogRecPtr &&
1244  loc <= RecentFlushPtr)
1245  return RecentFlushPtr;
1246 
1247  /* Get a more recent flush pointer. */
1248  if (!RecoveryInProgress())
1249  RecentFlushPtr = GetFlushRecPtr();
1250  else
1251  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1252 
1253  for (;;)
1254  {
1255  long sleeptime;
1256  TimestampTz now;
1257 
1258  /*
1259  * Emergency bailout if postmaster has died. This is to avoid the
1260  * necessity for manual cleanup of all postmaster children.
1261  */
1262  if (!PostmasterIsAlive())
1263  exit(1);
1264 
1265  /* Clear any already-pending wakeups */
1267 
1269 
1270  /* Process any requests or signals received recently */
1271  if (got_SIGHUP)
1272  {
1273  got_SIGHUP = false;
1276  }
1277 
1278  /* Check for input from the client */
1280 
1281  /* Update our idea of the currently flushed position. */
1282  if (!RecoveryInProgress())
1283  RecentFlushPtr = GetFlushRecPtr();
1284  else
1285  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1286 
1287  /*
1288  * If postmaster asked us to stop, don't wait here anymore. This will
1289  * cause the xlogreader to return without reading a full record, which
1290  * is the fastest way to reach the mainloop which then can quit.
1291  *
1292  * It's important to do this check after the recomputation of
1293  * RecentFlushPtr, so we can send all remaining data before shutting
1294  * down.
1295  */
1297  break;
1298 
1299  /*
1300  * We only send regular messages to the client for full decoded
1301  * transactions, but a synchronous replication and walsender shutdown
1302  * possibly are waiting for a later location. So we send pings
1303  * containing the flush location every now and then.
1304  */
1305  if (MyWalSnd->flush < sentPtr &&
1306  MyWalSnd->write < sentPtr &&
1308  {
1309  WalSndKeepalive(false);
1311  }
1312 
1313  /* check whether we're done */
1314  if (loc <= RecentFlushPtr)
1315  break;
1316 
1317  /* Waiting for new WAL. Since we need to wait, we're now caught up. */
1318  WalSndCaughtUp = true;
1319 
1320  /*
1321  * Try to flush pending output to the client. Also wait for the socket
1322  * becoming writable, if there's still pending output after an attempt
1323  * to flush. Otherwise we might just sit on output data while waiting
1324  * for new WAL being generated.
1325  */
1326  if (pq_flush_if_writable() != 0)
1327  WalSndShutdown();
1328 
1329  now = GetCurrentTimestamp();
1330 
1331  /* die if timeout was reached */
1332  WalSndCheckTimeOut(now);
1333 
1334  /* Send keepalive if the time has come */
1336 
1337  sleeptime = WalSndComputeSleeptime(now);
1338 
1339  wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
1341 
1342  if (pq_is_send_pending())
1343  wakeEvents |= WL_SOCKET_WRITEABLE;
1344 
1345  /* Sleep until something happens or we time out */
1346  WaitLatchOrSocket(MyLatch, wakeEvents,
1347  MyProcPort->sock, sleeptime,
1349  }
1350 
1351  /* reactivate latch so WalSndLoop knows to continue */
1352  SetLatch(MyLatch);
1353  return RecentFlushPtr;
1354 }
1355 
1356 /*
1357  * Execute an incoming replication command.
1358  *
1359  * Returns true if the cmd_string was recognized as WalSender command, false
1360  * if not.
 *
 * The command is parsed inside a private "Replication command context"
 * memory context. It is then dispatched on the parsed node type. On the
 * normal path the context is deleted and a "SELECT" CommandComplete is sent.
 * Parse failures and commands other than plain SQL inside an aborted
 * transaction block raise ERROR.
1361  */
1362 bool
1363 exec_replication_command(const char *cmd_string)
1364 {
1365  int parse_rc;
1366  Node *cmd_node;
1367  MemoryContext cmd_context;
1368  MemoryContext old_context;
1369 
1370  /*
1371  * Log replication command if log_replication_commands is enabled. Even
1372  * when it's disabled, log the command with DEBUG1 level for backward
1373  * compatibility.
1374  */
1376  (errmsg("received replication command: %s", cmd_string)));
1377 
1378  /*
1379  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1380  * command arrives. Clean up the old stuff if there's anything.
1381  */
1383 
1385 
1387  "Replication command context",
1389  old_context = MemoryContextSwitchTo(cmd_context);
1390 
1391  replication_scanner_init(cmd_string);
1392  parse_rc = replication_yyparse();
1393  if (parse_rc != 0)
1394  ereport(ERROR,
1395  (errcode(ERRCODE_SYNTAX_ERROR),
1396  (errmsg_internal("replication command parser returned %d",
1397  parse_rc))));
1398 
1399  cmd_node = replication_parse_result;
1400 
1401  /*
1402  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was
1403  * called outside of transaction the snapshot should be cleared here.
1404  */
1405  if (!IsTransactionBlock())
1407 
1408  /*
1409  * For aborted transactions, don't allow anything except pure SQL,
1410  * the exec_simple_query() will handle it correctly.
1411  */
1412  if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
1413  ereport(ERROR,
1414  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1415  errmsg("current transaction is aborted, "
1416  "commands ignored until end of transaction block")));
1417 
1419 
1420  /*
1421  * Allocate buffers that will be used for each outgoing and incoming
1422  * message. We do this just once per command to reduce palloc overhead.
1423  */
1424  initStringInfo(&output_message);
1425  initStringInfo(&reply_message);
1426  initStringInfo(&tmpbuf);
1427 
1428  switch (cmd_node->type)
1429  {
1430  case T_IdentifySystemCmd:
1431  IdentifySystem();
1432  break;
1433 
1434  case T_BaseBackupCmd:
1435  PreventTransactionChain(true, "BASE_BACKUP");
1436  SendBaseBackup((BaseBackupCmd *) cmd_node);
1437  break;
1438 
1441  break;
1442 
1445  break;
1446 
1447  case T_StartReplicationCmd:
1448  {
1449  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1450 
1451  PreventTransactionChain(true, "START_REPLICATION");
1452 
1453  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1454  StartReplication(cmd);
1455  else
1457  break;
1458  }
1459 
1460  case T_TimeLineHistoryCmd:
1461  PreventTransactionChain(true, "TIMELINE_HISTORY");
1463  break;
1464 
1465  case T_VariableShowStmt:
1466  {
1468  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1469 
1470  GetPGVariable(n->name, dest);
1471  }
1472  break;
1473 
1474  case T_SQLCmd:
1475  if (MyDatabaseId == InvalidOid)
1476  ereport(ERROR,
1477  (errmsg("not connected to database")));
1478 
1479  /* Tell the caller that this wasn't a WalSender command. */
/*
 * NOTE(review): this early return skips the
 * MemoryContextSwitchTo(old_context) / MemoryContextDelete(cmd_context)
 * pair at the end of the function. cmd_context is left as the current
 * memory context and is never freed, so every plain SQL command sent over
 * a replication connection leaves one behind. Consider switching back and
 * deleting cmd_context before returning false.
 */
1480  return false;
1481 
1482  default:
1483  elog(ERROR, "unrecognized replication command node tag: %u",
1484  cmd_node->type);
1485  }
1486 
1487  /* done */
1488  MemoryContextSwitchTo(old_context);
1489  MemoryContextDelete(cmd_context);
1490 
1491  /* Send CommandComplete message */
1492  EndCommand("SELECT", DestRemote);
1493 
1494  return true;
1495 }
1496 
1497 /*
1498  * Process any incoming messages while streaming. Also checks if the remote
1499  * end has closed the connection.
 *
 * Reads complete messages until none is available without blocking. Three
 * message types are accepted:
 *   'd' -- CopyData carrying a standby message,
 *   'c' -- CopyDone, answered with CopyDone if not already sent,
 *   'X' -- Terminate.
 * Any other type is FATAL. EOF or a read error ends the process with
 * proc_exit(0). If at least one message arrived, waiting_for_ping_response
 * is cleared.
1500  */
1501 static void
1503 {
1504  unsigned char firstchar;
1505  int r;
1506  bool received = false;
1507 
1508  for (;;)
1509  {
1510  pq_startmsgread();
1511  r = pq_getbyte_if_available(&firstchar);
1512  if (r < 0)
1513  {
1514  /* unexpected error or EOF */
1516  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1517  errmsg("unexpected EOF on standby connection")));
1518  proc_exit(0);
1519  }
1520  if (r == 0)
1521  {
1522  /* no data available without blocking */
1523  pq_endmsgread();
1524  break;
1525  }
1526 
1527  /* Read the message contents */
1528  resetStringInfo(&reply_message);
1529  if (pq_getmessage(&reply_message, 0))
1530  {
1532  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1533  errmsg("unexpected EOF on standby connection")));
1534  proc_exit(0);
1535  }
1536 
1537  /*
1538  * If we already received a CopyDone from the frontend, the frontend
1539  * should not send us anything until we've closed our end of the COPY.
1540  * XXX: In theory, the frontend could already send the next command
1541  * before receiving the CopyDone, but libpq doesn't currently allow
1542  * that.
1543  */
1544  if (streamingDoneReceiving && firstchar != 'X')
1545  ereport(FATAL,
1546  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1547  errmsg("unexpected standby message type \"%c\", after receiving CopyDone",
1548  firstchar)));
1549 
1550  /* Handle the very limited subset of commands expected in this phase */
1551  switch (firstchar)
1552  {
1553  /*
1554  * 'd' means a standby reply wrapped in a CopyData packet.
1555  */
1556  case 'd':
1558  received = true;
1559  break;
1560 
1561  /*
1562  * CopyDone means the standby requested to finish streaming.
1563  * Reply with CopyDone, if we had not sent that already.
1564  */
1565  case 'c':
1566  if (!streamingDoneSending)
1567  {
1568  pq_putmessage_noblock('c', NULL, 0);
1569  streamingDoneSending = true;
1570  }
1571 
1572  streamingDoneReceiving = true;
1573  received = true;
1574  break;
1575 
1576  /*
1577  * 'X' means that the standby is closing down the socket.
1578  * proc_exit() is expected not to return, hence no break.
1579  */
1579  case 'X':
1580  proc_exit(0);
1581 
1582  default:
1583  ereport(FATAL,
1584  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1585  errmsg("invalid standby message type \"%c\"",
1586  firstchar)));
1587  }
1588  }
1589 
1590  /*
1591  * Save the last reply timestamp if we've received at least one reply.
1592  */
1593  if (received)
1594  {
1596  waiting_for_ping_response = false;
1597  }
1598 }
1599 
1600 /*
1601  * Process a status update message received from standby.
 *
 * Reads the sub-message type byte from reply_message and dispatches on it.
 * 'r' presumably carries a standby reply (write/flush/apply positions) and
 * 'h' hot standby feedback -- confirm against the handler calls. Any other
 * type is reported as a protocol violation and the process exits via
 * proc_exit(0).
1602  */
1603 static void
1605 {
1606  char msgtype;
1607 
1608  /*
1609  * Check message type from the first byte.
1610  */
1611  msgtype = pq_getmsgbyte(&reply_message);
1612 
1613  switch (msgtype)
1614  {
1615  case 'r':
1617  break;
1618 
1619  case 'h':
1621  break;
1622 
1623  default:
1625  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1626  errmsg("unexpected message type \"%c\"", msgtype)));
1627  proc_exit(0);
1628  }
1629 }
1630 
1631 /*
1632  * Remember that a walreceiver just confirmed receipt of lsn `lsn`.
 *
 * Stores lsn as the slot's restart_lsn while holding the slot's spinlock.
 * Any follow-up work runs after the lock is released, and only when the
 * value actually changed. lsn must be valid; this is asserted. `slot` is
 * presumably MyReplicationSlot -- confirm.
1633  */
1634 static void
1636 {
1637  bool changed = false;
1639 
1640  Assert(lsn != InvalidXLogRecPtr);
1641  SpinLockAcquire(&slot->mutex);
1642  if (slot->data.restart_lsn != lsn)
1643  {
1644  changed = true;
1645  slot->data.restart_lsn = lsn;
1646  }
1647  SpinLockRelease(&slot->mutex);
1648 
1649  if (changed)
1650  {
1653  }
1654 
1655  /*
1656  * One could argue that the slot should be saved to disk now, but that'd
1657  * be energy wasted - the worst lost information can do here is give us
1658  * wrong information in a statistics view - we'll just potentially be more
1659  * conservative in removing files.
1660  */
1661 }
1662 
1663 /*
1664  * Regular reply from standby advising of WAL positions on standby server.
 *
 * Message body, read from reply_message after the type byte:
 *   write, flush and apply positions (int64 each),
 *   sendTime (int64, ignored),
 *   replyRequested (byte).
 *
 * The function computes per-position lag with LagTrackerRead(), answers
 * with a keepalive if the standby asked for one, and publishes the
 * positions and any new lag values in MyWalSnd under its spinlock.
 * fullyAppliedLastTime is static, so it carries state between consecutive
 * replies.
1665  */
1666 static void
1668 {
1669  XLogRecPtr writePtr,
1670  flushPtr,
1671  applyPtr;
1672  bool replyRequested;
1673  TimeOffset writeLag,
1674  flushLag,
1675  applyLag;
1676  bool clearLagTimes;
1677  TimestampTz now;
1678 
1679  static bool fullyAppliedLastTime = false;
1680 
1681  /* the caller already consumed the msgtype byte */
1682  writePtr = pq_getmsgint64(&reply_message);
1683  flushPtr = pq_getmsgint64(&reply_message);
1684  applyPtr = pq_getmsgint64(&reply_message);
1685  (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
1686  replyRequested = pq_getmsgbyte(&reply_message);
1687 
1688  elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s",
1689  (uint32) (writePtr >> 32), (uint32) writePtr,
1690  (uint32) (flushPtr >> 32), (uint32) flushPtr,
1691  (uint32) (applyPtr >> 32), (uint32) applyPtr,
1692  replyRequested ? " (reply requested)" : "");
1693 
1694  /* See if we can compute the round-trip lag for these positions. */
1695  now = GetCurrentTimestamp();
1696  writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
1697  flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
1698  applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
1699 
1700  /*
1701  * If the standby reports that it has fully replayed the WAL in two
1702  * consecutive reply messages, then the second such message must result
1703  * from wal_receiver_status_interval expiring on the standby. This is a
1704  * convenient time to forget the lag times measured when it last
1705  * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
1706  * until more WAL traffic arrives.
1707  */
1708  clearLagTimes = false;
1709  if (applyPtr == sentPtr)
1710  {
1711  if (fullyAppliedLastTime)
1712  clearLagTimes = true;
1713  fullyAppliedLastTime = true;
1714  }
1715  else
1716  fullyAppliedLastTime = false;
1717 
1718  /* Send a reply if the standby requested one. */
1719  if (replyRequested)
1720  WalSndKeepalive(false);
1721 
1722  /*
1723  * Update shared state for this WalSender process based on reply data from
1724  * standby.
1725  */
1726  {
1727  WalSnd *walsnd = MyWalSnd;
1728 
1729  SpinLockAcquire(&walsnd->mutex);
1730  walsnd->write = writePtr;
1731  walsnd->flush = flushPtr;
1732  walsnd->apply = applyPtr;
/* a lag of -1 means "no new measurement"; keep the old value unless clearing */
1733  if (writeLag != -1 || clearLagTimes)
1734  walsnd->writeLag = writeLag;
1735  if (flushLag != -1 || clearLagTimes)
1736  walsnd->flushLag = flushLag;
1737  if (applyLag != -1 || clearLagTimes)
1738  walsnd->applyLag = applyLag;
1739  SpinLockRelease(&walsnd->mutex);
1740  }
1741 
1744 
1745  /*
1746  * Advance our local xmin horizon when the client confirmed a flush.
1747  */
1748  if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
1749  {
1752  else
1754  }
1755 }
1756 
1757 /* compute new replication slot xmin horizon if needed */
/*
 * Sets both data.xmin and effective_xmin of the slot to feedbackXmin in
 * three cases: the slot's current xmin is not a normal XID, feedbackXmin is
 * not normal, or the current xmin precedes feedbackXmin. Passing an invalid
 * feedbackXmin therefore clears the slot's xmin. Any follow-up work runs
 * after the spinlock is released, and only when something changed.
 */
1758 static void
1760 {
1761  bool changed = false;
1763 
1764  SpinLockAcquire(&slot->mutex);
1766 
1767  /*
1768  * For physical replication we don't need the interlock provided by xmin
1769  * and effective_xmin since the consequences of a missed increase are
1770  * limited to query cancellations, so set both at once.
1771  */
1772  if (!TransactionIdIsNormal(slot->data.xmin) ||
1773  !TransactionIdIsNormal(feedbackXmin) ||
1774  TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
1775  {
1776  changed = true;
1777  slot->data.xmin = feedbackXmin;
1778  slot->effective_xmin = feedbackXmin;
1779  }
1780  SpinLockRelease(&slot->mutex);
1781 
1782  if (changed)
1783  {
1786  }
1787 }
1788 
1789 /*
1790  * Hot Standby feedback
 *
 * Message body, read from reply_message after the type byte:
 *   sendTime (int64, ignored),
 *   feedbackXmin (uint32),
 *   feedbackEpoch (uint32).
 *
 * An xmin that is not a normal XID unsets the walsender's xmin, and the
 * slot's xmin when a slot is in use. An xmin/epoch pair that lies in the
 * future or has already wrapped around, relative to GetNextXidAndEpoch(),
 * is ignored. Otherwise the xmin is applied to the replication slot if
 * there is one, or else to MyPgXact->xmin.
1791  */
1792 static void
1794 {
1795  TransactionId nextXid;
1796  uint32 nextEpoch;
1797  TransactionId feedbackXmin;
1798  uint32 feedbackEpoch;
1799 
1800  /*
1801  * Decipher the reply message. The caller already consumed the msgtype
1802  * byte.
1803  */
1804  (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
1805  feedbackXmin = pq_getmsgint(&reply_message, 4);
1806  feedbackEpoch = pq_getmsgint(&reply_message, 4);
1807 
1808  elog(DEBUG2, "hot standby feedback xmin %u epoch %u",
1809  feedbackXmin,
1810  feedbackEpoch);
1811 
1812  /* Unset WalSender's xmin if the feedback message value is invalid */
1813  if (!TransactionIdIsNormal(feedbackXmin))
1814  {
1816  if (MyReplicationSlot != NULL)
1817  PhysicalReplicationSlotNewXmin(feedbackXmin);
1818  return;
1819  }
1820 
1821  /*
1822  * Check that the provided xmin/epoch are sane, that is, not in the future
1823  * and not so far back as to be already wrapped around. Ignore if not.
1824  *
1825  * Epoch of nextXid should be same as standby, or if the counter has
1826  * wrapped, then one greater than standby.
1827  */
1828  GetNextXidAndEpoch(&nextXid, &nextEpoch);
1829 
1830  if (feedbackXmin <= nextXid)
1831  {
1832  if (feedbackEpoch != nextEpoch)
1833  return;
1834  }
1835  else
1836  {
1837  if (feedbackEpoch + 1 != nextEpoch)
1838  return;
1839  }
1840 
1841  if (!TransactionIdPrecedesOrEquals(feedbackXmin, nextXid))
1842  return; /* epoch OK, but it's wrapped around */
1843 
1844  /*
1845  * Set the WalSender's xmin equal to the standby's requested xmin, so that
1846  * the xmin will be taken into account by GetOldestXmin. This will hold
1847  * back the removal of dead rows and thereby prevent the generation of
1848  * cleanup conflicts on the standby server.
1849  *
1850  * There is a small window for a race condition here: although we just
1851  * checked that feedbackXmin precedes nextXid, the nextXid could have
1852  * gotten advanced between our fetching it and applying the xmin below,
1853  * perhaps far enough to make feedbackXmin wrap around. In that case the
1854  * xmin we set here would be "in the future" and have no effect. No point
1855  * in worrying about this since it's too late to save the desired data
1856  * anyway. Assuming that the standby sends us an increasing sequence of
1857  * xmins, this could only happen during the first reply cycle, else our
1858  * own xmin would prevent nextXid from advancing so far.
1859  *
1860  * We don't bother taking the ProcArrayLock here. Setting the xmin field
1861  * is assumed atomic, and there's no real need to prevent a concurrent
1862  * GetOldestXmin. (If we're moving our xmin forward, this is obviously
1863  * safe, and if we're moving it backwards, well, the data is at risk
1864  * already since a VACUUM could have just finished calling GetOldestXmin.)
1865  *
1866  * If we're using a replication slot we reserve the xmin via that,
1867  * otherwise via the walsender's PGXACT entry.
1868  *
1869  * XXX: It might make sense to generalize the ephemeral slot concept and
1870  * always use the slot mechanism to handle the feedback xmin.
1871  */
1872  if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
1873  PhysicalReplicationSlotNewXmin(feedbackXmin);
1874  else
1875  MyPgXact->xmin = feedbackXmin;
1876 }
1877 
1878 /*
1879  * Compute how long send/receive loops should sleep.
1880  *
1881  * If wal_sender_timeout is enabled we want to wake up in time to send
1882  * keepalives and to abort the connection if wal_sender_timeout has been
1883  * reached.
 *
 * Returns a sleep time in milliseconds. The default is 10000 (10 s). When
 * the timeout branch applies, the result is the time from `now` until the
 * computed wakeup_time, converted from TimestampDifference()'s
 * seconds/microseconds pair.
1884  */
1885 static long
1887 {
1888  long sleeptime = 10000; /* 10 s */
1889 
1891  {
1892  TimestampTz wakeup_time;
1893  long sec_to_timeout;
1894  int microsec_to_timeout;
1895 
1896  /*
1897  * At the latest stop sleeping once wal_sender_timeout has been
1898  * reached.
1899  */
1902 
1903  /*
1904  * If no ping has been sent yet, wakeup when it's time to do so.
1905  * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
1906  * the timeout passed without a response.
1907  */
1910  wal_sender_timeout / 2);
1911 
1912  /* Compute relative time until wakeup. */
1913  TimestampDifference(now, wakeup_time,
1914  &sec_to_timeout, &microsec_to_timeout);
1915 
1916  sleeptime = sec_to_timeout * 1000 +
1917  microsec_to_timeout / 1000;
1918  }
1919 
1920  return sleeptime;
1921 }
1922 
1923 /*
1924  * Check whether there have been responses by the client within
1925  * wal_sender_timeout and shutdown if not.
 *
 * Does nothing while last_reply_timestamp <= 0; WalSndLoop initializes
 * that value when timeout processing should begin. Otherwise, if
 * wal_sender_timeout > 0 and now has reached the computed deadline
 * (presumably last_reply_timestamp + wal_sender_timeout -- confirm), it logs
 * the termination and calls WalSndShutdown().
1926  */
1927 static void
1929 {
1930  TimestampTz timeout;
1931 
1932  /* don't bail out if we're doing something that doesn't require timeouts */
1933  if (last_reply_timestamp <= 0)
1934  return;
1935 
1938 
1939  if (wal_sender_timeout > 0 && now >= timeout)
1940  {
1941  /*
1942  * Since typically expiration of replication timeout means
1943  * communication problem, we don't send the error message to the
1944  * standby.
1945  */
1947  (errmsg("terminating walsender process due to replication timeout")));
1948 
1949  WalSndShutdown();
1950  }
1951 }
1952 
1953 /* Main loop of walsender process that streams the WAL over Copy messages. */
/*
 * send_data: callback that queues more output. It is invoked whenever the
 * libpq output buffer is empty, and is handed to WalSndDone() once a stop
 * has been requested and nothing remains to send.
 *
 * The loop exits when CopyDone has been both received and sent and the
 * output buffer is empty. Each iteration also handles postmaster death,
 * SIGHUP, client input, the transition from catchup to streaming state,
 * replication timeouts and keepalives.
 */
1954 static void
1956 {
1957  /*
1958  * Initialize the last reply timestamp. That enables timeout processing
1959  * from hereon.
1960  */
1962  waiting_for_ping_response = false;
1963 
1964  /* Report to pgstat that this process is a WAL sender */
1965  pgstat_report_activity(STATE_RUNNING, "walsender");
1966 
1967  /*
1968  * Loop until we reach the end of this timeline or the client requests to
1969  * stop streaming.
1970  */
1971  for (;;)
1972  {
1973  TimestampTz now;
1974 
1975  /*
1976  * Emergency bailout if postmaster has died. This is to avoid the
1977  * necessity for manual cleanup of all postmaster children.
1978  */
1979  if (!PostmasterIsAlive())
1980  exit(1);
1981 
1982  /* Clear any already-pending wakeups */
1984 
1986 
1987  /* Process any requests or signals received recently */
1988  if (got_SIGHUP)
1989  {
1990  got_SIGHUP = false;
1993  }
1994 
1995  /* Check for input from the client */
1997 
1998  /*
1999  * If we have received CopyDone from the client, sent CopyDone
2000  * ourselves, and the output buffer is empty, it's time to exit
2001  * streaming.
2002  */
2004  break;
2005 
2006  /*
2007  * If we don't have any pending data in the output buffer, try to send
2008  * some more. If there is some, we don't bother to call send_data
2009  * again until we've flushed it ... but we'd better assume we are not
2010  * caught up.
2011  */
2012  if (!pq_is_send_pending())
2013  send_data();
2014  else
2015  WalSndCaughtUp = false;
2016 
2017  /* Try to flush pending output to the client */
2018  if (pq_flush_if_writable() != 0)
2019  WalSndShutdown();
2020 
2021  /* If nothing remains to be sent right now ... */
2023  {
2024  /*
2025  * If we're in catchup state, move to streaming. This is an
2026  * important state change for users to know about, since before
2027  * this point data loss might occur if the primary dies and we
2028  * need to failover to the standby. The state change is also
2029  * important for synchronous replication, since commits that
2030  * started to wait at that point might wait for some time.
2031  */
2032  if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
2033  {
2034  ereport(DEBUG1,
2035  (errmsg("standby \"%s\" has now caught up with primary",
2036  application_name)));
2038  }
2039 
2040  /*
2041  * When SIGUSR2 arrives, we send any outstanding logs up to the
2042  * shutdown checkpoint record (i.e., the latest record), wait for
2043  * them to be replicated to the standby, and exit. This may be a
2044  * normal termination at shutdown, or a promotion, the walsender
2045  * is not sure which.
2046  */
2048  WalSndDone(send_data);
2049  }
2050 
2051  now = GetCurrentTimestamp();
2052 
2053  /* Check for replication timeout. */
2054  WalSndCheckTimeOut(now);
2055 
2056  /* Send keepalive if the time has come */
2058 
2059  /*
2060  * We don't block if not caught up, unless there is unsent data
2061  * pending in which case we'd better block until the socket is
2062  * write-ready. This test is only needed for the case where the
2063  * send_data callback handled a subset of the available data but then
2064  * pq_flush_if_writable flushed it all --- we should immediately try
2065  * to send more.
2066  */
2068  {
2069  long sleeptime;
2070  int wakeEvents;
2071 
2072  wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT |
2074 
2075  sleeptime = WalSndComputeSleeptime(now);
2076 
2077  if (pq_is_send_pending())
2078  wakeEvents |= WL_SOCKET_WRITEABLE;
2079 
2080  /* Sleep until something happens or we time out */
2081  WaitLatchOrSocket(MyLatch, wakeEvents,
2082  MyProcPort->sock, sleeptime,
2084  }
2085  }
2086  return;
2087 }
2088 
2089 /* Initialize a per-walsender data structure for this walsender process */
/*
 * Scans WalSndCtl->walsnds[0 .. max_wal_senders - 1] for an entry whose pid
 * is 0. It claims the first such entry while holding that entry's spinlock:
 * it stores MyProcPid, resets the positions to InvalidXLogRecPtr and the
 * lags to -1, sets the state to WALSNDSTATE_STARTUP, and records our latch.
 * The entry is published through MyWalSnd. If no entry is free, it raises
 * FATAL with ERRCODE_TOO_MANY_CONNECTIONS.
 */
2090 static void
2092 {
2093  int i;
2094 
2095  /*
2096  * WalSndCtl should be set up already (we inherit this by fork() or
2097  * EXEC_BACKEND mechanism from the postmaster).
2098  */
2099  Assert(WalSndCtl != NULL);
2100  Assert(MyWalSnd == NULL);
2101 
2102  /*
2103  * Find a free walsender slot and reserve it. If this fails, we must be
2104  * out of WalSnd structures.
2105  */
2106  for (i = 0; i < max_wal_senders; i++)
2107  {
2108  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2109 
2110  SpinLockAcquire(&walsnd->mutex);
2111 
2112  if (walsnd->pid != 0)
2113  {
2114  SpinLockRelease(&walsnd->mutex);
2115  continue;
2116  }
2117  else
2118  {
2119  /*
2120  * Found a free slot. Reserve it for us.
2121  */
2122  walsnd->pid = MyProcPid;
2123  walsnd->sentPtr = InvalidXLogRecPtr;
2124  walsnd->write = InvalidXLogRecPtr;
2125  walsnd->flush = InvalidXLogRecPtr;
2126  walsnd->apply = InvalidXLogRecPtr;
2127  walsnd->writeLag = -1;
2128  walsnd->flushLag = -1;
2129  walsnd->applyLag = -1;
2130  walsnd->state = WALSNDSTATE_STARTUP;
2131  walsnd->latch = &MyProc->procLatch;
2132  SpinLockRelease(&walsnd->mutex);
2133  /* don't need the lock anymore */
2134  MyWalSnd = (WalSnd *) walsnd;
2135 
2136  break;
2137  }
2138  }
2139  if (MyWalSnd == NULL)
2140  ereport(FATAL,
2141  (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
2142  errmsg("number of requested standby connections "
2143  "exceeds max_wal_senders (currently %d)",
2144  max_wal_senders)));
2145 
2146  /* Arrange to clean up at walsender exit */
2148 }
2149 
2150 /* Destroy the per-walsender data structure for this walsender process */
/*
 * Clears MyWalSnd first. Then, holding the entry's spinlock, it clears the
 * latch pointer and sets pid to 0. A pid of 0 is what InitWalSenderSlot
 * treats as a free entry.
 */
2151 static void
2153 {
2154  WalSnd *walsnd = MyWalSnd;
2155 
2156  Assert(walsnd != NULL);
2157 
2158  MyWalSnd = NULL;
2159 
2160  SpinLockAcquire(&walsnd->mutex);
2161  /* clear latch while holding the spinlock, so it can safely be read */
2162  walsnd->latch = NULL;
2163  /* Mark WalSnd struct as no longer being in use. */
2164  walsnd->pid = 0;
2165  SpinLockRelease(&walsnd->mutex);
2166 }
2167 
2168 /*
2169  * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
2170  *
2171  * XXX probably this should be improved to suck data directly from the
2172  * WAL buffers when possible.
2173  *
2174  * Will open, and keep open, one WAL segment stored in the global file
2175  * descriptor sendFile. This means if XLogRead is used once, there will
2176  * always be one descriptor left open until the process ends, but never
2177  * more than one.
 *
 * The read may span segment boundaries; each segment is opened on demand
 * and the position is tracked in sendSegNo/sendOff. A missing segment file
 * raises ERROR ("has already been removed"), as do open, seek and read
 * failures. After reading, the data is validated against recycling of the
 * segment. In one further case the whole read restarts from 'retry:' with
 * the file reopened: MyWalSnd->needreload was set, presumably only during
 * recovery -- confirm.
2178  */
2179 static void
2180 XLogRead(char *buf, XLogRecPtr startptr, Size count)
2181 {
2182  char *p;
2183  XLogRecPtr recptr;
2184  Size nbytes;
2185  XLogSegNo segno;
2186 
2187 retry:
2188  p = buf;
2189  recptr = startptr;
2190  nbytes = count;
2191 
2192  while (nbytes > 0)
2193  {
2194  uint32 startoff;
2195  int segbytes;
2196  int readbytes;
2197 
2198  startoff = recptr % XLogSegSize;
2199 
2200  if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
2201  {
2202  char path[MAXPGPATH];
2203 
2204  /* Switch to another logfile segment */
2205  if (sendFile >= 0)
2206  close(sendFile);
2207 
2208  XLByteToSeg(recptr, sendSegNo);
2209 
2210  /*-------
2211  * When reading from a historic timeline, and there is a timeline
2212  * switch within this segment, read from the WAL segment belonging
2213  * to the new timeline.
2214  *
2215  * For example, imagine that this server is currently on timeline
2216  * 5, and we're streaming timeline 4. The switch from timeline 4
2217  * to 5 happened at 0/13002088. In pg_wal, we have these files:
2218  *
2219  * ...
2220  * 000000040000000000000012
2221  * 000000040000000000000013
2222  * 000000050000000000000013
2223  * 000000050000000000000014
2224  * ...
2225  *
2226  * In this situation, when requested to send the WAL from
2227  * segment 0x13, on timeline 4, we read the WAL from file
2228  * 000000050000000000000013. Archive recovery prefers files from
2229  * newer timelines, so if the segment was restored from the
2230  * archive on this server, the file belonging to the old timeline,
2231  * 000000040000000000000013, might not exist. Their contents are
2232  * equal up to the switchpoint, because at a timeline switch, the
2233  * used portion of the old segment is copied to the new file.
2234  *-------
2235  */
2238  {
2239  XLogSegNo endSegNo;
2240 
2242  if (sendSegNo == endSegNo)
2244  }
2245 
2247 
2248  sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
2249  if (sendFile < 0)
2250  {
2251  /*
2252  * If the file is not found, assume it's because the standby
2253  * asked for a too old WAL segment that has already been
2254  * removed or recycled.
2255  */
2256  if (errno == ENOENT)
2257  ereport(ERROR,
2259  errmsg("requested WAL segment %s has already been removed",
2261  else
2262  ereport(ERROR,
2264  errmsg("could not open file \"%s\": %m",
2265  path)));
2266  }
2267  sendOff = 0;
2268  }
2269 
2270  /* Need to seek in the file? */
2271  if (sendOff != startoff)
2272  {
2273  if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
2274  ereport(ERROR,
2276  errmsg("could not seek in log segment %s to offset %u: %m",
2278  startoff)));
2279  sendOff = startoff;
2280  }
2281 
2282  /* How many bytes are within this segment? */
2283  if (nbytes > (XLogSegSize - startoff))
2284  segbytes = XLogSegSize - startoff;
2285  else
2286  segbytes = nbytes;
2287 
2289  readbytes = read(sendFile, p, segbytes);
/*
 * NOTE(review): a return of 0 (unexpected end of file) also lands in the
 * branch below, but read() does not set errno in that case. The %m in the
 * message may then report a stale, unrelated error. Consider reporting a
 * short read separately.
 */
2291  if (readbytes <= 0)
2292  {
2293  ereport(ERROR,
2295  errmsg("could not read from log segment %s, offset %u, length %lu: %m",
2297  sendOff, (unsigned long) segbytes)));
2298  }
2299 
2300  /* Update state for read */
2301  recptr += readbytes;
2302 
2303  sendOff += readbytes;
2304  nbytes -= readbytes;
2305  p += readbytes;
2306  }
2307 
2308  /*
2309  * After reading into the buffer, check that what we read was valid. We do
2310  * this after reading, because even though the segment was present when we
2311  * opened it, it might get recycled or removed while we read it. The
2312  * read() succeeds in that case, but the data we tried to read might
2313  * already have been overwritten with new WAL records.
2314  */
2315  XLByteToSeg(startptr, segno);
2317 
2318  /*
2319  * During recovery, the currently-open WAL file might be replaced with the
2320  * file of the same name retrieved from archive. So we always need to
2321  * check what we read was valid after reading into the buffer. If it's
2322  * invalid, we try to open and read the file again.
2323  */
2325  {
2326  WalSnd *walsnd = MyWalSnd;
2327  bool reload;
2328 
2329  SpinLockAcquire(&walsnd->mutex);
2330  reload = walsnd->needreload;
2331  walsnd->needreload = false;
2332  SpinLockRelease(&walsnd->mutex);
2333 
2334  if (reload && sendFile >= 0)
2335  {
2336  close(sendFile);
2337  sendFile = -1;
2338 
2339  goto retry;
2340  }
2341  }
2342 }
2343 
2344 /*
2345  * Send out the WAL in its normal physical/stored form.
2346  *
2347  * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
2348  * but not yet sent to the client, and buffer it in the libpq output
2349  * buffer.
2350  *
2351  * If there is no unsent WAL remaining, WalSndCaughtUp is set to true,
2352  * otherwise WalSndCaughtUp is set to false.
2353  */
2354 static void
2356 {
2357  XLogRecPtr SendRqstPtr;
2358  XLogRecPtr startptr;
2359  XLogRecPtr endptr;
2360  Size nbytes;
2361 
2363  {
2364  WalSndCaughtUp = true;
2365  return;
2366  }
2367 
2368  /* Figure out how far we can safely send the WAL. */
2370  {
2371  /*
2372  * Streaming an old timeline that's in this server's history, but is
2373  * not the one we're currently inserting or replaying. It can be
2374  * streamed up to the point where we switched off that timeline.
2375  */
2376  SendRqstPtr = sendTimeLineValidUpto;
2377  }
2378  else if (am_cascading_walsender)
2379  {
2380  /*
2381  * Streaming the latest timeline on a standby.
2382  *
2383  * Attempt to send all WAL that has already been replayed, so that we
2384  * know it's valid. If we're receiving WAL through streaming
2385  * replication, it's also OK to send any WAL that has been received
2386  * but not replayed.
2387  *
2388  * The timeline we're recovering from can change, or we can be
2389  * promoted. In either case, the current timeline becomes historic. We
2390  * need to detect that so that we don't try to stream past the point
2391  * where we switched to another timeline. We check for promotion or
2392  * timeline switch after calculating FlushPtr, to avoid a race
2393  * condition: if the timeline becomes historic just after we checked
2394  * that it was still current, it's still OK to stream it up to the
2395  * FlushPtr that was calculated before it became historic.
2396  */
2397  bool becameHistoric = false;
2398 
2399  SendRqstPtr = GetStandbyFlushRecPtr();
2400 
2401  if (!RecoveryInProgress())
2402  {
2403  /*
2404  * We have been promoted. RecoveryInProgress() updated
2405  * ThisTimeLineID to the new current timeline.
2406  */
2407  am_cascading_walsender = false;
2408  becameHistoric = true;
2409  }
2410  else
2411  {
2412  /*
2413  * Still a cascading standby. But is the timeline we're sending
2414  * still the one recovery is recovering from? ThisTimeLineID was
2415  * updated by the GetStandbyFlushRecPtr() call above.
2416  */
2418  becameHistoric = true;
2419  }
2420 
2421  if (becameHistoric)
2422  {
2423  /*
2424  * The timeline we were sending has become historic. Read the
2425  * timeline history file of the new timeline to see where exactly
2426  * we forked off from the timeline we were sending.
2427  */
2428  List *history;
2429 
2432 
2434  list_free_deep(history);
2435 
2436  sendTimeLineIsHistoric = true;
2437 
2438  SendRqstPtr = sendTimeLineValidUpto;
2439  }
2440  }
2441  else
2442  {
2443  /*
2444  * Streaming the current timeline on a master.
2445  *
2446  * Attempt to send all data that's already been written out and
2447  * fsync'd to disk. We cannot go further than what's been written out
2448  * given the current implementation of XLogRead(). And in any case
2449  * it's unsafe to send WAL that is not securely down to disk on the
2450  * master: if the master subsequently crashes and restarts, slaves
2451  * must not have applied any WAL that gets lost on the master.
2452  */
2453  SendRqstPtr = GetFlushRecPtr();
2454  }
2455 
2456  /*
2457  * Record the current system time as an approximation of the time at which
2458  * this WAL position was written for the purposes of lag tracking.
2459  *
2460  * In theory we could make XLogFlush() record a time in shmem whenever WAL
2461  * is flushed and we could get that time as well as the LSN when we call
2462  * GetFlushRecPtr() above (and likewise for the cascading standby
2463  * equivalent), but rather than putting any new code into the hot WAL path
2464  * it seems good enough to capture the time here. We should reach this
2465  * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
2466  * may take some time, we read the WAL flush pointer and take the time
2467  * very close together here so that we'll get a later position if it
2468  * is still moving.
2469  *
2470  * Because LagTrackerWriter ignores samples when the LSN hasn't advanced,
2471  * this gives us a cheap approximation for the WAL flush time for this
2472  * LSN.
2473  *
2474  * Note that the LSN is not necessarily the LSN for the data contained in
2475  * the present message; it's the end of the WAL, which might be
2476  * further ahead. All the lag tracking machinery cares about is finding
2477  * out when that arbitrary LSN is eventually reported as written, flushed
2478  * and applied, so that it can measure the elapsed time.
2479  */
2480  LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());
2481 
2482  /*
2483  * If this is a historic timeline and we've reached the point where we
2484  * forked to the next timeline, stop streaming.
2485  *
2486  * Note: We might already have sent WAL > sendTimeLineValidUpto. The
2487  * startup process will normally replay all WAL that has been received
2488  * from the master, before promoting, but if the WAL streaming is
2489  * terminated at a WAL page boundary, the valid portion of the timeline
2490  * might end in the middle of a WAL record. We might've already sent the
2491  * first half of that partial WAL record to the cascading standby, so that
2492  * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
2493  * replay the partial WAL record either, so it can still follow our
2494  * timeline switch.
2495  */
2497  {
2498  /* close the current file. */
2499  if (sendFile >= 0)
2500  close(sendFile);
2501  sendFile = -1;
2502 
2503  /* Send CopyDone */
2504  pq_putmessage_noblock('c', NULL, 0);
2505  streamingDoneSending = true;
2506 
2507  WalSndCaughtUp = true;
2508 
2509  elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
2511  (uint32) (sentPtr >> 32), (uint32) sentPtr);
2512  return;
2513  }
2514 
2515  /* Do we have any work to do? */
2516  Assert(sentPtr <= SendRqstPtr);
2517  if (SendRqstPtr <= sentPtr)
2518  {
2519  WalSndCaughtUp = true;
2520  return;
2521  }
2522 
2523  /*
2524  * Figure out how much to send in one message. If there's no more than
2525  * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
2526  * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
2527  *
2528  * The rounding is not only for performance reasons. Walreceiver relies on
2529  * the fact that we never split a WAL record across two messages. Since a
2530  * long WAL record is split at page boundary into continuation records,
2531  * page boundary is always a safe cut-off point. We also assume that
2532  * SendRqstPtr never points to the middle of a WAL record.
2533  */
2534  startptr = sentPtr;
2535  endptr = startptr;
2536  endptr += MAX_SEND_SIZE;
2537 
2538  /* if we went beyond SendRqstPtr, back off */
2539  if (SendRqstPtr <= endptr)
2540  {
2541  endptr = SendRqstPtr;
2543  WalSndCaughtUp = false;
2544  else
2545  WalSndCaughtUp = true;
2546  }
2547  else
2548  {
2549  /* round down to page boundary. */
2550  endptr -= (endptr % XLOG_BLCKSZ);
2551  WalSndCaughtUp = false;
2552  }
2553 
2554  nbytes = endptr - startptr;
2555  Assert(nbytes <= MAX_SEND_SIZE);
2556 
2557  /*
2558  * OK to read and send the slice.
2559  */
2560  resetStringInfo(&output_message);
2561  pq_sendbyte(&output_message, 'w');
2562 
2563  pq_sendint64(&output_message, startptr); /* dataStart */
2564  pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
2565  pq_sendint64(&output_message, 0); /* sendtime, filled in last */
2566 
2567  /*
2568  * Read the log directly into the output buffer to avoid extra memcpy
2569  * calls.
2570  */
2571  enlargeStringInfo(&output_message, nbytes);
2572  XLogRead(&output_message.data[output_message.len], startptr, nbytes);
2573  output_message.len += nbytes;
2574  output_message.data[output_message.len] = '\0';
2575 
2576  /*
2577  * Fill the send timestamp last, so that it is taken as late as possible.
2578  */
2579  resetStringInfo(&tmpbuf);
2580  pq_sendint64(&tmpbuf, GetCurrentTimestamp());
2581  memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
2582  tmpbuf.data, sizeof(int64));
2583 
2584  pq_putmessage_noblock('d', output_message.data, output_message.len);
2585 
2586  sentPtr = endptr;
2587 
2588  /* Update shared memory status */
2589  {
2590  WalSnd *walsnd = MyWalSnd;
2591 
2592  SpinLockAcquire(&walsnd->mutex);
2593  walsnd->sentPtr = sentPtr;
2594  SpinLockRelease(&walsnd->mutex);
2595  }
2596 
2597  /* Report progress of XLOG streaming in PS display */
2599  {
2600  char activitymsg[50];
2601 
2602  snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
2603  (uint32) (sentPtr >> 32), (uint32) sentPtr);
2604  set_ps_display(activitymsg, false);
2605  }
2606 
2607  return;
2608 }
2609 
2610 /*
2611  * Stream out logically decoded data.
2612  */
2613 static void
2615 {
2616  XLogRecord *record;
2617  char *errm;
2618 
2619  /*
2620  * Don't know whether we've caught up yet. We'll set it to true in
2621  * WalSndWaitForWal, if we're actually waiting. We also set to true if
2622  * XLogReadRecord() had to stop reading but WalSndWaitForWal didn't wait -
2623  * i.e. when we're shutting down.
2624  */
2625  WalSndCaughtUp = false;
2626 
2627  record = XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &errm);
2629 
2630  /* xlog record was invalid */
2631  if (errm != NULL)
2632  elog(ERROR, "%s", errm);
2633 
2634  if (record != NULL)
2635  {
2636  /*
2637  * Note the lack of any call to LagTrackerWrite() which is the responsibility
2638  * of the logical decoding plugin. Response messages are handled normally,
2639  * so this responsibility does not extend to needing to call LagTrackerRead().
2640  */
2641  LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
2642 
2643  sentPtr = logical_decoding_ctx->reader->EndRecPtr;
2644  }
2645  else
2646  {
2647  /*
2648  * If the record we just wanted read is at or beyond the flushed
2649  * point, then we're caught up.
2650  */
2651  if (logical_decoding_ctx->reader->EndRecPtr >= GetFlushRecPtr())
2652  WalSndCaughtUp = true;
2653  }
2654 
2655  /* Update shared memory status */
2656  {
2657  WalSnd *walsnd = MyWalSnd;
2658 
2659  SpinLockAcquire(&walsnd->mutex);
2660  walsnd->sentPtr = sentPtr;
2661  SpinLockRelease(&walsnd->mutex);
2662  }
2663 }
2664 
2665 /*
2666  * Shutdown if the sender is caught up.
2667  *
2668  * NB: This should only be called when the shutdown signal has been received
2669  * from postmaster.
2670  *
2671  * Note that if we determine that there's still more data to send, this
2672  * function will return control to the caller.
2673  */
2674 static void
2676 {
2677  XLogRecPtr replicatedPtr;
2678 
2679  /* ... let's just be real sure we're caught up ... */
2680  send_data();
2681 
2682  /*
2683  * To figure out whether all WAL has successfully been replicated, check
2684  * flush location if valid, write otherwise. Tools like pg_receivewal
2685  * will usually (unless in synchronous mode) return an invalid flush
2686  * location.
2687  */
2688  replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
2689  MyWalSnd->write : MyWalSnd->flush;
2690 
2691  if (WalSndCaughtUp && sentPtr == replicatedPtr &&
2692  !pq_is_send_pending())
2693  {
2694  /* Inform the standby that XLOG streaming is done */
2695  EndCommand("COPY 0", DestRemote);
2696  pq_flush();
2697 
2698  proc_exit(0);
2699  }
2701  {
2702  WalSndKeepalive(true);
2704  }
2705 }
2706 
2707 /*
2708  * Returns the latest point in WAL that has been safely flushed to disk, and
2709  * can be sent to the standby. This should only be called when in recovery,
2710  * ie. we're streaming to a cascaded standby.
2711  *
2712  * As a side-effect, ThisTimeLineID is updated to the TLI of the last
2713  * replayed WAL record.
2714  */
2715 static XLogRecPtr
2717 {
2718  XLogRecPtr replayPtr;
2719  TimeLineID replayTLI;
2720  XLogRecPtr receivePtr;
2723 
2724  /*
2725  * We can safely send what's already been replayed. Also, if walreceiver
2726  * is streaming WAL from the same timeline, we can send anything that it
2727  * has streamed, but hasn't been replayed yet.
2728  */
2729 
2730  receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI);
2731  replayPtr = GetXLogReplayRecPtr(&replayTLI);
2732 
2733  ThisTimeLineID = replayTLI;
2734 
2735  result = replayPtr;
2736  if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
2737  result = receivePtr;
2738 
2739  return result;
2740 }
2741 
2742 /*
2743  * Request walsenders to reload the currently-open WAL file
2744  */
2745 void
2747 {
2748  int i;
2749 
2750  for (i = 0; i < max_wal_senders; i++)
2751  {
2752  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2753 
2754  if (walsnd->pid == 0)
2755  continue;
2756 
2757  SpinLockAcquire(&walsnd->mutex);
2758  walsnd->needreload = true;
2759  SpinLockRelease(&walsnd->mutex);
2760  }
2761 }
2762 
2763 /* SIGHUP: set flag to re-read config file at next convenient time */
2764 static void
2766 {
2767  int save_errno = errno;
2768 
2769  got_SIGHUP = true;
2770 
2771  SetLatch(MyLatch);
2772 
2773  errno = save_errno;
2774 }
2775 
2776 /* SIGUSR1: set flag to send WAL records */
2777 static void
2779 {
2780  int save_errno = errno;
2781 
2783 
2784  errno = save_errno;
2785 }
2786 
2787 /* SIGUSR2: set flag to do a last cycle and shut down afterwards */
2788 static void
2790 {
2791  int save_errno = errno;
2792 
2793  /*
2794  * If replication has not yet started, die like with SIGTERM. If
2795  * replication is active, only set a flag and wake up the main loop. It
2796  * will send any outstanding WAL, wait for it to be replicated to the
2797  * standby, and then exit gracefully.
2798  */
2799  if (!replication_active)
2800  kill(MyProcPid, SIGTERM);
2801 
2802  walsender_ready_to_stop = true;
2803  SetLatch(MyLatch);
2804 
2805  errno = save_errno;
2806 }
2807 
2808 /* Set up signal handlers */
2809 void
2811 {
2812  /* Set up signal handlers */
2813  pqsignal(SIGHUP, WalSndSigHupHandler); /* set flag to read config
2814  * file */
2815  pqsignal(SIGINT, SIG_IGN); /* not used */
2816  pqsignal(SIGTERM, die); /* request shutdown */
2817  pqsignal(SIGQUIT, quickdie); /* hard crash time */
2818  InitializeTimeouts(); /* establishes SIGALRM handler */
2820  pqsignal(SIGUSR1, WalSndXLogSendHandler); /* request WAL sending */
2821  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
2822  * shutdown */
2823 
2824  /* Reset some signals that are accepted by postmaster but not here */
2830 }
2831 
2832 /* Report shared-memory space needed by WalSndShmemInit */
2833 Size
2835 {
2836  Size size = 0;
2837 
2838  size = offsetof(WalSndCtlData, walsnds);
2839  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
2840 
2841  return size;
2842 }
2843 
2844 /* Allocate and initialize walsender-related shared memory */
2845 void
2847 {
2848  bool found;
2849  int i;
2850 
2851  WalSndCtl = (WalSndCtlData *)
2852  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
2853 
2854  if (!found)
2855  {
2856  /* First time through, so initialize */
2857  MemSet(WalSndCtl, 0, WalSndShmemSize());
2858 
2859  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
2860  SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
2861 
2862  for (i = 0; i < max_wal_senders; i++)
2863  {
2864  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2865 
2866  SpinLockInit(&walsnd->mutex);
2867  }
2868  }
2869 }
2870 
2871 /*
2872  * Wake up all walsenders
2873  *
2874  * This will be called inside critical sections, so throwing an error is not
2875  * advisable.
2876  */
2877 void
2879 {
2880  int i;
2881 
2882  for (i = 0; i < max_wal_senders; i++)
2883  {
2884  Latch *latch;
2885  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2886 
2887  /*
2888  * Get latch pointer with spinlock held, for the unlikely case that
2889  * pointer reads aren't atomic (as they're 8 bytes).
2890  */
2891  SpinLockAcquire(&walsnd->mutex);
2892  latch = walsnd->latch;
2893  SpinLockRelease(&walsnd->mutex);
2894 
2895  if (latch != NULL)
2896  SetLatch(latch);
2897  }
2898 }
2899 
2900 /* Set state for current walsender (only called in walsender) */
2901 void
2903 {
2904  WalSnd *walsnd = MyWalSnd;
2905 
2907 
2908  if (walsnd->state == state)
2909  return;
2910 
2911  SpinLockAcquire(&walsnd->mutex);
2912  walsnd->state = state;
2913  SpinLockRelease(&walsnd->mutex);
2914 }
2915 
2916 /*
2917  * Return a string constant representing the state. This is used
2918  * in system views, and should *not* be translated.
2919  */
2920 static const char *
2922 {
2923  switch (state)
2924  {
2925  case WALSNDSTATE_STARTUP:
2926  return "startup";
2927  case WALSNDSTATE_BACKUP:
2928  return "backup";
2929  case WALSNDSTATE_CATCHUP:
2930  return "catchup";
2931  case WALSNDSTATE_STREAMING:
2932  return "streaming";
2933  }
2934  return "UNKNOWN";
2935 }
2936 
2937 static Interval *
2939 {
2940  Interval *result = palloc(sizeof(Interval));
2941 
2942  result->month = 0;
2943  result->day = 0;
2944  result->time = offset;
2945 
2946  return result;
2947 }
2948 
2949 /*
2950  * Returns activity of walsenders, including pids and xlog locations sent to
2951  * standby servers.
2952  */
2953 Datum
2955 {
2956 #define PG_STAT_GET_WAL_SENDERS_COLS 11
2957  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
2958  TupleDesc tupdesc;
2959  Tuplestorestate *tupstore;
2960  MemoryContext per_query_ctx;
2961  MemoryContext oldcontext;
2962  List *sync_standbys;
2963  int i;
2964 
2965  /* check to see if caller supports us returning a tuplestore */
2966  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
2967  ereport(ERROR,
2968  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2969  errmsg("set-valued function called in context that cannot accept a set")));
2970  if (!(rsinfo->allowedModes & SFRM_Materialize))
2971  ereport(ERROR,
2972  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2973  errmsg("materialize mode required, but it is not " \
2974  "allowed in this context")));
2975 
2976  /* Build a tuple descriptor for our result type */
2977  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
2978  elog(ERROR, "return type must be a row type");
2979 
2980  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
2981  oldcontext = MemoryContextSwitchTo(per_query_ctx);
2982 
2983  tupstore = tuplestore_begin_heap(true, false, work_mem);
2984  rsinfo->returnMode = SFRM_Materialize;
2985  rsinfo->setResult = tupstore;
2986  rsinfo->setDesc = tupdesc;
2987 
2988  MemoryContextSwitchTo(oldcontext);
2989 
2990  /*
2991  * Get the currently active synchronous standbys.
2992  */
2993  LWLockAcquire(SyncRepLock, LW_SHARED);
2994  sync_standbys = SyncRepGetSyncStandbys(NULL);
2995  LWLockRelease(SyncRepLock);
2996 
2997  for (i = 0; i < max_wal_senders; i++)
2998  {
2999  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3001  XLogRecPtr write;
3002  XLogRecPtr flush;
3003  XLogRecPtr apply;
3004  TimeOffset writeLag;
3005  TimeOffset flushLag;
3006  TimeOffset applyLag;
3007  int priority;
3010  bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
3011 
3012  if (walsnd->pid == 0)
3013  continue;
3014 
3015  SpinLockAcquire(&walsnd->mutex);
3016  sentPtr = walsnd->sentPtr;
3017  state = walsnd->state;
3018  write = walsnd->write;
3019  flush = walsnd->flush;
3020  apply = walsnd->apply;
3021  writeLag = walsnd->writeLag;
3022  flushLag = walsnd->flushLag;
3023  applyLag = walsnd->applyLag;
3024  priority = walsnd->sync_standby_priority;
3025  SpinLockRelease(&walsnd->mutex);
3026 
3027  memset(nulls, 0, sizeof(nulls));
3028  values[0] = Int32GetDatum(walsnd->pid);
3029 
3030  if (!superuser())
3031  {
3032  /*
3033  * Only superusers can see details. Other users only get the pid
3034  * value to know it's a walsender, but no details.
3035  */
3036  MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3037  }
3038  else
3039  {
3040  values[1] = CStringGetTextDatum(WalSndGetStateString(state));
3041 
3042  if (XLogRecPtrIsInvalid(sentPtr))
3043  nulls[2] = true;
3044  values[2] = LSNGetDatum(sentPtr);
3045 
3046  if (XLogRecPtrIsInvalid(write))
3047  nulls[3] = true;
3048  values[3] = LSNGetDatum(write);
3049 
3050  if (XLogRecPtrIsInvalid(flush))
3051  nulls[4] = true;
3052  values[4] = LSNGetDatum(flush);
3053 
3054  if (XLogRecPtrIsInvalid(apply))
3055  nulls[5] = true;
3056  values[5] = LSNGetDatum(apply);
3057 
3058  /*
3059  * Treat a standby such as a pg_basebackup background process
3060  * which always returns an invalid flush location, as an
3061  * asynchronous standby.
3062  */
3063  priority = XLogRecPtrIsInvalid(walsnd->flush) ? 0 : priority;
3064 
3065  if (writeLag < 0)
3066  nulls[6] = true;
3067  else
3068  values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
3069 
3070  if (flushLag < 0)
3071  nulls[7] = true;
3072  else
3073  values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
3074 
3075  if (applyLag < 0)
3076  nulls[8] = true;
3077  else
3078  values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
3079 
3080  values[9] = Int32GetDatum(priority);
3081 
3082  /*
3083  * More easily understood version of standby state. This is purely
3084  * informational.
3085  *
3086  * In quorum-based sync replication, the role of each standby
3087  * listed in synchronous_standby_names can be changing very
3088  * frequently. Any standbys considered as "sync" at one moment can
3089  * be switched to "potential" ones at the next moment. So, it's
3090  * basically useless to report "sync" or "potential" as their sync
3091  * states. We report just "quorum" for them.
3092  */
3093  if (priority == 0)
3094  values[10] = CStringGetTextDatum("async");
3095  else if (list_member_int(sync_standbys, i))
3097  CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3098  else
3099  values[10] = CStringGetTextDatum("potential");
3100  }
3101 
3102  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
3103  }
3104 
3105  /* clean up and return the tuplestore */
3106  tuplestore_donestoring(tupstore);
3107 
3108  return (Datum) 0;
3109 }
3110 
3111 /*
3112  * This function is used to send keepalive message to standby.
3113  * If requestReply is set, sets a flag in the message requesting the standby
3114  * to send a message back to us, for heartbeat purposes.
3115  */
3116 static void
3117 WalSndKeepalive(bool requestReply)
3118 {
3119  elog(DEBUG2, "sending replication keepalive");
3120 
3121  /* construct the message... */
3122  resetStringInfo(&output_message);
3123  pq_sendbyte(&output_message, 'k');
3124  pq_sendint64(&output_message, sentPtr);
3125  pq_sendint64(&output_message, GetCurrentTimestamp());
3126  pq_sendbyte(&output_message, requestReply ? 1 : 0);
3127 
3128  /* ... and send it wrapped in CopyData */
3129  pq_putmessage_noblock('d', output_message.data, output_message.len);
3130 }
3131 
3132 /*
3133  * Send keepalive message if too much time has elapsed.
3134  */
3135 static void
3137 {
3138  TimestampTz ping_time;
3139 
3140  /*
3141  * Don't send keepalive messages if timeouts are globally disabled or
3142  * we're doing something not partaking in timeouts.
3143  */
3144  if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3145  return;
3146 
3148  return;
3149 
3150  /*
3151  * If half of wal_sender_timeout has lapsed without receiving any reply
3152  * from the standby, send a keep-alive message to the standby requesting
3153  * an immediate reply.
3154  */
3156  wal_sender_timeout / 2);
3157  if (now >= ping_time)
3158  {
3159  WalSndKeepalive(true);
3161 
3162  /* Try to flush pending output to the client */
3163  if (pq_flush_if_writable() != 0)
3164  WalSndShutdown();
3165  }
3166 }
3167 
3168 /*
3169  * Record the end of the WAL and the time it was flushed locally, so that
3170  * LagTrackerRead can compute the elapsed time (lag) when this WAL position is
3171  * eventually reported to have been written, flushed and applied by the
3172  * standby in a reply message.
3173  * Exported to allow logical decoding plugins to call this when they choose.
3174  */
3175 void
3177 {
3178  bool buffer_full;
3179  int new_write_head;
3180  int i;
3181 
3182  if (!am_walsender)
3183  return;
3184 
3185  /*
3186  * If the lsn hasn't advanced since last time, then do nothing. This way
3187  * we only record a new sample when new WAL has been written.
3188  */
3189  if (LagTracker.last_lsn == lsn)
3190  return;
3191  LagTracker.last_lsn = lsn;
3192 
3193  /*
3194  * If advancing the write head of the circular buffer would crash into any
3195  * of the read heads, then the buffer is full. In other words, the
3196  * slowest reader (presumably apply) is the one that controls the release
3197  * of space.
3198  */
3199  new_write_head = (LagTracker.write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
3200  buffer_full = false;
3201  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
3202  {
3203  if (new_write_head == LagTracker.read_heads[i])
3204  buffer_full = true;
3205  }
3206 
3207  /*
3208  * If the buffer is full, for now we just rewind by one slot and overwrite
3209  * the last sample, as a simple (if somewhat uneven) way to lower the
3210  * sampling rate. There may be better adaptive compaction algorithms.
3211  */
3212  if (buffer_full)
3213  {
3214  new_write_head = LagTracker.write_head;
3215  if (LagTracker.write_head > 0)
3216  LagTracker.write_head--;
3217  else
3218  LagTracker.write_head = LAG_TRACKER_BUFFER_SIZE - 1;
3219  }
3220 
3221  /* Store a sample at the current write head position. */
3222  LagTracker.buffer[LagTracker.write_head].lsn = lsn;
3223  LagTracker.buffer[LagTracker.write_head].time = local_flush_time;
3224  LagTracker.write_head = new_write_head;
3225 }
3226 
3227 /*
3228  * Find out how much time has elapsed between the moment WAL position 'lsn'
3229  * (or the highest known earlier LSN) was flushed locally and the time 'now'.
3230  * We have a separate read head for each of the reported LSN locations we
3231  * receive in replies from standby; 'head' controls which read head is
3232  * used. Whenever a read head crosses an LSN which was written into the
3233  * lag buffer with LagTrackerWrite, we can use the associated timestamp to
3234  * find out the time this LSN (or an earlier one) was flushed locally, and
3235  * therefore compute the lag.
3236  *
3237  * Return -1 if no new sample data is available, and otherwise the elapsed
3238  * time in microseconds.
3239  */
3240 static TimeOffset
3242 {
3243  TimestampTz time = 0;
3244 
3245  /* Read all unread samples up to this LSN or end of buffer. */
3246  while (LagTracker.read_heads[head] != LagTracker.write_head &&
3247  LagTracker.buffer[LagTracker.read_heads[head]].lsn <= lsn)
3248  {
3249  time = LagTracker.buffer[LagTracker.read_heads[head]].time;
3250  LagTracker.last_read[head] =
3251  LagTracker.buffer[LagTracker.read_heads[head]];
3252  LagTracker.read_heads[head] =
3253  (LagTracker.read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE;
3254  }
3255 
3256  if (time > now)
3257  {
3258  /* If the clock somehow went backwards, treat as not found. */
3259  return -1;
3260  }
3261  else if (time == 0)
3262  {
3263  /*
3264  * We didn't cross a time. If there is a future sample that we
3265  * haven't reached yet, and we've already reached at least one sample,
3266  * let's interpolate the local flushed time. This is mainly useful for
3267  * reporting a completely stuck apply position as having increasing
3268  * lag, since otherwise we'd have to wait for it to eventually start
3269  * moving again and cross one of our samples before we can show the
3270  * lag increasing.
3271  */
3272  if (LagTracker.read_heads[head] != LagTracker.write_head &&
3273  LagTracker.last_read[head].time != 0)
3274  {
3275  double fraction;
3276  WalTimeSample prev = LagTracker.last_read[head];
3277  WalTimeSample next = LagTracker.buffer[LagTracker.read_heads[head]];
3278 
3279  Assert(lsn >= prev.lsn);
3280  Assert(prev.lsn < next.lsn);
3281 
3282  if (prev.time > next.time)
3283  {
3284  /* If the clock somehow went backwards, treat as not found. */
3285  return -1;
3286  }
3287 
3288  /* See how far we are between the previous and next samples. */
3289  fraction =
3290  (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
3291 
3292  /* Scale the local flush time proportionally. */
3293  time = (TimestampTz)
3294  ((double) prev.time + (next.time - prev.time) * fraction);
3295  }
3296  else
3297  {
3298  /* Couldn't interpolate due to lack of data. */
3299  return -1;
3300  }
3301  }
3302 
3303  /* Return the elapsed time since local flush time in microseconds. */
3304  Assert(time != 0);
3305  return now - time;
3306 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:735
static void WalSndXLogSendHandler(SIGNAL_ARGS)
Definition: walsender.c:2778
static void InitWalSenderSlot(void)
Definition: walsender.c:2091
void InitializeTimeouts(void)
Definition: timeout.c:340
#define pq_is_send_pending()
Definition: libpq.h:41
#define NIL
Definition: pg_list.h:69
Snapshot SnapBuildInitalSnapshot(SnapBuild *builder)
Definition: snapbuild.c:509
#define XLogSegSize
Definition: xlog_internal.h:92
XLogRecPtr write
static void ProcessStandbyMessage(void)
Definition: walsender.c:1604
#define SIGUSR1
Definition: win32.h:211
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define IsA(nodeptr, _type_)
Definition: nodes.h:571
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:200
XLogRecPtr startpoint
Definition: replnodes.h:84
#define DEBUG1
Definition: elog.h:25
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:211
int MyProcPid
Definition: globals.c:38
int wal_sender_timeout
Definition: walsender.c:115
#define SIGCONT
Definition: win32.h:205
uint32 TimeLineID
Definition: xlogdefs.h:45
#define pq_flush()
Definition: libpq.h:39
static int32 next
Definition: blutils.c:210
XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
struct Port * MyProcPort
Definition: globals.c:40
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
uint32 TransactionId
Definition: c.h:397
bool wake_wal_senders
Definition: walsender.c:122
#define SYNC_REP_PRIORITY
Definition: syncrep.h:36
bool exec_replication_command(const char *cmd_string)
Definition: walsender.c:1363
bool update_process_title
Definition: ps_status.c:35
#define write(a, b, c)
Definition: win32.h:19
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1569
TransactionId xmin
Definition: proc.h:208
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:2789
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:2806
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:418
#define TEXTOID
Definition: pg_type.h:324
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action)
Definition: walsender.c:778
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1019
PGPROC * MyProc
Definition: proc.c:67
int64 TimestampTz
Definition: timestamp.h:39
#define SIGWINCH
Definition: win32.h:209
ResourceOwner CurrentResourceOwner
Definition: resowner.c:138
Size WalSndShmemSize(void)
Definition: walsender.c:2834
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:239
void CommitTransactionCommand(void)
Definition: xact.c:2747
#define XACT_REPEATABLE_READ
Definition: xact.h:30
#define SpinLockInit(lock)
Definition: spin.h:60
void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition: walsender.c:3176
void RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
Definition: snapmgr.c:2139
uint8 syncrep_method
Definition: syncrep.h:51
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:75
static int sendFile
Definition: walsender.c:128
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:370
static void XLogSendPhysical(void)
Definition: walsender.c:2355
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:1250
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
TimeOffset flushLag
#define SIGTTIN
Definition: win32.h:207
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency)
Definition: slot.c:220
void set_ps_display(const char *activity, bool force)
Definition: ps_status.c:326
#define INT4OID
Definition: pg_type.h:316
Definition: nodes.h:520
static StringInfoData output_message
Definition: walsender.c:153
WalSndCtlData * WalSndCtl
Definition: walsender.c:102
struct cursor * cur
Definition: ecpg.c:28
void proc_exit(int code)
Definition: ipc.c:99
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
Definition: walsender.c:1759
bool superuser(void)
Definition: superuser.c:47
#define MemSet(start, val, len)
Definition: c.h:857
static void WalSndKill(int code, Datum arg)
Definition: walsender.c:2152
void ReplicationSlotCleanup()
Definition: slot.c:413
return result
Definition: formatting.c:1618
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
static void WalSndDone(WalSndSendDataCallback send_data)
Definition: walsender.c:2675
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
#define WL_SOCKET_READABLE
Definition: latch.h:125
void ReplicationSlotSave(void)
Definition: slot.c:563
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8212
List * SyncRepGetSyncStandbys(bool *am_sync)
Definition: syncrep.c:672
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:186
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:210
TimestampTz time
Definition: walsender.c:197
int sleeptime
Definition: pg_standby.c:40
ReplicationSlotPersistentData data
Definition: slot.h:115
TimeOffset writeLag
void ResetLatch(volatile Latch *latch)
Definition: latch.c:461
#define LOG
Definition: elog.h:26
bool RecoveryInProgress(void)
Definition: xlog.c:7863
SyncRepConfigData * SyncRepConfig
Definition: syncrep.c:93
XLogRecPtr last_lsn
Definition: walsender.c:206
void list_free_deep(List *list)
Definition: list.c:1147
static int fd(const char *x, int i)
Definition: preproc-init.c:105
pgsocket sock
Definition: libpq-be.h:118
#define PG_BINARY
Definition: c.h:1038
#define SIGQUIT
Definition: win32.h:197
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc.c:7870
slock_t mutex
Node * replication_parse_result
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:88
Latch procLatch
Definition: proc.h:98
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
Definition: xlogutils.c:799
XLogRecord * XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
Definition: xlogreader.c:193
XLogRecPtr confirmed_flush
Definition: slot.h:76
PGXACT * MyPgXact
Definition: proc.c:68
XLogRecPtr EndRecPtr
Definition: xlogreader.h:115
int32 day
Definition: timestamp.h:47
TimeLineID timeline
Definition: replnodes.h:96
ReplicationKind kind
Definition: replnodes.h:56
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1715
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
Definition: walsender.c:743
bool IsTransactionBlock(void)
Definition: xact.c:4304
static void WalSndSigHupHandler(SIGNAL_ARGS)
Definition: walsender.c:2765
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:587
bool am_walsender
Definition: walsender.c:108
#define SpinLockAcquire(lock)
Definition: spin.h:62
void ReplicationSlotReserveWal(void)
Definition: slot.c:825
XLogRecPtr flush
bool defGetBoolean(DefElem *def)
Definition: define.c:111
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:414
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:665
void GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch)
Definition: xlog.c:8281
ReplicationKind kind
Definition: replnodes.h:81
void WalSndRqstFileReload(void)
Definition: walsender.c:2746
#define SIG_IGN
Definition: win32.h:193
void ConditionVariableCancelSleep(void)
static volatile sig_atomic_t got_SIGHUP
Definition: walsender.c:179
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:319
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3117
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:1308
static volatile sig_atomic_t walsender_ready_to_stop
Definition: walsender.c:180
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:157
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:563
#define ERROR
Definition: elog.h:43
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:93
void pq_startmsgread(void)
Definition: pqcomm.c:1191
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: walsender.c:1635
#define IntervalPGetDatum(X)
Definition: timestamp.h:33
#define SYNC_REP_WAIT_APPLY
Definition: syncrep.h:26
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:3762
bool FirstSnapshotSet
Definition: snapmgr.c:203
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
void WalSndSignals(void)
Definition: walsender.c:2810
#define FATAL
Definition: elog.h:52
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:686
Latch * latch
#define MAXPGPATH
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11070
void ReplicationSlotPersist(void)
Definition: slot.c:598
TransactionId effective_xmin
Definition: slot.h:111
int pq_getbyte_if_available(unsigned char *c)
Definition: pqcomm.c:1021
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:190
static TimeLineID curFileTimeLine
Definition: walsender.c:133
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:165
#define XLogFilePath(path, tli, logSegNo)
char * get_database_name(Oid dbid)
Definition: dbcommands.c:2048
#define DEBUG2
Definition: elog.h:24
bool list_member_int(const List *list, int datum)
Definition: list.c:485
Definition: latch.h:110
Definition: dest.h:88
#define TLHistoryFileName(fname, tli)
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:348
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write)
Definition: logical.c:327
WalSndState state
NodeTag type
Definition: nodes.h:522
char * XLogFileNameP(TimeLineID tli, XLogSegNo segno)
Definition: xlog.c:10109
static char * buf
Definition: pg_test_fsync.c:65
static bool streamingDoneSending
Definition: walsender.c:172
uint64 XLogSegNo
Definition: xlogdefs.h:34
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:109
bool PostmasterIsAlive(void)
Definition: pmsignal.c:272
int OpenTransientFile(FileName fileName, int fileFlags, int fileMode)
Definition: fd.c:2107
int errdetail(const char *fmt,...)
Definition: elog.c:873
#define COMMERROR
Definition: elog.h:30
int errcode_for_file_access(void)
Definition: elog.c:598
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:514
#define pq_flush_if_writable()
Definition: libpq.h:40
static struct @25 LagTracker
TimeOffset time
Definition: timestamp.h:45
XLogRecPtr currTLIValidUntil
Definition: xlogreader.h:174
#define InvalidTransactionId
Definition: transam.h:31
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:277
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:94
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1144
static XLogRecPtr logical_startptr
Definition: walsender.c:191
unsigned int uint32
Definition: c.h:268
void ReplicationSlotRelease(void)
Definition: slot.c:375
static void XLogRead(char *buf, XLogRecPtr startptr, Size count)
Definition: walsender.c:2180
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1205
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
TransactionId xmin
Definition: slot.h:57
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
void InitWalSender(void)
#define SlotIsLogical(slot)
Definition: slot.h:134
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:2152
void SyncRepInitConfig(void)
Definition: syncrep.c:377
#define ereport(elevel, rest)
Definition: elog.h:122
static bool WalSndCaughtUp
Definition: walsender.c:176
#define SYNC_REP_WAIT_FLUSH
Definition: syncrep.h:25
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
XLogRecPtr lsn
Definition: walsender.c:196
int replication_yyparse(void)
Definition: guc.h:72
int32 month
Definition: timestamp.h:48
int max_wal_senders
Definition: walsender.c:114
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc)
Definition: execTuples.c:1232
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:1955
int CloseTransientFile(int fd)
Definition: fd.c:2268
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1117
void initStringInfo(StringInfo str)
Definition: stringinfo.c:65
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1253
static StringInfoData reply_message
Definition: walsender.c:154
#define MAXFNAMELEN
TimeLineID nextTLI
Definition: xlogreader.h:179
#define SpinLockRelease(lock)
Definition: spin.h:64
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:316
int64 TimeOffset
Definition: timestamp.h:40
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition: walsender.c:1232
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition: walsender.c:3241
void WalSndErrorCleanup(void)
Definition: walsender.c:283
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:322
void(* WalSndSendDataCallback)(void)
Definition: walsender.c:219
static volatile sig_atomic_t replication_active
Definition: walsender.c:188
uintptr_t Datum
Definition: postgres.h:372
static const char * WalSndGetStateString(WalSndState state)
Definition: walsender.c:2921
XLogRecPtr sentPtr
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
bool am_db_walsender
Definition: walsender.c:111
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1030
Oid MyDatabaseId
Definition: globals.c:76
static void WalSndShutdown(void)
Definition: walsender.c:223
static TimeLineID receiveTLI
Definition: xlog.c:201
int work_mem
Definition: globals.c:112
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:563
WalSnd * MyWalSnd
Definition: walsender.c:105
static XLogRecPtr sentPtr
Definition: walsender.c:150
#define SIGPIPE
Definition: win32.h:201
#define SIGHUP
Definition: win32.h:196
void ReplicationSlotDrop(const char *name)
Definition: slot.c:440
void pq_endmsgread(void)
Definition: pqcomm.c:1215
#define InvalidOid
Definition: postgres_ext.h:36
TimeLineID ThisTimeLineID
Definition: xlog.c:179
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:209
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
#define SIG_DFL
Definition: win32.h:191
int allowedModes
Definition: execnodes.h:201
TimeLineID currTLI
Definition: xlogreader.h:165
#define INT8OID
Definition: pg_type.h:304
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:168
static void WalSndKeepaliveIfNecessary(TimestampTz now)
Definition: walsender.c:3136
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:432
TimeOffset applyLag
struct SnapBuild * snapshot_builder
Definition: logical.h:40
int errmsg_internal(const char *fmt,...)
Definition: elog.c:827
SetFunctionReturnMode returnMode
Definition: execnodes.h:203
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define XLByteToSeg(xlrp, logSegNo)
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:1886
void SetLatch(volatile Latch *latch)
Definition: latch.c:379
#define SIGNAL_ARGS
Definition: c.h:1079
#define NULL
Definition: c.h:229
static TimeLineID sendTimeLine
Definition: walsender.c:141
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:675
static void WalSndCheckTimeOut(TimestampTz now)
Definition: walsender.c:1928
#define lfirst(lc)
Definition: pg_list.h:106
void WalSndSetState(WalSndState state)
Definition: walsender.c:2902
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:320
int sync_standby_priority
Definition: regguts.h:298
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:457
static void XLogSendLogical(void)
Definition: walsender.c:2614
XLogRecPtr restart_lsn
Definition: slot.h:68
void ReplicationSlotAcquire(const char *name)
Definition: slot.c:326
void StartTransactionCommand(void)
Definition: xact.c:2677
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:207
bool needreload
static void ProcessStandbyReplyMessage(void)
Definition: walsender.c:1667
size_t Size
Definition: c.h:356
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1181
char * dbname
Definition: streamutil.c:38
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:201
int XactIsoLevel
Definition: xact.c:74
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1111
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:142
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:144
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:884
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:135
TupleDesc CreateTemplateTupleDesc(int natts, bool hasoid)
Definition: tupdesc.c:41
WalSndState
static XLogSegNo sendSegNo
Definition: walsender.c:129
Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
Definition: walsender.c:2954
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:831
static bool streamingDoneReceiving
Definition: walsender.c:173
#define BYTEAOID
Definition: pg_type.h:292
Tuplestorestate * setResult
Definition: execnodes.h:206
#define pg_attribute_noreturn()
Definition: c.h:653
static StringInfoData tmpbuf
Definition: walsender.c:155
#define SIGTTOU
Definition: win32.h:208
bool IsSubTransaction(void)
Definition: xact.c:4376
static Datum values[MAXATTR]
Definition: bootstrap.c:162
#define PG_STAT_GET_WAL_SENDERS_COLS
ExprContext * econtext
Definition: execnodes.h:199
bool log_replication_commands
Definition: walsender.c:117
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4690
#define Int32GetDatum(X)
Definition: postgres.h:485
char * application_name
Definition: guc.c:472
TupleDesc setDesc
Definition: execnodes.h:207
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:115
void * palloc(Size size)
Definition: mcxt.c:849
int errmsg(const char *fmt,...)
Definition: elog.c:797
XLogReaderState * reader
Definition: logical.h:38
void SHMQueueInit(SHM_QUEUE *queue)
Definition: shmqueue.c:36
void pq_sendint(StringInfo buf, int i, int b)
Definition: pqformat.c:236
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:344
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
void die(SIGNAL_ARGS)
Definition: postgres.c:2619
CRSSnapshotAction
Definition: walsender.h:22
StringInfo out
Definition: logical.h:59
int i
void WalSndShmemInit(void)
Definition: walsender.c:2846
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:486
LogicalDecodingContext * CreateInitDecodingContext(char *plugin, List *output_plugin_options, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write)
Definition: logical.c:211
#define NameStr(name)
Definition: c.h:499
TimeLineID timeline
Definition: replnodes.h:83
#define CStringGetTextDatum(s)
Definition: builtins.h:91
void * arg
struct Latch * MyLatch
Definition: globals.c:51
#define PG_FUNCTION_ARGS
Definition: fmgr.h:150
char * defname
Definition: parsenodes.h:708
static uint32 sendOff
Definition: walsender.c:130
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:448
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2716
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:113
#define elog
Definition: elog.h:219
void LWLockReleaseAll(void)
Definition: lwlock.c:1814
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28
static Interval * offset_to_interval(TimeOffset offset)
Definition: walsender.c:2938
slock_t mutex
Definition: slot.h:88
#define close(a)
Definition: win32.h:17
void latch_sigusr1_handler(void)
Definition: latch.c:1572
CommandDest whereToSendOutput
Definition: postgres.c:86
#define SIGCHLD
Definition: win32.h:206
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: timestamp.c:1623
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
#define XLByteInSeg(xlrp, logSegNo)
static bool waiting_for_ping_response
Definition: walsender.c:164
void pq_puttextmessage(char msgtype, const char *str)
Definition: pqformat.c:400
XLogRecPtr apply
static void IdentifySystem(void)
Definition: walsender.c:329
Definition: pg_list.h:45
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:76
void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.c:271
#define WL_LATCH_SET
Definition: latch.h:124
int16 AttrNumber
Definition: attnum.h:21
void quickdie(SIGNAL_ARGS)
Definition: postgres.c:2558
#define UINT64_FORMAT
Definition: c.h:316
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:617
void SyncRepReleaseWaiters(void)
Definition: syncrep.c:405
static TimestampTz last_reply_timestamp
Definition: walsender.c:161
int write_head
Definition: walsender.c:208
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1502
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1533
#define MAX_SEND_SIZE
Definition: walsender.c:99
#define read(a, b, c)
Definition: win32.h:18
#define SIGUSR2
Definition: win32.h:212
static bool sendTimeLineIsHistoric
Definition: walsender.c:143
#define offsetof(type, field)
Definition: c.h:555
void WalSndWakeup(void)
Definition: walsender.c:2878
void ReplicationSlotMarkDirty(void)
Definition: slot.c:581
bool am_cascading_walsender
Definition: walsender.c:109
static void ProcessStandbyHSFeedbackMessage(void)
Definition: walsender.c:1793
#define SYNC_REP_WAIT_WRITE
Definition: syncrep.h:24
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:648
void PreventTransactionChain(bool isTopLevel, const char *stmtType)
Definition: xact.c:3154
int BasicOpenFile(FileName fileName, int fileFlags, int fileMode)
Definition: fd.c:899
void replication_scanner_init(const char *query_string)
ResourceOwner ResourceOwnerCreate(ResourceOwner parent, const char *name)
Definition: resowner.c:416
#define TLHistoryFilePath(path, tli)