PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
walsender.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * walsender.c
4  *
5  * The WAL sender process (walsender) is new as of Postgres 9.0. It takes
6  * care of sending XLOG from the primary server to a single recipient.
7  * (Note that there can be more than one walsender process concurrently.)
8  * It is started by the postmaster when the walreceiver of a standby server
9  * connects to the primary server and requests XLOG streaming replication.
10  *
11  * A walsender is similar to a regular backend, ie. there is a one-to-one
12  * relationship between a connection and a walsender process, but instead
13  * of processing SQL queries, it understands a small set of special
14  * replication-mode commands. The START_REPLICATION command begins streaming
15  * WAL to the client. While streaming, the walsender keeps reading XLOG
16  * records from the disk and sends them to the standby server over the
17  * COPY protocol, until either side ends the replication by exiting COPY
18  * mode (or until the connection is closed).
19  *
20  * Normal termination is by SIGTERM, which instructs the walsender to
21  * close the connection and exit(0) at the next convenient moment. Emergency
22  * termination is by SIGQUIT; like any backend, the walsender will simply
23  * abort and exit on SIGQUIT. A close of the connection and a FATAL error
24  * are treated as not a crash but approximately normal termination;
25  * the walsender will exit quickly without sending any more XLOG records.
26  *
27  * If the server is shut down, postmaster sends us SIGUSR2 after all regular
28  * backends have exited. This causes the walsender to switch to the "stopping"
29  * state. In this state, the walsender will reject any replication command
30  * that may generate WAL activity. The checkpointer begins the shutdown
31  * checkpoint once all walsenders are confirmed as stopping. When the shutdown
32  * checkpoint finishes, the postmaster sends us SIGINT. This instructs
33  * walsender to send any outstanding WAL, including the shutdown checkpoint
34  * record, wait for it to be replicated to the standby, and then exit.
35  *
36  *
37  * Portions Copyright (c) 2010-2017, PostgreSQL Global Development Group
38  *
39  * IDENTIFICATION
40  * src/backend/replication/walsender.c
41  *
42  *-------------------------------------------------------------------------
43  */
44 #include "postgres.h"
45 
46 #include <signal.h>
47 #include <unistd.h>
48 
49 #include "access/printtup.h"
50 #include "access/timeline.h"
51 #include "access/transam.h"
52 #include "access/xact.h"
53 #include "access/xlog_internal.h"
54 #include "access/xlogutils.h"
55 
56 #include "catalog/pg_type.h"
57 #include "commands/dbcommands.h"
58 #include "commands/defrem.h"
59 #include "funcapi.h"
60 #include "libpq/libpq.h"
61 #include "libpq/pqformat.h"
62 #include "miscadmin.h"
63 #include "nodes/replnodes.h"
64 #include "pgstat.h"
65 #include "replication/basebackup.h"
66 #include "replication/decode.h"
67 #include "replication/logical.h"
69 #include "replication/slot.h"
70 #include "replication/snapbuild.h"
71 #include "replication/syncrep.h"
73 #include "replication/walsender.h"
76 #include "storage/fd.h"
77 #include "storage/ipc.h"
78 #include "storage/pmsignal.h"
79 #include "storage/proc.h"
80 #include "storage/procarray.h"
81 #include "tcop/dest.h"
82 #include "tcop/tcopprot.h"
83 #include "utils/builtins.h"
84 #include "utils/guc.h"
85 #include "utils/memutils.h"
86 #include "utils/pg_lsn.h"
87 #include "utils/portal.h"
88 #include "utils/ps_status.h"
89 #include "utils/resowner.h"
90 #include "utils/timeout.h"
91 #include "utils/timestamp.h"
92 
93 /*
94  * Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ.
95  *
96  * We don't have a good idea of what a good value would be; there's some
97  * overhead per message in both walsender and walreceiver, but on the other
98  * hand sending large batches makes walsender less responsive to signals
99  * because signals are checked only between messages. 128kB (with
100  * default 8k blocks) seems like a reasonable guess for now.
101  */
102 #define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
103 
104 /* Array of WalSnds in shared memory */
106 
107 /* My slot in the shared memory array */
109 
110 /* Global state */
111 bool am_walsender = false; /* Am I a walsender process? */
112 bool am_cascading_walsender = false; /* Am I cascading WAL to
113  * another standby? */
114 bool am_db_walsender = false; /* Connected to a database? */
115 
116 /* User-settable parameters for walsender */
117 int max_wal_senders = 0; /* the maximum number of concurrent walsenders */
118 int wal_sender_timeout = 60 * 1000; /* maximum time to send one
119  * WAL data message */
121 
122 /*
123  * State for WalSndWakeupRequest
124  */
125 bool wake_wal_senders = false;
126 
127 /*
128  * These variables are used similarly to openLogFile/SegNo/Off,
129  * but for walsender to read the XLOG.
130  */
131 static int sendFile = -1;
132 static XLogSegNo sendSegNo = 0;
133 static uint32 sendOff = 0;
134 
135 /* Timeline ID of the currently open file */
137 
138 /*
139  * These variables keep track of the state of the timeline we're currently
140  * sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric,
141  * the timeline is not the latest timeline on this server, and the server's
142  * history forked off from that timeline at sendTimeLineValidUpto.
143  */
146 static bool sendTimeLineIsHistoric = false;
148 
149 /*
150  * How far have we sent WAL already? This is also advertised in
151  * MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.)
152  */
153 static XLogRecPtr sentPtr = 0;
154 
155 /* Buffers for constructing outgoing messages and processing reply messages. */
159 
160 /*
161  * Timestamp of the last receipt of the reply from the standby. Set to 0 if
162  * wal_sender_timeout doesn't need to be active.
163  */
165 
166 /* Have we sent a heartbeat message asking for reply, since last reply? */
167 static bool waiting_for_ping_response = false;
168 
169 /*
170  * While streaming WAL in Copy mode, streamingDoneSending is set to true
171  * after we have sent CopyDone. We should not send any more CopyData messages
172  * after that. streamingDoneReceiving is set to true when we receive CopyDone
173  * from the other end. When both become true, it's time to exit Copy mode.
174  */
177 
178 /* Are we there yet? */
179 static bool WalSndCaughtUp = false;
180 
181 /* Flags set by signal handlers for later service in main loop */
182 static volatile sig_atomic_t got_SIGHUP = false;
183 static volatile sig_atomic_t got_SIGINT = false;
184 static volatile sig_atomic_t got_SIGUSR2 = false;
185 
186 /*
187  * This is set while we are streaming. When not set, SIGINT signal will be
188  * handled like SIGTERM. When set, the main loop is responsible for checking
189  * got_SIGINT and terminating when it's set (after streaming any remaining
190  * WAL).
191  */
192 static volatile sig_atomic_t replication_active = false;
193 
196 
197 /* A sample associating a WAL location with the time it was written. */
198 typedef struct
199 {
202 } WalTimeSample;
203 
204 /* The size of our buffer of time samples. */
205 #define LAG_TRACKER_BUFFER_SIZE 8192
206 
207 /* A mechanism for tracking replication lag. */
208 static struct
209 {
215 } LagTracker;
216 
217 /* Signal handlers */
218 static void WalSndSigHupHandler(SIGNAL_ARGS);
220 static void WalSndSwitchStopping(SIGNAL_ARGS);
222 
223 /* Prototypes for private functions */
224 typedef void (*WalSndSendDataCallback) (void);
225 static void WalSndLoop(WalSndSendDataCallback send_data);
226 static void InitWalSenderSlot(void);
227 static void WalSndKill(int code, Datum arg);
229 static void XLogSendPhysical(void);
230 static void XLogSendLogical(void);
231 static void WalSndDone(WalSndSendDataCallback send_data);
232 static XLogRecPtr GetStandbyFlushRecPtr(void);
233 static void IdentifySystem(void);
236 static void StartReplication(StartReplicationCmd *cmd);
238 static void ProcessStandbyMessage(void);
239 static void ProcessStandbyReplyMessage(void);
240 static void ProcessStandbyHSFeedbackMessage(void);
241 static void ProcessRepliesIfAny(void);
242 static void WalSndKeepalive(bool requestReply);
244 static void WalSndCheckTimeOut(TimestampTz now);
245 static long WalSndComputeSleeptime(TimestampTz now);
246 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
247 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
250 static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
251 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
253 
254 static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
255 
256 
257 /* Initialize walsender process before entering the main command loop */
258 void
259 InitWalSender(void)
260 {
262 
263  /* Create a per-walsender data structure in shared memory */
265 
266  /* Set up resource owner */
267  CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner");
268 
269  /*
270  * Let postmaster know that we're a WAL sender. Once we've declared us as
271  * a WAL sender process, postmaster will let us outlive the bgwriter and
272  * kill us last in the shutdown sequence, so we get a chance to stream all
273  * remaining WAL at shutdown, including the shutdown checkpoint. Note that
274  * there's no going back, and we mustn't write any WAL records after this.
275  */
278 
279  /* Initialize empty timestamp buffer for lag tracking. */
280  memset(&LagTracker, 0, sizeof(LagTracker));
281 }
282 
283 /*
284  * Clean up after an error.
285  *
286  * WAL sender processes don't use transactions like regular backends do.
287  * This function does any cleanup required after an error in a WAL sender
288  * process, similar to what transaction abort does in a regular backend.
289  */
290 void
292 {
296 
297  if (sendFile >= 0)
298  {
299  close(sendFile);
300  sendFile = -1;
301  }
302 
303  if (MyReplicationSlot != NULL)
305 
307 
308  replication_active = false;
309  if (got_SIGINT)
310  proc_exit(0);
311 
312  /* Revert back to startup state */
314 
315  if (got_SIGUSR2)
317 }
318 
319 /*
320  * Handle a client's connection abort in an orderly manner.
321  */
322 static void
323 WalSndShutdown(void)
324 {
325  /*
326  * Reset whereToSendOutput to prevent ereport from attempting to send any
327  * more messages to the standby.
328  */
331 
332  proc_exit(0);
333  abort(); /* keep the compiler quiet */
334 }
335 
336 /*
337  * Handle the IDENTIFY_SYSTEM command.
338  */
339 static void
341 {
342  char sysid[32];
343  char xloc[MAXFNAMELEN];
344  XLogRecPtr logptr;
345  char *dbname = NULL;
346  DestReceiver *dest;
347  TupOutputState *tstate;
348  TupleDesc tupdesc;
349  Datum values[4];
350  bool nulls[4];
351 
352  /*
353  * Reply with a result set with one row, four columns. First col is system
354  * ID, second is timeline ID, third is current xlog location and the
355  * fourth contains the database name if we are connected to one.
356  */
357 
358  snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
360 
363  {
364  /* this also updates ThisTimeLineID */
365  logptr = GetStandbyFlushRecPtr();
366  }
367  else
368  logptr = GetFlushRecPtr();
369 
370  snprintf(xloc, sizeof(xloc), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
371 
372  if (MyDatabaseId != InvalidOid)
373  {
375 
376  /* syscache access needs a transaction env. */
378  /* make dbname live outside TX context */
382  /* CommitTransactionCommand switches to TopMemoryContext */
384  }
385 
387  MemSet(nulls, false, sizeof(nulls));
388 
389  /* need a tuple descriptor representing four columns */
390  tupdesc = CreateTemplateTupleDesc(4, false);
391  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
392  TEXTOID, -1, 0);
393  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
394  INT4OID, -1, 0);
395  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
396  TEXTOID, -1, 0);
397  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
398  TEXTOID, -1, 0);
399 
400  /* prepare for projection of tuples */
401  tstate = begin_tup_output_tupdesc(dest, tupdesc);
402 
403  /* column 1: system identifier */
404  values[0] = CStringGetTextDatum(sysid);
405 
406  /* column 2: timeline */
407  values[1] = Int32GetDatum(ThisTimeLineID);
408 
409  /* column 3: wal location */
410  values[2] = CStringGetTextDatum(xloc);
411 
412  /* column 4: database name, or NULL if none */
413  if (dbname)
414  values[3] = CStringGetTextDatum(dbname);
415  else
416  nulls[3] = true;
417 
418  /* send it to dest */
419  do_tup_output(tstate, values, nulls);
420 
421  end_tup_output(tstate);
422 }
423 
424 
425 /*
426  * Handle TIMELINE_HISTORY command.
427  */
428 static void
430 {
432  char histfname[MAXFNAMELEN];
433  char path[MAXPGPATH];
434  int fd;
435  off_t histfilelen;
436  off_t bytesleft;
437  Size len;
438 
439  /*
440  * Reply with a result set with one row, and two columns. The first col is
441  * the name of the history file, 2nd is the contents.
442  */
443 
444  TLHistoryFileName(histfname, cmd->timeline);
445  TLHistoryFilePath(path, cmd->timeline);
446 
447  /* Send a RowDescription message */
448  pq_beginmessage(&buf, 'T');
449  pq_sendint(&buf, 2, 2); /* 2 fields */
450 
451  /* first field */
452  pq_sendstring(&buf, "filename"); /* col name */
453  pq_sendint(&buf, 0, 4); /* table oid */
454  pq_sendint(&buf, 0, 2); /* attnum */
455  pq_sendint(&buf, TEXTOID, 4); /* type oid */
456  pq_sendint(&buf, -1, 2); /* typlen */
457  pq_sendint(&buf, 0, 4); /* typmod */
458  pq_sendint(&buf, 0, 2); /* format code */
459 
460  /* second field */
461  pq_sendstring(&buf, "content"); /* col name */
462  pq_sendint(&buf, 0, 4); /* table oid */
463  pq_sendint(&buf, 0, 2); /* attnum */
464  pq_sendint(&buf, BYTEAOID, 4); /* type oid */
465  pq_sendint(&buf, -1, 2); /* typlen */
466  pq_sendint(&buf, 0, 4); /* typmod */
467  pq_sendint(&buf, 0, 2); /* format code */
468  pq_endmessage(&buf);
469 
470  /* Send a DataRow message */
471  pq_beginmessage(&buf, 'D');
472  pq_sendint(&buf, 2, 2); /* # of columns */
473  len = strlen(histfname);
474  pq_sendint(&buf, len, 4); /* col1 len */
475  pq_sendbytes(&buf, histfname, len);
476 
477  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0666);
478  if (fd < 0)
479  ereport(ERROR,
481  errmsg("could not open file \"%s\": %m", path)));
482 
483  /* Determine file length and send it to client */
484  histfilelen = lseek(fd, 0, SEEK_END);
485  if (histfilelen < 0)
486  ereport(ERROR,
488  errmsg("could not seek to end of file \"%s\": %m", path)));
489  if (lseek(fd, 0, SEEK_SET) != 0)
490  ereport(ERROR,
492  errmsg("could not seek to beginning of file \"%s\": %m", path)));
493 
494  pq_sendint(&buf, histfilelen, 4); /* col2 len */
495 
496  bytesleft = histfilelen;
497  while (bytesleft > 0)
498  {
499  char rbuf[BLCKSZ];
500  int nread;
501 
503  nread = read(fd, rbuf, sizeof(rbuf));
505  if (nread <= 0)
506  ereport(ERROR,
508  errmsg("could not read file \"%s\": %m",
509  path)));
510  pq_sendbytes(&buf, rbuf, nread);
511  bytesleft -= nread;
512  }
513  CloseTransientFile(fd);
514 
515  pq_endmessage(&buf);
516 }
517 
518 /*
519  * Handle START_REPLICATION command.
520  *
521  * At the moment, this never returns, but an ereport(ERROR) will take us back
522  * to the main loop.
523  */
524 static void
526 {
528  XLogRecPtr FlushPtr;
529 
530  if (ThisTimeLineID == 0)
531  ereport(ERROR,
532  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
533  errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
534 
535  /*
536  * We assume here that we're logging enough information in the WAL for
537  * log-shipping, since this is checked in PostmasterMain().
538  *
539  * NOTE: wal_level can only change at shutdown, so in most cases it is
540  * difficult for there to be WAL data that we can still see that was
541  * written at wal_level='minimal'.
542  */
543 
544  if (cmd->slotname)
545  {
548  ereport(ERROR,
549  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
550  (errmsg("cannot use a logical replication slot for physical replication"))));
551  }
552 
553  /*
554  * Select the timeline. If it was given explicitly by the client, use
555  * that. Otherwise use the timeline of the last replayed record, which is
556  * kept in ThisTimeLineID.
557  */
559  {
560  /* this also updates ThisTimeLineID */
561  FlushPtr = GetStandbyFlushRecPtr();
562  }
563  else
564  FlushPtr = GetFlushRecPtr();
565 
566  if (cmd->timeline != 0)
567  {
568  XLogRecPtr switchpoint;
569 
570  sendTimeLine = cmd->timeline;
572  {
573  sendTimeLineIsHistoric = false;
575  }
576  else
577  {
578  List *timeLineHistory;
579 
580  sendTimeLineIsHistoric = true;
581 
582  /*
583  * Check that the timeline the client requested exists, and the
584  * requested start location is on that timeline.
585  */
586  timeLineHistory = readTimeLineHistory(ThisTimeLineID);
587  switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
589  list_free_deep(timeLineHistory);
590 
591  /*
592  * Found the requested timeline in the history. Check that
593  * requested startpoint is on that timeline in our history.
594  *
595  * This is quite loose on purpose. We only check that we didn't
596  * fork off the requested timeline before the switchpoint. We
597  * don't check that we switched *to* it before the requested
598  * starting point. This is because the client can legitimately
599  * request to start replication from the beginning of the WAL
600  * segment that contains switchpoint, but on the new timeline, so
601  * that it doesn't end up with a partial segment. If you ask for
602  * too old a starting point, you'll get an error later when we
603  * fail to find the requested WAL segment in pg_wal.
604  *
605  * XXX: we could be more strict here and only allow a startpoint
606  * that's older than the switchpoint, if it's still in the same
607  * WAL segment.
608  */
609  if (!XLogRecPtrIsInvalid(switchpoint) &&
610  switchpoint < cmd->startpoint)
611  {
612  ereport(ERROR,
613  (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
614  (uint32) (cmd->startpoint >> 32),
615  (uint32) (cmd->startpoint),
616  cmd->timeline),
617  errdetail("This server's history forked from timeline %u at %X/%X.",
618  cmd->timeline,
619  (uint32) (switchpoint >> 32),
620  (uint32) (switchpoint))));
621  }
622  sendTimeLineValidUpto = switchpoint;
623  }
624  }
625  else
626  {
629  sendTimeLineIsHistoric = false;
630  }
631 
633 
634  /* If there is nothing to stream, don't even enter COPY mode */
636  {
637  /*
638  * When we first start replication the standby will be behind the
639  * primary. For some applications, for example synchronous
640  * replication, it is important to have a clear state for this initial
641  * catchup mode, so we can trigger actions when we change streaming
642  * state later. We may stay in this state for a long time, which is
643  * exactly why we want to be able to monitor whether or not we are
644  * still here.
645  */
647 
648  /* Send a CopyBothResponse message, and start streaming */
649  pq_beginmessage(&buf, 'W');
650  pq_sendbyte(&buf, 0);
651  pq_sendint(&buf, 0, 2);
652  pq_endmessage(&buf);
653  pq_flush();
654 
655  /*
656  * Don't allow a request to stream from a future point in WAL that
657  * hasn't been flushed to disk in this server yet.
658  */
659  if (FlushPtr < cmd->startpoint)
660  {
661  ereport(ERROR,
662  (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
663  (uint32) (cmd->startpoint >> 32),
664  (uint32) (cmd->startpoint),
665  (uint32) (FlushPtr >> 32),
666  (uint32) (FlushPtr))));
667  }
668 
669  /* Start streaming from the requested point */
670  sentPtr = cmd->startpoint;
671 
672  /* Initialize shared memory status, too */
673  {
674  WalSnd *walsnd = MyWalSnd;
675 
676  SpinLockAcquire(&walsnd->mutex);
677  walsnd->sentPtr = sentPtr;
678  SpinLockRelease(&walsnd->mutex);
679  }
680 
682 
683  /* Main loop of walsender */
684  replication_active = true;
685 
687 
688  replication_active = false;
689  if (got_SIGINT)
690  proc_exit(0);
692 
694  }
695 
696  if (cmd->slotname)
698 
699  /*
700  * Copy is finished now. Send a single-row result set indicating the next
701  * timeline.
702  */
704  {
705  char startpos_str[8 + 1 + 8 + 1];
706  DestReceiver *dest;
707  TupOutputState *tstate;
708  TupleDesc tupdesc;
709  Datum values[2];
710  bool nulls[2];
711 
712  snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
713  (uint32) (sendTimeLineValidUpto >> 32),
715 
717  MemSet(nulls, false, sizeof(nulls));
718 
719  /*
720  * Need a tuple descriptor representing two columns. int8 may seem
721  * like a surprising data type for this, but in theory int4 would not
722  * be wide enough for this, as TimeLineID is unsigned.
723  */
724  tupdesc = CreateTemplateTupleDesc(2, false);
725  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
726  INT8OID, -1, 0);
727  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
728  TEXTOID, -1, 0);
729 
730  /* prepare for projection of tuple */
731  tstate = begin_tup_output_tupdesc(dest, tupdesc);
732 
733  values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
734  values[1] = CStringGetTextDatum(startpos_str);
735 
736  /* send it to dest */
737  do_tup_output(tstate, values, nulls);
738 
739  end_tup_output(tstate);
740  }
741 
742  /* Send CommandComplete message */
743  pq_puttextmessage('C', "START_STREAMING");
744 }
745 
746 /*
747  * read_page callback for logical decoding contexts, as a walsender process.
748  *
749  * Inside the walsender we can do better than logical_read_local_xlog_page,
750  * which has to do a plain sleep/busy loop, because the walsender's latch gets
751  * set every time WAL is flushed.
752  */
753 static int
755  XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
756 {
757  XLogRecPtr flushptr;
758  int count;
759 
760  XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
762  sendTimeLine = state->currTLI;
764  sendTimeLineNextTLI = state->nextTLI;
765 
766  /* make sure we have enough WAL available */
767  flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
768 
769  /* more than one block available */
770  if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
771  count = XLOG_BLCKSZ;
772  /* not enough WAL synced, that can happen during shutdown */
773  else if (targetPagePtr + reqLen > flushptr)
774  return -1;
775  /* part of the page available */
776  else
777  count = flushptr - targetPagePtr;
778 
779  /* now actually read the data, we know it's there */
780  XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
781 
782  return count;
783 }
784 
785 /*
786  * Process extra options given to CREATE_REPLICATION_SLOT.
787  */
788 static void
790  bool *reserve_wal,
791  CRSSnapshotAction *snapshot_action)
792 {
793  ListCell *lc;
794  bool snapshot_action_given = false;
795  bool reserve_wal_given = false;
796 
797  /* Parse options */
798  foreach(lc, cmd->options)
799  {
800  DefElem *defel = (DefElem *) lfirst(lc);
801 
802  if (strcmp(defel->defname, "export_snapshot") == 0)
803  {
804  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
805  ereport(ERROR,
806  (errcode(ERRCODE_SYNTAX_ERROR),
807  errmsg("conflicting or redundant options")));
808 
809  snapshot_action_given = true;
810  *snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT :
812  }
813  else if (strcmp(defel->defname, "use_snapshot") == 0)
814  {
815  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
816  ereport(ERROR,
817  (errcode(ERRCODE_SYNTAX_ERROR),
818  errmsg("conflicting or redundant options")));
819 
820  snapshot_action_given = true;
821  *snapshot_action = CRS_USE_SNAPSHOT;
822  }
823  else if (strcmp(defel->defname, "reserve_wal") == 0)
824  {
825  if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
826  ereport(ERROR,
827  (errcode(ERRCODE_SYNTAX_ERROR),
828  errmsg("conflicting or redundant options")));
829 
830  reserve_wal_given = true;
831  *reserve_wal = true;
832  }
833  else
834  elog(ERROR, "unrecognized option: %s", defel->defname);
835  }
836 }
837 
838 /*
839  * Create a new replication slot.
840  */
841 static void
843 {
844  const char *snapshot_name = NULL;
845  char xloc[MAXFNAMELEN];
846  char *slot_name;
847  bool reserve_wal = false;
848  CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
849  DestReceiver *dest;
850  TupOutputState *tstate;
851  TupleDesc tupdesc;
852  Datum values[4];
853  bool nulls[4];
854 
856 
857  parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action);
858 
859  /* setup state for XLogReadPage */
860  sendTimeLineIsHistoric = false;
862 
863  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
864  {
865  ReplicationSlotCreate(cmd->slotname, false,
867  }
868  else
869  {
871 
872  /*
873  * Initially create persistent slot as ephemeral - that allows us to
874  * nicely handle errors during initialization because it'll get
875  * dropped if this transaction fails. We'll make it persistent at the
876  * end. Temporary slots can be created as temporary from beginning as
877  * they get dropped on error as well.
878  */
879  ReplicationSlotCreate(cmd->slotname, true,
881  }
882 
883  if (cmd->kind == REPLICATION_KIND_LOGICAL)
884  {
886  bool need_full_snapshot = false;
887 
888  /*
889  * Do options check early so that we can bail before calling the
890  * DecodingContextFindStartpoint which can take long time.
891  */
892  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
893  {
894  if (IsTransactionBlock())
895  ereport(ERROR,
896  (errmsg("CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT "
897  "must not be called inside a transaction")));
898 
899  need_full_snapshot = true;
900  }
901  else if (snapshot_action == CRS_USE_SNAPSHOT)
902  {
903  if (!IsTransactionBlock())
904  ereport(ERROR,
905  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
906  "must be called inside a transaction")));
907 
909  ereport(ERROR,
910  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
911  "must be called in REPEATABLE READ isolation mode transaction")));
912 
913  if (FirstSnapshotSet)
914  ereport(ERROR,
915  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
916  "must be called before any query")));
917 
918  if (IsSubTransaction())
919  ereport(ERROR,
920  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
921  "must not be called in a subtransaction")));
922 
923  need_full_snapshot = true;
924  }
925 
926  ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
930 
931  /*
932  * Signal that we don't need the timeout mechanism. We're just
933  * creating the replication slot and don't yet accept feedback
934  * messages or send keepalives. As we possibly need to wait for
935  * further WAL the walsender would otherwise possibly be killed too
936  * soon.
937  */
939 
940  /* build initial snapshot, might take a while */
942 
943  /*
944  * Export or use the snapshot if we've been asked to do so.
945  *
946  * NB. We will convert the snapbuild.c kind of snapshot to normal
947  * snapshot when doing this.
948  */
949  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
950  {
951  snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
952  }
953  else if (snapshot_action == CRS_USE_SNAPSHOT)
954  {
955  Snapshot snap;
956 
959  }
960 
961  /* don't need the decoding context anymore */
962  FreeDecodingContext(ctx);
963 
964  if (!cmd->temporary)
966  }
967  else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
968  {
970 
972 
973  /* Write this slot to disk if it's a permanent one. */
974  if (!cmd->temporary)
976  }
977 
978  snprintf(xloc, sizeof(xloc), "%X/%X",
981 
983  MemSet(nulls, false, sizeof(nulls));
984 
985  /*----------
986  * Need a tuple descriptor representing four columns:
987  * - first field: the slot name
988  * - second field: LSN at which we became consistent
989  * - third field: exported snapshot's name
990  * - fourth field: output plugin
991  *----------
992  */
993  tupdesc = CreateTemplateTupleDesc(4, false);
994  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
995  TEXTOID, -1, 0);
996  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
997  TEXTOID, -1, 0);
998  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
999  TEXTOID, -1, 0);
1000  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1001  TEXTOID, -1, 0);
1002 
1003  /* prepare for projection of tuples */
1004  tstate = begin_tup_output_tupdesc(dest, tupdesc);
1005 
1006  /* slot_name */
1007  slot_name = NameStr(MyReplicationSlot->data.name);
1008  values[0] = CStringGetTextDatum(slot_name);
1009 
1010  /* consistent wal location */
1011  values[1] = CStringGetTextDatum(xloc);
1012 
1013  /* snapshot name, or NULL if none */
1014  if (snapshot_name != NULL)
1015  values[2] = CStringGetTextDatum(snapshot_name);
1016  else
1017  nulls[2] = true;
1018 
1019  /* plugin, or NULL if none */
1020  if (cmd->plugin != NULL)
1021  values[3] = CStringGetTextDatum(cmd->plugin);
1022  else
1023  nulls[3] = true;
1024 
1025  /* send it to dest */
1026  do_tup_output(tstate, values, nulls);
1027  end_tup_output(tstate);
1028 
1030 }
1031 
1032 /*
1033  * Get rid of a replication slot that is no longer wanted.
1034  */
1035 static void
1037 {
1039  EndCommand("DROP_REPLICATION_SLOT", DestRemote);
1040 }
1041 
1042 /*
1043  * Load previously initiated logical slot and prepare for sending data (via
1044  * WalSndLoop).
1045  */
1046 static void
1048 {
1050 
1051  /* make sure that our requirements are still fulfilled */
1053 
1055 
1057 
1058  /*
1059  * Force a disconnect, so that the decoding code doesn't need to care
1060  * about an eventual switch from running in recovery, to running in a
1061  * normal environment. Client code is expected to handle reconnects.
1062  */
1064  {
1065  ereport(LOG,
1066  (errmsg("terminating walsender process after promotion")));
1067  got_SIGINT = true;
1068  }
1069 
1071 
1072  /* Send a CopyBothResponse message, and start streaming */
1073  pq_beginmessage(&buf, 'W');
1074  pq_sendbyte(&buf, 0);
1075  pq_sendint(&buf, 0, 2);
1076  pq_endmessage(&buf);
1077  pq_flush();
1078 
1079  /*
1080  * Initialize position to the last ack'ed one, then the xlog records begin
1081  * to be shipped from that position.
1082  */
1083  logical_decoding_ctx = CreateDecodingContext(cmd->startpoint, cmd->options,
1088 
1089  /* Start reading WAL from the oldest required WAL. */
1091 
1092  /*
1093  * Report the location after which we'll send out further commits as the
1094  * current sentPtr.
1095  */
1097 
1098  /* Also update the sent position status in shared memory */
1099  {
1100  WalSnd *walsnd = MyWalSnd;
1101 
1102  SpinLockAcquire(&walsnd->mutex);
1104  SpinLockRelease(&walsnd->mutex);
1105  }
1106 
1107  replication_active = true;
1108 
1110 
1111  /* Main loop of walsender */
1113 
1114  FreeDecodingContext(logical_decoding_ctx);
1116 
1117  replication_active = false;
1118  if (got_SIGINT)
1119  proc_exit(0);
1121 
1122  /* Get out of COPY mode (CommandComplete). */
1123  EndCommand("COPY 0", DestRemote);
1124 }
1125 
1126 /*
1127  * LogicalDecodingContext 'prepare_write' callback.
1128  *
1129  * Prepare a write into a StringInfo.
1130  *
1131  * Don't do anything lasting in here, it's quite possible that nothing will be done
1132  * with the data.
1133  */
1134 static void
1136 {
1137  /* can't have sync rep confused by sending the same LSN several times */
1138  if (!last_write)
1139  lsn = InvalidXLogRecPtr;
1140 
1141  resetStringInfo(ctx->out);
1142 
1143  pq_sendbyte(ctx->out, 'w');
1144  pq_sendint64(ctx->out, lsn); /* dataStart */
1145  pq_sendint64(ctx->out, lsn); /* walEnd */
1146 
1147  /*
1148  * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1149  * reserve space here.
1150  */
1151  pq_sendint64(ctx->out, 0); /* sendtime */
1152 }
1153 
1154 /*
1155  * LogicalDecodingContext 'write' callback.
1156  *
1157  * Actually write out data previously prepared by WalSndPrepareWrite out to
1158  * the network. Take as long as needed, but process replies from the other
1159  * side and check timeouts during that.
1160  */
1161 static void
1163  bool last_write)
1164 {
1165  /* output previously gathered data in a CopyData packet */
1166  pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1167 
1168  /*
1169  * Fill the send timestamp last, so that it is taken as late as possible.
1170  * This is somewhat ugly, but the protocol is set as it's already used for
1171  * several releases by streaming physical replication.
1172  */
1173  resetStringInfo(&tmpbuf);
1174  pq_sendint64(&tmpbuf, GetCurrentTimestamp());
1175  memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1176  tmpbuf.data, sizeof(int64));
1177 
1178  /* fast path */
1179  /* Try to flush pending output to the client */
1180  if (pq_flush_if_writable() != 0)
1181  WalSndShutdown();
1182 
1183  if (!pq_is_send_pending())
1184  return;
1185 
1186  for (;;)
1187  {
1188  int wakeEvents;
1189  long sleeptime;
1190  TimestampTz now;
1191 
1192  /*
1193  * Emergency bailout if postmaster has died. This is to avoid the
1194  * necessity for manual cleanup of all postmaster children.
1195  */
1196  if (!PostmasterIsAlive())
1197  exit(1);
1198 
1199  /* Clear any already-pending wakeups */
1201 
1203 
1204  /* Process any requests or signals received recently */
1205  if (got_SIGHUP)
1206  {
1207  got_SIGHUP = false;
1210  }
1211 
1212  /* Check for input from the client */
1214 
1215  /* Try to flush pending output to the client */
1216  if (pq_flush_if_writable() != 0)
1217  WalSndShutdown();
1218 
1219  /* If we finished clearing the buffered data, we're done here. */
1220  if (!pq_is_send_pending())
1221  break;
1222 
1223  now = GetCurrentTimestamp();
1224 
1225  /* die if timeout was reached */
1226  WalSndCheckTimeOut(now);
1227 
1228  /* Send keepalive if the time has come */
1230 
1231  sleeptime = WalSndComputeSleeptime(now);
1232 
1233  wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
1235 
1236  /* Sleep until something happens or we time out */
1237  WaitLatchOrSocket(MyLatch, wakeEvents,
1238  MyProcPort->sock, sleeptime,
1240  }
1241 
1242  /* reactivate latch so WalSndLoop knows to continue */
1243  SetLatch(MyLatch);
1244 }
1245 
1246 /*
1247  * LogicalDecodingContext 'progress_update' callback.
1248  *
1249  * Write the current position to the log tracker (see XLogSendPhysical).
1250  */
1251 static void
1253 {
1254  static TimestampTz sendTime = 0;
1256 
1257  /*
1258  * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1259  * avoid flooding the lag tracker when we commit frequently.
1260  */
1261 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1262  if (!TimestampDifferenceExceeds(sendTime, now,
1264  return;
1265 
1266  LagTrackerWrite(lsn, now);
1267  sendTime = now;
1268 }
1269 
1270 /*
1271  * Wait till WAL < loc is flushed to disk so it can be safely read.
1272  */
1273 static XLogRecPtr
1275 {
1276  int wakeEvents;
1277  static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1278 
1279 
1280  /*
1281  * Fast path to avoid acquiring the spinlock in case we already know we
1282  * have enough WAL available. This is particularly interesting if we're
1283  * far behind.
1284  */
1285  if (RecentFlushPtr != InvalidXLogRecPtr &&
1286  loc <= RecentFlushPtr)
1287  return RecentFlushPtr;
1288 
1289  /* Get a more recent flush pointer. */
1290  if (!RecoveryInProgress())
1291  RecentFlushPtr = GetFlushRecPtr();
1292  else
1293  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1294 
1295  for (;;)
1296  {
1297  long sleeptime;
1298  TimestampTz now;
1299 
1300  /*
1301  * Emergency bailout if postmaster has died. This is to avoid the
1302  * necessity for manual cleanup of all postmaster children.
1303  */
1304  if (!PostmasterIsAlive())
1305  exit(1);
1306 
1307  /* Clear any already-pending wakeups */
1309 
1311 
1312  /* Process any requests or signals received recently */
1313  if (got_SIGHUP)
1314  {
1315  got_SIGHUP = false;
1318  }
1319 
1320  /* Check for input from the client */
1322 
1323  /* Update our idea of the currently flushed position. */
1324  if (!RecoveryInProgress())
1325  RecentFlushPtr = GetFlushRecPtr();
1326  else
1327  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1328 
1329  /*
1330  * If postmaster asked us to switch to the stopping state, do so.
1331  * Shutdown is in progress and this will allow the checkpointer to
1332  * move on with the shutdown checkpoint.
1333  */
1334  if (got_SIGUSR2)
1336 
1337  /*
1338  * If postmaster asked us to stop, don't wait here anymore. This will
1339  * cause the xlogreader to return without reading a full record, which
1340  * is the fastest way to reach the mainloop which then can quit.
1341  *
1342  * It's important to do this check after the recomputation of
1343  * RecentFlushPtr, so we can send all remaining data before shutting
1344  * down.
1345  */
1346  if (got_SIGINT)
1347  break;
1348 
1349  /*
1350  * We only send regular messages to the client for full decoded
1351  * transactions, but a synchronous replication and walsender shutdown
1352  * possibly are waiting for a later location. So we send pings
1353  * containing the flush location every now and then.
1354  */
1355  if (MyWalSnd->flush < sentPtr &&
1356  MyWalSnd->write < sentPtr &&
1358  {
1359  WalSndKeepalive(false);
1361  }
1362 
1363  /* check whether we're done */
1364  if (loc <= RecentFlushPtr)
1365  break;
1366 
1367  /* Waiting for new WAL. Since we need to wait, we're now caught up. */
1368  WalSndCaughtUp = true;
1369 
1370  /*
1371  * Try to flush pending output to the client. Also wait for the socket
1372  * becoming writable, if there's still pending output after an attempt
1373  * to flush. Otherwise we might just sit on output data while waiting
1374  * for new WAL being generated.
1375  */
1376  if (pq_flush_if_writable() != 0)
1377  WalSndShutdown();
1378 
1379  now = GetCurrentTimestamp();
1380 
1381  /* die if timeout was reached */
1382  WalSndCheckTimeOut(now);
1383 
1384  /* Send keepalive if the time has come */
1386 
1387  sleeptime = WalSndComputeSleeptime(now);
1388 
1389  wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
1391 
1392  if (pq_is_send_pending())
1393  wakeEvents |= WL_SOCKET_WRITEABLE;
1394 
1395  /* Sleep until something happens or we time out */
1396  WaitLatchOrSocket(MyLatch, wakeEvents,
1397  MyProcPort->sock, sleeptime,
1399  }
1400 
1401  /* reactivate latch so WalSndLoop knows to continue */
1402  SetLatch(MyLatch);
1403  return RecentFlushPtr;
1404 }
1405 
1406 /*
1407  * Execute an incoming replication command.
1408  *
1409  * Returns true if the cmd_string was recognized as WalSender command, false
1410  * if not.
1411  */
1412 bool
1413 exec_replication_command(const char *cmd_string)
1414 {
1415  int parse_rc;
1416  Node *cmd_node;
1417  MemoryContext cmd_context;
1418  MemoryContext old_context;
1419 
1420  /*
1421  * If WAL sender has been told that shutdown is getting close, switch its
1422  * status accordingly to handle the next replication commands correctly.
1423  */
1424  if (got_SIGUSR2)
1426 
1427  /*
1428  * Throw error if in stopping mode. We need prevent commands that could
1429  * generate WAL while the shutdown checkpoint is being written. To be
1430  * safe, we just prohibit all new commands.
1431  */
1432  if (MyWalSnd->state == WALSNDSTATE_STOPPING)
1433  ereport(ERROR,
1434  (errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1435 
1436  /*
1437  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1438  * command arrives. Clean up the old stuff if there's anything.
1439  */
1441 
1443 
1445  "Replication command context",
1447  old_context = MemoryContextSwitchTo(cmd_context);
1448 
1449  replication_scanner_init(cmd_string);
1450  parse_rc = replication_yyparse();
1451  if (parse_rc != 0)
1452  ereport(ERROR,
1453  (errcode(ERRCODE_SYNTAX_ERROR),
1454  (errmsg_internal("replication command parser returned %d",
1455  parse_rc))));
1456 
1457  cmd_node = replication_parse_result;
1458 
1459  /*
1460  * Log replication command if log_replication_commands is enabled. Even
1461  * when it's disabled, log the command with DEBUG1 level for backward
1462  * compatibility. Note that SQL commands are not logged here, and will be
1463  * logged later if log_statement is enabled.
1464  */
1465  if (cmd_node->type != T_SQLCmd)
1467  (errmsg("received replication command: %s", cmd_string)));
1468 
1469  /*
1470  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was
1471  * called outside of transaction the snapshot should be cleared here.
1472  */
1473  if (!IsTransactionBlock())
1475 
1476  /*
1477  * For aborted transactions, don't allow anything except pure SQL, the
1478  * exec_simple_query() will handle it correctly.
1479  */
1480  if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
1481  ereport(ERROR,
1482  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1483  errmsg("current transaction is aborted, "
1484  "commands ignored until end of transaction block")));
1485 
1487 
1488  /*
1489  * Allocate buffers that will be used for each outgoing and incoming
1490  * message. We do this just once per command to reduce palloc overhead.
1491  */
1492  initStringInfo(&output_message);
1493  initStringInfo(&reply_message);
1494  initStringInfo(&tmpbuf);
1495 
1496  switch (cmd_node->type)
1497  {
1498  case T_IdentifySystemCmd:
1499  IdentifySystem();
1500  break;
1501 
1502  case T_BaseBackupCmd:
1503  PreventTransactionChain(true, "BASE_BACKUP");
1504  SendBaseBackup((BaseBackupCmd *) cmd_node);
1505  break;
1506 
1509  break;
1510 
1513  break;
1514 
1515  case T_StartReplicationCmd:
1516  {
1517  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1518 
1519  PreventTransactionChain(true, "START_REPLICATION");
1520 
1521  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1522  StartReplication(cmd);
1523  else
1525  break;
1526  }
1527 
1528  case T_TimeLineHistoryCmd:
1529  PreventTransactionChain(true, "TIMELINE_HISTORY");
1531  break;
1532 
1533  case T_VariableShowStmt:
1534  {
1536  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1537 
1538  GetPGVariable(n->name, dest);
1539  }
1540  break;
1541 
1542  case T_SQLCmd:
1543  if (MyDatabaseId == InvalidOid)
1544  ereport(ERROR,
1545  (errmsg("not connected to database")));
1546 
1547  /* Tell the caller that this wasn't a WalSender command. */
1548  return false;
1549 
1550  default:
1551  elog(ERROR, "unrecognized replication command node tag: %u",
1552  cmd_node->type);
1553  }
1554 
1555  /* done */
1556  MemoryContextSwitchTo(old_context);
1557  MemoryContextDelete(cmd_context);
1558 
1559  /* Send CommandComplete message */
1560  EndCommand("SELECT", DestRemote);
1561 
1562  return true;
1563 }
1564 
1565 /*
1566  * Process any incoming messages while streaming. Also checks if the remote
1567  * end has closed the connection.
1568  */
1569 static void
1571 {
1572  unsigned char firstchar;
1573  int r;
1574  bool received = false;
1575 
1576  for (;;)
1577  {
1578  pq_startmsgread();
1579  r = pq_getbyte_if_available(&firstchar);
1580  if (r < 0)
1581  {
1582  /* unexpected error or EOF */
1584  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1585  errmsg("unexpected EOF on standby connection")));
1586  proc_exit(0);
1587  }
1588  if (r == 0)
1589  {
1590  /* no data available without blocking */
1591  pq_endmsgread();
1592  break;
1593  }
1594 
1595  /* Read the message contents */
1596  resetStringInfo(&reply_message);
1597  if (pq_getmessage(&reply_message, 0))
1598  {
1600  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1601  errmsg("unexpected EOF on standby connection")));
1602  proc_exit(0);
1603  }
1604 
1605  /*
1606  * If we already received a CopyDone from the frontend, the frontend
1607  * should not send us anything until we've closed our end of the COPY.
1608  * XXX: In theory, the frontend could already send the next command
1609  * before receiving the CopyDone, but libpq doesn't currently allow
1610  * that.
1611  */
1612  if (streamingDoneReceiving && firstchar != 'X')
1613  ereport(FATAL,
1614  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1615  errmsg("unexpected standby message type \"%c\", after receiving CopyDone",
1616  firstchar)));
1617 
1618  /* Handle the very limited subset of commands expected in this phase */
1619  switch (firstchar)
1620  {
1621  /*
1622  * 'd' means a standby reply wrapped in a CopyData packet.
1623  */
1624  case 'd':
1626  received = true;
1627  break;
1628 
1629  /*
1630  * CopyDone means the standby requested to finish streaming.
1631  * Reply with CopyDone, if we had not sent that already.
1632  */
1633  case 'c':
1634  if (!streamingDoneSending)
1635  {
1636  pq_putmessage_noblock('c', NULL, 0);
1637  streamingDoneSending = true;
1638  }
1639 
1640  streamingDoneReceiving = true;
1641  received = true;
1642  break;
1643 
1644  /*
1645  * 'X' means that the standby is closing down the socket.
1646  */
1647  case 'X':
1648  proc_exit(0);
1649 
1650  default:
1651  ereport(FATAL,
1652  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1653  errmsg("invalid standby message type \"%c\"",
1654  firstchar)));
1655  }
1656  }
1657 
1658  /*
1659  * Save the last reply timestamp if we've received at least one reply.
1660  */
1661  if (received)
1662  {
1664  waiting_for_ping_response = false;
1665  }
1666 }
1667 
1668 /*
1669  * Process a status update message received from standby.
1670  */
1671 static void
1673 {
1674  char msgtype;
1675 
1676  /*
1677  * Check message type from the first byte.
1678  */
1679  msgtype = pq_getmsgbyte(&reply_message);
1680 
1681  switch (msgtype)
1682  {
1683  case 'r':
1685  break;
1686 
1687  case 'h':
1689  break;
1690 
1691  default:
1693  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1694  errmsg("unexpected message type \"%c\"", msgtype)));
1695  proc_exit(0);
1696  }
1697 }
1698 
1699 /*
1700  * Remember that a walreceiver just confirmed receipt of lsn `lsn`.
1701  */
1702 static void
1704 {
1705  bool changed = false;
1707 
1708  Assert(lsn != InvalidXLogRecPtr);
1709  SpinLockAcquire(&slot->mutex);
1710  if (slot->data.restart_lsn != lsn)
1711  {
1712  changed = true;
1713  slot->data.restart_lsn = lsn;
1714  }
1715  SpinLockRelease(&slot->mutex);
1716 
1717  if (changed)
1718  {
1721  }
1722 
1723  /*
1724  * One could argue that the slot should be saved to disk now, but that'd
1725  * be energy wasted - the worst lost information can do here is give us
1726  * wrong information in a statistics view - we'll just potentially be more
1727  * conservative in removing files.
1728  */
1729 }
1730 
1731 /*
1732  * Regular reply from standby advising of WAL locations on standby server.
1733  */
1734 static void
1736 {
1737  XLogRecPtr writePtr,
1738  flushPtr,
1739  applyPtr;
1740  bool replyRequested;
1741  TimeOffset writeLag,
1742  flushLag,
1743  applyLag;
1744  bool clearLagTimes;
1745  TimestampTz now;
1746 
1747  static bool fullyAppliedLastTime = false;
1748 
1749  /* the caller already consumed the msgtype byte */
1750  writePtr = pq_getmsgint64(&reply_message);
1751  flushPtr = pq_getmsgint64(&reply_message);
1752  applyPtr = pq_getmsgint64(&reply_message);
1753  (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
1754  replyRequested = pq_getmsgbyte(&reply_message);
1755 
1756  elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s",
1757  (uint32) (writePtr >> 32), (uint32) writePtr,
1758  (uint32) (flushPtr >> 32), (uint32) flushPtr,
1759  (uint32) (applyPtr >> 32), (uint32) applyPtr,
1760  replyRequested ? " (reply requested)" : "");
1761 
1762  /* See if we can compute the round-trip lag for these positions. */
1763  now = GetCurrentTimestamp();
1764  writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
1765  flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
1766  applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
1767 
1768  /*
1769  * If the standby reports that it has fully replayed the WAL in two
1770  * consecutive reply messages, then the second such message must result
1771  * from wal_receiver_status_interval expiring on the standby. This is a
1772  * convenient time to forget the lag times measured when it last
1773  * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
1774  * until more WAL traffic arrives.
1775  */
1776  clearLagTimes = false;
1777  if (applyPtr == sentPtr)
1778  {
1779  if (fullyAppliedLastTime)
1780  clearLagTimes = true;
1781  fullyAppliedLastTime = true;
1782  }
1783  else
1784  fullyAppliedLastTime = false;
1785 
1786  /* Send a reply if the standby requested one. */
1787  if (replyRequested)
1788  WalSndKeepalive(false);
1789 
1790  /*
1791  * Update shared state for this WalSender process based on reply data from
1792  * standby.
1793  */
1794  {
1795  WalSnd *walsnd = MyWalSnd;
1796 
1797  SpinLockAcquire(&walsnd->mutex);
1798  walsnd->write = writePtr;
1799  walsnd->flush = flushPtr;
1800  walsnd->apply = applyPtr;
1801  if (writeLag != -1 || clearLagTimes)
1802  walsnd->writeLag = writeLag;
1803  if (flushLag != -1 || clearLagTimes)
1804  walsnd->flushLag = flushLag;
1805  if (applyLag != -1 || clearLagTimes)
1806  walsnd->applyLag = applyLag;
1807  SpinLockRelease(&walsnd->mutex);
1808  }
1809 
1812 
1813  /*
1814  * Advance our local xmin horizon when the client confirmed a flush.
1815  */
1816  if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
1817  {
1820  else
1822  }
1823 }
1824 
1825 /* compute new replication slot xmin horizon if needed */
1826 static void
1828 {
1829  bool changed = false;
1831 
1832  SpinLockAcquire(&slot->mutex);
1834 
1835  /*
1836  * For physical replication we don't need the interlock provided by xmin
1837  * and effective_xmin since the consequences of a missed increase are
1838  * limited to query cancellations, so set both at once.
1839  */
1840  if (!TransactionIdIsNormal(slot->data.xmin) ||
1841  !TransactionIdIsNormal(feedbackXmin) ||
1842  TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
1843  {
1844  changed = true;
1845  slot->data.xmin = feedbackXmin;
1846  slot->effective_xmin = feedbackXmin;
1847  }
1848  if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
1849  !TransactionIdIsNormal(feedbackCatalogXmin) ||
1850  TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
1851  {
1852  changed = true;
1853  slot->data.catalog_xmin = feedbackCatalogXmin;
1854  slot->effective_catalog_xmin = feedbackCatalogXmin;
1855  }
1856  SpinLockRelease(&slot->mutex);
1857 
1858  if (changed)
1859  {
1862  }
1863 }
1864 
1865 /*
1866  * Check that the provided xmin/epoch are sane, that is, not in the future
1867  * and not so far back as to be already wrapped around.
1868  *
1869  * Epoch of nextXid should be same as standby, or if the counter has
1870  * wrapped, then one greater than standby.
1871  *
1872  * This check doesn't care about whether clog exists for these xids
1873  * at all.
1874  */
1875 static bool
1877 {
1878  TransactionId nextXid;
1879  uint32 nextEpoch;
1880 
1881  GetNextXidAndEpoch(&nextXid, &nextEpoch);
1882 
1883  if (xid <= nextXid)
1884  {
1885  if (epoch != nextEpoch)
1886  return false;
1887  }
1888  else
1889  {
1890  if (epoch + 1 != nextEpoch)
1891  return false;
1892  }
1893 
1894  if (!TransactionIdPrecedesOrEquals(xid, nextXid))
1895  return false; /* epoch OK, but it's wrapped around */
1896 
1897  return true;
1898 }
1899 
1900 /*
1901  * Hot Standby feedback
1902  */
1903 static void
1905 {
1906  TransactionId feedbackXmin;
1907  uint32 feedbackEpoch;
1908  TransactionId feedbackCatalogXmin;
1909  uint32 feedbackCatalogEpoch;
1910 
1911  /*
1912  * Decipher the reply message. The caller already consumed the msgtype
1913  * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
1914  * of this message.
1915  */
1916  (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
1917  feedbackXmin = pq_getmsgint(&reply_message, 4);
1918  feedbackEpoch = pq_getmsgint(&reply_message, 4);
1919  feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
1920  feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
1921 
1922  elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u",
1923  feedbackXmin,
1924  feedbackEpoch,
1925  feedbackCatalogXmin,
1926  feedbackCatalogEpoch);
1927 
1928  /*
1929  * Unset WalSender's xmins if the feedback message values are invalid.
1930  * This happens when the downstream turned hot_standby_feedback off.
1931  */
1932  if (!TransactionIdIsNormal(feedbackXmin)
1933  && !TransactionIdIsNormal(feedbackCatalogXmin))
1934  {
1936  if (MyReplicationSlot != NULL)
1937  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
1938  return;
1939  }
1940 
1941  /*
1942  * Check that the provided xmin/epoch are sane, that is, not in the future
1943  * and not so far back as to be already wrapped around. Ignore if not.
1944  */
1945  if (TransactionIdIsNormal(feedbackXmin) &&
1946  !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
1947  return;
1948 
1949  if (TransactionIdIsNormal(feedbackCatalogXmin) &&
1950  !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
1951  return;
1952 
1953  /*
1954  * Set the WalSender's xmin equal to the standby's requested xmin, so that
1955  * the xmin will be taken into account by GetOldestXmin. This will hold
1956  * back the removal of dead rows and thereby prevent the generation of
1957  * cleanup conflicts on the standby server.
1958  *
1959  * There is a small window for a race condition here: although we just
1960  * checked that feedbackXmin precedes nextXid, the nextXid could have
1961  * gotten advanced between our fetching it and applying the xmin below,
1962  * perhaps far enough to make feedbackXmin wrap around. In that case the
1963  * xmin we set here would be "in the future" and have no effect. No point
1964  * in worrying about this since it's too late to save the desired data
1965  * anyway. Assuming that the standby sends us an increasing sequence of
1966  * xmins, this could only happen during the first reply cycle, else our
1967  * own xmin would prevent nextXid from advancing so far.
1968  *
1969  * We don't bother taking the ProcArrayLock here. Setting the xmin field
1970  * is assumed atomic, and there's no real need to prevent a concurrent
1971  * GetOldestXmin. (If we're moving our xmin forward, this is obviously
1972  * safe, and if we're moving it backwards, well, the data is at risk
1973  * already since a VACUUM could have just finished calling GetOldestXmin.)
1974  *
1975  * If we're using a replication slot we reserve the xmin via that,
1976  * otherwise via the walsender's PGXACT entry. We can only track the
1977  * catalog xmin separately when using a slot, so we store the least of the
1978  * two provided when not using a slot.
1979  *
1980  * XXX: It might make sense to generalize the ephemeral slot concept and
1981  * always use the slot mechanism to handle the feedback xmin.
1982  */
1983  if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
1984  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
1985  else
1986  {
1987  if (TransactionIdIsNormal(feedbackCatalogXmin)
1988  && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
1989  MyPgXact->xmin = feedbackCatalogXmin;
1990  else
1991  MyPgXact->xmin = feedbackXmin;
1992  }
1993 }
1994 
1995 /*
1996  * Compute how long send/receive loops should sleep.
1997  *
1998  * If wal_sender_timeout is enabled we want to wake up in time to send
1999  * keepalives and to abort the connection if wal_sender_timeout has been
2000  * reached.
2001  */
2002 static long
2004 {
2005  long sleeptime = 10000; /* 10 s */
2006 
2008  {
2009  TimestampTz wakeup_time;
2010  long sec_to_timeout;
2011  int microsec_to_timeout;
2012 
2013  /*
2014  * At the latest stop sleeping once wal_sender_timeout has been
2015  * reached.
2016  */
2019 
2020  /*
2021  * If no ping has been sent yet, wakeup when it's time to do so.
2022  * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2023  * the timeout passed without a response.
2024  */
2027  wal_sender_timeout / 2);
2028 
2029  /* Compute relative time until wakeup. */
2030  TimestampDifference(now, wakeup_time,
2031  &sec_to_timeout, &microsec_to_timeout);
2032 
2033  sleeptime = sec_to_timeout * 1000 +
2034  microsec_to_timeout / 1000;
2035  }
2036 
2037  return sleeptime;
2038 }
2039 
2040 /*
2041  * Check whether there have been responses by the client within
2042  * wal_sender_timeout and shutdown if not.
2043  */
2044 static void
2046 {
2047  TimestampTz timeout;
2048 
2049  /* don't bail out if we're doing something that doesn't require timeouts */
2050  if (last_reply_timestamp <= 0)
2051  return;
2052 
2055 
2056  if (wal_sender_timeout > 0 && now >= timeout)
2057  {
2058  /*
2059  * Since typically expiration of replication timeout means
2060  * communication problem, we don't send the error message to the
2061  * standby.
2062  */
2064  (errmsg("terminating walsender process due to replication timeout")));
2065 
2066  WalSndShutdown();
2067  }
2068 }
2069 
2070 /* Main loop of walsender process that streams the WAL over Copy messages. */
2071 static void
2073 {
2074  /*
2075  * Initialize the last reply timestamp. That enables timeout processing
2076  * from hereon.
2077  */
2079  waiting_for_ping_response = false;
2080 
2081  /* Report to pgstat that this process is running */
2083 
2084  /*
2085  * Loop until we reach the end of this timeline or the client requests to
2086  * stop streaming.
2087  */
2088  for (;;)
2089  {
2090  TimestampTz now;
2091 
2092  /*
2093  * Emergency bailout if postmaster has died. This is to avoid the
2094  * necessity for manual cleanup of all postmaster children.
2095  */
2096  if (!PostmasterIsAlive())
2097  exit(1);
2098 
2099  /* Clear any already-pending wakeups */
2101 
2103 
2104  /* Process any requests or signals received recently */
2105  if (got_SIGHUP)
2106  {
2107  got_SIGHUP = false;
2110  }
2111 
2112  /* Check for input from the client */
2114 
2115  /*
2116  * If we have received CopyDone from the client, sent CopyDone
2117  * ourselves, and the output buffer is empty, it's time to exit
2118  * streaming.
2119  */
2121  break;
2122 
2123  /*
2124  * If we don't have any pending data in the output buffer, try to send
2125  * some more. If there is some, we don't bother to call send_data
2126  * again until we've flushed it ... but we'd better assume we are not
2127  * caught up.
2128  */
2129  if (!pq_is_send_pending())
2130  send_data();
2131  else
2132  WalSndCaughtUp = false;
2133 
2134  /* Try to flush pending output to the client */
2135  if (pq_flush_if_writable() != 0)
2136  WalSndShutdown();
2137 
2138  /* If nothing remains to be sent right now ... */
2140  {
2141  /*
2142  * If we're in catchup state, move to streaming. This is an
2143  * important state change for users to know about, since before
2144  * this point data loss might occur if the primary dies and we
2145  * need to failover to the standby. The state change is also
2146  * important for synchronous replication, since commits that
2147  * started to wait at that point might wait for some time.
2148  */
2149  if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
2150  {
2151  ereport(DEBUG1,
2152  (errmsg("standby \"%s\" has now caught up with primary",
2153  application_name)));
2155  }
2156 
2157  /*
2158  * At the reception of SIGUSR2, switch the WAL sender to the
2159  * stopping state.
2160  */
2161  if (got_SIGUSR2)
2163 
2164  /*
2165  * When SIGINT arrives, we send any outstanding logs up to the
2166  * shutdown checkpoint record (i.e., the latest record), wait for
2167  * them to be replicated to the standby, and exit. This may be a
2168  * normal termination at shutdown, or a promotion, the walsender
2169  * is not sure which.
2170  */
2171  if (got_SIGINT)
2172  WalSndDone(send_data);
2173  }
2174 
2175  now = GetCurrentTimestamp();
2176 
2177  /* Check for replication timeout. */
2178  WalSndCheckTimeOut(now);
2179 
2180  /* Send keepalive if the time has come */
2182 
2183  /*
2184  * We don't block if not caught up, unless there is unsent data
2185  * pending in which case we'd better block until the socket is
2186  * write-ready. This test is only needed for the case where the
2187  * send_data callback handled a subset of the available data but then
2188  * pq_flush_if_writable flushed it all --- we should immediately try
2189  * to send more.
2190  */
2192  {
2193  long sleeptime;
2194  int wakeEvents;
2195 
2196  wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT |
2198 
2199  sleeptime = WalSndComputeSleeptime(now);
2200 
2201  if (pq_is_send_pending())
2202  wakeEvents |= WL_SOCKET_WRITEABLE;
2203 
2204  /* Sleep until something happens or we time out */
2205  WaitLatchOrSocket(MyLatch, wakeEvents,
2206  MyProcPort->sock, sleeptime,
2208  }
2209  }
2210  return;
2211 }
2212 
2213 /* Initialize a per-walsender data structure for this walsender process */
2214 static void
2216 {
2217  int i;
2218 
2219  /*
2220  * WalSndCtl should be set up already (we inherit this by fork() or
2221  * EXEC_BACKEND mechanism from the postmaster).
2222  */
2223  Assert(WalSndCtl != NULL);
2224  Assert(MyWalSnd == NULL);
2225 
2226  /*
2227  * Find a free walsender slot and reserve it. If this fails, we must be
2228  * out of WalSnd structures.
2229  */
2230  for (i = 0; i < max_wal_senders; i++)
2231  {
2232  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2233 
2234  SpinLockAcquire(&walsnd->mutex);
2235 
2236  if (walsnd->pid != 0)
2237  {
2238  SpinLockRelease(&walsnd->mutex);
2239  continue;
2240  }
2241  else
2242  {
2243  /*
2244  * Found a free slot. Reserve it for us.
2245  */
2246  walsnd->pid = MyProcPid;
2247  walsnd->sentPtr = InvalidXLogRecPtr;
2248  walsnd->write = InvalidXLogRecPtr;
2249  walsnd->flush = InvalidXLogRecPtr;
2250  walsnd->apply = InvalidXLogRecPtr;
2251  walsnd->writeLag = -1;
2252  walsnd->flushLag = -1;
2253  walsnd->applyLag = -1;
2254  walsnd->state = WALSNDSTATE_STARTUP;
2255  walsnd->latch = &MyProc->procLatch;
2256  SpinLockRelease(&walsnd->mutex);
2257  /* don't need the lock anymore */
2258  MyWalSnd = (WalSnd *) walsnd;
2259 
2260  break;
2261  }
2262  }
2263  if (MyWalSnd == NULL)
2264  ereport(FATAL,
2265  (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
2266  errmsg("number of requested standby connections "
2267  "exceeds max_wal_senders (currently %d)",
2268  max_wal_senders)));
2269 
2270  /* Arrange to clean up at walsender exit */
2272 }
2273 
2274 /* Destroy the per-walsender data structure for this walsender process */
2275 static void
2277 {
2278  WalSnd *walsnd = MyWalSnd;
2279 
2280  Assert(walsnd != NULL);
2281 
2282  MyWalSnd = NULL;
2283 
2284  SpinLockAcquire(&walsnd->mutex);
2285  /* clear latch while holding the spinlock, so it can safely be read */
2286  walsnd->latch = NULL;
2287  /* Mark WalSnd struct as no longer being in use. */
2288  walsnd->pid = 0;
2289  SpinLockRelease(&walsnd->mutex);
2290 }
2291 
2292 /*
2293  * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
2294  *
2295  * XXX probably this should be improved to suck data directly from the
2296  * WAL buffers when possible.
2297  *
2298  * Will open, and keep open, one WAL segment stored in the global file
2299  * descriptor sendFile. This means if XLogRead is used once, there will
2300  * always be one descriptor left open until the process ends, but never
2301  * more than one.
2302  */
2303 static void
2304 XLogRead(char *buf, XLogRecPtr startptr, Size count)
2305 {
2306  char *p;
2307  XLogRecPtr recptr;
2308  Size nbytes;
2309  XLogSegNo segno;
2310 
2311 retry:
2312  p = buf;
2313  recptr = startptr;
2314  nbytes = count;
2315 
2316  while (nbytes > 0)
2317  {
2318  uint32 startoff;
2319  int segbytes;
2320  int readbytes;
2321 
2322  startoff = recptr % XLogSegSize;
2323 
2324  if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
2325  {
2326  char path[MAXPGPATH];
2327 
2328  /* Switch to another logfile segment */
2329  if (sendFile >= 0)
2330  close(sendFile);
2331 
2332  XLByteToSeg(recptr, sendSegNo);
2333 
2334  /*-------
2335  * When reading from a historic timeline, and there is a timeline
2336  * switch within this segment, read from the WAL segment belonging
2337  * to the new timeline.
2338  *
2339  * For example, imagine that this server is currently on timeline
2340  * 5, and we're streaming timeline 4. The switch from timeline 4
2341  * to 5 happened at 0/13002088. In pg_wal, we have these files:
2342  *
2343  * ...
2344  * 000000040000000000000012
2345  * 000000040000000000000013
2346  * 000000050000000000000013
2347  * 000000050000000000000014
2348  * ...
2349  *
2350  * In this situation, when requested to send the WAL from
2351  * segment 0x13, on timeline 4, we read the WAL from file
2352  * 000000050000000000000013. Archive recovery prefers files from
2353  * newer timelines, so if the segment was restored from the
2354  * archive on this server, the file belonging to the old timeline,
2355  * 000000040000000000000013, might not exist. Their contents are
2356  * equal up to the switchpoint, because at a timeline switch, the
2357  * used portion of the old segment is copied to the new file.
2358  *-------
2359  */
2362  {
2363  XLogSegNo endSegNo;
2364 
2366  if (sendSegNo == endSegNo)
2368  }
2369 
2371 
2372  sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
2373  if (sendFile < 0)
2374  {
2375  /*
2376  * If the file is not found, assume it's because the standby
2377  * asked for a too old WAL segment that has already been
2378  * removed or recycled.
2379  */
2380  if (errno == ENOENT)
2381  ereport(ERROR,
2383  errmsg("requested WAL segment %s has already been removed",
2385  else
2386  ereport(ERROR,
2388  errmsg("could not open file \"%s\": %m",
2389  path)));
2390  }
2391  sendOff = 0;
2392  }
2393 
2394  /* Need to seek in the file? */
2395  if (sendOff != startoff)
2396  {
2397  if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
2398  ereport(ERROR,
2400  errmsg("could not seek in log segment %s to offset %u: %m",
2402  startoff)));
2403  sendOff = startoff;
2404  }
2405 
2406  /* How many bytes are within this segment? */
2407  if (nbytes > (XLogSegSize - startoff))
2408  segbytes = XLogSegSize - startoff;
2409  else
2410  segbytes = nbytes;
2411 
2413  readbytes = read(sendFile, p, segbytes);
2415  if (readbytes <= 0)
2416  {
2417  ereport(ERROR,
2419  errmsg("could not read from log segment %s, offset %u, length %lu: %m",
2421  sendOff, (unsigned long) segbytes)));
2422  }
2423 
2424  /* Update state for read */
2425  recptr += readbytes;
2426 
2427  sendOff += readbytes;
2428  nbytes -= readbytes;
2429  p += readbytes;
2430  }
2431 
2432  /*
2433  * After reading into the buffer, check that what we read was valid. We do
2434  * this after reading, because even though the segment was present when we
2435  * opened it, it might get recycled or removed while we read it. The
2436  * read() succeeds in that case, but the data we tried to read might
2437  * already have been overwritten with new WAL records.
2438  */
2439  XLByteToSeg(startptr, segno);
2441 
2442  /*
2443  * During recovery, the currently-open WAL file might be replaced with the
2444  * file of the same name retrieved from archive. So we always need to
2445  * check what we read was valid after reading into the buffer. If it's
2446  * invalid, we try to open and read the file again.
2447  */
2449  {
2450  WalSnd *walsnd = MyWalSnd;
2451  bool reload;
2452 
2453  SpinLockAcquire(&walsnd->mutex);
2454  reload = walsnd->needreload;
2455  walsnd->needreload = false;
2456  SpinLockRelease(&walsnd->mutex);
2457 
2458  if (reload && sendFile >= 0)
2459  {
2460  close(sendFile);
2461  sendFile = -1;
2462 
2463  goto retry;
2464  }
2465  }
2466 }
2467 
2468 /*
2469  * Send out the WAL in its normal physical/stored form.
2470  *
2471  * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
2472  * but not yet sent to the client, and buffer it in the libpq output
2473  * buffer.
2474  *
2475  * If there is no unsent WAL remaining, WalSndCaughtUp is set to true,
2476  * otherwise WalSndCaughtUp is set to false.
2477  */
2478 static void
2480 {
2481  XLogRecPtr SendRqstPtr;
2482  XLogRecPtr startptr;
2483  XLogRecPtr endptr;
2484  Size nbytes;
2485 
2487  {
2488  WalSndCaughtUp = true;
2489  return;
2490  }
2491 
2492  /* Figure out how far we can safely send the WAL. */
2494  {
2495  /*
2496  * Streaming an old timeline that's in this server's history, but is
2497  * not the one we're currently inserting or replaying. It can be
2498  * streamed up to the point where we switched off that timeline.
2499  */
2500  SendRqstPtr = sendTimeLineValidUpto;
2501  }
2502  else if (am_cascading_walsender)
2503  {
2504  /*
2505  * Streaming the latest timeline on a standby.
2506  *
2507  * Attempt to send all WAL that has already been replayed, so that we
2508  * know it's valid. If we're receiving WAL through streaming
2509  * replication, it's also OK to send any WAL that has been received
2510  * but not replayed.
2511  *
2512  * The timeline we're recovering from can change, or we can be
2513  * promoted. In either case, the current timeline becomes historic. We
2514  * need to detect that so that we don't try to stream past the point
2515  * where we switched to another timeline. We check for promotion or
2516  * timeline switch after calculating FlushPtr, to avoid a race
2517  * condition: if the timeline becomes historic just after we checked
2518  * that it was still current, it's still be OK to stream it up to the
2519  * FlushPtr that was calculated before it became historic.
2520  */
2521  bool becameHistoric = false;
2522 
2523  SendRqstPtr = GetStandbyFlushRecPtr();
2524 
2525  if (!RecoveryInProgress())
2526  {
2527  /*
2528  * We have been promoted. RecoveryInProgress() updated
2529  * ThisTimeLineID to the new current timeline.
2530  */
2531  am_cascading_walsender = false;
2532  becameHistoric = true;
2533  }
2534  else
2535  {
2536  /*
2537  * Still a cascading standby. But is the timeline we're sending
2538  * still the one recovery is recovering from? ThisTimeLineID was
2539  * updated by the GetStandbyFlushRecPtr() call above.
2540  */
2542  becameHistoric = true;
2543  }
2544 
2545  if (becameHistoric)
2546  {
2547  /*
2548  * The timeline we were sending has become historic. Read the
2549  * timeline history file of the new timeline to see where exactly
2550  * we forked off from the timeline we were sending.
2551  */
2552  List *history;
2553 
2556 
2558  list_free_deep(history);
2559 
2560  sendTimeLineIsHistoric = true;
2561 
2562  SendRqstPtr = sendTimeLineValidUpto;
2563  }
2564  }
2565  else
2566  {
2567  /*
2568  * Streaming the current timeline on a master.
2569  *
2570  * Attempt to send all data that's already been written out and
2571  * fsync'd to disk. We cannot go further than what's been written out
2572  * given the current implementation of XLogRead(). And in any case
2573  * it's unsafe to send WAL that is not securely down to disk on the
2574  * master: if the master subsequently crashes and restarts, slaves
2575  * must not have applied any WAL that got lost on the master.
2576  */
2577  SendRqstPtr = GetFlushRecPtr();
2578  }
2579 
2580  /*
2581  * Record the current system time as an approximation of the time at which
2582  * this WAL location was written for the purposes of lag tracking.
2583  *
2584  * In theory we could make XLogFlush() record a time in shmem whenever WAL
2585  * is flushed and we could get that time as well as the LSN when we call
2586  * GetFlushRecPtr() above (and likewise for the cascading standby
2587  * equivalent), but rather than putting any new code into the hot WAL path
2588  * it seems good enough to capture the time here. We should reach this
2589  * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
2590  * may take some time, we read the WAL flush pointer and take the time
2591  * very close to together here so that we'll get a later position if it is
2592  * still moving.
2593  *
2594  * Because LagTrackerWriter ignores samples when the LSN hasn't advanced,
2595  * this gives us a cheap approximation for the WAL flush time for this
2596  * LSN.
2597  *
2598  * Note that the LSN is not necessarily the LSN for the data contained in
2599  * the present message; it's the end of the WAL, which might be further
2600  * ahead. All the lag tracking machinery cares about is finding out when
2601  * that arbitrary LSN is eventually reported as written, flushed and
2602  * applied, so that it can measure the elapsed time.
2603  */
2604  LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());
2605 
2606  /*
2607  * If this is a historic timeline and we've reached the point where we
2608  * forked to the next timeline, stop streaming.
2609  *
2610  * Note: We might already have sent WAL > sendTimeLineValidUpto. The
2611  * startup process will normally replay all WAL that has been received
2612  * from the master, before promoting, but if the WAL streaming is
2613  * terminated at a WAL page boundary, the valid portion of the timeline
2614  * might end in the middle of a WAL record. We might've already sent the
2615  * first half of that partial WAL record to the cascading standby, so that
2616  * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
2617  * replay the partial WAL record either, so it can still follow our
2618  * timeline switch.
2619  */
2621  {
2622  /* close the current file. */
2623  if (sendFile >= 0)
2624  close(sendFile);
2625  sendFile = -1;
2626 
2627  /* Send CopyDone */
2628  pq_putmessage_noblock('c', NULL, 0);
2629  streamingDoneSending = true;
2630 
2631  WalSndCaughtUp = true;
2632 
2633  elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
2635  (uint32) (sentPtr >> 32), (uint32) sentPtr);
2636  return;
2637  }
2638 
2639  /* Do we have any work to do? */
2640  Assert(sentPtr <= SendRqstPtr);
2641  if (SendRqstPtr <= sentPtr)
2642  {
2643  WalSndCaughtUp = true;
2644  return;
2645  }
2646 
2647  /*
2648  * Figure out how much to send in one message. If there's no more than
2649  * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
2650  * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
2651  *
2652  * The rounding is not only for performance reasons. Walreceiver relies on
2653  * the fact that we never split a WAL record across two messages. Since a
2654  * long WAL record is split at page boundary into continuation records,
2655  * page boundary is always a safe cut-off point. We also assume that
2656  * SendRqstPtr never points to the middle of a WAL record.
2657  */
2658  startptr = sentPtr;
2659  endptr = startptr;
2660  endptr += MAX_SEND_SIZE;
2661 
2662  /* if we went beyond SendRqstPtr, back off */
2663  if (SendRqstPtr <= endptr)
2664  {
2665  endptr = SendRqstPtr;
2667  WalSndCaughtUp = false;
2668  else
2669  WalSndCaughtUp = true;
2670  }
2671  else
2672  {
2673  /* round down to page boundary. */
2674  endptr -= (endptr % XLOG_BLCKSZ);
2675  WalSndCaughtUp = false;
2676  }
2677 
2678  nbytes = endptr - startptr;
2679  Assert(nbytes <= MAX_SEND_SIZE);
2680 
2681  /*
2682  * OK to read and send the slice.
2683  */
2684  resetStringInfo(&output_message);
2685  pq_sendbyte(&output_message, 'w');
2686 
2687  pq_sendint64(&output_message, startptr); /* dataStart */
2688  pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
2689  pq_sendint64(&output_message, 0); /* sendtime, filled in last */
2690 
2691  /*
2692  * Read the log directly into the output buffer to avoid extra memcpy
2693  * calls.
2694  */
2695  enlargeStringInfo(&output_message, nbytes);
2696  XLogRead(&output_message.data[output_message.len], startptr, nbytes);
2697  output_message.len += nbytes;
2698  output_message.data[output_message.len] = '\0';
2699 
2700  /*
2701  * Fill the send timestamp last, so that it is taken as late as possible.
2702  */
2703  resetStringInfo(&tmpbuf);
2704  pq_sendint64(&tmpbuf, GetCurrentTimestamp());
2705  memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
2706  tmpbuf.data, sizeof(int64));
2707 
2708  pq_putmessage_noblock('d', output_message.data, output_message.len);
2709 
2710  sentPtr = endptr;
2711 
2712  /* Update shared memory status */
2713  {
2714  WalSnd *walsnd = MyWalSnd;
2715 
2716  SpinLockAcquire(&walsnd->mutex);
2717  walsnd->sentPtr = sentPtr;
2718  SpinLockRelease(&walsnd->mutex);
2719  }
2720 
2721  /* Report progress of XLOG streaming in PS display */
2723  {
2724  char activitymsg[50];
2725 
2726  snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
2727  (uint32) (sentPtr >> 32), (uint32) sentPtr);
2728  set_ps_display(activitymsg, false);
2729  }
2730 
2731  return;
2732 }
2733 
2734 /*
2735  * Stream out logically decoded data.
2736  */
2737 static void
2739 {
2740  XLogRecord *record;
2741  char *errm;
2742 
2743  /*
2744  * Don't know whether we've caught up yet. We'll set it to true in
2745  * WalSndWaitForWal, if we're actually waiting. We also set to true if
2746  * XLogReadRecord() had to stop reading but WalSndWaitForWal didn't wait -
2747  * i.e. when we're shutting down.
2748  */
2749  WalSndCaughtUp = false;
2750 
2751  record = XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &errm);
2753 
2754  /* xlog record was invalid */
2755  if (errm != NULL)
2756  elog(ERROR, "%s", errm);
2757 
2758  if (record != NULL)
2759  {
2760  /*
2761  * Note the lack of any call to LagTrackerWrite() which is handled by
2762  * WalSndUpdateProgress which is called by output plugin through
2763  * logical decoding write api.
2764  */
2765  LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
2766 
2767  sentPtr = logical_decoding_ctx->reader->EndRecPtr;
2768  }
2769  else
2770  {
2771  /*
2772  * If the record we just wanted read is at or beyond the flushed
2773  * point, then we're caught up.
2774  */
2775  if (logical_decoding_ctx->reader->EndRecPtr >= GetFlushRecPtr())
2776  WalSndCaughtUp = true;
2777  }
2778 
2779  /* Update shared memory status */
2780  {
2781  WalSnd *walsnd = MyWalSnd;
2782 
2783  SpinLockAcquire(&walsnd->mutex);
2784  walsnd->sentPtr = sentPtr;
2785  SpinLockRelease(&walsnd->mutex);
2786  }
2787 }
2788 
2789 /*
2790  * Shutdown if the sender is caught up.
2791  *
2792  * NB: This should only be called when the shutdown signal has been received
2793  * from postmaster.
2794  *
2795  * Note that if we determine that there's still more data to send, this
2796  * function will return control to the caller.
2797  */
2798 static void
2800 {
2801  XLogRecPtr replicatedPtr;
2802 
2803  /* ... let's just be real sure we're caught up ... */
2804  send_data();
2805 
2806  /*
2807  * To figure out whether all WAL has successfully been replicated, check
2808  * flush location if valid, write otherwise. Tools like pg_receivewal will
2809  * usually (unless in synchronous mode) return an invalid flush location.
2810  */
2811  replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
2812  MyWalSnd->write : MyWalSnd->flush;
2813 
2814  if (WalSndCaughtUp && sentPtr == replicatedPtr &&
2815  !pq_is_send_pending())
2816  {
2817  /* Inform the standby that XLOG streaming is done */
2818  EndCommand("COPY 0", DestRemote);
2819  pq_flush();
2820 
2821  proc_exit(0);
2822  }
2824  {
2825  WalSndKeepalive(true);
2827  }
2828 }
2829 
2830 /*
2831  * Returns the latest point in WAL that has been safely flushed to disk, and
2832  * can be sent to the standby. This should only be called when in recovery,
2833  * ie. we're streaming to a cascaded standby.
2834  *
2835  * As a side-effect, ThisTimeLineID is updated to the TLI of the last
2836  * replayed WAL record.
2837  */
2838 static XLogRecPtr
2840 {
2841  XLogRecPtr replayPtr;
2842  TimeLineID replayTLI;
2843  XLogRecPtr receivePtr;
2846 
2847  /*
2848  * We can safely send what's already been replayed. Also, if walreceiver
2849  * is streaming WAL from the same timeline, we can send anything that it
2850  * has streamed, but hasn't been replayed yet.
2851  */
2852 
2853  receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI);
2854  replayPtr = GetXLogReplayRecPtr(&replayTLI);
2855 
2856  ThisTimeLineID = replayTLI;
2857 
2858  result = replayPtr;
2859  if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
2860  result = receivePtr;
2861 
2862  return result;
2863 }
2864 
2865 /*
2866  * Request walsenders to reload the currently-open WAL file
2867  */
2868 void
2870 {
2871  int i;
2872 
2873  for (i = 0; i < max_wal_senders; i++)
2874  {
2875  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2876 
2877  if (walsnd->pid == 0)
2878  continue;
2879 
2880  SpinLockAcquire(&walsnd->mutex);
2881  walsnd->needreload = true;
2882  SpinLockRelease(&walsnd->mutex);
2883  }
2884 }
2885 
2886 /* SIGHUP: set flag to re-read config file at next convenient time */
2887 static void
2889 {
2890  int save_errno = errno;
2891 
2892  got_SIGHUP = true;
2893 
2894  SetLatch(MyLatch);
2895 
2896  errno = save_errno;
2897 }
2898 
2899 /* SIGUSR1: set flag to send WAL records */
2900 static void
2902 {
2903  int save_errno = errno;
2904 
2906 
2907  errno = save_errno;
2908 }
2909 
2910 /* SIGUSR2: set flag to switch to stopping state */
2911 static void
2913 {
2914  int save_errno = errno;
2915 
2916  got_SIGUSR2 = true;
2917  SetLatch(MyLatch);
2918 
2919  errno = save_errno;
2920 }
2921 
2922 /*
2923  * SIGINT: set flag to do a last cycle and shut down afterwards. The WAL
2924  * sender should already have been switched to WALSNDSTATE_STOPPING at
2925  * this point.
2926  */
2927 static void
2929 {
2930  int save_errno = errno;
2931 
2932  /*
2933  * If replication has not yet started, die like with SIGTERM. If
2934  * replication is active, only set a flag and wake up the main loop. It
2935  * will send any outstanding WAL, wait for it to be replicated to the
2936  * standby, and then exit gracefully.
2937  */
2938  if (!replication_active)
2939  kill(MyProcPid, SIGTERM);
2940 
2941  got_SIGINT = true;
2942  SetLatch(MyLatch);
2943 
2944  errno = save_errno;
2945 }
2946 
2947 /* Set up signal handlers */
2948 void
2950 {
2951  /* Set up signal handlers */
2952  pqsignal(SIGHUP, WalSndSigHupHandler); /* set flag to read config
2953  * file */
2954  pqsignal(SIGINT, WalSndLastCycleHandler); /* request a last cycle and
2955  * shutdown */
2956  pqsignal(SIGTERM, die); /* request shutdown */
2957  pqsignal(SIGQUIT, quickdie); /* hard crash time */
2958  InitializeTimeouts(); /* establishes SIGALRM handler */
2960  pqsignal(SIGUSR1, WalSndXLogSendHandler); /* request WAL sending */
2961  pqsignal(SIGUSR2, WalSndSwitchStopping); /* switch to stopping state */
2962 
2963  /* Reset some signals that are accepted by postmaster but not here */
2969 }
2970 
2971 /* Report shared-memory space needed by WalSndShmemInit */
2972 Size
2974 {
2975  Size size = 0;
2976 
2977  size = offsetof(WalSndCtlData, walsnds);
2978  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
2979 
2980  return size;
2981 }
2982 
2983 /* Allocate and initialize walsender-related shared memory */
2984 void
2986 {
2987  bool found;
2988  int i;
2989 
2990  WalSndCtl = (WalSndCtlData *)
2991  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
2992 
2993  if (!found)
2994  {
2995  /* First time through, so initialize */
2996  MemSet(WalSndCtl, 0, WalSndShmemSize());
2997 
2998  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
2999  SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
3000 
3001  for (i = 0; i < max_wal_senders; i++)
3002  {
3003  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3004 
3005  SpinLockInit(&walsnd->mutex);
3006  }
3007  }
3008 }
3009 
3010 /*
3011  * Wake up all walsenders
3012  *
3013  * This will be called inside critical sections, so throwing an error is not
3014  * advisable.
3015  */
3016 void
3018 {
3019  int i;
3020 
3021  for (i = 0; i < max_wal_senders; i++)
3022  {
3023  Latch *latch;
3024  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3025 
3026  /*
3027  * Get latch pointer with spinlock held, for the unlikely case that
3028  * pointer reads aren't atomic (as they're 8 bytes).
3029  */
3030  SpinLockAcquire(&walsnd->mutex);
3031  latch = walsnd->latch;
3032  SpinLockRelease(&walsnd->mutex);
3033 
3034  if (latch != NULL)
3035  SetLatch(latch);
3036  }
3037 }
3038 
3039 /*
3040  * Wait that all the WAL senders have reached the stopping state. This is
3041  * used by the checkpointer to control when shutdown checkpoints can
3042  * safely begin.
3043  */
3044 void
3046 {
3047  for (;;)
3048  {
3049  int i;
3050  bool all_stopped = true;
3051 
3052  for (i = 0; i < max_wal_senders; i++)
3053  {
3055  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3056 
3057  SpinLockAcquire(&walsnd->mutex);
3058 
3059  if (walsnd->pid == 0)
3060  {
3061  SpinLockRelease(&walsnd->mutex);
3062  continue;
3063  }
3064 
3065  state = walsnd->state;
3066  SpinLockRelease(&walsnd->mutex);
3067 
3068  if (state != WALSNDSTATE_STOPPING)
3069  {
3070  all_stopped = false;
3071  break;
3072  }
3073  }
3074 
3075  /* safe to leave if confirmation is done for all WAL senders */
3076  if (all_stopped)
3077  return;
3078 
3079  pg_usleep(10000L); /* wait for 10 msec */
3080  }
3081 }
3082 
3083 /* Set state for current walsender (only called in walsender) */
3084 void
3086 {
3087  WalSnd *walsnd = MyWalSnd;
3088 
3090 
3091  if (walsnd->state == state)
3092  return;
3093 
3094  SpinLockAcquire(&walsnd->mutex);
3095  walsnd->state = state;
3096  SpinLockRelease(&walsnd->mutex);
3097 }
3098 
3099 /*
3100  * Return a string constant representing the state. This is used
3101  * in system views, and should *not* be translated.
3102  */
3103 static const char *
3105 {
3106  switch (state)
3107  {
3108  case WALSNDSTATE_STARTUP:
3109  return "startup";
3110  case WALSNDSTATE_BACKUP:
3111  return "backup";
3112  case WALSNDSTATE_CATCHUP:
3113  return "catchup";
3114  case WALSNDSTATE_STREAMING:
3115  return "streaming";
3116  case WALSNDSTATE_STOPPING:
3117  return "stopping";
3118  }
3119  return "UNKNOWN";
3120 }
3121 
3122 static Interval *
3124 {
3125  Interval *result = palloc(sizeof(Interval));
3126 
3127  result->month = 0;
3128  result->day = 0;
3129  result->time = offset;
3130 
3131  return result;
3132 }
3133 
3134 /*
3135  * Returns activity of walsenders, including pids and xlog locations sent to
3136  * standby servers.
3137  */
3138 Datum
3140 {
3141 #define PG_STAT_GET_WAL_SENDERS_COLS 11
3142  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3143  TupleDesc tupdesc;
3144  Tuplestorestate *tupstore;
3145  MemoryContext per_query_ctx;
3146  MemoryContext oldcontext;
3147  List *sync_standbys;
3148  int i;
3149 
3150  /* check to see if caller supports us returning a tuplestore */
3151  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
3152  ereport(ERROR,
3153  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3154  errmsg("set-valued function called in context that cannot accept a set")));
3155  if (!(rsinfo->allowedModes & SFRM_Materialize))
3156  ereport(ERROR,
3157  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3158  errmsg("materialize mode required, but it is not " \
3159  "allowed in this context")));
3160 
3161  /* Build a tuple descriptor for our result type */
3162  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
3163  elog(ERROR, "return type must be a row type");
3164 
3165  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
3166  oldcontext = MemoryContextSwitchTo(per_query_ctx);
3167 
3168  tupstore = tuplestore_begin_heap(true, false, work_mem);
3169  rsinfo->returnMode = SFRM_Materialize;
3170  rsinfo->setResult = tupstore;
3171  rsinfo->setDesc = tupdesc;
3172 
3173  MemoryContextSwitchTo(oldcontext);
3174 
3175  /*
3176  * Get the currently active synchronous standbys.
3177  */
3178  LWLockAcquire(SyncRepLock, LW_SHARED);
3179  sync_standbys = SyncRepGetSyncStandbys(NULL);
3180  LWLockRelease(SyncRepLock);
3181 
3182  for (i = 0; i < max_wal_senders; i++)
3183  {
3184  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3186  XLogRecPtr write;
3187  XLogRecPtr flush;
3188  XLogRecPtr apply;
3189  TimeOffset writeLag;
3190  TimeOffset flushLag;
3191  TimeOffset applyLag;
3192  int priority;
3195  bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
3196 
3197  if (walsnd->pid == 0)
3198  continue;
3199 
3200  SpinLockAcquire(&walsnd->mutex);
3201  sentPtr = walsnd->sentPtr;
3202  state = walsnd->state;
3203  write = walsnd->write;
3204  flush = walsnd->flush;
3205  apply = walsnd->apply;
3206  writeLag = walsnd->writeLag;
3207  flushLag = walsnd->flushLag;
3208  applyLag = walsnd->applyLag;
3209  priority = walsnd->sync_standby_priority;
3210  SpinLockRelease(&walsnd->mutex);
3211 
3212  memset(nulls, 0, sizeof(nulls));
3213  values[0] = Int32GetDatum(walsnd->pid);
3214 
3215  if (!superuser())
3216  {
3217  /*
3218  * Only superusers can see details. Other users only get the pid
3219  * value to know it's a walsender, but no details.
3220  */
3221  MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3222  }
3223  else
3224  {
3225  values[1] = CStringGetTextDatum(WalSndGetStateString(state));
3226 
3227  if (XLogRecPtrIsInvalid(sentPtr))
3228  nulls[2] = true;
3229  values[2] = LSNGetDatum(sentPtr);
3230 
3231  if (XLogRecPtrIsInvalid(write))
3232  nulls[3] = true;
3233  values[3] = LSNGetDatum(write);
3234 
3235  if (XLogRecPtrIsInvalid(flush))
3236  nulls[4] = true;
3237  values[4] = LSNGetDatum(flush);
3238 
3239  if (XLogRecPtrIsInvalid(apply))
3240  nulls[5] = true;
3241  values[5] = LSNGetDatum(apply);
3242 
3243  /*
3244  * Treat a standby such as a pg_basebackup background process
3245  * which always returns an invalid flush location, as an
3246  * asynchronous standby.
3247  */
3248  priority = XLogRecPtrIsInvalid(walsnd->flush) ? 0 : priority;
3249 
3250  if (writeLag < 0)
3251  nulls[6] = true;
3252  else
3253  values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
3254 
3255  if (flushLag < 0)
3256  nulls[7] = true;
3257  else
3258  values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
3259 
3260  if (applyLag < 0)
3261  nulls[8] = true;
3262  else
3263  values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
3264 
3265  values[9] = Int32GetDatum(priority);
3266 
3267  /*
3268  * More easily understood version of standby state. This is purely
3269  * informational.
3270  *
3271  * In quorum-based sync replication, the role of each standby
3272  * listed in synchronous_standby_names can be changing very
3273  * frequently. Any standbys considered as "sync" at one moment can
3274  * be switched to "potential" ones at the next moment. So, it's
3275  * basically useless to report "sync" or "potential" as their sync
3276  * states. We report just "quorum" for them.
3277  */
3278  if (priority == 0)
3279  values[10] = CStringGetTextDatum("async");
3280  else if (list_member_int(sync_standbys, i))
3282  CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3283  else
3284  values[10] = CStringGetTextDatum("potential");
3285  }
3286 
3287  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
3288  }
3289 
3290  /* clean up and return the tuplestore */
3291  tuplestore_donestoring(tupstore);
3292 
3293  return (Datum) 0;
3294 }
3295 
3296 /*
3297  * This function is used to send a keepalive message to standby.
3298  * If requestReply is set, sets a flag in the message requesting the standby
3299  * to send a message back to us, for heartbeat purposes.
3300  */
3301 static void
3302 WalSndKeepalive(bool requestReply)
3303 {
3304  elog(DEBUG2, "sending replication keepalive");
3305 
3306  /* construct the message... */
3307  resetStringInfo(&output_message);
3308  pq_sendbyte(&output_message, 'k');
3309  pq_sendint64(&output_message, sentPtr);
3310  pq_sendint64(&output_message, GetCurrentTimestamp());
3311  pq_sendbyte(&output_message, requestReply ? 1 : 0);
3312 
3313  /* ... and send it wrapped in CopyData */
3314  pq_putmessage_noblock('d', output_message.data, output_message.len);
3315 }
3316 
3317 /*
3318  * Send keepalive message if too much time has elapsed.
3319  */
3320 static void
3322 {
3323  TimestampTz ping_time;
3324 
3325  /*
3326  * Don't send keepalive messages if timeouts are globally disabled or
3327  * we're doing something not partaking in timeouts.
3328  */
3329  if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3330  return;
3331 
3333  return;
3334 
3335  /*
3336  * If half of wal_sender_timeout has lapsed without receiving any reply
3337  * from the standby, send a keep-alive message to the standby requesting
3338  * an immediate reply.
3339  */
3341  wal_sender_timeout / 2);
3342  if (now >= ping_time)
3343  {
3344  WalSndKeepalive(true);
3346 
3347  /* Try to flush pending output to the client */
3348  if (pq_flush_if_writable() != 0)
3349  WalSndShutdown();
3350  }
3351 }
3352 
3353 /*
3354  * Record the end of the WAL and the time it was flushed locally, so that
3355  * LagTrackerRead can compute the elapsed time (lag) when this WAL location is
3356  * eventually reported to have been written, flushed and applied by the
3357  * standby in a reply message.
3358  */
3359 static void
3361 {
3362  bool buffer_full;
3363  int new_write_head;
3364  int i;
3365 
3366  if (!am_walsender)
3367  return;
3368 
3369  /*
3370  * If the lsn hasn't advanced since last time, then do nothing. This way
3371  * we only record a new sample when new WAL has been written.
3372  */
3373  if (LagTracker.last_lsn == lsn)
3374  return;
3375  LagTracker.last_lsn = lsn;
3376 
3377  /*
3378  * If advancing the write head of the circular buffer would crash into any
3379  * of the read heads, then the buffer is full. In other words, the
3380  * slowest reader (presumably apply) is the one that controls the release
3381  * of space.
3382  */
3383  new_write_head = (LagTracker.write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
3384  buffer_full = false;
3385  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
3386  {
3387  if (new_write_head == LagTracker.read_heads[i])
3388  buffer_full = true;
3389  }
3390 
3391  /*
3392  * If the buffer is full, for now we just rewind by one slot and overwrite
3393  * the last sample, as a simple (if somewhat uneven) way to lower the
3394  * sampling rate. There may be better adaptive compaction algorithms.
3395  */
3396  if (buffer_full)
3397  {
3398  new_write_head = LagTracker.write_head;
3399  if (LagTracker.write_head > 0)
3400  LagTracker.write_head--;
3401  else
3402  LagTracker.write_head = LAG_TRACKER_BUFFER_SIZE - 1;
3403  }
3404 
3405  /* Store a sample at the current write head position. */
3406  LagTracker.buffer[LagTracker.write_head].lsn = lsn;
3407  LagTracker.buffer[LagTracker.write_head].time = local_flush_time;
3408  LagTracker.write_head = new_write_head;
3409 }
3410 
3411 /*
3412  * Find out how much time has elapsed between the moment WAL location 'lsn'
3413  * (or the highest known earlier LSN) was flushed locally and the time 'now'.
3414  * We have a separate read head for each of the reported LSN locations we
3415  * receive in replies from standby; 'head' controls which read head is
3416  * used. Whenever a read head crosses an LSN which was written into the
3417  * lag buffer with LagTrackerWrite, we can use the associated timestamp to
3418  * find out the time this LSN (or an earlier one) was flushed locally, and
3419  * therefore compute the lag.
3420  *
3421  * Return -1 if no new sample data is available, and otherwise the elapsed
3422  * time in microseconds.
3423  */
3424 static TimeOffset
3426 {
3427  TimestampTz time = 0;
3428 
3429  /* Read all unread samples up to this LSN or end of buffer. */
3430  while (LagTracker.read_heads[head] != LagTracker.write_head &&
3431  LagTracker.buffer[LagTracker.read_heads[head]].lsn <= lsn)
3432  {
3433  time = LagTracker.buffer[LagTracker.read_heads[head]].time;
3434  LagTracker.last_read[head] =
3435  LagTracker.buffer[LagTracker.read_heads[head]];
3436  LagTracker.read_heads[head] =
3437  (LagTracker.read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE;
3438  }
3439 
3440  if (time > now)
3441  {
3442  /* If the clock somehow went backwards, treat as not found. */
3443  return -1;
3444  }
3445  else if (time == 0)
3446  {
3447  /*
3448  * We didn't cross a time. If there is a future sample that we
3449  * haven't reached yet, and we've already reached at least one sample,
3450  * let's interpolate the local flushed time. This is mainly useful
3451  * for reporting a completely stuck apply position as having
3452  * increasing lag, since otherwise we'd have to wait for it to
3453  * eventually start moving again and cross one of our samples before
3454  * we can show the lag increasing.
3455  */
3456  if (LagTracker.read_heads[head] != LagTracker.write_head &&
3457  LagTracker.last_read[head].time != 0)
3458  {
3459  double fraction;
3460  WalTimeSample prev = LagTracker.last_read[head];
3461  WalTimeSample next = LagTracker.buffer[LagTracker.read_heads[head]];
3462 
3463  if (lsn < prev.lsn)
3464  {
3465  /*
3466  * Reported LSNs shouldn't normally go backwards, but it's
3467  * possible when there is a timeline change. Treat as not
3468  * found.
3469  */
3470  return -1;
3471  }
3472 
3473  Assert(prev.lsn < next.lsn);
3474 
3475  if (prev.time > next.time)
3476  {
3477  /* If the clock somehow went backwards, treat as not found. */
3478  return -1;
3479  }
3480 
3481  /* See how far we are between the previous and next samples. */
3482  fraction =
3483  (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
3484 
3485  /* Scale the local flush time proportionally. */
3486  time = (TimestampTz)
3487  ((double) prev.time + (next.time - prev.time) * fraction);
3488  }
3489  else
3490  {
3491  /* Couldn't interpolate due to lack of data. */
3492  return -1;
3493  }
3494  }
3495 
3496  /* Return the elapsed time since local flush time in microseconds. */
3497  Assert(time != 0);
3498  return now - time;
3499 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
static void WalSndXLogSendHandler(SIGNAL_ARGS)
Definition: walsender.c:2901
static void InitWalSenderSlot(void)
Definition: walsender.c:2215
void InitializeTimeouts(void)
Definition: timeout.c:340
#define pq_is_send_pending()
Definition: libpq.h:41
#define NIL
Definition: pg_list.h:69
#define XLogSegSize
Definition: xlog_internal.h:92
XLogRecPtr write
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
static void ProcessStandbyMessage(void)
Definition: walsender.c:1672
#define SIGUSR1
Definition: win32.h:202
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define IsA(nodeptr, _type_)
Definition: nodes.h:560
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:200
XLogRecPtr startpoint
Definition: replnodes.h:84
#define DEBUG1
Definition: elog.h:25
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:211
int MyProcPid
Definition: globals.c:38
int wal_sender_timeout
Definition: walsender.c:118
#define SIGCONT
Definition: win32.h:197
uint32 TimeLineID
Definition: xlogdefs.h:45
#define pq_flush()
Definition: libpq.h:39
static int32 next
Definition: blutils.c:210
XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
struct Port * MyProcPort
Definition: globals.c:40
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
uint32 TransactionId
Definition: c.h:397
bool wake_wal_senders
Definition: walsender.c:125
#define SYNC_REP_PRIORITY
Definition: syncrep.h:36
bool exec_replication_command(const char *cmd_string)
Definition: walsender.c:1413
bool update_process_title
Definition: ps_status.c:35
#define write(a, b, c)
Definition: win32.h:14
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
TransactionId xmin
Definition: proc.h:213
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:2928
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:2994
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:429
#define TEXTOID
Definition: pg_type.h:324
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action)
Definition: walsender.c:789
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1036
PGPROC * MyProc
Definition: proc.c:67
int64 TimestampTz
Definition: timestamp.h:39
#define SIGWINCH
Definition: win32.h:201
ResourceOwner CurrentResourceOwner
Definition: resowner.c:138
Size WalSndShmemSize(void)
Definition: walsender.c:2973
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:239
void CommitTransactionCommand(void)
Definition: xact.c:2749
#define XACT_REPEATABLE_READ
Definition: xact.h:30
#define SpinLockInit(lock)
Definition: spin.h:60
void RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
Definition: snapmgr.c:2142
uint8 syncrep_method
Definition: syncrep.h:51
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:75
static int sendFile
Definition: walsender.c:131
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:370
static void XLogSendPhysical(void)
Definition: walsender.c:2479
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:1250
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
TimeOffset flushLag
#define SIGTTIN
Definition: win32.h:199
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency)
Definition: slot.c:220
void set_ps_display(const char *activity, bool force)
Definition: ps_status.c:326
#define INT4OID
Definition: pg_type.h:316
Definition: nodes.h:509
static StringInfoData output_message
Definition: walsender.c:156
WalSndCtlData * WalSndCtl
Definition: walsender.c:105
struct cursor * cur
Definition: ecpg.c:28
void proc_exit(int code)
Definition: ipc.c:99
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
bool superuser(void)
Definition: superuser.c:47
#define MemSet(start, val, len)
Definition: c.h:857
static void WalSndKill(int code, Datum arg)
Definition: walsender.c:2276
return result
Definition: formatting.c:1632
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
static void WalSndDone(WalSndSendDataCallback send_data)
Definition: walsender.c:2799
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
#define WL_SOCKET_READABLE
Definition: latch.h:125
void ReplicationSlotSave(void)
Definition: slot.c:579
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8222
List * SyncRepGetSyncStandbys(bool *am_sync)
Definition: syncrep.c:677
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:186
LogicalDecodingContext * CreateInitDecodingContext(char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:216
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:214
TimestampTz time
Definition: walsender.c:201
int sleeptime
Definition: pg_standby.c:40
ReplicationSlotPersistentData data
Definition: slot.h:115
TimeOffset writeLag
void ResetLatch(volatile Latch *latch)
Definition: latch.c:498
#define LOG
Definition: elog.h:26
bool RecoveryInProgress(void)
Definition: xlog.c:7873
SyncRepConfigData * SyncRepConfig
Definition: syncrep.c:97
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1649
XLogRecPtr last_lsn
Definition: walsender.c:210
void list_free_deep(List *list)
Definition: list.c:1147
static int fd(const char *x, int i)
Definition: preproc-init.c:105
pgsocket sock
Definition: libpq-be.h:118
#define PG_BINARY
Definition: c.h:1038
#define SIGQUIT
Definition: win32.h:189
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc.c:7894
slock_t mutex
Node * replication_parse_result
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:88
Latch procLatch
Definition: proc.h:103
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
Definition: xlogutils.c:799
XLogRecord * XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
Definition: xlogreader.c:193
XLogRecPtr confirmed_flush
Definition: slot.h:76
PGXACT * MyPgXact
Definition: proc.c:68
XLogRecPtr EndRecPtr
Definition: xlogreader.h:115
int32 day
Definition: timestamp.h:47
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition: walsender.c:1876
TimeLineID timeline
Definition: replnodes.h:96
ReplicationKind kind
Definition: replnodes.h:56
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1715
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
Definition: walsender.c:754
bool IsTransactionBlock(void)
Definition: xact.c:4306
static void WalSndSigHupHandler(SIGNAL_ARGS)
Definition: walsender.c:2888
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:635
bool am_walsender
Definition: walsender.c:111
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pg_usleep(long microsec)
Definition: signal.c:53
void ReplicationSlotReserveWal(void)
Definition: slot.c:926
XLogRecPtr flush
bool defGetBoolean(DefElem *def)
Definition: define.c:111
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:432
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:682
void GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch)
Definition: xlog.c:8291
ReplicationKind kind
Definition: replnodes.h:81
void WalSndRqstFileReload(void)
Definition: walsender.c:2869
#define SIG_IGN
Definition: win32.h:185
void ConditionVariableCancelSleep(void)
static volatile sig_atomic_t got_SIGHUP
Definition: walsender.c:182
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:319
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3302
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:1308
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:157
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:568
#define ERROR
Definition: elog.h:43
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:93
void pq_startmsgread(void)
Definition: pqcomm.c:1191
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: walsender.c:1703
#define IntervalPGetDatum(X)
Definition: timestamp.h:33
#define SYNC_REP_WAIT_APPLY
Definition: syncrep.h:26
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:3766
bool FirstSnapshotSet
Definition: snapmgr.c:203
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
void WalSndSignals(void)
Definition: walsender.c:2949
#define FATAL
Definition: elog.h:52
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:686
Latch * latch
#define MAXPGPATH
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11084
void ReplicationSlotPersist(void)
Definition: slot.c:614
TransactionId effective_xmin
Definition: slot.h:111
int pq_getbyte_if_available(unsigned char *c)
Definition: pqcomm.c:1021
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:194
static TimeLineID curFileTimeLine
Definition: walsender.c:136
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:165
#define XLogFilePath(path, tli, logSegNo)
char * get_database_name(Oid dbid)
Definition: dbcommands.c:2056
#define DEBUG2
Definition: elog.h:24
bool list_member_int(const List *list, int datum)
Definition: list.c:485
Definition: latch.h:110
Definition: dest.h:88
#define TLHistoryFileName(fname, tli)
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:348
WalSndState state
NodeTag type
Definition: nodes.h:511
char * XLogFileNameP(TimeLineID tli, XLogSegNo segno)
Definition: xlog.c:10126
static char * buf
Definition: pg_test_fsync.c:66
void WalSndWaitStopping(void)
Definition: walsender.c:3045
static bool streamingDoneSending
Definition: walsender.c:175
uint64 XLogSegNo
Definition: xlogdefs.h:34
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:109
bool PostmasterIsAlive(void)
Definition: pmsignal.c:272
int OpenTransientFile(FileName fileName, int fileFlags, int fileMode)
Definition: fd.c:2144
int errdetail(const char *fmt,...)
Definition: elog.c:873
#define COMMERROR
Definition: elog.h:30
int errcode_for_file_access(void)
Definition: elog.c:598
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:525
TransactionId catalog_xmin
Definition: slot.h:65
#define pq_flush_if_writable()
Definition: libpq.h:40
TimeOffset time
Definition: timestamp.h:45
XLogRecPtr currTLIValidUntil
Definition: xlogreader.h:175
#define InvalidTransactionId
Definition: transam.h:31
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:245
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1162
static XLogRecPtr logical_startptr
Definition: walsender.c:195
unsigned int uint32
Definition: c.h:268
void ReplicationSlotRelease(void)
Definition: slot.c:375
static void XLogRead(char *buf, XLogRecPtr startptr, Size count)
Definition: walsender.c:2304
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1232
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
TransactionId xmin
Definition: slot.h:57
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
void InitWalSender(void)
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:184
#define SlotIsLogical(slot)
Definition: slot.h:134
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1791
void SyncRepInitConfig(void)
Definition: syncrep.c:381
#define ereport(elevel, rest)
Definition: elog.h:122
static struct @27 LagTracker
static bool WalSndCaughtUp
Definition: walsender.c:179
#define SYNC_REP_WAIT_FLUSH
Definition: syncrep.h:25
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition: walsender.c:3360
XLogRecPtr lsn
Definition: walsender.c:200
int replication_yyparse(void)
Definition: guc.h:72
int32 month
Definition: timestamp.h:48
int max_wal_senders
Definition: walsender.c:117
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc)
Definition: execTuples.c:1232
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2072
int CloseTransientFile(int fd)
Definition: fd.c:2305
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1135
void initStringInfo(StringInfo str)
Definition: stringinfo.c:46
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1253
static StringInfoData reply_message
Definition: walsender.c:157
#define MAXFNAMELEN
TimeLineID nextTLI
Definition: xlogreader.h:181
#define SpinLockRelease(lock)
Definition: spin.h:64
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
int64 TimeOffset
Definition: timestamp.h:40
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition: walsender.c:1274
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition: walsender.c:3425
void WalSndErrorCleanup(void)
Definition: walsender.c:291
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:322
void(* WalSndSendDataCallback)(void)
Definition: walsender.c:224
static volatile sig_atomic_t replication_active
Definition: walsender.c:192
uintptr_t Datum
Definition: postgres.h:372
static const char * WalSndGetStateString(WalSndState state)
Definition: walsender.c:3104
XLogRecPtr sentPtr
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
bool am_db_walsender
Definition: walsender.c:114
TransactionId effective_catalog_xmin
Definition: slot.h:112
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1047
Oid MyDatabaseId
Definition: globals.c:76
static void WalSndShutdown(void)
Definition: walsender.c:228
static TimeLineID receiveTLI
Definition: xlog.c:201
int work_mem
Definition: globals.c:112
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:563
WalSnd * MyWalSnd
Definition: walsender.c:108
static XLogRecPtr sentPtr
Definition: walsender.c:153
#define SIGPIPE
Definition: win32.h:193
#define SIGHUP
Definition: win32.h:188
void ReplicationSlotDrop(const char *name)
Definition: slot.c:456
void pq_endmsgread(void)
Definition: pqcomm.c:1215
#define InvalidOid
Definition: postgres_ext.h:36
TimeLineID ThisTimeLineID
Definition: xlog.c:179
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:213
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
#define SIG_DFL
Definition: win32.h:183
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:343
int allowedModes
Definition: execnodes.h:268
TimeLineID currTLI
Definition: xlogreader.h:165
#define INT8OID
Definition: pg_type.h:304
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:168
static void WalSndKeepaliveIfNecessary(TimestampTz now)
Definition: walsender.c:3321
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:432
TimeOffset applyLag
struct SnapBuild * snapshot_builder
Definition: logical.h:46
int errmsg_internal(const char *fmt,...)
Definition: elog.c:827
SetFunctionReturnMode returnMode
Definition: execnodes.h:270
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define XLByteToSeg(xlrp, logSegNo)
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2003
void SetLatch(volatile Latch *latch)
Definition: latch.c:415
#define SIGNAL_ARGS
Definition: c.h:1079
#define NULL
Definition: c.h:229
static TimeLineID sendTimeLine
Definition: walsender.c:144
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:675
static void WalSndCheckTimeOut(TimestampTz now)
Definition: walsender.c:2045
#define lfirst(lc)
Definition: pg_list.h:106
void WalSndSetState(WalSndState state)
Definition: walsender.c:3085
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:356
int sync_standby_priority
Definition: regguts.h:298
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:475
static void XLogSendLogical(void)
Definition: walsender.c:2738
XLogRecPtr restart_lsn
Definition: slot.h:68
void ReplicationSlotAcquire(const char *name)
Definition: slot.c:326
void StartTransactionCommand(void)
Definition: xact.c:2679
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:211
bool needreload
static void ProcessStandbyReplyMessage(void)
Definition: walsender.c:1735
size_t Size
Definition: c.h:356
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1208
char * dbname
Definition: streamutil.c:38
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:205
int XactIsoLevel
Definition: xact.c:74
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1111
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:145
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:147
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:914
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:202
TupleDesc CreateTemplateTupleDesc(int natts, bool hasoid)
Definition: tupdesc.c:41
WalSndState
static XLogSegNo sendSegNo
Definition: walsender.c:132
Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
Definition: walsender.c:3139
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:842
static bool streamingDoneReceiving
Definition: walsender.c:176
#define BYTEAOID
Definition: pg_type.h:292
Tuplestorestate * setResult
Definition: execnodes.h:273
#define pg_attribute_noreturn()
Definition: c.h:653
static StringInfoData tmpbuf
Definition: walsender.c:158
#define SIGTTOU
Definition: win32.h:200
bool IsSubTransaction(void)
Definition: xact.c:4378
static void WalSndSwitchStopping(SIGNAL_ARGS)
Definition: walsender.c:2912
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition: snapbuild.c:545
static Datum values[MAXATTR]
Definition: bootstrap.c:163
#define PG_STAT_GET_WAL_SENDERS_COLS
ExprContext * econtext
Definition: execnodes.h:266
bool log_replication_commands
Definition: walsender.c:120
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4690
#define Int32GetDatum(X)
Definition: postgres.h:485
char * application_name
Definition: guc.c:469
TupleDesc setDesc
Definition: execnodes.h:274
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:115
void * palloc(Size size)
Definition: mcxt.c:849
int errmsg(const char *fmt,...)
Definition: elog.c:797
XLogReaderState * reader
Definition: logical.h:44
void SHMQueueInit(SHM_QUEUE *queue)
Definition: shmqueue.c:36
void pq_sendint(StringInfo buf, int i, int b)
Definition: pqformat.c:236
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:344
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
void die(SIGNAL_ARGS)
Definition: postgres.c:2623
CRSSnapshotAction
Definition: walsender.h:22
StringInfo out
Definition: logical.h:66
static const unsigned __int64 epoch
Definition: gettimeofday.c:34
int i
void WalSndShmemInit(void)
Definition: walsender.c:2985
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:486
#define NameStr(name)
Definition: c.h:499
TimeLineID timeline
Definition: replnodes.h:83
static volatile sig_atomic_t got_SIGINT
Definition: walsender.c:183
#define CStringGetTextDatum(s)
Definition: builtins.h:91
void * arg
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition: walsender.c:1827
struct Latch * MyLatch
Definition: globals.c:51
void ReplicationSlotCleanup(void)
Definition: slot.c:429
#define PG_FUNCTION_ARGS
Definition: fmgr.h:158
char * defname
Definition: parsenodes.h:720
static uint32 sendOff
Definition: walsender.c:133
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:448
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2839
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:113
#define elog
Definition: elog.h:219
void LWLockReleaseAll(void)
Definition: lwlock.c:1814
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28
static Interval * offset_to_interval(TimeOffset offset)
Definition: walsender.c:3123
slock_t mutex
Definition: slot.h:88
#define close(a)
Definition: win32.h:12
void latch_sigusr1_handler(void)
Definition: latch.c:1467
CommandDest whereToSendOutput
Definition: postgres.c:86
#define SIGCHLD
Definition: win32.h:198
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: timestamp.c:1624
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
#define XLByteInSeg(xlrp, logSegNo)
static bool waiting_for_ping_response
Definition: walsender.c:167
void pq_puttextmessage(char msgtype, const char *str)
Definition: pqformat.c:400
XLogRecPtr apply
static void IdentifySystem(void)
Definition: walsender.c:340
Definition: pg_list.h:45
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:76
void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.c:271
#define WL_LATCH_SET
Definition: latch.h:124
int16 AttrNumber
Definition: attnum.h:21
void quickdie(SIGNAL_ARGS)
Definition: postgres.c:2562
#define UINT64_FORMAT
Definition: c.h:316
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:636
void SyncRepReleaseWaiters(void)
Definition: syncrep.c:409
static TimestampTz last_reply_timestamp
Definition: walsender.c:164
int write_head
Definition: walsender.c:212
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1570
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
#define MAX_SEND_SIZE
Definition: walsender.c:102
#define read(a, b, c)
Definition: win32.h:13
#define SIGUSR2
Definition: win32.h:203
static bool sendTimeLineIsHistoric
Definition: walsender.c:146
#define offsetof(type, field)
Definition: c.h:555
void WalSndWakeup(void)
Definition: walsender.c:3017
void ReplicationSlotMarkDirty(void)
Definition: slot.c:597
bool am_cascading_walsender
Definition: walsender.c:112
static void ProcessStandbyHSFeedbackMessage(void)
Definition: walsender.c:1904
#define SYNC_REP_WAIT_WRITE
Definition: syncrep.h:24
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:696
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
Definition: walsender.c:1252
void PreventTransactionChain(bool isTopLevel, const char *stmtType)
Definition: xact.c:3156
int BasicOpenFile(FileName fileName, int fileFlags, int fileMode)
Definition: fd.c:936
void replication_scanner_init(const char *query_string)
ResourceOwner ResourceOwnerCreate(ResourceOwner parent, const char *name)
Definition: resowner.c:416
#define TLHistoryFilePath(path, tli)