PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
walreceiver.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * walreceiver.c
4  *
5  * The WAL receiver process (walreceiver) is new as of Postgres 9.0. It
6  * is the process in the standby server that takes charge of receiving
7  * XLOG records from a primary server during streaming replication.
8  *
9  * When the startup process determines that it's time to start streaming,
10  * it instructs postmaster to start walreceiver. Walreceiver first connects
11  * to the primary server (it will be served by a walsender process
12  * in the primary server), and then keeps receiving XLOG records and
13  * writing them to the disk as long as the connection is alive. As XLOG
14  * records are received and flushed to disk, it updates the
15  * WalRcv->receivedUpto variable in shared memory, to inform the startup
16  * process of how far it can proceed with XLOG replay.
17  *
18  * If the primary server ends streaming, but doesn't disconnect, walreceiver
19  * goes into "waiting" mode, and waits for the startup process to give new
20  * instructions. The startup process will treat that the same as
21  * disconnection, and will rescan the archive/pg_wal directory. But when the
22  * startup process wants to try streaming replication again, it will just
23  * nudge the existing walreceiver process that's waiting, instead of launching
24  * a new one.
25  *
26  * Normal termination is by SIGTERM, which instructs the walreceiver to
27  * exit(0). Emergency termination is by SIGQUIT; like any postmaster child
28  * process, the walreceiver will simply abort and exit on SIGQUIT. A close
29  * of the connection and a FATAL error are treated not as a crash but as
30  * normal operation.
31  *
32  * This file contains the server-facing parts of walreceiver. The libpq-
33  * specific parts are in the libpqwalreceiver module. It's loaded
34  * dynamically to avoid linking the server with libpq.
35  *
36  * Portions Copyright (c) 2010-2017, PostgreSQL Global Development Group
37  *
38  *
39  * IDENTIFICATION
40  * src/backend/replication/walreceiver.c
41  *
42  *-------------------------------------------------------------------------
43  */
44 #include "postgres.h"
45 
46 #include <signal.h>
47 #include <unistd.h>
48 
49 #include "access/htup_details.h"
50 #include "access/timeline.h"
51 #include "access/transam.h"
52 #include "access/xlog_internal.h"
53 #include "catalog/pg_type.h"
54 #include "funcapi.h"
55 #include "libpq/pqformat.h"
56 #include "libpq/pqsignal.h"
57 #include "miscadmin.h"
58 #include "pgstat.h"
60 #include "replication/walsender.h"
61 #include "storage/ipc.h"
62 #include "storage/pmsignal.h"
63 #include "storage/procarray.h"
64 #include "utils/builtins.h"
65 #include "utils/guc.h"
66 #include "utils/pg_lsn.h"
67 #include "utils/ps_status.h"
68 #include "utils/resowner.h"
69 #include "utils/timestamp.h"
70 
71 
72 /* GUC variables */
76 
77 /* libpqwalreceiver connection */
80 
81 #define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */
82 
83 /*
84  * These variables are used similarly to openLogFile/SegNo/Off,
85  * but for walreceiver to write the XLOG. recvFileTLI is the TimeLineID
86  * corresponding the filename of recvFile.
87  */
88 static int recvFile = -1;
90 static XLogSegNo recvSegNo = 0;
91 static uint32 recvOff = 0;
92 
93 /*
94  * Flags set by interrupt handlers of walreceiver for later service in the
95  * main loop.
96  */
97 static volatile sig_atomic_t got_SIGHUP = false;
98 static volatile sig_atomic_t got_SIGTERM = false;
99 
100 /*
101  * LogstreamResult indicates the byte positions that we have already
102  * written/fsynced.
103  */
104 static struct
105 {
106  XLogRecPtr Write; /* last byte + 1 written out in the standby */
107  XLogRecPtr Flush; /* last byte + 1 flushed in the standby */
109 
112 
113 /*
114  * About SIGTERM handling:
115  *
116  * We can't just exit(1) within SIGTERM signal handler, because the signal
117  * might arrive in the middle of some critical operation, like while we're
118  * holding a spinlock. We also can't just set a flag in signal handler and
119  * check it in the main loop, because we perform some blocking operations
120  * like libpqrcv_PQexec(), which can take a long time to finish.
121  *
122  * We use a combined approach: When WalRcvImmediateInterruptOK is true, it's
123  * safe for the signal handler to elog(FATAL) immediately. Otherwise it just
124  * sets got_SIGTERM flag, which is checked in the main loop when convenient.
125  *
126  * This is very much like what regular backends do with ImmediateInterruptOK,
127  * ProcessInterrupts() etc.
128  */
129 static volatile bool WalRcvImmediateInterruptOK = false;
130 
131 /* Prototypes for private functions */
132 static void ProcessWalRcvInterrupts(void);
133 static void EnableWalRcvImmediateExit(void);
134 static void DisableWalRcvImmediateExit(void);
135 static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
136 static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI);
137 static void WalRcvDie(int code, Datum arg);
138 static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
139 static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
140 static void XLogWalRcvFlush(bool dying);
141 static void XLogWalRcvSendReply(bool force, bool requestReply);
142 static void XLogWalRcvSendHSFeedback(bool immed);
143 static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
144 
145 /* Signal handlers */
146 static void WalRcvSigHupHandler(SIGNAL_ARGS);
147 static void WalRcvSigUsr1Handler(SIGNAL_ARGS);
150 
151 
152 static void
154 {
155  /*
156  * Although walreceiver interrupt handling doesn't use the same scheme as
157  * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
158  * any incoming signals on Win32.
159  */
161 
162  if (got_SIGTERM)
163  {
165  ereport(FATAL,
166  (errcode(ERRCODE_ADMIN_SHUTDOWN),
167  errmsg("terminating walreceiver process due to administrator command")));
168  }
169 }
170 
171 static void
173 {
176 }
177 
178 static void
180 {
183 }
184 
185 /* Main entry point for walreceiver process */
186 void
188 {
189  char conninfo[MAXCONNINFO];
190  char *tmp_conninfo;
191  char slotname[NAMEDATALEN];
192  XLogRecPtr startpoint;
193  TimeLineID startpointTLI;
194  TimeLineID primaryTLI;
195  bool first_stream;
196  WalRcvData *walrcv = WalRcv;
197  TimestampTz last_recv_timestamp;
198  bool ping_sent;
199  char *err;
200 
201  /*
202  * WalRcv should be set up already (if we are a backend, we inherit this
203  * by fork() or EXEC_BACKEND mechanism from the postmaster).
204  */
205  Assert(walrcv != NULL);
206 
207  /*
208  * Mark walreceiver as running in shared memory.
209  *
210  * Do this as early as possible, so that if we fail later on, we'll set
211  * state to STOPPED. If we die before this, the startup process will keep
212  * waiting for us to start up, until it times out.
213  */
214  SpinLockAcquire(&walrcv->mutex);
215  Assert(walrcv->pid == 0);
216  switch (walrcv->walRcvState)
217  {
218  case WALRCV_STOPPING:
219  /* If we've already been requested to stop, don't start up. */
220  walrcv->walRcvState = WALRCV_STOPPED;
221  /* fall through */
222 
223  case WALRCV_STOPPED:
224  SpinLockRelease(&walrcv->mutex);
225  proc_exit(1);
226  break;
227 
228  case WALRCV_STARTING:
229  /* The usual case */
230  break;
231 
232  case WALRCV_WAITING:
233  case WALRCV_STREAMING:
234  case WALRCV_RESTARTING:
235  default:
236  /* Shouldn't happen */
237  elog(PANIC, "walreceiver still running according to shared memory state");
238  }
239  /* Advertise our PID so that the startup process can kill us */
240  walrcv->pid = MyProcPid;
241  walrcv->walRcvState = WALRCV_STREAMING;
242 
243  /* Fetch information required to start streaming */
244  walrcv->ready_to_display = false;
245  strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
246  strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
247  startpoint = walrcv->receiveStart;
248  startpointTLI = walrcv->receiveStartTLI;
249 
250  /* Initialise to a sanish value */
252 
253  SpinLockRelease(&walrcv->mutex);
254 
255  /* Arrange to clean up at walreceiver exit */
257 
258  walrcv->latch = &MyProc->procLatch;
259 
260  /* Properly accept or ignore signals the postmaster might send us */
261  pqsignal(SIGHUP, WalRcvSigHupHandler); /* set flag to read config
262  * file */
263  pqsignal(SIGINT, SIG_IGN);
264  pqsignal(SIGTERM, WalRcvShutdownHandler); /* request shutdown */
265  pqsignal(SIGQUIT, WalRcvQuickDieHandler); /* hard crash time */
270 
271  /* Reset some signals that are accepted by postmaster but not here */
277 
278  /* We allow SIGQUIT (quickdie) at all times */
279  sigdelset(&BlockSig, SIGQUIT);
280 
281  /* Load the libpq-specific functions */
282  load_file("libpqwalreceiver", false);
283  if (WalReceiverFunctions == NULL)
284  elog(ERROR, "libpqwalreceiver didn't initialize correctly");
285 
286  /*
287  * Create a resource owner to keep track of our resources (not clear that
288  * we need this, but may as well have one).
289  */
290  CurrentResourceOwner = ResourceOwnerCreate(NULL, "Wal Receiver");
291 
292  /* Unblock signals (they were blocked when the postmaster forked us) */
294 
295  /* Establish the connection to the primary for XLOG streaming */
297  wrconn = walrcv_connect(conninfo, false, "walreceiver", &err);
298  if (!wrconn)
299  ereport(ERROR,
300  (errmsg("could not connect to the primary server: %s", err)));
302 
303  /*
304  * Save user-visible connection string. This clobbers the original
305  * conninfo, for security.
306  */
307  tmp_conninfo = walrcv_get_conninfo(wrconn);
308  SpinLockAcquire(&walrcv->mutex);
309  memset(walrcv->conninfo, 0, MAXCONNINFO);
310  if (tmp_conninfo)
311  {
312  strlcpy((char *) walrcv->conninfo, tmp_conninfo, MAXCONNINFO);
313  pfree(tmp_conninfo);
314  }
315  walrcv->ready_to_display = true;
316  SpinLockRelease(&walrcv->mutex);
317 
318  first_stream = true;
319  for (;;)
320  {
321  char *primary_sysid;
322  char standby_sysid[32];
323  int server_version;
325 
326  /*
327  * Check that we're connected to a valid server using the
328  * IDENTIFY_SYSTEM replication command.
329  */
331  primary_sysid = walrcv_identify_system(wrconn, &primaryTLI,
332  &server_version);
333 
334  snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
336  if (strcmp(primary_sysid, standby_sysid) != 0)
337  {
338  ereport(ERROR,
339  (errmsg("database system identifier differs between the primary and standby"),
340  errdetail("The primary's identifier is %s, the standby's identifier is %s.",
341  primary_sysid, standby_sysid)));
342  }
344 
345  /*
346  * Confirm that the current timeline of the primary is the same or
347  * ahead of ours.
348  */
349  if (primaryTLI < startpointTLI)
350  ereport(ERROR,
351  (errmsg("highest timeline %u of the primary is behind recovery timeline %u",
352  primaryTLI, startpointTLI)));
353 
354  /*
355  * Get any missing history files. We do this always, even when we're
356  * not interested in that timeline, so that if we're promoted to
357  * become the master later on, we don't select the same timeline that
358  * was already used in the current master. This isn't bullet-proof -
359  * you'll need some external software to manage your cluster if you
360  * need to ensure that a unique timeline id is chosen in every case,
361  * but let's avoid the confusion of timeline id collisions where we
362  * can.
363  */
364  WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
365 
366  /*
367  * Start streaming.
368  *
369  * We'll try to start at the requested starting point and timeline,
370  * even if it's different from the server's latest timeline. In case
371  * we've already reached the end of the old timeline, the server will
372  * finish the streaming immediately, and we will go back to await
373  * orders from the startup process. If recovery_target_timeline is
374  * 'latest', the startup process will scan pg_wal and find the new
375  * history file, bump recovery target timeline, and ask us to restart
376  * on the new timeline.
377  */
378  options.logical = false;
379  options.startpoint = startpoint;
380  options.slotname = slotname[0] != '\0' ? slotname : NULL;
381  options.proto.physical.startpointTLI = startpointTLI;
382  ThisTimeLineID = startpointTLI;
383  if (walrcv_startstreaming(wrconn, &options))
384  {
385  if (first_stream)
386  ereport(LOG,
387  (errmsg("started streaming WAL from primary at %X/%X on timeline %u",
388  (uint32) (startpoint >> 32), (uint32) startpoint,
389  startpointTLI)));
390  else
391  ereport(LOG,
392  (errmsg("restarted WAL streaming at %X/%X on timeline %u",
393  (uint32) (startpoint >> 32), (uint32) startpoint,
394  startpointTLI)));
395  first_stream = false;
396 
397  /* Initialize LogstreamResult and buffers for processing messages */
399  initStringInfo(&reply_message);
400  initStringInfo(&incoming_message);
401 
402  /* Initialize the last recv timestamp */
403  last_recv_timestamp = GetCurrentTimestamp();
404  ping_sent = false;
405 
406  /* Loop until end-of-streaming or error */
407  for (;;)
408  {
409  char *buf;
410  int len;
411  bool endofwal = false;
412  pgsocket wait_fd = PGINVALID_SOCKET;
413  int rc;
414 
415  /*
416  * Exit walreceiver if we're not in recovery. This should not
417  * happen, but cross-check the status here.
418  */
419  if (!RecoveryInProgress())
420  ereport(FATAL,
421  (errmsg("cannot continue WAL streaming, recovery has already ended")));
422 
423  /* Process any requests or signals received recently */
425 
426  if (got_SIGHUP)
427  {
428  got_SIGHUP = false;
431  }
432 
433  /* See if we can read data immediately */
434  len = walrcv_receive(wrconn, &buf, &wait_fd);
435  if (len != 0)
436  {
437  /*
438  * Process the received data, and any subsequent data we
439  * can read without blocking.
440  */
441  for (;;)
442  {
443  if (len > 0)
444  {
445  /*
446  * Something was received from master, so reset
447  * timeout
448  */
449  last_recv_timestamp = GetCurrentTimestamp();
450  ping_sent = false;
451  XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
452  }
453  else if (len == 0)
454  break;
455  else if (len < 0)
456  {
457  ereport(LOG,
458  (errmsg("replication terminated by primary server"),
459  errdetail("End of WAL reached on timeline %u at %X/%X.",
460  startpointTLI,
461  (uint32) (LogstreamResult.Write >> 32), (uint32) LogstreamResult.Write)));
462  endofwal = true;
463  break;
464  }
465  len = walrcv_receive(wrconn, &buf, &wait_fd);
466  }
467 
468  /* Let the master know that we received some data. */
469  XLogWalRcvSendReply(false, false);
470 
471  /*
472  * If we've written some records, flush them to disk and
473  * let the startup process and primary server know about
474  * them.
475  */
476  XLogWalRcvFlush(false);
477  }
478 
479  /* Check if we need to exit the streaming loop. */
480  if (endofwal)
481  break;
482 
483  /*
484  * Ideally we would reuse a WaitEventSet object repeatedly
485  * here to avoid the overheads of WaitLatchOrSocket on epoll
486  * systems, but we can't be sure that libpq (or any other
487  * walreceiver implementation) has the same socket (even if
488  * the fd is the same number, it may have been closed and
489  * reopened since the last time). In future, if there is a
490  * function for removing sockets from WaitEventSet, then we
491  * could add and remove just the socket each time, potentially
492  * avoiding some system calls.
493  */
494  Assert(wait_fd != PGINVALID_SOCKET);
495  rc = WaitLatchOrSocket(walrcv->latch,
498  wait_fd,
501  if (rc & WL_LATCH_SET)
502  {
503  ResetLatch(walrcv->latch);
504  if (walrcv->force_reply)
505  {
506  /*
507  * The recovery process has asked us to send apply
508  * feedback now. Make sure the flag is really set to
509  * false in shared memory before sending the reply, so
510  * we don't miss a new request for a reply.
511  */
512  walrcv->force_reply = false;
514  XLogWalRcvSendReply(true, false);
515  }
516  }
517  if (rc & WL_POSTMASTER_DEATH)
518  {
519  /*
520  * Emergency bailout if postmaster has died. This is to
521  * avoid the necessity for manual cleanup of all
522  * postmaster children.
523  */
524  exit(1);
525  }
526  if (rc & WL_TIMEOUT)
527  {
528  /*
529  * We didn't receive anything new. If we haven't heard
530  * anything from the server for more than
531  * wal_receiver_timeout / 2, ping the server. Also, if
532  * it's been longer than wal_receiver_status_interval
533  * since the last update we sent, send a status update to
534  * the master anyway, to report any progress in applying
535  * WAL.
536  */
537  bool requestReply = false;
538 
539  /*
540  * Check if time since last receive from standby has
541  * reached the configured limit.
542  */
543  if (wal_receiver_timeout > 0)
544  {
546  TimestampTz timeout;
547 
548  timeout =
549  TimestampTzPlusMilliseconds(last_recv_timestamp,
551 
552  if (now >= timeout)
553  ereport(ERROR,
554  (errmsg("terminating walreceiver due to timeout")));
555 
556  /*
557  * We didn't receive anything new, for half of
558  * receiver replication timeout. Ping the server.
559  */
560  if (!ping_sent)
561  {
562  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
563  (wal_receiver_timeout / 2));
564  if (now >= timeout)
565  {
566  requestReply = true;
567  ping_sent = true;
568  }
569  }
570  }
571 
572  XLogWalRcvSendReply(requestReply, requestReply);
574  }
575  }
576 
577  /*
578  * The backend finished streaming. Exit streaming COPY-mode from
579  * our side, too.
580  */
582  walrcv_endstreaming(wrconn, &primaryTLI);
584 
585  /*
586  * If the server had switched to a new timeline that we didn't
587  * know about when we began streaming, fetch its timeline history
588  * file now.
589  */
590  WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
591  }
592  else
593  ereport(LOG,
594  (errmsg("primary server contains no more WAL on requested timeline %u",
595  startpointTLI)));
596 
597  /*
598  * End of WAL reached on the requested timeline. Close the last
599  * segment, and await for new orders from the startup process.
600  */
601  if (recvFile >= 0)
602  {
603  char xlogfname[MAXFNAMELEN];
604 
605  XLogWalRcvFlush(false);
606  if (close(recvFile) != 0)
607  ereport(PANIC,
609  errmsg("could not close log segment %s: %m",
611 
612  /*
613  * Create .done file forcibly to prevent the streamed segment from
614  * being archived later.
615  */
616  XLogFileName(xlogfname, recvFileTLI, recvSegNo);
618  XLogArchiveForceDone(xlogfname);
619  else
620  XLogArchiveNotify(xlogfname);
621  }
622  recvFile = -1;
623 
624  elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
625  WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
626  }
627  /* not reached */
628 }
629 
630 /*
631  * Wait for startup process to set receiveStart and receiveStartTLI.
632  */
633 static void
635 {
636  WalRcvData *walrcv = WalRcv;
637  int state;
638 
639  SpinLockAcquire(&walrcv->mutex);
640  state = walrcv->walRcvState;
641  if (state != WALRCV_STREAMING)
642  {
643  SpinLockRelease(&walrcv->mutex);
644  if (state == WALRCV_STOPPING)
645  proc_exit(0);
646  else
647  elog(FATAL, "unexpected walreceiver state");
648  }
649  walrcv->walRcvState = WALRCV_WAITING;
651  walrcv->receiveStartTLI = 0;
652  SpinLockRelease(&walrcv->mutex);
653 
655  set_ps_display("idle", false);
656 
657  /*
658  * nudge startup process to notice that we've stopped streaming and are
659  * now waiting for instructions.
660  */
661  WakeupRecovery();
662  for (;;)
663  {
664  ResetLatch(walrcv->latch);
665 
666  /*
667  * Emergency bailout if postmaster has died. This is to avoid the
668  * necessity for manual cleanup of all postmaster children.
669  */
670  if (!PostmasterIsAlive())
671  exit(1);
672 
674 
675  SpinLockAcquire(&walrcv->mutex);
676  Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
677  walrcv->walRcvState == WALRCV_WAITING ||
678  walrcv->walRcvState == WALRCV_STOPPING);
679  if (walrcv->walRcvState == WALRCV_RESTARTING)
680  {
681  /* we don't expect primary_conninfo to change */
682  *startpoint = walrcv->receiveStart;
683  *startpointTLI = walrcv->receiveStartTLI;
684  walrcv->walRcvState = WALRCV_STREAMING;
685  SpinLockRelease(&walrcv->mutex);
686  break;
687  }
688  if (walrcv->walRcvState == WALRCV_STOPPING)
689  {
690  /*
691  * We should've received SIGTERM if the startup process wants us
692  * to die, but might as well check it here too.
693  */
694  SpinLockRelease(&walrcv->mutex);
695  exit(1);
696  }
697  SpinLockRelease(&walrcv->mutex);
698 
701  }
702 
704  {
705  char activitymsg[50];
706 
707  snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X",
708  (uint32) (*startpoint >> 32),
709  (uint32) *startpoint);
710  set_ps_display(activitymsg, false);
711  }
712 }
713 
714 /*
715  * Fetch any missing timeline history files between 'first' and 'last'
716  * (inclusive) from the server.
717  */
718 static void
720 {
721  TimeLineID tli;
722 
723  for (tli = first; tli <= last; tli++)
724  {
725  /* there's no history file for timeline 1 */
726  if (tli != 1 && !existsTimeLineHistory(tli))
727  {
728  char *fname;
729  char *content;
730  int len;
731  char expectedfname[MAXFNAMELEN];
732 
733  ereport(LOG,
734  (errmsg("fetching timeline history file for timeline %u from primary server",
735  tli)));
736 
738  walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
740 
741  /*
742  * Check that the filename on the master matches what we
743  * calculated ourselves. This is just a sanity check, it should
744  * always match.
745  */
746  TLHistoryFileName(expectedfname, tli);
747  if (strcmp(fname, expectedfname) != 0)
748  ereport(ERROR,
749  (errcode(ERRCODE_PROTOCOL_VIOLATION),
750  errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u",
751  tli)));
752 
753  /*
754  * Write the file to pg_wal.
755  */
756  writeTimeLineHistoryFile(tli, content, len);
757 
758  pfree(fname);
759  pfree(content);
760  }
761  }
762 }
763 
764 /*
765  * Mark us as STOPPED in shared memory at exit.
766  */
767 static void
768 WalRcvDie(int code, Datum arg)
769 {
770  WalRcvData *walrcv = WalRcv;
771 
772  /* Ensure that all WAL records received are flushed to disk */
773  XLogWalRcvFlush(true);
774 
775  walrcv->latch = NULL;
776 
777  SpinLockAcquire(&walrcv->mutex);
778  Assert(walrcv->walRcvState == WALRCV_STREAMING ||
779  walrcv->walRcvState == WALRCV_RESTARTING ||
780  walrcv->walRcvState == WALRCV_STARTING ||
781  walrcv->walRcvState == WALRCV_WAITING ||
782  walrcv->walRcvState == WALRCV_STOPPING);
783  Assert(walrcv->pid == MyProcPid);
784  walrcv->walRcvState = WALRCV_STOPPED;
785  walrcv->pid = 0;
786  walrcv->ready_to_display = false;
787  SpinLockRelease(&walrcv->mutex);
788 
789  /* Terminate the connection gracefully. */
790  if (wrconn != NULL)
791  walrcv_disconnect(wrconn);
792 
793  /* Wake up the startup process to notice promptly that we're gone */
794  WakeupRecovery();
795 }
796 
797 /* SIGHUP: set flag to re-read config file at next convenient time */
798 static void
800 {
801  got_SIGHUP = true;
802 }
803 
804 
805 /* SIGUSR1: used by latch mechanism */
806 static void
808 {
809  int save_errno = errno;
810 
812 
813  errno = save_errno;
814 }
815 
816 /* SIGTERM: set flag for main loop, or shutdown immediately if safe */
817 static void
819 {
820  int save_errno = errno;
821 
822  got_SIGTERM = true;
823 
824  if (WalRcv->latch)
826 
827  /* Don't joggle the elbow of proc_exit */
830 
831  errno = save_errno;
832 }
833 
834 /*
835  * WalRcvQuickDieHandler() occurs when signalled SIGQUIT by the postmaster.
836  *
837  * Some backend has bought the farm, so we need to stop what we're doing and
838  * exit.
839  */
840 static void
842 {
844 
845  /*
846  * We DO NOT want to run proc_exit() callbacks -- we're here because
847  * shared memory may be corrupted, so we don't want to try to clean up our
848  * transaction. Just nail the windows shut and get out of town. Now that
849  * there's an atexit callback to prevent third-party code from breaking
850  * things by calling exit() directly, we have to reset the callbacks
851  * explicitly to make this work as intended.
852  */
853  on_exit_reset();
854 
855  /*
856  * Note we do exit(2) not exit(0). This is to force the postmaster into a
857  * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random
858  * backend. This is necessary precisely because we don't clean up our
859  * shared memory state. (The "dead man switch" mechanism in pmsignal.c
860  * should ensure the postmaster sees this as a crash, too, but no harm in
861  * being doubly sure.)
862  */
863  exit(2);
864 }
865 
866 /*
867  * Accept the message from XLOG stream, and process it.
868  */
869 static void
870 XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
871 {
872  int hdrlen;
873  XLogRecPtr dataStart;
874  XLogRecPtr walEnd;
875  TimestampTz sendTime;
876  bool replyRequested;
877 
878  resetStringInfo(&incoming_message);
879 
880  switch (type)
881  {
882  case 'w': /* WAL records */
883  {
884  /* copy message to StringInfo */
885  hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
886  if (len < hdrlen)
887  ereport(ERROR,
888  (errcode(ERRCODE_PROTOCOL_VIOLATION),
889  errmsg_internal("invalid WAL message received from primary")));
890  appendBinaryStringInfo(&incoming_message, buf, hdrlen);
891 
892  /* read the fields */
893  dataStart = pq_getmsgint64(&incoming_message);
894  walEnd = pq_getmsgint64(&incoming_message);
895  sendTime = pq_getmsgint64(&incoming_message);
896  ProcessWalSndrMessage(walEnd, sendTime);
897 
898  buf += hdrlen;
899  len -= hdrlen;
900  XLogWalRcvWrite(buf, len, dataStart);
901  break;
902  }
903  case 'k': /* Keepalive */
904  {
905  /* copy message to StringInfo */
906  hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
907  if (len != hdrlen)
908  ereport(ERROR,
909  (errcode(ERRCODE_PROTOCOL_VIOLATION),
910  errmsg_internal("invalid keepalive message received from primary")));
911  appendBinaryStringInfo(&incoming_message, buf, hdrlen);
912 
913  /* read the fields */
914  walEnd = pq_getmsgint64(&incoming_message);
915  sendTime = pq_getmsgint64(&incoming_message);
916  replyRequested = pq_getmsgbyte(&incoming_message);
917 
918  ProcessWalSndrMessage(walEnd, sendTime);
919 
920  /* If the primary requested a reply, send one immediately */
921  if (replyRequested)
922  XLogWalRcvSendReply(true, false);
923  break;
924  }
925  default:
926  ereport(ERROR,
927  (errcode(ERRCODE_PROTOCOL_VIOLATION),
928  errmsg_internal("invalid replication message type %d",
929  type)));
930  }
931 }
932 
933 /*
934  * Write XLOG data to disk.
935  */
936 static void
937 XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
938 {
939  int startoff;
940  int byteswritten;
941 
942  while (nbytes > 0)
943  {
944  int segbytes;
945 
946  if (recvFile < 0 || !XLByteInSeg(recptr, recvSegNo))
947  {
948  bool use_existent;
949 
950  /*
951  * fsync() and close current file before we switch to next one. We
952  * would otherwise have to reopen this file to fsync it later
953  */
954  if (recvFile >= 0)
955  {
956  char xlogfname[MAXFNAMELEN];
957 
958  XLogWalRcvFlush(false);
959 
960  /*
961  * XLOG segment files will be re-read by recovery in startup
962  * process soon, so we don't advise the OS to release cache
963  * pages associated with the file like XLogFileClose() does.
964  */
965  if (close(recvFile) != 0)
966  ereport(PANIC,
968  errmsg("could not close log segment %s: %m",
970 
971  /*
972  * Create .done file forcibly to prevent the streamed segment
973  * from being archived later.
974  */
975  XLogFileName(xlogfname, recvFileTLI, recvSegNo);
977  XLogArchiveForceDone(xlogfname);
978  else
979  XLogArchiveNotify(xlogfname);
980  }
981  recvFile = -1;
982 
983  /* Create/use new log file */
984  XLByteToSeg(recptr, recvSegNo);
985  use_existent = true;
986  recvFile = XLogFileInit(recvSegNo, &use_existent, true);
988  recvOff = 0;
989  }
990 
991  /* Calculate the start offset of the received logs */
992  startoff = recptr % XLogSegSize;
993 
994  if (startoff + nbytes > XLogSegSize)
995  segbytes = XLogSegSize - startoff;
996  else
997  segbytes = nbytes;
998 
999  /* Need to seek in the file? */
1000  if (recvOff != startoff)
1001  {
1002  if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0)
1003  ereport(PANIC,
1005  errmsg("could not seek in log segment %s to offset %u: %m",
1007  startoff)));
1008  recvOff = startoff;
1009  }
1010 
1011  /* OK to write the logs */
1012  errno = 0;
1013 
1014  byteswritten = write(recvFile, buf, segbytes);
1015  if (byteswritten <= 0)
1016  {
1017  /* if write didn't set errno, assume no disk space */
1018  if (errno == 0)
1019  errno = ENOSPC;
1020  ereport(PANIC,
1022  errmsg("could not write to log segment %s "
1023  "at offset %u, length %lu: %m",
1025  recvOff, (unsigned long) segbytes)));
1026  }
1027 
1028  /* Update state for write */
1029  recptr += byteswritten;
1030 
1031  recvOff += byteswritten;
1032  nbytes -= byteswritten;
1033  buf += byteswritten;
1034 
1035  LogstreamResult.Write = recptr;
1036  }
1037 }
1038 
1039 /*
1040  * Flush the log to disk.
1041  *
1042  * If we're in the midst of dying, it's unwise to do anything that might throw
1043  * an error, so we skip sending a reply in that case.
1044  */
1045 static void
1046 XLogWalRcvFlush(bool dying)
1047 {
1048  if (LogstreamResult.Flush < LogstreamResult.Write)
1049  {
1050  WalRcvData *walrcv = WalRcv;
1051 
1053 
1054  LogstreamResult.Flush = LogstreamResult.Write;
1055 
1056  /* Update shared-memory status */
1057  SpinLockAcquire(&walrcv->mutex);
1058  if (walrcv->receivedUpto < LogstreamResult.Flush)
1059  {
1060  walrcv->latestChunkStart = walrcv->receivedUpto;
1061  walrcv->receivedUpto = LogstreamResult.Flush;
1062  walrcv->receivedTLI = ThisTimeLineID;
1063  }
1064  SpinLockRelease(&walrcv->mutex);
1065 
1066  /* Signal the startup process and walsender that new WAL has arrived */
1067  WakeupRecovery();
1069  WalSndWakeup();
1070 
1071  /* Report XLOG streaming progress in PS display */
1073  {
1074  char activitymsg[50];
1075 
1076  snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
1077  (uint32) (LogstreamResult.Write >> 32),
1078  (uint32) LogstreamResult.Write);
1079  set_ps_display(activitymsg, false);
1080  }
1081 
1082  /* Also let the master know that we made some progress */
1083  if (!dying)
1084  {
1085  XLogWalRcvSendReply(false, false);
1086  XLogWalRcvSendHSFeedback(false);
1087  }
1088  }
1089 }
1090 
1091 /*
1092  * Send reply message to primary, indicating our current XLOG positions, oldest
1093  * xmin and the current time.
1094  *
1095  * If 'force' is not set, the message is only sent if enough time has
1096  * passed since last status update to reach wal_receiver_status_interval.
1097  * If wal_receiver_status_interval is disabled altogether and 'force' is
1098  * false, this is a no-op.
1099  *
1100  * If 'requestReply' is true, requests the server to reply immediately upon
1101  * receiving this message. This is used for heartbearts, when approaching
1102  * wal_receiver_timeout.
1103  */
1104 static void
1105 XLogWalRcvSendReply(bool force, bool requestReply)
1106 {
1107  static XLogRecPtr writePtr = 0;
1108  static XLogRecPtr flushPtr = 0;
1109  XLogRecPtr applyPtr;
1110  static TimestampTz sendTime = 0;
1111  TimestampTz now;
1112 
1113  /*
1114  * If the user doesn't want status to be reported to the master, be sure
1115  * to exit before doing anything at all.
1116  */
1117  if (!force && wal_receiver_status_interval <= 0)
1118  return;
1119 
1120  /* Get current timestamp. */
1121  now = GetCurrentTimestamp();
1122 
1123  /*
1124  * We can compare the write and flush positions to the last message we
1125  * sent without taking any lock, but the apply position requires a spin
1126  * lock, so we don't check that unless something else has changed or 10
1127  * seconds have passed. This means that the apply log position will
1128  * appear, from the master's point of view, to lag slightly, but since
1129  * this is only for reporting purposes and only on idle systems, that's
1130  * probably OK.
1131  */
1132  if (!force
1133  && writePtr == LogstreamResult.Write
1134  && flushPtr == LogstreamResult.Flush
1135  && !TimestampDifferenceExceeds(sendTime, now,
1137  return;
1138  sendTime = now;
1139 
1140  /* Construct a new message */
1141  writePtr = LogstreamResult.Write;
1142  flushPtr = LogstreamResult.Flush;
1143  applyPtr = GetXLogReplayRecPtr(NULL);
1144 
1145  resetStringInfo(&reply_message);
1146  pq_sendbyte(&reply_message, 'r');
1147  pq_sendint64(&reply_message, writePtr);
1148  pq_sendint64(&reply_message, flushPtr);
1149  pq_sendint64(&reply_message, applyPtr);
1150  pq_sendint64(&reply_message, GetCurrentTimestamp());
1151  pq_sendbyte(&reply_message, requestReply ? 1 : 0);
1152 
1153  /* Send it */
1154  elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
1155  (uint32) (writePtr >> 32), (uint32) writePtr,
1156  (uint32) (flushPtr >> 32), (uint32) flushPtr,
1157  (uint32) (applyPtr >> 32), (uint32) applyPtr,
1158  requestReply ? " (reply requested)" : "");
1159 
1160  walrcv_send(wrconn, reply_message.data, reply_message.len);
1161 }
1162 
1163 /*
1164  * Send hot standby feedback message to primary, plus the current time,
1165  * in case they don't have a watch.
1166  *
1167  * If the user disables feedback, send one final message to tell sender
1168  * to forget about the xmin on this standby. We also send this message
1169  * on first connect because a previous connection might have set xmin
1170  * on a replication slot. (If we're not using a slot it's harmless to
1171  * send a feedback message explicitly setting InvalidTransactionId).
1172  */
1173 static void
1175 {
1176  TimestampTz now;
1177  TransactionId nextXid;
1178  uint32 nextEpoch;
1179  TransactionId xmin;
1180  static TimestampTz sendTime = 0;
1181  /* initially true so we always send at least one feedback message */
1182  static bool master_has_standby_xmin = true;
1183 
1184  /*
1185  * If the user doesn't want status to be reported to the master, be sure
1186  * to exit before doing anything at all.
1187  */
1189  !master_has_standby_xmin)
1190  return;
1191 
1192  /* Get current timestamp. */
1193  now = GetCurrentTimestamp();
1194 
1195  if (!immed)
1196  {
1197  /*
1198  * Send feedback at most once per wal_receiver_status_interval.
1199  */
1200  if (!TimestampDifferenceExceeds(sendTime, now,
1202  return;
1203  sendTime = now;
1204  }
1205 
1206  /*
1207  * If Hot Standby is not yet accepting connections there is nothing to
1208  * send. Check this after the interval has expired to reduce number of
1209  * calls.
1210  *
1211  * Bailing out here also ensures that we don't send feedback until we've
1212  * read our own replication slot state, so we don't tell the master to
1213  * discard needed xmin or catalog_xmin from any slots that may exist
1214  * on this replica.
1215  */
1216  if (!HotStandbyActive())
1217  return;
1218 
1219  /*
1220  * Make the expensive call to get the oldest xmin once we are certain
1221  * everything else has been checked.
1222  */
1224  xmin = GetOldestXmin(NULL, false);
1225  else
1226  xmin = InvalidTransactionId;
1227 
1228  /*
1229  * Get epoch and adjust if nextXid and oldestXmin are different sides of
1230  * the epoch boundary.
1231  */
1232  GetNextXidAndEpoch(&nextXid, &nextEpoch);
1233  if (nextXid < xmin)
1234  nextEpoch--;
1235 
1236  elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u",
1237  xmin, nextEpoch);
1238 
1239  /* Construct the message and send it. */
1240  resetStringInfo(&reply_message);
1241  pq_sendbyte(&reply_message, 'h');
1242  pq_sendint64(&reply_message, GetCurrentTimestamp());
1243  pq_sendint(&reply_message, xmin, 4);
1244  pq_sendint(&reply_message, nextEpoch, 4);
1245  walrcv_send(wrconn, reply_message.data, reply_message.len);
1246  if (TransactionIdIsValid(xmin))
1247  master_has_standby_xmin = true;
1248  else
1249  master_has_standby_xmin = false;
1250 }
1251 
1252 /*
1253  * Update shared memory status upon receiving a message from primary.
1254  *
1255  * 'walEnd' and 'sendTime' are the end-of-WAL and timestamp of the latest
1256  * message, reported by primary.
1257  */
1258 static void
1260 {
1261  WalRcvData *walrcv = WalRcv;
1262 
1263  TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
1264 
1265  /* Update shared-memory status */
1266  SpinLockAcquire(&walrcv->mutex);
1267  if (walrcv->latestWalEnd < walEnd)
1268  walrcv->latestWalEndTime = sendTime;
1269  walrcv->latestWalEnd = walEnd;
1270  walrcv->lastMsgSendTime = sendTime;
1271  walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
1272  SpinLockRelease(&walrcv->mutex);
1273 
1274  if (log_min_messages <= DEBUG2)
1275  {
1276  char *sendtime;
1277  char *receipttime;
1278  int applyDelay;
1279 
1280  /* Copy because timestamptz_to_str returns a static buffer */
1281  sendtime = pstrdup(timestamptz_to_str(sendTime));
1282  receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
1283  applyDelay = GetReplicationApplyDelay();
1284 
1285  /* apply delay is not available */
1286  if (applyDelay == -1)
1287  elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1288  sendtime,
1289  receipttime,
1291  else
1292  elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1293  sendtime,
1294  receipttime,
1295  applyDelay,
1297 
1298  pfree(sendtime);
1299  pfree(receipttime);
1300  }
1301 }
1302 
1303 /*
1304  * Wake up the walreceiver main loop.
1305  *
1306  * This is called by the startup process whenever interesting xlog records
1307  * are applied, so that walreceiver can check if it needs to send an apply
1308  * notification back to the master which may be waiting in a COMMIT with
1309  * synchronous_commit = remote_apply.
1310  */
1311 void
1313 {
1314  WalRcv->force_reply = true;
1315  if (WalRcv->latch)
1316  SetLatch(WalRcv->latch);
1317 }
1318 
1319 /*
1320  * Return a string constant representing the state. This is used
1321  * in system functions and views, and should *not* be translated.
1322  */
1323 static const char *
1325 {
1326  switch (state)
1327  {
1328  case WALRCV_STOPPED:
1329  return "stopped";
1330  case WALRCV_STARTING:
1331  return "starting";
1332  case WALRCV_STREAMING:
1333  return "streaming";
1334  case WALRCV_WAITING:
1335  return "waiting";
1336  case WALRCV_RESTARTING:
1337  return "restarting";
1338  case WALRCV_STOPPING:
1339  return "stopping";
1340  }
1341  return "UNKNOWN";
1342 }
1343 
1344 /*
1345  * Returns activity of WAL receiver, including pid, state and xlog locations
1346  * received from the WAL sender of another server.
1347  */
1348 Datum
1350 {
1351  TupleDesc tupdesc;
1352  Datum *values;
1353  bool *nulls;
1354  WalRcvData *walrcv = WalRcv;
1356  XLogRecPtr receive_start_lsn;
1357  TimeLineID receive_start_tli;
1358  XLogRecPtr received_lsn;
1359  TimeLineID received_tli;
1360  TimestampTz last_send_time;
1361  TimestampTz last_receipt_time;
1362  XLogRecPtr latest_end_lsn;
1363  TimestampTz latest_end_time;
1364  char *slotname;
1365  char *conninfo;
1366 
1367  /*
1368  * No WAL receiver (or not ready yet), just return a tuple with NULL
1369  * values
1370  */
1371  if (walrcv->pid == 0 || !walrcv->ready_to_display)
1372  PG_RETURN_NULL();
1373 
1374  /* determine result type */
1375  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1376  elog(ERROR, "return type must be a row type");
1377 
1378  values = palloc0(sizeof(Datum) * tupdesc->natts);
1379  nulls = palloc0(sizeof(bool) * tupdesc->natts);
1380 
1381  /* Take a lock to ensure value consistency */
1382  SpinLockAcquire(&walrcv->mutex);
1383  state = walrcv->walRcvState;
1384  receive_start_lsn = walrcv->receiveStart;
1385  receive_start_tli = walrcv->receiveStartTLI;
1386  received_lsn = walrcv->receivedUpto;
1387  received_tli = walrcv->receivedTLI;
1388  last_send_time = walrcv->lastMsgSendTime;
1389  last_receipt_time = walrcv->lastMsgReceiptTime;
1390  latest_end_lsn = walrcv->latestWalEnd;
1391  latest_end_time = walrcv->latestWalEndTime;
1392  slotname = pstrdup(walrcv->slotname);
1393  conninfo = pstrdup(walrcv->conninfo);
1394  SpinLockRelease(&walrcv->mutex);
1395 
1396  /* Fetch values */
1397  values[0] = Int32GetDatum(walrcv->pid);
1398 
1399  if (!superuser())
1400  {
1401  /*
1402  * Only superusers can see details. Other users only get the pid value
1403  * to know whether it is a WAL receiver, but no details.
1404  */
1405  MemSet(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1));
1406  }
1407  else
1408  {
1409  values[1] = CStringGetTextDatum(WalRcvGetStateString(state));
1410 
1411  if (XLogRecPtrIsInvalid(receive_start_lsn))
1412  nulls[2] = true;
1413  else
1414  values[2] = LSNGetDatum(receive_start_lsn);
1415  values[3] = Int32GetDatum(receive_start_tli);
1416  if (XLogRecPtrIsInvalid(received_lsn))
1417  nulls[4] = true;
1418  else
1419  values[4] = LSNGetDatum(received_lsn);
1420  values[5] = Int32GetDatum(received_tli);
1421  if (last_send_time == 0)
1422  nulls[6] = true;
1423  else
1424  values[6] = TimestampTzGetDatum(last_send_time);
1425  if (last_receipt_time == 0)
1426  nulls[7] = true;
1427  else
1428  values[7] = TimestampTzGetDatum(last_receipt_time);
1429  if (XLogRecPtrIsInvalid(latest_end_lsn))
1430  nulls[8] = true;
1431  else
1432  values[8] = LSNGetDatum(latest_end_lsn);
1433  if (latest_end_time == 0)
1434  nulls[9] = true;
1435  else
1436  values[9] = TimestampTzGetDatum(latest_end_time);
1437  if (*slotname == '\0')
1438  nulls[10] = true;
1439  else
1440  values[10] = CStringGetTextDatum(slotname);
1441  if (*conninfo == '\0')
1442  nulls[11] = true;
1443  else
1444  values[11] = CStringGetTextDatum(conninfo);
1445  }
1446 
1447  /* Returns the record as Datum */
1449  heap_form_tuple(tupdesc, values, nulls)));
1450 }
#define XLogSegSize
Definition: xlog_internal.h:92
#define SIGUSR1
Definition: win32.h:211
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
Definition: walreceiver.c:719
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define walrcv_get_conninfo(conn)
Definition: walreceiver.h:213
static const char * WalRcvGetStateString(WalRcvState state)
Definition: walreceiver.c:1324
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:221
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
Definition: walreceiver.c:1259
#define DEBUG1
Definition: elog.h:25
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:211
int MyProcPid
Definition: globals.c:38
#define SIGCONT
Definition: win32.h:205
uint32 TimeLineID
Definition: xlogdefs.h:45
bool hot_standby_feedback
Definition: walreceiver.c:75
slock_t mutex
Definition: walreceiver.h:114
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
static volatile bool WalRcvImmediateInterruptOK
Definition: walreceiver.c:129
uint32 TransactionId
Definition: c.h:394
bool update_process_title
Definition: ps_status.c:35
XLogRecPtr Write
Definition: walreceiver.c:106
int GetReplicationTransferLatency(void)
#define write(a, b, c)
Definition: win32.h:19
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1569
PGPROC * MyProc
Definition: proc.c:67
int64 TimestampTz
Definition: timestamp.h:39
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
Definition: walreceiver.c:634
void issue_xlog_fsync(int fd, XLogSegNo segno)
Definition: xlog.c:9997
WalRcvState walRcvState
Definition: walreceiver.h:60
#define SIGWINCH
Definition: win32.h:209
ResourceOwner CurrentResourceOwner
Definition: resowner.c:138
static StringInfoData incoming_message
Definition: walreceiver.c:111
static void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:153
char * pstrdup(const char *in)
Definition: mcxt.c:1165
bool HotStandbyActive(void)
Definition: xlog.c:7861
static void XLogWalRcvSendHSFeedback(bool immed)
Definition: walreceiver.c:1174
int XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock)
Definition: xlog.c:3141
union WalRcvStreamOptions::@53 proto
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:223
#define XLogFileName(fname, tli, logSegNo)
#define SIGTTIN
Definition: win32.h:207
static void WalRcvSigHupHandler(SIGNAL_ARGS)
Definition: walreceiver.c:799
void set_ps_display(const char *activity, bool force)
Definition: ps_status.c:326
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:219
void proc_exit(int code)
Definition: ipc.c:99
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
TimeLineID receivedTLI
Definition: walreceiver.h:80
bool superuser(void)
Definition: superuser.c:47
#define MemSet(start, val, len)
Definition: c.h:853
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
#define WL_SOCKET_READABLE
Definition: latch.h:125
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
Definition: walreceiver.c:870
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:692
void ResetLatch(volatile Latch *latch)
Definition: latch.c:462
#define LOG
Definition: elog.h:26
bool RecoveryInProgress(void)
Definition: xlog.c:7805
int wal_receiver_status_interval
Definition: walreceiver.c:73
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1648
#define PANIC
Definition: elog.h:53
#define NAPTIME_PER_CYCLE
Definition: walreceiver.c:81
int natts
Definition: tupdesc.h:73
#define SIGQUIT
Definition: win32.h:197
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:94
Datum pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
Definition: walreceiver.c:1349
TimestampTz lastMsgSendTime
Definition: walreceiver.h:93
#define PG_SETMASK(mask)
Definition: pqsignal.h:19
int wal_receiver_timeout
Definition: walreceiver.c:74
Latch procLatch
Definition: proc.h:93
static WalReceiverConn * wrconn
Definition: walreceiver.c:78
WalRcvState
Definition: walreceiver.h:40
#define NAMEDATALEN
#define SpinLockAcquire(lock)
Definition: spin.h:62
static StringInfoData reply_message
Definition: walreceiver.c:110
void XLogArchiveNotify(const char *xlog)
Definition: xlogarchive.c:513
void WalReceiverMain(void)
Definition: walreceiver.c:187
void GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch)
Definition: xlog.c:8223
int WaitLatch(volatile Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:301
void pfree(void *pointer)
Definition: mcxt.c:992
#define SIG_IGN
Definition: win32.h:193
#define ERROR
Definition: elog.h:43
static void WalRcvSigUsr1Handler(SIGNAL_ARGS)
Definition: walreceiver.c:807
void on_exit_reset(void)
Definition: ipc.c:396
void WakeupRecovery(void)
Definition: xlog.c:11975
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
#define FATAL
Definition: elog.h:52
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:10984
pid_t pid
Definition: walreceiver.h:59
#define MAXCONNINFO
Definition: walreceiver.h:32
XLogRecPtr latestChunkStart
Definition: walreceiver.h:88
#define DEBUG2
Definition: elog.h:24
#define TLHistoryFileName(fname, tli)
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:348
#define walrcv_identify_system(conn, primary_tli, server_version)
Definition: walreceiver.h:215
char * XLogFileNameP(TimeLineID tli, XLogSegNo segno)
Definition: xlog.c:10040
static char * buf
Definition: pg_test_fsync.c:65
void writeTimeLineHistoryFile(TimeLineID tli, char *content, int size)
Definition: timeline.c:442
uint64 XLogSegNo
Definition: xlogdefs.h:34
int XLogArchiveMode
Definition: xlog.c:93
bool PostmasterIsAlive(void)
Definition: pmsignal.c:272
int errdetail(const char *fmt,...)
Definition: elog.c:873
int errcode_for_file_access(void)
Definition: elog.c:598
static void DisableWalRcvImmediateExit(void)
Definition: walreceiver.c:179
#define InvalidTransactionId
Definition: transam.h:31
#define AllowCascadeReplication()
Definition: walreceiver.h:35
XLogRecPtr startpoint
Definition: walreceiver.h:144
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:94
unsigned int uint32
Definition: c.h:265
struct WalRcvStreamOptions::@53::@54 physical
int pgsocket
Definition: port.h:22
sigset_t UnBlockSig
Definition: pqsignal.c:22
bool existsTimeLineHistory(TimeLineID probeTLI)
Definition: timeline.c:207
XLogRecPtr receivedUpto
Definition: walreceiver.h:79
static volatile sig_atomic_t got_SIGHUP
Definition: walreceiver.c:97
#define ereport(elevel, rest)
Definition: elog.h:122
XLogRecPtr Flush
Definition: walreceiver.c:107
TimeLineID receiveStartTLI
Definition: walreceiver.h:70
static TimeLineID recvFileTLI
Definition: walreceiver.c:89
Definition: guc.h:72
Latch * latch
Definition: walreceiver.h:132
int GetReplicationApplyDelay(void)
static char ** options
void initStringInfo(StringInfo str)
Definition: stringinfo.c:65
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
static XLogSegNo recvSegNo
Definition: walreceiver.c:90
bool force_reply
Definition: walreceiver.h:120
#define MAXFNAMELEN
#define SpinLockRelease(lock)
Definition: spin.h:64
sigset_t BlockSig
Definition: pqsignal.c:22
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
void * palloc0(Size size)
Definition: mcxt.c:920
static void EnableWalRcvImmediateExit(void)
Definition: walreceiver.c:172
void XLogArchiveForceDone(const char *xlog)
Definition: xlogarchive.c:563
uintptr_t Datum
Definition: postgres.h:374
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:297
WalReceiverFunctionsType * WalReceiverFunctions
Definition: walreceiver.c:79
#define PGINVALID_SOCKET
Definition: port.h:24
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1105
bool ready_to_display
Definition: walreceiver.h:123
TimestampTz latestWalEndTime
Definition: walreceiver.h:100
int log_min_messages
Definition: guc.c:453
static void WalRcvShutdownHandler(SIGNAL_ARGS)
Definition: walreceiver.c:818
#define SIGPIPE
Definition: win32.h:201
#define SIGHUP
Definition: win32.h:196
TimeLineID ThisTimeLineID
Definition: xlog.c:178
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
#define SIG_DFL
Definition: win32.h:191
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:168
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:432
XLogRecPtr latestWalEnd
Definition: walreceiver.h:99
TransactionId GetOldestXmin(Relation rel, bool ignoreVacuum)
Definition: procarray.c:1305
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
int errmsg_internal(const char *fmt,...)
Definition: elog.c:827
#define pg_memory_barrier()
Definition: atomics.h:147
#define XLByteToSeg(xlrp, logSegNo)
void SetLatch(volatile Latch *latch)
Definition: latch.c:380
#define SIGNAL_ARGS
Definition: c.h:1079
#define NULL
Definition: c.h:226
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:671
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:321
Definition: regguts.h:298
bool proc_exit_inprogress
Definition: ipc.c:40
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:133
static int server_version
Definition: pg_dumpall.c:81
size_t Size
Definition: c.h:353
#define walrcv_disconnect(conn)
Definition: walreceiver.h:231
#define HeapTupleGetDatum(tuple)
Definition: funcapi.h:222
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:225
static void WalRcvQuickDieHandler(SIGNAL_ARGS)
Definition: walreceiver.c:841
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
Definition: walreceiver.h:217
static void XLogWalRcvFlush(bool dying)
Definition: walreceiver.c:1046
WalRcvData * WalRcv
#define SIGTTOU
Definition: win32.h:208
static Datum values[MAXATTR]
Definition: bootstrap.c:162
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4661
#define Int32GetDatum(X)
Definition: postgres.h:487
int errmsg(const char *fmt,...)
Definition: elog.c:797
static void WalRcvDie(int code, Datum arg)
Definition: walreceiver.c:768
void pq_sendint(StringInfo buf, int i, int b)
Definition: pqformat.c:236
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:486
#define CStringGetTextDatum(s)
Definition: builtins.h:90
void * arg
static uint32 recvOff
Definition: walreceiver.c:91
#define PG_FUNCTION_ARGS
Definition: fmgr.h:150
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
#define elog
Definition: elog.h:219
#define close(a)
Definition: win32.h:17
XLogRecPtr receiveStart
Definition: walreceiver.h:69
void latch_sigusr1_handler(void)
Definition: latch.c:1541
#define TransactionIdIsValid(xid)
Definition: transam.h:41
static volatile sig_atomic_t got_SIGTERM
Definition: walreceiver.c:98
static int recvFile
Definition: walreceiver.c:88
#define SIGCHLD
Definition: win32.h:206
#define XLByteInSeg(xlrp, logSegNo)
void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.c:271
#define WL_LATCH_SET
Definition: latch.h:124
#define SIGALRM
Definition: win32.h:202
#define UINT64_FORMAT
Definition: c.h:313
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
Definition: stringinfo.c:240
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1533
#define PG_RETURN_NULL()
Definition: fmgr.h:289
char slotname[NAMEDATALEN]
Definition: walreceiver.h:112
#define SIGUSR2
Definition: win32.h:212
void WalSndWakeup(void)
Definition: walsender.c:2631
static struct @24 LogstreamResult
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:106
ResourceOwner ResourceOwnerCreate(ResourceOwner parent, const char *name)
Definition: resowner.c:416
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
Definition: walreceiver.c:937
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1709
void WalRcvForceReply(void)
Definition: walreceiver.c:1312
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:209