PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
walreceiver.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * walreceiver.c
4  *
5  * The WAL receiver process (walreceiver) is new as of Postgres 9.0. It
6  * is the process in the standby server that takes charge of receiving
7  * XLOG records from a primary server during streaming replication.
8  *
9  * When the startup process determines that it's time to start streaming,
10  * it instructs postmaster to start walreceiver. Walreceiver first connects
11  * to the primary server (it will be served by a walsender process
12  * in the primary server), and then keeps receiving XLOG records and
13  * writing them to the disk as long as the connection is alive. As XLOG
14  * records are received and flushed to disk, it updates the
15  * WalRcv->receivedUpto variable in shared memory, to inform the startup
16  * process of how far it can proceed with XLOG replay.
17  *
18  * If the primary server ends streaming, but doesn't disconnect, walreceiver
19  * goes into "waiting" mode, and waits for the startup process to give new
20  * instructions. The startup process will treat that the same as
21  * disconnection, and will rescan the archive/pg_wal directory. But when the
22  * startup process wants to try streaming replication again, it will just
23  * nudge the existing walreceiver process that's waiting, instead of launching
24  * a new one.
25  *
26  * Normal termination is by SIGTERM, which instructs the walreceiver to
27  * exit(0). Emergency termination is by SIGQUIT; like any postmaster child
28  * process, the walreceiver will simply abort and exit on SIGQUIT. A close
29  * of the connection and a FATAL error are treated not as a crash but as
30  * normal operation.
31  *
32  * This file contains the server-facing parts of walreceiver. The libpq-
33  * specific parts are in the libpqwalreceiver module. It's loaded
34  * dynamically to avoid linking the server with libpq.
35  *
36  * Portions Copyright (c) 2010-2017, PostgreSQL Global Development Group
37  *
38  *
39  * IDENTIFICATION
40  * src/backend/replication/walreceiver.c
41  *
42  *-------------------------------------------------------------------------
43  */
44 #include "postgres.h"
45 
46 #include <signal.h>
47 #include <unistd.h>
48 
49 #include "access/htup_details.h"
50 #include "access/timeline.h"
51 #include "access/transam.h"
52 #include "access/xlog_internal.h"
53 #include "catalog/pg_authid.h"
54 #include "catalog/pg_type.h"
55 #include "funcapi.h"
56 #include "libpq/pqformat.h"
57 #include "libpq/pqsignal.h"
58 #include "miscadmin.h"
59 #include "pgstat.h"
61 #include "replication/walsender.h"
62 #include "storage/ipc.h"
63 #include "storage/pmsignal.h"
64 #include "storage/procarray.h"
65 #include "utils/builtins.h"
66 #include "utils/guc.h"
67 #include "utils/pg_lsn.h"
68 #include "utils/ps_status.h"
69 #include "utils/resowner.h"
70 #include "utils/timestamp.h"
71 
72 
73 /* GUC variables */
77 
78 /* libpqwalreceiver connection */
79 static WalReceiverConn *wrconn = NULL;
81 
82 #define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */
83 
84 /*
85  * These variables are used similarly to openLogFile/SegNo/Off,
86  * but for walreceiver to write the XLOG. recvFileTLI is the TimeLineID
87  * corresponding the filename of recvFile.
88  */
89 static int recvFile = -1;
91 static XLogSegNo recvSegNo = 0;
92 static uint32 recvOff = 0;
93 
94 /*
95  * Flags set by interrupt handlers of walreceiver for later service in the
96  * main loop.
97  */
98 static volatile sig_atomic_t got_SIGHUP = false;
99 static volatile sig_atomic_t got_SIGTERM = false;
100 
101 /*
102  * LogstreamResult indicates the byte positions that we have already
103  * written/fsynced.
104  */
105 static struct
106 {
107  XLogRecPtr Write; /* last byte + 1 written out in the standby */
108  XLogRecPtr Flush; /* last byte + 1 flushed in the standby */
110 
113 
114 /*
115  * About SIGTERM handling:
116  *
117  * We can't just exit(1) within SIGTERM signal handler, because the signal
118  * might arrive in the middle of some critical operation, like while we're
119  * holding a spinlock. We also can't just set a flag in signal handler and
120  * check it in the main loop, because we perform some blocking operations
121  * like libpqrcv_PQexec(), which can take a long time to finish.
122  *
123  * We use a combined approach: When WalRcvImmediateInterruptOK is true, it's
124  * safe for the signal handler to elog(FATAL) immediately. Otherwise it just
125  * sets got_SIGTERM flag, which is checked in the main loop when convenient.
126  *
127  * This is very much like what regular backends do with ImmediateInterruptOK,
128  * ProcessInterrupts() etc.
129  */
130 static volatile bool WalRcvImmediateInterruptOK = false;
131 
132 /* Prototypes for private functions */
133 static void ProcessWalRcvInterrupts(void);
134 static void EnableWalRcvImmediateExit(void);
135 static void DisableWalRcvImmediateExit(void);
136 static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
137 static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI);
138 static void WalRcvDie(int code, Datum arg);
139 static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
140 static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
141 static void XLogWalRcvFlush(bool dying);
142 static void XLogWalRcvSendReply(bool force, bool requestReply);
143 static void XLogWalRcvSendHSFeedback(bool immed);
144 static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
145 
146 /* Signal handlers */
147 static void WalRcvSigHupHandler(SIGNAL_ARGS);
148 static void WalRcvSigUsr1Handler(SIGNAL_ARGS);
151 
152 
153 static void
155 {
156  /*
157  * Although walreceiver interrupt handling doesn't use the same scheme as
158  * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
159  * any incoming signals on Win32.
160  */
162 
163  if (got_SIGTERM)
164  {
166  ereport(FATAL,
167  (errcode(ERRCODE_ADMIN_SHUTDOWN),
168  errmsg("terminating walreceiver process due to administrator command")));
169  }
170 }
171 
172 static void
174 {
177 }
178 
179 static void
181 {
184 }
185 
186 /* Main entry point for walreceiver process */
187 void
189 {
190  char conninfo[MAXCONNINFO];
191  char *tmp_conninfo;
192  char slotname[NAMEDATALEN];
193  XLogRecPtr startpoint;
194  TimeLineID startpointTLI;
195  TimeLineID primaryTLI;
196  bool first_stream;
197  WalRcvData *walrcv = WalRcv;
198  TimestampTz last_recv_timestamp;
200  bool ping_sent;
201  char *err;
202 
203  /*
204  * WalRcv should be set up already (if we are a backend, we inherit this
205  * by fork() or EXEC_BACKEND mechanism from the postmaster).
206  */
207  Assert(walrcv != NULL);
208 
209  now = GetCurrentTimestamp();
210 
211  /*
212  * Mark walreceiver as running in shared memory.
213  *
214  * Do this as early as possible, so that if we fail later on, we'll set
215  * state to STOPPED. If we die before this, the startup process will keep
216  * waiting for us to start up, until it times out.
217  */
218  SpinLockAcquire(&walrcv->mutex);
219  Assert(walrcv->pid == 0);
220  switch (walrcv->walRcvState)
221  {
222  case WALRCV_STOPPING:
223  /* If we've already been requested to stop, don't start up. */
224  walrcv->walRcvState = WALRCV_STOPPED;
225  /* fall through */
226 
227  case WALRCV_STOPPED:
228  SpinLockRelease(&walrcv->mutex);
229  proc_exit(1);
230  break;
231 
232  case WALRCV_STARTING:
233  /* The usual case */
234  break;
235 
236  case WALRCV_WAITING:
237  case WALRCV_STREAMING:
238  case WALRCV_RESTARTING:
239  default:
240  /* Shouldn't happen */
241  SpinLockRelease(&walrcv->mutex);
242  elog(PANIC, "walreceiver still running according to shared memory state");
243  }
244  /* Advertise our PID so that the startup process can kill us */
245  walrcv->pid = MyProcPid;
246  walrcv->walRcvState = WALRCV_STREAMING;
247 
248  /* Fetch information required to start streaming */
249  walrcv->ready_to_display = false;
250  strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
251  strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
252  startpoint = walrcv->receiveStart;
253  startpointTLI = walrcv->receiveStartTLI;
254 
255  /* Initialise to a sanish value */
256  walrcv->lastMsgSendTime =
257  walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
258 
259  /* Report the latch to use to awaken this process */
260  walrcv->latch = &MyProc->procLatch;
261 
262  SpinLockRelease(&walrcv->mutex);
263 
264  /* Arrange to clean up at walreceiver exit */
266 
267  /* Properly accept or ignore signals the postmaster might send us */
268  pqsignal(SIGHUP, WalRcvSigHupHandler); /* set flag to read config file */
269  pqsignal(SIGINT, SIG_IGN);
270  pqsignal(SIGTERM, WalRcvShutdownHandler); /* request shutdown */
271  pqsignal(SIGQUIT, WalRcvQuickDieHandler); /* hard crash time */
276 
277  /* Reset some signals that are accepted by postmaster but not here */
283 
284  /* We allow SIGQUIT (quickdie) at all times */
285  sigdelset(&BlockSig, SIGQUIT);
286 
287  /* Load the libpq-specific functions */
288  load_file("libpqwalreceiver", false);
289  if (WalReceiverFunctions == NULL)
290  elog(ERROR, "libpqwalreceiver didn't initialize correctly");
291 
292  /*
293  * Create a resource owner to keep track of our resources (not clear that
294  * we need this, but may as well have one).
295  */
296  CurrentResourceOwner = ResourceOwnerCreate(NULL, "Wal Receiver");
297 
298  /* Unblock signals (they were blocked when the postmaster forked us) */
300 
301  /* Establish the connection to the primary for XLOG streaming */
303  wrconn = walrcv_connect(conninfo, false, "walreceiver", &err);
304  if (!wrconn)
305  ereport(ERROR,
306  (errmsg("could not connect to the primary server: %s", err)));
308 
309  /*
310  * Save user-visible connection string. This clobbers the original
311  * conninfo, for security.
312  */
313  tmp_conninfo = walrcv_get_conninfo(wrconn);
314  SpinLockAcquire(&walrcv->mutex);
315  memset(walrcv->conninfo, 0, MAXCONNINFO);
316  if (tmp_conninfo)
317  strlcpy((char *) walrcv->conninfo, tmp_conninfo, MAXCONNINFO);
318  walrcv->ready_to_display = true;
319  SpinLockRelease(&walrcv->mutex);
320 
321  if (tmp_conninfo)
322  pfree(tmp_conninfo);
323 
324  first_stream = true;
325  for (;;)
326  {
327  char *primary_sysid;
328  char standby_sysid[32];
329  int server_version;
331 
332  /*
333  * Check that we're connected to a valid server using the
334  * IDENTIFY_SYSTEM replication command.
335  */
337  primary_sysid = walrcv_identify_system(wrconn, &primaryTLI,
338  &server_version);
339 
340  snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
342  if (strcmp(primary_sysid, standby_sysid) != 0)
343  {
344  ereport(ERROR,
345  (errmsg("database system identifier differs between the primary and standby"),
346  errdetail("The primary's identifier is %s, the standby's identifier is %s.",
347  primary_sysid, standby_sysid)));
348  }
350 
351  /*
352  * Confirm that the current timeline of the primary is the same or
353  * ahead of ours.
354  */
355  if (primaryTLI < startpointTLI)
356  ereport(ERROR,
357  (errmsg("highest timeline %u of the primary is behind recovery timeline %u",
358  primaryTLI, startpointTLI)));
359 
360  /*
361  * Get any missing history files. We do this always, even when we're
362  * not interested in that timeline, so that if we're promoted to
363  * become the master later on, we don't select the same timeline that
364  * was already used in the current master. This isn't bullet-proof -
365  * you'll need some external software to manage your cluster if you
366  * need to ensure that a unique timeline id is chosen in every case,
367  * but let's avoid the confusion of timeline id collisions where we
368  * can.
369  */
370  WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
371 
372  /*
373  * Start streaming.
374  *
375  * We'll try to start at the requested starting point and timeline,
376  * even if it's different from the server's latest timeline. In case
377  * we've already reached the end of the old timeline, the server will
378  * finish the streaming immediately, and we will go back to await
379  * orders from the startup process. If recovery_target_timeline is
380  * 'latest', the startup process will scan pg_wal and find the new
381  * history file, bump recovery target timeline, and ask us to restart
382  * on the new timeline.
383  */
384  options.logical = false;
385  options.startpoint = startpoint;
386  options.slotname = slotname[0] != '\0' ? slotname : NULL;
387  options.proto.physical.startpointTLI = startpointTLI;
388  ThisTimeLineID = startpointTLI;
389  if (walrcv_startstreaming(wrconn, &options))
390  {
391  if (first_stream)
392  ereport(LOG,
393  (errmsg("started streaming WAL from primary at %X/%X on timeline %u",
394  (uint32) (startpoint >> 32), (uint32) startpoint,
395  startpointTLI)));
396  else
397  ereport(LOG,
398  (errmsg("restarted WAL streaming at %X/%X on timeline %u",
399  (uint32) (startpoint >> 32), (uint32) startpoint,
400  startpointTLI)));
401  first_stream = false;
402 
403  /* Initialize LogstreamResult and buffers for processing messages */
405  initStringInfo(&reply_message);
406  initStringInfo(&incoming_message);
407 
408  /* Initialize the last recv timestamp */
409  last_recv_timestamp = GetCurrentTimestamp();
410  ping_sent = false;
411 
412  /* Loop until end-of-streaming or error */
413  for (;;)
414  {
415  char *buf;
416  int len;
417  bool endofwal = false;
418  pgsocket wait_fd = PGINVALID_SOCKET;
419  int rc;
420 
421  /*
422  * Exit walreceiver if we're not in recovery. This should not
423  * happen, but cross-check the status here.
424  */
425  if (!RecoveryInProgress())
426  ereport(FATAL,
427  (errmsg("cannot continue WAL streaming, recovery has already ended")));
428 
429  /* Process any requests or signals received recently */
431 
432  if (got_SIGHUP)
433  {
434  got_SIGHUP = false;
437  }
438 
439  /* See if we can read data immediately */
440  len = walrcv_receive(wrconn, &buf, &wait_fd);
441  if (len != 0)
442  {
443  /*
444  * Process the received data, and any subsequent data we
445  * can read without blocking.
446  */
447  for (;;)
448  {
449  if (len > 0)
450  {
451  /*
452  * Something was received from master, so reset
453  * timeout
454  */
455  last_recv_timestamp = GetCurrentTimestamp();
456  ping_sent = false;
457  XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
458  }
459  else if (len == 0)
460  break;
461  else if (len < 0)
462  {
463  ereport(LOG,
464  (errmsg("replication terminated by primary server"),
465  errdetail("End of WAL reached on timeline %u at %X/%X.",
466  startpointTLI,
467  (uint32) (LogstreamResult.Write >> 32), (uint32) LogstreamResult.Write)));
468  endofwal = true;
469  break;
470  }
471  len = walrcv_receive(wrconn, &buf, &wait_fd);
472  }
473 
474  /* Let the master know that we received some data. */
475  XLogWalRcvSendReply(false, false);
476 
477  /*
478  * If we've written some records, flush them to disk and
479  * let the startup process and primary server know about
480  * them.
481  */
482  XLogWalRcvFlush(false);
483  }
484 
485  /* Check if we need to exit the streaming loop. */
486  if (endofwal)
487  break;
488 
489  /*
490  * Ideally we would reuse a WaitEventSet object repeatedly
491  * here to avoid the overheads of WaitLatchOrSocket on epoll
492  * systems, but we can't be sure that libpq (or any other
493  * walreceiver implementation) has the same socket (even if
494  * the fd is the same number, it may have been closed and
495  * reopened since the last time). In future, if there is a
496  * function for removing sockets from WaitEventSet, then we
497  * could add and remove just the socket each time, potentially
498  * avoiding some system calls.
499  */
500  Assert(wait_fd != PGINVALID_SOCKET);
501  rc = WaitLatchOrSocket(walrcv->latch,
504  wait_fd,
507  if (rc & WL_LATCH_SET)
508  {
509  ResetLatch(walrcv->latch);
510  if (walrcv->force_reply)
511  {
512  /*
513  * The recovery process has asked us to send apply
514  * feedback now. Make sure the flag is really set to
515  * false in shared memory before sending the reply, so
516  * we don't miss a new request for a reply.
517  */
518  walrcv->force_reply = false;
520  XLogWalRcvSendReply(true, false);
521  }
522  }
523  if (rc & WL_POSTMASTER_DEATH)
524  {
525  /*
526  * Emergency bailout if postmaster has died. This is to
527  * avoid the necessity for manual cleanup of all
528  * postmaster children.
529  */
530  exit(1);
531  }
532  if (rc & WL_TIMEOUT)
533  {
534  /*
535  * We didn't receive anything new. If we haven't heard
536  * anything from the server for more than
537  * wal_receiver_timeout / 2, ping the server. Also, if
538  * it's been longer than wal_receiver_status_interval
539  * since the last update we sent, send a status update to
540  * the master anyway, to report any progress in applying
541  * WAL.
542  */
543  bool requestReply = false;
544 
545  /*
546  * Check if time since last receive from standby has
547  * reached the configured limit.
548  */
549  if (wal_receiver_timeout > 0)
550  {
552  TimestampTz timeout;
553 
554  timeout =
555  TimestampTzPlusMilliseconds(last_recv_timestamp,
557 
558  if (now >= timeout)
559  ereport(ERROR,
560  (errmsg("terminating walreceiver due to timeout")));
561 
562  /*
563  * We didn't receive anything new, for half of
564  * receiver replication timeout. Ping the server.
565  */
566  if (!ping_sent)
567  {
568  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
569  (wal_receiver_timeout / 2));
570  if (now >= timeout)
571  {
572  requestReply = true;
573  ping_sent = true;
574  }
575  }
576  }
577 
578  XLogWalRcvSendReply(requestReply, requestReply);
580  }
581  }
582 
583  /*
584  * The backend finished streaming. Exit streaming COPY-mode from
585  * our side, too.
586  */
588  walrcv_endstreaming(wrconn, &primaryTLI);
590 
591  /*
592  * If the server had switched to a new timeline that we didn't
593  * know about when we began streaming, fetch its timeline history
594  * file now.
595  */
596  WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
597  }
598  else
599  ereport(LOG,
600  (errmsg("primary server contains no more WAL on requested timeline %u",
601  startpointTLI)));
602 
603  /*
604  * End of WAL reached on the requested timeline. Close the last
605  * segment, and await for new orders from the startup process.
606  */
607  if (recvFile >= 0)
608  {
609  char xlogfname[MAXFNAMELEN];
610 
611  XLogWalRcvFlush(false);
612  if (close(recvFile) != 0)
613  ereport(PANIC,
615  errmsg("could not close log segment %s: %m",
617 
618  /*
619  * Create .done file forcibly to prevent the streamed segment from
620  * being archived later.
621  */
624  XLogArchiveForceDone(xlogfname);
625  else
626  XLogArchiveNotify(xlogfname);
627  }
628  recvFile = -1;
629 
630  elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
631  WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
632  }
633  /* not reached */
634 }
635 
636 /*
637  * Wait for startup process to set receiveStart and receiveStartTLI.
638  */
639 static void
641 {
642  WalRcvData *walrcv = WalRcv;
643  int state;
644 
645  SpinLockAcquire(&walrcv->mutex);
646  state = walrcv->walRcvState;
647  if (state != WALRCV_STREAMING)
648  {
649  SpinLockRelease(&walrcv->mutex);
650  if (state == WALRCV_STOPPING)
651  proc_exit(0);
652  else
653  elog(FATAL, "unexpected walreceiver state");
654  }
655  walrcv->walRcvState = WALRCV_WAITING;
657  walrcv->receiveStartTLI = 0;
658  SpinLockRelease(&walrcv->mutex);
659 
661  set_ps_display("idle", false);
662 
663  /*
664  * nudge startup process to notice that we've stopped streaming and are
665  * now waiting for instructions.
666  */
667  WakeupRecovery();
668  for (;;)
669  {
670  ResetLatch(walrcv->latch);
671 
672  /*
673  * Emergency bailout if postmaster has died. This is to avoid the
674  * necessity for manual cleanup of all postmaster children.
675  */
676  if (!PostmasterIsAlive())
677  exit(1);
678 
680 
681  SpinLockAcquire(&walrcv->mutex);
682  Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
683  walrcv->walRcvState == WALRCV_WAITING ||
684  walrcv->walRcvState == WALRCV_STOPPING);
685  if (walrcv->walRcvState == WALRCV_RESTARTING)
686  {
687  /* we don't expect primary_conninfo to change */
688  *startpoint = walrcv->receiveStart;
689  *startpointTLI = walrcv->receiveStartTLI;
690  walrcv->walRcvState = WALRCV_STREAMING;
691  SpinLockRelease(&walrcv->mutex);
692  break;
693  }
694  if (walrcv->walRcvState == WALRCV_STOPPING)
695  {
696  /*
697  * We should've received SIGTERM if the startup process wants us
698  * to die, but might as well check it here too.
699  */
700  SpinLockRelease(&walrcv->mutex);
701  exit(1);
702  }
703  SpinLockRelease(&walrcv->mutex);
704 
707  }
708 
710  {
711  char activitymsg[50];
712 
713  snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X",
714  (uint32) (*startpoint >> 32),
715  (uint32) *startpoint);
716  set_ps_display(activitymsg, false);
717  }
718 }
719 
720 /*
721  * Fetch any missing timeline history files between 'first' and 'last'
722  * (inclusive) from the server.
723  */
724 static void
726 {
727  TimeLineID tli;
728 
729  for (tli = first; tli <= last; tli++)
730  {
731  /* there's no history file for timeline 1 */
732  if (tli != 1 && !existsTimeLineHistory(tli))
733  {
734  char *fname;
735  char *content;
736  int len;
737  char expectedfname[MAXFNAMELEN];
738 
739  ereport(LOG,
740  (errmsg("fetching timeline history file for timeline %u from primary server",
741  tli)));
742 
744  walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
746 
747  /*
748  * Check that the filename on the master matches what we
749  * calculated ourselves. This is just a sanity check, it should
750  * always match.
751  */
752  TLHistoryFileName(expectedfname, tli);
753  if (strcmp(fname, expectedfname) != 0)
754  ereport(ERROR,
755  (errcode(ERRCODE_PROTOCOL_VIOLATION),
756  errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u",
757  tli)));
758 
759  /*
760  * Write the file to pg_wal.
761  */
762  writeTimeLineHistoryFile(tli, content, len);
763 
764  pfree(fname);
765  pfree(content);
766  }
767  }
768 }
769 
770 /*
771  * Mark us as STOPPED in shared memory at exit.
772  */
773 static void
774 WalRcvDie(int code, Datum arg)
775 {
776  WalRcvData *walrcv = WalRcv;
777 
778  /* Ensure that all WAL records received are flushed to disk */
779  XLogWalRcvFlush(true);
780 
781  /* Mark ourselves inactive in shared memory */
782  SpinLockAcquire(&walrcv->mutex);
783  Assert(walrcv->walRcvState == WALRCV_STREAMING ||
784  walrcv->walRcvState == WALRCV_RESTARTING ||
785  walrcv->walRcvState == WALRCV_STARTING ||
786  walrcv->walRcvState == WALRCV_WAITING ||
787  walrcv->walRcvState == WALRCV_STOPPING);
788  Assert(walrcv->pid == MyProcPid);
789  walrcv->walRcvState = WALRCV_STOPPED;
790  walrcv->pid = 0;
791  walrcv->ready_to_display = false;
792  walrcv->latch = NULL;
793  SpinLockRelease(&walrcv->mutex);
794 
795  /* Terminate the connection gracefully. */
796  if (wrconn != NULL)
797  walrcv_disconnect(wrconn);
798 
799  /* Wake up the startup process to notice promptly that we're gone */
800  WakeupRecovery();
801 }
802 
803 /* SIGHUP: set flag to re-read config file at next convenient time */
804 static void
806 {
807  got_SIGHUP = true;
808 }
809 
810 
811 /* SIGUSR1: used by latch mechanism */
812 static void
814 {
815  int save_errno = errno;
816 
818 
819  errno = save_errno;
820 }
821 
822 /* SIGTERM: set flag for main loop, or shutdown immediately if safe */
823 static void
825 {
826  int save_errno = errno;
827 
828  got_SIGTERM = true;
829 
830  if (WalRcv->latch)
832 
833  /* Don't joggle the elbow of proc_exit */
836 
837  errno = save_errno;
838 }
839 
840 /*
841  * WalRcvQuickDieHandler() occurs when signalled SIGQUIT by the postmaster.
842  *
843  * Some backend has bought the farm, so we need to stop what we're doing and
844  * exit.
845  */
846 static void
848 {
850 
851  /*
852  * We DO NOT want to run proc_exit() callbacks -- we're here because
853  * shared memory may be corrupted, so we don't want to try to clean up our
854  * transaction. Just nail the windows shut and get out of town. Now that
855  * there's an atexit callback to prevent third-party code from breaking
856  * things by calling exit() directly, we have to reset the callbacks
857  * explicitly to make this work as intended.
858  */
859  on_exit_reset();
860 
861  /*
862  * Note we do exit(2) not exit(0). This is to force the postmaster into a
863  * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random
864  * backend. This is necessary precisely because we don't clean up our
865  * shared memory state. (The "dead man switch" mechanism in pmsignal.c
866  * should ensure the postmaster sees this as a crash, too, but no harm in
867  * being doubly sure.)
868  */
869  exit(2);
870 }
871 
872 /*
873  * Accept the message from XLOG stream, and process it.
874  */
875 static void
876 XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
877 {
878  int hdrlen;
879  XLogRecPtr dataStart;
880  XLogRecPtr walEnd;
881  TimestampTz sendTime;
882  bool replyRequested;
883 
884  resetStringInfo(&incoming_message);
885 
886  switch (type)
887  {
888  case 'w': /* WAL records */
889  {
890  /* copy message to StringInfo */
891  hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
892  if (len < hdrlen)
893  ereport(ERROR,
894  (errcode(ERRCODE_PROTOCOL_VIOLATION),
895  errmsg_internal("invalid WAL message received from primary")));
896  appendBinaryStringInfo(&incoming_message, buf, hdrlen);
897 
898  /* read the fields */
899  dataStart = pq_getmsgint64(&incoming_message);
900  walEnd = pq_getmsgint64(&incoming_message);
901  sendTime = pq_getmsgint64(&incoming_message);
902  ProcessWalSndrMessage(walEnd, sendTime);
903 
904  buf += hdrlen;
905  len -= hdrlen;
906  XLogWalRcvWrite(buf, len, dataStart);
907  break;
908  }
909  case 'k': /* Keepalive */
910  {
911  /* copy message to StringInfo */
912  hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
913  if (len != hdrlen)
914  ereport(ERROR,
915  (errcode(ERRCODE_PROTOCOL_VIOLATION),
916  errmsg_internal("invalid keepalive message received from primary")));
917  appendBinaryStringInfo(&incoming_message, buf, hdrlen);
918 
919  /* read the fields */
920  walEnd = pq_getmsgint64(&incoming_message);
921  sendTime = pq_getmsgint64(&incoming_message);
922  replyRequested = pq_getmsgbyte(&incoming_message);
923 
924  ProcessWalSndrMessage(walEnd, sendTime);
925 
926  /* If the primary requested a reply, send one immediately */
927  if (replyRequested)
928  XLogWalRcvSendReply(true, false);
929  break;
930  }
931  default:
932  ereport(ERROR,
933  (errcode(ERRCODE_PROTOCOL_VIOLATION),
934  errmsg_internal("invalid replication message type %d",
935  type)));
936  }
937 }
938 
939 /*
940  * Write XLOG data to disk.
941  */
942 static void
943 XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
944 {
945  int startoff;
946  int byteswritten;
947 
948  while (nbytes > 0)
949  {
950  int segbytes;
951 
952  if (recvFile < 0 || !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
953  {
954  bool use_existent;
955 
956  /*
957  * fsync() and close current file before we switch to next one. We
958  * would otherwise have to reopen this file to fsync it later
959  */
960  if (recvFile >= 0)
961  {
962  char xlogfname[MAXFNAMELEN];
963 
964  XLogWalRcvFlush(false);
965 
966  /*
967  * XLOG segment files will be re-read by recovery in startup
968  * process soon, so we don't advise the OS to release cache
969  * pages associated with the file like XLogFileClose() does.
970  */
971  if (close(recvFile) != 0)
972  ereport(PANIC,
974  errmsg("could not close log segment %s: %m",
976 
977  /*
978  * Create .done file forcibly to prevent the streamed segment
979  * from being archived later.
980  */
983  XLogArchiveForceDone(xlogfname);
984  else
985  XLogArchiveNotify(xlogfname);
986  }
987  recvFile = -1;
988 
989  /* Create/use new log file */
991  use_existent = true;
992  recvFile = XLogFileInit(recvSegNo, &use_existent, true);
994  recvOff = 0;
995  }
996 
997  /* Calculate the start offset of the received logs */
998  startoff = XLogSegmentOffset(recptr, wal_segment_size);
999 
1000  if (startoff + nbytes > wal_segment_size)
1001  segbytes = wal_segment_size - startoff;
1002  else
1003  segbytes = nbytes;
1004 
1005  /* Need to seek in the file? */
1006  if (recvOff != startoff)
1007  {
1008  if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0)
1009  ereport(PANIC,
1011  errmsg("could not seek in log segment %s to offset %u: %m",
1013  startoff)));
1014  recvOff = startoff;
1015  }
1016 
1017  /* OK to write the logs */
1018  errno = 0;
1019 
1020  byteswritten = write(recvFile, buf, segbytes);
1021  if (byteswritten <= 0)
1022  {
1023  /* if write didn't set errno, assume no disk space */
1024  if (errno == 0)
1025  errno = ENOSPC;
1026  ereport(PANIC,
1028  errmsg("could not write to log segment %s "
1029  "at offset %u, length %lu: %m",
1031  recvOff, (unsigned long) segbytes)));
1032  }
1033 
1034  /* Update state for write */
1035  recptr += byteswritten;
1036 
1037  recvOff += byteswritten;
1038  nbytes -= byteswritten;
1039  buf += byteswritten;
1040 
1041  LogstreamResult.Write = recptr;
1042  }
1043 }
1044 
1045 /*
1046  * Flush the log to disk.
1047  *
1048  * If we're in the midst of dying, it's unwise to do anything that might throw
1049  * an error, so we skip sending a reply in that case.
1050  */
1051 static void
1052 XLogWalRcvFlush(bool dying)
1053 {
1054  if (LogstreamResult.Flush < LogstreamResult.Write)
1055  {
1056  WalRcvData *walrcv = WalRcv;
1057 
1059 
1060  LogstreamResult.Flush = LogstreamResult.Write;
1061 
1062  /* Update shared-memory status */
1063  SpinLockAcquire(&walrcv->mutex);
1064  if (walrcv->receivedUpto < LogstreamResult.Flush)
1065  {
1066  walrcv->latestChunkStart = walrcv->receivedUpto;
1067  walrcv->receivedUpto = LogstreamResult.Flush;
1068  walrcv->receivedTLI = ThisTimeLineID;
1069  }
1070  SpinLockRelease(&walrcv->mutex);
1071 
1072  /* Signal the startup process and walsender that new WAL has arrived */
1073  WakeupRecovery();
1075  WalSndWakeup();
1076 
1077  /* Report XLOG streaming progress in PS display */
1079  {
1080  char activitymsg[50];
1081 
1082  snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
1083  (uint32) (LogstreamResult.Write >> 32),
1084  (uint32) LogstreamResult.Write);
1085  set_ps_display(activitymsg, false);
1086  }
1087 
1088  /* Also let the master know that we made some progress */
1089  if (!dying)
1090  {
1091  XLogWalRcvSendReply(false, false);
1092  XLogWalRcvSendHSFeedback(false);
1093  }
1094  }
1095 }
1096 
1097 /*
1098  * Send reply message to primary, indicating our current WAL locations, oldest
1099  * xmin and the current time.
1100  *
1101  * If 'force' is not set, the message is only sent if enough time has
1102  * passed since last status update to reach wal_receiver_status_interval.
1103  * If wal_receiver_status_interval is disabled altogether and 'force' is
1104  * false, this is a no-op.
1105  *
1106  * If 'requestReply' is true, requests the server to reply immediately upon
1107  * receiving this message. This is used for heartbearts, when approaching
1108  * wal_receiver_timeout.
1109  */
1110 static void
1111 XLogWalRcvSendReply(bool force, bool requestReply)
1112 {
1113  static XLogRecPtr writePtr = 0;
1114  static XLogRecPtr flushPtr = 0;
1115  XLogRecPtr applyPtr;
1116  static TimestampTz sendTime = 0;
1117  TimestampTz now;
1118 
1119  /*
1120  * If the user doesn't want status to be reported to the master, be sure
1121  * to exit before doing anything at all.
1122  */
1123  if (!force && wal_receiver_status_interval <= 0)
1124  return;
1125 
1126  /* Get current timestamp. */
1127  now = GetCurrentTimestamp();
1128 
1129  /*
1130  * We can compare the write and flush positions to the last message we
1131  * sent without taking any lock, but the apply position requires a spin
1132  * lock, so we don't check that unless something else has changed or 10
1133  * seconds have passed. This means that the apply WAL location will
1134  * appear, from the master's point of view, to lag slightly, but since
1135  * this is only for reporting purposes and only on idle systems, that's
1136  * probably OK.
1137  */
1138  if (!force
1139  && writePtr == LogstreamResult.Write
1140  && flushPtr == LogstreamResult.Flush
1141  && !TimestampDifferenceExceeds(sendTime, now,
1143  return;
1144  sendTime = now;
1145 
1146  /* Construct a new message */
1147  writePtr = LogstreamResult.Write;
1148  flushPtr = LogstreamResult.Flush;
1149  applyPtr = GetXLogReplayRecPtr(NULL);
1150 
1151  resetStringInfo(&reply_message);
1152  pq_sendbyte(&reply_message, 'r');
1153  pq_sendint64(&reply_message, writePtr);
1154  pq_sendint64(&reply_message, flushPtr);
1155  pq_sendint64(&reply_message, applyPtr);
1156  pq_sendint64(&reply_message, GetCurrentTimestamp());
1157  pq_sendbyte(&reply_message, requestReply ? 1 : 0);
1158 
1159  /* Send it */
1160  elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
1161  (uint32) (writePtr >> 32), (uint32) writePtr,
1162  (uint32) (flushPtr >> 32), (uint32) flushPtr,
1163  (uint32) (applyPtr >> 32), (uint32) applyPtr,
1164  requestReply ? " (reply requested)" : "");
1165 
1166  walrcv_send(wrconn, reply_message.data, reply_message.len);
1167 }
1168 
1169 /*
1170  * Send hot standby feedback message to primary, plus the current time,
1171  * in case they don't have a watch.
1172  *
1173  * If the user disables feedback, send one final message to tell sender
1174  * to forget about the xmin on this standby. We also send this message
1175  * on first connect because a previous connection might have set xmin
1176  * on a replication slot. (If we're not using a slot it's harmless to
1177  * send a feedback message explicitly setting InvalidTransactionId).
1178  */
1179 static void
1181 {
1182  TimestampTz now;
1183  TransactionId nextXid;
1184  uint32 xmin_epoch,
1185  catalog_xmin_epoch;
1186  TransactionId xmin,
1187  catalog_xmin;
1188  static TimestampTz sendTime = 0;
1189 
1190  /* initially true so we always send at least one feedback message */
1191  static bool master_has_standby_xmin = true;
1192 
1193  /*
1194  * If the user doesn't want status to be reported to the master, be sure
1195  * to exit before doing anything at all.
1196  */
1198  !master_has_standby_xmin)
1199  return;
1200 
1201  /* Get current timestamp. */
1202  now = GetCurrentTimestamp();
1203 
1204  if (!immed)
1205  {
1206  /*
1207  * Send feedback at most once per wal_receiver_status_interval.
1208  */
1209  if (!TimestampDifferenceExceeds(sendTime, now,
1211  return;
1212  sendTime = now;
1213  }
1214 
1215  /*
1216  * If Hot Standby is not yet accepting connections there is nothing to
1217  * send. Check this after the interval has expired to reduce number of
1218  * calls.
1219  *
1220  * Bailing out here also ensures that we don't send feedback until we've
1221  * read our own replication slot state, so we don't tell the master to
1222  * discard needed xmin or catalog_xmin from any slots that may exist on
1223  * this replica.
1224  */
1225  if (!HotStandbyActive())
1226  return;
1227 
1228  /*
1229  * Make the expensive call to get the oldest xmin once we are certain
1230  * everything else has been checked.
1231  */
1233  {
1234  TransactionId slot_xmin;
1235 
1236  /*
1237  * Usually GetOldestXmin() would include both global replication slot
1238  * xmin and catalog_xmin in its calculations, but we want to derive
1239  * separate values for each of those. So we ask for an xmin that
1240  * excludes the catalog_xmin.
1241  */
1242  xmin = GetOldestXmin(NULL,
1244 
1245  ProcArrayGetReplicationSlotXmin(&slot_xmin, &catalog_xmin);
1246 
1247  if (TransactionIdIsValid(slot_xmin) &&
1248  TransactionIdPrecedes(slot_xmin, xmin))
1249  xmin = slot_xmin;
1250  }
1251  else
1252  {
1253  xmin = InvalidTransactionId;
1254  catalog_xmin = InvalidTransactionId;
1255  }
1256 
1257  /*
1258  * Get epoch and adjust if nextXid and oldestXmin are different sides of
1259  * the epoch boundary.
1260  */
1261  GetNextXidAndEpoch(&nextXid, &xmin_epoch);
1262  catalog_xmin_epoch = xmin_epoch;
1263  if (nextXid < xmin)
1264  xmin_epoch--;
1265  if (nextXid < catalog_xmin)
1266  catalog_xmin_epoch--;
1267 
1268  elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1269  xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
1270 
1271  /* Construct the message and send it. */
1272  resetStringInfo(&reply_message);
1273  pq_sendbyte(&reply_message, 'h');
1274  pq_sendint64(&reply_message, GetCurrentTimestamp());
1275  pq_sendint32(&reply_message, xmin);
1276  pq_sendint32(&reply_message, xmin_epoch);
1277  pq_sendint32(&reply_message, catalog_xmin);
1278  pq_sendint32(&reply_message, catalog_xmin_epoch);
1279  walrcv_send(wrconn, reply_message.data, reply_message.len);
1280  if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
1281  master_has_standby_xmin = true;
1282  else
1283  master_has_standby_xmin = false;
1284 }
1285 
1286 /*
1287  * Update shared memory status upon receiving a message from primary.
1288  *
1289  * 'walEnd' and 'sendTime' are the end-of-WAL and timestamp of the latest
1290  * message, reported by primary.
1291  */
1292 static void
1294 {
1295  WalRcvData *walrcv = WalRcv;
1296 
1297  TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
1298 
1299  /* Update shared-memory status */
1300  SpinLockAcquire(&walrcv->mutex);
1301  if (walrcv->latestWalEnd < walEnd)
1302  walrcv->latestWalEndTime = sendTime;
1303  walrcv->latestWalEnd = walEnd;
1304  walrcv->lastMsgSendTime = sendTime;
1305  walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
1306  SpinLockRelease(&walrcv->mutex);
1307 
1308  if (log_min_messages <= DEBUG2)
1309  {
1310  char *sendtime;
1311  char *receipttime;
1312  int applyDelay;
1313 
1314  /* Copy because timestamptz_to_str returns a static buffer */
1315  sendtime = pstrdup(timestamptz_to_str(sendTime));
1316  receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
1317  applyDelay = GetReplicationApplyDelay();
1318 
1319  /* apply delay is not available */
1320  if (applyDelay == -1)
1321  elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1322  sendtime,
1323  receipttime,
1325  else
1326  elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1327  sendtime,
1328  receipttime,
1329  applyDelay,
1331 
1332  pfree(sendtime);
1333  pfree(receipttime);
1334  }
1335 }
1336 
1337 /*
1338  * Wake up the walreceiver main loop.
1339  *
1340  * This is called by the startup process whenever interesting xlog records
1341  * are applied, so that walreceiver can check if it needs to send an apply
1342  * notification back to the master which may be waiting in a COMMIT with
1343  * synchronous_commit = remote_apply.
1344  */
1345 void
1347 {
1348  Latch *latch;
1349 
1350  WalRcv->force_reply = true;
1351  /* fetching the latch pointer might not be atomic, so use spinlock */
1353  latch = WalRcv->latch;
1355  if (latch)
1356  SetLatch(latch);
1357 }
1358 
1359 /*
1360  * Return a string constant representing the state. This is used
1361  * in system functions and views, and should *not* be translated.
1362  */
1363 static const char *
1365 {
1366  switch (state)
1367  {
1368  case WALRCV_STOPPED:
1369  return "stopped";
1370  case WALRCV_STARTING:
1371  return "starting";
1372  case WALRCV_STREAMING:
1373  return "streaming";
1374  case WALRCV_WAITING:
1375  return "waiting";
1376  case WALRCV_RESTARTING:
1377  return "restarting";
1378  case WALRCV_STOPPING:
1379  return "stopping";
1380  }
1381  return "UNKNOWN";
1382 }
1383 
1384 /*
1385  * Returns activity of WAL receiver, including pid, state and xlog locations
1386  * received from the WAL sender of another server.
1387  */
1388 Datum
1390 {
1391  TupleDesc tupdesc;
1392  Datum *values;
1393  bool *nulls;
1394  int pid;
1395  bool ready_to_display;
1397  XLogRecPtr receive_start_lsn;
1398  TimeLineID receive_start_tli;
1399  XLogRecPtr received_lsn;
1400  TimeLineID received_tli;
1401  TimestampTz last_send_time;
1402  TimestampTz last_receipt_time;
1403  XLogRecPtr latest_end_lsn;
1404  TimestampTz latest_end_time;
1405  char slotname[NAMEDATALEN];
1406  char conninfo[MAXCONNINFO];
1407 
1408  /* Take a lock to ensure value consistency */
1410  pid = (int) WalRcv->pid;
1411  ready_to_display = WalRcv->ready_to_display;
1412  state = WalRcv->walRcvState;
1413  receive_start_lsn = WalRcv->receiveStart;
1414  receive_start_tli = WalRcv->receiveStartTLI;
1415  received_lsn = WalRcv->receivedUpto;
1416  received_tli = WalRcv->receivedTLI;
1417  last_send_time = WalRcv->lastMsgSendTime;
1418  last_receipt_time = WalRcv->lastMsgReceiptTime;
1419  latest_end_lsn = WalRcv->latestWalEnd;
1420  latest_end_time = WalRcv->latestWalEndTime;
1421  strlcpy(slotname, (char *) WalRcv->slotname, sizeof(slotname));
1422  strlcpy(conninfo, (char *) WalRcv->conninfo, sizeof(conninfo));
1424 
1425  /*
1426  * No WAL receiver (or not ready yet), just return a tuple with NULL
1427  * values
1428  */
1429  if (pid == 0 || !ready_to_display)
1430  PG_RETURN_NULL();
1431 
1432  /* determine result type */
1433  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1434  elog(ERROR, "return type must be a row type");
1435 
1436  values = palloc0(sizeof(Datum) * tupdesc->natts);
1437  nulls = palloc0(sizeof(bool) * tupdesc->natts);
1438 
1439  /* Fetch values */
1440  values[0] = Int32GetDatum(pid);
1441 
1443  {
1444  /*
1445  * Only superusers can see details. Other users only get the pid value
1446  * to know whether it is a WAL receiver, but no details.
1447  */
1448  MemSet(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1));
1449  }
1450  else
1451  {
1452  values[1] = CStringGetTextDatum(WalRcvGetStateString(state));
1453 
1454  if (XLogRecPtrIsInvalid(receive_start_lsn))
1455  nulls[2] = true;
1456  else
1457  values[2] = LSNGetDatum(receive_start_lsn);
1458  values[3] = Int32GetDatum(receive_start_tli);
1459  if (XLogRecPtrIsInvalid(received_lsn))
1460  nulls[4] = true;
1461  else
1462  values[4] = LSNGetDatum(received_lsn);
1463  values[5] = Int32GetDatum(received_tli);
1464  if (last_send_time == 0)
1465  nulls[6] = true;
1466  else
1467  values[6] = TimestampTzGetDatum(last_send_time);
1468  if (last_receipt_time == 0)
1469  nulls[7] = true;
1470  else
1471  values[7] = TimestampTzGetDatum(last_receipt_time);
1472  if (XLogRecPtrIsInvalid(latest_end_lsn))
1473  nulls[8] = true;
1474  else
1475  values[8] = LSNGetDatum(latest_end_lsn);
1476  if (latest_end_time == 0)
1477  nulls[9] = true;
1478  else
1479  values[9] = TimestampTzGetDatum(latest_end_time);
1480  if (*slotname == '\0')
1481  nulls[10] = true;
1482  else
1483  values[10] = CStringGetTextDatum(slotname);
1484  if (*conninfo == '\0')
1485  nulls[11] = true;
1486  else
1487  values[11] = CStringGetTextDatum(conninfo);
1488  }
1489 
1490  /* Returns the record as Datum */
1491  PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
1492 }
static struct @25 LogstreamResult
#define SIGUSR1
Definition: win32.h:202
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
Definition: walreceiver.c:725
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define walrcv_get_conninfo(conn)
Definition: walreceiver.h:247
static const char * WalRcvGetStateString(WalRcvState state)
Definition: walreceiver.c:1364
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:255
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
Definition: walreceiver.c:1293
#define DEBUG1
Definition: elog.h:25
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:211
int MyProcPid
Definition: globals.c:39
#define SIGCONT
Definition: win32.h:197
uint32 TimeLineID
Definition: xlogdefs.h:45
bool hot_standby_feedback
Definition: walreceiver.c:76
slock_t mutex
Definition: walreceiver.h:129
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
int wal_segment_size
Definition: xlog.c:113
static volatile bool WalRcvImmediateInterruptOK
Definition: walreceiver.c:130
uint32 TransactionId
Definition: c.h:391
Oid GetUserId(void)
Definition: miscinit.c:284
bool update_process_title
Definition: ps_status.c:35
XLogRecPtr Write
Definition: walreceiver.c:107
int GetReplicationTransferLatency(void)
#define write(a, b, c)
Definition: win32.h:14
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
PGPROC * MyProc
Definition: proc.c:67
int64 TimestampTz
Definition: timestamp.h:39
static void pq_sendint32(StringInfo buf, int32 i)
Definition: pqformat.h:148
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
Definition: walreceiver.c:640
sig_atomic_t force_reply
Definition: walreceiver.h:136
void issue_xlog_fsync(int fd, XLogSegNo segno)
Definition: xlog.c:10172
WalRcvState walRcvState
Definition: walreceiver.h:63
#define SIGWINCH
Definition: win32.h:201
ResourceOwner CurrentResourceOwner
Definition: resowner.c:138
static StringInfoData incoming_message
Definition: walreceiver.c:112
static void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:154
char * pstrdup(const char *in)
Definition: mcxt.c:1076
bool HotStandbyActive(void)
Definition: xlog.c:8010
static void XLogWalRcvSendHSFeedback(bool immed)
Definition: walreceiver.c:1180
int XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock)
Definition: xlog.c:3170
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:257
#define SIGTTIN
Definition: win32.h:199
static void WalRcvSigHupHandler(SIGNAL_ARGS)
Definition: walreceiver.c:805
void set_ps_display(const char *activity, bool force)
Definition: ps_status.c:326
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:253
void proc_exit(int code)
Definition: ipc.c:99
static void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.h:156
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
TimeLineID receivedTLI
Definition: walreceiver.h:83
#define MemSet(start, val, len)
Definition: c.h:863
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
#define WL_SOCKET_READABLE
Definition: latch.h:125
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
Definition: walreceiver.c:876
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:695
static void pq_sendbyte(StringInfo buf, int8 byt)
Definition: pqformat.h:164
void ResetLatch(volatile Latch *latch)
Definition: latch.c:497
#define LOG
Definition: elog.h:26
bool RecoveryInProgress(void)
Definition: xlog.c:7954
int wal_receiver_status_interval
Definition: walreceiver.c:74
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1649
#define PANIC
Definition: elog.h:53
void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, TransactionId *catalog_xmin)
Definition: procarray.c:2986
#define NAPTIME_PER_CYCLE
Definition: walreceiver.c:82
int natts
Definition: tupdesc.h:73
#define SIGQUIT
Definition: win32.h:189
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:97
Datum pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
Definition: walreceiver.c:1389
TimestampTz lastMsgSendTime
Definition: walreceiver.h:96
#define PG_SETMASK(mask)
Definition: pqsignal.h:19
int wal_receiver_timeout
Definition: walreceiver.c:75
Latch procLatch
Definition: proc.h:104
static WalReceiverConn * wrconn
Definition: walreceiver.c:79
WalRcvState
Definition: walreceiver.h:43
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)
#define NAMEDATALEN
#define SpinLockAcquire(lock)
Definition: spin.h:62
static StringInfoData reply_message
Definition: walreceiver.c:111
void XLogArchiveNotify(const char *xlog)
Definition: xlogarchive.c:514
void WalReceiverMain(void)
Definition: walreceiver.c:188
void GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch)
Definition: xlog.c:8375
int WaitLatch(volatile Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:336
void pfree(void *pointer)
Definition: mcxt.c:949
#define SIG_IGN
Definition: win32.h:185
#define ERROR
Definition: elog.h:43
static void WalRcvSigUsr1Handler(SIGNAL_ARGS)
Definition: walreceiver.c:813
void on_exit_reset(void)
Definition: ipc.c:396
#define PROCARRAY_SLOTS_XMIN
Definition: procarray.h:37
void WakeupRecovery(void)
Definition: xlog.c:12187
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
#define FATAL
Definition: elog.h:52
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11177
pid_t pid
Definition: walreceiver.h:62
#define MAXCONNINFO
Definition: walreceiver.h:35
XLogRecPtr latestChunkStart
Definition: walreceiver.h:91
#define DEBUG2
Definition: elog.h:24
Definition: latch.h:110
#define TLHistoryFileName(fname, tli)
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:348
#define walrcv_identify_system(conn, primary_tli, server_version)
Definition: walreceiver.h:249
char * XLogFileNameP(TimeLineID tli, XLogSegNo segno)
Definition: xlog.c:10215
static char * buf
Definition: pg_test_fsync.c:67
void writeTimeLineHistoryFile(TimeLineID tli, char *content, int size)
Definition: timeline.c:447
uint64 XLogSegNo
Definition: xlogdefs.h:34
int XLogArchiveMode
Definition: xlog.c:94
bool PostmasterIsAlive(void)
Definition: pmsignal.c:272
int errdetail(const char *fmt,...)
Definition: elog.c:873
int errcode_for_file_access(void)
Definition: elog.c:598
static void DisableWalRcvImmediateExit(void)
Definition: walreceiver.c:180
#define InvalidTransactionId
Definition: transam.h:31
#define AllowCascadeReplication()
Definition: walreceiver.h:38
XLogRecPtr startpoint
Definition: walreceiver.h:146
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
unsigned int uint32
Definition: c.h:258
int pgsocket
Definition: port.h:22
sigset_t UnBlockSig
Definition: pqsignal.c:22
bool existsTimeLineHistory(TimeLineID probeTLI)
Definition: timeline.c:207
XLogRecPtr receivedUpto
Definition: walreceiver.h:82
static volatile sig_atomic_t got_SIGHUP
Definition: walreceiver.c:98
#define ereport(elevel, rest)
Definition: elog.h:122
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
XLogRecPtr Flush
Definition: walreceiver.c:108
TimeLineID receiveStartTLI
Definition: walreceiver.h:73
static TimeLineID recvFileTLI
Definition: walreceiver.c:90
Definition: guc.h:72
Latch * latch
Definition: walreceiver.h:127
int GetReplicationApplyDelay(void)
static char ** options
void initStringInfo(StringInfo str)
Definition: stringinfo.c:46
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
static XLogSegNo recvSegNo
Definition: walreceiver.c:91
#define MAXFNAMELEN
union WalRcvStreamOptions::@97 proto
#define SpinLockRelease(lock)
Definition: spin.h:64
sigset_t BlockSig
Definition: pqsignal.c:22
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
void * palloc0(Size size)
Definition: mcxt.c:877
static void EnableWalRcvImmediateExit(void)
Definition: walreceiver.c:173
void XLogArchiveForceDone(const char *xlog)
Definition: xlogarchive.c:564
uintptr_t Datum
Definition: postgres.h:372
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:313
WalReceiverFunctionsType * WalReceiverFunctions
Definition: walreceiver.c:80
#define PGINVALID_SOCKET
Definition: port.h:24
#define PROCARRAY_FLAGS_DEFAULT
Definition: procarray.h:50
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1111
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
bool ready_to_display
Definition: walreceiver.h:118
TimestampTz latestWalEndTime
Definition: walreceiver.h:103
#define DEFAULT_ROLE_READ_ALL_STATS
Definition: pg_authid.h:108
int log_min_messages
Definition: guc.c:451
static void WalRcvShutdownHandler(SIGNAL_ARGS)
Definition: walreceiver.c:824
#define SIGPIPE
Definition: win32.h:193
#define SIGHUP
Definition: win32.h:188
TimeLineID ThisTimeLineID
Definition: xlog.c:181
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
#define SIG_DFL
Definition: win32.h:183
TransactionId GetOldestXmin(Relation rel, int flags)
Definition: procarray.c:1315
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:168
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
bool is_member_of_role(Oid member, Oid role)
Definition: acl.c:4854
XLogRecPtr latestWalEnd
Definition: walreceiver.h:102
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
int errmsg_internal(const char *fmt,...)
Definition: elog.c:827
#define pg_memory_barrier()
Definition: atomics.h:148
void SetLatch(volatile Latch *latch)
Definition: latch.c:414
#define SIGNAL_ARGS
Definition: c.h:1085
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:681
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:356
Definition: regguts.h:298
bool proc_exit_inprogress
Definition: ipc.c:40
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:137
static int server_version
Definition: pg_dumpall.c:82
size_t Size
Definition: c.h:350
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes)
#define walrcv_disconnect(conn)
Definition: walreceiver.h:265
#define HeapTupleGetDatum(tuple)
Definition: funcapi.h:222
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:259
static void WalRcvQuickDieHandler(SIGNAL_ARGS)
Definition: walreceiver.c:847
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
Definition: walreceiver.h:251
static void XLogWalRcvFlush(bool dying)
Definition: walreceiver.c:1052
WalRcvData * WalRcv
#define SIGTTOU
Definition: win32.h:200
static Datum values[MAXATTR]
Definition: bootstrap.c:164
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4728
#define Int32GetDatum(X)
Definition: postgres.h:485
int errmsg(const char *fmt,...)
Definition: elog.c:797
static void WalRcvDie(int code, Datum arg)
Definition: walreceiver.c:774
struct WalRcvStreamOptions::@97::@98 physical
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
#define CStringGetTextDatum(s)
Definition: builtins.h:91
void * arg
static uint32 recvOff
Definition: walreceiver.c:92
#define PG_FUNCTION_ARGS
Definition: fmgr.h:158
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:98
#define elog
Definition: elog.h:219
#define close(a)
Definition: win32.h:12
XLogRecPtr receiveStart
Definition: walreceiver.h:72
void latch_sigusr1_handler(void)
Definition: latch.c:1473
#define TransactionIdIsValid(xid)
Definition: transam.h:41
static volatile sig_atomic_t got_SIGTERM
Definition: walreceiver.c:99
static int recvFile
Definition: walreceiver.c:89
#define SIGCHLD
Definition: win32.h:198
#define WL_LATCH_SET
Definition: latch.h:124
#define SIGALRM
Definition: win32.h:194
#define UINT64_FORMAT
Definition: c.h:301
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
Definition: stringinfo.c:208
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
#define PG_RETURN_NULL()
Definition: fmgr.h:305
char slotname[NAMEDATALEN]
Definition: walreceiver.h:115
#define SIGUSR2
Definition: win32.h:203
void WalSndWakeup(void)
Definition: walsender.c:3004
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:109
ResourceOwner ResourceOwnerCreate(ResourceOwner parent, const char *name)
Definition: resowner.c:416
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
Definition: walreceiver.c:943
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1710
void WalRcvForceReply(void)
Definition: walreceiver.c:1346
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:243