PostgreSQL Source Code  git master
receivelog.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * receivelog.c - receive WAL files using the streaming
4  * replication protocol.
5  *
6  * Author: Magnus Hagander <magnus@hagander.net>
7  *
8  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
9  *
10  * IDENTIFICATION
11  * src/bin/pg_basebackup/receivelog.c
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres_fe.h"
16 
17 #include <sys/select.h>
18 #include <sys/stat.h>
19 #include <unistd.h>
20 
21 #include "access/xlog_internal.h"
22 #include "common/file_utils.h"
23 #include "common/logging.h"
24 #include "libpq-fe.h"
25 #include "receivelog.h"
26 #include "streamutil.h"
27 
28 /* currently open WAL file */
29 static Walfile *walfile = NULL;
30 static bool reportFlushPosition = false;
32 
33 static bool still_sending = true; /* feedback still needs to be sent? */
34 
36  XLogRecPtr *stoppos);
37 static int CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket);
38 static int CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
39  char **buffer);
40 static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf,
41  int len, XLogRecPtr blockpos, TimestampTz *last_status);
42 static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
43  XLogRecPtr *blockpos);
45  XLogRecPtr blockpos, XLogRecPtr *stoppos);
46 static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos);
48  TimestampTz last_status);
49 
51  uint32 *timeline);
52 
53 static bool
54 mark_file_as_archived(StreamCtl *stream, const char *fname)
55 {
56  Walfile *f;
57  static char tmppath[MAXPGPATH];
58 
59  snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done",
60  fname);
61 
62  f = stream->walmethod->ops->open_for_write(stream->walmethod, tmppath,
63  NULL, 0);
64  if (f == NULL)
65  {
66  pg_log_error("could not create archive status file \"%s\": %s",
67  tmppath, GetLastWalMethodError(stream->walmethod));
68  return false;
69  }
70 
71  if (stream->walmethod->ops->close(f, CLOSE_NORMAL) != 0)
72  {
73  pg_log_error("could not close archive status file \"%s\": %s",
74  tmppath, GetLastWalMethodError(stream->walmethod));
75  return false;
76  }
77 
78  return true;
79 }
80 
81 /*
82  * Open a new WAL file in the specified directory.
83  *
84  * Returns true if OK; on failure, returns false after printing an error msg.
85  * On success, 'walfile' is set to the opened WAL file.
86  *
87  * The file will be padded to 16Mb with zeroes.
88  */
89 static bool
90 open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
91 {
92  Walfile *f;
93  char *fn;
94  ssize_t size;
95  XLogSegNo segno;
96  char walfile_name[MAXPGPATH];
97 
98  XLByteToSeg(startpoint, segno, WalSegSz);
99  XLogFileName(walfile_name, stream->timeline, segno, WalSegSz);
100 
101  /* Note that this considers the compression used if necessary */
102  fn = stream->walmethod->ops->get_file_name(stream->walmethod,
103  walfile_name,
104  stream->partial_suffix);
105 
106  /*
107  * When streaming to files, if an existing file exists we verify that it's
108  * either empty (just created), or a complete WalSegSz segment (in which
109  * case it has been created and padded). Anything else indicates a corrupt
110  * file. Compressed files have no need for padding, so just ignore this
111  * case.
112  *
113  * When streaming to tar, no file with this name will exist before, so we
114  * never have to verify a size.
115  */
117  stream->walmethod->ops->existsfile(stream->walmethod, fn))
118  {
119  size = stream->walmethod->ops->get_file_size(stream->walmethod, fn);
120  if (size < 0)
121  {
122  pg_log_error("could not get size of write-ahead log file \"%s\": %s",
124  pg_free(fn);
125  return false;
126  }
127  if (size == WalSegSz)
128  {
129  /* Already padded file. Open it for use */
130  f = stream->walmethod->ops->open_for_write(stream->walmethod, walfile_name, stream->partial_suffix, 0);
131  if (f == NULL)
132  {
133  pg_log_error("could not open existing write-ahead log file \"%s\": %s",
135  pg_free(fn);
136  return false;
137  }
138 
139  /* fsync file in case of a previous crash */
140  if (stream->walmethod->ops->sync(f) != 0)
141  {
142  pg_log_error("could not fsync existing write-ahead log file \"%s\": %s",
144  stream->walmethod->ops->close(f, CLOSE_UNLINK);
145  exit(1);
146  }
147 
148  walfile = f;
149  pg_free(fn);
150  return true;
151  }
152  if (size != 0)
153  {
154  /* if write didn't set errno, assume problem is no disk space */
155  if (errno == 0)
156  errno = ENOSPC;
157  pg_log_error(ngettext("write-ahead log file \"%s\" has %zd byte, should be 0 or %d",
158  "write-ahead log file \"%s\" has %zd bytes, should be 0 or %d",
159  size),
160  fn, size, WalSegSz);
161  pg_free(fn);
162  return false;
163  }
164  /* File existed and was empty, so fall through and open */
165  }
166 
167  /* No file existed, so create one */
168 
169  f = stream->walmethod->ops->open_for_write(stream->walmethod,
170  walfile_name,
171  stream->partial_suffix,
172  WalSegSz);
173  if (f == NULL)
174  {
175  pg_log_error("could not open write-ahead log file \"%s\": %s",
177  pg_free(fn);
178  return false;
179  }
180 
181  pg_free(fn);
182  walfile = f;
183  return true;
184 }
185 
186 /*
187  * Close the current WAL file (if open), and rename it to the correct
188  * filename if it's complete. On failure, prints an error message to stderr
189  * and returns false, otherwise returns true.
190  */
191 static bool
193 {
194  char *fn;
195  off_t currpos;
196  int r;
197  char walfile_name[MAXPGPATH];
198 
199  if (walfile == NULL)
200  return true;
201 
202  strlcpy(walfile_name, walfile->pathname, MAXPGPATH);
203  currpos = walfile->currpos;
204 
205  /* Note that this considers the compression used if necessary */
206  fn = stream->walmethod->ops->get_file_name(stream->walmethod,
207  walfile_name,
208  stream->partial_suffix);
209 
210  if (stream->partial_suffix)
211  {
212  if (currpos == WalSegSz)
213  r = stream->walmethod->ops->close(walfile, CLOSE_NORMAL);
214  else
215  {
216  pg_log_info("not renaming \"%s\", segment is not complete", fn);
217  r = stream->walmethod->ops->close(walfile, CLOSE_NO_RENAME);
218  }
219  }
220  else
221  r = stream->walmethod->ops->close(walfile, CLOSE_NORMAL);
222 
223  walfile = NULL;
224 
225  if (r != 0)
226  {
227  pg_log_error("could not close file \"%s\": %s",
229 
230  pg_free(fn);
231  return false;
232  }
233 
234  pg_free(fn);
235 
236  /*
237  * Mark file as archived if requested by the caller - pg_basebackup needs
238  * to do so as files can otherwise get archived again after promotion of a
239  * new node. This is in line with walreceiver.c always doing a
240  * XLogArchiveForceDone() after a complete segment.
241  */
242  if (currpos == WalSegSz && stream->mark_done)
243  {
244  /* writes error message if failed */
245  if (!mark_file_as_archived(stream, walfile_name))
246  return false;
247  }
248 
249  lastFlushPosition = pos;
250  return true;
251 }
252 
253 
254 /*
255  * Check if a timeline history file exists.
256  */
257 static bool
259 {
260  char histfname[MAXFNAMELEN];
261 
262  /*
263  * Timeline 1 never has a history file. We treat that as if it existed,
264  * since we never need to stream it.
265  */
266  if (stream->timeline == 1)
267  return true;
268 
269  TLHistoryFileName(histfname, stream->timeline);
270 
271  return stream->walmethod->ops->existsfile(stream->walmethod, histfname);
272 }
273 
274 static bool
275 writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
276 {
277  int size = strlen(content);
278  char histfname[MAXFNAMELEN];
279  Walfile *f;
280 
281  /*
282  * Check that the server's idea of how timeline history files should be
283  * named matches ours.
284  */
285  TLHistoryFileName(histfname, stream->timeline);
286  if (strcmp(histfname, filename) != 0)
287  {
288  pg_log_error("server reported unexpected history file name for timeline %u: %s",
289  stream->timeline, filename);
290  return false;
291  }
292 
293  f = stream->walmethod->ops->open_for_write(stream->walmethod,
294  histfname, ".tmp", 0);
295  if (f == NULL)
296  {
297  pg_log_error("could not create timeline history file \"%s\": %s",
298  histfname, GetLastWalMethodError(stream->walmethod));
299  return false;
300  }
301 
302  if ((int) stream->walmethod->ops->write(f, content, size) != size)
303  {
304  pg_log_error("could not write timeline history file \"%s\": %s",
305  histfname, GetLastWalMethodError(stream->walmethod));
306 
307  /*
308  * If we fail to make the file, delete it to release disk space
309  */
310  stream->walmethod->ops->close(f, CLOSE_UNLINK);
311 
312  return false;
313  }
314 
315  if (stream->walmethod->ops->close(f, CLOSE_NORMAL) != 0)
316  {
317  pg_log_error("could not close file \"%s\": %s",
318  histfname, GetLastWalMethodError(stream->walmethod));
319  return false;
320  }
321 
322  /* Maintain archive_status, check close_walfile() for details. */
323  if (stream->mark_done)
324  {
325  /* writes error message if failed */
326  if (!mark_file_as_archived(stream, histfname))
327  return false;
328  }
329 
330  return true;
331 }
332 
333 /*
334  * Send a Standby Status Update message to server.
335  */
336 static bool
337 sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
338 {
339  char replybuf[1 + 8 + 8 + 8 + 8 + 1];
340  int len = 0;
341 
342  replybuf[len] = 'r';
343  len += 1;
344  fe_sendint64(blockpos, &replybuf[len]); /* write */
345  len += 8;
347  fe_sendint64(lastFlushPosition, &replybuf[len]); /* flush */
348  else
349  fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */
350  len += 8;
351  fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
352  len += 8;
353  fe_sendint64(now, &replybuf[len]); /* sendTime */
354  len += 8;
355  replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
356  len += 1;
357 
358  if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
359  {
360  pg_log_error("could not send feedback packet: %s",
362  return false;
363  }
364 
365  return true;
366 }
367 
368 /*
369  * Check that the server version we're connected to is supported by
370  * ReceiveXlogStream().
371  *
372  * If it's not, an error message is printed to stderr, and false is returned.
373  */
374 bool
376 {
377  int minServerMajor,
378  maxServerMajor;
379  int serverMajor;
380 
381  /*
382  * The message format used in streaming replication changed in 9.3, so we
383  * cannot stream from older servers. And we don't support servers newer
384  * than the client; it might work, but we don't know, so err on the safe
385  * side.
386  */
387  minServerMajor = 903;
388  maxServerMajor = PG_VERSION_NUM / 100;
389  serverMajor = PQserverVersion(conn) / 100;
390  if (serverMajor < minServerMajor)
391  {
392  const char *serverver = PQparameterStatus(conn, "server_version");
393 
394  pg_log_error("incompatible server version %s; client does not support streaming from server versions older than %s",
395  serverver ? serverver : "'unknown'",
396  "9.3");
397  return false;
398  }
399  else if (serverMajor > maxServerMajor)
400  {
401  const char *serverver = PQparameterStatus(conn, "server_version");
402 
403  pg_log_error("incompatible server version %s; client does not support streaming from server versions newer than %s",
404  serverver ? serverver : "'unknown'",
405  PG_VERSION);
406  return false;
407  }
408  return true;
409 }
410 
411 /*
412  * Receive a log stream starting at the specified position.
413  *
414  * Individual parameters are passed through the StreamCtl structure.
415  *
416  * If sysidentifier is specified, validate that both the system
417  * identifier and the timeline matches the specified ones
418  * (by sending an extra IDENTIFY_SYSTEM command)
419  *
420  * All received segments will be written to the directory
421  * specified by basedir. This will also fetch any missing timeline history
422  * files.
423  *
424  * The stream_stop callback will be called every time data
425  * is received, and whenever a segment is completed. If it returns
426  * true, the streaming will stop and the function
427  * return. As long as it returns false, streaming will continue
428  * indefinitely.
429  *
430  * If stream_stop() checks for external input, stop_socket should be set to
431  * the FD it checks. This will allow such input to be detected promptly
432  * rather than after standby_message_timeout (which might be indefinite).
433  * Note that signals will interrupt waits for input as well, but that is
434  * race-y since a signal received while busy won't interrupt the wait.
435  *
436  * standby_message_timeout controls how often we send a message
437  * back to the primary letting it know our progress, in milliseconds.
438  * Zero means no messages are sent.
439  * This message will only contain the write location, and never
440  * flush or replay.
441  *
442  * If 'partial_suffix' is not NULL, files are initially created with the
443  * given suffix, and the suffix is removed once the file is finished. That
444  * allows you to tell the difference between partial and completed files,
445  * so that you can continue later where you left.
446  *
447  * If 'synchronous' is true, the received WAL is flushed as soon as written,
448  * otherwise only when the WAL file is closed.
449  *
450  * Note: The WAL location *must* be at a log segment start!
451  */
452 bool
454 {
455  char query[128];
456  char slotcmd[128];
457  PGresult *res;
458  XLogRecPtr stoppos;
459 
460  /*
461  * The caller should've checked the server version already, but doesn't do
462  * any harm to check it here too.
463  */
465  return false;
466 
467  /*
468  * Decide whether we want to report the flush position. If we report the
469  * flush position, the primary will know what WAL we'll possibly
470  * re-request, and it can then remove older WAL safely. We must always do
471  * that when we are using slots.
472  *
473  * Reporting the flush position makes one eligible as a synchronous
474  * replica. People shouldn't include generic names in
475  * synchronous_standby_names, but we've protected them against it so far,
476  * so let's continue to do so unless specifically requested.
477  */
478  if (stream->replication_slot != NULL)
479  {
480  reportFlushPosition = true;
481  sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);
482  }
483  else
484  {
485  if (stream->synchronous)
486  reportFlushPosition = true;
487  else
488  reportFlushPosition = false;
489  slotcmd[0] = 0;
490  }
491 
492  if (stream->sysidentifier != NULL)
493  {
494  char *sysidentifier = NULL;
495  TimeLineID servertli;
496 
497  /*
498  * Get the server system identifier and timeline, and validate them.
499  */
500  if (!RunIdentifySystem(conn, &sysidentifier, &servertli, NULL, NULL))
501  {
502  pg_free(sysidentifier);
503  return false;
504  }
505 
506  if (strcmp(stream->sysidentifier, sysidentifier) != 0)
507  {
508  pg_log_error("system identifier does not match between base backup and streaming connection");
509  pg_free(sysidentifier);
510  return false;
511  }
512  pg_free(sysidentifier);
513 
514  if (stream->timeline > servertli)
515  {
516  pg_log_error("starting timeline %u is not present in the server",
517  stream->timeline);
518  return false;
519  }
520  }
521 
522  /*
523  * initialize flush position to starting point, it's the caller's
524  * responsibility that that's sane.
525  */
526  lastFlushPosition = stream->startpos;
527 
528  while (1)
529  {
530  /*
531  * Fetch the timeline history file for this timeline, if we don't have
532  * it already. When streaming log to tar, this will always return
533  * false, as we are never streaming into an existing file and
534  * therefore there can be no pre-existing timeline history file.
535  */
536  if (!existsTimeLineHistoryFile(stream))
537  {
538  snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
539  res = PQexec(conn, query);
541  {
542  /* FIXME: we might send it ok, but get an error */
543  pg_log_error("could not send replication command \"%s\": %s",
544  "TIMELINE_HISTORY", PQresultErrorMessage(res));
545  PQclear(res);
546  return false;
547  }
548 
549  /*
550  * The response to TIMELINE_HISTORY is a single row result set
551  * with two fields: filename and content
552  */
553  if (PQnfields(res) != 2 || PQntuples(res) != 1)
554  {
555  pg_log_warning("unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields",
556  PQntuples(res), PQnfields(res), 1, 2);
557  }
558 
559  /* Write the history file to disk */
561  PQgetvalue(res, 0, 0),
562  PQgetvalue(res, 0, 1));
563 
564  PQclear(res);
565  }
566 
567  /*
568  * Before we start streaming from the requested location, check if the
569  * callback tells us to stop here.
570  */
571  if (stream->stream_stop(stream->startpos, stream->timeline, false))
572  return true;
573 
574  /* Initiate the replication stream at specified location */
575  snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
576  slotcmd,
577  LSN_FORMAT_ARGS(stream->startpos),
578  stream->timeline);
579  res = PQexec(conn, query);
581  {
582  pg_log_error("could not send replication command \"%s\": %s",
583  "START_REPLICATION", PQresultErrorMessage(res));
584  PQclear(res);
585  return false;
586  }
587  PQclear(res);
588 
589  /* Stream the WAL */
590  res = HandleCopyStream(conn, stream, &stoppos);
591  if (res == NULL)
592  goto error;
593 
594  /*
595  * Streaming finished.
596  *
597  * There are two possible reasons for that: a controlled shutdown, or
598  * we reached the end of the current timeline. In case of
599  * end-of-timeline, the server sends a result set after Copy has
600  * finished, containing information about the next timeline. Read
601  * that, and restart streaming from the next timeline. In case of
602  * controlled shutdown, stop here.
603  */
605  {
606  /*
607  * End-of-timeline. Read the next timeline's ID and starting
608  * position. Usually, the starting position will match the end of
609  * the previous timeline, but there are corner cases like if the
610  * server had sent us half of a WAL record, when it was promoted.
611  * The new timeline will begin at the end of the last complete
612  * record in that case, overlapping the partial WAL record on the
613  * old timeline.
614  */
615  uint32 newtimeline;
616  bool parsed;
617 
618  parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
619  PQclear(res);
620  if (!parsed)
621  goto error;
622 
623  /* Sanity check the values the server gave us */
624  if (newtimeline <= stream->timeline)
625  {
626  pg_log_error("server reported unexpected next timeline %u, following timeline %u",
627  newtimeline, stream->timeline);
628  goto error;
629  }
630  if (stream->startpos > stoppos)
631  {
632  pg_log_error("server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X",
633  stream->timeline, LSN_FORMAT_ARGS(stoppos),
634  newtimeline, LSN_FORMAT_ARGS(stream->startpos));
635  goto error;
636  }
637 
638  /* Read the final result, which should be CommandComplete. */
639  res = PQgetResult(conn);
641  {
642  pg_log_error("unexpected termination of replication stream: %s",
644  PQclear(res);
645  goto error;
646  }
647  PQclear(res);
648 
649  /*
650  * Loop back to start streaming from the new timeline. Always
651  * start streaming at the beginning of a segment.
652  */
653  stream->timeline = newtimeline;
654  stream->startpos = stream->startpos -
656  continue;
657  }
658  else if (PQresultStatus(res) == PGRES_COMMAND_OK)
659  {
660  PQclear(res);
661 
662  /*
663  * End of replication (ie. controlled shut down of the server).
664  *
665  * Check if the callback thinks it's OK to stop here. If not,
666  * complain.
667  */
668  if (stream->stream_stop(stoppos, stream->timeline, false))
669  return true;
670  else
671  {
672  pg_log_error("replication stream was terminated before stop point");
673  goto error;
674  }
675  }
676  else
677  {
678  /* Server returned an error. */
679  pg_log_error("unexpected termination of replication stream: %s",
681  PQclear(res);
682  goto error;
683  }
684  }
685 
686 error:
687  if (walfile != NULL && stream->walmethod->ops->close(walfile, CLOSE_NO_RENAME) != 0)
688  pg_log_error("could not close file \"%s\": %s",
690  walfile = NULL;
691  return false;
692 }
693 
694 /*
695  * Helper function to parse the result set returned by server after streaming
696  * has finished. On failure, prints an error to stderr and returns false.
697  */
698 static bool
700 {
701  uint32 startpos_xlogid,
702  startpos_xrecoff;
703 
704  /*----------
705  * The result set consists of one row and two columns, e.g:
706  *
707  * next_tli | next_tli_startpos
708  * ----------+-------------------
709  * 4 | 0/9949AE0
710  *
711  * next_tli is the timeline ID of the next timeline after the one that
712  * just finished streaming. next_tli_startpos is the WAL location where
713  * the server switched to it.
714  *----------
715  */
716  if (PQnfields(res) < 2 || PQntuples(res) != 1)
717  {
718  pg_log_error("unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields",
719  PQntuples(res), PQnfields(res), 1, 2);
720  return false;
721  }
722 
723  *timeline = atoi(PQgetvalue(res, 0, 0));
724  if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &startpos_xlogid,
725  &startpos_xrecoff) != 2)
726  {
727  pg_log_error("could not parse next timeline's starting point \"%s\"",
728  PQgetvalue(res, 0, 1));
729  return false;
730  }
731  *startpos = ((uint64) startpos_xlogid << 32) | startpos_xrecoff;
732 
733  return true;
734 }
735 
736 /*
737  * The main loop of ReceiveXlogStream. Handles the COPY stream after
738  * initiating streaming with the START_REPLICATION command.
739  *
740  * If the COPY ends (not necessarily successfully) due a message from the
741  * server, returns a PGresult and sets *stoppos to the last byte written.
742  * On any other sort of error, returns NULL.
743  */
744 static PGresult *
746  XLogRecPtr *stoppos)
747 {
748  char *copybuf = NULL;
749  TimestampTz last_status = -1;
750  XLogRecPtr blockpos = stream->startpos;
751 
752  still_sending = true;
753 
754  while (1)
755  {
756  int r;
758  long sleeptime;
759 
760  /*
761  * Check if we should continue streaming, or abort at this point.
762  */
763  if (!CheckCopyStreamStop(conn, stream, blockpos))
764  goto error;
765 
767 
768  /*
769  * If synchronous option is true, issue sync command as soon as there
770  * are WAL data which has not been flushed yet.
771  */
772  if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)
773  {
774  if (stream->walmethod->ops->sync(walfile) != 0)
775  pg_fatal("could not fsync file \"%s\": %s",
777  lastFlushPosition = blockpos;
778 
779  /*
780  * Send feedback so that the server sees the latest WAL locations
781  * immediately.
782  */
783  if (!sendFeedback(conn, blockpos, now, false))
784  goto error;
785  last_status = now;
786  }
787 
788  /*
789  * Potentially send a status message to the primary
790  */
791  if (still_sending && stream->standby_message_timeout > 0 &&
792  feTimestampDifferenceExceeds(last_status, now,
793  stream->standby_message_timeout))
794  {
795  /* Time to send feedback! */
796  if (!sendFeedback(conn, blockpos, now, false))
797  goto error;
798  last_status = now;
799  }
800 
801  /*
802  * Calculate how long send/receive loops should sleep
803  */
805  last_status);
806 
807  r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, &copybuf);
808  while (r != 0)
809  {
810  if (r == -1)
811  goto error;
812  if (r == -2)
813  {
814  PGresult *res = HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos);
815 
816  if (res == NULL)
817  goto error;
818  else
819  return res;
820  }
821 
822  /* Check the message type. */
823  if (copybuf[0] == 'k')
824  {
825  if (!ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos,
826  &last_status))
827  goto error;
828  }
829  else if (copybuf[0] == 'w')
830  {
831  if (!ProcessXLogDataMsg(conn, stream, copybuf, r, &blockpos))
832  goto error;
833 
834  /*
835  * Check if we should continue streaming, or abort at this
836  * point.
837  */
838  if (!CheckCopyStreamStop(conn, stream, blockpos))
839  goto error;
840  }
841  else
842  {
843  pg_log_error("unrecognized streaming header: \"%c\"",
844  copybuf[0]);
845  goto error;
846  }
847 
848  /*
849  * Process the received data, and any subsequent data we can read
850  * without blocking.
851  */
852  r = CopyStreamReceive(conn, 0, stream->stop_socket, &copybuf);
853  }
854  }
855 
856 error:
858  return NULL;
859 }
860 
861 /*
862  * Wait until we can read a CopyData message,
863  * or timeout, or occurrence of a signal or input on the stop_socket.
864  * (timeout_ms < 0 means wait indefinitely; 0 means don't wait.)
865  *
866  * Returns 1 if data has become available for reading, 0 if timed out
867  * or interrupted by signal or stop_socket input, and -1 on an error.
868  */
869 static int
870 CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
871 {
872  int ret;
873  fd_set input_mask;
874  int connsocket;
875  int maxfd;
876  struct timeval timeout;
877  struct timeval *timeoutptr;
878 
879  connsocket = PQsocket(conn);
880  if (connsocket < 0)
881  {
882  pg_log_error("invalid socket: %s", PQerrorMessage(conn));
883  return -1;
884  }
885 
886  FD_ZERO(&input_mask);
887  FD_SET(connsocket, &input_mask);
888  maxfd = connsocket;
889  if (stop_socket != PGINVALID_SOCKET)
890  {
891  FD_SET(stop_socket, &input_mask);
892  maxfd = Max(maxfd, stop_socket);
893  }
894 
895  if (timeout_ms < 0)
896  timeoutptr = NULL;
897  else
898  {
899  timeout.tv_sec = timeout_ms / 1000L;
900  timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
901  timeoutptr = &timeout;
902  }
903 
904  ret = select(maxfd + 1, &input_mask, NULL, NULL, timeoutptr);
905 
906  if (ret < 0)
907  {
908  if (errno == EINTR)
909  return 0; /* Got a signal, so not an error */
910  pg_log_error("%s() failed: %m", "select");
911  return -1;
912  }
913  if (ret > 0 && FD_ISSET(connsocket, &input_mask))
914  return 1; /* Got input on connection socket */
915 
916  return 0; /* Got timeout or input on stop_socket */
917 }
918 
919 /*
920  * Receive CopyData message available from XLOG stream, blocking for
921  * maximum of 'timeout' ms.
922  *
923  * If data was received, returns the length of the data. *buffer is set to
924  * point to a buffer holding the received message. The buffer is only valid
925  * until the next CopyStreamReceive call.
926  *
927  * Returns 0 if no data was available within timeout, or if wait was
928  * interrupted by signal or stop_socket input.
929  * -1 on error. -2 if the server ended the COPY.
930  */
931 static int
932 CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
933  char **buffer)
934 {
935  char *copybuf = NULL;
936  int rawlen;
937 
938  PQfreemem(*buffer);
939  *buffer = NULL;
940 
941  /* Try to receive a CopyData message */
942  rawlen = PQgetCopyData(conn, &copybuf, 1);
943  if (rawlen == 0)
944  {
945  int ret;
946 
947  /*
948  * No data available. Wait for some to appear, but not longer than
949  * the specified timeout, so that we can ping the server. Also stop
950  * waiting if input appears on stop_socket.
951  */
952  ret = CopyStreamPoll(conn, timeout, stop_socket);
953  if (ret <= 0)
954  return ret;
955 
956  /* Now there is actually data on the socket */
957  if (PQconsumeInput(conn) == 0)
958  {
959  pg_log_error("could not receive data from WAL stream: %s",
961  return -1;
962  }
963 
964  /* Now that we've consumed some input, try again */
965  rawlen = PQgetCopyData(conn, &copybuf, 1);
966  if (rawlen == 0)
967  return 0;
968  }
969  if (rawlen == -1) /* end-of-streaming or error */
970  return -2;
971  if (rawlen == -2)
972  {
973  pg_log_error("could not read COPY data: %s", PQerrorMessage(conn));
974  return -1;
975  }
976 
977  /* Return received messages to caller */
978  *buffer = copybuf;
979  return rawlen;
980 }
981 
982 /*
983  * Process the keepalive message.
984  */
985 static bool
987  XLogRecPtr blockpos, TimestampTz *last_status)
988 {
989  int pos;
990  bool replyRequested;
992 
993  /*
994  * Parse the keepalive message, enclosed in the CopyData message. We just
995  * check if the server requested a reply, and ignore the rest.
996  */
997  pos = 1; /* skip msgtype 'k' */
998  pos += 8; /* skip walEnd */
999  pos += 8; /* skip sendTime */
1000 
1001  if (len < pos + 1)
1002  {
1003  pg_log_error("streaming header too small: %d", len);
1004  return false;
1005  }
1006  replyRequested = copybuf[pos];
1007 
1008  /* If the server requested an immediate reply, send one. */
1009  if (replyRequested && still_sending)
1010  {
1011  if (reportFlushPosition && lastFlushPosition < blockpos &&
1012  walfile != NULL)
1013  {
1014  /*
1015  * If a valid flush location needs to be reported, flush the
1016  * current WAL file so that the latest flush location is sent back
1017  * to the server. This is necessary to see whether the last WAL
1018  * data has been successfully replicated or not, at the normal
1019  * shutdown of the server.
1020  */
1021  if (stream->walmethod->ops->sync(walfile) != 0)
1022  pg_fatal("could not fsync file \"%s\": %s",
1024  lastFlushPosition = blockpos;
1025  }
1026 
1028  if (!sendFeedback(conn, blockpos, now, false))
1029  return false;
1030  *last_status = now;
1031  }
1032 
1033  return true;
1034 }
1035 
1036 /*
1037  * Process XLogData message.
1038  */
1039 static bool
1041  XLogRecPtr *blockpos)
1042 {
1043  int xlogoff;
1044  int bytes_left;
1045  int bytes_written;
1046  int hdr_len;
1047 
1048  /*
1049  * Once we've decided we don't want to receive any more, just ignore any
1050  * subsequent XLogData messages.
1051  */
1052  if (!(still_sending))
1053  return true;
1054 
1055  /*
1056  * Read the header of the XLogData message, enclosed in the CopyData
1057  * message. We only need the WAL location field (dataStart), the rest of
1058  * the header is ignored.
1059  */
1060  hdr_len = 1; /* msgtype 'w' */
1061  hdr_len += 8; /* dataStart */
1062  hdr_len += 8; /* walEnd */
1063  hdr_len += 8; /* sendTime */
1064  if (len < hdr_len)
1065  {
1066  pg_log_error("streaming header too small: %d", len);
1067  return false;
1068  }
1069  *blockpos = fe_recvint64(&copybuf[1]);
1070 
1071  /* Extract WAL location for this block */
1072  xlogoff = XLogSegmentOffset(*blockpos, WalSegSz);
1073 
1074  /*
1075  * Verify that the initial location in the stream matches where we think
1076  * we are.
1077  */
1078  if (walfile == NULL)
1079  {
1080  /* No file open yet */
1081  if (xlogoff != 0)
1082  {
1083  pg_log_error("received write-ahead log record for offset %u with no file open",
1084  xlogoff);
1085  return false;
1086  }
1087  }
1088  else
1089  {
1090  /* More data in existing segment */
1091  if (walfile->currpos != xlogoff)
1092  {
1093  pg_log_error("got WAL data offset %08x, expected %08x",
1094  xlogoff, (int) walfile->currpos);
1095  return false;
1096  }
1097  }
1098 
1099  bytes_left = len - hdr_len;
1100  bytes_written = 0;
1101 
1102  while (bytes_left)
1103  {
1104  int bytes_to_write;
1105 
1106  /*
1107  * If crossing a WAL boundary, only write up until we reach wal
1108  * segment size.
1109  */
1110  if (xlogoff + bytes_left > WalSegSz)
1111  bytes_to_write = WalSegSz - xlogoff;
1112  else
1113  bytes_to_write = bytes_left;
1114 
1115  if (walfile == NULL)
1116  {
1117  if (!open_walfile(stream, *blockpos))
1118  {
1119  /* Error logged by open_walfile */
1120  return false;
1121  }
1122  }
1123 
1124  if (stream->walmethod->ops->write(walfile,
1125  copybuf + hdr_len + bytes_written,
1126  bytes_to_write) != bytes_to_write)
1127  {
1128  pg_log_error("could not write %d bytes to WAL file \"%s\": %s",
1129  bytes_to_write, walfile->pathname,
1130  GetLastWalMethodError(stream->walmethod));
1131  return false;
1132  }
1133 
1134  /* Write was successful, advance our position */
1135  bytes_written += bytes_to_write;
1136  bytes_left -= bytes_to_write;
1137  *blockpos += bytes_to_write;
1138  xlogoff += bytes_to_write;
1139 
1140  /* Did we reach the end of a WAL segment? */
1141  if (XLogSegmentOffset(*blockpos, WalSegSz) == 0)
1142  {
1143  if (!close_walfile(stream, *blockpos))
1144  /* Error message written in close_walfile() */
1145  return false;
1146 
1147  xlogoff = 0;
1148 
1149  if (still_sending && stream->stream_stop(*blockpos, stream->timeline, true))
1150  {
1151  if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1152  {
1153  pg_log_error("could not send copy-end packet: %s",
1154  PQerrorMessage(conn));
1155  return false;
1156  }
1157  still_sending = false;
1158  return true; /* ignore the rest of this XLogData packet */
1159  }
1160  }
1161  }
1162  /* No more data left to write, receive next copy packet */
1163 
1164  return true;
1165 }
1166 
1167 /*
1168  * Handle end of the copy stream.
1169  */
1170 static PGresult *
1172  XLogRecPtr blockpos, XLogRecPtr *stoppos)
1173 {
1175 
1176  /*
1177  * The server closed its end of the copy stream. If we haven't closed
1178  * ours already, we need to do so now, unless the server threw an error,
1179  * in which case we don't.
1180  */
1181  if (still_sending)
1182  {
1183  if (!close_walfile(stream, blockpos))
1184  {
1185  /* Error message written in close_walfile() */
1186  PQclear(res);
1187  return NULL;
1188  }
1190  {
1191  if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1192  {
1193  pg_log_error("could not send copy-end packet: %s",
1194  PQerrorMessage(conn));
1195  PQclear(res);
1196  return NULL;
1197  }
1198  res = PQgetResult(conn);
1199  }
1200  still_sending = false;
1201  }
1202  PQfreemem(copybuf);
1203  *stoppos = blockpos;
1204  return res;
1205 }
1206 
1207 /*
1208  * Check if we should continue streaming, or abort at this point.
1209  */
1210 static bool
1212 {
1213  if (still_sending && stream->stream_stop(blockpos, stream->timeline, false))
1214  {
1215  if (!close_walfile(stream, blockpos))
1216  {
1217  /* Potential error message is written by close_walfile */
1218  return false;
1219  }
1220  if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1221  {
1222  pg_log_error("could not send copy-end packet: %s",
1223  PQerrorMessage(conn));
1224  return false;
1225  }
1226  still_sending = false;
1227  }
1228 
1229  return true;
1230 }
1231 
1232 /*
1233  * Calculate how long send/receive loops should sleep
1234  */
1235 static long
1237  TimestampTz last_status)
1238 {
1239  TimestampTz status_targettime = 0;
1240  long sleeptime;
1241 
1243  status_targettime = last_status +
1244  (standby_message_timeout - 1) * ((int64) 1000);
1245 
1246  if (status_targettime > 0)
1247  {
1248  long secs;
1249  int usecs;
1250 
1252  status_targettime,
1253  &secs,
1254  &usecs);
1255  /* Always sleep at least 1 sec */
1256  if (secs <= 0)
1257  {
1258  secs = 1;
1259  usecs = 0;
1260  }
1261 
1262  sleeptime = secs * 1000 + usecs / 1000;
1263  }
1264  else
1265  sleeptime = -1;
1266 
1267  return sleeptime;
1268 }
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1618
unsigned int uint32
Definition: c.h:493
#define ngettext(s, p, n)
Definition: c.h:1168
#define Max(x, y)
Definition: c.h:985
@ PG_COMPRESSION_NONE
Definition: compression.h:23
int64 TimestampTz
Definition: timestamp.h:39
const char * PQparameterStatus(const PGconn *conn, const char *paramName)
Definition: fe-connect.c:6913
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:6938
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6948
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:6974
int PQflush(PGconn *conn)
Definition: fe-exec.c:3960
void PQfreemem(void *ptr)
Definition: fe-exec.c:3992
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3371
char * PQresultErrorMessage(const PGresult *res)
Definition: fe-exec.c:3387
int PQputCopyEnd(PGconn *conn, const char *errormsg)
Definition: fe-exec.c:2711
int PQntuples(const PGresult *res)
Definition: fe-exec.c:3441
int PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
Definition: fe-exec.c:2657
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2224
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1960
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3836
int PQnfields(const PGresult *res)
Definition: fe-exec.c:3449
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:2038
int PQgetCopyData(PGconn *conn, char **buffer, int async)
Definition: fe-exec.c:2778
void pg_free(void *ptr)
Definition: fe_memutils.c:105
@ PGRES_COPY_IN
Definition: libpq-fe.h:107
@ PGRES_COPY_BOTH
Definition: libpq-fe.h:112
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:100
@ PGRES_TUPLES_OK
Definition: libpq-fe.h:103
exit(1)
#define pg_log_error(...)
Definition: logging.h:106
#define pg_log_info(...)
Definition: logging.h:124
#define pg_fatal(...)
static int standby_message_timeout
#define MAXPGPATH
const void size_t len
static char * filename
Definition: pg_dumpall.c:121
static XLogRecPtr startpos
#define pg_log_warning(...)
Definition: pgfnames.c:24
#define sprintf
Definition: port.h:240
int pgsocket
Definition: port.h:29
#define snprintf
Definition: port.h:238
#define PGINVALID_SOCKET
Definition: port.h:31
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
static PGresult * HandleCopyStream(PGconn *conn, StreamCtl *stream, XLogRecPtr *stoppos)
Definition: receivelog.c:745
static bool open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
Definition: receivelog.c:90
static bool mark_file_as_archived(StreamCtl *stream, const char *fname)
Definition: receivelog.c:54
static int CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket, char **buffer)
Definition: receivelog.c:932
static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos)
Definition: receivelog.c:1211
static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr blockpos, TimestampTz *last_status)
Definition: receivelog.c:986
static bool reportFlushPosition
Definition: receivelog.c:30
static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr *blockpos)
Definition: receivelog.c:1040
static bool close_walfile(StreamCtl *stream, XLogRecPtr pos)
Definition: receivelog.c:192
static bool existsTimeLineHistoryFile(StreamCtl *stream)
Definition: receivelog.c:258
static bool still_sending
Definition: receivelog.c:33
static bool writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
Definition: receivelog.c:275
bool ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
Definition: receivelog.c:453
static PGresult * HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf, XLogRecPtr blockpos, XLogRecPtr *stoppos)
Definition: receivelog.c:1171
static Walfile * walfile
Definition: receivelog.c:29
static long CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout, TimestampTz last_status)
Definition: receivelog.c:1236
bool CheckServerVersionForStreaming(PGconn *conn)
Definition: receivelog.c:375
static bool sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
Definition: receivelog.c:337
static XLogRecPtr lastFlushPosition
Definition: receivelog.c:31
static int CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
Definition: receivelog.c:870
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
Definition: receivelog.c:699
static pg_noinline void Size size
Definition: slab.c:607
static void error(void)
Definition: sql-dyntest.c:147
int WalSegSz
Definition: streamutil.c:34
int64 fe_recvint64(char *buf)
Definition: streamutil.c:931
TimestampTz feGetCurrentTimestamp(void)
Definition: streamutil.c:866
void feTimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: streamutil.c:885
void fe_sendint64(int64 i, char *buf)
Definition: streamutil.c:920
bool feTimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: streamutil.c:907
PGconn * conn
Definition: streamutil.c:55
bool RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli, XLogRecPtr *startpos, char **db_name)
Definition: streamutil.c:477
char * sysidentifier
Definition: receivelog.h:33
TimeLineID timeline
Definition: receivelog.h:32
stream_stop_callback stream_stop
Definition: receivelog.h:41
char * replication_slot
Definition: receivelog.h:48
XLogRecPtr startpos
Definition: receivelog.h:31
pgsocket stop_socket
Definition: receivelog.h:43
int standby_message_timeout
Definition: receivelog.h:35
WalWriteMethod * walmethod
Definition: receivelog.h:46
bool mark_done
Definition: receivelog.h:37
char * partial_suffix
Definition: receivelog.h:47
bool synchronous
Definition: receivelog.h:36
bool(* existsfile)(WalWriteMethod *wwmethod, const char *pathname)
Definition: walmethods.h:58
ssize_t(* write)(Walfile *f, const void *buf, size_t count)
Definition: walmethods.h:73
char *(* get_file_name)(WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix)
Definition: walmethods.h:67
ssize_t(* get_file_size)(WalWriteMethod *wwmethod, const char *pathname)
Definition: walmethods.h:61
Walfile *(* open_for_write)(WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix, size_t pad_to_size)
Definition: walmethods.h:49
int(* close)(Walfile *f, WalCloseMethod method)
Definition: walmethods.h:55
int(* sync)(Walfile *f)
Definition: walmethods.h:78
const WalWriteMethodOps * ops
Definition: walmethods.h:105
pg_compress_algorithm compression_algorithm
Definition: walmethods.h:106
char * pathname
Definition: walmethods.h:21
off_t currpos
Definition: walmethods.h:20
static StringInfo copybuf
Definition: tablesync.c:130
static void * fn(void *arg)
Definition: thread-alloc.c:119
const char * GetLastWalMethodError(WalWriteMethod *wwmethod)
Definition: walmethods.c:1383
@ CLOSE_UNLINK
Definition: walmethods.h:34
@ CLOSE_NO_RENAME
Definition: walmethods.h:35
@ CLOSE_NORMAL
Definition: walmethods.h:33
#define EINTR
Definition: win32_port.h:374
#define select(n, r, w, e, timeout)
Definition: win32_port.h:495
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
#define MAXFNAMELEN
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
static void TLHistoryFileName(char *fname, TimeLineID tli)
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
uint32 TimeLineID
Definition: xlogdefs.h:59
uint64 XLogSegNo
Definition: xlogdefs.h:48