PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
receivelog.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * receivelog.c - receive WAL files using the streaming
4  * replication protocol.
5  *
6  * Author: Magnus Hagander <magnus@hagander.net>
7  *
8  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
9  *
10  * IDENTIFICATION
11  * src/bin/pg_basebackup/receivelog.c
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres_fe.h"
16 
17 #include <sys/stat.h>
18 #include <unistd.h>
19 #ifdef HAVE_SYS_SELECT_H
20 #include <sys/select.h>
21 #endif
22 
23 /* local includes */
24 #include "receivelog.h"
25 #include "streamutil.h"
26 
27 #include "libpq-fe.h"
28 #include "access/xlog_internal.h"
29 #include "common/file_utils.h"
30 
31 
32 /* fd and filename for currently open WAL file */
33 static Walfile *walfile = NULL;
34 static char current_walfile_name[MAXPGPATH] = "";
35 static bool reportFlushPosition = false;
37 
38 static bool still_sending = true; /* feedback still needs to be sent? */
39 
41  XLogRecPtr *stoppos);
42 static int CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket);
43 static int CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
44  char **buffer);
45 static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf,
46  int len, XLogRecPtr blockpos, TimestampTz *last_status);
47 static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
48  XLogRecPtr *blockpos);
50  XLogRecPtr blockpos, XLogRecPtr *stoppos);
51 static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos,
52  XLogRecPtr *stoppos);
54  TimestampTz last_status);
55 
57  uint32 *timeline);
58 
59 static bool
60 mark_file_as_archived(StreamCtl *stream, const char *fname)
61 {
62  Walfile *f;
63  static char tmppath[MAXPGPATH];
64 
65  snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done",
66  fname);
67 
68  f = stream->walmethod->open_for_write(tmppath, NULL, 0);
69  if (f == NULL)
70  {
71  fprintf(stderr, _("%s: could not create archive status file \"%s\": %s\n"),
72  progname, tmppath, stream->walmethod->getlasterror());
73  return false;
74  }
75 
76  stream->walmethod->close(f, CLOSE_NORMAL);
77 
78  return true;
79 }
80 
81 /*
82  * Open a new WAL file in the specified directory.
83  *
84  * Returns true if OK; on failure, returns false after printing an error msg.
85  * On success, 'walfile' is set to the FD for the file, and the base filename
86  * (without partial_suffix) is stored in 'current_walfile_name'.
87  *
88  * The file will be padded to 16MB with zeroes.
89  */
90 static bool
91 open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
92 {
93  Walfile *f;
94  char fn[MAXPGPATH];
95  ssize_t size;
96  XLogSegNo segno;
97 
98  XLByteToSeg(startpoint, segno);
100 
101  snprintf(fn, sizeof(fn), "%s%s", current_walfile_name,
102  stream->partial_suffix ? stream->partial_suffix : "");
103 
104  /*
105  * When streaming to files, if an existing file exists we verify that it's
106  * either empty (just created), or a complete XLogSegSize segment (in
107  * which case it has been created and padded). Anything else indicates a
108  * corrupt file.
109  *
110  * When streaming to tar, no file with this name will exist before, so we
111  * never have to verify a size.
112  */
113  if (stream->walmethod->existsfile(fn))
114  {
115  size = stream->walmethod->get_file_size(fn);
116  if (size < 0)
117  {
118  fprintf(stderr,
119  _("%s: could not get size of write-ahead log file \"%s\": %s\n"),
120  progname, fn, stream->walmethod->getlasterror());
121  return false;
122  }
123  if (size == XLogSegSize)
124  {
125  /* Already padded file. Open it for use */
127  if (f == NULL)
128  {
129  fprintf(stderr,
130  _("%s: could not open existing write-ahead log file \"%s\": %s\n"),
131  progname, fn, stream->walmethod->getlasterror());
132  return false;
133  }
134 
135  /* fsync file in case of a previous crash */
136  if (stream->walmethod->sync(f) != 0)
137  {
138  fprintf(stderr,
139  _("%s: could not fsync existing write-ahead log file \"%s\": %s\n"),
140  progname, fn, stream->walmethod->getlasterror());
141  stream->walmethod->close(f, CLOSE_UNLINK);
142  return false;
143  }
144 
145  walfile = f;
146  return true;
147  }
148  if (size != 0)
149  {
150  /* if write didn't set errno, assume problem is no disk space */
151  if (errno == 0)
152  errno = ENOSPC;
153  fprintf(stderr,
154  ngettext("%s: write-ahead log file \"%s\" has %d byte, should be 0 or %d\n",
155  "%s: write-ahead log file \"%s\" has %d bytes, should be 0 or %d\n",
156  size),
157  progname, fn, (int) size, XLogSegSize);
158  return false;
159  }
160  /* File existed and was empty, so fall through and open */
161  }
162 
163  /* No file existed, so create one */
164 
166  if (f == NULL)
167  {
168  fprintf(stderr,
169  _("%s: could not open write-ahead log file \"%s\": %s\n"),
170  progname, fn, stream->walmethod->getlasterror());
171  return false;
172  }
173 
174  walfile = f;
175  return true;
176 }
177 
178 /*
179  * Close the current WAL file (if open), and rename it to the correct
180  * filename if it's complete. On failure, prints an error message to stderr
181  * and returns false, otherwise returns true.
182  */
183 static bool
185 {
186  off_t currpos;
187  int r;
188 
189  if (walfile == NULL)
190  return true;
191 
192  currpos = stream->walmethod->get_current_pos(walfile);
193  if (currpos == -1)
194  {
195  fprintf(stderr,
196  _("%s: could not determine seek position in file \"%s\": %s\n"),
198  stream->walmethod->close(walfile, CLOSE_UNLINK);
199  walfile = NULL;
200 
201  return false;
202  }
203 
204  if (stream->partial_suffix)
205  {
206  if (currpos == XLOG_SEG_SIZE)
207  r = stream->walmethod->close(walfile, CLOSE_NORMAL);
208  else
209  {
210  fprintf(stderr,
211  _("%s: not renaming \"%s%s\", segment is not complete\n"),
213  r = stream->walmethod->close(walfile, CLOSE_NO_RENAME);
214  }
215  }
216  else
217  r = stream->walmethod->close(walfile, CLOSE_NORMAL);
218 
219  walfile = NULL;
220 
221  if (r != 0)
222  {
223  fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
225  return false;
226  }
227 
228  /*
229  * Mark file as archived if requested by the caller - pg_basebackup needs
230  * to do so as files can otherwise get archived again after promotion of a
231  * new node. This is in line with walreceiver.c always doing a
232  * XLogArchiveForceDone() after a complete segment.
233  */
234  if (currpos == XLOG_SEG_SIZE && stream->mark_done)
235  {
236  /* writes error message if failed */
238  return false;
239  }
240 
241  lastFlushPosition = pos;
242  return true;
243 }
244 
245 
246 /*
247  * Check if a timeline history file exists.
248  */
249 static bool
251 {
252  char histfname[MAXFNAMELEN];
253 
254  /*
255  * Timeline 1 never has a history file. We treat that as if it existed,
256  * since we never need to stream it.
257  */
258  if (stream->timeline == 1)
259  return true;
260 
261  TLHistoryFileName(histfname, stream->timeline);
262 
263  return stream->walmethod->existsfile(histfname);
264 }
265 
266 static bool
267 writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
268 {
269  int size = strlen(content);
270  char histfname[MAXFNAMELEN];
271  Walfile *f;
272 
273  /*
274  * Check that the server's idea of how timeline history files should be
275  * named matches ours.
276  */
277  TLHistoryFileName(histfname, stream->timeline);
278  if (strcmp(histfname, filename) != 0)
279  {
280  fprintf(stderr, _("%s: server reported unexpected history file name for timeline %u: %s\n"),
281  progname, stream->timeline, filename);
282  return false;
283  }
284 
285  f = stream->walmethod->open_for_write(histfname, ".tmp", 0);
286  if (f == NULL)
287  {
288  fprintf(stderr, _("%s: could not create timeline history file \"%s\": %s\n"),
289  progname, histfname, stream->walmethod->getlasterror());
290  return false;
291  }
292 
293  if ((int) stream->walmethod->write(f, content, size) != size)
294  {
295  fprintf(stderr, _("%s: could not write timeline history file \"%s\": %s\n"),
296  progname, histfname, stream->walmethod->getlasterror());
297 
298  /*
299  * If we fail to make the file, delete it to release disk space
300  */
301  stream->walmethod->close(f, CLOSE_UNLINK);
302 
303  return false;
304  }
305 
306  if (stream->walmethod->close(f, CLOSE_NORMAL) != 0)
307  {
308  fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
309  progname, histfname, stream->walmethod->getlasterror());
310  return false;
311  }
312 
313  /* Maintain archive_status, check close_walfile() for details. */
314  if (stream->mark_done)
315  {
316  /* writes error message if failed */
317  if (!mark_file_as_archived(stream, histfname))
318  return false;
319  }
320 
321  return true;
322 }
323 
324 /*
325  * Send a Standby Status Update message to server.
326  */
327 static bool
328 sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
329 {
330  char replybuf[1 + 8 + 8 + 8 + 8 + 1];
331  int len = 0;
332 
333  replybuf[len] = 'r';
334  len += 1;
335  fe_sendint64(blockpos, &replybuf[len]); /* write */
336  len += 8;
338  fe_sendint64(lastFlushPosition, &replybuf[len]); /* flush */
339  else
340  fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */
341  len += 8;
342  fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
343  len += 8;
344  fe_sendint64(now, &replybuf[len]); /* sendTime */
345  len += 8;
346  replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
347  len += 1;
348 
349  if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
350  {
351  fprintf(stderr, _("%s: could not send feedback packet: %s"),
352  progname, PQerrorMessage(conn));
353  return false;
354  }
355 
356  return true;
357 }
358 
359 /*
360  * Check that the server version we're connected to is supported by
361  * ReceiveXlogStream().
362  *
363  * If it's not, an error message is printed to stderr, and false is returned.
364  */
365 bool
367 {
368  int minServerMajor,
369  maxServerMajor;
370  int serverMajor;
371 
372  /*
373  * The message format used in streaming replication changed in 9.3, so we
374  * cannot stream from older servers. And we don't support servers newer
375  * than the client; it might work, but we don't know, so err on the safe
376  * side.
377  */
378  minServerMajor = 903;
379  maxServerMajor = PG_VERSION_NUM / 100;
380  serverMajor = PQserverVersion(conn) / 100;
381  if (serverMajor < minServerMajor)
382  {
383  const char *serverver = PQparameterStatus(conn, "server_version");
384 
385  fprintf(stderr, _("%s: incompatible server version %s; client does not support streaming from server versions older than %s\n"),
386  progname,
387  serverver ? serverver : "'unknown'",
388  "9.3");
389  return false;
390  }
391  else if (serverMajor > maxServerMajor)
392  {
393  const char *serverver = PQparameterStatus(conn, "server_version");
394 
395  fprintf(stderr, _("%s: incompatible server version %s; client does not support streaming from server versions newer than %s\n"),
396  progname,
397  serverver ? serverver : "'unknown'",
398  PG_VERSION);
399  return false;
400  }
401  return true;
402 }
403 
404 /*
405  * Receive a log stream starting at the specified position.
406  *
407  * Individual parameters are passed through the StreamCtl structure.
408  *
409  * If sysidentifier is specified, validate that both the system
410  * identifier and the timeline matches the specified ones
411  * (by sending an extra IDENTIFY_SYSTEM command)
412  *
413  * All received segments will be written to the directory
414  * specified by basedir. This will also fetch any missing timeline history
415  * files.
416  *
417  * The stream_stop callback will be called every time data
418  * is received, and whenever a segment is completed. If it returns
419  * true, the streaming will stop and the function
420  * return. As long as it returns false, streaming will continue
421  * indefinitely.
422  *
423  * If stream_stop() checks for external input, stop_socket should be set to
424  * the FD it checks. This will allow such input to be detected promptly
425  * rather than after standby_message_timeout (which might be indefinite).
426  * Note that signals will interrupt waits for input as well, but that is
427  * race-y since a signal received while busy won't interrupt the wait.
428  *
429  * standby_message_timeout controls how often we send a message
430  * back to the master letting it know our progress, in milliseconds.
431  * Zero means no messages are sent.
432  * This message will only contain the write location, and never
433  * flush or replay.
434  *
435  * If 'partial_suffix' is not NULL, files are initially created with the
436  * given suffix, and the suffix is removed once the file is finished. That
437  * allows you to tell the difference between partial and completed files,
438  * so that you can continue later where you left off.
439  *
440  * If 'synchronous' is true, the received WAL is flushed as soon as written,
441  * otherwise only when the WAL file is closed.
442  *
443  * Note: The WAL location *must* be at a log segment start!
444  */
445 bool
447 {
448  char query[128];
449  char slotcmd[128];
450  PGresult *res;
451  XLogRecPtr stoppos;
452 
453  /*
454  * The caller should've checked the server version already, but doesn't do
455  * any harm to check it here too.
456  */
458  return false;
459 
460  /*
461  * Decide whether we want to report the flush position. If we report the
462  * flush position, the primary will know what WAL we'll possibly
463  * re-request, and it can then remove older WAL safely. We must always do
464  * that when we are using slots.
465  *
466  * Reporting the flush position makes one eligible as a synchronous
467  * replica. People shouldn't include generic names in
468  * synchronous_standby_names, but we've protected them against it so far,
469  * so let's continue to do so unless specifically requested.
470  */
471  if (stream->replication_slot != NULL)
472  {
473  reportFlushPosition = true;
474  sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);
475  }
476  else
477  {
478  if (stream->synchronous)
479  reportFlushPosition = true;
480  else
481  reportFlushPosition = false;
482  slotcmd[0] = 0;
483  }
484 
485  if (stream->sysidentifier != NULL)
486  {
487  /* Validate system identifier hasn't changed */
488  res = PQexec(conn, "IDENTIFY_SYSTEM");
489  if (PQresultStatus(res) != PGRES_TUPLES_OK)
490  {
491  fprintf(stderr,
492  _("%s: could not send replication command \"%s\": %s"),
493  progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
494  PQclear(res);
495  return false;
496  }
497  if (PQntuples(res) != 1 || PQnfields(res) < 3)
498  {
499  fprintf(stderr,
500  _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
501  progname, PQntuples(res), PQnfields(res), 1, 3);
502  PQclear(res);
503  return false;
504  }
505  if (strcmp(stream->sysidentifier, PQgetvalue(res, 0, 0)) != 0)
506  {
507  fprintf(stderr,
508  _("%s: system identifier does not match between base backup and streaming connection\n"),
509  progname);
510  PQclear(res);
511  return false;
512  }
513  if (stream->timeline > atoi(PQgetvalue(res, 0, 1)))
514  {
515  fprintf(stderr,
516  _("%s: starting timeline %u is not present in the server\n"),
517  progname, stream->timeline);
518  PQclear(res);
519  return false;
520  }
521  PQclear(res);
522  }
523 
524  /*
525  * Create temporary replication slot if one is needed
526  */
527  if (stream->temp_slot)
528  {
529  snprintf(query, sizeof(query),
530  "CREATE_REPLICATION_SLOT \"%s\" TEMPORARY PHYSICAL RESERVE_WAL",
531  stream->replication_slot);
532  res = PQexec(conn, query);
533  if (PQresultStatus(res) != PGRES_TUPLES_OK)
534  {
535  fprintf(stderr, _("%s: could not create temporary replication slot \"%s\": %s"),
536  progname, stream->replication_slot, PQerrorMessage(conn));
537  PQclear(res);
538  return false;
539  }
540  }
541 
542  /*
543  * initialize flush position to starting point, it's the caller's
544  * responsibility that that's sane.
545  */
546  lastFlushPosition = stream->startpos;
547 
548  while (1)
549  {
550  /*
551  * Fetch the timeline history file for this timeline, if we don't have
552  * it already. When streaming log to tar, this will always return
553  * false, as we are never streaming into an existing file and
554  * therefore there can be no pre-existing timeline history file.
555  */
556  if (!existsTimeLineHistoryFile(stream))
557  {
558  snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
559  res = PQexec(conn, query);
560  if (PQresultStatus(res) != PGRES_TUPLES_OK)
561  {
562  /* FIXME: we might send it ok, but get an error */
563  fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
564  progname, "TIMELINE_HISTORY", PQresultErrorMessage(res));
565  PQclear(res);
566  return false;
567  }
568 
569  /*
570  * The response to TIMELINE_HISTORY is a single row result set
571  * with two fields: filename and content
572  */
573  if (PQnfields(res) != 2 || PQntuples(res) != 1)
574  {
575  fprintf(stderr,
576  _("%s: unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields\n"),
577  progname, PQntuples(res), PQnfields(res), 1, 2);
578  }
579 
580  /* Write the history file to disk */
582  PQgetvalue(res, 0, 0),
583  PQgetvalue(res, 0, 1));
584 
585  PQclear(res);
586  }
587 
588  /*
589  * Before we start streaming from the requested location, check if the
590  * callback tells us to stop here.
591  */
592  if (stream->stream_stop(stream->startpos, stream->timeline, false))
593  return true;
594 
595  /* Initiate the replication stream at specified location */
596  snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
597  slotcmd,
598  (uint32) (stream->startpos >> 32), (uint32) stream->startpos,
599  stream->timeline);
600  res = PQexec(conn, query);
601  if (PQresultStatus(res) != PGRES_COPY_BOTH)
602  {
603  fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
604  progname, "START_REPLICATION", PQresultErrorMessage(res));
605  PQclear(res);
606  return false;
607  }
608  PQclear(res);
609 
610  /* Stream the WAL */
611  res = HandleCopyStream(conn, stream, &stoppos);
612  if (res == NULL)
613  goto error;
614 
615  /*
616  * Streaming finished.
617  *
618  * There are two possible reasons for that: a controlled shutdown, or
619  * we reached the end of the current timeline. In case of
620  * end-of-timeline, the server sends a result set after Copy has
621  * finished, containing information about the next timeline. Read
622  * that, and restart streaming from the next timeline. In case of
623  * controlled shutdown, stop here.
624  */
625  if (PQresultStatus(res) == PGRES_TUPLES_OK)
626  {
627  /*
628  * End-of-timeline. Read the next timeline's ID and starting
629  * position. Usually, the starting position will match the end of
630  * the previous timeline, but there are corner cases like if the
631  * server had sent us half of a WAL record, when it was promoted.
632  * The new timeline will begin at the end of the last complete
633  * record in that case, overlapping the partial WAL record on the
634  * old timeline.
635  */
636  uint32 newtimeline;
637  bool parsed;
638 
639  parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
640  PQclear(res);
641  if (!parsed)
642  goto error;
643 
644  /* Sanity check the values the server gave us */
645  if (newtimeline <= stream->timeline)
646  {
647  fprintf(stderr,
648  _("%s: server reported unexpected next timeline %u, following timeline %u\n"),
649  progname, newtimeline, stream->timeline);
650  goto error;
651  }
652  if (stream->startpos > stoppos)
653  {
654  fprintf(stderr,
655  _("%s: server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X\n"),
656  progname,
657  stream->timeline, (uint32) (stoppos >> 32), (uint32) stoppos,
658  newtimeline, (uint32) (stream->startpos >> 32), (uint32) stream->startpos);
659  goto error;
660  }
661 
662  /* Read the final result, which should be CommandComplete. */
663  res = PQgetResult(conn);
664  if (PQresultStatus(res) != PGRES_COMMAND_OK)
665  {
666  fprintf(stderr,
667  _("%s: unexpected termination of replication stream: %s"),
669  PQclear(res);
670  goto error;
671  }
672  PQclear(res);
673 
674  /*
675  * Loop back to start streaming from the new timeline. Always
676  * start streaming at the beginning of a segment.
677  */
678  stream->timeline = newtimeline;
679  stream->startpos = stream->startpos - (stream->startpos % XLOG_SEG_SIZE);
680  continue;
681  }
682  else if (PQresultStatus(res) == PGRES_COMMAND_OK)
683  {
684  PQclear(res);
685 
686  /*
687  * End of replication (ie. controlled shut down of the server).
688  *
689  * Check if the callback thinks it's OK to stop here. If not,
690  * complain.
691  */
692  if (stream->stream_stop(stoppos, stream->timeline, false))
693  return true;
694  else
695  {
696  fprintf(stderr, _("%s: replication stream was terminated before stop point\n"),
697  progname);
698  goto error;
699  }
700  }
701  else
702  {
703  /* Server returned an error. */
704  fprintf(stderr,
705  _("%s: unexpected termination of replication stream: %s"),
707  PQclear(res);
708  goto error;
709  }
710  }
711 
712 error:
713  if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NO_RENAME) != 0)
714  fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
716  walfile = NULL;
717  return false;
718 }
719 
720 /*
721  * Helper function to parse the result set returned by server after streaming
722  * has finished. On failure, prints an error to stderr and returns false.
723  */
724 static bool
726 {
727  uint32 startpos_xlogid,
728  startpos_xrecoff;
729 
730  /*----------
731  * The result set consists of one row and two columns, e.g:
732  *
733  * next_tli | next_tli_startpos
734  * ----------+-------------------
735  * 4 | 0/9949AE0
736  *
737  * next_tli is the timeline ID of the next timeline after the one that
738  * just finished streaming. next_tli_startpos is the WAL location where
739  * the server switched to it.
740  *----------
741  */
742  if (PQnfields(res) < 2 || PQntuples(res) != 1)
743  {
744  fprintf(stderr,
745  _("%s: unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields\n"),
746  progname, PQntuples(res), PQnfields(res), 1, 2);
747  return false;
748  }
749 
750  *timeline = atoi(PQgetvalue(res, 0, 0));
751  if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &startpos_xlogid,
752  &startpos_xrecoff) != 2)
753  {
754  fprintf(stderr,
755  _("%s: could not parse next timeline's starting point \"%s\"\n"),
756  progname, PQgetvalue(res, 0, 1));
757  return false;
758  }
759  *startpos = ((uint64) startpos_xlogid << 32) | startpos_xrecoff;
760 
761  return true;
762 }
763 
764 /*
765  * The main loop of ReceiveXlogStream. Handles the COPY stream after
766  * initiating streaming with the START_REPLICATION command.
767  *
768  * If the COPY ends (not necessarily successfully) due to a message from the
769  * server, returns a PGresult and sets *stoppos to the last byte written.
770  * On any other sort of error, returns NULL.
771  */
772 static PGresult *
774  XLogRecPtr *stoppos)
775 {
776  char *copybuf = NULL;
777  TimestampTz last_status = -1;
778  XLogRecPtr blockpos = stream->startpos;
779 
780  still_sending = true;
781 
782  while (1)
783  {
784  int r;
786  long sleeptime;
787 
788  /*
789  * Check if we should continue streaming, or abort at this point.
790  */
791  if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos))
792  goto error;
793 
794  now = feGetCurrentTimestamp();
795 
796  /*
797  * If synchronous option is true, issue sync command as soon as there
798  * are WAL data which has not been flushed yet.
799  */
800  if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)
801  {
802  if (stream->walmethod->sync(walfile) != 0)
803  {
804  fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
806  goto error;
807  }
808  lastFlushPosition = blockpos;
809 
810  /*
811  * Send feedback so that the server sees the latest WAL locations
812  * immediately.
813  */
814  if (!sendFeedback(conn, blockpos, now, false))
815  goto error;
816  last_status = now;
817  }
818 
819  /*
820  * Potentially send a status message to the master
821  */
822  if (still_sending && stream->standby_message_timeout > 0 &&
823  feTimestampDifferenceExceeds(last_status, now,
824  stream->standby_message_timeout))
825  {
826  /* Time to send feedback! */
827  if (!sendFeedback(conn, blockpos, now, false))
828  goto error;
829  last_status = now;
830  }
831 
832  /*
833  * Calculate how long send/receive loops should sleep
834  */
835  sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
836  last_status);
837 
838  r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, &copybuf);
839  while (r != 0)
840  {
841  if (r == -1)
842  goto error;
843  if (r == -2)
844  {
845  PGresult *res = HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos);
846 
847  if (res == NULL)
848  goto error;
849  else
850  return res;
851  }
852 
853  /* Check the message type. */
854  if (copybuf[0] == 'k')
855  {
856  if (!ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos,
857  &last_status))
858  goto error;
859  }
860  else if (copybuf[0] == 'w')
861  {
862  if (!ProcessXLogDataMsg(conn, stream, copybuf, r, &blockpos))
863  goto error;
864 
865  /*
866  * Check if we should continue streaming, or abort at this
867  * point.
868  */
869  if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos))
870  goto error;
871  }
872  else
873  {
874  fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
875  progname, copybuf[0]);
876  goto error;
877  }
878 
879  /*
880  * Process the received data, and any subsequent data we can read
881  * without blocking.
882  */
883  r = CopyStreamReceive(conn, 0, stream->stop_socket, &copybuf);
884  }
885  }
886 
887 error:
888  if (copybuf != NULL)
889  PQfreemem(copybuf);
890  return NULL;
891 }
892 
893 /*
894  * Wait until we can read a CopyData message,
895  * or timeout, or occurrence of a signal or input on the stop_socket.
896  * (timeout_ms < 0 means wait indefinitely; 0 means don't wait.)
897  *
898  * Returns 1 if data has become available for reading, 0 if timed out
899  * or interrupted by signal or stop_socket input, and -1 on an error.
900  */
901 static int
902 CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
903 {
904  int ret;
905  fd_set input_mask;
906  int connsocket;
907  int maxfd;
908  struct timeval timeout;
909  struct timeval *timeoutptr;
910 
911  connsocket = PQsocket(conn);
912  if (connsocket < 0)
913  {
914  fprintf(stderr, _("%s: invalid socket: %s"), progname,
915  PQerrorMessage(conn));
916  return -1;
917  }
918 
919  FD_ZERO(&input_mask);
920  FD_SET(connsocket, &input_mask);
921  maxfd = connsocket;
922  if (stop_socket != PGINVALID_SOCKET)
923  {
924  FD_SET(stop_socket, &input_mask);
925  maxfd = Max(maxfd, stop_socket);
926  }
927 
928  if (timeout_ms < 0)
929  timeoutptr = NULL;
930  else
931  {
932  timeout.tv_sec = timeout_ms / 1000L;
933  timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
934  timeoutptr = &timeout;
935  }
936 
937  ret = select(maxfd + 1, &input_mask, NULL, NULL, timeoutptr);
938 
939  if (ret < 0)
940  {
941  if (errno == EINTR)
942  return 0; /* Got a signal, so not an error */
943  fprintf(stderr, _("%s: select() failed: %s\n"),
944  progname, strerror(errno));
945  return -1;
946  }
947  if (ret > 0 && FD_ISSET(connsocket, &input_mask))
948  return 1; /* Got input on connection socket */
949 
950  return 0; /* Got timeout or input on stop_socket */
951 }
952 
953 /*
954  * Receive CopyData message available from XLOG stream, blocking for
955  * maximum of 'timeout' ms.
956  *
957  * If data was received, returns the length of the data. *buffer is set to
958  * point to a buffer holding the received message. The buffer is only valid
959  * until the next CopyStreamReceive call.
960  *
961  * Returns 0 if no data was available within timeout, or if wait was
962  * interrupted by signal or stop_socket input.
963  * -1 on error. -2 if the server ended the COPY.
964  */
965 static int
966 CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
967  char **buffer)
968 {
969  char *copybuf = NULL;
970  int rawlen;
971 
972  if (*buffer != NULL)
973  PQfreemem(*buffer);
974  *buffer = NULL;
975 
976  /* Try to receive a CopyData message */
977  rawlen = PQgetCopyData(conn, &copybuf, 1);
978  if (rawlen == 0)
979  {
980  int ret;
981 
982  /*
983  * No data available. Wait for some to appear, but not longer than
984  * the specified timeout, so that we can ping the server. Also stop
985  * waiting if input appears on stop_socket.
986  */
987  ret = CopyStreamPoll(conn, timeout, stop_socket);
988  if (ret <= 0)
989  return ret;
990 
991  /* Now there is actually data on the socket */
992  if (PQconsumeInput(conn) == 0)
993  {
994  fprintf(stderr,
995  _("%s: could not receive data from WAL stream: %s"),
996  progname, PQerrorMessage(conn));
997  return -1;
998  }
999 
1000  /* Now that we've consumed some input, try again */
1001  rawlen = PQgetCopyData(conn, &copybuf, 1);
1002  if (rawlen == 0)
1003  return 0;
1004  }
1005  if (rawlen == -1) /* end-of-streaming or error */
1006  return -2;
1007  if (rawlen == -2)
1008  {
1009  fprintf(stderr, _("%s: could not read COPY data: %s"),
1010  progname, PQerrorMessage(conn));
1011  return -1;
1012  }
1013 
1014  /* Return received messages to caller */
1015  *buffer = copybuf;
1016  return rawlen;
1017 }
1018 
1019 /*
1020  * Process the keepalive message.
1021  */
1022 static bool
1024  XLogRecPtr blockpos, TimestampTz *last_status)
1025 {
1026  int pos;
1027  bool replyRequested;
1028  TimestampTz now;
1029 
1030  /*
1031  * Parse the keepalive message, enclosed in the CopyData message. We just
1032  * check if the server requested a reply, and ignore the rest.
1033  */
1034  pos = 1; /* skip msgtype 'k' */
1035  pos += 8; /* skip walEnd */
1036  pos += 8; /* skip sendTime */
1037 
1038  if (len < pos + 1)
1039  {
1040  fprintf(stderr, _("%s: streaming header too small: %d\n"),
1041  progname, len);
1042  return false;
1043  }
1044  replyRequested = copybuf[pos];
1045 
1046  /* If the server requested an immediate reply, send one. */
1047  if (replyRequested && still_sending)
1048  {
1049  if (reportFlushPosition && lastFlushPosition < blockpos &&
1050  walfile != NULL)
1051  {
1052  /*
1053  * If a valid flush location needs to be reported, flush the
1054  * current WAL file so that the latest flush location is sent back
1055  * to the server. This is necessary to see whether the last WAL
1056  * data has been successfully replicated or not, at the normal
1057  * shutdown of the server.
1058  */
1059  if (stream->walmethod->sync(walfile) != 0)
1060  {
1061  fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
1063  return false;
1064  }
1065  lastFlushPosition = blockpos;
1066  }
1067 
1068  now = feGetCurrentTimestamp();
1069  if (!sendFeedback(conn, blockpos, now, false))
1070  return false;
1071  *last_status = now;
1072  }
1073 
1074  return true;
1075 }
1076 
1077 /*
1078  * Process XLogData message.
1079  */
1080 static bool
1081 ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
1082  XLogRecPtr *blockpos)
1083 {
1084  int xlogoff;
1085  int bytes_left;
1086  int bytes_written;
1087  int hdr_len;
1088 
1089  /*
1090  * Once we've decided we don't want to receive any more, just ignore any
1091  * subsequent XLogData messages.
1092  */
1093  if (!(still_sending))
1094  return true;
1095 
1096  /*
1097  * Read the header of the XLogData message, enclosed in the CopyData
1098  * message. We only need the WAL location field (dataStart), the rest of
1099  * the header is ignored.
1100  */
1101  hdr_len = 1; /* msgtype 'w' */
1102  hdr_len += 8; /* dataStart */
1103  hdr_len += 8; /* walEnd */
1104  hdr_len += 8; /* sendTime */
1105  if (len < hdr_len)
1106  {
1107  fprintf(stderr, _("%s: streaming header too small: %d\n"),
1108  progname, len);
1109  return false;
1110  }
1111  *blockpos = fe_recvint64(&copybuf[1]);
1112 
1113  /* Extract WAL location for this block */
1114  xlogoff = *blockpos % XLOG_SEG_SIZE;
1115 
1116  /*
1117  * Verify that the initial location in the stream matches where we think
1118  * we are.
1119  */
1120  if (walfile == NULL)
1121  {
1122  /* No file open yet */
1123  if (xlogoff != 0)
1124  {
1125  fprintf(stderr,
1126  _("%s: received write-ahead log record for offset %u with no file open\n"),
1127  progname, xlogoff);
1128  return false;
1129  }
1130  }
1131  else
1132  {
1133  /* More data in existing segment */
1134  if (stream->walmethod->get_current_pos(walfile) != xlogoff)
1135  {
1136  fprintf(stderr,
1137  _("%s: got WAL data offset %08x, expected %08x\n"),
1138  progname, xlogoff, (int) stream->walmethod->get_current_pos(walfile));
1139  return false;
1140  }
1141  }
1142 
1143  bytes_left = len - hdr_len;
1144  bytes_written = 0;
1145 
1146  while (bytes_left)
1147  {
1148  int bytes_to_write;
1149 
1150  /*
1151  * If crossing a WAL boundary, only write up until we reach
1152  * XLOG_SEG_SIZE.
1153  */
1154  if (xlogoff + bytes_left > XLOG_SEG_SIZE)
1155  bytes_to_write = XLOG_SEG_SIZE - xlogoff;
1156  else
1157  bytes_to_write = bytes_left;
1158 
1159  if (walfile == NULL)
1160  {
1161  if (!open_walfile(stream, *blockpos))
1162  {
1163  /* Error logged by open_walfile */
1164  return false;
1165  }
1166  }
1167 
1168  if (stream->walmethod->write(walfile, copybuf + hdr_len + bytes_written,
1169  bytes_to_write) != bytes_to_write)
1170  {
1171  fprintf(stderr,
1172  _("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
1173  progname, bytes_to_write, current_walfile_name,
1174  stream->walmethod->getlasterror());
1175  return false;
1176  }
1177 
1178  /* Write was successful, advance our position */
1179  bytes_written += bytes_to_write;
1180  bytes_left -= bytes_to_write;
1181  *blockpos += bytes_to_write;
1182  xlogoff += bytes_to_write;
1183 
1184  /* Did we reach the end of a WAL segment? */
1185  if (*blockpos % XLOG_SEG_SIZE == 0)
1186  {
1187  if (!close_walfile(stream, *blockpos))
1188  /* Error message written in close_walfile() */
1189  return false;
1190 
1191  xlogoff = 0;
1192 
1193  if (still_sending && stream->stream_stop(*blockpos, stream->timeline, true))
1194  {
1195  if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1196  {
1197  fprintf(stderr, _("%s: could not send copy-end packet: %s"),
1198  progname, PQerrorMessage(conn));
1199  return false;
1200  }
1201  still_sending = false;
1202  return true; /* ignore the rest of this XLogData packet */
1203  }
1204  }
1205  }
1206  /* No more data left to write, receive next copy packet */
1207 
1208  return true;
1209 }
1210 
1211 /*
1212  * Handle end of the copy stream.
1213  */
1214 static PGresult *
1216  XLogRecPtr blockpos, XLogRecPtr *stoppos)
1217 {
1218  PGresult *res = PQgetResult(conn);
1219 
1220  /*
1221  * The server closed its end of the copy stream. If we haven't closed
1222  * ours already, we need to do so now, unless the server threw an error,
1223  * in which case we don't.
1224  */
1225  if (still_sending)
1226  {
1227  if (!close_walfile(stream, blockpos))
1228  {
1229  /* Error message written in close_walfile() */
1230  PQclear(res);
1231  return NULL;
1232  }
1233  if (PQresultStatus(res) == PGRES_COPY_IN)
1234  {
1235  if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1236  {
1237  fprintf(stderr,
1238  _("%s: could not send copy-end packet: %s"),
1239  progname, PQerrorMessage(conn));
1240  PQclear(res);
1241  return NULL;
1242  }
1243  res = PQgetResult(conn);
1244  }
1245  still_sending = false;
1246  }
1247  if (copybuf != NULL)
1248  PQfreemem(copybuf);
1249  *stoppos = blockpos;
1250  return res;
1251 }
1252 
1253 /*
1254  * Check if we should continue streaming, or abort at this point.
1255  */
1256 static bool
1258  XLogRecPtr *stoppos)
1259 {
1260  if (still_sending && stream->stream_stop(blockpos, stream->timeline, false))
1261  {
1262  if (!close_walfile(stream, blockpos))
1263  {
1264  /* Potential error message is written by close_walfile */
1265  return false;
1266  }
1267  if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1268  {
1269  fprintf(stderr, _("%s: could not send copy-end packet: %s"),
1270  progname, PQerrorMessage(conn));
1271  return false;
1272  }
1273  still_sending = false;
1274  }
1275 
1276  return true;
1277 }
1278 
1279 /*
1280  * Calculate how long send/receive loops should sleep
1281  */
1282 static long
1284  TimestampTz last_status)
1285 {
1286  TimestampTz status_targettime = 0;
1287  long sleeptime;
1288 
1289  if (standby_message_timeout && still_sending)
1290  status_targettime = last_status +
1291  (standby_message_timeout - 1) * ((int64) 1000);
1292 
1293  if (status_targettime > 0)
1294  {
1295  long secs;
1296  int usecs;
1297 
1299  status_targettime,
1300  &secs,
1301  &usecs);
1302  /* Always sleep at least 1 sec */
1303  if (secs <= 0)
1304  {
1305  secs = 1;
1306  usecs = 0;
1307  }
1308 
1309  sleeptime = secs * 1000 + usecs / 1000;
1310  }
1311  else
1312  sleeptime = -1;
1313 
1314  return sleeptime;
1315 }
int PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
Definition: fe-exec.c:2221
#define XLogSegSize
Definition: xlog_internal.h:92
int PQnfields(const PGresult *res)
Definition: fe-exec.c:2681
static bool open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
Definition: receivelog.c:91
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6097
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static int CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
Definition: receivelog.c:902
TimestampTz feGetCurrentTimestamp(void)
Definition: streamutil.c:441
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3067
void * Walfile
Definition: walmethods.h:13
static void error(void)
Definition: sql-dyntest.c:147
const char * PQparameterStatus(const PGconn *conn, const char *paramName)
Definition: fe-connect.c:6062
static bool still_sending
Definition: receivelog.c:38
int64 TimestampTz
Definition: timestamp.h:39
bool(* existsfile)(const char *pathname)
Definition: walmethods.h:50
#define XLogFileName(fname, tli, logSegNo)
static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos, XLogRecPtr *stoppos)
Definition: receivelog.c:1257
char * sysidentifier
Definition: receivelog.h:34
int PQputCopyEnd(PGconn *conn, const char *errormsg)
Definition: fe-exec.c:2288
#define select(n, r, w, e, timeout)
Definition: win32.h:374
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
XLogRecPtr startpos
Definition: receivelog.h:32
static long CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout, TimestampTz last_status)
Definition: receivelog.c:1283
int sleeptime
Definition: pg_standby.c:40
char * partial_suffix
Definition: receivelog.h:48
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:6087
const char * progname
Definition: pg_standby.c:37
int PQntuples(const PGresult *res)
Definition: fe-exec.c:2673
bool feTimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: streamutil.c:482
TimeLineID timeline
Definition: receivelog.h:33
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2596
int PQgetCopyData(PGconn *conn, char **buffer, int async)
Definition: fe-exec.c:2377
static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr blockpos, TimestampTz *last_status)
Definition: receivelog.c:1023
ssize_t(* get_file_size)(const char *pathname)
Definition: walmethods.h:53
static int CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket, char **buffer)
Definition: receivelog.c:966
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
Definition: receivelog.c:725
PGconn * conn
Definition: streamutil.c:43
#define MAXPGPATH
static Walfile * walfile
Definition: receivelog.c:33
int PQflush(PGconn *conn)
Definition: fe-exec.c:3187
char * replication_slot
Definition: receivelog.h:49
bool mark_done
Definition: receivelog.h:38
void feTimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: streamutil.c:460
#define TLHistoryFileName(fname, tli)
static bool reportFlushPosition
Definition: receivelog.c:35
uint64 XLogSegNo
Definition: xlogdefs.h:34
off_t(* get_current_pos)(Walfile f)
Definition: walmethods.h:62
unsigned int uint32
Definition: c.h:268
int pgsocket
Definition: port.h:22
static bool close_walfile(StreamCtl *stream, XLogRecPtr pos)
Definition: receivelog.c:184
stream_stop_callback stream_stop
Definition: receivelog.h:42
WalWriteMethod * walmethod
Definition: receivelog.h:47
int(* sync)(Walfile f)
Definition: walmethods.h:67
#define MAXFNAMELEN
static int standby_message_timeout
Definition: pg_basebackup.c:90
#define ngettext(s, p, n)
Definition: c.h:127
static bool sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
Definition: receivelog.c:328
static XLogRecPtr lastFlushPosition
Definition: receivelog.c:36
static PGresult * HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf, XLogRecPtr blockpos, XLogRecPtr *stoppos)
Definition: receivelog.c:1215
#define PGINVALID_SOCKET
Definition: port.h:24
#define EINTR
Definition: win32.h:285
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1631
static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr *blockpos)
Definition: receivelog.c:1081
StringInfo copybuf
Definition: tablesync.c:114
ssize_t(* write)(Walfile f, const void *buf, size_t count)
Definition: walmethods.h:59
void PQclear(PGresult *res)
Definition: fe-exec.c:650
static void * fn(void *arg)
#define Max(x, y)
Definition: c.h:801
#define XLByteToSeg(xlrp, logSegNo)
static PGresult * HandleCopyStream(PGconn *conn, StreamCtl *stream, XLogRecPtr *stoppos)
Definition: receivelog.c:773
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool existsTimeLineHistoryFile(StreamCtl *stream)
Definition: receivelog.c:250
bool ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
Definition: receivelog.c:446
Walfile(* open_for_write)(const char *pathname, const char *temp_suffix, size_t pad_to_size)
Definition: walmethods.h:41
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:214
const char *(* getlasterror)(void)
Definition: walmethods.h:78
static XLogRecPtr startpos
bool synchronous
Definition: receivelog.h:37
pgsocket stop_socket
Definition: receivelog.h:44
static char * filename
Definition: pg_dumpall.c:90
char * PQresultErrorMessage(const PGresult *res)
Definition: fe-exec.c:2612
int64 fe_recvint64(char *buf)
Definition: streamutil.c:514
static bool mark_file_as_archived(StreamCtl *stream, const char *fname)
Definition: receivelog.c:60
const char * strerror(int errnum)
Definition: strerror.c:19
int standby_message_timeout
Definition: receivelog.h:36
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:1846
void fe_sendint64(int64 i, char *buf)
Definition: streamutil.c:495
static char current_walfile_name[MAXPGPATH]
Definition: receivelog.c:34
bool temp_slot
Definition: receivelog.h:50
#define _(x)
Definition: elog.c:84
void PQfreemem(void *ptr)
Definition: fe-exec.c:3200
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:6115
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1702
static bool writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
Definition: receivelog.c:267
bool CheckServerVersionForStreaming(PGconn *conn)
Definition: receivelog.c:366
int(* close)(Walfile f, WalCloseMethod method)
Definition: walmethods.h:47