PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
walsender.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * walsender.c
4  *
5  * The WAL sender process (walsender) is new as of Postgres 9.0. It takes
6  * care of sending XLOG from the primary server to a single recipient.
7  * (Note that there can be more than one walsender process concurrently.)
8  * It is started by the postmaster when the walreceiver of a standby server
9  * connects to the primary server and requests XLOG streaming replication.
10  *
11  * A walsender is similar to a regular backend, i.e. there is a one-to-one
12  * relationship between a connection and a walsender process, but instead
13  * of processing SQL queries, it understands a small set of special
14  * replication-mode commands. The START_REPLICATION command begins streaming
15  * WAL to the client. While streaming, the walsender keeps reading XLOG
16  * records from the disk and sends them to the standby server over the
17  * COPY protocol, until either side ends the replication by exiting COPY
18  * mode (or until the connection is closed).
19  *
20  * Normal termination is by SIGTERM, which instructs the walsender to
21  * close the connection and exit(0) at the next convenient moment. Emergency
22  * termination is by SIGQUIT; like any backend, the walsender will simply
23  * abort and exit on SIGQUIT. A close of the connection and a FATAL error
24  * are treated not as a crash but as approximately normal termination;
25  * the walsender will exit quickly without sending any more XLOG records.
26  *
27  * If the server is shut down, postmaster sends us SIGUSR2 after all
28  * regular backends have exited and the shutdown checkpoint has been written.
29  * This instructs walsender to send any outstanding WAL, including the
30  * shutdown checkpoint record, wait for it to be replicated to the standby,
31  * and then exit.
32  *
33  *
34  * Portions Copyright (c) 2010-2017, PostgreSQL Global Development Group
35  *
36  * IDENTIFICATION
37  * src/backend/replication/walsender.c
38  *
39  *-------------------------------------------------------------------------
40  */
41 #include "postgres.h"
42 
43 #include <signal.h>
44 #include <unistd.h>
45 
46 #include "access/printtup.h"
47 #include "access/timeline.h"
48 #include "access/transam.h"
49 #include "access/xact.h"
50 #include "access/xlog_internal.h"
51 #include "access/xlogutils.h"
52 
53 #include "catalog/pg_type.h"
54 #include "commands/dbcommands.h"
55 #include "commands/defrem.h"
56 #include "funcapi.h"
57 #include "libpq/libpq.h"
58 #include "libpq/pqformat.h"
59 #include "miscadmin.h"
60 #include "nodes/replnodes.h"
61 #include "pgstat.h"
62 #include "replication/basebackup.h"
63 #include "replication/decode.h"
64 #include "replication/logical.h"
66 #include "replication/slot.h"
67 #include "replication/snapbuild.h"
68 #include "replication/syncrep.h"
70 #include "replication/walsender.h"
73 #include "storage/fd.h"
74 #include "storage/ipc.h"
75 #include "storage/pmsignal.h"
76 #include "storage/proc.h"
77 #include "storage/procarray.h"
78 #include "tcop/dest.h"
79 #include "tcop/tcopprot.h"
80 #include "utils/builtins.h"
81 #include "utils/guc.h"
82 #include "utils/memutils.h"
83 #include "utils/pg_lsn.h"
84 #include "utils/portal.h"
85 #include "utils/ps_status.h"
86 #include "utils/resowner.h"
87 #include "utils/timeout.h"
88 #include "utils/timestamp.h"
89 
90 /*
91  * Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ.
92  *
93  * We don't have a good idea of what a good value would be; there's some
94  * overhead per message in both walsender and walreceiver, but on the other
95  * hand sending large batches makes walsender less responsive to signals
96  * because signals are checked only between messages. 128kB (with
97  * default 8k blocks) seems like a reasonable guess for now.
 *
 * The value is expressed as 16 WAL blocks, so it scales with XLOG_BLCKSZ.
98  */
99 #define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
100 
101 /* Array of WalSnds in shared memory */
/*
 * NOTE(review): this listing omits some source lines (the embedded line
 * numbers skip, e.g. 101 -> 103, 104 -> 106, 132 -> 134).  Several comments
 * in this section therefore have no declaration under them; the
 * declarations must be restored from the original file.
 */
103 
104 /* My slot in the shared memory array */
106 
107 /* Global state */
108 bool am_walsender = false; /* Am I a walsender process? */
109 bool am_cascading_walsender = false; /* Am I cascading WAL to
110  * another standby? */
111 bool am_db_walsender = false; /* Connected to a database? */
112 
113 /* User-settable parameters for walsender */
114 int max_wal_senders = 0; /* the maximum number of concurrent walsenders */
115 int wal_sender_timeout = 60 * 1000; /* maximum time to send one
116  * WAL data message
 * (presumably milliseconds, i.e. 60 s -- TODO confirm unit) */
118 
119 /*
120  * State for WalSndWakeupRequest
121  */
122 bool wake_wal_senders = false;
123 
124 /*
125  * These variables are used similarly to openLogFile/SegNo/Off,
126  * but for walsender to read the XLOG.
 *
 * sendFile is -1 while no file is open.
127  */
128 static int sendFile = -1;
129 static XLogSegNo sendSegNo = 0;
130 static uint32 sendOff = 0;
131 
132 /* Timeline ID of the currently open file */
134 
135 /*
136  * These variables keep track of the state of the timeline we're currently
137  * sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric,
138  * the timeline is not the latest timeline on this server, and the server's
139  * history forked off from that timeline at sendTimeLineValidUpto.
 *
 * NOTE(review): only sendTimeLineIsHistoric is declared in this listing;
 * sendTimeLine, sendTimeLineNextTLI and sendTimeLineValidUpto are used by
 * later code but their declarations are not shown here.
140  */
143 static bool sendTimeLineIsHistoric = false;
145 
146 /*
147  * How far have we sent WAL already? This is also advertised in
148  * MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.)
149  */
150 static XLogRecPtr sentPtr = 0;
151 
152 /* Buffers for constructing outgoing messages and processing reply messages. */
156 
157 /*
158  * Timestamp of the last receipt of the reply from the standby. Set to 0 if
159  * wal_sender_timeout doesn't need to be active.
160  */
162 
163 /* Have we sent a heartbeat message asking for reply, since last reply? */
164 static bool waiting_for_ping_response = false;
165 
166 /*
167  * While streaming WAL in Copy mode, streamingDoneSending is set to true
168  * after we have sent CopyDone. We should not send any more CopyData messages
169  * after that. streamingDoneReceiving is set to true when we receive CopyDone
170  * from the other end. When both become true, it's time to exit Copy mode.
171  */
174 
175 /* Are we there yet? */
176 static bool WalSndCaughtUp = false;
177 
178 /* Flags set by signal handlers for later service in main loop */
179 static volatile sig_atomic_t got_SIGHUP = false;
180 static volatile sig_atomic_t walsender_ready_to_stop = false;
181 
182 /*
183  * This is set while we are streaming. When not set, SIGUSR2 signal will be
184  * handled like SIGTERM. When set, the main loop is responsible for checking
185  * walsender_ready_to_stop and terminating when it's set (after streaming any
186  * remaining WAL).
187  */
188 static volatile sig_atomic_t replication_active = false;
189 
192 
193 /* A sample associating a log position with the time it was written. */
/* NOTE(review): the struct members (lines 196-197) are missing from this listing. */
194 typedef struct
195 {
198 } WalTimeSample;
199 
200 /* The size of our buffer of time samples. */
201 #define LAG_TRACKER_BUFFER_SIZE 8192
202 
203 /* A mechanism for tracking replication lag. */
/*
 * NOTE(review): the members of this anonymous struct (lines 206-210) are
 * missing from this listing.  InitWalSender() zeroes the whole object with
 * memset().
 */
204 static struct
205 {
211 } LagTracker;
212 
213 /* Signal handlers */
/*
 * NOTE(review): the embedded numbering skips lines in this section (215-216,
 * 223, 229-230, 232, 238, 243, 245), so some prototypes are not shown.
 */
214 static void WalSndSigHupHandler(SIGNAL_ARGS);
217 
218 /* Prototypes for private functions */
/* Callback type: a function that sends pending data; takes and returns nothing. */
219 typedef void (*WalSndSendDataCallback) (void);
220 static void WalSndLoop(WalSndSendDataCallback send_data);
221 static void InitWalSenderSlot(void);
222 static void WalSndKill(int code, Datum arg);
224 static void XLogSendPhysical(void);
225 static void XLogSendLogical(void);
226 static void WalSndDone(WalSndSendDataCallback send_data);
227 static XLogRecPtr GetStandbyFlushRecPtr(void);
228 static void IdentifySystem(void);
231 static void StartReplication(StartReplicationCmd *cmd);
233 static void ProcessStandbyMessage(void);
234 static void ProcessStandbyReplyMessage(void);
235 static void ProcessStandbyHSFeedbackMessage(void);
236 static void ProcessRepliesIfAny(void);
237 static void WalSndKeepalive(bool requestReply);
239 static void WalSndCheckTimeOut(TimestampTz now);
240 static long WalSndComputeSleeptime(TimestampTz now);
241 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
242 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
244 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
246 
247 static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
248 
249 
250 /*
 * Initialize walsender process before entering the main command loop.
 *
 * As shown here, this creates the top-level resource owner and zeroes the
 * LagTracker buffer.
 *
 * NOTE(review): the embedded line numbers skip 254, 257 and 269-270, so the
 * statements that belong to some of the comments below are missing from this
 * listing.  Restore them from the original file.
 */
251 void
252 InitWalSender(void)
253 {
255 
256  /* Create a per-walsender data structure in shared memory */
258 
259  /* Set up resource owner */
260  CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner");
261 
262  /*
263  * Let postmaster know that we're a WAL sender. Once we've declared
264  * ourselves a WAL sender process, postmaster will let us outlive the
265  * bgwriter and kill us last in the shutdown sequence, so we get a chance
266  * to stream all remaining WAL at shutdown, including the shutdown
267  * checkpoint. Note that there's no going back, and we mustn't write any
 * WAL records after this.
268  */
271 
272  /* Initialize empty timestamp buffer for lag tracking. */
273  memset(&LagTracker, 0, sizeof(LagTracker));
274 }
275 
276 /*
277  * Clean up after an error.
278  *
279  * WAL sender processes don't use transactions like regular backends do.
280  * This function does any cleanup required after an error in a WAL sender
281  * process, similar to what transaction abort does in a regular backend.
 *
 * Visible steps: close sendFile if it is open and reset it to -1, then
 * clear replication_active.
 *
 * NOTE(review): lines 284, 286-288, 297, 299, 302 and 306 are missing from
 * this listing.  Consequences to verify against the original file:
 *   - the function name line is absent;
 *   - "if (MyReplicationSlot != NULL)" has lost its body, so as written it
 *     would govern the following "replication_active = false;" statement;
 *   - proc_exit(0) appears unconditional here, which would make the
 *     "Revert back to startup state" step unreachable; it was presumably
 *     guarded by the dropped line 302.
282  */
283 void
285 {
289 
	/* Release the WAL segment file descriptor, if one is open. */
290  if (sendFile >= 0)
291  {
292  close(sendFile);
293  sendFile = -1;
294  }
295 
296  if (MyReplicationSlot != NULL)
298 
300 
301  replication_active = false;
303  proc_exit(0);
304 
305  /* Revert back to startup state */
307 }
308 
309 /*
310  * Handle a client's connection abort in an orderly manner.
 *
 * Exits the process with status 0 via proc_exit(); this function does not
 * return to its caller.
 *
 * NOTE(review): lines 319-320, which implement the whereToSendOutput reset
 * described in the comment below, are missing from this listing.
311  */
312 static void
313 WalSndShutdown(void)
314 {
315  /*
316  * Reset whereToSendOutput to prevent ereport from attempting to send any
317  * more messages to the standby.
318  */
321 
322  proc_exit(0);
323  abort(); /* keep the compiler quiet */
324 }
325 
326 /*
327  * Handle the IDENTIFY_SYSTEM command.
 *
 * Sends a one-row result set with these columns (see the tuple descriptor
 * built below):
 *   1. systemid  (text)  - system identifier formatted with UINT64_FORMAT
 *   2. timeline  (int4)  - ThisTimeLineID
 *   3. xlogpos   (text)  - flush position formatted as "%X/%X" (high and low
 *                          32 bits of the XLogRecPtr)
 *   4. dbname    (text)  - database name, or NULL when dbname is not set
 *
 * NOTE(review): several lines are missing from this listing (330, 349,
 * 351-352, 364, 367, 369-371, 373, 376).  Missing pieces include the
 * function name line, the last argument of the first snprintf(), the
 * condition of the if/else that picks the flush pointer, the code that looks
 * up dbname, and the assignment of "dest".  Restore from the original file.
328  */
329 static void
331 {
332  char sysid[32];
333  char xpos[MAXFNAMELEN];
334  XLogRecPtr logptr;
335  char *dbname = NULL;
336  DestReceiver *dest;
337  TupOutputState *tstate;
338  TupleDesc tupdesc;
339  Datum values[4];
340  bool nulls[4];
341 
342  /*
343  * Reply with a result set with one row, four columns. First col is system
344  * ID, second is timeline ID, third is current xlog location and the
345  * fourth contains the database name if we are connected to one.
346  */
347 
348  snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
350 
353  {
354  /* this also updates ThisTimeLineID */
355  logptr = GetStandbyFlushRecPtr();
356  }
357  else
358  logptr = GetFlushRecPtr();
359 
360  snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
361 
362  if (MyDatabaseId != InvalidOid)
363  {
365 
366  /* syscache access needs a transaction env. */
368  /* make dbname live outside TX context */
372  /* CommitTransactionCommand switches to TopMemoryContext */
374  }
375 
	/* Start with every column non-NULL; dbname may be flipped to NULL below. */
377  MemSet(nulls, false, sizeof(nulls));
378 
379  /* need a tuple descriptor representing four columns */
380  tupdesc = CreateTemplateTupleDesc(4, false);
381  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
382  TEXTOID, -1, 0);
383  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
384  INT4OID, -1, 0);
385  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
386  TEXTOID, -1, 0);
387  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
388  TEXTOID, -1, 0);
389 
390  /* prepare for projection of tuples */
391  tstate = begin_tup_output_tupdesc(dest, tupdesc);
392 
393  /* column 1: system identifier */
394  values[0] = CStringGetTextDatum(sysid);
395 
396  /* column 2: timeline */
397  values[1] = Int32GetDatum(ThisTimeLineID);
398 
399  /* column 3: xlog position */
400  values[2] = CStringGetTextDatum(xpos);
401 
402  /* column 4: database name, or NULL if none */
403  if (dbname)
404  values[3] = CStringGetTextDatum(dbname);
405  else
406  nulls[3] = true;
407 
408  /* send it to dest */
409  do_tup_output(tstate, values, nulls);
410 
411  end_tup_output(tstate);
412 }
413 
414 
415 /*
416  * Handle TIMELINE_HISTORY command.
 *
 * Sends a RowDescription ('T') with two columns, "filename" (text) and
 * "content" (bytea), followed by one DataRow ('D') holding the history file
 * name for cmd->timeline and the raw bytes of that file.  The file is
 * streamed into the message in BLCKSZ-sized reads.
 *
 * Raises ERROR if the file cannot be opened, seeked or read.
 *
 * NOTE(review): lines 419, 421, 470, 477, 481, 492, 494 and 497 are missing
 * from this listing: the function name/parameter line, the declaration of
 * "buf", and one line inside each ereport() call and around read().
417  */
418 static void
420 {
422  char histfname[MAXFNAMELEN];
423  char path[MAXPGPATH];
424  int fd;
425  off_t histfilelen;
426  off_t bytesleft;
427  Size len;
428 
429  /*
430  * Reply with a result set with one row, and two columns. The first col is
431  * the name of the history file, 2nd is the contents.
432  */
433 
434  TLHistoryFileName(histfname, cmd->timeline);
435  TLHistoryFilePath(path, cmd->timeline);
436 
437  /* Send a RowDescription message */
438  pq_beginmessage(&buf, 'T');
439  pq_sendint(&buf, 2, 2); /* 2 fields */
440 
441  /* first field */
442  pq_sendstring(&buf, "filename"); /* col name */
443  pq_sendint(&buf, 0, 4); /* table oid */
444  pq_sendint(&buf, 0, 2); /* attnum */
445  pq_sendint(&buf, TEXTOID, 4); /* type oid */
446  pq_sendint(&buf, -1, 2); /* typlen */
447  pq_sendint(&buf, 0, 4); /* typmod */
448  pq_sendint(&buf, 0, 2); /* format code */
449 
450  /* second field */
451  pq_sendstring(&buf, "content"); /* col name */
452  pq_sendint(&buf, 0, 4); /* table oid */
453  pq_sendint(&buf, 0, 2); /* attnum */
454  pq_sendint(&buf, BYTEAOID, 4); /* type oid */
455  pq_sendint(&buf, -1, 2); /* typlen */
456  pq_sendint(&buf, 0, 4); /* typmod */
457  pq_sendint(&buf, 0, 2); /* format code */
458  pq_endmessage(&buf);
459 
460  /* Send a DataRow message */
461  pq_beginmessage(&buf, 'D');
462  pq_sendint(&buf, 2, 2); /* # of columns */
463  len = strlen(histfname);
464  pq_sendint(&buf, len, 4); /* col1 len */
465  pq_sendbytes(&buf, histfname, len);
466 
467  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0666);
468  if (fd < 0)
469  ereport(ERROR,
471  errmsg("could not open file \"%s\": %m", path)));
472 
473  /* Determine file length and send it to client */
474  histfilelen = lseek(fd, 0, SEEK_END);
475  if (histfilelen < 0)
476  ereport(ERROR,
478  errmsg("could not seek to end of file \"%s\": %m", path)));
479  if (lseek(fd, 0, SEEK_SET) != 0)
480  ereport(ERROR,
482  errmsg("could not seek to beginning of file \"%s\": %m", path)));
483 
	/*
	 * NOTE(review): histfilelen is an off_t but is sent in a 4-byte length
	 * field; a file larger than that field can express would be
	 * mis-described.  Confirm that history files are always small.
	 */
484  pq_sendint(&buf, histfilelen, 4); /* col2 len */
485 
	/*
	 * Copy the file into the message.  The length above was already sent,
	 * so exactly histfilelen bytes are expected to follow.
	 *
	 * NOTE(review): a read() result of 0 (end of file before bytesleft
	 * reaches zero) is reported through the same "%m" message as a real
	 * error, although errno is not set by an EOF return; consider a
	 * distinct "unexpected end of file" report.  Also, each read asks for
	 * sizeof(rbuf) rather than at most bytesleft, so if the file grew after
	 * the lseek() more bytes than announced could be appended -- verify.
	 */
486  bytesleft = histfilelen;
487  while (bytesleft > 0)
488  {
489  char rbuf[BLCKSZ];
490  int nread;
491 
493  nread = read(fd, rbuf, sizeof(rbuf));
495  if (nread <= 0)
496  ereport(ERROR,
498  errmsg("could not read file \"%s\": %m",
499  path)));
500  pq_sendbytes(&buf, rbuf, nread);
501  bytesleft -= nread;
502  }
503  CloseTransientFile(fd);
504 
505  pq_endmessage(&buf);
506 }
507 
508 /*
509  * Handle START_REPLICATION command.
510  *
511  * At the moment, this never returns, but an ereport(ERROR) will take us back
512  * to the main loop.
 *
 * NOTE(review): the code below does continue after the streaming section
 * (it sends an optional result set and a CommandComplete message), so the
 * "never returns" remark above looks stale -- confirm.
 *
 * Visible flow:
 *   - ERROR if ThisTimeLineID is 0 (IDENTIFY_SYSTEM not run yet);
 *   - determine FlushPtr and select the timeline to send, validating a
 *     client-requested historic timeline against this server's history;
 *   - send CopyBothResponse ('W'), check the requested start point against
 *     FlushPtr, publish sentPtr in MyWalSnd under its spinlock, and stream;
 *   - afterwards send a (next_tli, next_tli_startpos) row and the
 *     CommandComplete tag.
 *
 * NOTE(review): many lines are missing from this listing (515, 517,
 * 536-537, 548, 561, 564, 578, 617-618, 622, 625, 636, 671, 675-676, 679,
 * 681, 683, 687, 693, 704, 706).  Among them: the function name line, the
 * declaration of "buf", several conditions, the call that runs the main
 * loop, and the guard that presumably precedes proc_exit(0).
513  */
514 static void
516 {
518  XLogRecPtr FlushPtr;
519 
520  if (ThisTimeLineID == 0)
521  ereport(ERROR,
522  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
523  errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
524 
525  /*
526  * We assume here that we're logging enough information in the WAL for
527  * log-shipping, since this is checked in PostmasterMain().
528  *
529  * NOTE: wal_level can only change at shutdown, so in most cases it is
530  * difficult for there to be WAL data that we can still see that was
531  * written at wal_level='minimal'.
532  */
533 
534  if (cmd->slotname)
535  {
538  ereport(ERROR,
539  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
540  (errmsg("cannot use a logical replication slot for physical replication"))));
541  }
542 
543  /*
544  * Select the timeline. If it was given explicitly by the client, use
545  * that. Otherwise use the timeline of the last replayed record, which is
546  * kept in ThisTimeLineID.
547  */
549  {
550  /* this also updates ThisTimeLineID */
551  FlushPtr = GetStandbyFlushRecPtr();
552  }
553  else
554  FlushPtr = GetFlushRecPtr();
555 
556  if (cmd->timeline != 0)
557  {
558  XLogRecPtr switchpoint;
559 
560  sendTimeLine = cmd->timeline;
562  {
563  sendTimeLineIsHistoric = false;
565  }
566  else
567  {
568  List *timeLineHistory;
569 
570  sendTimeLineIsHistoric = true;
571 
572  /*
573  * Check that the timeline the client requested exists, and
574  * the requested start location is on that timeline.
575  */
576  timeLineHistory = readTimeLineHistory(ThisTimeLineID);
577  switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
579  list_free_deep(timeLineHistory);
580 
581  /*
582  * Found the requested timeline in the history. Check that
583  * requested startpoint is on that timeline in our history.
584  *
585  * This is quite loose on purpose. We only check that we didn't
586  * fork off the requested timeline before the switchpoint. We
587  * don't check that we switched *to* it before the requested
588  * starting point. This is because the client can legitimately
589  * request to start replication from the beginning of the WAL
590  * segment that contains switchpoint, but on the new timeline, so
591  * that it doesn't end up with a partial segment. If you ask for
592  * too old a starting point, you'll get an error later when we fail
593  * to find the requested WAL segment in pg_wal.
594  *
595  * XXX: we could be more strict here and only allow a startpoint
596  * that's older than the switchpoint, if it's still in the same
597  * WAL segment.
598  */
599  if (!XLogRecPtrIsInvalid(switchpoint) &&
600  switchpoint < cmd->startpoint)
601  {
602  ereport(ERROR,
603  (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
604  (uint32) (cmd->startpoint >> 32),
605  (uint32) (cmd->startpoint),
606  cmd->timeline),
607  errdetail("This server's history forked from timeline %u at %X/%X.",
608  cmd->timeline,
609  (uint32) (switchpoint >> 32),
610  (uint32) (switchpoint))));
611  }
612  sendTimeLineValidUpto = switchpoint;
613  }
614  }
615  else
616  {
619  sendTimeLineIsHistoric = false;
620  }
621 
623 
624  /* If there is nothing to stream, don't even enter COPY mode */
626  {
627  /*
628  * When we first start replication the standby will be behind the
629  * primary. For some applications, for example synchronous
630  * replication, it is important to have a clear state for this initial
631  * catchup mode, so we can trigger actions when we change streaming
632  * state later. We may stay in this state for a long time, which is
633  * exactly why we want to be able to monitor whether or not we are
634  * still here.
635  */
637 
638  /* Send a CopyBothResponse message, and start streaming */
639  pq_beginmessage(&buf, 'W');
640  pq_sendbyte(&buf, 0);
641  pq_sendint(&buf, 0, 2);
642  pq_endmessage(&buf);
643  pq_flush();
644 
645  /*
646  * Don't allow a request to stream from a future point in WAL that
647  * hasn't been flushed to disk in this server yet.
 *
 * NOTE(review): this check runs after the CopyBothResponse above has
 * already been flushed to the client, so the ERROR is raised once the
 * client has been told COPY mode started.  Confirm this ordering is
 * intended.
648  */
649  if (FlushPtr < cmd->startpoint)
650  {
651  ereport(ERROR,
652  (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
653  (uint32) (cmd->startpoint >> 32),
654  (uint32) (cmd->startpoint),
655  (uint32) (FlushPtr >> 32),
656  (uint32) (FlushPtr))));
657  }
658 
659  /* Start streaming from the requested point */
660  sentPtr = cmd->startpoint;
661 
662  /* Initialize shared memory status, too */
663  {
664  WalSnd *walsnd = MyWalSnd;
665 
666  SpinLockAcquire(&walsnd->mutex);
667  walsnd->sentPtr = sentPtr;
668  SpinLockRelease(&walsnd->mutex);
669  }
670 
672 
673  /* Main loop of walsender */
674  replication_active = true;
675 
677 
678  replication_active = false;
	/* NOTE(review): line 679 is missing; this exit was presumably conditional. */
680  proc_exit(0);
682 
684  }
685 
686  if (cmd->slotname)
688 
689  /*
690  * Copy is finished now. Send a single-row result set indicating the next
691  * timeline.
692  */
694  {
	/* Room for two 8-digit hex halves, the '/' separator and the NUL. */
695  char startpos_str[8 + 1 + 8 + 1];
696  DestReceiver *dest;
697  TupOutputState *tstate;
698  TupleDesc tupdesc;
699  Datum values[2];
700  bool nulls[2];
701 
702  snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
703  (uint32) (sendTimeLineValidUpto >> 32),
705 
707  MemSet(nulls, false, sizeof(nulls));
708 
709  /*
710  * Need a tuple descriptor representing two columns.
711  * int8 may seem like a surprising data type for this, but in theory
712  * int4 would not be wide enough for this, as TimeLineID is unsigned.
713  */
714  tupdesc = CreateTemplateTupleDesc(2, false);
715  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
716  INT8OID, -1, 0);
717  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
718  TEXTOID, -1, 0);
719 
720  /* prepare for projection of tuple */
721  tstate = begin_tup_output_tupdesc(dest, tupdesc);
722 
723  values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
724  values[1] = CStringGetTextDatum(startpos_str);
725 
726  /* send it to dest */
727  do_tup_output(tstate, values, nulls);
728 
729  end_tup_output(tstate);
730  }
731 
732  /* Send CommandComplete message */
733  pq_puttextmessage('C', "START_STREAMING");
734 }
735 
736 /*
737  * read_page callback for logical decoding contexts, as a walsender process.
738  *
739  * Inside the walsender we can do better than logical_read_local_xlog_page,
740  * which has to do a plain sleep/busy loop, because the walsender's latch gets
741  * set every time WAL is flushed.
 *
 * Waits until WAL up to targetPagePtr + reqLen is available, then reads the
 * page into cur_page.
 *
 * Returns XLOG_BLCKSZ when the whole page is flushed, the number of flushed
 * bytes when only part of the page is available, or -1 when fewer than
 * reqLen bytes are available.
 *
 * NOTE(review): lines 744, 751 and 753 are missing from this listing,
 * including the function name and its leading parameters (state,
 * targetPagePtr and reqLen are used below but not declared here).
742  */
743 static int
745  XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
746 {
747  XLogRecPtr flushptr;
748  int count;
749 
	/* Let the reader pick the timeline, then mirror it into the send state. */
750  XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
752  sendTimeLine = state->currTLI;
754  sendTimeLineNextTLI = state->nextTLI;
755 
756  /* make sure we have enough WAL available */
757  flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
758 
759  /* more than one block available */
760  if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
761  count = XLOG_BLCKSZ;
762  /* not enough WAL synced, that can happen during shutdown */
763  else if (targetPagePtr + reqLen > flushptr)
764  return -1;
765  /* part of the page available */
766  else
767  count = flushptr - targetPagePtr;
768 
769  /* now actually read the data, we know it's there */
	/*
	 * NOTE(review): a full XLOG_BLCKSZ is read even when "count" is
	 * smaller, i.e. possibly past the flushed position; only "count" is
	 * reported to the caller.  Confirm that reading beyond flushptr is
	 * acceptable here.
	 */
770  XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
771 
772  return count;
773 }
774 
775 /*
776  * Process extra options given to CREATE_REPLICATION_SLOT.
 *
 * Walks cmd->options and fills the two output parameters:
 *   - "export_snapshot" (logical slots only): sets *snapshot_action from the
 *     option's boolean value;
 *   - "use_snapshot" (logical slots only): sets *snapshot_action to
 *     CRS_USE_SNAPSHOT;
 *   - "reserve_wal" (physical slots only): sets *reserve_wal to true.
 *
 * A repeated option, two snapshot options together, or an option used with
 * the wrong slot kind raises a syntax ERROR; an unknown option name raises
 * an internal ERROR.  Outputs that no option touches keep the caller's
 * initial values.
 *
 * NOTE(review): lines 779 and 801 are missing from this listing: the
 * function name with its first parameter, and the second arm of the
 * export_snapshot conditional expression.
777  */
778 static void
780  bool *reserve_wal,
781  CRSSnapshotAction *snapshot_action)
782 {
783  ListCell *lc;
784  bool snapshot_action_given = false;
785  bool reserve_wal_given = false;
786 
787  /* Parse options */
788  foreach (lc, cmd->options)
789  {
790  DefElem *defel = (DefElem *) lfirst(lc);
791 
792  if (strcmp(defel->defname, "export_snapshot") == 0)
793  {
794  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
795  ereport(ERROR,
796  (errcode(ERRCODE_SYNTAX_ERROR),
797  errmsg("conflicting or redundant options")));
798 
799  snapshot_action_given = true;
800  *snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT :
802  }
803  else if (strcmp(defel->defname, "use_snapshot") == 0)
804  {
805  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
806  ereport(ERROR,
807  (errcode(ERRCODE_SYNTAX_ERROR),
808  errmsg("conflicting or redundant options")));
809 
810  snapshot_action_given = true;
811  *snapshot_action = CRS_USE_SNAPSHOT;
812  }
813  else if (strcmp(defel->defname, "reserve_wal") == 0)
814  {
815  if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
816  ereport(ERROR,
817  (errcode(ERRCODE_SYNTAX_ERROR),
818  errmsg("conflicting or redundant options")));
819 
820  reserve_wal_given = true;
	/*
	 * NOTE(review): unlike export_snapshot, the option's value is not
	 * consulted; mere presence of the option turns reservation on.
	 */
821  *reserve_wal = true;
822  }
823  else
824  elog(ERROR, "unrecognized option: %s", defel->defname);
825  }
826 }
827 
828 /*
829  * Create a new replication slot.
 *
 * Parses the command's options, creates a physical or logical slot, and
 * replies with a one-row result set of four text columns: slot_name,
 * consistent_point, snapshot_name (NULL if none) and output_plugin (NULL if
 * cmd->plugin is NULL).
 *
 * For logical slots the snapshot options are validated up front:
 * EXPORT_SNAPSHOT must not be used inside a transaction block, while
 * USE_SNAPSHOT requires a transaction block in which no snapshot has been
 * set yet and which is not a subtransaction.
 *
 * NOTE(review): many lines are missing from this listing (832, 845, 851,
 * 856, 860, 870, 875, 895, 911-913, 922, 925, 941-942, 949, 953, 955, 959,
 * 963-964, 966, 1013).  Among them: the function name line, the trailing
 * arguments of both ReplicationSlotCreate() calls, the declaration and
 * creation of "ctx", the condition guarding the REPEATABLE READ error, the
 * bodies of both "if (!cmd->temporary)" statements, the arguments of the
 * final snprintf(), and the assignment of "dest".
830  */
831 static void
833 {
834  const char *snapshot_name = NULL;
835  char xpos[MAXFNAMELEN];
836  char *slot_name;
837  bool reserve_wal = false;
	/* Exporting a snapshot is the default unless an option says otherwise. */
838  CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
839  DestReceiver *dest;
840  TupOutputState *tstate;
841  TupleDesc tupdesc;
842  Datum values[4];
843  bool nulls[4];
844 
846 
847  parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action);
848 
849  /* setup state for XLogReadPage */
850  sendTimeLineIsHistoric = false;
852 
853  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
854  {
855  ReplicationSlotCreate(cmd->slotname, false,
857  }
858  else
859  {
861 
862  /*
863  * Initially create persistent slot as ephemeral - that allows us to
864  * nicely handle errors during initialization because it'll get
865  * dropped if this transaction fails. We'll make it persistent at the
866  * end. Temporary slots can be created as temporary from the beginning
867  * as they get dropped on error as well.
868  */
869  ReplicationSlotCreate(cmd->slotname, true,
871  }
872 
873  if (cmd->kind == REPLICATION_KIND_LOGICAL)
874  {
876 
877  /*
878  * Do options check early so that we can bail before calling the
879  * DecodingContextFindStartpoint which can take a long time.
880  */
881  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
882  {
883  if (IsTransactionBlock())
884  ereport(ERROR,
885  (errmsg("CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT "
886  "must not be called inside a transaction")));
887  }
888  else if (snapshot_action == CRS_USE_SNAPSHOT)
889  {
890  if (!IsTransactionBlock())
891  ereport(ERROR,
892  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
893  "must be called inside a transaction")));
894 
896  ereport(ERROR,
897  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
898  "must be called in REPEATABLE READ isolation mode transaction")));
899 
900  if (FirstSnapshotSet)
901  ereport(ERROR,
902  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
903  "must be called before any query")));
904 
905  if (IsSubTransaction())
906  ereport(ERROR,
907  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
908  "must not be called in a subtransaction")));
909  }
910 
914 
915  /*
916  * Signal that we don't need the timeout mechanism. We're just
917  * creating the replication slot and don't yet accept feedback
918  * messages or send keepalives. As we possibly need to wait for
919  * further WAL the walsender would otherwise possibly be killed too
920  * soon.
921  */
923 
924  /* build initial snapshot, might take a while */
926 
927  /*
928  * Export or use the snapshot if we've been asked to do so.
929  *
930  * NB. We will convert the snapbuild.c kind of snapshot to normal
931  * snapshot when doing this.
932  */
933  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
934  {
935  snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
936  }
937  else if (snapshot_action == CRS_USE_SNAPSHOT)
938  {
939  Snapshot snap;
940 
943  }
944 
945  /* don't need the decoding context anymore */
946  FreeDecodingContext(ctx);
947 
948  if (!cmd->temporary)
950  }
951  else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
952  {
954 
956 
957  /* Write this slot to disk if it's a permanent one. */
958  if (!cmd->temporary)
960  }
961 
962  snprintf(xpos, sizeof(xpos), "%X/%X",
965 
967  MemSet(nulls, false, sizeof(nulls));
968 
969  /*----------
970  * Need a tuple descriptor representing four columns:
971  * - first field: the slot name
972  * - second field: LSN at which we became consistent
973  * - third field: exported snapshot's name
974  * - fourth field: output plugin
975  *----------
976  */
977  tupdesc = CreateTemplateTupleDesc(4, false);
978  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
979  TEXTOID, -1, 0);
980  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
981  TEXTOID, -1, 0);
982  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
983  TEXTOID, -1, 0);
984  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
985  TEXTOID, -1, 0);
986 
987  /* prepare for projection of tuples */
988  tstate = begin_tup_output_tupdesc(dest, tupdesc);
989 
990  /* slot_name */
991  slot_name = NameStr(MyReplicationSlot->data.name);
992  values[0] = CStringGetTextDatum(slot_name);
993 
994  /* consistent wal location */
995  values[1] = CStringGetTextDatum(xpos);
996 
997  /* snapshot name, or NULL if none */
998  if (snapshot_name != NULL)
999  values[2] = CStringGetTextDatum(snapshot_name);
1000  else
1001  nulls[2] = true;
1002 
1003  /* plugin, or NULL if none */
1004  if (cmd->plugin != NULL)
1005  values[3] = CStringGetTextDatum(cmd->plugin);
1006  else
1007  nulls[3] = true;
1008 
1009  /* send it to dest */
1010  do_tup_output(tstate, values, nulls);
1011  end_tup_output(tstate);
1012 
1014 }
1015 
1016 /*
1017  * Get rid of a replication slot that is no longer wanted.
 *
 * Finishes by reporting the "DROP_REPLICATION_SLOT" command tag to the
 * remote client.
 *
 * NOTE(review): lines 1020 and 1022 are missing from this listing: the
 * function name line and the statement that actually drops the slot.
1018  */
1019 static void
1021 {
1023  EndCommand("DROP_REPLICATION_SLOT", DestRemote);
1024 }
1025 
1026 /*
1027  * Load previously initiated logical slot and prepare for sending data (via
1028  * WalSndLoop).
 *
 * Visible flow: send CopyBothResponse ('W'), create the decoding context
 * starting at cmd->startpoint with cmd->options, publish the sent position
 * in MyWalSnd under its spinlock, mark replication active, stream, then free
 * the decoding context and finish with the "COPY 0" command tag.
 *
 * NOTE(review): many lines are missing from this listing (1031, 1033, 1036,
 * 1038, 1040, 1047, 1054, 1069-1070, 1073, 1079, 1086, 1092, 1095, 1098,
 * 1101, 1103).  Among them: the function name line, the declaration of
 * "buf", the condition of the "after promotion" branch, the trailing
 * arguments of CreateDecodingContext(), the assignment made while the
 * spinlock is held, the call that runs the main loop, and the guard that
 * presumably precedes proc_exit(0).
1029  */
1030 static void
1032 {
1034 
1035  /* make sure that our requirements are still fulfilled */
1037 
1039 
1041 
1042  /*
1043  * Force a disconnect, so that the decoding code doesn't need to care
1044  * about an eventual switch from running in recovery, to running in a
1045  * normal environment. Client code is expected to handle reconnects.
1046  */
1048  {
1049  ereport(LOG,
1050  (errmsg("terminating walsender process after promotion")));
1051  walsender_ready_to_stop = true;
1052  }
1053 
1055 
1056  /* Send a CopyBothResponse message, and start streaming */
1057  pq_beginmessage(&buf, 'W');
1058  pq_sendbyte(&buf, 0);
1059  pq_sendint(&buf, 0, 2);
1060  pq_endmessage(&buf);
1061  pq_flush();
1062 
1063  /*
1064  * Initialize position to the last ack'ed one, then the xlog records begin
1065  * to be shipped from that position.
1066  */
1067  logical_decoding_ctx = CreateDecodingContext(
1068  cmd->startpoint, cmd->options,
1071 
1072  /* Start reading WAL from the oldest required WAL. */
1074 
1075  /*
1076  * Report the location after which we'll send out further commits as the
1077  * current sentPtr.
1078  */
1080 
1081  /* Also update the sent position status in shared memory */
1082  {
1083  WalSnd *walsnd = MyWalSnd;
1084 
1085  SpinLockAcquire(&walsnd->mutex);
1087  SpinLockRelease(&walsnd->mutex);
1088  }
1089 
1090  replication_active = true;
1091 
1093 
1094  /* Main loop of walsender */
1096 
1097  FreeDecodingContext(logical_decoding_ctx);
1099 
1100  replication_active = false;
	/* NOTE(review): line 1101 is missing; this exit was presumably conditional. */
1102  proc_exit(0);
1104 
1105  /* Get out of COPY mode (CommandComplete). */
1106  EndCommand("COPY 0", DestRemote);
1107 }
1108 
1109 /*
1110  * LogicalDecodingContext 'prepare_write' callback.
1111  *
1112  * Prepare a write into a StringInfo.
1113  *
1114  * Don't do anything lasting in here; it's quite possible that nothing will
1115  * be done with the data.
 *
 * Resets ctx->out and writes the message header into it: the byte 'w',
 * then three 64-bit fields -- dataStart, walEnd and a zero placeholder for
 * the send time.  When last_write is false both LSN fields are written as
 * InvalidXLogRecPtr instead of the caller's lsn.
 *
 * NOTE(review): the signature line (1118) is missing from this listing.
 * The prototype earlier in the file declares the parameters as
 * (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 * bool last_write); xid is not used in the body shown.
1116  */
1117 static void
1119 {
1120  /* can't have sync rep confused by sending the same LSN several times */
1121  if (!last_write)
1122  lsn = InvalidXLogRecPtr;
1123 
1124  resetStringInfo(ctx->out);
1125 
1126  pq_sendbyte(ctx->out, 'w');
1127  pq_sendint64(ctx->out, lsn); /* dataStart */
1128  pq_sendint64(ctx->out, lsn); /* walEnd */
1129 
1130  /*
1131  * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1132  * reserve space here.
1133  */
1134  pq_sendint64(ctx->out, 0); /* sendtime */
1135 }
1136 
1137 /*
1138  * LogicalDecodingContext 'write' callback.
1139  *
1140  * Actually write the data previously prepared by WalSndPrepareWrite out to
1141  * the network. Take as long as needed, but process replies from the other
1142  * side and check timeouts while doing so.
1143  */
1144 static void
1146  bool last_write)
1147 {
1148  /* output previously gathered data in a CopyData packet */
1149  pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1150 
1151  /*
1152  * Fill the send timestamp last, so that it is taken as late as possible. This is somewhat
1153  * ugly, but the protocol is set, as streaming physical replication has used it for several
1154  * releases. NOTE(review): the packet was already queued above; confirm the timestamp is still sent.
1155  */
1156  resetStringInfo(&tmpbuf);
1157  pq_sendint64(&tmpbuf, GetCurrentTimestamp());
1158  memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1159  tmpbuf.data, sizeof(int64));
1160 
1161  /* fast path */
1162  /* Try to flush pending output to the client */
1163  if (pq_flush_if_writable() != 0)
1164  WalSndShutdown();
1165 
1166  if (!pq_is_send_pending())
1167  return;
1168 
1169  for (;;)
1170  {
1171  int wakeEvents;
1172  long sleeptime;
1173  TimestampTz now;
1174 
1175  /*
1176  * Emergency bailout if postmaster has died. This is to avoid the
1177  * necessity for manual cleanup of all postmaster children.
1178  */
1179  if (!PostmasterIsAlive())
1180  exit(1);
1181 
1182  /* Clear any already-pending wakeups */
1184 
1186 
1187  /* Process any requests or signals received recently */
1188  if (got_SIGHUP)
1189  {
1190  got_SIGHUP = false;
1193  }
1194 
1195  /* Check for input from the client */
1197 
1198  /* Try to flush pending output to the client */
1199  if (pq_flush_if_writable() != 0)
1200  WalSndShutdown();
1201 
1202  /* If we finished clearing the buffered data, we're done here. */
1203  if (!pq_is_send_pending())
1204  break;
1205 
1206  now = GetCurrentTimestamp();
1207 
1208  /* die if timeout was reached */
1209  WalSndCheckTimeOut(now);
1210 
1211  /* Send keepalive if the time has come */
1213 
1214  sleeptime = WalSndComputeSleeptime(now);
1215 
1216  wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
1218 
1219  /* Sleep until something happens or we time out */
1220  WaitLatchOrSocket(MyLatch, wakeEvents,
1221  MyProcPort->sock, sleeptime,
1223  }
1224 
1225  /* reactivate latch so WalSndLoop knows to continue */
1226  SetLatch(MyLatch);
1227 }
1228 
1229 /*
1230  * Wait till WAL < loc is flushed to disk so it can be safely read.
1231  */
1232 static XLogRecPtr
1234 {
1235  int wakeEvents;
1236  static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1237 
1238 
1239  /*
1240  * Fast path to avoid acquiring the spinlock in case we already know we
1241  * have enough WAL available. This is particularly interesting if we're
1242  * far behind.
1243  */
1244  if (RecentFlushPtr != InvalidXLogRecPtr &&
1245  loc <= RecentFlushPtr)
1246  return RecentFlushPtr;
1247 
1248  /* Get a more recent flush pointer. */
1249  if (!RecoveryInProgress())
1250  RecentFlushPtr = GetFlushRecPtr();
1251  else
1252  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1253 
1254  for (;;)
1255  {
1256  long sleeptime;
1257  TimestampTz now;
1258 
1259  /*
1260  * Emergency bailout if postmaster has died. This is to avoid the
1261  * necessity for manual cleanup of all postmaster children.
1262  */
1263  if (!PostmasterIsAlive())
1264  exit(1);
1265 
1266  /* Clear any already-pending wakeups */
1268 
1270 
1271  /* Process any requests or signals received recently */
1272  if (got_SIGHUP)
1273  {
1274  got_SIGHUP = false;
1277  }
1278 
1279  /* Check for input from the client */
1281 
1282  /* Update our idea of the currently flushed position. */
1283  if (!RecoveryInProgress())
1284  RecentFlushPtr = GetFlushRecPtr();
1285  else
1286  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1287 
1288  /*
1289  * If postmaster asked us to stop, don't wait here anymore. This will
1290  * cause the xlogreader to return without reading a full record, which
1291  * is the fastest way to reach the mainloop which then can quit.
1292  *
1293  * It's important to do this check after the recomputation of
1294  * RecentFlushPtr, so we can send all remaining data before shutting
1295  * down.
1296  */
1298  break;
1299 
1300  /*
1301  * We only send regular messages to the client for fully decoded
1302  * transactions, but synchronous replication and walsender shutdown
1303  * may be waiting for a later location. So we send pings
1304  * containing the flush location every now and then.
1305  */
1306  if (MyWalSnd->flush < sentPtr &&
1307  MyWalSnd->write < sentPtr &&
1309  {
1310  WalSndKeepalive(false);
1312  }
1313 
1314  /* check whether we're done */
1315  if (loc <= RecentFlushPtr)
1316  break;
1317 
1318  /* Waiting for new WAL. Since we need to wait, we're now caught up. */
1319  WalSndCaughtUp = true;
1320 
1321  /*
1322  * Try to flush pending output to the client. Also wait for the socket
1323  * becoming writable, if there's still pending output after an attempt
1324  * to flush. Otherwise we might just sit on output data while waiting
1325  * for new WAL being generated.
1326  */
1327  if (pq_flush_if_writable() != 0)
1328  WalSndShutdown();
1329 
1330  now = GetCurrentTimestamp();
1331 
1332  /* die if timeout was reached */
1333  WalSndCheckTimeOut(now);
1334 
1335  /* Send keepalive if the time has come */
1337 
1338  sleeptime = WalSndComputeSleeptime(now);
1339 
1340  wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
1342 
1343  if (pq_is_send_pending())
1344  wakeEvents |= WL_SOCKET_WRITEABLE;
1345 
1346  /* Sleep until something happens or we time out */
1347  WaitLatchOrSocket(MyLatch, wakeEvents,
1348  MyProcPort->sock, sleeptime,
1350  }
1351 
1352  /* reactivate latch so WalSndLoop knows to continue */
1353  SetLatch(MyLatch);
1354  return RecentFlushPtr;
1355 }
1356 
1357 /*
1358  * Execute an incoming replication command.
1359  *
1360  * Returns true if the cmd_string was recognized as WalSender command, false
1361  * if not.
1362  */
1363 bool
1364 exec_replication_command(const char *cmd_string)
1365 {
1366  int parse_rc;
1367  Node *cmd_node;
1368  MemoryContext cmd_context;
1369  MemoryContext old_context;
1370 
1371  /*
1372  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1373  * command arrives. Clean up the old stuff if there's anything.
1374  */
1376 
1378 
1380  "Replication command context",
1382  old_context = MemoryContextSwitchTo(cmd_context);
1383 
1384  replication_scanner_init(cmd_string);
1385  parse_rc = replication_yyparse();
1386  if (parse_rc != 0)
1387  ereport(ERROR,
1388  (errcode(ERRCODE_SYNTAX_ERROR),
1389  (errmsg_internal("replication command parser returned %d",
1390  parse_rc))));
1391 
1392  cmd_node = replication_parse_result;
1393 
1394  /*
1395  * Log replication command if log_replication_commands is enabled. Even
1396  * when it's disabled, log the command with DEBUG1 level for backward
1397  * compatibility. Note that SQL commands are not logged here, and will be
1398  * logged later if log_statement is enabled.
1399  */
1400  if (cmd_node->type != T_SQLCmd)
1402  (errmsg("received replication command: %s", cmd_string)));
1403 
1404  /*
1405  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was
1406  * called outside of transaction the snapshot should be cleared here.
1407  */
1408  if (!IsTransactionBlock())
1410 
1411  /*
1412  * For aborted transactions, don't allow anything except pure SQL;
1413  * exec_simple_query() will handle it correctly.
1414  */
1415  if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
1416  ereport(ERROR,
1417  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1418  errmsg("current transaction is aborted, "
1419  "commands ignored until end of transaction block")));
1420 
1422 
1423  /*
1424  * Allocate buffers that will be used for each outgoing and incoming
1425  * message. We do this just once per command to reduce palloc overhead.
1426  */
1427  initStringInfo(&output_message);
1428  initStringInfo(&reply_message);
1429  initStringInfo(&tmpbuf);
1430 
1431  switch (cmd_node->type)
1432  {
1433  case T_IdentifySystemCmd:
1434  IdentifySystem();
1435  break;
1436 
1437  case T_BaseBackupCmd:
1438  PreventTransactionChain(true, "BASE_BACKUP");
1439  SendBaseBackup((BaseBackupCmd *) cmd_node);
1440  break;
1441 
1444  break;
1445 
1448  break;
1449 
1450  case T_StartReplicationCmd:
1451  {
1452  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1453 
1454  PreventTransactionChain(true, "START_REPLICATION");
1455 
1456  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1457  StartReplication(cmd);
1458  else
1460  break;
1461  }
1462 
1463  case T_TimeLineHistoryCmd:
1464  PreventTransactionChain(true, "TIMELINE_HISTORY");
1466  break;
1467 
1468  case T_VariableShowStmt:
1469  {
1471  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1472 
1473  GetPGVariable(n->name, dest);
1474  }
1475  break;
1476 
1477  case T_SQLCmd:
1478  if (MyDatabaseId == InvalidOid)
1479  ereport(ERROR,
1480  (errmsg("not connected to database")));
1481 
1482  /* Tell the caller that this wasn't a WalSender command. */
1483  return false;
1484 
1485  default:
1486  elog(ERROR, "unrecognized replication command node tag: %u",
1487  cmd_node->type);
1488  }
1489 
1490  /* done */
1491  MemoryContextSwitchTo(old_context);
1492  MemoryContextDelete(cmd_context);
1493 
1494  /* Send CommandComplete message */
1495  EndCommand("SELECT", DestRemote);
1496 
1497  return true;
1498 }
1499 
1500 /*
1501  * Process any incoming messages while streaming. Also checks if the remote
1502  * end has closed the connection.
1503  */
1504 static void
1506 {
1507  unsigned char firstchar;
1508  int r;
1509  bool received = false;
1510 
1511  for (;;)
1512  {
1513  pq_startmsgread();
1514  r = pq_getbyte_if_available(&firstchar);
1515  if (r < 0)
1516  {
1517  /* unexpected error or EOF */
1519  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1520  errmsg("unexpected EOF on standby connection")));
1521  proc_exit(0);
1522  }
1523  if (r == 0)
1524  {
1525  /* no data available without blocking */
1526  pq_endmsgread();
1527  break;
1528  }
1529 
1530  /* Read the message contents */
1531  resetStringInfo(&reply_message);
1532  if (pq_getmessage(&reply_message, 0))
1533  {
1535  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1536  errmsg("unexpected EOF on standby connection")));
1537  proc_exit(0);
1538  }
1539 
1540  /*
1541  * If we already received a CopyDone from the frontend, the frontend
1542  * should not send us anything until we've closed our end of the COPY.
1543  * XXX: In theory, the frontend could already send the next command
1544  * before receiving the CopyDone, but libpq doesn't currently allow
1545  * that.
1546  */
1547  if (streamingDoneReceiving && firstchar != 'X')
1548  ereport(FATAL,
1549  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1550  errmsg("unexpected standby message type \"%c\", after receiving CopyDone",
1551  firstchar)));
1552 
1553  /* Handle the very limited subset of commands expected in this phase */
1554  switch (firstchar)
1555  {
1556  /*
1557  * 'd' means a standby reply wrapped in a CopyData packet.
1558  */
1559  case 'd':
1561  received = true;
1562  break;
1563 
1564  /*
1565  * CopyDone means the standby requested to finish streaming.
1566  * Reply with CopyDone, if we had not sent that already.
1567  */
1568  case 'c':
1569  if (!streamingDoneSending)
1570  {
1571  pq_putmessage_noblock('c', NULL, 0);
1572  streamingDoneSending = true;
1573  }
1574 
1575  streamingDoneReceiving = true;
1576  received = true;
1577  break;
1578 
1579  /*
1580  * 'X' means that the standby is closing down the socket.
1581  */
1582  case 'X':
1583  proc_exit(0);
1584 
1585  default:
1586  ereport(FATAL,
1587  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1588  errmsg("invalid standby message type \"%c\"",
1589  firstchar)));
1590  }
1591  }
1592 
1593  /*
1594  * Save the last reply timestamp if we've received at least one reply.
1595  */
1596  if (received)
1597  {
1599  waiting_for_ping_response = false;
1600  }
1601 }
1602 
1603 /*
1604  * Process a status update message received from standby.
1605  */
1606 static void
1608 {
1609  char msgtype;
1610 
1611  /*
1612  * Check message type from the first byte.
1613  */
1614  msgtype = pq_getmsgbyte(&reply_message);
1615 
1616  switch (msgtype)
1617  {
1618  case 'r':
1620  break;
1621 
1622  case 'h':
1624  break;
1625 
1626  default:
1628  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1629  errmsg("unexpected message type \"%c\"", msgtype)));
1630  proc_exit(0);
1631  }
1632 }
1633 
1634 /*
1635  * Remember that a walreceiver just confirmed receipt of lsn `lsn`.
1636  */
1637 static void
1639 {
1640  bool changed = false;
1642 
1643  Assert(lsn != InvalidXLogRecPtr);
1644  SpinLockAcquire(&slot->mutex);
1645  if (slot->data.restart_lsn != lsn)
1646  {
1647  changed = true;
1648  slot->data.restart_lsn = lsn;
1649  }
1650  SpinLockRelease(&slot->mutex);
1651 
1652  if (changed)
1653  {
1656  }
1657 
1658  /*
1659  * One could argue that the slot should be saved to disk now, but that'd
1660  * be energy wasted - the worst lost information can do here is give us
1661  * wrong information in a statistics view - we'll just potentially be more
1662  * conservative in removing files.
1663  */
1664 }
1665 
1666 /*
1667  * Regular reply from standby advising of WAL positions on standby server.
1668  */
1669 static void
1671 {
1672  XLogRecPtr writePtr,
1673  flushPtr,
1674  applyPtr;
1675  bool replyRequested;
1676  TimeOffset writeLag,
1677  flushLag,
1678  applyLag;
1679  bool clearLagTimes;
1680  TimestampTz now;
1681 
1682  static bool fullyAppliedLastTime = false;
1683 
1684  /* the caller already consumed the msgtype byte */
1685  writePtr = pq_getmsgint64(&reply_message);
1686  flushPtr = pq_getmsgint64(&reply_message);
1687  applyPtr = pq_getmsgint64(&reply_message);
1688  (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
1689  replyRequested = pq_getmsgbyte(&reply_message);
1690 
1691  elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s",
1692  (uint32) (writePtr >> 32), (uint32) writePtr,
1693  (uint32) (flushPtr >> 32), (uint32) flushPtr,
1694  (uint32) (applyPtr >> 32), (uint32) applyPtr,
1695  replyRequested ? " (reply requested)" : "");
1696 
1697  /* See if we can compute the round-trip lag for these positions. */
1698  now = GetCurrentTimestamp();
1699  writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
1700  flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
1701  applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
1702 
1703  /*
1704  * If the standby reports that it has fully replayed the WAL in two
1705  * consecutive reply messages, then the second such message must result
1706  * from wal_receiver_status_interval expiring on the standby. This is a
1707  * convenient time to forget the lag times measured when it last
1708  * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
1709  * until more WAL traffic arrives.
1710  */
1711  clearLagTimes = false;
1712  if (applyPtr == sentPtr)
1713  {
1714  if (fullyAppliedLastTime)
1715  clearLagTimes = true;
1716  fullyAppliedLastTime = true;
1717  }
1718  else
1719  fullyAppliedLastTime = false;
1720 
1721  /* Send a reply if the standby requested one. */
1722  if (replyRequested)
1723  WalSndKeepalive(false);
1724 
1725  /*
1726  * Update shared state for this WalSender process based on reply data from
1727  * standby.
1728  */
1729  {
1730  WalSnd *walsnd = MyWalSnd;
1731 
1732  SpinLockAcquire(&walsnd->mutex);
1733  walsnd->write = writePtr;
1734  walsnd->flush = flushPtr;
1735  walsnd->apply = applyPtr;
1736  if (writeLag != -1 || clearLagTimes)
1737  walsnd->writeLag = writeLag;
1738  if (flushLag != -1 || clearLagTimes)
1739  walsnd->flushLag = flushLag;
1740  if (applyLag != -1 || clearLagTimes)
1741  walsnd->applyLag = applyLag;
1742  SpinLockRelease(&walsnd->mutex);
1743  }
1744 
1747 
1748  /*
1749  * Advance our local xmin horizon when the client confirmed a flush.
1750  */
1751  if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
1752  {
1755  else
1757  }
1758 }
1759 
1760 /* compute new replication slot xmin horizon if needed */
1761 static void
1763 {
1764  bool changed = false;
1766 
1767  SpinLockAcquire(&slot->mutex);
1769 
1770  /*
1771  * For physical replication we don't need the interlock provided by xmin
1772  * and effective_xmin since the consequences of a missed increase are
1773  * limited to query cancellations, so set both at once.
1774  */
1775  if (!TransactionIdIsNormal(slot->data.xmin) ||
1776  !TransactionIdIsNormal(feedbackXmin) ||
1777  TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
1778  {
1779  changed = true;
1780  slot->data.xmin = feedbackXmin;
1781  slot->effective_xmin = feedbackXmin;
1782  }
1783  if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
1784  !TransactionIdIsNormal(feedbackCatalogXmin) ||
1785  TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
1786  {
1787  changed = true;
1788  slot->data.catalog_xmin = feedbackCatalogXmin;
1789  slot->effective_catalog_xmin = feedbackCatalogXmin;
1790  }
1791  SpinLockRelease(&slot->mutex);
1792 
1793  if (changed)
1794  {
1797  }
1798 }
1799 
1800 /*
1801  * Check that the provided xmin/epoch are sane, that is, not in the future
1802  * and not so far back as to be already wrapped around.
1803  *
1804  * Epoch of nextXid should be same as standby, or if the counter has
1805  * wrapped, then one greater than standby.
1806  *
1807  * This check doesn't care about whether clog exists for these xids
1808  * at all.
1809  */
1810 static bool
1812 {
1813  TransactionId nextXid;
1814  uint32 nextEpoch;
1815 
1816  GetNextXidAndEpoch(&nextXid, &nextEpoch);
1817 
1818  if (xid <= nextXid)
1819  {
1820  if (epoch != nextEpoch)
1821  return false;
1822  }
1823  else
1824  {
1825  if (epoch + 1 != nextEpoch)
1826  return false;
1827  }
1828 
1829  if (!TransactionIdPrecedesOrEquals(xid, nextXid))
1830  return false; /* epoch OK, but it's wrapped around */
1831 
1832  return true;
1833 }
1834 
1835 /*
1836  * Hot Standby feedback
1837  */
1838 static void
1840 {
1841  TransactionId feedbackXmin;
1842  uint32 feedbackEpoch;
1843  TransactionId feedbackCatalogXmin;
1844  uint32 feedbackCatalogEpoch;
1845 
1846  /*
1847  * Decipher the reply message. The caller already consumed the msgtype
1848  * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
1849  * of this message.
1850  */
1851  (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
1852  feedbackXmin = pq_getmsgint(&reply_message, 4);
1853  feedbackEpoch = pq_getmsgint(&reply_message, 4);
1854  feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
1855  feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
1856 
1857  elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u",
1858  feedbackXmin,
1859  feedbackEpoch,
1860  feedbackCatalogXmin,
1861  feedbackCatalogEpoch);
1862 
1863  /*
1864  * Unset WalSender's xmins if the feedback message values are invalid.
1865  * This happens when the downstream turned hot_standby_feedback off.
1866  */
1867  if (!TransactionIdIsNormal(feedbackXmin)
1868  && !TransactionIdIsNormal(feedbackCatalogXmin))
1869  {
1871  if (MyReplicationSlot != NULL)
1872  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
1873  return;
1874  }
1875 
1876  /*
1877  * Check that the provided xmin/epoch are sane, that is, not in the future
1878  * and not so far back as to be already wrapped around. Ignore if not.
1879  */
1880  if (TransactionIdIsNormal(feedbackXmin) &&
1881  !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
1882  return;
1883 
1884  if (TransactionIdIsNormal(feedbackCatalogXmin) &&
1885  !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
1886  return;
1887 
1888  /*
1889  * Set the WalSender's xmin equal to the standby's requested xmin, so that
1890  * the xmin will be taken into account by GetOldestXmin. This will hold
1891  * back the removal of dead rows and thereby prevent the generation of
1892  * cleanup conflicts on the standby server.
1893  *
1894  * There is a small window for a race condition here: although we just
1895  * checked that feedbackXmin precedes nextXid, the nextXid could have
1896  * gotten advanced between our fetching it and applying the xmin below,
1897  * perhaps far enough to make feedbackXmin wrap around. In that case the
1898  * xmin we set here would be "in the future" and have no effect. No point
1899  * in worrying about this since it's too late to save the desired data
1900  * anyway. Assuming that the standby sends us an increasing sequence of
1901  * xmins, this could only happen during the first reply cycle, else our
1902  * own xmin would prevent nextXid from advancing so far.
1903  *
1904  * We don't bother taking the ProcArrayLock here. Setting the xmin field
1905  * is assumed atomic, and there's no real need to prevent a concurrent
1906  * GetOldestXmin. (If we're moving our xmin forward, this is obviously
1907  * safe, and if we're moving it backwards, well, the data is at risk
1908  * already since a VACUUM could have just finished calling GetOldestXmin.)
1909  *
1910  * If we're using a replication slot we reserve the xmin via that,
1911  * otherwise via the walsender's PGXACT entry. We can only track the
1912  * catalog xmin separately when using a slot, so we store the least
1913  * of the two provided when not using a slot.
1914  *
1915  * XXX: It might make sense to generalize the ephemeral slot concept and
1916  * always use the slot mechanism to handle the feedback xmin.
1917  */
1918  if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
1919  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
1920  else
1921  {
1922  if (TransactionIdIsNormal(feedbackCatalogXmin)
1923  && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
1924  MyPgXact->xmin = feedbackCatalogXmin;
1925  else
1926  MyPgXact->xmin = feedbackXmin;
1927  }
1928 }
1929 
1930 /*
1931  * Compute how long send/receive loops should sleep.
1932  *
1933  * If wal_sender_timeout is enabled we want to wake up in time to send
1934  * keepalives and to abort the connection if wal_sender_timeout has been
1935  * reached.
1936  */
1937 static long
1939 {
1940  long sleeptime = 10000; /* 10 s */
1941 
1943  {
1944  TimestampTz wakeup_time;
1945  long sec_to_timeout;
1946  int microsec_to_timeout;
1947 
1948  /*
1949  * At the latest stop sleeping once wal_sender_timeout has been
1950  * reached.
1951  */
1954 
1955  /*
1956  * If no ping has been sent yet, wakeup when it's time to do so.
1957  * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
1958  * the timeout passed without a response.
1959  */
1962  wal_sender_timeout / 2);
1963 
1964  /* Compute relative time until wakeup. */
1965  TimestampDifference(now, wakeup_time,
1966  &sec_to_timeout, &microsec_to_timeout);
1967 
1968  sleeptime = sec_to_timeout * 1000 +
1969  microsec_to_timeout / 1000;
1970  }
1971 
1972  return sleeptime;
1973 }
1974 
1975 /*
1976  * Check whether there have been responses by the client within
1977  * wal_sender_timeout and shutdown if not.
1978  */
1979 static void
1981 {
1982  TimestampTz timeout;
1983 
1984  /* don't bail out if we're doing something that doesn't require timeouts */
1985  if (last_reply_timestamp <= 0)
1986  return;
1987 
1990 
1991  if (wal_sender_timeout > 0 && now >= timeout)
1992  {
1993  /*
1994  * Since typically expiration of replication timeout means
1995  * communication problem, we don't send the error message to the
1996  * standby.
1997  */
1999  (errmsg("terminating walsender process due to replication timeout")));
2000 
2001  WalSndShutdown();
2002  }
2003 }
2004 
2005 /* Main loop of walsender process that streams the WAL over Copy messages. */
2006 static void
2008 {
2009  /*
2010  * Initialize the last reply timestamp. That enables timeout processing
2011  * from here on.
2012  */
2014  waiting_for_ping_response = false;
2015 
2016  /* Report to pgstat that this process is running */
2018 
2019  /*
2020  * Loop until we reach the end of this timeline or the client requests to
2021  * stop streaming.
2022  */
2023  for (;;)
2024  {
2025  TimestampTz now;
2026 
2027  /*
2028  * Emergency bailout if postmaster has died. This is to avoid the
2029  * necessity for manual cleanup of all postmaster children.
2030  */
2031  if (!PostmasterIsAlive())
2032  exit(1);
2033 
2034  /* Clear any already-pending wakeups */
2036 
2038 
2039  /* Process any requests or signals received recently */
2040  if (got_SIGHUP)
2041  {
2042  got_SIGHUP = false;
2045  }
2046 
2047  /* Check for input from the client */
2049 
2050  /*
2051  * If we have received CopyDone from the client, sent CopyDone
2052  * ourselves, and the output buffer is empty, it's time to exit
2053  * streaming.
2054  */
2056  break;
2057 
2058  /*
2059  * If we don't have any pending data in the output buffer, try to send
2060  * some more. If there is some, we don't bother to call send_data
2061  * again until we've flushed it ... but we'd better assume we are not
2062  * caught up.
2063  */
2064  if (!pq_is_send_pending())
2065  send_data();
2066  else
2067  WalSndCaughtUp = false;
2068 
2069  /* Try to flush pending output to the client */
2070  if (pq_flush_if_writable() != 0)
2071  WalSndShutdown();
2072 
2073  /* If nothing remains to be sent right now ... */
2075  {
2076  /*
2077  * If we're in catchup state, move to streaming. This is an
2078  * important state change for users to know about, since before
2079  * this point data loss might occur if the primary dies and we
2080  * need to failover to the standby. The state change is also
2081  * important for synchronous replication, since commits that
2082  * started to wait at that point might wait for some time.
2083  */
2084  if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
2085  {
2086  ereport(DEBUG1,
2087  (errmsg("standby \"%s\" has now caught up with primary",
2088  application_name)));
2090  }
2091 
2092  /*
2093  * When SIGUSR2 arrives, we send any outstanding logs up to the
2094  * shutdown checkpoint record (i.e., the latest record), wait for
2095  * them to be replicated to the standby, and exit. This may be a
2096  * normal termination at shutdown or a promotion; the walsender
2097  * cannot tell which.
2098  */
2100  WalSndDone(send_data);
2101  }
2102 
2103  now = GetCurrentTimestamp();
2104 
2105  /* Check for replication timeout. */
2106  WalSndCheckTimeOut(now);
2107 
2108  /* Send keepalive if the time has come */
2110 
2111  /*
2112  * We don't block if not caught up, unless there is unsent data
2113  * pending in which case we'd better block until the socket is
2114  * write-ready. This test is only needed for the case where the
2115  * send_data callback handled a subset of the available data but then
2116  * pq_flush_if_writable flushed it all --- we should immediately try
2117  * to send more.
2118  */
2120  {
2121  long sleeptime;
2122  int wakeEvents;
2123 
2124  wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT |
2126 
2127  sleeptime = WalSndComputeSleeptime(now);
2128 
2129  if (pq_is_send_pending())
2130  wakeEvents |= WL_SOCKET_WRITEABLE;
2131 
2132  /* Sleep until something happens or we time out */
2133  WaitLatchOrSocket(MyLatch, wakeEvents,
2134  MyProcPort->sock, sleeptime,
2136  }
2137  }
2138  return;
2139 }
2140 
2141 /* Initialize a per-walsender data structure for this walsender process */
2142 static void
2144 {
2145  int i;
2146 
2147  /*
2148  * WalSndCtl should be set up already (we inherit this by fork() or
2149  * EXEC_BACKEND mechanism from the postmaster).
2150  */
2151  Assert(WalSndCtl != NULL);
2152  Assert(MyWalSnd == NULL);
2153 
2154  /*
2155  * Find a free walsender slot and reserve it. If this fails, we must be
2156  * out of WalSnd structures.
2157  */
2158  for (i = 0; i < max_wal_senders; i++)
2159  {
2160  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2161 
2162  SpinLockAcquire(&walsnd->mutex);
2163 
2164  if (walsnd->pid != 0)
2165  {
2166  SpinLockRelease(&walsnd->mutex);
2167  continue;
2168  }
2169  else
2170  {
2171  /*
2172  * Found a free slot. Reserve it for us.
2173  */
2174  walsnd->pid = MyProcPid;
2175  walsnd->sentPtr = InvalidXLogRecPtr;
2176  walsnd->write = InvalidXLogRecPtr;
2177  walsnd->flush = InvalidXLogRecPtr;
2178  walsnd->apply = InvalidXLogRecPtr;
2179  walsnd->writeLag = -1;
2180  walsnd->flushLag = -1;
2181  walsnd->applyLag = -1;
2182  walsnd->state = WALSNDSTATE_STARTUP;
2183  walsnd->latch = &MyProc->procLatch;
2184  SpinLockRelease(&walsnd->mutex);
2185  /* don't need the lock anymore */
2186  MyWalSnd = (WalSnd *) walsnd;
2187 
2188  break;
2189  }
2190  }
2191  if (MyWalSnd == NULL)
2192  ereport(FATAL,
2193  (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
2194  errmsg("number of requested standby connections "
2195  "exceeds max_wal_senders (currently %d)",
2196  max_wal_senders)));
2197 
2198  /* Arrange to clean up at walsender exit */
2200 }
2201 
2202 /* Destroy the per-walsender data structure for this walsender process */
2203 static void
2205 {
2206  WalSnd *walsnd = MyWalSnd;
2207 
2208  Assert(walsnd != NULL);
2209 
2210  MyWalSnd = NULL;
2211 
2212  SpinLockAcquire(&walsnd->mutex);
2213  /* clear latch while holding the spinlock, so it can safely be read */
2214  walsnd->latch = NULL;
2215  /* Mark WalSnd struct as no longer being in use. */
2216  walsnd->pid = 0;
2217  SpinLockRelease(&walsnd->mutex);
2218 }
2219 
2220 /*
2221  * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
2222  *
2223  * XXX probably this should be improved to suck data directly from the
2224  * WAL buffers when possible.
2225  *
2226  * Will open, and keep open, one WAL segment stored in the global file
2227  * descriptor sendFile. This means if XLogRead is used once, there will
2228  * always be one descriptor left open until the process ends, but never
2229  * more than one.
2230  */
2231 static void
2232 XLogRead(char *buf, XLogRecPtr startptr, Size count)
2233 {
2234  char *p;
2235  XLogRecPtr recptr;
2236  Size nbytes;
2237  XLogSegNo segno;
2238 
2239 retry:
2240  p = buf;
2241  recptr = startptr;
2242  nbytes = count;
2243 
2244  while (nbytes > 0)
2245  {
2246  uint32 startoff;
2247  int segbytes;
2248  int readbytes;
2249 
2250  startoff = recptr % XLogSegSize;
2251 
2252  if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
2253  {
2254  char path[MAXPGPATH];
2255 
2256  /* Switch to another logfile segment */
2257  if (sendFile >= 0)
2258  close(sendFile);
2259 
2260  XLByteToSeg(recptr, sendSegNo);
2261 
2262  /*-------
2263  * When reading from a historic timeline, and there is a timeline
2264  * switch within this segment, read from the WAL segment belonging
2265  * to the new timeline.
2266  *
2267  * For example, imagine that this server is currently on timeline
2268  * 5, and we're streaming timeline 4. The switch from timeline 4
2269  * to 5 happened at 0/13002088. In pg_wal, we have these files:
2270  *
2271  * ...
2272  * 000000040000000000000012
2273  * 000000040000000000000013
2274  * 000000050000000000000013
2275  * 000000050000000000000014
2276  * ...
2277  *
2278  * In this situation, when requested to send the WAL from
2279  * segment 0x13, on timeline 4, we read the WAL from file
2280  * 000000050000000000000013. Archive recovery prefers files from
2281  * newer timelines, so if the segment was restored from the
2282  * archive on this server, the file belonging to the old timeline,
2283  * 000000040000000000000013, might not exist. Their contents are
2284  * equal up to the switchpoint, because at a timeline switch, the
2285  * used portion of the old segment is copied to the new file.
2286  *-------
2287  */
2290  {
2291  XLogSegNo endSegNo;
2292 
2294  if (sendSegNo == endSegNo)
2296  }
2297 
2299 
2300  sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
2301  if (sendFile < 0)
2302  {
2303  /*
2304  * If the file is not found, assume it's because the standby
2305  * asked for a too old WAL segment that has already been
2306  * removed or recycled.
2307  */
2308  if (errno == ENOENT)
2309  ereport(ERROR,
2311  errmsg("requested WAL segment %s has already been removed",
2313  else
2314  ereport(ERROR,
2316  errmsg("could not open file \"%s\": %m",
2317  path)));
2318  }
2319  sendOff = 0;
2320  }
2321 
2322  /* Need to seek in the file? */
2323  if (sendOff != startoff)
2324  {
2325  if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
2326  ereport(ERROR,
2328  errmsg("could not seek in log segment %s to offset %u: %m",
2330  startoff)));
2331  sendOff = startoff;
2332  }
2333 
2334  /* How many bytes are within this segment? */
2335  if (nbytes > (XLogSegSize - startoff))
2336  segbytes = XLogSegSize - startoff;
2337  else
2338  segbytes = nbytes;
2339 
2341  readbytes = read(sendFile, p, segbytes);
2343  if (readbytes <= 0)
2344  {
2345  ereport(ERROR,
2347  errmsg("could not read from log segment %s, offset %u, length %lu: %m",
2349  sendOff, (unsigned long) segbytes)));
2350  }
2351 
2352  /* Update state for read */
2353  recptr += readbytes;
2354 
2355  sendOff += readbytes;
2356  nbytes -= readbytes;
2357  p += readbytes;
2358  }
2359 
2360  /*
2361  * After reading into the buffer, check that what we read was valid. We do
2362  * this after reading, because even though the segment was present when we
2363  * opened it, it might get recycled or removed while we read it. The
2364  * read() succeeds in that case, but the data we tried to read might
2365  * already have been overwritten with new WAL records.
2366  */
2367  XLByteToSeg(startptr, segno);
2369 
2370  /*
2371  * During recovery, the currently-open WAL file might be replaced with the
2372  * file of the same name retrieved from archive. So we always need to
2373  * check what we read was valid after reading into the buffer. If it's
2374  * invalid, we try to open and read the file again.
2375  */
2377  {
2378  WalSnd *walsnd = MyWalSnd;
2379  bool reload;
2380 
2381  SpinLockAcquire(&walsnd->mutex);
2382  reload = walsnd->needreload;
2383  walsnd->needreload = false;
2384  SpinLockRelease(&walsnd->mutex);
2385 
2386  if (reload && sendFile >= 0)
2387  {
2388  close(sendFile);
2389  sendFile = -1;
2390 
2391  goto retry;
2392  }
2393  }
2394 }
2395 
2396 /*
2397  * Send out the WAL in its normal physical/stored form.
2398  *
2399  * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
2400  * but not yet sent to the client, and buffer it in the libpq output
2401  * buffer.
2402  *
2403  * If there is no unsent WAL remaining, WalSndCaughtUp is set to true,
2404  * otherwise WalSndCaughtUp is set to false.
2405  */
2406 static void
2408 {
2409  XLogRecPtr SendRqstPtr;
2410  XLogRecPtr startptr;
2411  XLogRecPtr endptr;
2412  Size nbytes;
2413 
2415  {
2416  WalSndCaughtUp = true;
2417  return;
2418  }
2419 
2420  /* Figure out how far we can safely send the WAL. */
2422  {
2423  /*
2424  * Streaming an old timeline that's in this server's history, but is
2425  * not the one we're currently inserting or replaying. It can be
2426  * streamed up to the point where we switched off that timeline.
2427  */
2428  SendRqstPtr = sendTimeLineValidUpto;
2429  }
2430  else if (am_cascading_walsender)
2431  {
2432  /*
2433  * Streaming the latest timeline on a standby.
2434  *
2435  * Attempt to send all WAL that has already been replayed, so that we
2436  * know it's valid. If we're receiving WAL through streaming
2437  * replication, it's also OK to send any WAL that has been received
2438  * but not replayed.
2439  *
2440  * The timeline we're recovering from can change, or we can be
2441  * promoted. In either case, the current timeline becomes historic. We
2442  * need to detect that so that we don't try to stream past the point
2443  * where we switched to another timeline. We check for promotion or
2444  * timeline switch after calculating FlushPtr, to avoid a race
2445  * condition: if the timeline becomes historic just after we checked
2446  * that it was still current, it's still OK to stream it up to the
2447  * FlushPtr that was calculated before it became historic.
2448  */
2449  bool becameHistoric = false;
2450 
2451  SendRqstPtr = GetStandbyFlushRecPtr();
2452 
2453  if (!RecoveryInProgress())
2454  {
2455  /*
2456  * We have been promoted. RecoveryInProgress() updated
2457  * ThisTimeLineID to the new current timeline.
2458  */
2459  am_cascading_walsender = false;
2460  becameHistoric = true;
2461  }
2462  else
2463  {
2464  /*
2465  * Still a cascading standby. But is the timeline we're sending
2466  * still the one recovery is recovering from? ThisTimeLineID was
2467  * updated by the GetStandbyFlushRecPtr() call above.
2468  */
2470  becameHistoric = true;
2471  }
2472 
2473  if (becameHistoric)
2474  {
2475  /*
2476  * The timeline we were sending has become historic. Read the
2477  * timeline history file of the new timeline to see where exactly
2478  * we forked off from the timeline we were sending.
2479  */
2480  List *history;
2481 
2484 
2486  list_free_deep(history);
2487 
2488  sendTimeLineIsHistoric = true;
2489 
2490  SendRqstPtr = sendTimeLineValidUpto;
2491  }
2492  }
2493  else
2494  {
2495  /*
2496  * Streaming the current timeline on a master.
2497  *
2498  * Attempt to send all data that's already been written out and
2499  * fsync'd to disk. We cannot go further than what's been written out
2500  * given the current implementation of XLogRead(). And in any case
2501  * it's unsafe to send WAL that is not securely down to disk on the
2502  * master: if the master subsequently crashes and restarts, slaves
2503  * must not have applied any WAL that got lost on the master.
2504  */
2505  SendRqstPtr = GetFlushRecPtr();
2506  }
2507 
2508  /*
2509  * Record the current system time as an approximation of the time at which
2510  * this WAL position was written for the purposes of lag tracking.
2511  *
2512  * In theory we could make XLogFlush() record a time in shmem whenever WAL
2513  * is flushed and we could get that time as well as the LSN when we call
2514  * GetFlushRecPtr() above (and likewise for the cascading standby
2515  * equivalent), but rather than putting any new code into the hot WAL path
2516  * it seems good enough to capture the time here. We should reach this
2517  * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
2518  * may take some time, we read the WAL flush pointer and take the time
2519  * very close together here so that we'll get a later position if it
2520  * is still moving.
2521  *
2522  * Because LagTrackerWriter ignores samples when the LSN hasn't advanced,
2523  * this gives us a cheap approximation for the WAL flush time for this
2524  * LSN.
2525  *
2526  * Note that the LSN is not necessarily the LSN for the data contained in
2527  * the present message; it's the end of the WAL, which might be
2528  * further ahead. All the lag tracking machinery cares about is finding
2529  * out when that arbitrary LSN is eventually reported as written, flushed
2530  * and applied, so that it can measure the elapsed time.
2531  */
2532  LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());
2533 
2534  /*
2535  * If this is a historic timeline and we've reached the point where we
2536  * forked to the next timeline, stop streaming.
2537  *
2538  * Note: We might already have sent WAL > sendTimeLineValidUpto. The
2539  * startup process will normally replay all WAL that has been received
2540  * from the master, before promoting, but if the WAL streaming is
2541  * terminated at a WAL page boundary, the valid portion of the timeline
2542  * might end in the middle of a WAL record. We might've already sent the
2543  * first half of that partial WAL record to the cascading standby, so that
2544  * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
2545  * replay the partial WAL record either, so it can still follow our
2546  * timeline switch.
2547  */
2549  {
2550  /* close the current file. */
2551  if (sendFile >= 0)
2552  close(sendFile);
2553  sendFile = -1;
2554 
2555  /* Send CopyDone */
2556  pq_putmessage_noblock('c', NULL, 0);
2557  streamingDoneSending = true;
2558 
2559  WalSndCaughtUp = true;
2560 
2561  elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
2563  (uint32) (sentPtr >> 32), (uint32) sentPtr);
2564  return;
2565  }
2566 
2567  /* Do we have any work to do? */
2568  Assert(sentPtr <= SendRqstPtr);
2569  if (SendRqstPtr <= sentPtr)
2570  {
2571  WalSndCaughtUp = true;
2572  return;
2573  }
2574 
2575  /*
2576  * Figure out how much to send in one message. If there's no more than
2577  * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
2578  * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
2579  *
2580  * The rounding is not only for performance reasons. Walreceiver relies on
2581  * the fact that we never split a WAL record across two messages. Since a
2582  * long WAL record is split at page boundary into continuation records,
2583  * page boundary is always a safe cut-off point. We also assume that
2584  * SendRqstPtr never points to the middle of a WAL record.
2585  */
2586  startptr = sentPtr;
2587  endptr = startptr;
2588  endptr += MAX_SEND_SIZE;
2589 
2590  /* if we went beyond SendRqstPtr, back off */
2591  if (SendRqstPtr <= endptr)
2592  {
2593  endptr = SendRqstPtr;
2595  WalSndCaughtUp = false;
2596  else
2597  WalSndCaughtUp = true;
2598  }
2599  else
2600  {
2601  /* round down to page boundary. */
2602  endptr -= (endptr % XLOG_BLCKSZ);
2603  WalSndCaughtUp = false;
2604  }
2605 
2606  nbytes = endptr - startptr;
2607  Assert(nbytes <= MAX_SEND_SIZE);
2608 
2609  /*
2610  * OK to read and send the slice.
2611  */
2612  resetStringInfo(&output_message);
2613  pq_sendbyte(&output_message, 'w');
2614 
2615  pq_sendint64(&output_message, startptr); /* dataStart */
2616  pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
2617  pq_sendint64(&output_message, 0); /* sendtime, filled in last */
2618 
2619  /*
2620  * Read the log directly into the output buffer to avoid extra memcpy
2621  * calls.
2622  */
2623  enlargeStringInfo(&output_message, nbytes);
2624  XLogRead(&output_message.data[output_message.len], startptr, nbytes);
2625  output_message.len += nbytes;
2626  output_message.data[output_message.len] = '\0';
2627 
2628  /*
2629  * Fill the send timestamp last, so that it is taken as late as possible.
2630  */
2631  resetStringInfo(&tmpbuf);
2632  pq_sendint64(&tmpbuf, GetCurrentTimestamp());
2633  memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
2634  tmpbuf.data, sizeof(int64));
2635 
2636  pq_putmessage_noblock('d', output_message.data, output_message.len);
2637 
2638  sentPtr = endptr;
2639 
2640  /* Update shared memory status */
2641  {
2642  WalSnd *walsnd = MyWalSnd;
2643 
2644  SpinLockAcquire(&walsnd->mutex);
2645  walsnd->sentPtr = sentPtr;
2646  SpinLockRelease(&walsnd->mutex);
2647  }
2648 
2649  /* Report progress of XLOG streaming in PS display */
2651  {
2652  char activitymsg[50];
2653 
2654  snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
2655  (uint32) (sentPtr >> 32), (uint32) sentPtr);
2656  set_ps_display(activitymsg, false);
2657  }
2658 
2659  return;
2660 }
2661 
2662 /*
2663  * Stream out logically decoded data.
2664  */
2665 static void
2667 {
2668  XLogRecord *record;
2669  char *errm;
2670 
2671  /*
2672  * Don't know whether we've caught up yet. We'll set it to true in
2673  * WalSndWaitForWal, if we're actually waiting. We also set to true if
2674  * XLogReadRecord() had to stop reading but WalSndWaitForWal didn't wait -
2675  * i.e. when we're shutting down.
2676  */
2677  WalSndCaughtUp = false;
2678 
2679  record = XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &errm);
2681 
2682  /* xlog record was invalid */
2683  if (errm != NULL)
2684  elog(ERROR, "%s", errm);
2685 
2686  if (record != NULL)
2687  {
2688  /*
2689  * Note the lack of any call to LagTrackerWrite() which is the responsibility
2690  * of the logical decoding plugin. Response messages are handled normally,
2691  * so this responsibility does not extend to needing to call LagTrackerRead().
2692  */
2693  LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
2694 
2695  sentPtr = logical_decoding_ctx->reader->EndRecPtr;
2696  }
2697  else
2698  {
2699  /*
2700  * If the record we just wanted to read is at or beyond the flushed
2701  * point, then we're caught up.
2702  */
2703  if (logical_decoding_ctx->reader->EndRecPtr >= GetFlushRecPtr())
2704  WalSndCaughtUp = true;
2705  }
2706 
2707  /* Update shared memory status */
2708  {
2709  WalSnd *walsnd = MyWalSnd;
2710 
2711  SpinLockAcquire(&walsnd->mutex);
2712  walsnd->sentPtr = sentPtr;
2713  SpinLockRelease(&walsnd->mutex);
2714  }
2715 }
2716 
2717 /*
2718  * Shutdown if the sender is caught up.
2719  *
2720  * NB: This should only be called when the shutdown signal has been received
2721  * from postmaster.
2722  *
2723  * Note that if we determine that there's still more data to send, this
2724  * function will return control to the caller.
2725  */
2726 static void
2728 {
2729  XLogRecPtr replicatedPtr;
2730 
2731  /* ... let's just be real sure we're caught up ... */
2732  send_data();
2733 
2734  /*
2735  * To figure out whether all WAL has successfully been replicated, check
2736  * flush location if valid, write otherwise. Tools like pg_receivewal
2737  * will usually (unless in synchronous mode) return an invalid flush
2738  * location.
2739  */
2740  replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
2741  MyWalSnd->write : MyWalSnd->flush;
2742 
2743  if (WalSndCaughtUp && sentPtr == replicatedPtr &&
2744  !pq_is_send_pending())
2745  {
2746  /* Inform the standby that XLOG streaming is done */
2747  EndCommand("COPY 0", DestRemote);
2748  pq_flush();
2749 
2750  proc_exit(0);
2751  }
2753  {
2754  WalSndKeepalive(true);
2756  }
2757 }
2758 
2759 /*
2760  * Returns the latest point in WAL that has been safely flushed to disk, and
2761  * can be sent to the standby. This should only be called when in recovery,
2762  * ie. we're streaming to a cascaded standby.
2763  *
2764  * As a side-effect, ThisTimeLineID is updated to the TLI of the last
2765  * replayed WAL record.
2766  */
2767 static XLogRecPtr
2769 {
2770  XLogRecPtr replayPtr;
2771  TimeLineID replayTLI;
2772  XLogRecPtr receivePtr;
2775 
2776  /*
2777  * We can safely send what's already been replayed. Also, if walreceiver
2778  * is streaming WAL from the same timeline, we can send anything that it
2779  * has streamed, but hasn't been replayed yet.
2780  */
2781 
2782  receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI);
2783  replayPtr = GetXLogReplayRecPtr(&replayTLI);
2784 
2785  ThisTimeLineID = replayTLI;
2786 
2787  result = replayPtr;
2788  if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
2789  result = receivePtr;
2790 
2791  return result;
2792 }
2793 
2794 /*
2795  * Request walsenders to reload the currently-open WAL file
 *
 * Walks all max_wal_senders slots in WalSndCtl->walsnds and sets the
 * needreload flag on every slot whose pid is non-zero.  This function only
 * raises the flag; the WAL-reading code in this file tests and clears it
 * after a read and, if it was set while a segment file is open, closes that
 * file and reads again.
2796  */
2797 void
2799 {
2800  int i;
2801 
2802  for (i = 0; i < max_wal_senders; i++)
2803  {
2804  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2805 
 /* pid is tested without taking the spinlock; pid 0 slots are skipped */
2806  if (walsnd->pid == 0)
2807  continue;
2808 
 /* the flag itself is always written with the slot's spinlock held */
2809  SpinLockAcquire(&walsnd->mutex);
2810  walsnd->needreload = true;
2811  SpinLockRelease(&walsnd->mutex);
2812  }
2813 }
2814 
2815 /*
 * SIGHUP: set flag to re-read config file at next convenient time
 *
 * The handler itself only records the request in got_SIGHUP and sets
 * MyLatch; it does not touch the configuration file.
 */
2816 static void
2818 {
 /* preserve errno of the interrupted code across the handler */
2819  int save_errno = errno;
2820 
2821  got_SIGHUP = true;
2822 
 /* presumably wakes a loop waiting on MyLatch so it notices the flag */
2823  SetLatch(MyLatch);
2824 
2825  errno = save_errno;
2826 }
2827 
2828 /* SIGUSR1: set flag to send WAL records */
2829 static void
2831 {
2832  int save_errno = errno;
2833 
2835 
2836  errno = save_errno;
2837 }
2838 
2839 /*
 * SIGUSR2: set flag to do a last cycle and shut down afterwards
 *
 * Note that walsender_ready_to_stop and MyLatch are set unconditionally,
 * i.e. also in the case where the handler has just sent SIGTERM to its
 * own process because replication was not active.
 */
2840 static void
2842 {
 /* preserve errno of the interrupted code across the handler */
2843  int save_errno = errno;
2844 
2845  /*
2846  * If replication has not yet started, die like with SIGTERM. If
2847  * replication is active, only set a flag and wake up the main loop. It
2848  * will send any outstanding WAL, wait for it to be replicated to the
2849  * standby, and then exit gracefully.
2850  */
2851  if (!replication_active)
2852  kill(MyProcPid, SIGTERM);
2853 
2854  walsender_ready_to_stop = true;
2855  SetLatch(MyLatch);
2856 
2857  errno = save_errno;
2858 }
2859 
2860 /* Set up signal handlers */
2861 void
2863 {
2864  /* Set up signal handlers */
2865  pqsignal(SIGHUP, WalSndSigHupHandler); /* set flag to read config
2866  * file */
2867  pqsignal(SIGINT, SIG_IGN); /* not used */
2868  pqsignal(SIGTERM, die); /* request shutdown */
2869  pqsignal(SIGQUIT, quickdie); /* hard crash time */
2870  InitializeTimeouts(); /* establishes SIGALRM handler */
2872  pqsignal(SIGUSR1, WalSndXLogSendHandler); /* request WAL sending */
2873  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
2874  * shutdown */
2875 
2876  /* Reset some signals that are accepted by postmaster but not here */
2882 }
2883 
2884 /*
 * Report shared-memory space needed by WalSndShmemInit
 *
 * The result is the offset of the walsnds array within WalSndCtlData plus
 * max_wal_senders entries of sizeof(WalSnd).  The arithmetic goes through
 * add_size()/mul_size() (presumably overflow-checked helpers -- confirm
 * against their definitions).
 */
2885 Size
2887 {
 /* the initializer is immediately overwritten by the assignment below */
2888  Size size = 0;
2889 
2890  size = offsetof(WalSndCtlData, walsnds);
2891  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
2892 
2893  return size;
2894 }
2895 
2896 /*
 * Allocate and initialize walsender-related shared memory
 *
 * Obtains the "Wal Sender Ctl" structure via ShmemInitStruct(), sized by
 * WalSndShmemSize(), and stores it in WalSndCtl.  Only when the structure
 * was not found already present is it initialized: the whole area is
 * zeroed, each SyncRepQueue entry is set up, and the spinlock of every
 * walsender slot is initialized.
 */
2897 void
2899 {
2900  bool found;
2901  int i;
2902 
2903  WalSndCtl = (WalSndCtlData *)
2904  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
2905 
2906  if (!found)
2907  {
2908  /* First time through, so initialize */
2909  MemSet(WalSndCtl, 0, WalSndShmemSize());
2910 
 /* one wait queue per sync-rep wait mode */
2911  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
2912  SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
2913 
 /* every slot gets its own spinlock */
2914  for (i = 0; i < max_wal_senders; i++)
2915  {
2916  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2917 
2918  SpinLockInit(&walsnd->mutex);
2919  }
2920  }
2921 }
2922 
2923 /*
2924  * Wake up all walsenders
2925  *
2926  * This will be called inside critical sections, so throwing an error is not
2927  * advisable.
 *
 * For each of the max_wal_senders slots, the latch pointer is copied out
 * while holding the slot's spinlock; SetLatch() is then called on it after
 * the spinlock has been released, and only if the pointer is not NULL.
2928  */
2929 void
2931 {
2932  int i;
2933 
2934  for (i = 0; i < max_wal_senders; i++)
2935  {
2936  Latch *latch;
2937  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2938 
2939  /*
2940  * Get latch pointer with spinlock held, for the unlikely case that
2941  * pointer reads aren't atomic (as they're 8 bytes).
2942  */
2943  SpinLockAcquire(&walsnd->mutex);
2944  latch = walsnd->latch;
2945  SpinLockRelease(&walsnd->mutex);
2946 
 /* a NULL latch pointer means there is nothing to wake for this slot */
2947  if (latch != NULL)
2948  SetLatch(latch);
2949  }
2950 }
2951 
2952 /* Set state for current walsender (only called in walsender) */
2953 void
2955 {
2956  WalSnd *walsnd = MyWalSnd;
2957 
2959 
2960  if (walsnd->state == state)
2961  return;
2962 
2963  SpinLockAcquire(&walsnd->mutex);
2964  walsnd->state = state;
2965  SpinLockRelease(&walsnd->mutex);
2966 }
2967 
2968 /*
2969  * Return a string constant representing the state. This is used
2970  * in system views, and should *not* be translated.
 *
 * Each of the four WALSNDSTATE_* values handled by the switch maps to a
 * lower-case literal; any other value falls out of the switch and yields
 * "UNKNOWN".  The returned pointer refers to a string literal, so the
 * caller must not free or modify it.
2971  */
2972 static const char *
2974 {
2975  switch (state)
2976  {
2977  case WALSNDSTATE_STARTUP:
2978  return "startup";
2979  case WALSNDSTATE_BACKUP:
2980  return "backup";
2981  case WALSNDSTATE_CATCHUP:
2982  return "catchup";
2983  case WALSNDSTATE_STREAMING:
2984  return "streaming";
2985  }
 /* reached only for a state value without a case label above */
2986  return "UNKNOWN";
2987 }
2988 
/*
 * Build a freshly palloc'd Interval from a time offset.
 *
 * The month and day fields are set to zero and the time field is set to
 * 'offset' unchanged.  In this file the callers pass the write/flush/apply
 * lag values (presumably microseconds -- confirm against the definition of
 * TimeOffset).  The result is allocated with palloc(), so its lifetime is
 * that of the memory context current at the time of the call.
 */
2989 static Interval *
2991 {
2992  Interval *result = palloc(sizeof(Interval));
2993 
2994  result->month = 0;
2995  result->day = 0;
2996  result->time = offset;
2997 
2998  return result;
2999 }
3000 
3001 /*
3002  * Returns activity of walsenders, including pids and xlog locations sent to
3003  * standby servers.
3004  */
3005 Datum
3007 {
3008 #define PG_STAT_GET_WAL_SENDERS_COLS 11
3009  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3010  TupleDesc tupdesc;
3011  Tuplestorestate *tupstore;
3012  MemoryContext per_query_ctx;
3013  MemoryContext oldcontext;
3014  List *sync_standbys;
3015  int i;
3016 
3017  /* check to see if caller supports us returning a tuplestore */
3018  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
3019  ereport(ERROR,
3020  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3021  errmsg("set-valued function called in context that cannot accept a set")));
3022  if (!(rsinfo->allowedModes & SFRM_Materialize))
3023  ereport(ERROR,
3024  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3025  errmsg("materialize mode required, but it is not " \
3026  "allowed in this context")));
3027 
3028  /* Build a tuple descriptor for our result type */
3029  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
3030  elog(ERROR, "return type must be a row type");
3031 
3032  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
3033  oldcontext = MemoryContextSwitchTo(per_query_ctx);
3034 
3035  tupstore = tuplestore_begin_heap(true, false, work_mem);
3036  rsinfo->returnMode = SFRM_Materialize;
3037  rsinfo->setResult = tupstore;
3038  rsinfo->setDesc = tupdesc;
3039 
3040  MemoryContextSwitchTo(oldcontext);
3041 
3042  /*
3043  * Get the currently active synchronous standbys.
3044  */
3045  LWLockAcquire(SyncRepLock, LW_SHARED);
3046  sync_standbys = SyncRepGetSyncStandbys(NULL);
3047  LWLockRelease(SyncRepLock);
3048 
3049  for (i = 0; i < max_wal_senders; i++)
3050  {
3051  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3053  XLogRecPtr write;
3054  XLogRecPtr flush;
3055  XLogRecPtr apply;
3056  TimeOffset writeLag;
3057  TimeOffset flushLag;
3058  TimeOffset applyLag;
3059  int priority;
3062  bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
3063 
3064  if (walsnd->pid == 0)
3065  continue;
3066 
3067  SpinLockAcquire(&walsnd->mutex);
3068  sentPtr = walsnd->sentPtr;
3069  state = walsnd->state;
3070  write = walsnd->write;
3071  flush = walsnd->flush;
3072  apply = walsnd->apply;
3073  writeLag = walsnd->writeLag;
3074  flushLag = walsnd->flushLag;
3075  applyLag = walsnd->applyLag;
3076  priority = walsnd->sync_standby_priority;
3077  SpinLockRelease(&walsnd->mutex);
3078 
3079  memset(nulls, 0, sizeof(nulls));
3080  values[0] = Int32GetDatum(walsnd->pid);
3081 
3082  if (!superuser())
3083  {
3084  /*
3085  * Only superusers can see details. Other users only get the pid
3086  * value to know it's a walsender, but no details.
3087  */
3088  MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3089  }
3090  else
3091  {
3092  values[1] = CStringGetTextDatum(WalSndGetStateString(state));
3093 
3094  if (XLogRecPtrIsInvalid(sentPtr))
3095  nulls[2] = true;
3096  values[2] = LSNGetDatum(sentPtr);
3097 
3098  if (XLogRecPtrIsInvalid(write))
3099  nulls[3] = true;
3100  values[3] = LSNGetDatum(write);
3101 
3102  if (XLogRecPtrIsInvalid(flush))
3103  nulls[4] = true;
3104  values[4] = LSNGetDatum(flush);
3105 
3106  if (XLogRecPtrIsInvalid(apply))
3107  nulls[5] = true;
3108  values[5] = LSNGetDatum(apply);
3109 
3110  /*
3111  * Treat a standby such as a pg_basebackup background process
3112  * which always returns an invalid flush location, as an
3113  * asynchronous standby.
3114  */
3115  priority = XLogRecPtrIsInvalid(walsnd->flush) ? 0 : priority;
3116 
3117  if (writeLag < 0)
3118  nulls[6] = true;
3119  else
3120  values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
3121 
3122  if (flushLag < 0)
3123  nulls[7] = true;
3124  else
3125  values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
3126 
3127  if (applyLag < 0)
3128  nulls[8] = true;
3129  else
3130  values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
3131 
3132  values[9] = Int32GetDatum(priority);
3133 
3134  /*
3135  * More easily understood version of standby state. This is purely
3136  * informational.
3137  *
3138  * In quorum-based sync replication, the role of each standby
3139  * listed in synchronous_standby_names can be changing very
3140  * frequently. Any standbys considered as "sync" at one moment can
3141  * be switched to "potential" ones at the next moment. So, it's
3142  * basically useless to report "sync" or "potential" as their sync
3143  * states. We report just "quorum" for them.
3144  */
3145  if (priority == 0)
3146  values[10] = CStringGetTextDatum("async");
3147  else if (list_member_int(sync_standbys, i))
3149  CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3150  else
3151  values[10] = CStringGetTextDatum("potential");
3152  }
3153 
3154  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
3155  }
3156 
3157  /* clean up and return the tuplestore */
3158  tuplestore_donestoring(tupstore);
3159 
3160  return (Datum) 0;
3161 }
3162 
3163 /*
3164  * This function is used to send a keepalive message to standby.
3165  * If requestReply is set, sets a flag in the message requesting the standby
3166  * to send a message back to us, for heartbeat purposes.
 *
 * The message is built in output_message (which is reset first) and consists
 * of, in order: the byte 'k', sentPtr as a 64-bit integer, the current
 * timestamp as a 64-bit integer, and one byte that is 1 when requestReply
 * is true and 0 otherwise.  It is then queued as a 'd' message with
 * pq_putmessage_noblock(); this function does not flush the output itself.
3167  */
3168 static void
3169 WalSndKeepalive(bool requestReply)
3170 {
3171  elog(DEBUG2, "sending replication keepalive");
3172 
3173  /* construct the message... */
3174  resetStringInfo(&output_message);
3175  pq_sendbyte(&output_message, 'k');
3176  pq_sendint64(&output_message, sentPtr);
3177  pq_sendint64(&output_message, GetCurrentTimestamp());
3178  pq_sendbyte(&output_message, requestReply ? 1 : 0);
3179 
3180  /* ... and send it wrapped in CopyData */
3181  pq_putmessage_noblock('d', output_message.data, output_message.len);
3182 }
3183 
3184 /*
3185  * Send keepalive message if too much time has elapsed.
3186  */
3187 static void
3189 {
3190  TimestampTz ping_time;
3191 
3192  /*
3193  * Don't send keepalive messages if timeouts are globally disabled or
3194  * we're doing something not partaking in timeouts.
3195  */
3196  if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3197  return;
3198 
3200  return;
3201 
3202  /*
3203  * If half of wal_sender_timeout has lapsed without receiving any reply
3204  * from the standby, send a keep-alive message to the standby requesting
3205  * an immediate reply.
3206  */
3208  wal_sender_timeout / 2);
3209  if (now >= ping_time)
3210  {
3211  WalSndKeepalive(true);
3213 
3214  /* Try to flush pending output to the client */
3215  if (pq_flush_if_writable() != 0)
3216  WalSndShutdown();
3217  }
3218 }
3219 
3220 /*
3221  * Record the end of the WAL and the time it was flushed locally, so that
3222  * LagTrackerRead can compute the elapsed time (lag) when this WAL position is
3223  * eventually reported to have been written, flushed and applied by the
3224  * standby in a reply message.
3225  * Exported to allow logical decoding plugins to call this when they choose.
 *
 * 'lsn' is the WAL position to remember and 'local_flush_time' the timestamp
 * stored with it.  The call is a no-op when am_walsender is false, and
 * also when 'lsn' equals the LSN passed on the previous recorded call.
 *
 * Samples live in the circular buffer LagTracker.buffer, which has
 * LAG_TRACKER_BUFFER_SIZE slots, one write head, and one read head per
 * sync-rep wait mode.
3226  */
3227 void
3229 {
3230  bool buffer_full;
3231  int new_write_head;
3232  int i;
3233 
3234  if (!am_walsender)
3235  return;
3236 
3237  /*
3238  * If the lsn hasn't advanced since last time, then do nothing. This way
3239  * we only record a new sample when new WAL has been written.
3240  */
3241  if (LagTracker.last_lsn == lsn)
3242  return;
3243  LagTracker.last_lsn = lsn;
3244 
3245  /*
3246  * If advancing the write head of the circular buffer would crash into any
3247  * of the read heads, then the buffer is full. In other words, the
3248  * slowest reader (presumably apply) is the one that controls the release
3249  * of space.
3250  */
3251  new_write_head = (LagTracker.write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
3252  buffer_full = false;
3253  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
3254  {
3255  if (new_write_head == LagTracker.read_heads[i])
3256  buffer_full = true;
3257  }
3258 
3259  /*
3260  * If the buffer is full, for now we just rewind by one slot and overwrite
3261  * the last sample, as a simple (if somewhat uneven) way to lower the
3262  * sampling rate. There may be better adaptive compaction algorithms.
 *
 * Concretely: the write head steps back one slot (wrapping from 0 to the
 * last slot), the new sample is stored there, and the write head is then
 * restored to where it was on entry, so it does not advance.
3263  */
3264  if (buffer_full)
3265  {
3266  new_write_head = LagTracker.write_head;
3267  if (LagTracker.write_head > 0)
3268  LagTracker.write_head--;
3269  else
3270  LagTracker.write_head = LAG_TRACKER_BUFFER_SIZE - 1;
3271  }
3272 
3273  /* Store a sample at the current write head position. */
3274  LagTracker.buffer[LagTracker.write_head].lsn = lsn;
3275  LagTracker.buffer[LagTracker.write_head].time = local_flush_time;
3276  LagTracker.write_head = new_write_head;
3277 }
3278 
3279 /*
3280  * Find out how much time has elapsed between the moment WAL position 'lsn'
3281  * (or the highest known earlier LSN) was flushed locally and the time 'now'.
3282  * We have a separate read head for each of the reported LSN locations we
3283  * receive in replies from standby; 'head' controls which read head is
3284  * used. Whenever a read head crosses an LSN which was written into the
3285  * lag buffer with LagTrackerWrite, we can use the associated timestamp to
3286  * find out the time this LSN (or an earlier one) was flushed locally, and
3287  * therefore compute the lag.
3288  *
3289  * Return -1 if no new sample data is available, and otherwise the elapsed
3290  * time in microseconds.
 *
 * Side effects: the selected read head is advanced past every sample it
 * consumes, and the most recently consumed sample is remembered in
 * LagTracker.last_read[head] for later interpolation.
 *
 * The local variable 'time' doubles as a sentinel: it starts at 0 and
 * stays 0 when the loop consumed no sample.
3291  */
3292 static TimeOffset
3294 {
3295  TimestampTz time = 0;
3296 
3297  /* Read all unread samples up to this LSN or end of buffer. */
3298  while (LagTracker.read_heads[head] != LagTracker.write_head &&
3299  LagTracker.buffer[LagTracker.read_heads[head]].lsn <= lsn)
3300  {
3301  time = LagTracker.buffer[LagTracker.read_heads[head]].time;
3302  LagTracker.last_read[head] =
3303  LagTracker.buffer[LagTracker.read_heads[head]];
3304  LagTracker.read_heads[head] =
3305  (LagTracker.read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE;
3306  }
3307 
3308  if (time > now)
3309  {
3310  /* If the clock somehow went backwards, treat as not found. */
3311  return -1;
3312  }
3313  else if (time == 0)
3314  {
3315  /*
3316  * We didn't cross a time. If there is a future sample that we
3317  * haven't reached yet, and we've already reached at least one sample,
3318  * let's interpolate the local flushed time. This is mainly useful for
3319  * reporting a completely stuck apply position as having increasing
3320  * lag, since otherwise we'd have to wait for it to eventually start
3321  * moving again and cross one of our samples before we can show the
3322  * lag increasing.
3323  */
3324  if (LagTracker.read_heads[head] != LagTracker.write_head &&
3325  LagTracker.last_read[head].time != 0)
3326  {
3327  double fraction;
 /* prev: last sample already consumed; next: first one not yet reached */
3328  WalTimeSample prev = LagTracker.last_read[head];
3329  WalTimeSample next = LagTracker.buffer[LagTracker.read_heads[head]];
3330 
3331  if (lsn < prev.lsn)
3332  {
3333  /*
3334  * Reported LSNs shouldn't normally go backwards, but it's
3335  * possible when there is a timeline change. Treat as not
3336  * found.
3337  */
3338  return -1;
3339  }
3340 
3341  Assert(prev.lsn < next.lsn);
3342 
3343  if (prev.time > next.time)
3344  {
3345  /* If the clock somehow went backwards, treat as not found. */
3346  return -1;
3347  }
3348 
3349  /* See how far we are between the previous and next samples. */
3350  fraction =
3351  (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
3352 
3353  /* Scale the local flush time proportionally. */
3354  time = (TimestampTz)
3355  ((double) prev.time + (next.time - prev.time) * fraction);
3356  }
3357  else
3358  {
3359  /* Couldn't interpolate due to lack of data. */
3360  return -1;
3361  }
3362  }
3363 
3364  /* Return the elapsed time since local flush time in microseconds. */
3365  Assert(time != 0);
3366  return now - time;
3367 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
static void WalSndXLogSendHandler(SIGNAL_ARGS)
Definition: walsender.c:2830
static void InitWalSenderSlot(void)
Definition: walsender.c:2143
void InitializeTimeouts(void)
Definition: timeout.c:340
#define pq_is_send_pending()
Definition: libpq.h:41
#define NIL
Definition: pg_list.h:69
#define XLogSegSize
Definition: xlog_internal.h:92
XLogRecPtr write
static void ProcessStandbyMessage(void)
Definition: walsender.c:1607
#define SIGUSR1
Definition: win32.h:202
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define IsA(nodeptr, _type_)
Definition: nodes.h:560
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:200
XLogRecPtr startpoint
Definition: replnodes.h:84
#define DEBUG1
Definition: elog.h:25
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:211
int MyProcPid
Definition: globals.c:38
int wal_sender_timeout
Definition: walsender.c:115
#define SIGCONT
Definition: win32.h:197
uint32 TimeLineID
Definition: xlogdefs.h:45
#define pq_flush()
Definition: libpq.h:39
static int32 next
Definition: blutils.c:210
XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
struct Port * MyProcPort
Definition: globals.c:40
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
uint32 TransactionId
Definition: c.h:397
bool wake_wal_senders
Definition: walsender.c:122
#define SYNC_REP_PRIORITY
Definition: syncrep.h:36
bool exec_replication_command(const char *cmd_string)
Definition: walsender.c:1364
bool update_process_title
Definition: ps_status.c:35
#define write(a, b, c)
Definition: win32.h:14
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
TransactionId xmin
Definition: proc.h:213
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:2841
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:2958
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:419
#define TEXTOID
Definition: pg_type.h:324
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action)
Definition: walsender.c:779
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1020
PGPROC * MyProc
Definition: proc.c:67
int64 TimestampTz
Definition: timestamp.h:39
#define SIGWINCH
Definition: win32.h:201
ResourceOwner CurrentResourceOwner
Definition: resowner.c:138
Size WalSndShmemSize(void)
Definition: walsender.c:2886
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:239
void CommitTransactionCommand(void)
Definition: xact.c:2747
#define XACT_REPEATABLE_READ
Definition: xact.h:30
#define SpinLockInit(lock)
Definition: spin.h:60
void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition: walsender.c:3228
void RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
Definition: snapmgr.c:2148
uint8 syncrep_method
Definition: syncrep.h:51
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:75
static int sendFile
Definition: walsender.c:128
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:370
static void XLogSendPhysical(void)
Definition: walsender.c:2407
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:1250
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
TimeOffset flushLag
#define SIGTTIN
Definition: win32.h:199
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency)
Definition: slot.c:220
void set_ps_display(const char *activity, bool force)
Definition: ps_status.c:326
#define INT4OID
Definition: pg_type.h:316
Definition: nodes.h:509
static StringInfoData output_message
Definition: walsender.c:153
WalSndCtlData * WalSndCtl
Definition: walsender.c:102
struct cursor * cur
Definition: ecpg.c:28
void proc_exit(int code)
Definition: ipc.c:99
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
bool superuser(void)
Definition: superuser.c:47
#define MemSet(start, val, len)
Definition: c.h:857
static void WalSndKill(int code, Datum arg)
Definition: walsender.c:2204
return result
Definition: formatting.c:1618
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
static void WalSndDone(WalSndSendDataCallback send_data)
Definition: walsender.c:2727
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
#define WL_SOCKET_READABLE
Definition: latch.h:125
void ReplicationSlotSave(void)
Definition: slot.c:563
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8223
List * SyncRepGetSyncStandbys(bool *am_sync)
Definition: syncrep.c:677
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:186
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:210
TimestampTz time
Definition: walsender.c:197
int sleeptime
Definition: pg_standby.c:40
ReplicationSlotPersistentData data
Definition: slot.h:115
TimeOffset writeLag
void ResetLatch(volatile Latch *latch)
Definition: latch.c:450
#define LOG
Definition: elog.h:26
bool RecoveryInProgress(void)
Definition: xlog.c:7874
SyncRepConfigData * SyncRepConfig
Definition: syncrep.c:97
XLogRecPtr last_lsn
Definition: walsender.c:206
void list_free_deep(List *list)
Definition: list.c:1147
static int fd(const char *x, int i)
Definition: preproc-init.c:105
pgsocket sock
Definition: libpq-be.h:118
#define PG_BINARY
Definition: c.h:1038
#define SIGQUIT
Definition: win32.h:189
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc.c:7898
slock_t mutex
Node * replication_parse_result
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:88
Latch procLatch
Definition: proc.h:103
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
Definition: xlogutils.c:799
XLogRecord * XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
Definition: xlogreader.c:193
XLogRecPtr confirmed_flush
Definition: slot.h:76
PGXACT * MyPgXact
Definition: proc.c:68
XLogRecPtr EndRecPtr
Definition: xlogreader.h:115
int32 day
Definition: timestamp.h:47
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition: walsender.c:1811
TimeLineID timeline
Definition: replnodes.h:96
ReplicationKind kind
Definition: replnodes.h:56
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1715
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
Definition: walsender.c:744
bool IsTransactionBlock(void)
Definition: xact.c:4304
static void WalSndSigHupHandler(SIGNAL_ARGS)
Definition: walsender.c:2817
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:587
bool am_walsender
Definition: walsender.c:108
#define SpinLockAcquire(lock)
Definition: spin.h:62
void ReplicationSlotReserveWal(void)
Definition: slot.c:909
XLogRecPtr flush
bool defGetBoolean(DefElem *def)
Definition: define.c:111
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:414
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:665
void GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch)
Definition: xlog.c:8292
ReplicationKind kind
Definition: replnodes.h:81
void WalSndRqstFileReload(void)
Definition: walsender.c:2798
#define SIG_IGN
Definition: win32.h:185
void ConditionVariableCancelSleep(void)
static volatile sig_atomic_t got_SIGHUP
Definition: walsender.c:179
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:319
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3169
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:1308
static volatile sig_atomic_t walsender_ready_to_stop
Definition: walsender.c:180
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:157
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:568
#define ERROR
Definition: elog.h:43
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:93
void pq_startmsgread(void)
Definition: pqcomm.c:1191
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: walsender.c:1638
#define IntervalPGetDatum(X)
Definition: timestamp.h:33
#define SYNC_REP_WAIT_APPLY
Definition: syncrep.h:26
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:3766
bool FirstSnapshotSet
Definition: snapmgr.c:203
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
void WalSndSignals(void)
Definition: walsender.c:2862
#define FATAL
Definition: elog.h:52
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:686
Latch * latch
#define MAXPGPATH
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11077
void ReplicationSlotPersist(void)
Definition: slot.c:598
TransactionId effective_xmin
Definition: slot.h:111
int pq_getbyte_if_available(unsigned char *c)
Definition: pqcomm.c:1021
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:190
static TimeLineID curFileTimeLine
Definition: walsender.c:133
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:165
#define XLogFilePath(path, tli, logSegNo)
char * get_database_name(Oid dbid)
Definition: dbcommands.c:2056
#define DEBUG2
Definition: elog.h:24
bool list_member_int(const List *list, int datum)
Definition: list.c:485
Definition: latch.h:110
Definition: dest.h:88
#define TLHistoryFileName(fname, tli)
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:348
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write)
Definition: logical.c:327
WalSndState state
NodeTag type
Definition: nodes.h:511
char * XLogFileNameP(TimeLineID tli, XLogSegNo segno)
Definition: xlog.c:10120
static char * buf
Definition: pg_test_fsync.c:66
static bool streamingDoneSending
Definition: walsender.c:172
uint64 XLogSegNo
Definition: xlogdefs.h:34
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:109
bool PostmasterIsAlive(void)
Definition: pmsignal.c:272
int OpenTransientFile(FileName fileName, int fileFlags, int fileMode)
Definition: fd.c:2144
int errdetail(const char *fmt,...)
Definition: elog.c:873
#define COMMERROR
Definition: elog.h:30
int errcode_for_file_access(void)
Definition: elog.c:598
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:515
TransactionId catalog_xmin
Definition: slot.h:65
#define pq_flush_if_writable()
Definition: libpq.h:40
TimeOffset time
Definition: timestamp.h:45
XLogRecPtr currTLIValidUntil
Definition: xlogreader.h:174
#define InvalidTransactionId
Definition: transam.h:31
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:277
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:94
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1145
static XLogRecPtr logical_startptr
Definition: walsender.c:191
unsigned int uint32
Definition: c.h:268
void ReplicationSlotRelease(void)
Definition: slot.c:375
static void XLogRead(char *buf, XLogRecPtr startptr, Size count)
Definition: walsender.c:2232
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1232
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
TransactionId xmin
Definition: slot.h:57
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
void InitWalSender(void)
#define SlotIsLogical(slot)
Definition: slot.h:134
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1791
void SyncRepInitConfig(void)
Definition: syncrep.c:381
#define ereport(elevel, rest)
Definition: elog.h:122
static bool WalSndCaughtUp
Definition: walsender.c:176
#define SYNC_REP_WAIT_FLUSH
Definition: syncrep.h:25
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
XLogRecPtr lsn
Definition: walsender.c:196
int replication_yyparse(void)
Definition: guc.h:72
int32 month
Definition: timestamp.h:48
int max_wal_senders
Definition: walsender.c:114
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc)
Definition: execTuples.c:1232
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2007
int CloseTransientFile(int fd)
Definition: fd.c:2305
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1118
void initStringInfo(StringInfo str)
Definition: stringinfo.c:65
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1253
static StringInfoData reply_message
Definition: walsender.c:154
#define MAXFNAMELEN
TimeLineID nextTLI
Definition: xlogreader.h:179
#define SpinLockRelease(lock)
Definition: spin.h:64
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
int64 TimeOffset
Definition: timestamp.h:40
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition: walsender.c:1233
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition: walsender.c:3293
void WalSndErrorCleanup(void)
Definition: walsender.c:284
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:322
void(* WalSndSendDataCallback)(void)
Definition: walsender.c:219
static volatile sig_atomic_t replication_active
Definition: walsender.c:188
uintptr_t Datum
Definition: postgres.h:372
static const char * WalSndGetStateString(WalSndState state)
Definition: walsender.c:2973
XLogRecPtr sentPtr
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
bool am_db_walsender
Definition: walsender.c:111
TransactionId effective_catalog_xmin
Definition: slot.h:112
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1031
Oid MyDatabaseId
Definition: globals.c:76
static void WalSndShutdown(void)
Definition: walsender.c:223
static TimeLineID receiveTLI
Definition: xlog.c:201
int work_mem
Definition: globals.c:112
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:563
WalSnd * MyWalSnd
Definition: walsender.c:105
static XLogRecPtr sentPtr
Definition: walsender.c:150
#define SIGPIPE
Definition: win32.h:193
#define SIGHUP
Definition: win32.h:188
void ReplicationSlotDrop(const char *name)
Definition: slot.c:440
void pq_endmsgread(void)
Definition: pqcomm.c:1215
#define InvalidOid
Definition: postgres_ext.h:36
TimeLineID ThisTimeLineID
Definition: xlog.c:179
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:209
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
#define SIG_DFL
Definition: win32.h:183
int allowedModes
Definition: execnodes.h:268
TimeLineID currTLI
Definition: xlogreader.h:165
#define INT8OID
Definition: pg_type.h:304
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:168
static void WalSndKeepaliveIfNecessary(TimestampTz now)
Definition: walsender.c:3188
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:432
TimeOffset applyLag
struct SnapBuild * snapshot_builder
Definition: logical.h:40
int errmsg_internal(const char *fmt,...)
Definition: elog.c:827
SetFunctionReturnMode returnMode
Definition: execnodes.h:270
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define XLByteToSeg(xlrp, logSegNo)
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:1938
void SetLatch(volatile Latch *latch)
Definition: latch.c:367
#define SIGNAL_ARGS
Definition: c.h:1079
#define NULL
Definition: c.h:229
static TimeLineID sendTimeLine
Definition: walsender.c:141
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:675
static void WalSndCheckTimeOut(TimestampTz now)
Definition: walsender.c:1980
#define lfirst(lc)
Definition: pg_list.h:106
void WalSndSetState(WalSndState state)
Definition: walsender.c:2954
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:308
int sync_standby_priority
Definition: regguts.h:298
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:457
static void XLogSendLogical(void)
Definition: walsender.c:2666
XLogRecPtr restart_lsn
Definition: slot.h:68
void ReplicationSlotAcquire(const char *name)
Definition: slot.c:326
void StartTransactionCommand(void)
Definition: xact.c:2677
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:207
bool needreload
static void ProcessStandbyReplyMessage(void)
Definition: walsender.c:1670
size_t Size
Definition: c.h:356
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1208
char * dbname
Definition: streamutil.c:38
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:201
int XactIsoLevel
Definition: xact.c:74
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1111
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:142
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:144
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:884
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:202
TupleDesc CreateTemplateTupleDesc(int natts, bool hasoid)
Definition: tupdesc.c:41
WalSndState
static XLogSegNo sendSegNo
Definition: walsender.c:129
Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
Definition: walsender.c:3006
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:832
static bool streamingDoneReceiving
Definition: walsender.c:173
#define BYTEAOID
Definition: pg_type.h:292
Tuplestorestate * setResult
Definition: execnodes.h:273
#define pg_attribute_noreturn()
Definition: c.h:653
static StringInfoData tmpbuf
Definition: walsender.c:155
#define SIGTTOU
Definition: win32.h:200
bool IsSubTransaction(void)
Definition: xact.c:4376
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition: snapbuild.c:509
static Datum values[MAXATTR]
Definition: bootstrap.c:163
#define PG_STAT_GET_WAL_SENDERS_COLS
ExprContext * econtext
Definition: execnodes.h:266
bool log_replication_commands
Definition: walsender.c:117
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4691
#define Int32GetDatum(X)
Definition: postgres.h:485
char * application_name
Definition: guc.c:473
TupleDesc setDesc
Definition: execnodes.h:274
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:115
void * palloc(Size size)
Definition: mcxt.c:849
int errmsg(const char *fmt,...)
Definition: elog.c:797
XLogReaderState * reader
Definition: logical.h:38
void SHMQueueInit(SHM_QUEUE *queue)
Definition: shmqueue.c:36
void pq_sendint(StringInfo buf, int i, int b)
Definition: pqformat.c:236
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:344
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
void die(SIGNAL_ARGS)
Definition: postgres.c:2623
CRSSnapshotAction
Definition: walsender.h:22
StringInfo out
Definition: logical.h:59
static const unsigned __int64 epoch
Definition: gettimeofday.c:34
int i
void WalSndShmemInit(void)
Definition: walsender.c:2898
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:486
LogicalDecodingContext * CreateInitDecodingContext(char *plugin, List *output_plugin_options, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write)
Definition: logical.c:211
#define NameStr(name)
Definition: c.h:499
TimeLineID timeline
Definition: replnodes.h:83
static struct @27 LagTracker
#define CStringGetTextDatum(s)
Definition: builtins.h:91
void * arg
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition: walsender.c:1762
struct Latch * MyLatch
Definition: globals.c:51
void ReplicationSlotCleanup(void)
Definition: slot.c:413
#define PG_FUNCTION_ARGS
Definition: fmgr.h:158
char * defname
Definition: parsenodes.h:719
static uint32 sendOff
Definition: walsender.c:130
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:448
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2768
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:113
#define elog
Definition: elog.h:219
void LWLockReleaseAll(void)
Definition: lwlock.c:1814
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28
static Interval * offset_to_interval(TimeOffset offset)
Definition: walsender.c:2990
slock_t mutex
Definition: slot.h:88
#define close(a)
Definition: win32.h:12
void latch_sigusr1_handler(void)
Definition: latch.c:1404
CommandDest whereToSendOutput
Definition: postgres.c:86
#define SIGCHLD
Definition: win32.h:198
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: timestamp.c:1624
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
#define XLByteInSeg(xlrp, logSegNo)
static bool waiting_for_ping_response
Definition: walsender.c:164
void pq_puttextmessage(char msgtype, const char *str)
Definition: pqformat.c:400
XLogRecPtr apply
static void IdentifySystem(void)
Definition: walsender.c:330
Definition: pg_list.h:45
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:76
void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.c:271
#define WL_LATCH_SET
Definition: latch.h:124
int16 AttrNumber
Definition: attnum.h:21
void quickdie(SIGNAL_ARGS)
Definition: postgres.c:2562
#define UINT64_FORMAT
Definition: c.h:316
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:617
void SyncRepReleaseWaiters(void)
Definition: syncrep.c:409
static TimestampTz last_reply_timestamp
Definition: walsender.c:161
int write_head
Definition: walsender.c:208
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1505
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
#define MAX_SEND_SIZE
Definition: walsender.c:99
#define read(a, b, c)
Definition: win32.h:13
#define SIGUSR2
Definition: win32.h:203
static bool sendTimeLineIsHistoric
Definition: walsender.c:143
#define offsetof(type, field)
Definition: c.h:555
void WalSndWakeup(void)
Definition: walsender.c:2930
void ReplicationSlotMarkDirty(void)
Definition: slot.c:581
bool am_cascading_walsender
Definition: walsender.c:109
static void ProcessStandbyHSFeedbackMessage(void)
Definition: walsender.c:1839
#define SYNC_REP_WAIT_WRITE
Definition: syncrep.h:24
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:648
void PreventTransactionChain(bool isTopLevel, const char *stmtType)
Definition: xact.c:3154
int BasicOpenFile(FileName fileName, int fileFlags, int fileMode)
Definition: fd.c:936
void replication_scanner_init(const char *query_string)
ResourceOwner ResourceOwnerCreate(ResourceOwner parent, const char *name)
Definition: resowner.c:416
#define TLHistoryFilePath(path, tli)