PostgreSQL Source Code  git master
receivelog.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * receivelog.c - receive WAL files using the streaming
4  * replication protocol.
5  *
6  * Author: Magnus Hagander <magnus@hagander.net>
7  *
8  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
9  *
10  * IDENTIFICATION
11  * src/bin/pg_basebackup/receivelog.c
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres_fe.h"
16 
17 #include <sys/stat.h>
18 #include <unistd.h>
19 #ifdef HAVE_SYS_SELECT_H
20 #include <sys/select.h>
21 #endif
22 
23 #include "access/xlog_internal.h"
24 #include "common/file_utils.h"
25 #include "common/logging.h"
26 #include "libpq-fe.h"
27 #include "receivelog.h"
28 #include "streamutil.h"
29 
30 /* fd and filename for currently open WAL file */
31 static Walfile *walfile = NULL;
32 static char current_walfile_name[MAXPGPATH] = "";
33 static bool reportFlushPosition = false;
35 
36 static bool still_sending = true; /* feedback still needs to be sent? */
37 
39  XLogRecPtr *stoppos);
40 static int CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket);
41 static int CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
42  char **buffer);
43 static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf,
44  int len, XLogRecPtr blockpos, TimestampTz *last_status);
45 static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
46  XLogRecPtr *blockpos);
48  XLogRecPtr blockpos, XLogRecPtr *stoppos);
49 static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos);
51  TimestampTz last_status);
52 
54  uint32 *timeline);
55 
56 static bool
57 mark_file_as_archived(StreamCtl *stream, const char *fname)
58 {
59  Walfile *f;
60  static char tmppath[MAXPGPATH];
61 
62  snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done",
63  fname);
64 
65  f = stream->walmethod->open_for_write(tmppath, NULL, 0);
66  if (f == NULL)
67  {
68  pg_log_error("could not create archive status file \"%s\": %s",
69  tmppath, stream->walmethod->getlasterror());
70  return false;
71  }
72 
73  stream->walmethod->close(f, CLOSE_NORMAL);
74 
75  return true;
76 }
77 
78 /*
79  * Open a new WAL file in the specified directory.
80  *
81  * Returns true if OK; on failure, returns false after printing an error msg.
82  * On success, 'walfile' is set to the FD for the file, and the base filename
83  * (without partial_suffix) is stored in 'current_walfile_name'.
84  *
85  * The file will be padded to WalSegSz bytes with zeroes.
86  */
87 static bool
88 open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
89 {
90  Walfile *f;
91  char fn[MAXPGPATH];
92  ssize_t size;
93  XLogSegNo segno;
94 
95  XLByteToSeg(startpoint, segno, WalSegSz);
97 
98  snprintf(fn, sizeof(fn), "%s%s", current_walfile_name,
99  stream->partial_suffix ? stream->partial_suffix : "");
100 
101  /*
102  * When streaming to files, if an existing file exists we verify that it's
103  * either empty (just created), or a complete WalSegSz segment (in which
104  * case it has been created and padded). Anything else indicates a corrupt
105  * file.
106  *
107  * When streaming to tar, no file with this name will exist before, so we
108  * never have to verify a size.
109  */
110  if (stream->walmethod->existsfile(fn))
111  {
112  size = stream->walmethod->get_file_size(fn);
113  if (size < 0)
114  {
115  pg_log_error("could not get size of write-ahead log file \"%s\": %s",
116  fn, stream->walmethod->getlasterror());
117  return false;
118  }
119  if (size == WalSegSz)
120  {
121  /* Already padded file. Open it for use */
123  if (f == NULL)
124  {
125  pg_log_error("could not open existing write-ahead log file \"%s\": %s",
126  fn, stream->walmethod->getlasterror());
127  return false;
128  }
129 
130  /* fsync file in case of a previous crash */
131  if (stream->walmethod->sync(f) != 0)
132  {
133  pg_log_fatal("could not fsync existing write-ahead log file \"%s\": %s",
134  fn, stream->walmethod->getlasterror());
135  stream->walmethod->close(f, CLOSE_UNLINK);
136  exit(1);
137  }
138 
139  walfile = f;
140  return true;
141  }
142  if (size != 0)
143  {
144  /* if write didn't set errno, assume problem is no disk space */
145  if (errno == 0)
146  errno = ENOSPC;
147  pg_log_error(ngettext("write-ahead log file \"%s\" has %d byte, should be 0 or %d",
148  "write-ahead log file \"%s\" has %d bytes, should be 0 or %d",
149  size),
150  fn, (int) size, WalSegSz);
151  return false;
152  }
153  /* File existed and was empty, so fall through and open */
154  }
155 
156  /* No file existed, so create one */
157 
159  stream->partial_suffix, WalSegSz);
160  if (f == NULL)
161  {
162  pg_log_error("could not open write-ahead log file \"%s\": %s",
163  fn, stream->walmethod->getlasterror());
164  return false;
165  }
166 
167  walfile = f;
168  return true;
169 }
170 
171 /*
172  * Close the current WAL file (if open), and rename it to the correct
173  * filename if it's complete. On failure, prints an error message to stderr
174  * and returns false, otherwise returns true.
175  */
176 static bool
178 {
179  off_t currpos;
180  int r;
181 
182  if (walfile == NULL)
183  return true;
184 
185  currpos = stream->walmethod->get_current_pos(walfile);
186  if (currpos == -1)
187  {
188  pg_log_error("could not determine seek position in file \"%s\": %s",
190  stream->walmethod->close(walfile, CLOSE_UNLINK);
191  walfile = NULL;
192 
193  return false;
194  }
195 
196  if (stream->partial_suffix)
197  {
198  if (currpos == WalSegSz)
199  r = stream->walmethod->close(walfile, CLOSE_NORMAL);
200  else
201  {
202  pg_log_info("not renaming \"%s%s\", segment is not complete",
204  r = stream->walmethod->close(walfile, CLOSE_NO_RENAME);
205  }
206  }
207  else
208  r = stream->walmethod->close(walfile, CLOSE_NORMAL);
209 
210  walfile = NULL;
211 
212  if (r != 0)
213  {
214  pg_log_error("could not close file \"%s\": %s",
216  return false;
217  }
218 
219  /*
220  * Mark file as archived if requested by the caller - pg_basebackup needs
221  * to do so as files can otherwise get archived again after promotion of a
222  * new node. This is in line with walreceiver.c always doing a
223  * XLogArchiveForceDone() after a complete segment.
224  */
225  if (currpos == WalSegSz && stream->mark_done)
226  {
227  /* writes error message if failed */
229  return false;
230  }
231 
232  lastFlushPosition = pos;
233  return true;
234 }
235 
236 
237 /*
238  * Check if a timeline history file exists.
239  */
240 static bool
242 {
243  char histfname[MAXFNAMELEN];
244 
245  /*
246  * Timeline 1 never has a history file. We treat that as if it existed,
247  * since we never need to stream it.
248  */
249  if (stream->timeline == 1)
250  return true;
251 
252  TLHistoryFileName(histfname, stream->timeline);
253 
254  return stream->walmethod->existsfile(histfname);
255 }
256 
257 static bool
258 writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
259 {
260  int size = strlen(content);
261  char histfname[MAXFNAMELEN];
262  Walfile *f;
263 
264  /*
265  * Check that the server's idea of how timeline history files should be
266  * named matches ours.
267  */
268  TLHistoryFileName(histfname, stream->timeline);
269  if (strcmp(histfname, filename) != 0)
270  {
271  pg_log_error("server reported unexpected history file name for timeline %u: %s",
272  stream->timeline, filename);
273  return false;
274  }
275 
276  f = stream->walmethod->open_for_write(histfname, ".tmp", 0);
277  if (f == NULL)
278  {
279  pg_log_error("could not create timeline history file \"%s\": %s",
280  histfname, stream->walmethod->getlasterror());
281  return false;
282  }
283 
284  if ((int) stream->walmethod->write(f, content, size) != size)
285  {
286  pg_log_error("could not write timeline history file \"%s\": %s",
287  histfname, stream->walmethod->getlasterror());
288 
289  /*
290  * If we fail to make the file, delete it to release disk space
291  */
292  stream->walmethod->close(f, CLOSE_UNLINK);
293 
294  return false;
295  }
296 
297  if (stream->walmethod->close(f, CLOSE_NORMAL) != 0)
298  {
299  pg_log_error("could not close file \"%s\": %s",
300  histfname, stream->walmethod->getlasterror());
301  return false;
302  }
303 
304  /* Maintain archive_status, check close_walfile() for details. */
305  if (stream->mark_done)
306  {
307  /* writes error message if failed */
308  if (!mark_file_as_archived(stream, histfname))
309  return false;
310  }
311 
312  return true;
313 }
314 
315 /*
316  * Send a Standby Status Update message to server.
317  */
318 static bool
319 sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
320 {
321  char replybuf[1 + 8 + 8 + 8 + 8 + 1];
322  int len = 0;
323 
324  replybuf[len] = 'r';
325  len += 1;
326  fe_sendint64(blockpos, &replybuf[len]); /* write */
327  len += 8;
329  fe_sendint64(lastFlushPosition, &replybuf[len]); /* flush */
330  else
331  fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */
332  len += 8;
333  fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
334  len += 8;
335  fe_sendint64(now, &replybuf[len]); /* sendTime */
336  len += 8;
337  replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
338  len += 1;
339 
340  if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
341  {
342  pg_log_error("could not send feedback packet: %s",
343  PQerrorMessage(conn));
344  return false;
345  }
346 
347  return true;
348 }
349 
350 /*
351  * Check that the server version we're connected to is supported by
352  * ReceiveXlogStream().
353  *
354  * If it's not, an error message is printed to stderr, and false is returned.
355  */
356 bool
358 {
359  int minServerMajor,
360  maxServerMajor;
361  int serverMajor;
362 
363  /*
364  * The message format used in streaming replication changed in 9.3, so we
365  * cannot stream from older servers. And we don't support servers newer
366  * than the client; it might work, but we don't know, so err on the safe
367  * side.
368  */
369  minServerMajor = 903;
370  maxServerMajor = PG_VERSION_NUM / 100;
371  serverMajor = PQserverVersion(conn) / 100;
372  if (serverMajor < minServerMajor)
373  {
374  const char *serverver = PQparameterStatus(conn, "server_version");
375 
376  pg_log_error("incompatible server version %s; client does not support streaming from server versions older than %s",
377  serverver ? serverver : "'unknown'",
378  "9.3");
379  return false;
380  }
381  else if (serverMajor > maxServerMajor)
382  {
383  const char *serverver = PQparameterStatus(conn, "server_version");
384 
385  pg_log_error("incompatible server version %s; client does not support streaming from server versions newer than %s",
386  serverver ? serverver : "'unknown'",
387  PG_VERSION);
388  return false;
389  }
390  return true;
391 }
392 
393 /*
394  * Receive a log stream starting at the specified position.
395  *
396  * Individual parameters are passed through the StreamCtl structure.
397  *
398  * If sysidentifier is specified, validate that both the system
399  * identifier and the timeline match the specified ones
400  * (by sending an extra IDENTIFY_SYSTEM command)
401  *
402  * All received segments will be written to the directory
403  * specified by basedir. This will also fetch any missing timeline history
404  * files.
405  *
406  * The stream_stop callback will be called every time data
407  * is received, and whenever a segment is completed. If it returns
408  * true, the streaming will stop and the function
409  * returns. As long as it returns false, streaming will continue
410  * indefinitely.
411  *
412  * If stream_stop() checks for external input, stop_socket should be set to
413  * the FD it checks. This will allow such input to be detected promptly
414  * rather than after standby_message_timeout (which might be indefinite).
415  * Note that signals will interrupt waits for input as well, but that is
416  * race-y since a signal received while busy won't interrupt the wait.
417  *
418  * standby_message_timeout controls how often we send a message
419  * back to the primary letting it know our progress, in milliseconds.
420  * Zero means no messages are sent.
421  * This message will only contain the write location, and never
422  * flush or replay.
423  *
424  * If 'partial_suffix' is not NULL, files are initially created with the
425  * given suffix, and the suffix is removed once the file is finished. That
426  * allows you to tell the difference between partial and completed files,
427  * so that you can continue later where you left off.
428  *
429  * If 'synchronous' is true, the received WAL is flushed as soon as written,
430  * otherwise only when the WAL file is closed.
431  *
432  * Note: The WAL location *must* be at a log segment start!
433  */
434 bool
436 {
437  char query[128];
438  char slotcmd[128];
439  PGresult *res;
440  XLogRecPtr stoppos;
441 
442  /*
443  * The caller should've checked the server version already, but doesn't do
444  * any harm to check it here too.
445  */
447  return false;
448 
449  /*
450  * Decide whether we want to report the flush position. If we report the
451  * flush position, the primary will know what WAL we'll possibly
452  * re-request, and it can then remove older WAL safely. We must always do
453  * that when we are using slots.
454  *
455  * Reporting the flush position makes one eligible as a synchronous
456  * replica. People shouldn't include generic names in
457  * synchronous_standby_names, but we've protected them against it so far,
458  * so let's continue to do so unless specifically requested.
459  */
460  if (stream->replication_slot != NULL)
461  {
462  reportFlushPosition = true;
463  sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);
464  }
465  else
466  {
467  if (stream->synchronous)
468  reportFlushPosition = true;
469  else
470  reportFlushPosition = false;
471  slotcmd[0] = 0;
472  }
473 
474  if (stream->sysidentifier != NULL)
475  {
476  /* Validate system identifier hasn't changed */
477  res = PQexec(conn, "IDENTIFY_SYSTEM");
478  if (PQresultStatus(res) != PGRES_TUPLES_OK)
479  {
480  pg_log_error("could not send replication command \"%s\": %s",
481  "IDENTIFY_SYSTEM", PQerrorMessage(conn));
482  PQclear(res);
483  return false;
484  }
485  if (PQntuples(res) != 1 || PQnfields(res) < 3)
486  {
487  pg_log_error("could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields",
488  PQntuples(res), PQnfields(res), 1, 3);
489  PQclear(res);
490  return false;
491  }
492  if (strcmp(stream->sysidentifier, PQgetvalue(res, 0, 0)) != 0)
493  {
494  pg_log_error("system identifier does not match between base backup and streaming connection");
495  PQclear(res);
496  return false;
497  }
498  if (stream->timeline > atoi(PQgetvalue(res, 0, 1)))
499  {
500  pg_log_error("starting timeline %u is not present in the server",
501  stream->timeline);
502  PQclear(res);
503  return false;
504  }
505  PQclear(res);
506  }
507 
508  /*
509  * initialize flush position to starting point, it's the caller's
510  * responsibility that that's sane.
511  */
512  lastFlushPosition = stream->startpos;
513 
514  while (1)
515  {
516  /*
517  * Fetch the timeline history file for this timeline, if we don't have
518  * it already. When streaming log to tar, this will always return
519  * false, as we are never streaming into an existing file and
520  * therefore there can be no pre-existing timeline history file.
521  */
522  if (!existsTimeLineHistoryFile(stream))
523  {
524  snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
525  res = PQexec(conn, query);
526  if (PQresultStatus(res) != PGRES_TUPLES_OK)
527  {
528  /* FIXME: we might send it ok, but get an error */
529  pg_log_error("could not send replication command \"%s\": %s",
530  "TIMELINE_HISTORY", PQresultErrorMessage(res));
531  PQclear(res);
532  return false;
533  }
534 
535  /*
536  * The response to TIMELINE_HISTORY is a single row result set
537  * with two fields: filename and content
538  */
539  if (PQnfields(res) != 2 || PQntuples(res) != 1)
540  {
541  pg_log_warning("unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields",
542  PQntuples(res), PQnfields(res), 1, 2);
543  }
544 
545  /* Write the history file to disk */
547  PQgetvalue(res, 0, 0),
548  PQgetvalue(res, 0, 1));
549 
550  PQclear(res);
551  }
552 
553  /*
554  * Before we start streaming from the requested location, check if the
555  * callback tells us to stop here.
556  */
557  if (stream->stream_stop(stream->startpos, stream->timeline, false))
558  return true;
559 
560  /* Initiate the replication stream at specified location */
561  snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
562  slotcmd,
563  (uint32) (stream->startpos >> 32), (uint32) stream->startpos,
564  stream->timeline);
565  res = PQexec(conn, query);
566  if (PQresultStatus(res) != PGRES_COPY_BOTH)
567  {
568  pg_log_error("could not send replication command \"%s\": %s",
569  "START_REPLICATION", PQresultErrorMessage(res));
570  PQclear(res);
571  return false;
572  }
573  PQclear(res);
574 
575  /* Stream the WAL */
576  res = HandleCopyStream(conn, stream, &stoppos);
577  if (res == NULL)
578  goto error;
579 
580  /*
581  * Streaming finished.
582  *
583  * There are two possible reasons for that: a controlled shutdown, or
584  * we reached the end of the current timeline. In case of
585  * end-of-timeline, the server sends a result set after Copy has
586  * finished, containing information about the next timeline. Read
587  * that, and restart streaming from the next timeline. In case of
588  * controlled shutdown, stop here.
589  */
590  if (PQresultStatus(res) == PGRES_TUPLES_OK)
591  {
592  /*
593  * End-of-timeline. Read the next timeline's ID and starting
594  * position. Usually, the starting position will match the end of
595  * the previous timeline, but there are corner cases like if the
596  * server had sent us half of a WAL record, when it was promoted.
597  * The new timeline will begin at the end of the last complete
598  * record in that case, overlapping the partial WAL record on the
599  * old timeline.
600  */
601  uint32 newtimeline;
602  bool parsed;
603 
604  parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
605  PQclear(res);
606  if (!parsed)
607  goto error;
608 
609  /* Sanity check the values the server gave us */
610  if (newtimeline <= stream->timeline)
611  {
612  pg_log_error("server reported unexpected next timeline %u, following timeline %u",
613  newtimeline, stream->timeline);
614  goto error;
615  }
616  if (stream->startpos > stoppos)
617  {
618  pg_log_error("server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X",
619  stream->timeline, (uint32) (stoppos >> 32), (uint32) stoppos,
620  newtimeline, (uint32) (stream->startpos >> 32), (uint32) stream->startpos);
621  goto error;
622  }
623 
624  /* Read the final result, which should be CommandComplete. */
625  res = PQgetResult(conn);
626  if (PQresultStatus(res) != PGRES_COMMAND_OK)
627  {
628  pg_log_error("unexpected termination of replication stream: %s",
629  PQresultErrorMessage(res));
630  PQclear(res);
631  goto error;
632  }
633  PQclear(res);
634 
635  /*
636  * Loop back to start streaming from the new timeline. Always
637  * start streaming at the beginning of a segment.
638  */
639  stream->timeline = newtimeline;
640  stream->startpos = stream->startpos -
642  continue;
643  }
644  else if (PQresultStatus(res) == PGRES_COMMAND_OK)
645  {
646  PQclear(res);
647 
648  /*
649  * End of replication (ie. controlled shut down of the server).
650  *
651  * Check if the callback thinks it's OK to stop here. If not,
652  * complain.
653  */
654  if (stream->stream_stop(stoppos, stream->timeline, false))
655  return true;
656  else
657  {
658  pg_log_error("replication stream was terminated before stop point");
659  goto error;
660  }
661  }
662  else
663  {
664  /* Server returned an error. */
665  pg_log_error("unexpected termination of replication stream: %s",
666  PQresultErrorMessage(res));
667  PQclear(res);
668  goto error;
669  }
670  }
671 
672 error:
673  if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NO_RENAME) != 0)
674  pg_log_error("could not close file \"%s\": %s",
676  walfile = NULL;
677  return false;
678 }
679 
680 /*
681  * Helper function to parse the result set returned by server after streaming
682  * has finished. On failure, prints an error to stderr and returns false.
683  */
684 static bool
686 {
687  uint32 startpos_xlogid,
688  startpos_xrecoff;
689 
690  /*----------
691  * The result set consists of one row and two columns, e.g:
692  *
693  * next_tli | next_tli_startpos
694  * ----------+-------------------
695  * 4 | 0/9949AE0
696  *
697  * next_tli is the timeline ID of the next timeline after the one that
698  * just finished streaming. next_tli_startpos is the WAL location where
699  * the server switched to it.
700  *----------
701  */
702  if (PQnfields(res) < 2 || PQntuples(res) != 1)
703  {
704  pg_log_error("unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields",
705  PQntuples(res), PQnfields(res), 1, 2);
706  return false;
707  }
708 
709  *timeline = atoi(PQgetvalue(res, 0, 0));
710  if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &startpos_xlogid,
711  &startpos_xrecoff) != 2)
712  {
713  pg_log_error("could not parse next timeline's starting point \"%s\"",
714  PQgetvalue(res, 0, 1));
715  return false;
716  }
717  *startpos = ((uint64) startpos_xlogid << 32) | startpos_xrecoff;
718 
719  return true;
720 }
721 
722 /*
723  * The main loop of ReceiveXlogStream. Handles the COPY stream after
724  * initiating streaming with the START_REPLICATION command.
725  *
726  * If the COPY ends (not necessarily successfully) due to a message from the
727  * server, returns a PGresult and sets *stoppos to the last byte written.
728  * On any other sort of error, returns NULL.
729  */
730 static PGresult *
732  XLogRecPtr *stoppos)
733 {
734  char *copybuf = NULL;
735  TimestampTz last_status = -1;
736  XLogRecPtr blockpos = stream->startpos;
737 
738  still_sending = true;
739 
740  while (1)
741  {
742  int r;
744  long sleeptime;
745 
746  /*
747  * Check if we should continue streaming, or abort at this point.
748  */
749  if (!CheckCopyStreamStop(conn, stream, blockpos))
750  goto error;
751 
752  now = feGetCurrentTimestamp();
753 
754  /*
755  * If synchronous option is true, issue sync command as soon as there
756  * are WAL data which has not been flushed yet.
757  */
758  if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)
759  {
760  if (stream->walmethod->sync(walfile) != 0)
761  {
762  pg_log_fatal("could not fsync file \"%s\": %s",
764  exit(1);
765  }
766  lastFlushPosition = blockpos;
767 
768  /*
769  * Send feedback so that the server sees the latest WAL locations
770  * immediately.
771  */
772  if (!sendFeedback(conn, blockpos, now, false))
773  goto error;
774  last_status = now;
775  }
776 
777  /*
778  * Potentially send a status message to the primary
779  */
780  if (still_sending && stream->standby_message_timeout > 0 &&
781  feTimestampDifferenceExceeds(last_status, now,
782  stream->standby_message_timeout))
783  {
784  /* Time to send feedback! */
785  if (!sendFeedback(conn, blockpos, now, false))
786  goto error;
787  last_status = now;
788  }
789 
790  /*
791  * Calculate how long send/receive loops should sleep
792  */
793  sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
794  last_status);
795 
796  r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, &copybuf);
797  while (r != 0)
798  {
799  if (r == -1)
800  goto error;
801  if (r == -2)
802  {
803  PGresult *res = HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos);
804 
805  if (res == NULL)
806  goto error;
807  else
808  return res;
809  }
810 
811  /* Check the message type. */
812  if (copybuf[0] == 'k')
813  {
814  if (!ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos,
815  &last_status))
816  goto error;
817  }
818  else if (copybuf[0] == 'w')
819  {
820  if (!ProcessXLogDataMsg(conn, stream, copybuf, r, &blockpos))
821  goto error;
822 
823  /*
824  * Check if we should continue streaming, or abort at this
825  * point.
826  */
827  if (!CheckCopyStreamStop(conn, stream, blockpos))
828  goto error;
829  }
830  else
831  {
832  pg_log_error("unrecognized streaming header: \"%c\"",
833  copybuf[0]);
834  goto error;
835  }
836 
837  /*
838  * Process the received data, and any subsequent data we can read
839  * without blocking.
840  */
841  r = CopyStreamReceive(conn, 0, stream->stop_socket, &copybuf);
842  }
843  }
844 
845 error:
846  if (copybuf != NULL)
847  PQfreemem(copybuf);
848  return NULL;
849 }
850 
851 /*
852  * Wait until we can read a CopyData message,
853  * or timeout, or occurrence of a signal or input on the stop_socket.
854  * (timeout_ms < 0 means wait indefinitely; 0 means don't wait.)
855  *
856  * Returns 1 if data has become available for reading, 0 if timed out
857  * or interrupted by signal or stop_socket input, and -1 on an error.
858  */
859 static int
860 CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
861 {
862  int ret;
863  fd_set input_mask;
864  int connsocket;
865  int maxfd;
866  struct timeval timeout;
867  struct timeval *timeoutptr;
868 
869  connsocket = PQsocket(conn);
870  if (connsocket < 0)
871  {
872  pg_log_error("invalid socket: %s", PQerrorMessage(conn));
873  return -1;
874  }
875 
876  FD_ZERO(&input_mask);
877  FD_SET(connsocket, &input_mask);
878  maxfd = connsocket;
879  if (stop_socket != PGINVALID_SOCKET)
880  {
881  FD_SET(stop_socket, &input_mask);
882  maxfd = Max(maxfd, stop_socket);
883  }
884 
885  if (timeout_ms < 0)
886  timeoutptr = NULL;
887  else
888  {
889  timeout.tv_sec = timeout_ms / 1000L;
890  timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
891  timeoutptr = &timeout;
892  }
893 
894  ret = select(maxfd + 1, &input_mask, NULL, NULL, timeoutptr);
895 
896  if (ret < 0)
897  {
898  if (errno == EINTR)
899  return 0; /* Got a signal, so not an error */
900  pg_log_error("select() failed: %m");
901  return -1;
902  }
903  if (ret > 0 && FD_ISSET(connsocket, &input_mask))
904  return 1; /* Got input on connection socket */
905 
906  return 0; /* Got timeout or input on stop_socket */
907 }
908 
909 /*
910  * Receive CopyData message available from XLOG stream, blocking for
911  * maximum of 'timeout' ms.
912  *
913  * If data was received, returns the length of the data. *buffer is set to
914  * point to a buffer holding the received message. The buffer is only valid
915  * until the next CopyStreamReceive call.
916  *
917  * Returns 0 if no data was available within timeout, or if wait was
918  * interrupted by signal or stop_socket input.
919  * -1 on error. -2 if the server ended the COPY.
920  */
921 static int
922 CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
923  char **buffer)
924 {
925  char *copybuf = NULL;
926  int rawlen;
927 
928  if (*buffer != NULL)
929  PQfreemem(*buffer);
930  *buffer = NULL;
931 
932  /* Try to receive a CopyData message */
933  rawlen = PQgetCopyData(conn, &copybuf, 1);
934  if (rawlen == 0)
935  {
936  int ret;
937 
938  /*
939  * No data available. Wait for some to appear, but not longer than
940  * the specified timeout, so that we can ping the server. Also stop
941  * waiting if input appears on stop_socket.
942  */
943  ret = CopyStreamPoll(conn, timeout, stop_socket);
944  if (ret <= 0)
945  return ret;
946 
947  /* Now there is actually data on the socket */
948  if (PQconsumeInput(conn) == 0)
949  {
950  pg_log_error("could not receive data from WAL stream: %s",
951  PQerrorMessage(conn));
952  return -1;
953  }
954 
955  /* Now that we've consumed some input, try again */
956  rawlen = PQgetCopyData(conn, &copybuf, 1);
957  if (rawlen == 0)
958  return 0;
959  }
960  if (rawlen == -1) /* end-of-streaming or error */
961  return -2;
962  if (rawlen == -2)
963  {
964  pg_log_error("could not read COPY data: %s", PQerrorMessage(conn));
965  return -1;
966  }
967 
968  /* Return received messages to caller */
969  *buffer = copybuf;
970  return rawlen;
971 }
972 
973 /*
974  * Process the keepalive message.
975  */
976 static bool
977 ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
978  XLogRecPtr blockpos, TimestampTz *last_status)
979 {
980  int pos;
981  bool replyRequested;
983 
984  /*
985  * Parse the keepalive message, enclosed in the CopyData message. We just
986  * check if the server requested a reply, and ignore the rest.
987  */
988  pos = 1; /* skip msgtype 'k' */
989  pos += 8; /* skip walEnd */
990  pos += 8; /* skip sendTime */
991 
992  if (len < pos + 1)
993  {
994  pg_log_error("streaming header too small: %d", len);
995  return false;
996  }
997  replyRequested = copybuf[pos];
998 
999  /* If the server requested an immediate reply, send one. */
1000  if (replyRequested && still_sending)
1001  {
1002  if (reportFlushPosition && lastFlushPosition < blockpos &&
1003  walfile != NULL)
1004  {
1005  /*
1006  * If a valid flush location needs to be reported, flush the
1007  * current WAL file so that the latest flush location is sent back
1008  * to the server. This is necessary to see whether the last WAL
1009  * data has been successfully replicated or not, at the normal
1010  * shutdown of the server.
1011  */
1012  if (stream->walmethod->sync(walfile) != 0)
1013  {
1014  pg_log_fatal("could not fsync file \"%s\": %s",
1016  exit(1);
1017  }
1018  lastFlushPosition = blockpos;
1019  }
1020 
1021  now = feGetCurrentTimestamp();
1022  if (!sendFeedback(conn, blockpos, now, false))
1023  return false;
1024  *last_status = now;
1025  }
1026 
1027  return true;
1028 }
1029 
1030 /*
1031  * Process XLogData message.
1032  */
1033 static bool
1034 ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
1035  XLogRecPtr *blockpos)
1036 {
1037  int xlogoff;
1038  int bytes_left;
1039  int bytes_written;
1040  int hdr_len;
1041 
1042  /*
1043  * Once we've decided we don't want to receive any more, just ignore any
1044  * subsequent XLogData messages.
1045  */
1046  if (!(still_sending))
1047  return true;
1048 
1049  /*
1050  * Read the header of the XLogData message, enclosed in the CopyData
1051  * message. We only need the WAL location field (dataStart), the rest of
1052  * the header is ignored.
1053  */
1054  hdr_len = 1; /* msgtype 'w' */
1055  hdr_len += 8; /* dataStart */
1056  hdr_len += 8; /* walEnd */
1057  hdr_len += 8; /* sendTime */
1058  if (len < hdr_len)
1059  {
1060  pg_log_error("streaming header too small: %d", len);
1061  return false;
1062  }
1063  *blockpos = fe_recvint64(&copybuf[1]);
1064 
1065  /* Extract WAL location for this block */
1066  xlogoff = XLogSegmentOffset(*blockpos, WalSegSz);
1067 
1068  /*
1069  * Verify that the initial location in the stream matches where we think
1070  * we are.
1071  */
1072  if (walfile == NULL)
1073  {
1074  /* No file open yet */
1075  if (xlogoff != 0)
1076  {
1077  pg_log_error("received write-ahead log record for offset %u with no file open",
1078  xlogoff);
1079  return false;
1080  }
1081  }
1082  else
1083  {
1084  /* More data in existing segment */
1085  if (stream->walmethod->get_current_pos(walfile) != xlogoff)
1086  {
1087  pg_log_error("got WAL data offset %08x, expected %08x",
1088  xlogoff, (int) stream->walmethod->get_current_pos(walfile));
1089  return false;
1090  }
1091  }
1092 
1093  bytes_left = len - hdr_len;
1094  bytes_written = 0;
1095 
1096  while (bytes_left)
1097  {
1098  int bytes_to_write;
1099 
1100  /*
1101  * If crossing a WAL boundary, only write up until we reach wal
1102  * segment size.
1103  */
1104  if (xlogoff + bytes_left > WalSegSz)
1105  bytes_to_write = WalSegSz - xlogoff;
1106  else
1107  bytes_to_write = bytes_left;
1108 
1109  if (walfile == NULL)
1110  {
1111  if (!open_walfile(stream, *blockpos))
1112  {
1113  /* Error logged by open_walfile */
1114  return false;
1115  }
1116  }
1117 
1118  if (stream->walmethod->write(walfile, copybuf + hdr_len + bytes_written,
1119  bytes_to_write) != bytes_to_write)
1120  {
1121  pg_log_error("could not write %u bytes to WAL file \"%s\": %s",
1122  bytes_to_write, current_walfile_name,
1123  stream->walmethod->getlasterror());
1124  return false;
1125  }
1126 
1127  /* Write was successful, advance our position */
1128  bytes_written += bytes_to_write;
1129  bytes_left -= bytes_to_write;
1130  *blockpos += bytes_to_write;
1131  xlogoff += bytes_to_write;
1132 
1133  /* Did we reach the end of a WAL segment? */
1134  if (XLogSegmentOffset(*blockpos, WalSegSz) == 0)
1135  {
1136  if (!close_walfile(stream, *blockpos))
1137  /* Error message written in close_walfile() */
1138  return false;
1139 
1140  xlogoff = 0;
1141 
1142  if (still_sending && stream->stream_stop(*blockpos, stream->timeline, true))
1143  {
1144  if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1145  {
1146  pg_log_error("could not send copy-end packet: %s",
1147  PQerrorMessage(conn));
1148  return false;
1149  }
1150  still_sending = false;
1151  return true; /* ignore the rest of this XLogData packet */
1152  }
1153  }
1154  }
1155  /* No more data left to write, receive next copy packet */
1156 
1157  return true;
1158 }
1159 
1160 /*
1161  * Handle end of the copy stream.
1162  */
1163 static PGresult *
1165  XLogRecPtr blockpos, XLogRecPtr *stoppos)
1166 {
1167  PGresult *res = PQgetResult(conn);
1168 
1169  /*
1170  * The server closed its end of the copy stream. If we haven't closed
1171  * ours already, we need to do so now, unless the server threw an error,
1172  * in which case we don't.
1173  */
1174  if (still_sending)
1175  {
1176  if (!close_walfile(stream, blockpos))
1177  {
1178  /* Error message written in close_walfile() */
1179  PQclear(res);
1180  return NULL;
1181  }
1182  if (PQresultStatus(res) == PGRES_COPY_IN)
1183  {
1184  if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1185  {
1186  pg_log_error("could not send copy-end packet: %s",
1187  PQerrorMessage(conn));
1188  PQclear(res);
1189  return NULL;
1190  }
1191  res = PQgetResult(conn);
1192  }
1193  still_sending = false;
1194  }
1195  if (copybuf != NULL)
1196  PQfreemem(copybuf);
1197  *stoppos = blockpos;
1198  return res;
1199 }
1200 
1201 /*
1202  * Check if we should continue streaming, or abort at this point.
1203  */
1204 static bool
1206 {
1207  if (still_sending && stream->stream_stop(blockpos, stream->timeline, false))
1208  {
1209  if (!close_walfile(stream, blockpos))
1210  {
1211  /* Potential error message is written by close_walfile */
1212  return false;
1213  }
1214  if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1215  {
1216  pg_log_error("could not send copy-end packet: %s",
1217  PQerrorMessage(conn));
1218  return false;
1219  }
1220  still_sending = false;
1221  }
1222 
1223  return true;
1224 }
1225 
1226 /*
1227  * Calculate how long send/receive loops should sleep
1228  */
1229 static long
1231  TimestampTz last_status)
1232 {
1233  TimestampTz status_targettime = 0;
1234  long sleeptime;
1235 
1236  if (standby_message_timeout && still_sending)
1237  status_targettime = last_status +
1238  (standby_message_timeout - 1) * ((int64) 1000);
1239 
1240  if (status_targettime > 0)
1241  {
1242  long secs;
1243  int usecs;
1244 
1246  status_targettime,
1247  &secs,
1248  &usecs);
1249  /* Always sleep at least 1 sec */
1250  if (secs <= 0)
1251  {
1252  secs = 1;
1253  usecs = 0;
1254  }
1255 
1256  sleeptime = secs * 1000 + usecs / 1000;
1257  }
1258  else
1259  sleeptime = -1;
1260 
1261  return sleeptime;
1262 }
int PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
Definition: fe-exec.c:2317
int PQnfields(const PGresult *res)
Definition: fe-exec.c:2777
static bool open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
Definition: receivelog.c:88
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6669
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static int CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
Definition: receivelog.c:860
TimestampTz feGetCurrentTimestamp(void)
Definition: streamutil.c:608
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3163
void * Walfile
Definition: walmethods.h:13
int(* close)(Walfile f, WalCloseMethod method)
Definition: walmethods.h:47
static void error(void)
Definition: sql-dyntest.c:147
const char * PQparameterStatus(const PGconn *conn, const char *paramName)
Definition: fe-connect.c:6634
static bool still_sending
Definition: receivelog.c:36
int64 TimestampTz
Definition: timestamp.h:39
#define pg_log_error(...)
Definition: logging.h:80
char * sysidentifier
Definition: receivelog.h:33
int PQputCopyEnd(PGconn *conn, const char *errormsg)
Definition: fe-exec.c:2384
XLogRecPtr startpos
Definition: receivelog.h:31
static long CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout, TimestampTz last_status)
Definition: receivelog.c:1230
int sleeptime
Definition: pg_standby.c:41
char * partial_suffix
Definition: receivelog.h:47
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:6659
int PQntuples(const PGresult *res)
Definition: fe-exec.c:2769
bool feTimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: streamutil.c:649
int(* sync)(Walfile f)
Definition: walmethods.h:67
TimeLineID timeline
Definition: receivelog.h:32
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2692
int PQgetCopyData(PGconn *conn, char **buffer, int async)
Definition: fe-exec.c:2473
static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr blockpos, TimestampTz *last_status)
Definition: receivelog.c:977
const char *(* getlasterror)(void)
Definition: walmethods.h:78
off_t(* get_current_pos)(Walfile f)
Definition: walmethods.h:62
#define sprintf
Definition: port.h:217
static int CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket, char **buffer)
Definition: receivelog.c:922
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
Definition: receivelog.c:685
PGconn * conn
Definition: streamutil.c:54
#define MAXPGPATH
static Walfile * walfile
Definition: receivelog.c:31
int PQflush(PGconn *conn)
Definition: fe-exec.c:3283
char * replication_slot
Definition: receivelog.h:48
bool mark_done
Definition: receivelog.h:37
void feTimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: streamutil.c:627
#define TLHistoryFileName(fname, tli)
static bool reportFlushPosition
Definition: receivelog.c:33
uint64 XLogSegNo
Definition: xlogdefs.h:41
ssize_t(* write)(Walfile f, const void *buf, size_t count)
Definition: walmethods.h:59
#define select(n, r, w, e, timeout)
Definition: win32_port.h:464
unsigned int uint32
Definition: c.h:429
int pgsocket
Definition: port.h:31
static bool close_walfile(StreamCtl *stream, XLogRecPtr pos)
Definition: receivelog.c:177
stream_stop_callback stream_stop
Definition: receivelog.h:41
WalWriteMethod * walmethod
Definition: receivelog.h:46
#define MAXFNAMELEN
static int standby_message_timeout
#define ngettext(s, p, n)
Definition: c.h:1178
static bool sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
Definition: receivelog.c:319
static XLogRecPtr lastFlushPosition
Definition: receivelog.c:34
static PGresult * HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf, XLogRecPtr blockpos, XLogRecPtr *stoppos)
Definition: receivelog.c:1164
#define PGINVALID_SOCKET
Definition: port.h:33
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1704
static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr *blockpos)
Definition: receivelog.c:1034
StringInfo copybuf
Definition: tablesync.c:113
void PQclear(PGresult *res)
Definition: fe-exec.c:694
static void * fn(void *arg)
#define Max(x, y)
Definition: c.h:976
static PGresult * HandleCopyStream(PGconn *conn, StreamCtl *stream, XLogRecPtr *stoppos)
Definition: receivelog.c:731
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool existsTimeLineHistoryFile(StreamCtl *stream)
Definition: receivelog.c:241
bool ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
Definition: receivelog.c:435
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes)
static XLogRecPtr startpos
bool synchronous
Definition: receivelog.h:36
pgsocket stop_socket
Definition: receivelog.h:43
int WalSegSz
Definition: pg_standby.c:38
static char * filename
Definition: pg_dumpall.c:91
bool(* existsfile)(const char *pathname)
Definition: walmethods.h:50
char * PQresultErrorMessage(const PGresult *res)
Definition: fe-exec.c:2708
Walfile(* open_for_write)(const char *pathname, const char *temp_suffix, size_t pad_to_size)
Definition: walmethods.h:41
int64 fe_recvint64(char *buf)
Definition: streamutil.c:673
static bool mark_file_as_archived(StreamCtl *stream, const char *fname)
Definition: receivelog.c:57
static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos)
Definition: receivelog.c:1205
int standby_message_timeout
Definition: receivelog.h:35
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:1939
ssize_t(* get_file_size)(const char *pathname)
Definition: walmethods.h:53
void fe_sendint64(int64 i, char *buf)
Definition: streamutil.c:662
#define EINTR
Definition: win32_port.h:343
#define pg_log_warning(...)
Definition: pgfnames.c:24
static char current_walfile_name[MAXPGPATH]
Definition: receivelog.c:32
#define snprintf
Definition: port.h:215
void PQfreemem(void *ptr)
Definition: fe-exec.c:3296
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:6687
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1542
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1778
#define pg_log_info(...)
Definition: logging.h:88
static bool writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
Definition: receivelog.c:258
bool CheckServerVersionForStreaming(PGconn *conn)
Definition: receivelog.c:357
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
#define pg_log_fatal(...)
Definition: logging.h:76