PostgreSQL Source Code git master
receivelog.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * receivelog.c - receive WAL files using the streaming
4 * replication protocol.
5 *
6 * Author: Magnus Hagander <magnus@hagander.net>
7 *
8 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
9 *
10 * IDENTIFICATION
11 * src/bin/pg_basebackup/receivelog.c
12 *-------------------------------------------------------------------------
13 */
14
#include "postgres_fe.h"

#include <sys/select.h>
#include <sys/stat.h>
#include <unistd.h>

#include "access/xlog_internal.h"
#include "common/logging.h"
#include "libpq-fe.h"
#include "receivelog.h"
#include "streamutil.h"
26
27/* currently open WAL file */
28static Walfile *walfile = NULL;
29static bool reportFlushPosition = false;
31
32static bool still_sending = true; /* feedback still needs to be sent? */
33
35 XLogRecPtr *stoppos);
36static int CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket);
37static int CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
38 char **buffer);
39static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf,
40 int len, XLogRecPtr blockpos, TimestampTz *last_status);
41static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
42 XLogRecPtr *blockpos);
44 XLogRecPtr blockpos, XLogRecPtr *stoppos);
45static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos);
47 TimestampTz last_status);
48
50 uint32 *timeline);
51
52static bool
53mark_file_as_archived(StreamCtl *stream, const char *fname)
54{
55 Walfile *f;
56 static char tmppath[MAXPGPATH];
57
58 snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done",
59 fname);
60
61 f = stream->walmethod->ops->open_for_write(stream->walmethod, tmppath,
62 NULL, 0);
63 if (f == NULL)
64 {
65 pg_log_error("could not create archive status file \"%s\": %s",
66 tmppath, GetLastWalMethodError(stream->walmethod));
67 return false;
68 }
69
70 if (stream->walmethod->ops->close(f, CLOSE_NORMAL) != 0)
71 {
72 pg_log_error("could not close archive status file \"%s\": %s",
73 tmppath, GetLastWalMethodError(stream->walmethod));
74 return false;
75 }
76
77 return true;
78}
79
80/*
81 * Open a new WAL file in the specified directory.
82 *
83 * Returns true if OK; on failure, returns false after printing an error msg.
84 * On success, 'walfile' is set to the opened WAL file.
85 *
86 * The file will be padded to 16Mb with zeroes.
87 */
88static bool
89open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
90{
91 Walfile *f;
92 char *fn;
93 ssize_t size;
94 XLogSegNo segno;
95 char walfile_name[MAXPGPATH];
96
97 XLByteToSeg(startpoint, segno, WalSegSz);
98 XLogFileName(walfile_name, stream->timeline, segno, WalSegSz);
99
100 /* Note that this considers the compression used if necessary */
101 fn = stream->walmethod->ops->get_file_name(stream->walmethod,
102 walfile_name,
103 stream->partial_suffix);
104
105 /*
106 * When streaming to files, if an existing file exists we verify that it's
107 * either empty (just created), or a complete WalSegSz segment (in which
108 * case it has been created and padded). Anything else indicates a corrupt
109 * file. Compressed files have no need for padding, so just ignore this
110 * case.
111 *
112 * When streaming to tar, no file with this name will exist before, so we
113 * never have to verify a size.
114 */
116 stream->walmethod->ops->existsfile(stream->walmethod, fn))
117 {
118 size = stream->walmethod->ops->get_file_size(stream->walmethod, fn);
119 if (size < 0)
120 {
121 pg_log_error("could not get size of write-ahead log file \"%s\": %s",
123 pg_free(fn);
124 return false;
125 }
126 if (size == WalSegSz)
127 {
128 /* Already padded file. Open it for use */
129 f = stream->walmethod->ops->open_for_write(stream->walmethod, walfile_name, stream->partial_suffix, 0);
130 if (f == NULL)
131 {
132 pg_log_error("could not open existing write-ahead log file \"%s\": %s",
134 pg_free(fn);
135 return false;
136 }
137
138 /* fsync file in case of a previous crash */
139 if (stream->walmethod->ops->sync(f) != 0)
140 {
141 pg_log_error("could not fsync existing write-ahead log file \"%s\": %s",
143 stream->walmethod->ops->close(f, CLOSE_UNLINK);
144 exit(1);
145 }
146
147 walfile = f;
148 pg_free(fn);
149 return true;
150 }
151 if (size != 0)
152 {
153 /* if write didn't set errno, assume problem is no disk space */
154 if (errno == 0)
155 errno = ENOSPC;
156 pg_log_error(ngettext("write-ahead log file \"%s\" has %zd byte, should be 0 or %d",
157 "write-ahead log file \"%s\" has %zd bytes, should be 0 or %d",
158 size),
159 fn, size, WalSegSz);
160 pg_free(fn);
161 return false;
162 }
163 /* File existed and was empty, so fall through and open */
164 }
165
166 /* No file existed, so create one */
167
168 f = stream->walmethod->ops->open_for_write(stream->walmethod,
169 walfile_name,
170 stream->partial_suffix,
171 WalSegSz);
172 if (f == NULL)
173 {
174 pg_log_error("could not open write-ahead log file \"%s\": %s",
176 pg_free(fn);
177 return false;
178 }
179
180 pg_free(fn);
181 walfile = f;
182 return true;
183}
184
185/*
186 * Close the current WAL file (if open), and rename it to the correct
187 * filename if it's complete. On failure, prints an error message to stderr
188 * and returns false, otherwise returns true.
189 */
190static bool
192{
193 char *fn;
194 pgoff_t currpos;
195 int r;
196 char walfile_name[MAXPGPATH];
197
198 if (walfile == NULL)
199 return true;
200
201 strlcpy(walfile_name, walfile->pathname, MAXPGPATH);
202 currpos = walfile->currpos;
203
204 /* Note that this considers the compression used if necessary */
205 fn = stream->walmethod->ops->get_file_name(stream->walmethod,
206 walfile_name,
207 stream->partial_suffix);
208
209 if (stream->partial_suffix)
210 {
211 if (currpos == WalSegSz)
212 r = stream->walmethod->ops->close(walfile, CLOSE_NORMAL);
213 else
214 {
215 pg_log_info("not renaming \"%s\", segment is not complete", fn);
216 r = stream->walmethod->ops->close(walfile, CLOSE_NO_RENAME);
217 }
218 }
219 else
220 r = stream->walmethod->ops->close(walfile, CLOSE_NORMAL);
221
222 walfile = NULL;
223
224 if (r != 0)
225 {
226 pg_log_error("could not close file \"%s\": %s",
228
229 pg_free(fn);
230 return false;
231 }
232
233 pg_free(fn);
234
235 /*
236 * Mark file as archived if requested by the caller - pg_basebackup needs
237 * to do so as files can otherwise get archived again after promotion of a
238 * new node. This is in line with walreceiver.c always doing a
239 * XLogArchiveForceDone() after a complete segment.
240 */
241 if (currpos == WalSegSz && stream->mark_done)
242 {
243 /* writes error message if failed */
244 if (!mark_file_as_archived(stream, walfile_name))
245 return false;
246 }
247
248 lastFlushPosition = pos;
249 return true;
250}
251
252
253/*
254 * Check if a timeline history file exists.
255 */
256static bool
258{
259 char histfname[MAXFNAMELEN];
260
261 /*
262 * Timeline 1 never has a history file. We treat that as if it existed,
263 * since we never need to stream it.
264 */
265 if (stream->timeline == 1)
266 return true;
267
268 TLHistoryFileName(histfname, stream->timeline);
269
270 return stream->walmethod->ops->existsfile(stream->walmethod, histfname);
271}
272
273static bool
274writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
275{
276 int size = strlen(content);
277 char histfname[MAXFNAMELEN];
278 Walfile *f;
279
280 /*
281 * Check that the server's idea of how timeline history files should be
282 * named matches ours.
283 */
284 TLHistoryFileName(histfname, stream->timeline);
285 if (strcmp(histfname, filename) != 0)
286 {
287 pg_log_error("server reported unexpected history file name for timeline %u: %s",
288 stream->timeline, filename);
289 return false;
290 }
291
292 f = stream->walmethod->ops->open_for_write(stream->walmethod,
293 histfname, ".tmp", 0);
294 if (f == NULL)
295 {
296 pg_log_error("could not create timeline history file \"%s\": %s",
297 histfname, GetLastWalMethodError(stream->walmethod));
298 return false;
299 }
300
301 if ((int) stream->walmethod->ops->write(f, content, size) != size)
302 {
303 pg_log_error("could not write timeline history file \"%s\": %s",
304 histfname, GetLastWalMethodError(stream->walmethod));
305
306 /*
307 * If we fail to make the file, delete it to release disk space
308 */
309 stream->walmethod->ops->close(f, CLOSE_UNLINK);
310
311 return false;
312 }
313
314 if (stream->walmethod->ops->close(f, CLOSE_NORMAL) != 0)
315 {
316 pg_log_error("could not close file \"%s\": %s",
317 histfname, GetLastWalMethodError(stream->walmethod));
318 return false;
319 }
320
321 /* Maintain archive_status, check close_walfile() for details. */
322 if (stream->mark_done)
323 {
324 /* writes error message if failed */
325 if (!mark_file_as_archived(stream, histfname))
326 return false;
327 }
328
329 return true;
330}
331
332/*
333 * Send a Standby Status Update message to server.
334 */
335static bool
336sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
337{
338 char replybuf[1 + 8 + 8 + 8 + 8 + 1];
339 int len = 0;
340
341 replybuf[len] = 'r';
342 len += 1;
343 fe_sendint64(blockpos, &replybuf[len]); /* write */
344 len += 8;
346 fe_sendint64(lastFlushPosition, &replybuf[len]); /* flush */
347 else
348 fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */
349 len += 8;
350 fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
351 len += 8;
352 fe_sendint64(now, &replybuf[len]); /* sendTime */
353 len += 8;
354 replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
355 len += 1;
356
357 if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
358 {
359 pg_log_error("could not send feedback packet: %s",
361 return false;
362 }
363
364 return true;
365}
366
367/*
368 * Check that the server version we're connected to is supported by
369 * ReceiveXlogStream().
370 *
371 * If it's not, an error message is printed to stderr, and false is returned.
372 */
373bool
375{
376 int minServerMajor,
377 maxServerMajor;
378 int serverMajor;
379
380 /*
381 * The message format used in streaming replication changed in 9.3, so we
382 * cannot stream from older servers. And we don't support servers newer
383 * than the client; it might work, but we don't know, so err on the safe
384 * side.
385 */
386 minServerMajor = 903;
387 maxServerMajor = PG_VERSION_NUM / 100;
388 serverMajor = PQserverVersion(conn) / 100;
389 if (serverMajor < minServerMajor)
390 {
391 const char *serverver = PQparameterStatus(conn, "server_version");
392
393 pg_log_error("incompatible server version %s; client does not support streaming from server versions older than %s",
394 serverver ? serverver : "'unknown'",
395 "9.3");
396 return false;
397 }
398 else if (serverMajor > maxServerMajor)
399 {
400 const char *serverver = PQparameterStatus(conn, "server_version");
401
402 pg_log_error("incompatible server version %s; client does not support streaming from server versions newer than %s",
403 serverver ? serverver : "'unknown'",
404 PG_VERSION);
405 return false;
406 }
407 return true;
408}
409
410/*
411 * Receive a log stream starting at the specified position.
412 *
413 * Individual parameters are passed through the StreamCtl structure.
414 *
415 * If sysidentifier is specified, validate that both the system
416 * identifier and the timeline matches the specified ones
417 * (by sending an extra IDENTIFY_SYSTEM command)
418 *
419 * All received segments will be written to the directory
420 * specified by basedir. This will also fetch any missing timeline history
421 * files.
422 *
423 * The stream_stop callback will be called every time data
424 * is received, and whenever a segment is completed. If it returns
425 * true, the streaming will stop and the function
426 * return. As long as it returns false, streaming will continue
427 * indefinitely.
428 *
429 * If stream_stop() checks for external input, stop_socket should be set to
430 * the FD it checks. This will allow such input to be detected promptly
431 * rather than after standby_message_timeout (which might be indefinite).
432 * Note that signals will interrupt waits for input as well, but that is
433 * race-y since a signal received while busy won't interrupt the wait.
434 *
435 * standby_message_timeout controls how often we send a message
436 * back to the primary letting it know our progress, in milliseconds.
437 * Zero means no messages are sent.
438 * This message will only contain the write location, and never
439 * flush or replay.
440 *
441 * If 'partial_suffix' is not NULL, files are initially created with the
442 * given suffix, and the suffix is removed once the file is finished. That
443 * allows you to tell the difference between partial and completed files,
444 * so that you can continue later where you left.
445 *
446 * If 'synchronous' is true, the received WAL is flushed as soon as written,
447 * otherwise only when the WAL file is closed.
448 *
449 * Note: The WAL location *must* be at a log segment start!
450 */
451bool
453{
454 char query[128];
455 char slotcmd[128];
456 PGresult *res;
457 XLogRecPtr stoppos;
458
459 /*
460 * The caller should've checked the server version already, but doesn't do
461 * any harm to check it here too.
462 */
464 return false;
465
466 /*
467 * Decide whether we want to report the flush position. If we report the
468 * flush position, the primary will know what WAL we'll possibly
469 * re-request, and it can then remove older WAL safely. We must always do
470 * that when we are using slots.
471 *
472 * Reporting the flush position makes one eligible as a synchronous
473 * replica. People shouldn't include generic names in
474 * synchronous_standby_names, but we've protected them against it so far,
475 * so let's continue to do so unless specifically requested.
476 */
477 if (stream->replication_slot != NULL)
478 {
479 reportFlushPosition = true;
480 sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);
481 }
482 else
483 {
484 if (stream->synchronous)
485 reportFlushPosition = true;
486 else
487 reportFlushPosition = false;
488 slotcmd[0] = 0;
489 }
490
491 if (stream->sysidentifier != NULL)
492 {
493 char *sysidentifier = NULL;
494 TimeLineID servertli;
495
496 /*
497 * Get the server system identifier and timeline, and validate them.
498 */
499 if (!RunIdentifySystem(conn, &sysidentifier, &servertli, NULL, NULL))
500 {
501 pg_free(sysidentifier);
502 return false;
503 }
504
505 if (strcmp(stream->sysidentifier, sysidentifier) != 0)
506 {
507 pg_log_error("system identifier does not match between base backup and streaming connection");
508 pg_free(sysidentifier);
509 return false;
510 }
511 pg_free(sysidentifier);
512
513 if (stream->timeline > servertli)
514 {
515 pg_log_error("starting timeline %u is not present in the server",
516 stream->timeline);
517 return false;
518 }
519 }
520
521 /*
522 * initialize flush position to starting point, it's the caller's
523 * responsibility that that's sane.
524 */
525 lastFlushPosition = stream->startpos;
526
527 while (1)
528 {
529 /*
530 * Fetch the timeline history file for this timeline, if we don't have
531 * it already. When streaming log to tar, this will always return
532 * false, as we are never streaming into an existing file and
533 * therefore there can be no pre-existing timeline history file.
534 */
535 if (!existsTimeLineHistoryFile(stream))
536 {
537 snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
538 res = PQexec(conn, query);
540 {
541 /* FIXME: we might send it ok, but get an error */
542 pg_log_error("could not send replication command \"%s\": %s",
543 "TIMELINE_HISTORY", PQresultErrorMessage(res));
544 PQclear(res);
545 return false;
546 }
547
548 /*
549 * The response to TIMELINE_HISTORY is a single row result set
550 * with two fields: filename and content
551 */
552 if (PQnfields(res) != 2 || PQntuples(res) != 1)
553 {
554 pg_log_warning("unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields",
555 PQntuples(res), PQnfields(res), 1, 2);
556 }
557
558 /* Write the history file to disk */
560 PQgetvalue(res, 0, 0),
561 PQgetvalue(res, 0, 1));
562
563 PQclear(res);
564 }
565
566 /*
567 * Before we start streaming from the requested location, check if the
568 * callback tells us to stop here.
569 */
570 if (stream->stream_stop(stream->startpos, stream->timeline, false))
571 return true;
572
573 /* Initiate the replication stream at specified location */
574 snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
575 slotcmd,
576 LSN_FORMAT_ARGS(stream->startpos),
577 stream->timeline);
578 res = PQexec(conn, query);
580 {
581 pg_log_error("could not send replication command \"%s\": %s",
582 "START_REPLICATION", PQresultErrorMessage(res));
583 PQclear(res);
584 return false;
585 }
586 PQclear(res);
587
588 /* Stream the WAL */
589 res = HandleCopyStream(conn, stream, &stoppos);
590 if (res == NULL)
591 goto error;
592
593 /*
594 * Streaming finished.
595 *
596 * There are two possible reasons for that: a controlled shutdown, or
597 * we reached the end of the current timeline. In case of
598 * end-of-timeline, the server sends a result set after Copy has
599 * finished, containing information about the next timeline. Read
600 * that, and restart streaming from the next timeline. In case of
601 * controlled shutdown, stop here.
602 */
604 {
605 /*
606 * End-of-timeline. Read the next timeline's ID and starting
607 * position. Usually, the starting position will match the end of
608 * the previous timeline, but there are corner cases like if the
609 * server had sent us half of a WAL record, when it was promoted.
610 * The new timeline will begin at the end of the last complete
611 * record in that case, overlapping the partial WAL record on the
612 * old timeline.
613 */
614 uint32 newtimeline;
615 bool parsed;
616
617 parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
618 PQclear(res);
619 if (!parsed)
620 goto error;
621
622 /* Sanity check the values the server gave us */
623 if (newtimeline <= stream->timeline)
624 {
625 pg_log_error("server reported unexpected next timeline %u, following timeline %u",
626 newtimeline, stream->timeline);
627 goto error;
628 }
629 if (stream->startpos > stoppos)
630 {
631 pg_log_error("server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X",
632 stream->timeline, LSN_FORMAT_ARGS(stoppos),
633 newtimeline, LSN_FORMAT_ARGS(stream->startpos));
634 goto error;
635 }
636
637 /* Read the final result, which should be CommandComplete. */
640 {
641 pg_log_error("unexpected termination of replication stream: %s",
643 PQclear(res);
644 goto error;
645 }
646 PQclear(res);
647
648 /*
649 * Loop back to start streaming from the new timeline. Always
650 * start streaming at the beginning of a segment.
651 */
652 stream->timeline = newtimeline;
653 stream->startpos = stream->startpos -
655 continue;
656 }
658 {
659 PQclear(res);
660
661 /*
662 * End of replication (ie. controlled shut down of the server).
663 *
664 * Check if the callback thinks it's OK to stop here. If not,
665 * complain.
666 */
667 if (stream->stream_stop(stoppos, stream->timeline, false))
668 return true;
669 else
670 {
671 pg_log_error("replication stream was terminated before stop point");
672 goto error;
673 }
674 }
675 else
676 {
677 /* Server returned an error. */
678 pg_log_error("unexpected termination of replication stream: %s",
680 PQclear(res);
681 goto error;
682 }
683 }
684
685error:
686 if (walfile != NULL && stream->walmethod->ops->close(walfile, CLOSE_NO_RENAME) != 0)
687 pg_log_error("could not close file \"%s\": %s",
689 walfile = NULL;
690 return false;
691}
692
693/*
694 * Helper function to parse the result set returned by server after streaming
695 * has finished. On failure, prints an error to stderr and returns false.
696 */
697static bool
699{
700 uint32 startpos_xlogid,
701 startpos_xrecoff;
702
703 /*----------
704 * The result set consists of one row and two columns, e.g:
705 *
706 * next_tli | next_tli_startpos
707 * ----------+-------------------
708 * 4 | 0/9949AE0
709 *
710 * next_tli is the timeline ID of the next timeline after the one that
711 * just finished streaming. next_tli_startpos is the WAL location where
712 * the server switched to it.
713 *----------
714 */
715 if (PQnfields(res) < 2 || PQntuples(res) != 1)
716 {
717 pg_log_error("unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields",
718 PQntuples(res), PQnfields(res), 1, 2);
719 return false;
720 }
721
722 *timeline = atoi(PQgetvalue(res, 0, 0));
723 if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &startpos_xlogid,
724 &startpos_xrecoff) != 2)
725 {
726 pg_log_error("could not parse next timeline's starting point \"%s\"",
727 PQgetvalue(res, 0, 1));
728 return false;
729 }
730 *startpos = ((uint64) startpos_xlogid << 32) | startpos_xrecoff;
731
732 return true;
733}
734
735/*
736 * The main loop of ReceiveXlogStream. Handles the COPY stream after
737 * initiating streaming with the START_REPLICATION command.
738 *
739 * If the COPY ends (not necessarily successfully) due a message from the
740 * server, returns a PGresult and sets *stoppos to the last byte written.
741 * On any other sort of error, returns NULL.
742 */
743static PGresult *
745 XLogRecPtr *stoppos)
746{
747 char *copybuf = NULL;
748 TimestampTz last_status = -1;
749 XLogRecPtr blockpos = stream->startpos;
750
751 still_sending = true;
752
753 while (1)
754 {
755 int r;
757 long sleeptime;
758
759 /*
760 * Check if we should continue streaming, or abort at this point.
761 */
762 if (!CheckCopyStreamStop(conn, stream, blockpos))
763 goto error;
764
766
767 /*
768 * If synchronous option is true, issue sync command as soon as there
769 * are WAL data which has not been flushed yet.
770 */
771 if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)
772 {
773 if (stream->walmethod->ops->sync(walfile) != 0)
774 pg_fatal("could not fsync file \"%s\": %s",
776 lastFlushPosition = blockpos;
777
778 /*
779 * Send feedback so that the server sees the latest WAL locations
780 * immediately.
781 */
782 if (!sendFeedback(conn, blockpos, now, false))
783 goto error;
784 last_status = now;
785 }
786
787 /*
788 * Potentially send a status message to the primary
789 */
790 if (still_sending && stream->standby_message_timeout > 0 &&
793 {
794 /* Time to send feedback! */
795 if (!sendFeedback(conn, blockpos, now, false))
796 goto error;
797 last_status = now;
798 }
799
800 /*
801 * Calculate how long send/receive loops should sleep
802 */
804 last_status);
805
806 /* Done with any prior message */
808 copybuf = NULL;
809
810 r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, &copybuf);
811 while (r != 0)
812 {
813 if (r == -1)
814 goto error;
815 if (r == -2)
816 {
817 PGresult *res = HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos);
818
819 if (res == NULL)
820 goto error;
822 return res;
823 }
824
825 /* Check the message type. */
826 if (copybuf[0] == 'k')
827 {
828 if (!ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos,
829 &last_status))
830 goto error;
831 }
832 else if (copybuf[0] == 'w')
833 {
834 if (!ProcessXLogDataMsg(conn, stream, copybuf, r, &blockpos))
835 goto error;
836
837 /*
838 * Check if we should continue streaming, or abort at this
839 * point.
840 */
841 if (!CheckCopyStreamStop(conn, stream, blockpos))
842 goto error;
843 }
844 else
845 {
846 pg_log_error("unrecognized streaming header: \"%c\"",
847 copybuf[0]);
848 goto error;
849 }
850
851 /* Done with that message */
853 copybuf = NULL;
854
855 /*
856 * Process the received data, and any subsequent data we can read
857 * without blocking.
858 */
859 r = CopyStreamReceive(conn, 0, stream->stop_socket, &copybuf);
860 }
861 }
862
863error:
865 return NULL;
866}
867
868/*
869 * Wait until we can read a CopyData message,
870 * or timeout, or occurrence of a signal or input on the stop_socket.
871 * (timeout_ms < 0 means wait indefinitely; 0 means don't wait.)
872 *
873 * Returns 1 if data has become available for reading, 0 if timed out
874 * or interrupted by signal or stop_socket input, and -1 on an error.
875 */
876static int
877CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
878{
879 int ret;
880 fd_set input_mask;
881 int connsocket;
882 int maxfd;
883 struct timeval timeout;
884 struct timeval *timeoutptr;
885
886 connsocket = PQsocket(conn);
887 if (connsocket < 0)
888 {
889 pg_log_error("invalid socket: %s", PQerrorMessage(conn));
890 return -1;
891 }
892
893 FD_ZERO(&input_mask);
894 FD_SET(connsocket, &input_mask);
895 maxfd = connsocket;
896 if (stop_socket != PGINVALID_SOCKET)
897 {
898 FD_SET(stop_socket, &input_mask);
899 maxfd = Max(maxfd, stop_socket);
900 }
901
902 if (timeout_ms < 0)
903 timeoutptr = NULL;
904 else
905 {
906 timeout.tv_sec = timeout_ms / 1000L;
907 timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
908 timeoutptr = &timeout;
909 }
910
911 ret = select(maxfd + 1, &input_mask, NULL, NULL, timeoutptr);
912
913 if (ret < 0)
914 {
915 if (errno == EINTR)
916 return 0; /* Got a signal, so not an error */
917 pg_log_error("%s() failed: %m", "select");
918 return -1;
919 }
920 if (ret > 0 && FD_ISSET(connsocket, &input_mask))
921 return 1; /* Got input on connection socket */
922
923 return 0; /* Got timeout or input on stop_socket */
924}
925
926/*
927 * Receive CopyData message available from XLOG stream, blocking for
928 * maximum of 'timeout' ms.
929 *
930 * If data was received, returns the length of the data. *buffer is set to
931 * point to a buffer holding the received message. The caller must eventually
932 * free the buffer with PQfreemem().
933 *
934 * Returns 0 if no data was available within timeout, or if wait was
935 * interrupted by signal or stop_socket input.
936 * -1 on error. -2 if the server ended the COPY.
937 */
938static int
939CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
940 char **buffer)
941{
942 char *copybuf = NULL;
943 int rawlen;
944
945 /* Caller should have cleared any prior buffer */
946 Assert(*buffer == NULL);
947
948 /* Try to receive a CopyData message */
949 rawlen = PQgetCopyData(conn, &copybuf, 1);
950 if (rawlen == 0)
951 {
952 int ret;
953
954 /*
955 * No data available. Wait for some to appear, but not longer than
956 * the specified timeout, so that we can ping the server. Also stop
957 * waiting if input appears on stop_socket.
958 */
959 ret = CopyStreamPoll(conn, timeout, stop_socket);
960 if (ret <= 0)
961 return ret;
962
963 /* Now there is actually data on the socket */
964 if (PQconsumeInput(conn) == 0)
965 {
966 pg_log_error("could not receive data from WAL stream: %s",
968 return -1;
969 }
970
971 /* Now that we've consumed some input, try again */
972 rawlen = PQgetCopyData(conn, &copybuf, 1);
973 if (rawlen == 0)
974 return 0;
975 }
976 if (rawlen == -1) /* end-of-streaming or error */
977 return -2;
978 if (rawlen == -2)
979 {
980 pg_log_error("could not read COPY data: %s", PQerrorMessage(conn));
981 return -1;
982 }
983
984 /* Return received messages to caller */
985 *buffer = copybuf;
986 return rawlen;
987}
988
989/*
990 * Process the keepalive message.
991 */
992static bool
994 XLogRecPtr blockpos, TimestampTz *last_status)
995{
996 int pos;
997 bool replyRequested;
999
1000 /*
1001 * Parse the keepalive message, enclosed in the CopyData message. We just
1002 * check if the server requested a reply, and ignore the rest.
1003 */
1004 pos = 1; /* skip msgtype 'k' */
1005 pos += 8; /* skip walEnd */
1006 pos += 8; /* skip sendTime */
1007
1008 if (len < pos + 1)
1009 {
1010 pg_log_error("streaming header too small: %d", len);
1011 return false;
1012 }
1013 replyRequested = copybuf[pos];
1014
1015 /* If the server requested an immediate reply, send one. */
1016 if (replyRequested && still_sending)
1017 {
1018 if (reportFlushPosition && lastFlushPosition < blockpos &&
1019 walfile != NULL)
1020 {
1021 /*
1022 * If a valid flush location needs to be reported, flush the
1023 * current WAL file so that the latest flush location is sent back
1024 * to the server. This is necessary to see whether the last WAL
1025 * data has been successfully replicated or not, at the normal
1026 * shutdown of the server.
1027 */
1028 if (stream->walmethod->ops->sync(walfile) != 0)
1029 pg_fatal("could not fsync file \"%s\": %s",
1031 lastFlushPosition = blockpos;
1032 }
1033
1035 if (!sendFeedback(conn, blockpos, now, false))
1036 return false;
1037 *last_status = now;
1038 }
1039
1040 return true;
1041}
1042
1043/*
1044 * Process XLogData message.
1045 */
1046static bool
1048 XLogRecPtr *blockpos)
1049{
1050 int xlogoff;
1051 int bytes_left;
1052 int bytes_written;
1053 int hdr_len;
1054
1055 /*
1056 * Once we've decided we don't want to receive any more, just ignore any
1057 * subsequent XLogData messages.
1058 */
1059 if (!(still_sending))
1060 return true;
1061
1062 /*
1063 * Read the header of the XLogData message, enclosed in the CopyData
1064 * message. We only need the WAL location field (dataStart), the rest of
1065 * the header is ignored.
1066 */
1067 hdr_len = 1; /* msgtype 'w' */
1068 hdr_len += 8; /* dataStart */
1069 hdr_len += 8; /* walEnd */
1070 hdr_len += 8; /* sendTime */
1071 if (len < hdr_len)
1072 {
1073 pg_log_error("streaming header too small: %d", len);
1074 return false;
1075 }
1076 *blockpos = fe_recvint64(&copybuf[1]);
1077
1078 /* Extract WAL location for this block */
1079 xlogoff = XLogSegmentOffset(*blockpos, WalSegSz);
1080
1081 /*
1082 * Verify that the initial location in the stream matches where we think
1083 * we are.
1084 */
1085 if (walfile == NULL)
1086 {
1087 /* No file open yet */
1088 if (xlogoff != 0)
1089 {
1090 pg_log_error("received write-ahead log record for offset %u with no file open",
1091 xlogoff);
1092 return false;
1093 }
1094 }
1095 else
1096 {
1097 /* More data in existing segment */
1098 if (walfile->currpos != xlogoff)
1099 {
1100 pg_log_error("got WAL data offset %08x, expected %08x",
1101 xlogoff, (int) walfile->currpos);
1102 return false;
1103 }
1104 }
1105
1106 bytes_left = len - hdr_len;
1107 bytes_written = 0;
1108
1109 while (bytes_left)
1110 {
1111 int bytes_to_write;
1112
1113 /*
1114 * If crossing a WAL boundary, only write up until we reach wal
1115 * segment size.
1116 */
1117 if (xlogoff + bytes_left > WalSegSz)
1118 bytes_to_write = WalSegSz - xlogoff;
1119 else
1120 bytes_to_write = bytes_left;
1121
1122 if (walfile == NULL)
1123 {
1124 if (!open_walfile(stream, *blockpos))
1125 {
1126 /* Error logged by open_walfile */
1127 return false;
1128 }
1129 }
1130
1131 if (stream->walmethod->ops->write(walfile,
1132 copybuf + hdr_len + bytes_written,
1133 bytes_to_write) != bytes_to_write)
1134 {
1135 pg_log_error("could not write %d bytes to WAL file \"%s\": %s",
1136 bytes_to_write, walfile->pathname,
1138 return false;
1139 }
1140
1141 /* Write was successful, advance our position */
1142 bytes_written += bytes_to_write;
1143 bytes_left -= bytes_to_write;
1144 *blockpos += bytes_to_write;
1145 xlogoff += bytes_to_write;
1146
1147 /* Did we reach the end of a WAL segment? */
1148 if (XLogSegmentOffset(*blockpos, WalSegSz) == 0)
1149 {
1150 if (!close_walfile(stream, *blockpos))
1151 /* Error message written in close_walfile() */
1152 return false;
1153
1154 xlogoff = 0;
1155
1156 if (still_sending && stream->stream_stop(*blockpos, stream->timeline, true))
1157 {
1158 if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1159 {
1160 pg_log_error("could not send copy-end packet: %s",
1162 return false;
1163 }
1164 still_sending = false;
1165 return true; /* ignore the rest of this XLogData packet */
1166 }
1167 }
1168 }
1169 /* No more data left to write, receive next copy packet */
1170
1171 return true;
1172}
1173
1174/*
1175 * Handle end of the copy stream.
1176 */
1177static PGresult *
1179 XLogRecPtr blockpos, XLogRecPtr *stoppos)
1180{
1182
1183 /*
1184 * The server closed its end of the copy stream. If we haven't closed
1185 * ours already, we need to do so now, unless the server threw an error,
1186 * in which case we don't.
1187 */
1188 if (still_sending)
1189 {
1190 if (!close_walfile(stream, blockpos))
1191 {
1192 /* Error message written in close_walfile() */
1193 PQclear(res);
1194 return NULL;
1195 }
1197 {
1198 if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1199 {
1200 pg_log_error("could not send copy-end packet: %s",
1202 PQclear(res);
1203 return NULL;
1204 }
1205 res = PQgetResult(conn);
1206 }
1207 still_sending = false;
1208 }
1209 *stoppos = blockpos;
1210 return res;
1211}
1212
1213/*
1214 * Check if we should continue streaming, or abort at this point.
1215 */
1216static bool
1218{
1219 if (still_sending && stream->stream_stop(blockpos, stream->timeline, false))
1220 {
1221 if (!close_walfile(stream, blockpos))
1222 {
1223 /* Potential error message is written by close_walfile */
1224 return false;
1225 }
1226 if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1227 {
1228 pg_log_error("could not send copy-end packet: %s",
1230 return false;
1231 }
1232 still_sending = false;
1233 }
1234
1235 return true;
1236}
1237
1238/*
1239 * Calculate how long send/receive loops should sleep
1240 */
1241static long
1243 TimestampTz last_status)
1244{
1245 TimestampTz status_targettime = 0;
1246 long sleeptime;
1247
1249 status_targettime = last_status +
1250 (standby_message_timeout - 1) * ((int64) 1000);
1251
1252 if (status_targettime > 0)
1253 {
1254 long secs;
1255 int usecs;
1256
1258 status_targettime,
1259 &secs,
1260 &usecs);
1261 /* Always sleep at least 1 sec */
1262 if (secs <= 0)
1263 {
1264 secs = 1;
1265 usecs = 0;
1266 }
1267
1268 sleeptime = secs * 1000 + usecs / 1000;
1269 }
1270 else
1271 sleeptime = -1;
1272
1273 return sleeptime;
1274}
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1608
#define ngettext(s, p, n)
Definition: c.h:1138
#define Max(x, y)
Definition: c.h:955
#define Assert(condition)
Definition: c.h:815
int64_t int64
Definition: c.h:485
uint64_t uint64
Definition: c.h:489
uint32_t uint32
Definition: c.h:488
@ PG_COMPRESSION_NONE
Definition: compression.h:23
int64 TimestampTz
Definition: timestamp.h:39
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:7497
const char * PQparameterStatus(const PGconn *conn, const char *paramName)
Definition: fe-connect.c:7462
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7507
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:7533
int PQflush(PGconn *conn)
Definition: fe-exec.c:4000
void PQfreemem(void *ptr)
Definition: fe-exec.c:4032
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3876
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:2062
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3411
int PQputCopyEnd(PGconn *conn, const char *errormsg)
Definition: fe-exec.c:2749
int PQntuples(const PGresult *res)
Definition: fe-exec.c:3481
int PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
Definition: fe-exec.c:2695
char * PQresultErrorMessage(const PGresult *res)
Definition: fe-exec.c:3427
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1984
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2262
int PQnfields(const PGresult *res)
Definition: fe-exec.c:3489
int PQgetCopyData(PGconn *conn, char **buffer, int async)
Definition: fe-exec.c:2816
void pg_free(void *ptr)
Definition: fe_memutils.c:105
@ PGRES_COPY_IN
Definition: libpq-fe.h:129
@ PGRES_COPY_BOTH
Definition: libpq-fe.h:134
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:122
@ PGRES_TUPLES_OK
Definition: libpq-fe.h:125
exit(1)
#define pg_log_error(...)
Definition: logging.h:106
#define pg_log_info(...)
Definition: logging.h:124
#define pg_fatal(...)
static int standby_message_timeout
#define MAXPGPATH
const void size_t len
static char * filename
Definition: pg_dumpall.c:119
static XLogRecPtr startpos
#define pg_log_warning(...)
Definition: pgfnames.c:24
#define sprintf
Definition: port.h:241
int pgsocket
Definition: port.h:29
#define snprintf
Definition: port.h:239
#define PGINVALID_SOCKET
Definition: port.h:31
#define pgoff_t
Definition: port.h:401
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
static PGresult * HandleCopyStream(PGconn *conn, StreamCtl *stream, XLogRecPtr *stoppos)
Definition: receivelog.c:744
static bool open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
Definition: receivelog.c:89
static bool mark_file_as_archived(StreamCtl *stream, const char *fname)
Definition: receivelog.c:53
static int CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket, char **buffer)
Definition: receivelog.c:939
static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos)
Definition: receivelog.c:1217
static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr blockpos, TimestampTz *last_status)
Definition: receivelog.c:993
static bool reportFlushPosition
Definition: receivelog.c:29
static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr *blockpos)
Definition: receivelog.c:1047
static bool close_walfile(StreamCtl *stream, XLogRecPtr pos)
Definition: receivelog.c:191
static bool existsTimeLineHistoryFile(StreamCtl *stream)
Definition: receivelog.c:257
static bool still_sending
Definition: receivelog.c:32
static bool writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
Definition: receivelog.c:274
bool ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
Definition: receivelog.c:452
static PGresult * HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf, XLogRecPtr blockpos, XLogRecPtr *stoppos)
Definition: receivelog.c:1178
static Walfile * walfile
Definition: receivelog.c:28
static long CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout, TimestampTz last_status)
Definition: receivelog.c:1242
bool CheckServerVersionForStreaming(PGconn *conn)
Definition: receivelog.c:374
static bool sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
Definition: receivelog.c:336
static XLogRecPtr lastFlushPosition
Definition: receivelog.c:30
static int CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
Definition: receivelog.c:877
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
Definition: receivelog.c:698
static pg_noinline void Size size
Definition: slab.c:607
static void error(void)
Definition: sql-dyntest.c:147
int WalSegSz
Definition: streamutil.c:32
int64 fe_recvint64(char *buf)
Definition: streamutil.c:932
TimestampTz feGetCurrentTimestamp(void)
Definition: streamutil.c:867
void feTimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: streamutil.c:886
void fe_sendint64(int64 i, char *buf)
Definition: streamutil.c:921
bool feTimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: streamutil.c:908
PGconn * conn
Definition: streamutil.c:53
bool RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli, XLogRecPtr *startpos, char **db_name)
Definition: streamutil.c:478
char * sysidentifier
Definition: receivelog.h:33
TimeLineID timeline
Definition: receivelog.h:32
stream_stop_callback stream_stop
Definition: receivelog.h:41
char * replication_slot
Definition: receivelog.h:48
XLogRecPtr startpos
Definition: receivelog.h:31
pgsocket stop_socket
Definition: receivelog.h:43
int standby_message_timeout
Definition: receivelog.h:35
WalWriteMethod * walmethod
Definition: receivelog.h:46
bool mark_done
Definition: receivelog.h:37
char * partial_suffix
Definition: receivelog.h:47
bool synchronous
Definition: receivelog.h:36
bool(* existsfile)(WalWriteMethod *wwmethod, const char *pathname)
Definition: walmethods.h:58
ssize_t(* write)(Walfile *f, const void *buf, size_t count)
Definition: walmethods.h:73
ssize_t(* get_file_size)(WalWriteMethod *wwmethod, const char *pathname)
Definition: walmethods.h:61
int(* close)(Walfile *f, WalCloseMethod method)
Definition: walmethods.h:55
char *(* get_file_name)(WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix)
Definition: walmethods.h:67
int(* sync)(Walfile *f)
Definition: walmethods.h:78
Walfile *(* open_for_write)(WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix, size_t pad_to_size)
Definition: walmethods.h:49
const WalWriteMethodOps * ops
Definition: walmethods.h:105
pg_compress_algorithm compression_algorithm
Definition: walmethods.h:106
pgoff_t currpos
Definition: walmethods.h:20
char * pathname
Definition: walmethods.h:21
static StringInfo copybuf
Definition: tablesync.c:137
static void * fn(void *arg)
Definition: thread-alloc.c:119
const char * GetLastWalMethodError(WalWriteMethod *wwmethod)
Definition: walmethods.c:1383
@ CLOSE_UNLINK
Definition: walmethods.h:34
@ CLOSE_NO_RENAME
Definition: walmethods.h:35
@ CLOSE_NORMAL
Definition: walmethods.h:33
#define EINTR
Definition: win32_port.h:364
#define select(n, r, w, e, timeout)
Definition: win32_port.h:503
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
#define MAXFNAMELEN
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
static void TLHistoryFileName(char *fname, TimeLineID tli)
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
uint32 TimeLineID
Definition: xlogdefs.h:59
uint64 XLogSegNo
Definition: xlogdefs.h:48