PostgreSQL Source Code  git master
receivelog.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * receivelog.c - receive WAL files using the streaming
4  * replication protocol.
5  *
6  * Author: Magnus Hagander <magnus@hagander.net>
7  *
8  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
9  *
10  * IDENTIFICATION
11  * src/bin/pg_basebackup/receivelog.c
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres_fe.h"
16 
17 #include <sys/stat.h>
18 #include <unistd.h>
19 #ifdef HAVE_SYS_SELECT_H
20 #include <sys/select.h>
21 #endif
22 
23 /* local includes */
24 #include "receivelog.h"
25 #include "streamutil.h"
26 
27 #include "libpq-fe.h"
28 #include "access/xlog_internal.h"
29 #include "common/file_utils.h"
30 
31 
32 /* Handle and base filename (without partial_suffix) of the currently open WAL file; NULL / "" when none is open */
33 static Walfile *walfile = NULL;
34 static char current_walfile_name[MAXPGPATH] = "";
/* Whether the flush position should be reported to the server; chosen by ReceiveXlogStream() from the slot/synchronous settings */
35 static bool reportFlushPosition = false;
37
38 static bool still_sending = true; /* feedback still needs to be sent? (reset to true at the start of HandleCopyStream()) */
39 
/* Forward declarations of static helpers used by the streaming loop; they are defined later in this file. */
41  XLogRecPtr *stoppos);
/* Waiting for and reading CopyData messages from the server */
42 static int CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket);
43 static int CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
44  char **buffer);
/* Handlers for the individual message types of the COPY stream */
45 static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf,
46  int len, XLogRecPtr blockpos, TimestampTz *last_status);
47 static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
48  XLogRecPtr *blockpos);
50  XLogRecPtr blockpos, XLogRecPtr *stoppos);
51 static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos,
52  XLogRecPtr *stoppos);
54  TimestampTz last_status);
55
57  uint32 *timeline);
58 
59 static bool
60 mark_file_as_archived(StreamCtl *stream, const char *fname)
61 {
62  Walfile *f;
63  static char tmppath[MAXPGPATH];
64 
65  snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done",
66  fname);
67 
68  f = stream->walmethod->open_for_write(tmppath, NULL, 0);
69  if (f == NULL)
70  {
71  fprintf(stderr, _("%s: could not create archive status file \"%s\": %s\n"),
72  progname, tmppath, stream->walmethod->getlasterror());
73  return false;
74  }
75 
76  stream->walmethod->close(f, CLOSE_NORMAL);
77 
78  return true;
79 }
80 
81 /*
82  * Open the WAL segment containing 'startpoint' through stream->walmethod.
83  *
84  * Returns true if OK; on failure, returns false after printing an error msg.
85  * On success, 'walfile' is set to the Walfile handle for the file, and the base filename
86  * (without partial_suffix) is stored in 'current_walfile_name'.
87  *
88  * A pre-existing file is accepted only if it is empty or exactly WalSegSz
 * bytes long.  A newly created file is requested with WalSegSz as its size
 * argument (presumably so the WAL method pads it to a full segment --
 * confirm in the WAL method implementation).
89  */
90 static bool
91 open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
92 {
93  Walfile *f;
94  char fn[MAXPGPATH];
95  ssize_t size;
96  XLogSegNo segno;
97
98  XLByteToSeg(startpoint, segno, WalSegSz);
100
/* fn is the name actually used with the WAL method: base name plus optional partial suffix */
101  snprintf(fn, sizeof(fn), "%s%s", current_walfile_name,
102  stream->partial_suffix ? stream->partial_suffix : "");
103
104  /*
105  * When streaming to files, if an existing file exists we verify that it's
106  * either empty (just created), or a complete WalSegSz segment (in which
107  * case it has been created and padded). Anything else indicates a corrupt
108  * file.
109  *
110  * When streaming to tar, no file with this name will exist before, so we
111  * never have to verify a size.
112  */
113  if (stream->walmethod->existsfile(fn))
114  {
115  size = stream->walmethod->get_file_size(fn);
116  if (size < 0)
117  {
118  fprintf(stderr,
119  _("%s: could not get size of write-ahead log file \"%s\": %s\n"),
120  progname, fn, stream->walmethod->getlasterror());
121  return false;
122  }
123  if (size == WalSegSz)
124  {
125  /* Already padded file. Open it for use */
127  if (f == NULL)
128  {
129  fprintf(stderr,
130  _("%s: could not open existing write-ahead log file \"%s\": %s\n"),
131  progname, fn, stream->walmethod->getlasterror());
132  return false;
133  }
134
135  /* fsync file in case of a previous crash */
136  if (stream->walmethod->sync(f) != 0)
137  {
138  fprintf(stderr,
139  _("%s: could not fsync existing write-ahead log file \"%s\": %s\n"),
140  progname, fn, stream->walmethod->getlasterror());
141  stream->walmethod->close(f, CLOSE_UNLINK);
142  return false;
143  }
144
145  walfile = f;
146  return true;
147  }
148  if (size != 0)
149  {
/*
 * NOTE(review): nothing in this path performs a write, and the message
 * printed below does not use errno, so this errno adjustment appears to be
 * a leftover from write-error handling elsewhere.
 */
150  /* if write didn't set errno, assume problem is no disk space */
151  if (errno == 0)
152  errno = ENOSPC;
153  fprintf(stderr,
154  ngettext("%s: write-ahead log file \"%s\" has %d byte, should be 0 or %d\n",
155  "%s: write-ahead log file \"%s\" has %d bytes, should be 0 or %d\n",
156  size),
157  progname, fn, (int) size, WalSegSz);
158  return false;
159  }
160  /* File existed and was empty, so fall through and open */
161  }
162
163  /* No file existed (or it existed but was empty), so create/open it */
164
166  stream->partial_suffix, WalSegSz);
167  if (f == NULL)
168  {
169  fprintf(stderr,
170  _("%s: could not open write-ahead log file \"%s\": %s\n"),
171  progname, fn, stream->walmethod->getlasterror());
172  return false;
173  }
174
175  walfile = f;
176  return true;
177 }
178 
179 /*
180  * Close the current WAL file (if open), and rename it to the correct
181  * filename if it's complete. On failure, prints an error message to stderr
182  * and returns false, otherwise returns true.
 *
 * "Complete" means the current write position equals WalSegSz.  When
 * stream->partial_suffix is set, an incomplete file is closed with
 * CLOSE_NO_RENAME so it keeps its suffixed name; without a suffix the file
 * is always closed with CLOSE_NORMAL.  A complete segment is additionally
 * marked as archived when stream->mark_done is set.  On success,
 * lastFlushPosition is advanced to 'pos'.
183  */
184 static bool
186 {
187  off_t currpos;
188  int r;
189
/* Nothing to do when no file is open */
190  if (walfile == NULL)
191  return true;
192
193  currpos = stream->walmethod->get_current_pos(walfile);
194  if (currpos == -1)
195  {
196  fprintf(stderr,
197  _("%s: could not determine seek position in file \"%s\": %s\n"),
/* Position unknown: give up on this file, closing it with CLOSE_UNLINK */
199  stream->walmethod->close(walfile, CLOSE_UNLINK);
200  walfile = NULL;
201
202  return false;
203  }
204
205  if (stream->partial_suffix)
206  {
207  if (currpos == WalSegSz)
208  r = stream->walmethod->close(walfile, CLOSE_NORMAL);
209  else
210  {
211  fprintf(stderr,
212  _("%s: not renaming \"%s%s\", segment is not complete\n"),
214  r = stream->walmethod->close(walfile, CLOSE_NO_RENAME);
215  }
216  }
217  else
218  r = stream->walmethod->close(walfile, CLOSE_NORMAL);
219
/* The handle is gone whether or not close() succeeded */
220  walfile = NULL;
221
222  if (r != 0)
223  {
224  fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
226  return false;
227  }
228
229  /*
230  * Mark file as archived if requested by the caller - pg_basebackup needs
231  * to do so as files can otherwise get archived again after promotion of a
232  * new node. This is in line with walreceiver.c always doing a
233  * XLogArchiveForceDone() after a complete segment.
234  */
235  if (currpos == WalSegSz && stream->mark_done)
236  {
237  /* writes error message if failed */
239  return false;
240  }
241
/* Advance lastFlushPosition to the caller-supplied 'pos' */
242  lastFlushPosition = pos;
243  return true;
244 }
245 
246 
247 /*
248  * Check if a timeline history file exists.
 *
 * Looks up the history file name for stream->timeline with
 * stream->walmethod->existsfile().  Always returns true for timeline 1,
 * which never has a history file.
249  */
250 static bool
252 {
253  char histfname[MAXFNAMELEN];
254
255  /*
256  * Timeline 1 never has a history file. We treat that as if it existed,
257  * since we never need to stream it.
258  */
259  if (stream->timeline == 1)
260  return true;
261
262  TLHistoryFileName(histfname, stream->timeline);
263
264  return stream->walmethod->existsfile(histfname);
265 }
266 
267 static bool
268 writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
269 {
270  int size = strlen(content);
271  char histfname[MAXFNAMELEN];
272  Walfile *f;
273 
274  /*
275  * Check that the server's idea of how timeline history files should be
276  * named matches ours.
277  */
278  TLHistoryFileName(histfname, stream->timeline);
279  if (strcmp(histfname, filename) != 0)
280  {
281  fprintf(stderr, _("%s: server reported unexpected history file name for timeline %u: %s\n"),
282  progname, stream->timeline, filename);
283  return false;
284  }
285 
286  f = stream->walmethod->open_for_write(histfname, ".tmp", 0);
287  if (f == NULL)
288  {
289  fprintf(stderr, _("%s: could not create timeline history file \"%s\": %s\n"),
290  progname, histfname, stream->walmethod->getlasterror());
291  return false;
292  }
293 
294  if ((int) stream->walmethod->write(f, content, size) != size)
295  {
296  fprintf(stderr, _("%s: could not write timeline history file \"%s\": %s\n"),
297  progname, histfname, stream->walmethod->getlasterror());
298 
299  /*
300  * If we fail to make the file, delete it to release disk space
301  */
302  stream->walmethod->close(f, CLOSE_UNLINK);
303 
304  return false;
305  }
306 
307  if (stream->walmethod->close(f, CLOSE_NORMAL) != 0)
308  {
309  fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
310  progname, histfname, stream->walmethod->getlasterror());
311  return false;
312  }
313 
314  /* Maintain archive_status, check close_walfile() for details. */
315  if (stream->mark_done)
316  {
317  /* writes error message if failed */
318  if (!mark_file_as_archived(stream, histfname))
319  return false;
320  }
321 
322  return true;
323 }
324 
325 /*
326  * Send a Standby Status Update message to server.
 *
 * The message is 34 bytes long:
 *   - the 'r' type byte;
 *   - four 8-byte fields: write position (blockpos), flush position,
 *     apply position (always InvalidXLogRecPtr) and send time (now);
 *   - a 1-byte replyRequested flag.
 * The flush field is either lastFlushPosition or InvalidXLogRecPtr,
 * depending on the condition that selects between them.
 *
 * Returns false (after printing an error) if the message could not be
 * queued or flushed to the server, true otherwise.
327  */
328 static bool
329 sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
330 {
331  char replybuf[1 + 8 + 8 + 8 + 8 + 1];
332  int len = 0;
333
334  replybuf[len] = 'r';
335  len += 1;
336  fe_sendint64(blockpos, &replybuf[len]); /* write */
337  len += 8;
339  fe_sendint64(lastFlushPosition, &replybuf[len]); /* flush */
340  else
341  fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */
342  len += 8;
343  fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
344  len += 8;
345  fe_sendint64(now, &replybuf[len]); /* sendTime */
346  len += 8;
347  replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
348  len += 1;
349
/* Queue the message and push it out immediately */
350  if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
351  {
352  fprintf(stderr, _("%s: could not send feedback packet: %s"),
353  progname, PQerrorMessage(conn));
354  return false;
355  }
356
357  return true;
358 }
359 
360 /*
361  * Check that the server version we're connected to is supported by
362  * ReceiveXlogStream().
363  *
364  * If it's not, an error message is printed to stderr, and false is returned.
 *
 * Versions are compared at major-version granularity, i.e. the numeric
 * version divided by 100 (so 9.3 corresponds to 903).  The accepted range
 * is 9.3 up to and including the client's own major version.
365  */
366 bool
368 {
369  int minServerMajor,
370  maxServerMajor;
371  int serverMajor;
372
373  /*
374  * The message format used in streaming replication changed in 9.3, so we
375  * cannot stream from older servers. And we don't support servers newer
376  * than the client; it might work, but we don't know, so err on the safe
377  * side.
378  */
379  minServerMajor = 903;
380  maxServerMajor = PG_VERSION_NUM / 100;
381  serverMajor = PQserverVersion(conn) / 100;
382  if (serverMajor < minServerMajor)
383  {
384  const char *serverver = PQparameterStatus(conn, "server_version");
385
386  fprintf(stderr, _("%s: incompatible server version %s; client does not support streaming from server versions older than %s\n"),
387  progname,
388  serverver ? serverver : "'unknown'",
389  "9.3");
390  return false;
391  }
392  else if (serverMajor > maxServerMajor)
393  {
394  const char *serverver = PQparameterStatus(conn, "server_version");
395
396  fprintf(stderr, _("%s: incompatible server version %s; client does not support streaming from server versions newer than %s\n"),
397  progname,
398  serverver ? serverver : "'unknown'",
399  PG_VERSION);
400  return false;
401  }
402  return true;
403 }
404 
405 /*
406  * Receive a log stream starting at the specified position.
407  *
408  * Individual parameters are passed through the StreamCtl structure.
409  *
410  * If sysidentifier is specified, validate that both the system
411  * identifier and the timeline match the specified ones
412  * (by sending an extra IDENTIFY_SYSTEM command)
413  *
414  * All received segments are written through stream->walmethod.
415  * This will also fetch any missing timeline history
416  * files.
417  *
418  * The stream_stop callback will be called every time data
419  * is received, and whenever a segment is completed. If it returns
420  * true, the streaming will stop and the function
421  * returns. As long as it returns false, streaming will continue
422  * indefinitely.
423  *
424  * If stream_stop() checks for external input, stop_socket should be set to
425  * the FD it checks. This will allow such input to be detected promptly
426  * rather than after standby_message_timeout (which might be indefinite).
427  * Note that signals will interrupt waits for input as well, but that is
428  * race-y since a signal received while busy won't interrupt the wait.
429  *
430  * standby_message_timeout controls how often we send a message
431  * back to the master letting it know our progress, in milliseconds.
432  * Zero means no messages are sent.
433  * The message always contains the write location.  The flush location is
434  * included only when reportFlushPosition is set (see below), and the
 * apply/replay location is never reported.
435  *
436  * If 'partial_suffix' is not NULL, files are initially created with the
437  * given suffix, and the suffix is removed once the file is finished. That
438  * allows you to tell the difference between partial and completed files,
439  * so that you can continue later where you left off.
440  *
441  * If 'synchronous' is true, the received WAL is flushed as soon as written,
442  * otherwise only when the WAL file is closed.
443  *
444  * Note: The WAL location *must* be at a log segment start!
 *
 * Returns true when the stream_stop callback says to stop; returns false
 * (after printing an error) on any failure.
445  */
446 bool
448 {
449  char query[128];
450  char slotcmd[128];
451  PGresult *res;
452  XLogRecPtr stoppos;
453
454  /*
455  * The caller should've checked the server version already, but doesn't do
456  * any harm to check it here too.
457  */
459  return false;
460
461  /*
462  * Decide whether we want to report the flush position. If we report the
463  * flush position, the primary will know what WAL we'll possibly
464  * re-request, and it can then remove older WAL safely. We must always do
465  * that when we are using slots.
466  *
467  * Reporting the flush position makes one eligible as a synchronous
468  * replica. People shouldn't include generic names in
469  * synchronous_standby_names, but we've protected them against it so far,
470  * so let's continue to do so unless specifically requested.
471  */
472  if (stream->replication_slot != NULL)
473  {
474  reportFlushPosition = true;
/*
 * NOTE(review): this sprintf is unbounded; a replication_slot name longer
 * than about 120 bytes would overflow slotcmd[128].  snprintf (or a length
 * check) would be safer.
 */
475  sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);
476  }
477  else
478  {
479  if (stream->synchronous)
480  reportFlushPosition = true;
481  else
482  reportFlushPosition = false;
483  slotcmd[0] = 0;
484  }
485
486  if (stream->sysidentifier != NULL)
487  {
488  /* Validate system identifier hasn't changed */
489  res = PQexec(conn, "IDENTIFY_SYSTEM");
490  if (PQresultStatus(res) != PGRES_TUPLES_OK)
491  {
492  fprintf(stderr,
493  _("%s: could not send replication command \"%s\": %s"),
494  progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
495  PQclear(res);
496  return false;
497  }
498  if (PQntuples(res) != 1 || PQnfields(res) < 3)
499  {
500  fprintf(stderr,
501  _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
502  progname, PQntuples(res), PQnfields(res), 1, 3);
503  PQclear(res);
504  return false;
505  }
506  if (strcmp(stream->sysidentifier, PQgetvalue(res, 0, 0)) != 0)
507  {
508  fprintf(stderr,
509  _("%s: system identifier does not match between base backup and streaming connection\n"),
510  progname);
511  PQclear(res);
512  return false;
513  }
/* Column 1 of IDENTIFY_SYSTEM is the server's current timeline */
514  if (stream->timeline > atoi(PQgetvalue(res, 0, 1)))
515  {
516  fprintf(stderr,
517  _("%s: starting timeline %u is not present in the server\n"),
518  progname, stream->timeline);
519  PQclear(res);
520  return false;
521  }
522  PQclear(res);
523  }
524
525  /*
526  * initialize flush position to starting point, it's the caller's
527  * responsibility that that's sane.
528  */
529  lastFlushPosition = stream->startpos;
530
531  while (1)
532  {
533  /*
534  * Fetch the timeline history file for this timeline, if we don't have
535  * it already. When streaming log to tar, this will always return
536  * false, as we are never streaming into an existing file and
537  * therefore there can be no pre-existing timeline history file.
538  */
539  if (!existsTimeLineHistoryFile(stream))
540  {
541  snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
542  res = PQexec(conn, query);
543  if (PQresultStatus(res) != PGRES_TUPLES_OK)
544  {
545  /* FIXME: we might send it ok, but get an error */
546  fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
547  progname, "TIMELINE_HISTORY", PQresultErrorMessage(res));
548  PQclear(res);
549  return false;
550  }
551
552  /*
553  * The response to TIMELINE_HISTORY is a single row result set
554  * with two fields: filename and content
555  */
556  if (PQnfields(res) != 2 || PQntuples(res) != 1)
557  {
558  fprintf(stderr,
559  _("%s: unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields\n"),
560  progname, PQntuples(res), PQnfields(res), 1, 2);
/*
 * NOTE(review): this branch reports the problem but neither returns nor
 * clears res, so execution falls through to PQgetvalue(res, 0, ...) on a
 * result of unexpected shape.
 */
561  }
562
563  /* Write the history file to disk */
565  PQgetvalue(res, 0, 0),
566  PQgetvalue(res, 0, 1));
567
568  PQclear(res);
569  }
570
571  /*
572  * Before we start streaming from the requested location, check if the
573  * callback tells us to stop here.
574  */
575  if (stream->stream_stop(stream->startpos, stream->timeline, false))
576  return true;
577
578  /* Initiate the replication stream at specified location */
579  snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
580  slotcmd,
581  (uint32) (stream->startpos >> 32), (uint32) stream->startpos,
582  stream->timeline);
583  res = PQexec(conn, query);
584  if (PQresultStatus(res) != PGRES_COPY_BOTH)
585  {
586  fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
587  progname, "START_REPLICATION", PQresultErrorMessage(res));
588  PQclear(res);
589  return false;
590  }
591  PQclear(res);
592
593  /* Stream the WAL */
594  res = HandleCopyStream(conn, stream, &stoppos);
595  if (res == NULL)
596  goto error;
597
598  /*
599  * Streaming finished.
600  *
601  * There are two possible reasons for that: a controlled shutdown, or
602  * we reached the end of the current timeline. In case of
603  * end-of-timeline, the server sends a result set after Copy has
604  * finished, containing information about the next timeline. Read
605  * that, and restart streaming from the next timeline. In case of
606  * controlled shutdown, stop here.
607  */
608  if (PQresultStatus(res) == PGRES_TUPLES_OK)
609  {
610  /*
611  * End-of-timeline. Read the next timeline's ID and starting
612  * position. Usually, the starting position will match the end of
613  * the previous timeline, but there are corner cases like if the
614  * server had sent us half of a WAL record, when it was promoted.
615  * The new timeline will begin at the end of the last complete
616  * record in that case, overlapping the partial WAL record on the
617  * old timeline.
618  */
619  uint32 newtimeline;
620  bool parsed;
621
622  parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
623  PQclear(res);
624  if (!parsed)
625  goto error;
626
627  /* Sanity check the values the server gave us */
628  if (newtimeline <= stream->timeline)
629  {
630  fprintf(stderr,
631  _("%s: server reported unexpected next timeline %u, following timeline %u\n"),
632  progname, newtimeline, stream->timeline);
633  goto error;
634  }
635  if (stream->startpos > stoppos)
636  {
637  fprintf(stderr,
638  _("%s: server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X\n"),
639  progname,
640  stream->timeline, (uint32) (stoppos >> 32), (uint32) stoppos,
641  newtimeline, (uint32) (stream->startpos >> 32), (uint32) stream->startpos);
642  goto error;
643  }
644
645  /* Read the final result, which should be CommandComplete. */
646  res = PQgetResult(conn);
647  if (PQresultStatus(res) != PGRES_COMMAND_OK)
648  {
649  fprintf(stderr,
650  _("%s: unexpected termination of replication stream: %s"),
652  PQclear(res);
653  goto error;
654  }
655  PQclear(res);
656
657  /*
658  * Loop back to start streaming from the new timeline. Always
659  * start streaming at the beginning of a segment.
660  */
661  stream->timeline = newtimeline;
662  stream->startpos = stream->startpos -
664  continue;
665  }
666  else if (PQresultStatus(res) == PGRES_COMMAND_OK)
667  {
668  PQclear(res);
669
670  /*
671  * End of replication (ie. controlled shut down of the server).
672  *
673  * Check if the callback thinks it's OK to stop here. If not,
674  * complain.
675  */
676  if (stream->stream_stop(stoppos, stream->timeline, false))
677  return true;
678  else
679  {
680  fprintf(stderr, _("%s: replication stream was terminated before stop point\n"),
681  progname);
682  goto error;
683  }
684  }
685  else
686  {
687  /* Server returned an error. */
688  fprintf(stderr,
689  _("%s: unexpected termination of replication stream: %s"),
691  PQclear(res);
692  goto error;
693  }
694  }
695
/*
 * Error exit: close any open WAL file with CLOSE_NO_RENAME, so that an
 * unfinished segment presumably keeps its partial name, then report failure.
 */
696 error:
697  if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NO_RENAME) != 0)
698  fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
700  walfile = NULL;
701  return false;
702 }
703 
704 /*
705  * Helper function to parse the result set returned by server after streaming
706  * has finished. On failure, prints an error to stderr and returns false.
 *
 * On success, stores the next timeline's ID in *timeline and its starting
 * WAL position in *startpos, and returns true.
707  */
708 static bool
710 {
711  uint32 startpos_xlogid,
712  startpos_xrecoff;
713
714  /*----------
715  * The result set consists of one row and two columns, e.g:
716  *
717  * next_tli | next_tli_startpos
718  * ----------+-------------------
719  * 4 | 0/9949AE0
720  *
721  * next_tli is the timeline ID of the next timeline after the one that
722  * just finished streaming. next_tli_startpos is the WAL location where
723  * the server switched to it.
724  *----------
725  */
726  if (PQnfields(res) < 2 || PQntuples(res) != 1)
727  {
728  fprintf(stderr,
729  _("%s: unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields\n"),
730  progname, PQntuples(res), PQnfields(res), 1, 2);
731  return false;
732  }
733
/*
 * atoi() does not report malformed input (it yields 0).  In
 * ReceiveXlogStream() the "newtimeline <= stream->timeline" sanity check
 * rejects such a value.
 */
734  *timeline = atoi(PQgetvalue(res, 0, 0));
735  if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &startpos_xlogid,
736  &startpos_xrecoff) != 2)
737  {
738  fprintf(stderr,
739  _("%s: could not parse next timeline's starting point \"%s\"\n"),
740  progname, PQgetvalue(res, 0, 1));
741  return false;
742  }
/* Combine the high and low 32-bit halves of the "%X/%X" location */
743  *startpos = ((uint64) startpos_xlogid << 32) | startpos_xrecoff;
744
745  return true;
746 }
747 
748 /*
749  * The main loop of ReceiveXlogStream. Handles the COPY stream after
750  * initiating streaming with the START_REPLICATION command.
751  *
752  * If the COPY ends (not necessarily successfully) due a message from the
753  * server, returns a PGresult and sets *stoppos to the last byte written.
754  * On any other sort of error, returns NULL.
 *
 * Each iteration does the following:
 *   - checks whether to stop;
 *   - syncs and reports progress (in synchronous mode, or when
 *     standby_message_timeout has elapsed);
 *   - waits for data;
 *   - drains every message that can be read without blocking.
 * Only 'k' (keepalive) and 'w' (XLogData) messages are accepted.
755  */
756 static PGresult *
758  XLogRecPtr *stoppos)
759 {
760  char *copybuf = NULL;
761  TimestampTz last_status = -1;
762  XLogRecPtr blockpos = stream->startpos;
763
764  still_sending = true;
765
766  while (1)
767  {
768  int r;
770  long sleeptime;
771
772  /*
773  * Check if we should continue streaming, or abort at this point.
774  */
775  if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos))
776  goto error;
777
778  now = feGetCurrentTimestamp();
779
780  /*
781  * If synchronous option is true, issue sync command as soon as there
782  * are WAL data which has not been flushed yet.
783  */
784  if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)
785  {
786  if (stream->walmethod->sync(walfile) != 0)
787  {
788  fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
790  goto error;
791  }
792  lastFlushPosition = blockpos;
793
794  /*
795  * Send feedback so that the server sees the latest WAL locations
796  * immediately.
797  */
798  if (!sendFeedback(conn, blockpos, now, false))
799  goto error;
800  last_status = now;
801  }
802
803  /*
804  * Potentially send a status message to the master
805  */
806  if (still_sending && stream->standby_message_timeout > 0 &&
807  feTimestampDifferenceExceeds(last_status, now,
808  stream->standby_message_timeout))
809  {
810  /* Time to send feedback! */
811  if (!sendFeedback(conn, blockpos, now, false))
812  goto error;
813  last_status = now;
814  }
815
816  /*
817  * Calculate how long send/receive loops should sleep
818  */
819  sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
820  last_status);
821
/* r > 0: message length; 0: nothing available; -1: error; -2: server ended COPY */
822  r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, &copybuf);
823  while (r != 0)
824  {
825  if (r == -1)
826  goto error;
827  if (r == -2)
828  {
/* CopyStreamReceive() leaves copybuf NULL when it returns -2 */
829  PGresult *res = HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos);
830
831  if (res == NULL)
832  goto error;
833  else
834  return res;
835  }
836
837  /* Check the message type. */
838  if (copybuf[0] == 'k')
839  {
840  if (!ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos,
841  &last_status))
842  goto error;
843  }
844  else if (copybuf[0] == 'w')
845  {
846  if (!ProcessXLogDataMsg(conn, stream, copybuf, r, &blockpos))
847  goto error;
848
849  /*
850  * Check if we should continue streaming, or abort at this
851  * point.
852  */
853  if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos))
854  goto error;
855  }
856  else
857  {
858  fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
859  progname, copybuf[0]);
860  goto error;
861  }
862
863  /*
864  * Process the received data, and any subsequent data we can read
865  * without blocking.
866  */
867  r = CopyStreamReceive(conn, 0, stream->stop_socket, &copybuf);
868  }
869  }
870
/* copybuf may still hold the last message handed out by CopyStreamReceive() */
871 error:
872  if (copybuf != NULL)
873  PQfreemem(copybuf);
874  return NULL;
875 }
876 
877 /*
878  * Wait until we can read a CopyData message,
879  * or timeout, or occurrence of a signal or input on the stop_socket.
880  * (timeout_ms < 0 means wait indefinitely; 0 means don't wait.)
881  *
882  * Returns 1 if data has become available for reading, 0 if timed out
883  * or interrupted by signal or stop_socket input, and -1 on an error.
884  */
885 static int
886 CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
887 {
888  int ret;
889  fd_set input_mask;
890  int connsocket;
891  int maxfd;
892  struct timeval timeout;
893  struct timeval *timeoutptr;
894 
895  connsocket = PQsocket(conn);
896  if (connsocket < 0)
897  {
898  fprintf(stderr, _("%s: invalid socket: %s"), progname,
899  PQerrorMessage(conn));
900  return -1;
901  }
902 
903  FD_ZERO(&input_mask);
904  FD_SET(connsocket, &input_mask);
905  maxfd = connsocket;
906  if (stop_socket != PGINVALID_SOCKET)
907  {
908  FD_SET(stop_socket, &input_mask);
909  maxfd = Max(maxfd, stop_socket);
910  }
911 
912  if (timeout_ms < 0)
913  timeoutptr = NULL;
914  else
915  {
916  timeout.tv_sec = timeout_ms / 1000L;
917  timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
918  timeoutptr = &timeout;
919  }
920 
921  ret = select(maxfd + 1, &input_mask, NULL, NULL, timeoutptr);
922 
923  if (ret < 0)
924  {
925  if (errno == EINTR)
926  return 0; /* Got a signal, so not an error */
927  fprintf(stderr, _("%s: select() failed: %s\n"),
928  progname, strerror(errno));
929  return -1;
930  }
931  if (ret > 0 && FD_ISSET(connsocket, &input_mask))
932  return 1; /* Got input on connection socket */
933 
934  return 0; /* Got timeout or input on stop_socket */
935 }
936 
937 /*
938  * Receive CopyData message available from XLOG stream, blocking for
939  * maximum of 'timeout' ms.
940  *
941  * If data was received, returns the length of the data. *buffer is set to
942  * point to a buffer holding the received message. The buffer is only valid
943  * until the next CopyStreamReceive call.
944  *
945  * Returns 0 if no data was available within timeout, or if wait was
946  * interrupted by signal or stop_socket input.
947  * -1 on error. -2 if the server ended the COPY.
948  */
949 static int
950 CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
951  char **buffer)
952 {
953  char *copybuf = NULL;
954  int rawlen;
955 
956  if (*buffer != NULL)
957  PQfreemem(*buffer);
958  *buffer = NULL;
959 
960  /* Try to receive a CopyData message */
961  rawlen = PQgetCopyData(conn, &copybuf, 1);
962  if (rawlen == 0)
963  {
964  int ret;
965 
966  /*
967  * No data available. Wait for some to appear, but not longer than
968  * the specified timeout, so that we can ping the server. Also stop
969  * waiting if input appears on stop_socket.
970  */
971  ret = CopyStreamPoll(conn, timeout, stop_socket);
972  if (ret <= 0)
973  return ret;
974 
975  /* Now there is actually data on the socket */
976  if (PQconsumeInput(conn) == 0)
977  {
978  fprintf(stderr,
979  _("%s: could not receive data from WAL stream: %s"),
980  progname, PQerrorMessage(conn));
981  return -1;
982  }
983 
984  /* Now that we've consumed some input, try again */
985  rawlen = PQgetCopyData(conn, &copybuf, 1);
986  if (rawlen == 0)
987  return 0;
988  }
989  if (rawlen == -1) /* end-of-streaming or error */
990  return -2;
991  if (rawlen == -2)
992  {
993  fprintf(stderr, _("%s: could not read COPY data: %s"),
994  progname, PQerrorMessage(conn));
995  return -1;
996  }
997 
998  /* Return received messages to caller */
999  *buffer = copybuf;
1000  return rawlen;
1001 }
1002 
1003 /*
1004  * Process the keepalive message.
 *
 * Layout after the CopyData framing:
 *   - 'k' (1 byte);
 *   - walEnd (8 bytes);
 *   - sendTime (8 bytes);
 *   - replyRequested (1 byte).
 * Only replyRequested is examined.  If a reply is requested and feedback is
 * still being sent, the current WAL file is synced first, but only when the
 * flush position must be reported and lags blockpos.  Then a status update
 * is sent and *last_status is set to the time it was sent.
 *
 * Returns false (after printing an error) if the message is too short, the
 * sync fails, or the feedback cannot be sent; true otherwise.
1005  */
1006 static bool
1008  XLogRecPtr blockpos, TimestampTz *last_status)
1009 {
1010  int pos;
1011  bool replyRequested;
1012  TimestampTz now;
1013
1014  /*
1015  * Parse the keepalive message, enclosed in the CopyData message. We just
1016  * check if the server requested a reply, and ignore the rest.
1017  */
1018  pos = 1; /* skip msgtype 'k' */
1019  pos += 8; /* skip walEnd */
1020  pos += 8; /* skip sendTime */
1021
/* Need at least the replyRequested byte at offset 'pos' */
1022  if (len < pos + 1)
1023  {
1024  fprintf(stderr, _("%s: streaming header too small: %d\n"),
1025  progname, len);
1026  return false;
1027  }
1028  replyRequested = copybuf[pos];
1029
1030  /* If the server requested an immediate reply, send one. */
1031  if (replyRequested && still_sending)
1032  {
1033  if (reportFlushPosition && lastFlushPosition < blockpos &&
1034  walfile != NULL)
1035  {
1036  /*
1037  * If a valid flush location needs to be reported, flush the
1038  * current WAL file so that the latest flush location is sent back
1039  * to the server. This is necessary to see whether the last WAL
1040  * data has been successfully replicated or not, at the normal
1041  * shutdown of the server.
1042  */
1043  if (stream->walmethod->sync(walfile) != 0)
1044  {
1045  fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
1047  return false;
1048  }
1049  lastFlushPosition = blockpos;
1050  }
1051
1052  now = feGetCurrentTimestamp();
1053  if (!sendFeedback(conn, blockpos, now, false))
1054  return false;
1055  *last_status = now;
1056  }
1057
1058  return true;
1059 }
1060 
1061 /*
1062  * Process XLogData message.
1063  */
1064 static bool
1065 ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
1066  XLogRecPtr *blockpos)
1067 {
1068  int xlogoff;
1069  int bytes_left;
1070  int bytes_written;
1071  int hdr_len;
1072 
1073  /*
1074  * Once we've decided we don't want to receive any more, just ignore any
1075  * subsequent XLogData messages.
1076  */
1077  if (!(still_sending))
1078  return true;
1079 
1080  /*
1081  * Read the header of the XLogData message, enclosed in the CopyData
1082  * message. We only need the WAL location field (dataStart), the rest of
1083  * the header is ignored.
1084  */
1085  hdr_len = 1; /* msgtype 'w' */
1086  hdr_len += 8; /* dataStart */
1087  hdr_len += 8; /* walEnd */
1088  hdr_len += 8; /* sendTime */
1089  if (len < hdr_len)
1090  {
1091  fprintf(stderr, _("%s: streaming header too small: %d\n"),
1092  progname, len);
1093  return false;
1094  }
1095  *blockpos = fe_recvint64(&copybuf[1]);
1096 
1097  /* Extract WAL location for this block */
1098  xlogoff = XLogSegmentOffset(*blockpos, WalSegSz);
1099 
1100  /*
1101  * Verify that the initial location in the stream matches where we think
1102  * we are.
1103  */
1104  if (walfile == NULL)
1105  {
1106  /* No file open yet */
1107  if (xlogoff != 0)
1108  {
1109  fprintf(stderr,
1110  _("%s: received write-ahead log record for offset %u with no file open\n"),
1111  progname, xlogoff);
1112  return false;
1113  }
1114  }
1115  else
1116  {
1117  /* More data in existing segment */
1118  if (stream->walmethod->get_current_pos(walfile) != xlogoff)
1119  {
1120  fprintf(stderr,
1121  _("%s: got WAL data offset %08x, expected %08x\n"),
1122  progname, xlogoff, (int) stream->walmethod->get_current_pos(walfile));
1123  return false;
1124  }
1125  }
1126 
1127  bytes_left = len - hdr_len;
1128  bytes_written = 0;
1129 
1130  while (bytes_left)
1131  {
1132  int bytes_to_write;
1133 
1134  /*
1135  * If crossing a WAL boundary, only write up until we reach wal
1136  * segment size.
1137  */
1138  if (xlogoff + bytes_left > WalSegSz)
1139  bytes_to_write = WalSegSz - xlogoff;
1140  else
1141  bytes_to_write = bytes_left;
1142 
1143  if (walfile == NULL)
1144  {
1145  if (!open_walfile(stream, *blockpos))
1146  {
1147  /* Error logged by open_walfile */
1148  return false;
1149  }
1150  }
1151 
1152  if (stream->walmethod->write(walfile, copybuf + hdr_len + bytes_written,
1153  bytes_to_write) != bytes_to_write)
1154  {
1155  fprintf(stderr,
1156  _("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
1157  progname, bytes_to_write, current_walfile_name,
1158  stream->walmethod->getlasterror());
1159  return false;
1160  }
1161 
1162  /* Write was successful, advance our position */
1163  bytes_written += bytes_to_write;
1164  bytes_left -= bytes_to_write;
1165  *blockpos += bytes_to_write;
1166  xlogoff += bytes_to_write;
1167 
1168  /* Did we reach the end of a WAL segment? */
1169  if (XLogSegmentOffset(*blockpos, WalSegSz) == 0)
1170  {
1171  if (!close_walfile(stream, *blockpos))
1172  /* Error message written in close_walfile() */
1173  return false;
1174 
1175  xlogoff = 0;
1176 
1177  if (still_sending && stream->stream_stop(*blockpos, stream->timeline, true))
1178  {
1179  if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1180  {
1181  fprintf(stderr, _("%s: could not send copy-end packet: %s"),
1182  progname, PQerrorMessage(conn));
1183  return false;
1184  }
1185  still_sending = false;
1186  return true; /* ignore the rest of this XLogData packet */
1187  }
1188  }
1189  }
1190  /* No more data left to write, receive next copy packet */
1191 
1192  return true;
1193 }
1194 
1195 /*
1196  * Handle end of the copy stream.
1197  */
1198 static PGresult *
1200  XLogRecPtr blockpos, XLogRecPtr *stoppos)
1201 {
1202  PGresult *res = PQgetResult(conn);
1203 
1204  /*
1205  * The server closed its end of the copy stream. If we haven't closed
1206  * ours already, we need to do so now, unless the server threw an error,
1207  * in which case we don't.
1208  */
1209  if (still_sending)
1210  {
1211  if (!close_walfile(stream, blockpos))
1212  {
1213  /* Error message written in close_walfile() */
1214  PQclear(res);
1215  return NULL;
1216  }
1217  if (PQresultStatus(res) == PGRES_COPY_IN)
1218  {
1219  if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1220  {
1221  fprintf(stderr,
1222  _("%s: could not send copy-end packet: %s"),
1223  progname, PQerrorMessage(conn));
1224  PQclear(res);
1225  return NULL;
1226  }
1227  res = PQgetResult(conn);
1228  }
1229  still_sending = false;
1230  }
1231  if (copybuf != NULL)
1232  PQfreemem(copybuf);
1233  *stoppos = blockpos;
1234  return res;
1235 }
1236 
1237 /*
1238  * Check if we should continue streaming, or abort at this point.
1239  */
1240 static bool
1242  XLogRecPtr *stoppos)
1243 {
1244  if (still_sending && stream->stream_stop(blockpos, stream->timeline, false))
1245  {
1246  if (!close_walfile(stream, blockpos))
1247  {
1248  /* Potential error message is written by close_walfile */
1249  return false;
1250  }
1251  if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1252  {
1253  fprintf(stderr, _("%s: could not send copy-end packet: %s"),
1254  progname, PQerrorMessage(conn));
1255  return false;
1256  }
1257  still_sending = false;
1258  }
1259 
1260  return true;
1261 }
1262 
1263 /*
1264  * Calculate how long send/receive loops should sleep
1265  */
1266 static long
1268  TimestampTz last_status)
1269 {
1270  TimestampTz status_targettime = 0;
1271  long sleeptime;
1272 
1273  if (standby_message_timeout && still_sending)
1274  status_targettime = last_status +
1275  (standby_message_timeout - 1) * ((int64) 1000);
1276 
1277  if (status_targettime > 0)
1278  {
1279  long secs;
1280  int usecs;
1281 
1283  status_targettime,
1284  &secs,
1285  &usecs);
1286  /* Always sleep at least 1 sec */
1287  if (secs <= 0)
1288  {
1289  secs = 1;
1290  usecs = 0;
1291  }
1292 
1293  sleeptime = secs * 1000 + usecs / 1000;
1294  }
1295  else
1296  sleeptime = -1;
1297 
1298  return sleeptime;
1299 }
int PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
Definition: fe-exec.c:2272
int PQnfields(const PGresult *res)
Definition: fe-exec.c:2732
static bool open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
Definition: receivelog.c:91
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6106
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static int CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
Definition: receivelog.c:886
TimestampTz feGetCurrentTimestamp(void)
Definition: streamutil.c:520
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3118
void * Walfile
Definition: walmethods.h:13
int(* close)(Walfile f, WalCloseMethod method)
Definition: walmethods.h:47
static void error(void)
Definition: sql-dyntest.c:147
const char * PQparameterStatus(const PGconn *conn, const char *paramName)
Definition: fe-connect.c:6071
static bool still_sending
Definition: receivelog.c:38
int64 TimestampTz
Definition: timestamp.h:39
static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos, XLogRecPtr *stoppos)
Definition: receivelog.c:1241
char * sysidentifier
Definition: receivelog.h:34
int PQputCopyEnd(PGconn *conn, const char *errormsg)
Definition: fe-exec.c:2339
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
XLogRecPtr startpos
Definition: receivelog.h:32
static long CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout, TimestampTz last_status)
Definition: receivelog.c:1267
int sleeptime
Definition: pg_standby.c:42
char * partial_suffix
Definition: receivelog.h:48
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:6096
const char * progname
Definition: pg_standby.c:37
int PQntuples(const PGresult *res)
Definition: fe-exec.c:2724
bool feTimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: streamutil.c:561
int(* sync)(Walfile f)
Definition: walmethods.h:67
TimeLineID timeline
Definition: receivelog.h:33
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2647
int PQgetCopyData(PGconn *conn, char **buffer, int async)
Definition: fe-exec.c:2428
static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr blockpos, TimestampTz *last_status)
Definition: receivelog.c:1007
const char *(* getlasterror)(void)
Definition: walmethods.h:78
off_t(* get_current_pos)(Walfile f)
Definition: walmethods.h:62
static int CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket, char **buffer)
Definition: receivelog.c:950
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
Definition: receivelog.c:709
PGconn * conn
Definition: streamutil.c:46
#define MAXPGPATH
static Walfile * walfile
Definition: receivelog.c:33
int PQflush(PGconn *conn)
Definition: fe-exec.c:3238
char * replication_slot
Definition: receivelog.h:49
bool mark_done
Definition: receivelog.h:38
void feTimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: streamutil.c:539
#define TLHistoryFileName(fname, tli)
static bool reportFlushPosition
Definition: receivelog.c:35
uint64 XLogSegNo
Definition: xlogdefs.h:34
ssize_t(* write)(Walfile f, const void *buf, size_t count)
Definition: walmethods.h:59
#define select(n, r, w, e, timeout)
Definition: win32_port.h:447
unsigned int uint32
Definition: c.h:296
int pgsocket
Definition: port.h:31
static bool close_walfile(StreamCtl *stream, XLogRecPtr pos)
Definition: receivelog.c:185
stream_stop_callback stream_stop
Definition: receivelog.h:42
WalWriteMethod * walmethod
Definition: receivelog.h:47
#define MAXFNAMELEN
static int standby_message_timeout
Definition: pg_basebackup.c:91
#define ngettext(s, p, n)
Definition: c.h:967
static bool sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
Definition: receivelog.c:329
static XLogRecPtr lastFlushPosition
Definition: receivelog.c:36
static PGresult * HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf, XLogRecPtr blockpos, XLogRecPtr *stoppos)
Definition: receivelog.c:1199
#define PGINVALID_SOCKET
Definition: port.h:33
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1682
static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr *blockpos)
Definition: receivelog.c:1065
StringInfo copybuf
Definition: tablesync.c:114
void PQclear(PGresult *res)
Definition: fe-exec.c:671
static void * fn(void *arg)
#define Max(x, y)
Definition: c.h:796
static PGresult * HandleCopyStream(PGconn *conn, StreamCtl *stream, XLogRecPtr *stoppos)
Definition: receivelog.c:757
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool existsTimeLineHistoryFile(StreamCtl *stream)
Definition: receivelog.c:251
bool ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
Definition: receivelog.c:447
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:214
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes)
static XLogRecPtr startpos
bool synchronous
Definition: receivelog.h:37
pgsocket stop_socket
Definition: receivelog.h:44
int WalSegSz
Definition: pg_standby.c:39
static char * filename
Definition: pg_dumpall.c:90
bool(* existsfile)(const char *pathname)
Definition: walmethods.h:50
char * PQresultErrorMessage(const PGresult *res)
Definition: fe-exec.c:2663
Walfile(* open_for_write)(const char *pathname, const char *temp_suffix, size_t pad_to_size)
Definition: walmethods.h:41
int64 fe_recvint64(char *buf)
Definition: streamutil.c:585
static bool mark_file_as_archived(StreamCtl *stream, const char *fname)
Definition: receivelog.c:60
const char * strerror(int errnum)
Definition: strerror.c:19
int standby_message_timeout
Definition: receivelog.h:36
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:1897
ssize_t(* get_file_size)(const char *pathname)
Definition: walmethods.h:53
void fe_sendint64(int64 i, char *buf)
Definition: streamutil.c:574
#define EINTR
Definition: win32_port.h:334
static char current_walfile_name[MAXPGPATH]
Definition: receivelog.c:34
#define _(x)
Definition: elog.c:84
void PQfreemem(void *ptr)
Definition: fe-exec.c:3251
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:6124
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1753
static bool writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
Definition: receivelog.c:268
bool CheckServerVersionForStreaming(PGconn *conn)
Definition: receivelog.c:367
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)