PostgreSQL Source Code  git master
receivelog.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * receivelog.c - receive WAL files using the streaming
4  * replication protocol.
5  *
6  * Author: Magnus Hagander <magnus@hagander.net>
7  *
8  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
9  *
10  * IDENTIFICATION
11  * src/bin/pg_basebackup/receivelog.c
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres_fe.h"
16 
17 #include <sys/select.h>
18 #include <sys/stat.h>
19 #include <unistd.h>
20 
21 #include "access/xlog_internal.h"
22 #include "common/logging.h"
23 #include "libpq-fe.h"
24 #include "receivelog.h"
25 #include "streamutil.h"
26 
27 /* currently open WAL file */
28 static Walfile *walfile = NULL;
29 static bool reportFlushPosition = false;
31 
32 static bool still_sending = true; /* feedback still needs to be sent? */
33 
35  XLogRecPtr *stoppos);
36 static int CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket);
37 static int CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
38  char **buffer);
39 static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf,
40  int len, XLogRecPtr blockpos, TimestampTz *last_status);
41 static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
42  XLogRecPtr *blockpos);
44  XLogRecPtr blockpos, XLogRecPtr *stoppos);
45 static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos);
47  TimestampTz last_status);
48 
50  uint32 *timeline);
51 
52 static bool
53 mark_file_as_archived(StreamCtl *stream, const char *fname)
54 {
55  Walfile *f;
56  static char tmppath[MAXPGPATH];
57 
58  snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done",
59  fname);
60 
61  f = stream->walmethod->ops->open_for_write(stream->walmethod, tmppath,
62  NULL, 0);
63  if (f == NULL)
64  {
65  pg_log_error("could not create archive status file \"%s\": %s",
66  tmppath, GetLastWalMethodError(stream->walmethod));
67  return false;
68  }
69 
70  if (stream->walmethod->ops->close(f, CLOSE_NORMAL) != 0)
71  {
72  pg_log_error("could not close archive status file \"%s\": %s",
73  tmppath, GetLastWalMethodError(stream->walmethod));
74  return false;
75  }
76 
77  return true;
78 }
79 
80 /*
81  * Open a new WAL file in the specified directory.
82  *
83  * Returns true if OK; on failure, returns false after printing an error msg.
84  * On success, 'walfile' is set to the opened WAL file.
85  *
86  * The file will be padded to WalSegSz bytes with zeroes.
87  */
88 static bool
89 open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
90 {
91  Walfile *f;
92  char *fn;
93  ssize_t size;
94  XLogSegNo segno;
95  char walfile_name[MAXPGPATH];
96 
97  XLByteToSeg(startpoint, segno, WalSegSz);
98  XLogFileName(walfile_name, stream->timeline, segno, WalSegSz);
99 
100  /* Note that this considers the compression used if necessary */
101  fn = stream->walmethod->ops->get_file_name(stream->walmethod,
102  walfile_name,
103  stream->partial_suffix);
104 
105  /*
106  * When streaming to files, if an existing file exists we verify that it's
107  * either empty (just created), or a complete WalSegSz segment (in which
108  * case it has been created and padded). Anything else indicates a corrupt
109  * file. Compressed files have no need for padding, so just ignore this
110  * case.
111  *
112  * When streaming to tar, no file with this name will exist before, so we
113  * never have to verify a size.
114  */
116  stream->walmethod->ops->existsfile(stream->walmethod, fn))
117  {
118  size = stream->walmethod->ops->get_file_size(stream->walmethod, fn);
119  if (size < 0)
120  {
121  pg_log_error("could not get size of write-ahead log file \"%s\": %s",
123  pg_free(fn);
124  return false;
125  }
126  if (size == WalSegSz)
127  {
128  /* Already padded file. Open it for use */
129  f = stream->walmethod->ops->open_for_write(stream->walmethod, walfile_name, stream->partial_suffix, 0);
130  if (f == NULL)
131  {
132  pg_log_error("could not open existing write-ahead log file \"%s\": %s",
134  pg_free(fn);
135  return false;
136  }
137 
138  /* fsync file in case of a previous crash */
139  if (stream->walmethod->ops->sync(f) != 0)
140  {
141  pg_log_error("could not fsync existing write-ahead log file \"%s\": %s",
143  stream->walmethod->ops->close(f, CLOSE_UNLINK);
144  exit(1);
145  }
146 
147  walfile = f;
148  pg_free(fn);
149  return true;
150  }
151  if (size != 0)
152  {
153  /* if write didn't set errno, assume problem is no disk space */
154  if (errno == 0)
155  errno = ENOSPC;
156  pg_log_error(ngettext("write-ahead log file \"%s\" has %zd byte, should be 0 or %d",
157  "write-ahead log file \"%s\" has %zd bytes, should be 0 or %d",
158  size),
159  fn, size, WalSegSz);
160  pg_free(fn);
161  return false;
162  }
163  /* File existed and was empty, so fall through and open */
164  }
165 
166  /* No file existed, so create one */
167 
168  f = stream->walmethod->ops->open_for_write(stream->walmethod,
169  walfile_name,
170  stream->partial_suffix,
171  WalSegSz);
172  if (f == NULL)
173  {
174  pg_log_error("could not open write-ahead log file \"%s\": %s",
176  pg_free(fn);
177  return false;
178  }
179 
180  pg_free(fn);
181  walfile = f;
182  return true;
183 }
184 
185 /*
186  * Close the current WAL file (if open), and rename it to the correct
187  * filename if it's complete. On failure, prints an error message to stderr
188  * and returns false, otherwise returns true.
189  */
190 static bool
192 {
193  char *fn;
194  off_t currpos;
195  int r;
196  char walfile_name[MAXPGPATH];
197 
198  if (walfile == NULL)
199  return true;
200 
201  strlcpy(walfile_name, walfile->pathname, MAXPGPATH);
202  currpos = walfile->currpos;
203 
204  /* Note that this considers the compression used if necessary */
205  fn = stream->walmethod->ops->get_file_name(stream->walmethod,
206  walfile_name,
207  stream->partial_suffix);
208 
209  if (stream->partial_suffix)
210  {
211  if (currpos == WalSegSz)
212  r = stream->walmethod->ops->close(walfile, CLOSE_NORMAL);
213  else
214  {
215  pg_log_info("not renaming \"%s\", segment is not complete", fn);
216  r = stream->walmethod->ops->close(walfile, CLOSE_NO_RENAME);
217  }
218  }
219  else
220  r = stream->walmethod->ops->close(walfile, CLOSE_NORMAL);
221 
222  walfile = NULL;
223 
224  if (r != 0)
225  {
226  pg_log_error("could not close file \"%s\": %s",
228 
229  pg_free(fn);
230  return false;
231  }
232 
233  pg_free(fn);
234 
235  /*
236  * Mark file as archived if requested by the caller - pg_basebackup needs
237  * to do so as files can otherwise get archived again after promotion of a
238  * new node. This is in line with walreceiver.c always doing a
239  * XLogArchiveForceDone() after a complete segment.
240  */
241  if (currpos == WalSegSz && stream->mark_done)
242  {
243  /* writes error message if failed */
244  if (!mark_file_as_archived(stream, walfile_name))
245  return false;
246  }
247 
248  lastFlushPosition = pos;
249  return true;
250 }
251 
252 
253 /*
254  * Check if a timeline history file exists.
255  */
256 static bool
258 {
259  char histfname[MAXFNAMELEN];
260 
261  /*
262  * Timeline 1 never has a history file. We treat that as if it existed,
263  * since we never need to stream it.
264  */
265  if (stream->timeline == 1)
266  return true;
267 
268  TLHistoryFileName(histfname, stream->timeline);
269 
270  return stream->walmethod->ops->existsfile(stream->walmethod, histfname);
271 }
272 
273 static bool
274 writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
275 {
276  int size = strlen(content);
277  char histfname[MAXFNAMELEN];
278  Walfile *f;
279 
280  /*
281  * Check that the server's idea of how timeline history files should be
282  * named matches ours.
283  */
284  TLHistoryFileName(histfname, stream->timeline);
285  if (strcmp(histfname, filename) != 0)
286  {
287  pg_log_error("server reported unexpected history file name for timeline %u: %s",
288  stream->timeline, filename);
289  return false;
290  }
291 
292  f = stream->walmethod->ops->open_for_write(stream->walmethod,
293  histfname, ".tmp", 0);
294  if (f == NULL)
295  {
296  pg_log_error("could not create timeline history file \"%s\": %s",
297  histfname, GetLastWalMethodError(stream->walmethod));
298  return false;
299  }
300 
301  if ((int) stream->walmethod->ops->write(f, content, size) != size)
302  {
303  pg_log_error("could not write timeline history file \"%s\": %s",
304  histfname, GetLastWalMethodError(stream->walmethod));
305 
306  /*
307  * If we fail to make the file, delete it to release disk space
308  */
309  stream->walmethod->ops->close(f, CLOSE_UNLINK);
310 
311  return false;
312  }
313 
314  if (stream->walmethod->ops->close(f, CLOSE_NORMAL) != 0)
315  {
316  pg_log_error("could not close file \"%s\": %s",
317  histfname, GetLastWalMethodError(stream->walmethod));
318  return false;
319  }
320 
321  /* Maintain archive_status, check close_walfile() for details. */
322  if (stream->mark_done)
323  {
324  /* writes error message if failed */
325  if (!mark_file_as_archived(stream, histfname))
326  return false;
327  }
328 
329  return true;
330 }
331 
332 /*
333  * Send a Standby Status Update message to server.
334  */
335 static bool
336 sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
337 {
338  char replybuf[1 + 8 + 8 + 8 + 8 + 1];
339  int len = 0;
340 
341  replybuf[len] = 'r';
342  len += 1;
343  fe_sendint64(blockpos, &replybuf[len]); /* write */
344  len += 8;
346  fe_sendint64(lastFlushPosition, &replybuf[len]); /* flush */
347  else
348  fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */
349  len += 8;
350  fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
351  len += 8;
352  fe_sendint64(now, &replybuf[len]); /* sendTime */
353  len += 8;
354  replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
355  len += 1;
356 
357  if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
358  {
359  pg_log_error("could not send feedback packet: %s",
361  return false;
362  }
363 
364  return true;
365 }
366 
367 /*
368  * Check that the server version we're connected to is supported by
369  * ReceiveXlogStream().
370  *
371  * If it's not, an error message is printed to stderr, and false is returned.
372  */
373 bool
375 {
376  int minServerMajor,
377  maxServerMajor;
378  int serverMajor;
379 
380  /*
381  * The message format used in streaming replication changed in 9.3, so we
382  * cannot stream from older servers. And we don't support servers newer
383  * than the client; it might work, but we don't know, so err on the safe
384  * side.
385  */
386  minServerMajor = 903;
387  maxServerMajor = PG_VERSION_NUM / 100;
388  serverMajor = PQserverVersion(conn) / 100;
389  if (serverMajor < minServerMajor)
390  {
391  const char *serverver = PQparameterStatus(conn, "server_version");
392 
393  pg_log_error("incompatible server version %s; client does not support streaming from server versions older than %s",
394  serverver ? serverver : "'unknown'",
395  "9.3");
396  return false;
397  }
398  else if (serverMajor > maxServerMajor)
399  {
400  const char *serverver = PQparameterStatus(conn, "server_version");
401 
402  pg_log_error("incompatible server version %s; client does not support streaming from server versions newer than %s",
403  serverver ? serverver : "'unknown'",
404  PG_VERSION);
405  return false;
406  }
407  return true;
408 }
409 
410 /*
411  * Receive a log stream starting at the specified position.
412  *
413  * Individual parameters are passed through the StreamCtl structure.
414  *
415  * If sysidentifier is specified, validate that both the system
416  * identifier and the timeline match the specified ones
417  * (by sending an extra IDENTIFY_SYSTEM command)
418  *
419  * All received segments will be written to the directory
420  * specified by basedir. This will also fetch any missing timeline history
421  * files.
422  *
423  * The stream_stop callback will be called every time data
424  * is received, and whenever a segment is completed. If it returns
425  * true, the streaming will stop and the function
426  * returns. As long as it returns false, streaming will continue
427  * indefinitely.
428  *
429  * If stream_stop() checks for external input, stop_socket should be set to
430  * the FD it checks. This will allow such input to be detected promptly
431  * rather than after standby_message_timeout (which might be indefinite).
432  * Note that signals will interrupt waits for input as well, but that is
433  * race-y since a signal received while busy won't interrupt the wait.
434  *
435  * standby_message_timeout controls how often we send a message
436  * back to the primary letting it know our progress, in milliseconds.
437  * Zero means no messages are sent.
438  * This message will only contain the write location, and never
439  * flush or replay.
440  *
441  * If 'partial_suffix' is not NULL, files are initially created with the
442  * given suffix, and the suffix is removed once the file is finished. That
443  * allows you to tell the difference between partial and completed files,
444  * so that you can continue later where you left off.
445  *
446  * If 'synchronous' is true, the received WAL is flushed as soon as written,
447  * otherwise only when the WAL file is closed.
448  *
449  * Note: The WAL location *must* be at a log segment start!
450  */
451 bool
453 {
454  char query[128];
455  char slotcmd[128];
456  PGresult *res;
457  XLogRecPtr stoppos;
458 
459  /*
460  * The caller should've checked the server version already, but doesn't do
461  * any harm to check it here too.
462  */
464  return false;
465 
466  /*
467  * Decide whether we want to report the flush position. If we report the
468  * flush position, the primary will know what WAL we'll possibly
469  * re-request, and it can then remove older WAL safely. We must always do
470  * that when we are using slots.
471  *
472  * Reporting the flush position makes one eligible as a synchronous
473  * replica. People shouldn't include generic names in
474  * synchronous_standby_names, but we've protected them against it so far,
475  * so let's continue to do so unless specifically requested.
476  */
477  if (stream->replication_slot != NULL)
478  {
479  reportFlushPosition = true;
480  sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);
481  }
482  else
483  {
484  if (stream->synchronous)
485  reportFlushPosition = true;
486  else
487  reportFlushPosition = false;
488  slotcmd[0] = 0;
489  }
490 
491  if (stream->sysidentifier != NULL)
492  {
493  char *sysidentifier = NULL;
494  TimeLineID servertli;
495 
496  /*
497  * Get the server system identifier and timeline, and validate them.
498  */
499  if (!RunIdentifySystem(conn, &sysidentifier, &servertli, NULL, NULL))
500  {
501  pg_free(sysidentifier);
502  return false;
503  }
504 
505  if (strcmp(stream->sysidentifier, sysidentifier) != 0)
506  {
507  pg_log_error("system identifier does not match between base backup and streaming connection");
508  pg_free(sysidentifier);
509  return false;
510  }
511  pg_free(sysidentifier);
512 
513  if (stream->timeline > servertli)
514  {
515  pg_log_error("starting timeline %u is not present in the server",
516  stream->timeline);
517  return false;
518  }
519  }
520 
521  /*
522  * initialize flush position to starting point, it's the caller's
523  * responsibility that that's sane.
524  */
525  lastFlushPosition = stream->startpos;
526 
527  while (1)
528  {
529  /*
530  * Fetch the timeline history file for this timeline, if we don't have
531  * it already. When streaming log to tar, this will always return
532  * false, as we are never streaming into an existing file and
533  * therefore there can be no pre-existing timeline history file.
534  */
535  if (!existsTimeLineHistoryFile(stream))
536  {
537  snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
538  res = PQexec(conn, query);
540  {
541  /* FIXME: we might send it ok, but get an error */
542  pg_log_error("could not send replication command \"%s\": %s",
543  "TIMELINE_HISTORY", PQresultErrorMessage(res));
544  PQclear(res);
545  return false;
546  }
547 
548  /*
549  * The response to TIMELINE_HISTORY is a single row result set
550  * with two fields: filename and content
551  */
552  if (PQnfields(res) != 2 || PQntuples(res) != 1)
553  {
554  pg_log_warning("unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields",
555  PQntuples(res), PQnfields(res), 1, 2);
556  }
557 
558  /* Write the history file to disk */
560  PQgetvalue(res, 0, 0),
561  PQgetvalue(res, 0, 1));
562 
563  PQclear(res);
564  }
565 
566  /*
567  * Before we start streaming from the requested location, check if the
568  * callback tells us to stop here.
569  */
570  if (stream->stream_stop(stream->startpos, stream->timeline, false))
571  return true;
572 
573  /* Initiate the replication stream at specified location */
574  snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
575  slotcmd,
576  LSN_FORMAT_ARGS(stream->startpos),
577  stream->timeline);
578  res = PQexec(conn, query);
580  {
581  pg_log_error("could not send replication command \"%s\": %s",
582  "START_REPLICATION", PQresultErrorMessage(res));
583  PQclear(res);
584  return false;
585  }
586  PQclear(res);
587 
588  /* Stream the WAL */
589  res = HandleCopyStream(conn, stream, &stoppos);
590  if (res == NULL)
591  goto error;
592 
593  /*
594  * Streaming finished.
595  *
596  * There are two possible reasons for that: a controlled shutdown, or
597  * we reached the end of the current timeline. In case of
598  * end-of-timeline, the server sends a result set after Copy has
599  * finished, containing information about the next timeline. Read
600  * that, and restart streaming from the next timeline. In case of
601  * controlled shutdown, stop here.
602  */
604  {
605  /*
606  * End-of-timeline. Read the next timeline's ID and starting
607  * position. Usually, the starting position will match the end of
608  * the previous timeline, but there are corner cases like if the
609  * server had sent us half of a WAL record, when it was promoted.
610  * The new timeline will begin at the end of the last complete
611  * record in that case, overlapping the partial WAL record on the
612  * old timeline.
613  */
614  uint32 newtimeline;
615  bool parsed;
616 
617  parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
618  PQclear(res);
619  if (!parsed)
620  goto error;
621 
622  /* Sanity check the values the server gave us */
623  if (newtimeline <= stream->timeline)
624  {
625  pg_log_error("server reported unexpected next timeline %u, following timeline %u",
626  newtimeline, stream->timeline);
627  goto error;
628  }
629  if (stream->startpos > stoppos)
630  {
631  pg_log_error("server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X",
632  stream->timeline, LSN_FORMAT_ARGS(stoppos),
633  newtimeline, LSN_FORMAT_ARGS(stream->startpos));
634  goto error;
635  }
636 
637  /* Read the final result, which should be CommandComplete. */
638  res = PQgetResult(conn);
640  {
641  pg_log_error("unexpected termination of replication stream: %s",
643  PQclear(res);
644  goto error;
645  }
646  PQclear(res);
647 
648  /*
649  * Loop back to start streaming from the new timeline. Always
650  * start streaming at the beginning of a segment.
651  */
652  stream->timeline = newtimeline;
653  stream->startpos = stream->startpos -
655  continue;
656  }
657  else if (PQresultStatus(res) == PGRES_COMMAND_OK)
658  {
659  PQclear(res);
660 
661  /*
662  * End of replication (ie. controlled shut down of the server).
663  *
664  * Check if the callback thinks it's OK to stop here. If not,
665  * complain.
666  */
667  if (stream->stream_stop(stoppos, stream->timeline, false))
668  return true;
669  else
670  {
671  pg_log_error("replication stream was terminated before stop point");
672  goto error;
673  }
674  }
675  else
676  {
677  /* Server returned an error. */
678  pg_log_error("unexpected termination of replication stream: %s",
680  PQclear(res);
681  goto error;
682  }
683  }
684 
685 error:
686  if (walfile != NULL && stream->walmethod->ops->close(walfile, CLOSE_NO_RENAME) != 0)
687  pg_log_error("could not close file \"%s\": %s",
689  walfile = NULL;
690  return false;
691 }
692 
693 /*
694  * Helper function to parse the result set returned by server after streaming
695  * has finished. On failure, prints an error to stderr and returns false.
696  */
697 static bool
699 {
700  uint32 startpos_xlogid,
701  startpos_xrecoff;
702 
703  /*----------
704  * The result set consists of one row and two columns, e.g:
705  *
706  * next_tli | next_tli_startpos
707  * ----------+-------------------
708  * 4 | 0/9949AE0
709  *
710  * next_tli is the timeline ID of the next timeline after the one that
711  * just finished streaming. next_tli_startpos is the WAL location where
712  * the server switched to it.
713  *----------
714  */
715  if (PQnfields(res) < 2 || PQntuples(res) != 1)
716  {
717  pg_log_error("unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields",
718  PQntuples(res), PQnfields(res), 1, 2);
719  return false;
720  }
721 
722  *timeline = atoi(PQgetvalue(res, 0, 0));
723  if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &startpos_xlogid,
724  &startpos_xrecoff) != 2)
725  {
726  pg_log_error("could not parse next timeline's starting point \"%s\"",
727  PQgetvalue(res, 0, 1));
728  return false;
729  }
730  *startpos = ((uint64) startpos_xlogid << 32) | startpos_xrecoff;
731 
732  return true;
733 }
734 
735 /*
736  * The main loop of ReceiveXlogStream. Handles the COPY stream after
737  * initiating streaming with the START_REPLICATION command.
738  *
739  * If the COPY ends (not necessarily successfully) due to a message from the
740  * server, returns a PGresult and sets *stoppos to the last byte written.
741  * On any other sort of error, returns NULL.
742  */
743 static PGresult *
745  XLogRecPtr *stoppos)
746 {
747  char *copybuf = NULL;
748  TimestampTz last_status = -1;
749  XLogRecPtr blockpos = stream->startpos;
750 
751  still_sending = true;
752 
753  while (1)
754  {
755  int r;
757  long sleeptime;
758 
759  /*
760  * Check if we should continue streaming, or abort at this point.
761  */
762  if (!CheckCopyStreamStop(conn, stream, blockpos))
763  goto error;
764 
766 
767  /*
768  * If synchronous option is true, issue sync command as soon as there
769  * are WAL data which has not been flushed yet.
770  */
771  if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)
772  {
773  if (stream->walmethod->ops->sync(walfile) != 0)
774  pg_fatal("could not fsync file \"%s\": %s",
776  lastFlushPosition = blockpos;
777 
778  /*
779  * Send feedback so that the server sees the latest WAL locations
780  * immediately.
781  */
782  if (!sendFeedback(conn, blockpos, now, false))
783  goto error;
784  last_status = now;
785  }
786 
787  /*
788  * Potentially send a status message to the primary
789  */
790  if (still_sending && stream->standby_message_timeout > 0 &&
791  feTimestampDifferenceExceeds(last_status, now,
792  stream->standby_message_timeout))
793  {
794  /* Time to send feedback! */
795  if (!sendFeedback(conn, blockpos, now, false))
796  goto error;
797  last_status = now;
798  }
799 
800  /*
801  * Calculate how long send/receive loops should sleep
802  */
804  last_status);
805 
806  r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, &copybuf);
807  while (r != 0)
808  {
809  if (r == -1)
810  goto error;
811  if (r == -2)
812  {
813  PGresult *res = HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos);
814 
815  if (res == NULL)
816  goto error;
817  else
818  return res;
819  }
820 
821  /* Check the message type. */
822  if (copybuf[0] == 'k')
823  {
824  if (!ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos,
825  &last_status))
826  goto error;
827  }
828  else if (copybuf[0] == 'w')
829  {
830  if (!ProcessXLogDataMsg(conn, stream, copybuf, r, &blockpos))
831  goto error;
832 
833  /*
834  * Check if we should continue streaming, or abort at this
835  * point.
836  */
837  if (!CheckCopyStreamStop(conn, stream, blockpos))
838  goto error;
839  }
840  else
841  {
842  pg_log_error("unrecognized streaming header: \"%c\"",
843  copybuf[0]);
844  goto error;
845  }
846 
847  /*
848  * Process the received data, and any subsequent data we can read
849  * without blocking.
850  */
851  r = CopyStreamReceive(conn, 0, stream->stop_socket, &copybuf);
852  }
853  }
854 
855 error:
857  return NULL;
858 }
859 
860 /*
861  * Wait until we can read a CopyData message,
862  * or timeout, or occurrence of a signal or input on the stop_socket.
863  * (timeout_ms < 0 means wait indefinitely; 0 means don't wait.)
864  *
865  * Returns 1 if data has become available for reading, 0 if timed out
866  * or interrupted by signal or stop_socket input, and -1 on an error.
867  */
868 static int
869 CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
870 {
871  int ret;
872  fd_set input_mask;
873  int connsocket;
874  int maxfd;
875  struct timeval timeout;
876  struct timeval *timeoutptr;
877 
878  connsocket = PQsocket(conn);
879  if (connsocket < 0)
880  {
881  pg_log_error("invalid socket: %s", PQerrorMessage(conn));
882  return -1;
883  }
884 
885  FD_ZERO(&input_mask);
886  FD_SET(connsocket, &input_mask);
887  maxfd = connsocket;
888  if (stop_socket != PGINVALID_SOCKET)
889  {
890  FD_SET(stop_socket, &input_mask);
891  maxfd = Max(maxfd, stop_socket);
892  }
893 
894  if (timeout_ms < 0)
895  timeoutptr = NULL;
896  else
897  {
898  timeout.tv_sec = timeout_ms / 1000L;
899  timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
900  timeoutptr = &timeout;
901  }
902 
903  ret = select(maxfd + 1, &input_mask, NULL, NULL, timeoutptr);
904 
905  if (ret < 0)
906  {
907  if (errno == EINTR)
908  return 0; /* Got a signal, so not an error */
909  pg_log_error("%s() failed: %m", "select");
910  return -1;
911  }
912  if (ret > 0 && FD_ISSET(connsocket, &input_mask))
913  return 1; /* Got input on connection socket */
914 
915  return 0; /* Got timeout or input on stop_socket */
916 }
917 
918 /*
919  * Receive CopyData message available from XLOG stream, blocking for
920  * maximum of 'timeout' ms.
921  *
922  * If data was received, returns the length of the data. *buffer is set to
923  * point to a buffer holding the received message. The buffer is only valid
924  * until the next CopyStreamReceive call.
925  *
926  * Returns 0 if no data was available within timeout, or if wait was
927  * interrupted by signal or stop_socket input.
928  * -1 on error. -2 if the server ended the COPY.
929  */
930 static int
931 CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
932  char **buffer)
933 {
934  char *copybuf = NULL;
935  int rawlen;
936 
937  PQfreemem(*buffer);
938  *buffer = NULL;
939 
940  /* Try to receive a CopyData message */
941  rawlen = PQgetCopyData(conn, &copybuf, 1);
942  if (rawlen == 0)
943  {
944  int ret;
945 
946  /*
947  * No data available. Wait for some to appear, but not longer than
948  * the specified timeout, so that we can ping the server. Also stop
949  * waiting if input appears on stop_socket.
950  */
951  ret = CopyStreamPoll(conn, timeout, stop_socket);
952  if (ret <= 0)
953  return ret;
954 
955  /* Now there is actually data on the socket */
956  if (PQconsumeInput(conn) == 0)
957  {
958  pg_log_error("could not receive data from WAL stream: %s",
960  return -1;
961  }
962 
963  /* Now that we've consumed some input, try again */
964  rawlen = PQgetCopyData(conn, &copybuf, 1);
965  if (rawlen == 0)
966  return 0;
967  }
968  if (rawlen == -1) /* end-of-streaming or error */
969  return -2;
970  if (rawlen == -2)
971  {
972  pg_log_error("could not read COPY data: %s", PQerrorMessage(conn));
973  return -1;
974  }
975 
976  /* Return received messages to caller */
977  *buffer = copybuf;
978  return rawlen;
979 }
980 
981 /*
982  * Process the keepalive message.
983  */
984 static bool
986  XLogRecPtr blockpos, TimestampTz *last_status)
987 {
988  int pos;
989  bool replyRequested;
991 
992  /*
993  * Parse the keepalive message, enclosed in the CopyData message. We just
994  * check if the server requested a reply, and ignore the rest.
995  */
996  pos = 1; /* skip msgtype 'k' */
997  pos += 8; /* skip walEnd */
998  pos += 8; /* skip sendTime */
999 
1000  if (len < pos + 1)
1001  {
1002  pg_log_error("streaming header too small: %d", len);
1003  return false;
1004  }
1005  replyRequested = copybuf[pos];
1006 
1007  /* If the server requested an immediate reply, send one. */
1008  if (replyRequested && still_sending)
1009  {
1010  if (reportFlushPosition && lastFlushPosition < blockpos &&
1011  walfile != NULL)
1012  {
1013  /*
1014  * If a valid flush location needs to be reported, flush the
1015  * current WAL file so that the latest flush location is sent back
1016  * to the server. This is necessary to see whether the last WAL
1017  * data has been successfully replicated or not, at the normal
1018  * shutdown of the server.
1019  */
1020  if (stream->walmethod->ops->sync(walfile) != 0)
1021  pg_fatal("could not fsync file \"%s\": %s",
1023  lastFlushPosition = blockpos;
1024  }
1025 
1027  if (!sendFeedback(conn, blockpos, now, false))
1028  return false;
1029  *last_status = now;
1030  }
1031 
1032  return true;
1033 }
1034 
1035 /*
1036  * Process XLogData message.
1037  */
1038 static bool
1040  XLogRecPtr *blockpos)
1041 {
1042  int xlogoff;
1043  int bytes_left;
1044  int bytes_written;
1045  int hdr_len;
1046 
1047  /*
1048  * Once we've decided we don't want to receive any more, just ignore any
1049  * subsequent XLogData messages.
1050  */
1051  if (!(still_sending))
1052  return true;
1053 
1054  /*
1055  * Read the header of the XLogData message, enclosed in the CopyData
1056  * message. We only need the WAL location field (dataStart), the rest of
1057  * the header is ignored.
1058  */
1059  hdr_len = 1; /* msgtype 'w' */
1060  hdr_len += 8; /* dataStart */
1061  hdr_len += 8; /* walEnd */
1062  hdr_len += 8; /* sendTime */
1063  if (len < hdr_len)
1064  {
1065  pg_log_error("streaming header too small: %d", len);
1066  return false;
1067  }
1068  *blockpos = fe_recvint64(&copybuf[1]);
1069 
1070  /* Extract WAL location for this block */
1071  xlogoff = XLogSegmentOffset(*blockpos, WalSegSz);
1072 
1073  /*
1074  * Verify that the initial location in the stream matches where we think
1075  * we are.
1076  */
1077  if (walfile == NULL)
1078  {
1079  /* No file open yet */
1080  if (xlogoff != 0)
1081  {
1082  pg_log_error("received write-ahead log record for offset %u with no file open",
1083  xlogoff);
1084  return false;
1085  }
1086  }
1087  else
1088  {
1089  /* More data in existing segment */
1090  if (walfile->currpos != xlogoff)
1091  {
1092  pg_log_error("got WAL data offset %08x, expected %08x",
1093  xlogoff, (int) walfile->currpos);
1094  return false;
1095  }
1096  }
1097 
1098  bytes_left = len - hdr_len;
1099  bytes_written = 0;
1100 
1101  while (bytes_left)
1102  {
1103  int bytes_to_write;
1104 
1105  /*
1106  * If crossing a WAL boundary, only write up until we reach wal
1107  * segment size.
1108  */
1109  if (xlogoff + bytes_left > WalSegSz)
1110  bytes_to_write = WalSegSz - xlogoff;
1111  else
1112  bytes_to_write = bytes_left;
1113 
1114  if (walfile == NULL)
1115  {
1116  if (!open_walfile(stream, *blockpos))
1117  {
1118  /* Error logged by open_walfile */
1119  return false;
1120  }
1121  }
1122 
1123  if (stream->walmethod->ops->write(walfile,
1124  copybuf + hdr_len + bytes_written,
1125  bytes_to_write) != bytes_to_write)
1126  {
1127  pg_log_error("could not write %d bytes to WAL file \"%s\": %s",
1128  bytes_to_write, walfile->pathname,
1129  GetLastWalMethodError(stream->walmethod));
1130  return false;
1131  }
1132 
1133  /* Write was successful, advance our position */
1134  bytes_written += bytes_to_write;
1135  bytes_left -= bytes_to_write;
1136  *blockpos += bytes_to_write;
1137  xlogoff += bytes_to_write;
1138 
1139  /* Did we reach the end of a WAL segment? */
1140  if (XLogSegmentOffset(*blockpos, WalSegSz) == 0)
1141  {
1142  if (!close_walfile(stream, *blockpos))
1143  /* Error message written in close_walfile() */
1144  return false;
1145 
1146  xlogoff = 0;
1147 
1148  if (still_sending && stream->stream_stop(*blockpos, stream->timeline, true))
1149  {
1150  if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1151  {
1152  pg_log_error("could not send copy-end packet: %s",
1153  PQerrorMessage(conn));
1154  return false;
1155  }
1156  still_sending = false;
1157  return true; /* ignore the rest of this XLogData packet */
1158  }
1159  }
1160  }
1161  /* No more data left to write, receive next copy packet */
1162 
1163  return true;
1164 }
1165 
1166 /*
1167  * Handle end of the copy stream.
1168  */
1169 static PGresult *
1171  XLogRecPtr blockpos, XLogRecPtr *stoppos)
1172 {
1174 
1175  /*
1176  * The server closed its end of the copy stream. If we haven't closed
1177  * ours already, we need to do so now, unless the server threw an error,
1178  * in which case we don't.
1179  */
1180  if (still_sending)
1181  {
1182  if (!close_walfile(stream, blockpos))
1183  {
1184  /* Error message written in close_walfile() */
1185  PQclear(res);
1186  return NULL;
1187  }
1189  {
1190  if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1191  {
1192  pg_log_error("could not send copy-end packet: %s",
1193  PQerrorMessage(conn));
1194  PQclear(res);
1195  return NULL;
1196  }
1197  res = PQgetResult(conn);
1198  }
1199  still_sending = false;
1200  }
1201  PQfreemem(copybuf);
1202  *stoppos = blockpos;
1203  return res;
1204 }
1205 
1206 /*
1207  * Check if we should continue streaming, or abort at this point.
1208  */
1209 static bool
1211 {
1212  if (still_sending && stream->stream_stop(blockpos, stream->timeline, false))
1213  {
1214  if (!close_walfile(stream, blockpos))
1215  {
1216  /* Potential error message is written by close_walfile */
1217  return false;
1218  }
1219  if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1220  {
1221  pg_log_error("could not send copy-end packet: %s",
1222  PQerrorMessage(conn));
1223  return false;
1224  }
1225  still_sending = false;
1226  }
1227 
1228  return true;
1229 }
1230 
1231 /*
1232  * Calculate how long send/receive loops should sleep
1233  */
1234 static long
1236  TimestampTz last_status)
1237 {
1238  TimestampTz status_targettime = 0;
1239  long sleeptime;
1240 
1242  status_targettime = last_status +
1243  (standby_message_timeout - 1) * ((int64) 1000);
1244 
1245  if (status_targettime > 0)
1246  {
1247  long secs;
1248  int usecs;
1249 
1251  status_targettime,
1252  &secs,
1253  &usecs);
1254  /* Always sleep at least 1 sec */
1255  if (secs <= 0)
1256  {
1257  secs = 1;
1258  usecs = 0;
1259  }
1260 
1261  sleeptime = secs * 1000 + usecs / 1000;
1262  }
1263  else
1264  sleeptime = -1;
1265 
1266  return sleeptime;
1267 }
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1608
unsigned int uint32
Definition: c.h:518
#define ngettext(s, p, n)
Definition: c.h:1184
#define Max(x, y)
Definition: c.h:1001
@ PG_COMPRESSION_NONE
Definition: compression.h:23
int64 TimestampTz
Definition: timestamp.h:39
const char * PQparameterStatus(const PGconn *conn, const char *paramName)
Definition: fe-connect.c:7153
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:7188
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7198
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:7224
int PQflush(PGconn *conn)
Definition: fe-exec.c:4000
void PQfreemem(void *ptr)
Definition: fe-exec.c:4032
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3411
char * PQresultErrorMessage(const PGresult *res)
Definition: fe-exec.c:3427
int PQputCopyEnd(PGconn *conn, const char *errormsg)
Definition: fe-exec.c:2749
int PQntuples(const PGresult *res)
Definition: fe-exec.c:3481
int PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
Definition: fe-exec.c:2695
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2262
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1984
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3876
int PQnfields(const PGresult *res)
Definition: fe-exec.c:3489
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:2062
int PQgetCopyData(PGconn *conn, char **buffer, int async)
Definition: fe-exec.c:2816
void pg_free(void *ptr)
Definition: fe_memutils.c:105
@ PGRES_COPY_IN
Definition: libpq-fe.h:127
@ PGRES_COPY_BOTH
Definition: libpq-fe.h:132
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:120
@ PGRES_TUPLES_OK
Definition: libpq-fe.h:123
exit(1)
#define pg_log_error(...)
Definition: logging.h:106
#define pg_log_info(...)
Definition: logging.h:124
#define pg_fatal(...)
static int standby_message_timeout
#define MAXPGPATH
const void size_t len
static char * filename
Definition: pg_dumpall.c:119
static XLogRecPtr startpos
#define pg_log_warning(...)
Definition: pgfnames.c:24
#define sprintf
Definition: port.h:240
int pgsocket
Definition: port.h:29
#define snprintf
Definition: port.h:238
#define PGINVALID_SOCKET
Definition: port.h:31
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
static PGresult * HandleCopyStream(PGconn *conn, StreamCtl *stream, XLogRecPtr *stoppos)
Definition: receivelog.c:744
static bool open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
Definition: receivelog.c:89
static bool mark_file_as_archived(StreamCtl *stream, const char *fname)
Definition: receivelog.c:53
static int CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket, char **buffer)
Definition: receivelog.c:931
static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos)
Definition: receivelog.c:1210
static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr blockpos, TimestampTz *last_status)
Definition: receivelog.c:985
static bool reportFlushPosition
Definition: receivelog.c:29
static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr *blockpos)
Definition: receivelog.c:1039
static bool close_walfile(StreamCtl *stream, XLogRecPtr pos)
Definition: receivelog.c:191
static bool existsTimeLineHistoryFile(StreamCtl *stream)
Definition: receivelog.c:257
static bool still_sending
Definition: receivelog.c:32
static bool writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
Definition: receivelog.c:274
bool ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
Definition: receivelog.c:452
static PGresult * HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf, XLogRecPtr blockpos, XLogRecPtr *stoppos)
Definition: receivelog.c:1170
static Walfile * walfile
Definition: receivelog.c:28
static long CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout, TimestampTz last_status)
Definition: receivelog.c:1235
bool CheckServerVersionForStreaming(PGconn *conn)
Definition: receivelog.c:374
static bool sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
Definition: receivelog.c:336
static XLogRecPtr lastFlushPosition
Definition: receivelog.c:30
static int CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
Definition: receivelog.c:869
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
Definition: receivelog.c:698
static pg_noinline void Size size
Definition: slab.c:607
static void error(void)
Definition: sql-dyntest.c:147
int WalSegSz
Definition: streamutil.c:32
int64 fe_recvint64(char *buf)
Definition: streamutil.c:932
TimestampTz feGetCurrentTimestamp(void)
Definition: streamutil.c:867
void feTimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: streamutil.c:886
void fe_sendint64(int64 i, char *buf)
Definition: streamutil.c:921
bool feTimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: streamutil.c:908
PGconn * conn
Definition: streamutil.c:53
bool RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli, XLogRecPtr *startpos, char **db_name)
Definition: streamutil.c:478
char * sysidentifier
Definition: receivelog.h:33
TimeLineID timeline
Definition: receivelog.h:32
stream_stop_callback stream_stop
Definition: receivelog.h:41
char * replication_slot
Definition: receivelog.h:48
XLogRecPtr startpos
Definition: receivelog.h:31
pgsocket stop_socket
Definition: receivelog.h:43
int standby_message_timeout
Definition: receivelog.h:35
WalWriteMethod * walmethod
Definition: receivelog.h:46
bool mark_done
Definition: receivelog.h:37
char * partial_suffix
Definition: receivelog.h:47
bool synchronous
Definition: receivelog.h:36
bool(* existsfile)(WalWriteMethod *wwmethod, const char *pathname)
Definition: walmethods.h:58
ssize_t(* write)(Walfile *f, const void *buf, size_t count)
Definition: walmethods.h:73
char *(* get_file_name)(WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix)
Definition: walmethods.h:67
ssize_t(* get_file_size)(WalWriteMethod *wwmethod, const char *pathname)
Definition: walmethods.h:61
Walfile *(* open_for_write)(WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix, size_t pad_to_size)
Definition: walmethods.h:49
int(* close)(Walfile *f, WalCloseMethod method)
Definition: walmethods.h:55
int(* sync)(Walfile *f)
Definition: walmethods.h:78
const WalWriteMethodOps * ops
Definition: walmethods.h:105
pg_compress_algorithm compression_algorithm
Definition: walmethods.h:106
char * pathname
Definition: walmethods.h:21
off_t currpos
Definition: walmethods.h:20
static StringInfo copybuf
Definition: tablesync.c:137
static void * fn(void *arg)
Definition: thread-alloc.c:119
const char * GetLastWalMethodError(WalWriteMethod *wwmethod)
Definition: walmethods.c:1383
@ CLOSE_UNLINK
Definition: walmethods.h:34
@ CLOSE_NO_RENAME
Definition: walmethods.h:35
@ CLOSE_NORMAL
Definition: walmethods.h:33
#define EINTR
Definition: win32_port.h:374
#define select(n, r, w, e, timeout)
Definition: win32_port.h:513
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
#define MAXFNAMELEN
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
static void TLHistoryFileName(char *fname, TimeLineID tli)
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
uint32 TimeLineID
Definition: xlogdefs.h:59
uint64 XLogSegNo
Definition: xlogdefs.h:48