PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
receivelog.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * receivelog.c - receive transaction log files using the streaming
4  * replication protocol.
5  *
6  * Author: Magnus Hagander <magnus@hagander.net>
7  *
8  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
9  *
10  * IDENTIFICATION
11  * src/bin/pg_basebackup/receivelog.c
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres_fe.h"
16 
17 #include <sys/stat.h>
18 #include <unistd.h>
19 #ifdef HAVE_SYS_SELECT_H
20 #include <sys/select.h>
21 #endif
22 
23 /* local includes */
24 #include "receivelog.h"
25 #include "streamutil.h"
26 
27 #include "libpq-fe.h"
28 #include "access/xlog_internal.h"
29 #include "common/file_utils.h"
30 
31 
32 /* fd and filename for currently open WAL file */
33 static Walfile *walfile = NULL;
34 static char current_walfile_name[MAXPGPATH] = "";
/*
 * Set when a stream is started: true if a replication slot is in use or the
 * stream is synchronous, false otherwise.  Consulted when deciding whether
 * to fsync the current WAL file before answering a keepalive.
 */
35 static bool reportFlushPosition = false;
/*
 * NOTE(review): the embedded listing numbers jump from 35 to 37 here, and
 * lastFlushPosition is used throughout this file without a declaration in
 * view -- its declaration was presumably on the dropped line; confirm.
 */
37 
38 static bool still_sending = true; /* feedback still needs to be sent? */
39 
/*
 * Forward declarations of static helpers defined later in this file.
 * NOTE(review): listing lines 40, 48, 52 and 55 are absent, so the fragments
 * below that begin in the middle of a parameter list have lost the first
 * line of their prototype.
 */
41  XLogRecPtr *stoppos);
42 static int CopyStreamPoll(PGconn *conn, long timeout_ms);
43 static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
44 static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf,
45  int len, XLogRecPtr blockpos, TimestampTz *last_status);
46 static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
47  XLogRecPtr *blockpos);
49  XLogRecPtr blockpos, XLogRecPtr *stoppos);
50 static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos,
51  XLogRecPtr *stoppos);
53  TimestampTz last_status);
54 
56  uint32 *timeline);
57 
58 static bool
59 mark_file_as_archived(StreamCtl *stream, const char *fname)
60 {
61  Walfile *f;
62  static char tmppath[MAXPGPATH];
63 
64  snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done",
65  fname);
66 
67  f = stream->walmethod->open_for_write(tmppath, NULL, 0);
68  if (f == NULL)
69  {
70  fprintf(stderr, _("%s: could not create archive status file \"%s\": %s\n"),
71  progname, tmppath, stream->walmethod->getlasterror());
72  return false;
73  }
74 
75  stream->walmethod->close(f, CLOSE_NORMAL);
76 
77  return true;
78 }
79 
80 /*
81  * Open a new WAL file in the specified directory.
82  *
83  * Returns true if OK; on failure, returns false after printing an error msg.
84  * On success, 'walfile' is set to the FD for the file, and the base filename
85  * (without partial_suffix) is stored in 'current_walfile_name'.
86  *
87  * The file will be padded to 16Mb with zeroes.
88  */
/*
 * stream:     supplies the walmethod callbacks and the optional
 *             partial_suffix appended to the file name.
 * startpoint: WAL location used to compute the segment number.
 *
 * NOTE(review): listing lines 98, 125 and 162 are missing from this copy.
 * 'f' is tested below without a visible assignment, so the dropped lines
 * presumably held the open calls (and line 98 the code that fills
 * current_walfile_name from segno) -- confirm against the upstream file.
 */
89 static bool
90 open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
91 {
92  Walfile *f;
93  char fn[MAXPGPATH];
94  ssize_t size;
95  XLogSegNo segno;
96 
97  XLByteToSeg(startpoint, segno);
99 
/* fn = current_walfile_name plus partial_suffix, if one is configured */
100  snprintf(fn, sizeof(fn), "%s%s", current_walfile_name,
101  stream->partial_suffix ? stream->partial_suffix : "");
102 
103  /*
104  * When streaming to files, if an existing file exists we verify that it's
105  * either empty (just created), or a complete XLogSegSize segment (in
106  * which case it has been created and padded). Anything else indicates a
107  * corrupt file.
108  *
109  * When streaming to tar, no file with this name will exist before, so we
110  * never have to verify a size.
111  */
112  if (stream->walmethod->existsfile(fn))
113  {
114  size = stream->walmethod->get_file_size(fn);
115  if (size < 0)
116  {
117  fprintf(stderr,
118  _("%s: could not get size of transaction log file \"%s\": %s\n"),
119  progname, fn, stream->walmethod->getlasterror());
120  return false;
121  }
122  if (size == XLogSegSize)
123  {
124  /* Already padded file. Open it for use */
126  if (f == NULL)
127  {
128  fprintf(stderr,
129  _("%s: could not open existing transaction log file \"%s\": %s\n"),
130  progname, fn, stream->walmethod->getlasterror());
131  return false;
132  }
133 
134  /* fsync file in case of a previous crash */
135  if (stream->walmethod->sync(f) != 0)
136  {
137  fprintf(stderr,
138  _("%s: could not sync existing transaction log file \"%s\": %s\n"),
139  progname, fn, stream->walmethod->getlasterror());
140  stream->walmethod->close(f, CLOSE_UNLINK);
141  return false;
142  }
143 
/* success: the existing full-size file becomes the current WAL file */
144  walfile = f;
145  return true;
146  }
147  if (size != 0)
148  {
/*
 * NOTE(review): no write happens on this path and the message below does not
 * print errno, so this errno adjustment looks like a leftover -- confirm.
 */
149  /* if write didn't set errno, assume problem is no disk space */
150  if (errno == 0)
151  errno = ENOSPC;
152  fprintf(stderr,
153  _("%s: transaction log file \"%s\" has %d bytes, should be 0 or %d\n"),
154  progname, fn, (int) size, XLogSegSize);
155  return false;
156  }
157  /* File existed and was empty, so fall through and open */
158  }
159 
160  /* No file existed, so create one */
161 
163  if (f == NULL)
164  {
165  fprintf(stderr,
166  _("%s: could not open transaction log file \"%s\": %s\n"),
167  progname, fn, stream->walmethod->getlasterror());
168  return false;
169  }
170 
171  walfile = f;
172  return true;
173 }
174 
175 /*
176  * Close the current WAL file (if open), and rename it to the correct
177  * filename if it's complete. On failure, prints an error message to stderr
178  * and returns false, otherwise returns true.
179  */
/*
 * NOTE(review): listing line 181 (function name and parameter list) is
 * missing, as are lines 194, 209, 221 and 234 (the argument lists of three
 * fprintf calls and the condition guarding the final "return false").  The
 * body uses 'stream' and 'pos', and a caller in this file invokes
 * close_walfile(stream, *blockpos).
 */
180 static bool
182 {
183  off_t currpos;
184  int r;
185 
/* nothing open: treated as success */
186  if (walfile == NULL)
187  return true;
188 
189  currpos = stream->walmethod->get_current_pos(walfile);
190  if (currpos == -1)
191  {
192  fprintf(stderr,
193  _("%s: could not determine seek position in file \"%s\": %s\n"),
195  stream->walmethod->close(walfile, CLOSE_UNLINK);
196  walfile = NULL;
197 
198  return false;
199  }
200 
/*
 * With a partial suffix configured, only a file whose position equals
 * XLOG_SEG_SIZE is closed with CLOSE_NORMAL; otherwise it is closed with
 * CLOSE_NO_RENAME after printing a notice.  Without a suffix the file is
 * always closed with CLOSE_NORMAL.
 */
201  if (stream->partial_suffix)
202  {
203  if (currpos == XLOG_SEG_SIZE)
204  r = stream->walmethod->close(walfile, CLOSE_NORMAL);
205  else
206  {
207  fprintf(stderr,
208  _("%s: not renaming \"%s%s\", segment is not complete\n"),
210  r = stream->walmethod->close(walfile, CLOSE_NO_RENAME);
211  }
212  }
213  else
214  r = stream->walmethod->close(walfile, CLOSE_NORMAL);
215 
/* the handle is forgotten whether or not the close succeeded */
216  walfile = NULL;
217 
218  if (r != 0)
219  {
220  fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
222  return false;
223  }
224 
225  /*
226  * Mark file as archived if requested by the caller - pg_basebackup needs
227  * to do so as files can otherwise get archived again after promotion of a
228  * new node. This is in line with walreceiver.c always doing a
229  * XLogArchiveForceDone() after a complete segment.
230  */
231  if (currpos == XLOG_SEG_SIZE && stream->mark_done)
232  {
233  /* writes error message if failed */
235  return false;
236  }
237 
/* reached only after a successful close: record 'pos' as flushed */
238  lastFlushPosition = pos;
239  return true;
240 }
241 
242 
243 /*
244  * Check if a timeline history file exists.
245  */
/*
 * Returns true if the history file for stream->timeline exists according to
 * the WAL method's existsfile callback, or if the timeline is 1.
 *
 * NOTE(review): listing line 247 (function name and parameter list) is
 * missing; the body uses 'stream', and this file calls
 * existsTimeLineHistoryFile(stream).
 */
246 static bool
248 {
249  char histfname[MAXFNAMELEN];
250 
251  /*
252  * Timeline 1 never has a history file. We treat that as if it existed,
253  * since we never need to stream it.
254  */
255  if (stream->timeline == 1)
256  return true;
257 
/* build the expected history file name for this timeline */
258  TLHistoryFileName(histfname, stream->timeline);
259 
260  return stream->walmethod->existsfile(histfname);
261 }
262 
263 static bool
264 writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
265 {
266  int size = strlen(content);
267  char histfname[MAXFNAMELEN];
268  Walfile *f;
269 
270  /*
271  * Check that the server's idea of how timeline history files should be
272  * named matches ours.
273  */
274  TLHistoryFileName(histfname, stream->timeline);
275  if (strcmp(histfname, filename) != 0)
276  {
277  fprintf(stderr, _("%s: server reported unexpected history file name for timeline %u: %s\n"),
278  progname, stream->timeline, filename);
279  return false;
280  }
281 
282  f = stream->walmethod->open_for_write(histfname, ".tmp", 0);
283  if (f == NULL)
284  {
285  fprintf(stderr, _("%s: could not create timeline history file \"%s\": %s\n"),
286  progname, histfname, stream->walmethod->getlasterror());
287  return false;
288  }
289 
290  if ((int) stream->walmethod->write(f, content, size) != size)
291  {
292  fprintf(stderr, _("%s: could not write timeline history file \"%s\": %s\n"),
293  progname, histfname, stream->walmethod->getlasterror());
294 
295  /*
296  * If we fail to make the file, delete it to release disk space
297  */
298  stream->walmethod->close(f, CLOSE_UNLINK);
299 
300  return false;
301  }
302 
303  if (stream->walmethod->close(f, CLOSE_NORMAL) != 0)
304  {
305  fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
306  progname, histfname, stream->walmethod->getlasterror());
307  return false;
308  }
309 
310  /* Maintain archive_status, check close_walfile() for details. */
311  if (stream->mark_done)
312  {
313  /* writes error message if failed */
314  if (!mark_file_as_archived(stream, histfname))
315  return false;
316  }
317 
318  return true;
319 }
320 
321 /*
322  * Send a Standby Status Update message to server.
323  */
/*
 * Builds a 34-byte reply in replybuf and sends it as CopyData:
 *   'r' | write position (blockpos) | flush position | apply position
 *       | send time (now) | replyRequested flag
 * The apply field is always InvalidXLogRecPtr.
 *
 * Returns true on success.  If PQputCopyData() or PQflush() fails, prints an
 * error message to stderr and returns false.
 *
 * NOTE(review): listing line 334 is missing, leaving the "else" below
 * without its "if".  Presumably the condition was reportFlushPosition --
 * confirm against the upstream file.
 */
324 static bool
325 sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
326 {
327  char replybuf[1 + 8 + 8 + 8 + 8 + 1];
328  int len = 0;
329 
330  replybuf[len] = 'r';
331  len += 1;
332  fe_sendint64(blockpos, &replybuf[len]); /* write */
333  len += 8;
335  fe_sendint64(lastFlushPosition, &replybuf[len]); /* flush */
336  else
337  fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */
338  len += 8;
339  fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
340  len += 8;
341  fe_sendint64(now, &replybuf[len]); /* sendTime */
342  len += 8;
343  replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
344  len += 1;
345 
/* queue the message and push it out; either step failing is an error */
346  if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
347  {
348  fprintf(stderr, _("%s: could not send feedback packet: %s"),
349  progname, PQerrorMessage(conn));
350  return false;
351  }
352 
353  return true;
354 }
355 
356 /*
357  * Check that the server version we're connected to is supported by
358  * ReceiveXlogStream().
359  *
360  * If it's not, an error message is printed to stderr, and false is returned.
361  */
/*
 * NOTE(review): listing line 363 (function name and parameter list) is
 * missing from this copy; the body uses a connection named 'conn'.
 */
362 bool
364 {
365  int minServerMajor,
366  maxServerMajor;
367  int serverMajor;
368 
369  /*
370  * The message format used in streaming replication changed in 9.3, so we
371  * cannot stream from older servers. And we don't support servers newer
372  * than the client; it might work, but we don't know, so err on the safe
373  * side.
374  */
/*
 * Versions are compared with the last two digits of the numeric version
 * dropped (integer division by 100); 903 is the 9.3 lower bound described
 * above.
 */
375  minServerMajor = 903;
376  maxServerMajor = PG_VERSION_NUM / 100;
377  serverMajor = PQserverVersion(conn) / 100;
378  if (serverMajor < minServerMajor)
379  {
/* server_version is used for the message only; it may be NULL */
380  const char *serverver = PQparameterStatus(conn, "server_version");
381 
382  fprintf(stderr, _("%s: incompatible server version %s; client does not support streaming from server versions older than %s\n"),
383  progname,
384  serverver ? serverver : "'unknown'",
385  "9.3");
386  return false;
387  }
388  else if (serverMajor > maxServerMajor)
389  {
390  const char *serverver = PQparameterStatus(conn, "server_version");
391 
392  fprintf(stderr, _("%s: incompatible server version %s; client does not support streaming from server versions newer than %s\n"),
393  progname,
394  serverver ? serverver : "'unknown'",
395  PG_VERSION);
396  return false;
397  }
398  return true;
399 }
400 
401 /*
402  * Receive a log stream starting at the specified position.
403  *
404  * Individual parameters are passed through the StreamCtl structure.
405  *
406  * If sysidentifier is specified, validate that both the system
407  * identifier and the timeline matches the specified ones
408  * (by sending an extra IDENTIFY_SYSTEM command)
409  *
410  * All received segments will be written to the directory
411  * specified by basedir. This will also fetch any missing timeline history
412  * files.
413  *
414  * The stream_stop callback will be called every time data
415  * is received, and whenever a segment is completed. If it returns
416  * true, the streaming will stop and the function
417  * return. As long as it returns false, streaming will continue
418  * indefinitely.
419  *
420  * standby_message_timeout controls how often we send a message
421  * back to the master letting it know our progress, in milliseconds.
422  * This message will only contain the write location, and never
423  * flush or replay.
424  *
425  * If 'partial_suffix' is not NULL, files are initially created with the
426  * given suffix, and the suffix is removed once the file is finished. That
427  * allows you to tell the difference between partial and completed files,
428  * so that you can continue later where you left.
429  *
430  * If 'synchronous' is true, the received WAL is flushed as soon as written,
431  * otherwise only when the WAL file is closed.
432  *
433  * Note: The log position *must* be at a log segment start!
434  */
/*
 * NOTE(review): this copy is missing listing line 436 (function name and
 * parameter list; the body uses 'conn' and 'stream'), line 447 (the
 * condition guarding the first "return false"), line 571 (the start of the
 * call that writes the history file), and lines 658, 696 and 705 (the
 * argument lists of three fprintf calls).
 */
435 bool
437 {
438  char query[128];
439  char slotcmd[128];
440  PGresult *res;
441  XLogRecPtr stoppos;
442 
443  /*
444  * The caller should've checked the server version already, but doesn't do
445  * any harm to check it here too.
446  */
448  return false;
449 
450  /*
451  * Decide whether we want to report the flush position. If we report
452  * the flush position, the primary will know what WAL we'll
453  * possibly re-request, and it can then remove older WAL safely.
454  * We must always do that when we are using slots.
455  *
456  * Reporting the flush position makes one eligible as a synchronous
457  * replica. People shouldn't include generic names in
458  * synchronous_standby_names, but we've protected them against it so
459  * far, so let's continue to do so unless specifically requested.
460  */
461  if (stream->replication_slot != NULL)
462  {
463  reportFlushPosition = true;
/*
 * NOTE(review): unbounded sprintf into the 128-byte slotcmd; this is safe
 * only if the slot name is known to be short enough -- confirm, or bound it
 * with snprintf like the other commands built in this function.
 */
464  sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);
465  }
466  else
467  {
468  if (stream->synchronous)
469  reportFlushPosition = true;
470  else
471  reportFlushPosition = false;
/* no slot: the START_REPLICATION command gets an empty slot clause */
472  slotcmd[0] = 0;
473  }
474 
475  if (stream->sysidentifier != NULL)
476  {
477  /* Validate system identifier hasn't changed */
478  res = PQexec(conn, "IDENTIFY_SYSTEM");
479  if (PQresultStatus(res) != PGRES_TUPLES_OK)
480  {
481  fprintf(stderr,
482  _("%s: could not send replication command \"%s\": %s"),
483  progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
484  PQclear(res);
485  return false;
486  }
487  if (PQntuples(res) != 1 || PQnfields(res) < 3)
488  {
489  fprintf(stderr,
490  _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
491  progname, PQntuples(res), PQnfields(res), 1, 3);
492  PQclear(res);
493  return false;
494  }
/* column 0 is compared with the expected system identifier */
495  if (strcmp(stream->sysidentifier, PQgetvalue(res, 0, 0)) != 0)
496  {
497  fprintf(stderr,
498  _("%s: system identifier does not match between base backup and streaming connection\n"),
499  progname);
500  PQclear(res);
501  return false;
502  }
/* column 1 is parsed as a timeline ID; ours must not be beyond it */
503  if (stream->timeline > atoi(PQgetvalue(res, 0, 1)))
504  {
505  fprintf(stderr,
506  _("%s: starting timeline %u is not present in the server\n"),
507  progname, stream->timeline);
508  PQclear(res);
509  return false;
510  }
511  PQclear(res);
512  }
513 
514  /*
515  * Create temporary replication slot if one is needed
516  */
517  if (stream->temp_slot)
518  {
519  snprintf(query, sizeof(query),
520  "CREATE_REPLICATION_SLOT \"%s\" TEMPORARY PHYSICAL RESERVE_WAL",
521  stream->replication_slot);
522  res = PQexec(conn, query);
523  if (PQresultStatus(res) != PGRES_TUPLES_OK)
524  {
525  fprintf(stderr, _("%s: could not create temporary replication slot \"%s\": %s"),
526  progname, stream->replication_slot, PQerrorMessage(conn));
527  PQclear(res);
528  return false;
529  }
/*
 * NOTE(review): on success 'res' is not PQclear()ed before it is reassigned
 * further down, which looks like a leaked PGresult -- confirm.
 */
530  }
531 
532  /*
533  * initialize flush position to starting point, it's the caller's
534  * responsibility that that's sane.
535  */
536  lastFlushPosition = stream->startpos;
537 
/* one iteration per timeline streamed; left only via return or goto error */
538  while (1)
539  {
540  /*
541  * Fetch the timeline history file for this timeline, if we don't have
542  * it already. When streaming log to tar, this will always return
543  * false, as we are never streaming into an existing file and
544  * therefore there can be no pre-existing timeline history file.
545  */
546  if (!existsTimeLineHistoryFile(stream))
547  {
548  snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
549  res = PQexec(conn, query);
550  if (PQresultStatus(res) != PGRES_TUPLES_OK)
551  {
552  /* FIXME: we might send it ok, but get an error */
553  fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
554  progname, "TIMELINE_HISTORY", PQresultErrorMessage(res));
555  PQclear(res);
556  return false;
557  }
558 
559  /*
560  * The response to TIMELINE_HISTORY is a single row result set
561  * with two fields: filename and content
562  */
563  if (PQnfields(res) != 2 || PQntuples(res) != 1)
564  {
565  fprintf(stderr,
566  _("%s: unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields\n"),
567  progname, PQntuples(res), PQnfields(res), 1, 2);
568  }
/*
 * NOTE(review): a malformed response is only reported; control still falls
 * through to the PQgetvalue(res, 0, 0) / (res, 0, 1) calls below.
 */
569 
570  /* Write the history file to disk */
572  PQgetvalue(res, 0, 0),
573  PQgetvalue(res, 0, 1));
574 
575  PQclear(res);
576  }
577 
578  /*
579  * Before we start streaming from the requested location, check if the
580  * callback tells us to stop here.
581  */
582  if (stream->stream_stop(stream->startpos, stream->timeline, false))
583  return true;
584 
585  /* Initiate the replication stream at specified location */
586  snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
587  slotcmd,
588  (uint32) (stream->startpos >> 32), (uint32) stream->startpos,
589  stream->timeline);
590  res = PQexec(conn, query);
591  if (PQresultStatus(res) != PGRES_COPY_BOTH)
592  {
593  fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
594  progname, "START_REPLICATION", PQresultErrorMessage(res));
595  PQclear(res);
596  return false;
597  }
598  PQclear(res);
599 
600  /* Stream the WAL */
601  res = HandleCopyStream(conn, stream, &stoppos);
602  if (res == NULL)
603  goto error;
604 
605  /*
606  * Streaming finished.
607  *
608  * There are two possible reasons for that: a controlled shutdown, or
609  * we reached the end of the current timeline. In case of
610  * end-of-timeline, the server sends a result set after Copy has
611  * finished, containing information about the next timeline. Read
612  * that, and restart streaming from the next timeline. In case of
613  * controlled shutdown, stop here.
614  */
615  if (PQresultStatus(res) == PGRES_TUPLES_OK)
616  {
617  /*
618  * End-of-timeline. Read the next timeline's ID and starting
619  * position. Usually, the starting position will match the end of
620  * the previous timeline, but there are corner cases like if the
621  * server had sent us half of a WAL record, when it was promoted.
622  * The new timeline will begin at the end of the last complete
623  * record in that case, overlapping the partial WAL record on the
624  * old timeline.
625  */
626  uint32 newtimeline;
627  bool parsed;
628 
/* note: stream->startpos is overwritten with the reported start position */
629  parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
630  PQclear(res);
631  if (!parsed)
632  goto error;
633 
634  /* Sanity check the values the server gave us */
635  if (newtimeline <= stream->timeline)
636  {
637  fprintf(stderr,
638  _("%s: server reported unexpected next timeline %u, following timeline %u\n"),
639  progname, newtimeline, stream->timeline);
640  goto error;
641  }
642  if (stream->startpos > stoppos)
643  {
644  fprintf(stderr,
645  _("%s: server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X\n"),
646  progname,
647  stream->timeline, (uint32) (stoppos >> 32), (uint32) stoppos,
648  newtimeline, (uint32) (stream->startpos >> 32), (uint32) stream->startpos);
649  goto error;
650  }
651 
652  /* Read the final result, which should be CommandComplete. */
653  res = PQgetResult(conn);
654  if (PQresultStatus(res) != PGRES_COMMAND_OK)
655  {
656  fprintf(stderr,
657  _("%s: unexpected termination of replication stream: %s"),
659  PQclear(res);
660  goto error;
661  }
662  PQclear(res);
663 
664  /*
665  * Loop back to start streaming from the new timeline. Always
666  * start streaming at the beginning of a segment.
667  */
668  stream->timeline = newtimeline;
/* round startpos down to a multiple of XLOG_SEG_SIZE */
669  stream->startpos = stream->startpos - (stream->startpos % XLOG_SEG_SIZE);
670  continue;
671  }
672  else if (PQresultStatus(res) == PGRES_COMMAND_OK)
673  {
674  PQclear(res);
675 
676  /*
677  * End of replication (ie. controlled shut down of the server).
678  *
679  * Check if the callback thinks it's OK to stop here. If not,
680  * complain.
681  */
682  if (stream->stream_stop(stoppos, stream->timeline, false))
683  return true;
684  else
685  {
686  fprintf(stderr, _("%s: replication stream was terminated before stop point\n"),
687  progname);
688  goto error;
689  }
690  }
691  else
692  {
693  /* Server returned an error. */
694  fprintf(stderr,
695  _("%s: unexpected termination of replication stream: %s"),
697  PQclear(res);
698  goto error;
699  }
700  }
701 
/*
 * Common failure exit: close any open WAL file with CLOSE_NO_RENAME, forget
 * the handle, and report failure.
 */
702 error:
703  if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NO_RENAME) != 0)
704  fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
706  walfile = NULL;
707  return false;
708 }
709 
710 /*
711  * Helper function to parse the result set returned by server after streaming
712  * has finished. On failure, prints an error to stderr and returns false.
713  */
/*
 * On success stores the parsed timeline in *timeline and the parsed start
 * position in *startpos.  Note that *timeline is assigned before the
 * position is parsed, so it may have been written even when false is
 * returned.
 *
 * NOTE(review): listing line 715 (function name and parameter list) is
 * missing; the body uses 'res', 'startpos' and 'timeline', and this file
 * calls ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline).
 */
714 static bool
716 {
717  uint32 startpos_xlogid,
718  startpos_xrecoff;
719 
720  /*----------
721  * The result set consists of one row and two columns, e.g:
722  *
723  * next_tli | next_tli_startpos
724  * ----------+-------------------
725  * 4 | 0/9949AE0
726  *
727  * next_tli is the timeline ID of the next timeline after the one that
728  * just finished streaming. next_tli_startpos is the XLOG position where
729  * the server switched to it.
730  *----------
731  */
/* extra columns are tolerated; fewer than two, or not exactly one row, is an error */
732  if (PQnfields(res) < 2 || PQntuples(res) != 1)
733  {
734  fprintf(stderr,
735  _("%s: unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields\n"),
736  progname, PQntuples(res), PQnfields(res), 1, 2);
737  return false;
738  }
739 
740  *timeline = atoi(PQgetvalue(res, 0, 0));
/* the position is two hexadecimal numbers separated by a slash */
741  if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &startpos_xlogid,
742  &startpos_xrecoff) != 2)
743  {
744  fprintf(stderr,
745  _("%s: could not parse next timeline's starting point \"%s\"\n"),
746  progname, PQgetvalue(res, 0, 1));
747  return false;
748  }
/* combine: first number is the high 32 bits, second the low 32 bits */
749  *startpos = ((uint64) startpos_xlogid << 32) | startpos_xrecoff;
750 
751  return true;
752 }
753 
754 /*
755  * The main loop of ReceiveXlogStream. Handles the COPY stream after
756  * initiating streaming with the START_REPLICATION command.
757  *
758  * If the COPY ends (not necessarily successfully) due to a message from the
759  * server, returns a PGresult and sets *stoppos to the last byte written.
760  * On any other sort of error, returns NULL.
761  */
/*
 * NOTE(review): this copy is missing listing line 763 (function name and
 * the leading parameters; the body uses 'conn' and 'stream'), line 775
 * ('now' is used below without a visible declaration, so it was presumably
 * declared there) and line 795 (the argument list of the fsync error
 * message).
 */
762 static PGresult *
764  XLogRecPtr *stoppos)
765 {
766  char *copybuf = NULL;
767  TimestampTz last_status = -1;
768  XLogRecPtr blockpos = stream->startpos;
769 
770  still_sending = true;
771 
/* loop until the server ends the COPY (a result is returned) or an error */
772  while (1)
773  {
774  int r;
776  long sleeptime;
777 
778  /*
779  * Check if we should continue streaming, or abort at this point.
780  */
781  if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos))
782  goto error;
783 
784  now = feGetCurrentTimestamp();
785 
786  /*
787  * If synchronous option is true, issue sync command as soon as there
788  * are WAL data which has not been flushed yet.
789  */
790  if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)
791  {
792  if (stream->walmethod->sync(walfile) != 0)
793  {
794  fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
796  goto error;
797  }
798  lastFlushPosition = blockpos;
799 
800  /*
801  * Send feedback so that the server sees the latest WAL locations
802  * immediately.
803  */
804  if (!sendFeedback(conn, blockpos, now, false))
805  goto error;
806  last_status = now;
807  }
808 
809  /*
810  * Potentially send a status message to the master
811  */
812  if (still_sending && stream->standby_message_timeout > 0 &&
813  feTimestampDifferenceExceeds(last_status, now,
814  stream->standby_message_timeout))
815  {
816  /* Time to send feedback! */
817  if (!sendFeedback(conn, blockpos, now, false))
818  goto error;
819  last_status = now;
820  }
821 
822  /*
823  * Calculate how long send/receive loops should sleep
824  */
825  sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
826  last_status);
827 
/*
 * r > 0 is the length of a message now held in copybuf; 0 means nothing
 * (more) to read right now; -1 is an error; -2 means the server ended the
 * COPY.
 */
828  r = CopyStreamReceive(conn, sleeptime, &copybuf);
829  while (r != 0)
830  {
831  if (r == -1)
832  goto error;
833  if (r == -2)
834  {
835  PGresult *res = HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos);
836 
837  if (res == NULL)
838  goto error;
839  else
840  return res;
841  }
842 
843  /* Check the message type. */
844  if (copybuf[0] == 'k')
845  {
846  if (!ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos,
847  &last_status))
848  goto error;
849  }
850  else if (copybuf[0] == 'w')
851  {
852  if (!ProcessXLogDataMsg(conn, stream, copybuf, r, &blockpos))
853  goto error;
854 
855  /*
856  * Check if we should continue streaming, or abort at this
857  * point.
858  */
859  if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos))
860  goto error;
861  }
862  else
863  {
864  fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
865  progname, copybuf[0]);
866  goto error;
867  }
868 
869  /*
870  * Process the received data, and any subsequent data we can read
871  * without blocking.
872  */
/* timeout 0: CopyStreamReceive() skips its wait step for this call */
873  r = CopyStreamReceive(conn, 0, &copybuf);
874  }
875  }
876 
/* failure exit: release any message buffer still held */
877 error:
878  if (copybuf != NULL)
879  PQfreemem(copybuf);
880  return NULL;
881 }
882 
883 /*
884  * Wait until we can read CopyData message, or timeout.
885  *
886  * Returns 1 if data has become available for reading, 0 if timed out
887  * or interrupted by signal, and -1 on an error.
888  */
889 static int
890 CopyStreamPoll(PGconn *conn, long timeout_ms)
891 {
892  int ret;
893  fd_set input_mask;
894  struct timeval timeout;
895  struct timeval *timeoutptr;
896 
897  if (PQsocket(conn) < 0)
898  {
899  fprintf(stderr, _("%s: invalid socket: %s"), progname,
900  PQerrorMessage(conn));
901  return -1;
902  }
903 
904  FD_ZERO(&input_mask);
905  FD_SET(PQsocket(conn), &input_mask);
906 
907  if (timeout_ms < 0)
908  timeoutptr = NULL;
909  else
910  {
911  timeout.tv_sec = timeout_ms / 1000L;
912  timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
913  timeoutptr = &timeout;
914  }
915 
916  ret = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
917  if (ret == 0 || (ret < 0 && errno == EINTR))
918  return 0; /* Got a timeout or signal */
919  else if (ret < 0)
920  {
921  fprintf(stderr, _("%s: select() failed: %s\n"),
922  progname, strerror(errno));
923  return -1;
924  }
925 
926  return 1;
927 }
928 
929 /*
930  * Receive CopyData message available from XLOG stream, blocking for
931  * maximum of 'timeout' ms.
932  *
933  * If data was received, returns the length of the data. *buffer is set to
934  * point to a buffer holding the received message. The buffer is only valid
935  * until the next CopyStreamReceive call.
936  *
937  * 0 if no data was available within timeout, or wait was interrupted
938  * by signal. -1 on error. -2 if the server ended the COPY.
939  */
940 static int
941 CopyStreamReceive(PGconn *conn, long timeout, char **buffer)
942 {
943  char *copybuf = NULL;
944  int rawlen;
945 
946  if (*buffer != NULL)
947  PQfreemem(*buffer);
948  *buffer = NULL;
949 
950  /* Try to receive a CopyData message */
951  rawlen = PQgetCopyData(conn, &copybuf, 1);
952  if (rawlen == 0)
953  {
954  /*
955  * No data available. Wait for some to appear, but not longer than the
956  * specified timeout, so that we can ping the server.
957  */
958  if (timeout != 0)
959  {
960  int ret;
961 
962  ret = CopyStreamPoll(conn, timeout);
963  if (ret <= 0)
964  return ret;
965  }
966 
967  /* Else there is actually data on the socket */
968  if (PQconsumeInput(conn) == 0)
969  {
970  fprintf(stderr,
971  _("%s: could not receive data from WAL stream: %s"),
972  progname, PQerrorMessage(conn));
973  return -1;
974  }
975 
976  /* Now that we've consumed some input, try again */
977  rawlen = PQgetCopyData(conn, &copybuf, 1);
978  if (rawlen == 0)
979  return 0;
980  }
981  if (rawlen == -1) /* end-of-streaming or error */
982  return -2;
983  if (rawlen == -2)
984  {
985  fprintf(stderr, _("%s: could not read COPY data: %s"),
986  progname, PQerrorMessage(conn));
987  return -1;
988  }
989 
990  /* Return received messages to caller */
991  *buffer = copybuf;
992  return rawlen;
993 }
994 
995 /*
996  * Process the keepalive message.
997  */
/*
 * copybuf/len:  the received message and its length.
 * blockpos:     current write position, reported back in the reply.
 * last_status:  updated to the send time when a reply is sent.
 *
 * Returns false (after printing a message to stderr) if the message is too
 * short, the fsync fails, or the reply cannot be sent; true otherwise.
 *
 * NOTE(review): listing line 1038 (the argument list of the fsync error
 * message) is missing from this copy.
 */
998 static bool
999 ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
1000  XLogRecPtr blockpos, TimestampTz *last_status)
1001 {
1002  int pos;
1003  bool replyRequested;
1004  TimestampTz now;
1005 
1006  /*
1007  * Parse the keepalive message, enclosed in the CopyData message. We just
1008  * check if the server requested a reply, and ignore the rest.
1009  */
1010  pos = 1; /* skip msgtype 'k' */
1011  pos += 8; /* skip walEnd */
1012  pos += 8; /* skip sendTime */
1013 
/* the reply-requested flag is the byte at offset 17, so 18 bytes are needed */
1014  if (len < pos + 1)
1015  {
1016  fprintf(stderr, _("%s: streaming header too small: %d\n"),
1017  progname, len);
1018  return false;
1019  }
1020  replyRequested = copybuf[pos];
1021 
1022  /* If the server requested an immediate reply, send one. */
1023  if (replyRequested && still_sending)
1024  {
1025  if (reportFlushPosition && lastFlushPosition < blockpos &&
1026  walfile != NULL)
1027  {
1028  /*
1029  * If a valid flush location needs to be reported, flush the
1030  * current WAL file so that the latest flush location is sent back
1031  * to the server. This is necessary to see whether the last WAL
1032  * data has been successfully replicated or not, at the normal
1033  * shutdown of the server.
1034  */
1035  if (stream->walmethod->sync(walfile) != 0)
1036  {
1037  fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
1039  return false;
1040  }
1041  lastFlushPosition = blockpos;
1042  }
1043 
/* the reply itself never asks the server for a response (last arg false) */
1044  now = feGetCurrentTimestamp();
1045  if (!sendFeedback(conn, blockpos, now, false))
1046  return false;
1047  *last_status = now;
1048  }
1049 
1050  return true;
1051 }
1052 
1053 /*
1054  * Process XLogData message.
1055  */
1056 static bool
1057 ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
1058  XLogRecPtr *blockpos)
1059 {
1060  int xlogoff;
1061  int bytes_left;
1062  int bytes_written;
1063  int hdr_len;
1064 
1065  /*
1066  * Once we've decided we don't want to receive any more, just ignore any
1067  * subsequent XLogData messages.
1068  */
1069  if (!(still_sending))
1070  return true;
1071 
1072  /*
1073  * Read the header of the XLogData message, enclosed in the CopyData
1074  * message. We only need the WAL location field (dataStart), the rest of
1075  * the header is ignored.
1076  */
1077  hdr_len = 1; /* msgtype 'w' */
1078  hdr_len += 8; /* dataStart */
1079  hdr_len += 8; /* walEnd */
1080  hdr_len += 8; /* sendTime */
1081  if (len < hdr_len)
1082  {
1083  fprintf(stderr, _("%s: streaming header too small: %d\n"),
1084  progname, len);
1085  return false;
1086  }
1087  *blockpos = fe_recvint64(&copybuf[1]);
1088 
1089  /* Extract WAL location for this block */
1090  xlogoff = *blockpos % XLOG_SEG_SIZE;
1091 
1092  /*
1093  * Verify that the initial location in the stream matches where we think
1094  * we are.
1095  */
1096  if (walfile == NULL)
1097  {
1098  /* No file open yet */
1099  if (xlogoff != 0)
1100  {
1101  fprintf(stderr,
1102  _("%s: received transaction log record for offset %u with no file open\n"),
1103  progname, xlogoff);
1104  return false;
1105  }
1106  }
1107  else
1108  {
1109  /* More data in existing segment */
1110  if (stream->walmethod->get_current_pos(walfile) != xlogoff)
1111  {
1112  fprintf(stderr,
1113  _("%s: got WAL data offset %08x, expected %08x\n"),
1114  progname, xlogoff, (int) stream->walmethod->get_current_pos(walfile));
1115  return false;
1116  }
1117  }
1118 
1119  bytes_left = len - hdr_len;
1120  bytes_written = 0;
1121 
1122  while (bytes_left)
1123  {
1124  int bytes_to_write;
1125 
1126  /*
1127  * If crossing a WAL boundary, only write up until we reach
1128  * XLOG_SEG_SIZE.
1129  */
1130  if (xlogoff + bytes_left > XLOG_SEG_SIZE)
1131  bytes_to_write = XLOG_SEG_SIZE - xlogoff;
1132  else
1133  bytes_to_write = bytes_left;
1134 
1135  if (walfile == NULL)
1136  {
1137  if (!open_walfile(stream, *blockpos))
1138  {
1139  /* Error logged by open_walfile */
1140  return false;
1141  }
1142  }
1143 
1144  if (stream->walmethod->write(walfile, copybuf + hdr_len + bytes_written,
1145  bytes_to_write) != bytes_to_write)
1146  {
1147  fprintf(stderr,
1148  _("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
1149  progname, bytes_to_write, current_walfile_name,
1150  stream->walmethod->getlasterror());
1151  return false;
1152  }
1153 
1154  /* Write was successful, advance our position */
1155  bytes_written += bytes_to_write;
1156  bytes_left -= bytes_to_write;
1157  *blockpos += bytes_to_write;
1158  xlogoff += bytes_to_write;
1159 
1160  /* Did we reach the end of a WAL segment? */
1161  if (*blockpos % XLOG_SEG_SIZE == 0)
1162  {
1163  if (!close_walfile(stream, *blockpos))
1164  /* Error message written in close_walfile() */
1165  return false;
1166 
1167  xlogoff = 0;
1168 
1169  if (still_sending && stream->stream_stop(*blockpos, stream->timeline, true))
1170  {
1171  if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1172  {
1173  fprintf(stderr, _("%s: could not send copy-end packet: %s"),
1174  progname, PQerrorMessage(conn));
1175  return false;
1176  }
1177  still_sending = false;
1178  return true; /* ignore the rest of this XLogData packet */
1179  }
1180  }
1181  }
1182  /* No more data left to write, receive next copy packet */
1183 
1184  return true;
1185 }
1186 
1187 /*
1188  * Handle end of the copy stream.
1189  */
1190 static PGresult *
1192  XLogRecPtr blockpos, XLogRecPtr *stoppos)
1193 {
1194  PGresult *res = PQgetResult(conn);
1195 
1196  /*
1197  * The server closed its end of the copy stream. If we haven't closed
1198  * ours already, we need to do so now, unless the server threw an error,
1199  * in which case we don't.
1200  */
1201  if (still_sending)
1202  {
1203  if (!close_walfile(stream, blockpos))
1204  {
1205  /* Error message written in close_walfile() */
1206  PQclear(res);
1207  return NULL;
1208  }
1209  if (PQresultStatus(res) == PGRES_COPY_IN)
1210  {
1211  if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1212  {
1213  fprintf(stderr,
1214  _("%s: could not send copy-end packet: %s"),
1215  progname, PQerrorMessage(conn));
1216  PQclear(res);
1217  return NULL;
1218  }
1219  res = PQgetResult(conn);
1220  }
1221  still_sending = false;
1222  }
1223  if (copybuf != NULL)
1224  PQfreemem(copybuf);
1225  *stoppos = blockpos;
1226  return res;
1227 }
1228 
1229 /*
1230  * Check if we should continue streaming, or abort at this point.
1231  */
1232 static bool
1234  XLogRecPtr *stoppos)
1235 {
1236  if (still_sending && stream->stream_stop(blockpos, stream->timeline, false))
1237  {
1238  if (!close_walfile(stream, blockpos))
1239  {
1240  /* Potential error message is written by close_walfile */
1241  return false;
1242  }
1243  if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1244  {
1245  fprintf(stderr, _("%s: could not send copy-end packet: %s"),
1246  progname, PQerrorMessage(conn));
1247  return false;
1248  }
1249  still_sending = false;
1250  }
1251 
1252  return true;
1253 }
1254 
1255 /*
1256  * Calculate how long send/receive loops should sleep
1257  */
1258 static long
1260  TimestampTz last_status)
1261 {
1262  TimestampTz status_targettime = 0;
1263  long sleeptime;
1264 
1265  if (standby_message_timeout && still_sending)
1266  status_targettime = last_status +
1267  (standby_message_timeout - 1) * ((int64) 1000);
1268 
1269  if (status_targettime > 0)
1270  {
1271  long secs;
1272  int usecs;
1273 
1275  status_targettime,
1276  &secs,
1277  &usecs);
1278  /* Always sleep at least 1 sec */
1279  if (secs <= 0)
1280  {
1281  secs = 1;
1282  usecs = 0;
1283  }
1284 
1285  sleeptime = secs * 1000 + usecs / 1000;
1286  }
1287  else
1288  sleeptime = -1;
1289 
1290  return sleeptime;
1291 }
int PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
Definition: fe-exec.c:2221
static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer)
Definition: receivelog.c:941
#define XLogSegSize
Definition: xlog_internal.h:92
int PQnfields(const PGresult *res)
Definition: fe-exec.c:2681
static bool open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
Definition: receivelog.c:90
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:5934
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
TimestampTz feGetCurrentTimestamp(void)
Definition: streamutil.c:440
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3067
void * Walfile
Definition: walmethods.h:13
static void error(void)
Definition: sql-dyntest.c:147
const char * PQparameterStatus(const PGconn *conn, const char *paramName)
Definition: fe-connect.c:5899
static bool still_sending
Definition: receivelog.c:38
int64 TimestampTz
Definition: timestamp.h:39
bool(* existsfile)(const char *pathname)
Definition: walmethods.h:50
#define XLogFileName(fname, tli, logSegNo)
static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos, XLogRecPtr *stoppos)
Definition: receivelog.c:1233
char * sysidentifier
Definition: receivelog.h:34
int PQputCopyEnd(PGconn *conn, const char *errormsg)
Definition: fe-exec.c:2288
#define select(n, r, w, e, timeout)
Definition: win32.h:374
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
XLogRecPtr startpos
Definition: receivelog.h:32
static long CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout, TimestampTz last_status)
Definition: receivelog.c:1259
int sleeptime
Definition: pg_standby.c:40
char * partial_suffix
Definition: receivelog.h:46
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:5924
const char * progname
Definition: pg_standby.c:37
int PQntuples(const PGresult *res)
Definition: fe-exec.c:2673
bool feTimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: streamutil.c:481
TimeLineID timeline
Definition: receivelog.h:33
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2596
int PQgetCopyData(PGconn *conn, char **buffer, int async)
Definition: fe-exec.c:2377
static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr blockpos, TimestampTz *last_status)
Definition: receivelog.c:999
ssize_t(* get_file_size)(const char *pathname)
Definition: walmethods.h:53
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
Definition: receivelog.c:715
PGconn * conn
Definition: streamutil.c:42
#define MAXPGPATH
static Walfile * walfile
Definition: receivelog.c:33
static int CopyStreamPoll(PGconn *conn, long timeout_ms)
Definition: receivelog.c:890
int PQflush(PGconn *conn)
Definition: fe-exec.c:3187
char * replication_slot
Definition: receivelog.h:47
bool mark_done
Definition: receivelog.h:39
void feTimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: streamutil.c:459
#define TLHistoryFileName(fname, tli)
static bool reportFlushPosition
Definition: receivelog.c:35
uint64 XLogSegNo
Definition: xlogdefs.h:34
off_t(* get_current_pos)(Walfile f)
Definition: walmethods.h:62
unsigned int uint32
Definition: c.h:268
static bool close_walfile(StreamCtl *stream, XLogRecPtr pos)
Definition: receivelog.c:181
stream_stop_callback stream_stop
Definition: receivelog.h:43
WalWriteMethod * walmethod
Definition: receivelog.h:45
int(* sync)(Walfile f)
Definition: walmethods.h:67
#define MAXFNAMELEN
static int standby_message_timeout
Definition: pg_basebackup.c:90
static bool sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
Definition: receivelog.c:325
static XLogRecPtr lastFlushPosition
Definition: receivelog.c:36
static PGresult * HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf, XLogRecPtr blockpos, XLogRecPtr *stoppos)
Definition: receivelog.c:1191
#define EINTR
Definition: win32.h:285
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1631
static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr *blockpos)
Definition: receivelog.c:1057
StringInfo copybuf
Definition: tablesync.c:111
ssize_t(* write)(Walfile f, const void *buf, size_t count)
Definition: walmethods.h:59
void PQclear(PGresult *res)
Definition: fe-exec.c:650
static void * fn(void *arg)
#define XLByteToSeg(xlrp, logSegNo)
static PGresult * HandleCopyStream(PGconn *conn, StreamCtl *stream, XLogRecPtr *stoppos)
Definition: receivelog.c:763
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool existsTimeLineHistoryFile(StreamCtl *stream)
Definition: receivelog.c:247
bool ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
Definition: receivelog.c:436
Walfile(* open_for_write)(const char *pathname, const char *temp_suffix, size_t pad_to_size)
Definition: walmethods.h:41
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:207
const char *(* getlasterror)(void)
Definition: walmethods.h:78
static XLogRecPtr startpos
bool synchronous
Definition: receivelog.h:38
static char * filename
Definition: pg_dumpall.c:87
char * PQresultErrorMessage(const PGresult *res)
Definition: fe-exec.c:2612
int64 fe_recvint64(char *buf)
Definition: streamutil.c:513
static bool mark_file_as_archived(StreamCtl *stream, const char *fname)
Definition: receivelog.c:59
const char * strerror(int errnum)
Definition: strerror.c:19
int standby_message_timeout
Definition: receivelog.h:36
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:1846
void fe_sendint64(int64 i, char *buf)
Definition: streamutil.c:494
static char current_walfile_name[MAXPGPATH]
Definition: receivelog.c:34
bool temp_slot
Definition: receivelog.h:48
#define _(x)
Definition: elog.c:84
void PQfreemem(void *ptr)
Definition: fe-exec.c:3200
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:5952
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1702
static bool writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
Definition: receivelog.c:264
bool CheckServerVersionForStreaming(PGconn *conn)
Definition: receivelog.c:363
int(* close)(Walfile f, WalCloseMethod method)
Definition: walmethods.h:47