PostgreSQL Source Code  git master
receivelog.c
Go to the documentation of this file.
/*-------------------------------------------------------------------------
 *
 * receivelog.c - receive WAL files using the streaming
 * replication protocol.
 *
 * Author: Magnus Hagander <magnus@hagander.net>
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *		  src/bin/pg_basebackup/receivelog.c
 *-------------------------------------------------------------------------
 */
14 
15 #include "postgres_fe.h"
16 
17 #include <sys/stat.h>
18 #include <unistd.h>
19 #ifdef HAVE_SYS_SELECT_H
20 #include <sys/select.h>
21 #endif
22 
23 #include "access/xlog_internal.h"
24 #include "common/file_utils.h"
25 #include "common/logging.h"
26 #include "libpq-fe.h"
27 #include "receivelog.h"
28 #include "streamutil.h"
29 
30 /* fd and filename for currently open WAL file */
31 static Walfile *walfile = NULL;
32 static char current_walfile_name[MAXPGPATH] = "";
33 static bool reportFlushPosition = false;
35 
36 static bool still_sending = true; /* feedback still needs to be sent? */
37 
39  XLogRecPtr *stoppos);
40 static int CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket);
41 static int CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
42  char **buffer);
43 static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf,
44  int len, XLogRecPtr blockpos, TimestampTz *last_status);
45 static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
46  XLogRecPtr *blockpos);
48  XLogRecPtr blockpos, XLogRecPtr *stoppos);
49 static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos,
50  XLogRecPtr *stoppos);
52  TimestampTz last_status);
53 
55  uint32 *timeline);
56 
57 static bool
58 mark_file_as_archived(StreamCtl *stream, const char *fname)
59 {
60  Walfile *f;
61  static char tmppath[MAXPGPATH];
62 
63  snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done",
64  fname);
65 
66  f = stream->walmethod->open_for_write(tmppath, NULL, 0);
67  if (f == NULL)
68  {
69  pg_log_error("could not create archive status file \"%s\": %s",
70  tmppath, stream->walmethod->getlasterror());
71  return false;
72  }
73 
74  stream->walmethod->close(f, CLOSE_NORMAL);
75 
76  return true;
77 }
78 
79 /*
80  * Open a new WAL file in the specified directory.
81  *
82  * Returns true if OK; on failure, returns false after printing an error msg.
83  * On success, 'walfile' is set to the FD for the file, and the base filename
84  * (without partial_suffix) is stored in 'current_walfile_name'.
85  *
86  * The file will be padded to 16Mb with zeroes.
87  */
88 static bool
89 open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
90 {
91  Walfile *f;
92  char fn[MAXPGPATH];
93  ssize_t size;
94  XLogSegNo segno;
95 
96  XLByteToSeg(startpoint, segno, WalSegSz);
98 
99  snprintf(fn, sizeof(fn), "%s%s", current_walfile_name,
100  stream->partial_suffix ? stream->partial_suffix : "");
101 
102  /*
103  * When streaming to files, if an existing file exists we verify that it's
104  * either empty (just created), or a complete WalSegSz segment (in which
105  * case it has been created and padded). Anything else indicates a corrupt
106  * file.
107  *
108  * When streaming to tar, no file with this name will exist before, so we
109  * never have to verify a size.
110  */
111  if (stream->walmethod->existsfile(fn))
112  {
113  size = stream->walmethod->get_file_size(fn);
114  if (size < 0)
115  {
116  pg_log_error("could not get size of write-ahead log file \"%s\": %s",
117  fn, stream->walmethod->getlasterror());
118  return false;
119  }
120  if (size == WalSegSz)
121  {
122  /* Already padded file. Open it for use */
124  if (f == NULL)
125  {
126  pg_log_error("could not open existing write-ahead log file \"%s\": %s",
127  fn, stream->walmethod->getlasterror());
128  return false;
129  }
130 
131  /* fsync file in case of a previous crash */
132  if (stream->walmethod->sync(f) != 0)
133  {
134  pg_log_fatal("could not fsync existing write-ahead log file \"%s\": %s",
135  fn, stream->walmethod->getlasterror());
136  stream->walmethod->close(f, CLOSE_UNLINK);
137  exit(1);
138  }
139 
140  walfile = f;
141  return true;
142  }
143  if (size != 0)
144  {
145  /* if write didn't set errno, assume problem is no disk space */
146  if (errno == 0)
147  errno = ENOSPC;
148  pg_log_error(ngettext("write-ahead log file \"%s\" has %d byte, should be 0 or %d",
149  "write-ahead log file \"%s\" has %d bytes, should be 0 or %d",
150  size),
151  fn, (int) size, WalSegSz);
152  return false;
153  }
154  /* File existed and was empty, so fall through and open */
155  }
156 
157  /* No file existed, so create one */
158 
160  stream->partial_suffix, WalSegSz);
161  if (f == NULL)
162  {
163  pg_log_error("could not open write-ahead log file \"%s\": %s",
164  fn, stream->walmethod->getlasterror());
165  return false;
166  }
167 
168  walfile = f;
169  return true;
170 }
171 
172 /*
173  * Close the current WAL file (if open), and rename it to the correct
174  * filename if it's complete. On failure, prints an error message to stderr
175  * and returns false, otherwise returns true.
176  */
177 static bool
179 {
180  off_t currpos;
181  int r;
182 
183  if (walfile == NULL)
184  return true;
185 
186  currpos = stream->walmethod->get_current_pos(walfile);
187  if (currpos == -1)
188  {
189  pg_log_error("could not determine seek position in file \"%s\": %s",
191  stream->walmethod->close(walfile, CLOSE_UNLINK);
192  walfile = NULL;
193 
194  return false;
195  }
196 
197  if (stream->partial_suffix)
198  {
199  if (currpos == WalSegSz)
200  r = stream->walmethod->close(walfile, CLOSE_NORMAL);
201  else
202  {
203  pg_log_info("not renaming \"%s%s\", segment is not complete",
205  r = stream->walmethod->close(walfile, CLOSE_NO_RENAME);
206  }
207  }
208  else
209  r = stream->walmethod->close(walfile, CLOSE_NORMAL);
210 
211  walfile = NULL;
212 
213  if (r != 0)
214  {
215  pg_log_error("could not close file \"%s\": %s",
217  return false;
218  }
219 
220  /*
221  * Mark file as archived if requested by the caller - pg_basebackup needs
222  * to do so as files can otherwise get archived again after promotion of a
223  * new node. This is in line with walreceiver.c always doing a
224  * XLogArchiveForceDone() after a complete segment.
225  */
226  if (currpos == WalSegSz && stream->mark_done)
227  {
228  /* writes error message if failed */
230  return false;
231  }
232 
233  lastFlushPosition = pos;
234  return true;
235 }
236 
237 
238 /*
239  * Check if a timeline history file exists.
240  */
241 static bool
243 {
244  char histfname[MAXFNAMELEN];
245 
246  /*
247  * Timeline 1 never has a history file. We treat that as if it existed,
248  * since we never need to stream it.
249  */
250  if (stream->timeline == 1)
251  return true;
252 
253  TLHistoryFileName(histfname, stream->timeline);
254 
255  return stream->walmethod->existsfile(histfname);
256 }
257 
258 static bool
259 writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
260 {
261  int size = strlen(content);
262  char histfname[MAXFNAMELEN];
263  Walfile *f;
264 
265  /*
266  * Check that the server's idea of how timeline history files should be
267  * named matches ours.
268  */
269  TLHistoryFileName(histfname, stream->timeline);
270  if (strcmp(histfname, filename) != 0)
271  {
272  pg_log_error("server reported unexpected history file name for timeline %u: %s",
273  stream->timeline, filename);
274  return false;
275  }
276 
277  f = stream->walmethod->open_for_write(histfname, ".tmp", 0);
278  if (f == NULL)
279  {
280  pg_log_error("could not create timeline history file \"%s\": %s",
281  histfname, stream->walmethod->getlasterror());
282  return false;
283  }
284 
285  if ((int) stream->walmethod->write(f, content, size) != size)
286  {
287  pg_log_error("could not write timeline history file \"%s\": %s",
288  histfname, stream->walmethod->getlasterror());
289 
290  /*
291  * If we fail to make the file, delete it to release disk space
292  */
293  stream->walmethod->close(f, CLOSE_UNLINK);
294 
295  return false;
296  }
297 
298  if (stream->walmethod->close(f, CLOSE_NORMAL) != 0)
299  {
300  pg_log_error("could not close file \"%s\": %s",
301  histfname, stream->walmethod->getlasterror());
302  return false;
303  }
304 
305  /* Maintain archive_status, check close_walfile() for details. */
306  if (stream->mark_done)
307  {
308  /* writes error message if failed */
309  if (!mark_file_as_archived(stream, histfname))
310  return false;
311  }
312 
313  return true;
314 }
315 
316 /*
317  * Send a Standby Status Update message to server.
318  */
319 static bool
320 sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
321 {
322  char replybuf[1 + 8 + 8 + 8 + 8 + 1];
323  int len = 0;
324 
325  replybuf[len] = 'r';
326  len += 1;
327  fe_sendint64(blockpos, &replybuf[len]); /* write */
328  len += 8;
330  fe_sendint64(lastFlushPosition, &replybuf[len]); /* flush */
331  else
332  fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */
333  len += 8;
334  fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
335  len += 8;
336  fe_sendint64(now, &replybuf[len]); /* sendTime */
337  len += 8;
338  replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
339  len += 1;
340 
341  if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
342  {
343  pg_log_error("could not send feedback packet: %s",
344  PQerrorMessage(conn));
345  return false;
346  }
347 
348  return true;
349 }
350 
351 /*
352  * Check that the server version we're connected to is supported by
353  * ReceiveXlogStream().
354  *
355  * If it's not, an error message is printed to stderr, and false is returned.
356  */
357 bool
359 {
360  int minServerMajor,
361  maxServerMajor;
362  int serverMajor;
363 
364  /*
365  * The message format used in streaming replication changed in 9.3, so we
366  * cannot stream from older servers. And we don't support servers newer
367  * than the client; it might work, but we don't know, so err on the safe
368  * side.
369  */
370  minServerMajor = 903;
371  maxServerMajor = PG_VERSION_NUM / 100;
372  serverMajor = PQserverVersion(conn) / 100;
373  if (serverMajor < minServerMajor)
374  {
375  const char *serverver = PQparameterStatus(conn, "server_version");
376 
377  pg_log_error("incompatible server version %s; client does not support streaming from server versions older than %s",
378  serverver ? serverver : "'unknown'",
379  "9.3");
380  return false;
381  }
382  else if (serverMajor > maxServerMajor)
383  {
384  const char *serverver = PQparameterStatus(conn, "server_version");
385 
386  pg_log_error("incompatible server version %s; client does not support streaming from server versions newer than %s",
387  serverver ? serverver : "'unknown'",
388  PG_VERSION);
389  return false;
390  }
391  return true;
392 }
393 
394 /*
395  * Receive a log stream starting at the specified position.
396  *
397  * Individual parameters are passed through the StreamCtl structure.
398  *
399  * If sysidentifier is specified, validate that both the system
400  * identifier and the timeline matches the specified ones
401  * (by sending an extra IDENTIFY_SYSTEM command)
402  *
403  * All received segments will be written to the directory
404  * specified by basedir. This will also fetch any missing timeline history
405  * files.
406  *
407  * The stream_stop callback will be called every time data
408  * is received, and whenever a segment is completed. If it returns
409  * true, the streaming will stop and the function
410  * return. As long as it returns false, streaming will continue
411  * indefinitely.
412  *
413  * If stream_stop() checks for external input, stop_socket should be set to
414  * the FD it checks. This will allow such input to be detected promptly
415  * rather than after standby_message_timeout (which might be indefinite).
416  * Note that signals will interrupt waits for input as well, but that is
417  * race-y since a signal received while busy won't interrupt the wait.
418  *
419  * standby_message_timeout controls how often we send a message
420  * back to the master letting it know our progress, in milliseconds.
421  * Zero means no messages are sent.
422  * This message will only contain the write location, and never
423  * flush or replay.
424  *
425  * If 'partial_suffix' is not NULL, files are initially created with the
426  * given suffix, and the suffix is removed once the file is finished. That
427  * allows you to tell the difference between partial and completed files,
428  * so that you can continue later where you left.
429  *
430  * If 'synchronous' is true, the received WAL is flushed as soon as written,
431  * otherwise only when the WAL file is closed.
432  *
433  * Note: The WAL location *must* be at a log segment start!
434  */
435 bool
437 {
438  char query[128];
439  char slotcmd[128];
440  PGresult *res;
441  XLogRecPtr stoppos;
442 
443  /*
444  * The caller should've checked the server version already, but doesn't do
445  * any harm to check it here too.
446  */
448  return false;
449 
450  /*
451  * Decide whether we want to report the flush position. If we report the
452  * flush position, the primary will know what WAL we'll possibly
453  * re-request, and it can then remove older WAL safely. We must always do
454  * that when we are using slots.
455  *
456  * Reporting the flush position makes one eligible as a synchronous
457  * replica. People shouldn't include generic names in
458  * synchronous_standby_names, but we've protected them against it so far,
459  * so let's continue to do so unless specifically requested.
460  */
461  if (stream->replication_slot != NULL)
462  {
463  reportFlushPosition = true;
464  sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);
465  }
466  else
467  {
468  if (stream->synchronous)
469  reportFlushPosition = true;
470  else
471  reportFlushPosition = false;
472  slotcmd[0] = 0;
473  }
474 
475  if (stream->sysidentifier != NULL)
476  {
477  /* Validate system identifier hasn't changed */
478  res = PQexec(conn, "IDENTIFY_SYSTEM");
479  if (PQresultStatus(res) != PGRES_TUPLES_OK)
480  {
481  pg_log_error("could not send replication command \"%s\": %s",
482  "IDENTIFY_SYSTEM", PQerrorMessage(conn));
483  PQclear(res);
484  return false;
485  }
486  if (PQntuples(res) != 1 || PQnfields(res) < 3)
487  {
488  pg_log_error("could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields",
489  PQntuples(res), PQnfields(res), 1, 3);
490  PQclear(res);
491  return false;
492  }
493  if (strcmp(stream->sysidentifier, PQgetvalue(res, 0, 0)) != 0)
494  {
495  pg_log_error("system identifier does not match between base backup and streaming connection");
496  PQclear(res);
497  return false;
498  }
499  if (stream->timeline > atoi(PQgetvalue(res, 0, 1)))
500  {
501  pg_log_error("starting timeline %u is not present in the server",
502  stream->timeline);
503  PQclear(res);
504  return false;
505  }
506  PQclear(res);
507  }
508 
509  /*
510  * initialize flush position to starting point, it's the caller's
511  * responsibility that that's sane.
512  */
513  lastFlushPosition = stream->startpos;
514 
515  while (1)
516  {
517  /*
518  * Fetch the timeline history file for this timeline, if we don't have
519  * it already. When streaming log to tar, this will always return
520  * false, as we are never streaming into an existing file and
521  * therefore there can be no pre-existing timeline history file.
522  */
523  if (!existsTimeLineHistoryFile(stream))
524  {
525  snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
526  res = PQexec(conn, query);
527  if (PQresultStatus(res) != PGRES_TUPLES_OK)
528  {
529  /* FIXME: we might send it ok, but get an error */
530  pg_log_error("could not send replication command \"%s\": %s",
531  "TIMELINE_HISTORY", PQresultErrorMessage(res));
532  PQclear(res);
533  return false;
534  }
535 
536  /*
537  * The response to TIMELINE_HISTORY is a single row result set
538  * with two fields: filename and content
539  */
540  if (PQnfields(res) != 2 || PQntuples(res) != 1)
541  {
542  pg_log_warning("unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields",
543  PQntuples(res), PQnfields(res), 1, 2);
544  }
545 
546  /* Write the history file to disk */
548  PQgetvalue(res, 0, 0),
549  PQgetvalue(res, 0, 1));
550 
551  PQclear(res);
552  }
553 
554  /*
555  * Before we start streaming from the requested location, check if the
556  * callback tells us to stop here.
557  */
558  if (stream->stream_stop(stream->startpos, stream->timeline, false))
559  return true;
560 
561  /* Initiate the replication stream at specified location */
562  snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
563  slotcmd,
564  (uint32) (stream->startpos >> 32), (uint32) stream->startpos,
565  stream->timeline);
566  res = PQexec(conn, query);
567  if (PQresultStatus(res) != PGRES_COPY_BOTH)
568  {
569  pg_log_error("could not send replication command \"%s\": %s",
570  "START_REPLICATION", PQresultErrorMessage(res));
571  PQclear(res);
572  return false;
573  }
574  PQclear(res);
575 
576  /* Stream the WAL */
577  res = HandleCopyStream(conn, stream, &stoppos);
578  if (res == NULL)
579  goto error;
580 
581  /*
582  * Streaming finished.
583  *
584  * There are two possible reasons for that: a controlled shutdown, or
585  * we reached the end of the current timeline. In case of
586  * end-of-timeline, the server sends a result set after Copy has
587  * finished, containing information about the next timeline. Read
588  * that, and restart streaming from the next timeline. In case of
589  * controlled shutdown, stop here.
590  */
591  if (PQresultStatus(res) == PGRES_TUPLES_OK)
592  {
593  /*
594  * End-of-timeline. Read the next timeline's ID and starting
595  * position. Usually, the starting position will match the end of
596  * the previous timeline, but there are corner cases like if the
597  * server had sent us half of a WAL record, when it was promoted.
598  * The new timeline will begin at the end of the last complete
599  * record in that case, overlapping the partial WAL record on the
600  * old timeline.
601  */
602  uint32 newtimeline;
603  bool parsed;
604 
605  parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
606  PQclear(res);
607  if (!parsed)
608  goto error;
609 
610  /* Sanity check the values the server gave us */
611  if (newtimeline <= stream->timeline)
612  {
613  pg_log_error("server reported unexpected next timeline %u, following timeline %u",
614  newtimeline, stream->timeline);
615  goto error;
616  }
617  if (stream->startpos > stoppos)
618  {
619  pg_log_error("server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X",
620  stream->timeline, (uint32) (stoppos >> 32), (uint32) stoppos,
621  newtimeline, (uint32) (stream->startpos >> 32), (uint32) stream->startpos);
622  goto error;
623  }
624 
625  /* Read the final result, which should be CommandComplete. */
626  res = PQgetResult(conn);
627  if (PQresultStatus(res) != PGRES_COMMAND_OK)
628  {
629  pg_log_error("unexpected termination of replication stream: %s",
630  PQresultErrorMessage(res));
631  PQclear(res);
632  goto error;
633  }
634  PQclear(res);
635 
636  /*
637  * Loop back to start streaming from the new timeline. Always
638  * start streaming at the beginning of a segment.
639  */
640  stream->timeline = newtimeline;
641  stream->startpos = stream->startpos -
643  continue;
644  }
645  else if (PQresultStatus(res) == PGRES_COMMAND_OK)
646  {
647  PQclear(res);
648 
649  /*
650  * End of replication (ie. controlled shut down of the server).
651  *
652  * Check if the callback thinks it's OK to stop here. If not,
653  * complain.
654  */
655  if (stream->stream_stop(stoppos, stream->timeline, false))
656  return true;
657  else
658  {
659  pg_log_error("replication stream was terminated before stop point");
660  goto error;
661  }
662  }
663  else
664  {
665  /* Server returned an error. */
666  pg_log_error("unexpected termination of replication stream: %s",
667  PQresultErrorMessage(res));
668  PQclear(res);
669  goto error;
670  }
671  }
672 
673 error:
674  if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NO_RENAME) != 0)
675  pg_log_error("could not close file \"%s\": %s",
677  walfile = NULL;
678  return false;
679 }
680 
681 /*
682  * Helper function to parse the result set returned by server after streaming
683  * has finished. On failure, prints an error to stderr and returns false.
684  */
685 static bool
687 {
688  uint32 startpos_xlogid,
689  startpos_xrecoff;
690 
691  /*----------
692  * The result set consists of one row and two columns, e.g:
693  *
694  * next_tli | next_tli_startpos
695  * ----------+-------------------
696  * 4 | 0/9949AE0
697  *
698  * next_tli is the timeline ID of the next timeline after the one that
699  * just finished streaming. next_tli_startpos is the WAL location where
700  * the server switched to it.
701  *----------
702  */
703  if (PQnfields(res) < 2 || PQntuples(res) != 1)
704  {
705  pg_log_error("unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields",
706  PQntuples(res), PQnfields(res), 1, 2);
707  return false;
708  }
709 
710  *timeline = atoi(PQgetvalue(res, 0, 0));
711  if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &startpos_xlogid,
712  &startpos_xrecoff) != 2)
713  {
714  pg_log_error("could not parse next timeline's starting point \"%s\"",
715  PQgetvalue(res, 0, 1));
716  return false;
717  }
718  *startpos = ((uint64) startpos_xlogid << 32) | startpos_xrecoff;
719 
720  return true;
721 }
722 
723 /*
724  * The main loop of ReceiveXlogStream. Handles the COPY stream after
725  * initiating streaming with the START_REPLICATION command.
726  *
727  * If the COPY ends (not necessarily successfully) due a message from the
728  * server, returns a PGresult and sets *stoppos to the last byte written.
729  * On any other sort of error, returns NULL.
730  */
731 static PGresult *
733  XLogRecPtr *stoppos)
734 {
735  char *copybuf = NULL;
736  TimestampTz last_status = -1;
737  XLogRecPtr blockpos = stream->startpos;
738 
739  still_sending = true;
740 
741  while (1)
742  {
743  int r;
745  long sleeptime;
746 
747  /*
748  * Check if we should continue streaming, or abort at this point.
749  */
750  if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos))
751  goto error;
752 
753  now = feGetCurrentTimestamp();
754 
755  /*
756  * If synchronous option is true, issue sync command as soon as there
757  * are WAL data which has not been flushed yet.
758  */
759  if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)
760  {
761  if (stream->walmethod->sync(walfile) != 0)
762  {
763  pg_log_fatal("could not fsync file \"%s\": %s",
765  exit(1);
766  }
767  lastFlushPosition = blockpos;
768 
769  /*
770  * Send feedback so that the server sees the latest WAL locations
771  * immediately.
772  */
773  if (!sendFeedback(conn, blockpos, now, false))
774  goto error;
775  last_status = now;
776  }
777 
778  /*
779  * Potentially send a status message to the master
780  */
781  if (still_sending && stream->standby_message_timeout > 0 &&
782  feTimestampDifferenceExceeds(last_status, now,
783  stream->standby_message_timeout))
784  {
785  /* Time to send feedback! */
786  if (!sendFeedback(conn, blockpos, now, false))
787  goto error;
788  last_status = now;
789  }
790 
791  /*
792  * Calculate how long send/receive loops should sleep
793  */
794  sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
795  last_status);
796 
797  r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, &copybuf);
798  while (r != 0)
799  {
800  if (r == -1)
801  goto error;
802  if (r == -2)
803  {
804  PGresult *res = HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos);
805 
806  if (res == NULL)
807  goto error;
808  else
809  return res;
810  }
811 
812  /* Check the message type. */
813  if (copybuf[0] == 'k')
814  {
815  if (!ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos,
816  &last_status))
817  goto error;
818  }
819  else if (copybuf[0] == 'w')
820  {
821  if (!ProcessXLogDataMsg(conn, stream, copybuf, r, &blockpos))
822  goto error;
823 
824  /*
825  * Check if we should continue streaming, or abort at this
826  * point.
827  */
828  if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos))
829  goto error;
830  }
831  else
832  {
833  pg_log_error("unrecognized streaming header: \"%c\"",
834  copybuf[0]);
835  goto error;
836  }
837 
838  /*
839  * Process the received data, and any subsequent data we can read
840  * without blocking.
841  */
842  r = CopyStreamReceive(conn, 0, stream->stop_socket, &copybuf);
843  }
844  }
845 
846 error:
847  if (copybuf != NULL)
848  PQfreemem(copybuf);
849  return NULL;
850 }
851 
852 /*
853  * Wait until we can read a CopyData message,
854  * or timeout, or occurrence of a signal or input on the stop_socket.
855  * (timeout_ms < 0 means wait indefinitely; 0 means don't wait.)
856  *
857  * Returns 1 if data has become available for reading, 0 if timed out
858  * or interrupted by signal or stop_socket input, and -1 on an error.
859  */
860 static int
861 CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
862 {
863  int ret;
864  fd_set input_mask;
865  int connsocket;
866  int maxfd;
867  struct timeval timeout;
868  struct timeval *timeoutptr;
869 
870  connsocket = PQsocket(conn);
871  if (connsocket < 0)
872  {
873  pg_log_error("invalid socket: %s", PQerrorMessage(conn));
874  return -1;
875  }
876 
877  FD_ZERO(&input_mask);
878  FD_SET(connsocket, &input_mask);
879  maxfd = connsocket;
880  if (stop_socket != PGINVALID_SOCKET)
881  {
882  FD_SET(stop_socket, &input_mask);
883  maxfd = Max(maxfd, stop_socket);
884  }
885 
886  if (timeout_ms < 0)
887  timeoutptr = NULL;
888  else
889  {
890  timeout.tv_sec = timeout_ms / 1000L;
891  timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
892  timeoutptr = &timeout;
893  }
894 
895  ret = select(maxfd + 1, &input_mask, NULL, NULL, timeoutptr);
896 
897  if (ret < 0)
898  {
899  if (errno == EINTR)
900  return 0; /* Got a signal, so not an error */
901  pg_log_error("select() failed: %m");
902  return -1;
903  }
904  if (ret > 0 && FD_ISSET(connsocket, &input_mask))
905  return 1; /* Got input on connection socket */
906 
907  return 0; /* Got timeout or input on stop_socket */
908 }
909 
910 /*
911  * Receive CopyData message available from XLOG stream, blocking for
912  * maximum of 'timeout' ms.
913  *
914  * If data was received, returns the length of the data. *buffer is set to
915  * point to a buffer holding the received message. The buffer is only valid
916  * until the next CopyStreamReceive call.
917  *
918  * Returns 0 if no data was available within timeout, or if wait was
919  * interrupted by signal or stop_socket input.
920  * -1 on error. -2 if the server ended the COPY.
921  */
922 static int
923 CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
924  char **buffer)
925 {
926  char *copybuf = NULL;
927  int rawlen;
928 
929  if (*buffer != NULL)
930  PQfreemem(*buffer);
931  *buffer = NULL;
932 
933  /* Try to receive a CopyData message */
934  rawlen = PQgetCopyData(conn, &copybuf, 1);
935  if (rawlen == 0)
936  {
937  int ret;
938 
939  /*
940  * No data available. Wait for some to appear, but not longer than
941  * the specified timeout, so that we can ping the server. Also stop
942  * waiting if input appears on stop_socket.
943  */
944  ret = CopyStreamPoll(conn, timeout, stop_socket);
945  if (ret <= 0)
946  return ret;
947 
948  /* Now there is actually data on the socket */
949  if (PQconsumeInput(conn) == 0)
950  {
951  pg_log_error("could not receive data from WAL stream: %s",
952  PQerrorMessage(conn));
953  return -1;
954  }
955 
956  /* Now that we've consumed some input, try again */
957  rawlen = PQgetCopyData(conn, &copybuf, 1);
958  if (rawlen == 0)
959  return 0;
960  }
961  if (rawlen == -1) /* end-of-streaming or error */
962  return -2;
963  if (rawlen == -2)
964  {
965  pg_log_error("could not read COPY data: %s", PQerrorMessage(conn));
966  return -1;
967  }
968 
969  /* Return received messages to caller */
970  *buffer = copybuf;
971  return rawlen;
972 }
973 
974 /*
975  * Process the keepalive message.
976  */
977 static bool
978 ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
979  XLogRecPtr blockpos, TimestampTz *last_status)
980 {
981  int pos;
982  bool replyRequested;
984 
985  /*
986  * Parse the keepalive message, enclosed in the CopyData message. We just
987  * check if the server requested a reply, and ignore the rest.
988  */
989  pos = 1; /* skip msgtype 'k' */
990  pos += 8; /* skip walEnd */
991  pos += 8; /* skip sendTime */
992 
993  if (len < pos + 1)
994  {
995  pg_log_error("streaming header too small: %d", len);
996  return false;
997  }
998  replyRequested = copybuf[pos];
999 
1000  /* If the server requested an immediate reply, send one. */
1001  if (replyRequested && still_sending)
1002  {
1003  if (reportFlushPosition && lastFlushPosition < blockpos &&
1004  walfile != NULL)
1005  {
1006  /*
1007  * If a valid flush location needs to be reported, flush the
1008  * current WAL file so that the latest flush location is sent back
1009  * to the server. This is necessary to see whether the last WAL
1010  * data has been successfully replicated or not, at the normal
1011  * shutdown of the server.
1012  */
1013  if (stream->walmethod->sync(walfile) != 0)
1014  {
1015  pg_log_fatal("could not fsync file \"%s\": %s",
1017  exit(1);
1018  }
1019  lastFlushPosition = blockpos;
1020  }
1021 
1022  now = feGetCurrentTimestamp();
1023  if (!sendFeedback(conn, blockpos, now, false))
1024  return false;
1025  *last_status = now;
1026  }
1027 
1028  return true;
1029 }
1030 
1031 /*
1032  * Process XLogData message.
1033  */
1034 static bool
1035 ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
1036  XLogRecPtr *blockpos)
1037 {
1038  int xlogoff;
1039  int bytes_left;
1040  int bytes_written;
1041  int hdr_len;
1042 
1043  /*
1044  * Once we've decided we don't want to receive any more, just ignore any
1045  * subsequent XLogData messages.
1046  */
1047  if (!(still_sending))
1048  return true;
1049 
1050  /*
1051  * Read the header of the XLogData message, enclosed in the CopyData
1052  * message. We only need the WAL location field (dataStart), the rest of
1053  * the header is ignored.
1054  */
1055  hdr_len = 1; /* msgtype 'w' */
1056  hdr_len += 8; /* dataStart */
1057  hdr_len += 8; /* walEnd */
1058  hdr_len += 8; /* sendTime */
1059  if (len < hdr_len)
1060  {
1061  pg_log_error("streaming header too small: %d", len);
1062  return false;
1063  }
1064  *blockpos = fe_recvint64(&copybuf[1]);
1065 
1066  /* Extract WAL location for this block */
1067  xlogoff = XLogSegmentOffset(*blockpos, WalSegSz);
1068 
1069  /*
1070  * Verify that the initial location in the stream matches where we think
1071  * we are.
1072  */
1073  if (walfile == NULL)
1074  {
1075  /* No file open yet */
1076  if (xlogoff != 0)
1077  {
1078  pg_log_error("received write-ahead log record for offset %u with no file open",
1079  xlogoff);
1080  return false;
1081  }
1082  }
1083  else
1084  {
1085  /* More data in existing segment */
1086  if (stream->walmethod->get_current_pos(walfile) != xlogoff)
1087  {
1088  pg_log_error("got WAL data offset %08x, expected %08x",
1089  xlogoff, (int) stream->walmethod->get_current_pos(walfile));
1090  return false;
1091  }
1092  }
1093 
1094  bytes_left = len - hdr_len;
1095  bytes_written = 0;
1096 
1097  while (bytes_left)
1098  {
1099  int bytes_to_write;
1100 
1101  /*
1102  * If crossing a WAL boundary, only write up until we reach wal
1103  * segment size.
1104  */
1105  if (xlogoff + bytes_left > WalSegSz)
1106  bytes_to_write = WalSegSz - xlogoff;
1107  else
1108  bytes_to_write = bytes_left;
1109 
1110  if (walfile == NULL)
1111  {
1112  if (!open_walfile(stream, *blockpos))
1113  {
1114  /* Error logged by open_walfile */
1115  return false;
1116  }
1117  }
1118 
1119  if (stream->walmethod->write(walfile, copybuf + hdr_len + bytes_written,
1120  bytes_to_write) != bytes_to_write)
1121  {
1122  pg_log_error("could not write %u bytes to WAL file \"%s\": %s",
1123  bytes_to_write, current_walfile_name,
1124  stream->walmethod->getlasterror());
1125  return false;
1126  }
1127 
1128  /* Write was successful, advance our position */
1129  bytes_written += bytes_to_write;
1130  bytes_left -= bytes_to_write;
1131  *blockpos += bytes_to_write;
1132  xlogoff += bytes_to_write;
1133 
1134  /* Did we reach the end of a WAL segment? */
1135  if (XLogSegmentOffset(*blockpos, WalSegSz) == 0)
1136  {
1137  if (!close_walfile(stream, *blockpos))
1138  /* Error message written in close_walfile() */
1139  return false;
1140 
1141  xlogoff = 0;
1142 
1143  if (still_sending && stream->stream_stop(*blockpos, stream->timeline, true))
1144  {
1145  if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1146  {
1147  pg_log_error("could not send copy-end packet: %s",
1148  PQerrorMessage(conn));
1149  return false;
1150  }
1151  still_sending = false;
1152  return true; /* ignore the rest of this XLogData packet */
1153  }
1154  }
1155  }
1156  /* No more data left to write, receive next copy packet */
1157 
1158  return true;
1159 }
1160 
1161 /*
1162  * Handle end of the copy stream.
1163  */
1164 static PGresult *
1166  XLogRecPtr blockpos, XLogRecPtr *stoppos)
1167 {
1168  PGresult *res = PQgetResult(conn);
1169 
1170  /*
1171  * The server closed its end of the copy stream. If we haven't closed
1172  * ours already, we need to do so now, unless the server threw an error,
1173  * in which case we don't.
1174  */
1175  if (still_sending)
1176  {
1177  if (!close_walfile(stream, blockpos))
1178  {
1179  /* Error message written in close_walfile() */
1180  PQclear(res);
1181  return NULL;
1182  }
1183  if (PQresultStatus(res) == PGRES_COPY_IN)
1184  {
1185  if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1186  {
1187  pg_log_error("could not send copy-end packet: %s",
1188  PQerrorMessage(conn));
1189  PQclear(res);
1190  return NULL;
1191  }
1192  res = PQgetResult(conn);
1193  }
1194  still_sending = false;
1195  }
1196  if (copybuf != NULL)
1197  PQfreemem(copybuf);
1198  *stoppos = blockpos;
1199  return res;
1200 }
1201 
1202 /*
1203  * Check if we should continue streaming, or abort at this point.
1204  */
1205 static bool
1207  XLogRecPtr *stoppos)
1208 {
1209  if (still_sending && stream->stream_stop(blockpos, stream->timeline, false))
1210  {
1211  if (!close_walfile(stream, blockpos))
1212  {
1213  /* Potential error message is written by close_walfile */
1214  return false;
1215  }
1216  if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1217  {
1218  pg_log_error("could not send copy-end packet: %s",
1219  PQerrorMessage(conn));
1220  return false;
1221  }
1222  still_sending = false;
1223  }
1224 
1225  return true;
1226 }
1227 
1228 /*
1229  * Calculate how long send/receive loops should sleep
1230  */
1231 static long
1233  TimestampTz last_status)
1234 {
1235  TimestampTz status_targettime = 0;
1236  long sleeptime;
1237 
1238  if (standby_message_timeout && still_sending)
1239  status_targettime = last_status +
1240  (standby_message_timeout - 1) * ((int64) 1000);
1241 
1242  if (status_targettime > 0)
1243  {
1244  long secs;
1245  int usecs;
1246 
1248  status_targettime,
1249  &secs,
1250  &usecs);
1251  /* Always sleep at least 1 sec */
1252  if (secs <= 0)
1253  {
1254  secs = 1;
1255  usecs = 0;
1256  }
1257 
1258  sleeptime = secs * 1000 + usecs / 1000;
1259  }
1260  else
1261  sleeptime = -1;
1262 
1263  return sleeptime;
1264 }
int PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
Definition: fe-exec.c:2317
int PQnfields(const PGresult *res)
Definition: fe-exec.c:2777
static bool open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
Definition: receivelog.c:89
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6617
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static int CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
Definition: receivelog.c:861
TimestampTz feGetCurrentTimestamp(void)
Definition: streamutil.c:606
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3163
void * Walfile
Definition: walmethods.h:13
int(* close)(Walfile f, WalCloseMethod method)
Definition: walmethods.h:47
static void error(void)
Definition: sql-dyntest.c:147
const char * PQparameterStatus(const PGconn *conn, const char *paramName)
Definition: fe-connect.c:6582
static bool still_sending
Definition: receivelog.c:36
int64 TimestampTz
Definition: timestamp.h:39
#define pg_log_error(...)
Definition: logging.h:79
static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos, XLogRecPtr *stoppos)
Definition: receivelog.c:1206
char * sysidentifier
Definition: receivelog.h:34
int PQputCopyEnd(PGconn *conn, const char *errormsg)
Definition: fe-exec.c:2384
XLogRecPtr startpos
Definition: receivelog.h:32
static long CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout, TimestampTz last_status)
Definition: receivelog.c:1232
int sleeptime
Definition: pg_standby.c:41
char * partial_suffix
Definition: receivelog.h:48
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:6607
int PQntuples(const PGresult *res)
Definition: fe-exec.c:2769
bool feTimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: streamutil.c:647
int(* sync)(Walfile f)
Definition: walmethods.h:67
TimeLineID timeline
Definition: receivelog.h:33
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2692
int PQgetCopyData(PGconn *conn, char **buffer, int async)
Definition: fe-exec.c:2473
static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr blockpos, TimestampTz *last_status)
Definition: receivelog.c:978
const char *(* getlasterror)(void)
Definition: walmethods.h:78
off_t(* get_current_pos)(Walfile f)
Definition: walmethods.h:62
#define sprintf
Definition: port.h:194
static int CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket, char **buffer)
Definition: receivelog.c:923
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
Definition: receivelog.c:686
PGconn * conn
Definition: streamutil.c:54
#define MAXPGPATH
static Walfile * walfile
Definition: receivelog.c:31
int PQflush(PGconn *conn)
Definition: fe-exec.c:3283
char * replication_slot
Definition: receivelog.h:49
bool mark_done
Definition: receivelog.h:38
void feTimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: streamutil.c:625
#define TLHistoryFileName(fname, tli)
static bool reportFlushPosition
Definition: receivelog.c:33
uint64 XLogSegNo
Definition: xlogdefs.h:41
ssize_t(* write)(Walfile f, const void *buf, size_t count)
Definition: walmethods.h:59
#define select(n, r, w, e, timeout)
Definition: win32_port.h:436
unsigned int uint32
Definition: c.h:359
int pgsocket
Definition: port.h:31
static bool close_walfile(StreamCtl *stream, XLogRecPtr pos)
Definition: receivelog.c:178
stream_stop_callback stream_stop
Definition: receivelog.h:42
WalWriteMethod * walmethod
Definition: receivelog.h:47
#define MAXFNAMELEN
static int standby_message_timeout
Definition: pg_basebackup.c:96
#define ngettext(s, p, n)
Definition: c.h:1128
static bool sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
Definition: receivelog.c:320
static XLogRecPtr lastFlushPosition
Definition: receivelog.c:34
static PGresult * HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf, XLogRecPtr blockpos, XLogRecPtr *stoppos)
Definition: receivelog.c:1165
#define PGINVALID_SOCKET
Definition: port.h:33
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1704
static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr *blockpos)
Definition: receivelog.c:1035
StringInfo copybuf
Definition: tablesync.c:108
void PQclear(PGresult *res)
Definition: fe-exec.c:694
static void * fn(void *arg)
#define Max(x, y)
Definition: c.h:899
static PGresult * HandleCopyStream(PGconn *conn, StreamCtl *stream, XLogRecPtr *stoppos)
Definition: receivelog.c:732
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool existsTimeLineHistoryFile(StreamCtl *stream)
Definition: receivelog.c:242
bool ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
Definition: receivelog.c:436
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes)
static XLogRecPtr startpos
bool synchronous
Definition: receivelog.h:37
pgsocket stop_socket
Definition: receivelog.h:44
int WalSegSz
Definition: pg_standby.c:38
static char * filename
Definition: pg_dumpall.c:90
bool(* existsfile)(const char *pathname)
Definition: walmethods.h:50
char * PQresultErrorMessage(const PGresult *res)
Definition: fe-exec.c:2708
Walfile(* open_for_write)(const char *pathname, const char *temp_suffix, size_t pad_to_size)
Definition: walmethods.h:41
int64 fe_recvint64(char *buf)
Definition: streamutil.c:671
static bool mark_file_as_archived(StreamCtl *stream, const char *fname)
Definition: receivelog.c:58
int standby_message_timeout
Definition: receivelog.h:36
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:1939
ssize_t(* get_file_size)(const char *pathname)
Definition: walmethods.h:53
void fe_sendint64(int64 i, char *buf)
Definition: streamutil.c:660
#define EINTR
Definition: win32_port.h:323
#define pg_log_warning(...)
Definition: pgfnames.c:24
static char current_walfile_name[MAXPGPATH]
Definition: receivelog.c:32
#define snprintf
Definition: port.h:192
void PQfreemem(void *ptr)
Definition: fe-exec.c:3296
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:6635
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1547
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1778
#define pg_log_info(...)
Definition: logging.h:87
static bool writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
Definition: receivelog.c:259
bool CheckServerVersionForStreaming(PGconn *conn)
Definition: receivelog.c:358
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
#define pg_log_fatal(...)
Definition: logging.h:75