PostgreSQL Source Code git master
Loading...
Searching...
No Matches
pg_recvlogical.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * pg_recvlogical.c - receive data from a logical decoding slot in a streaming
4 * fashion and write it to a local file.
5 *
6 * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * src/bin/pg_basebackup/pg_recvlogical.c
10 *-------------------------------------------------------------------------
11 */
12
13#include "postgres_fe.h"
14
15#include <dirent.h>
16#include <limits.h>
17#include <sys/select.h>
18#include <sys/stat.h>
19#include <unistd.h>
20
21#include "common/file_perm.h"
22#include "common/logging.h"
24#include "getopt_long.h"
25#include "libpq-fe.h"
26#include "libpq/pqsignal.h"
27#include "libpq/protocol.h"
28#include "pqexpbuffer.h"
29#include "streamutil.h"
30
31/* Time to sleep between reconnection attempts */
32#define RECONNECT_SLEEP_TIME 5
33
41
42/* Global Options */
43static char *outfile = NULL;
44static int verbose = 0;
45static bool two_phase = false; /* enable-two-phase option */
46static bool failover = false; /* enable-failover option */
47static int noloop = 0;
48static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
49static int fsync_interval = 10 * 1000; /* 10 sec = default */
52static bool do_create_slot = false;
53static bool slot_exists_ok = false;
54static bool do_start_slot = false;
55static bool do_drop_slot = false;
56static char *replication_slot = NULL;
57
58/* filled pairwise with option, value. value may be NULL */
59static char **options;
60static size_t noptions = 0;
61static const char *plugin = "test_decoding";
62
63/* Global State */
64static int outfd = -1;
65static volatile sig_atomic_t time_to_abort = false;
67static volatile sig_atomic_t output_reopen = false;
68static bool output_isfile;
70static bool output_needs_fsync = false;
73
74static void usage(void);
75static void StreamLogicalLog(void);
78 StreamStopReason reason,
79 XLogRecPtr lsn);
80
81static void
82usage(void)
83{
84 printf(_("%s controls PostgreSQL logical decoding streams.\n\n"),
85 progname);
86 printf(_("Usage:\n"));
87 printf(_(" %s [OPTION]...\n"), progname);
88 printf(_("\nAction to be performed:\n"));
89 printf(_(" --create-slot create a new replication slot (for the slot's name see --slot)\n"));
90 printf(_(" --drop-slot drop the replication slot (for the slot's name see --slot)\n"));
91 printf(_(" --start start streaming in a replication slot (for the slot's name see --slot)\n"));
92 printf(_("\nOptions:\n"));
93 printf(_(" --enable-failover enable replication slot synchronization to standby servers when\n"
94 " creating a replication slot\n"));
95 printf(_(" -E, --endpos=LSN exit after receiving the specified LSN\n"));
96 printf(_(" -f, --file=FILE receive log into this file, - for stdout\n"));
97 printf(_(" -F --fsync-interval=SECS\n"
98 " time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
99 printf(_(" --if-not-exists do not error if slot already exists when creating a slot\n"));
100 printf(_(" -I, --startpos=LSN where in an existing slot should the streaming start\n"));
101 printf(_(" -n, --no-loop do not loop on connection lost\n"));
102 printf(_(" -o, --option=NAME[=VALUE]\n"
103 " pass option NAME with optional value VALUE to the\n"
104 " output plugin\n"));
105 printf(_(" -P, --plugin=PLUGIN use output plugin PLUGIN (default: %s)\n"), plugin);
106 printf(_(" -s, --status-interval=SECS\n"
107 " time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
108 printf(_(" -S, --slot=SLOTNAME name of the logical replication slot\n"));
109 printf(_(" -t, --enable-two-phase enable decoding of prepared transactions when creating a slot\n"));
110 printf(_(" --two-phase (same as --enable-two-phase, deprecated)\n"));
111 printf(_(" -v, --verbose output verbose messages\n"));
112 printf(_(" -V, --version output version information, then exit\n"));
113 printf(_(" -?, --help show this help, then exit\n"));
114 printf(_("\nConnection options:\n"));
115 printf(_(" -d, --dbname=DBNAME database to connect to\n"));
116 printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
117 printf(_(" -p, --port=PORT database server port number\n"));
118 printf(_(" -U, --username=NAME connect as specified database user\n"));
119 printf(_(" -w, --no-password never prompt for password\n"));
120 printf(_(" -W, --password force password prompt (should happen automatically)\n"));
121 printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
122 printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
123}
124
125/*
126 * Send a Standby Status Update message to server.
127 */
128static bool
130{
133
134 char replybuf[1 + 8 + 8 + 8 + 8 + 1];
135 int len = 0;
136
137 /*
138 * We normally don't want to send superfluous feedback, but when the
139 * caller forces it (e.g. because of a timeout) we must send anyway;
140 * otherwise wal_sender_timeout will kill us.
141 */
142 if (!force &&
145 return true;
146
147 if (verbose)
148 pg_log_info("confirming write up to %X/%08X, flush to %X/%08X (slot %s)",
152
154 len += 1;
156 len += 8;
158 len += 8;
160 len += 8;
161 fe_sendint64(now, &replybuf[len]); /* sendTime */
162 len += 8;
163 replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
164 len += 1;
165
169
170 if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
171 {
172 pg_log_error("could not send feedback packet: %s",
174 return false;
175 }
176
177 return true;
178}
179
180static void
182{
183 if (conn != NULL)
184 PQfinish(conn);
185}
186
187static void
189{
191
193
194 /*
195 * Save the last flushed position as the replication start point. On
196 * reconnect, replication resumes from there to avoid re-sending flushed
197 * data.
198 */
200
201 if (fsync_interval <= 0)
202 return;
203
205 return;
206
207 output_needs_fsync = false;
208
209 /* can only fsync if it's a regular file */
210 if (!output_isfile)
211 return;
212
213 if (fsync(outfd) != 0)
214 pg_fatal("could not fsync file \"%s\": %m", outfile);
215}
216
217/*
218 * Start the log streaming
219 */
220static void
222{
223 PGresult *res;
224 char *copybuf = NULL;
226 int i;
227 PQExpBuffer query;
229
231
232 /*
233 * Connect in replication mode to the server
234 */
235 if (!conn)
237 if (!conn)
238 /* Error message already written in GetConnection() */
239 return;
240
241 /*
242 * Start the replication
243 */
244 if (verbose)
245 pg_log_info("starting log streaming at %X/%08X (slot %s)",
248
249 /* Initiate the replication stream at specified location */
250 query = createPQExpBuffer();
251 appendPQExpBufferStr(query, "START_REPLICATION SLOT ");
253 appendPQExpBuffer(query, " LOGICAL %X/%08X", LSN_FORMAT_ARGS(startpos));
254
255 /* print options if there are any */
256 if (noptions)
257 appendPQExpBufferStr(query, " (");
258
259 for (i = 0; i < noptions; i++)
260 {
261 /* separator */
262 if (i > 0)
263 appendPQExpBufferStr(query, ", ");
264
265 /* write option name */
266 AppendQuotedIdentifier(query, options[i * 2]);
267
268 /* write option value if specified */
269 if (options[i * 2 + 1] != NULL)
270 {
271 appendPQExpBufferChar(query, ' ');
272 AppendQuotedLiteral(query, options[i * 2 + 1]);
273 }
274 }
275
276 if (noptions)
277 appendPQExpBufferChar(query, ')');
278
279 res = PQexec(conn, query->data);
281 {
282 pg_log_error("could not send replication command \"%s\": %s",
283 query->data, PQresultErrorMessage(res));
284 PQclear(res);
285 goto error;
286 }
287 PQclear(res);
288 resetPQExpBuffer(query);
289
290 if (verbose)
291 pg_log_info("streaming initiated");
292
293 while (!time_to_abort)
294 {
295 int r;
296 int bytes_left;
297 int bytes_written;
299 int hdr_len;
300
302
303 if (copybuf != NULL)
304 {
306 copybuf = NULL;
307 }
308
309 /*
310 * Potentially send a status message to the primary.
311 */
313
314 if (outfd != -1 &&
318
319 if (standby_message_timeout > 0 &&
322 {
323 /* Time to send feedback! */
324 if (!sendFeedback(conn, now, true, false))
325 goto error;
326
328 }
329
330 /* got SIGHUP, close output file */
331 if (outfd != -1 && output_reopen && strcmp(outfile, "-") != 0)
332 {
335 close(outfd);
336 outfd = -1;
337 }
338 output_reopen = false;
339
340 /* open the output file, if not open yet */
341 if (outfd == -1)
342 {
343 struct stat statbuf;
344
345 if (strcmp(outfile, "-") == 0)
346 outfd = fileno(stdout);
347 else
350 if (outfd == -1)
351 {
352 pg_log_error("could not open log file \"%s\": %m", outfile);
353 goto error;
354 }
355
356 if (fstat(outfd, &statbuf) != 0)
357 {
358 pg_log_error("could not stat file \"%s\": %m", outfile);
359 goto error;
360 }
361
362 output_isfile = S_ISREG(statbuf.st_mode) && !isatty(outfd);
363 }
364
365 r = PQgetCopyData(conn, &copybuf, 1);
366 if (r == 0)
367 {
368 /*
369 * In async mode, and no data available. We block on reading but
370 * not more than the specified timeout, so that we can send a
371 * response back to the server.
372 */
376 struct timeval timeout;
377 struct timeval *timeoutptr = NULL;
378
379 if (PQsocket(conn) < 0)
380 {
381 pg_log_error("invalid socket: %s", PQerrorMessage(conn));
382 goto error;
383 }
384
387
388 /* Compute when we need to wakeup to send a keepalive message. */
391 ((int64) 1000);
392
393 /* Compute when we need to wakeup to fsync the output file. */
396 ((int64) 1000);
397
398 /* Now compute when to wakeup. */
399 if (message_target > 0 || fsync_target > 0)
400 {
402 long secs;
403 int usecs;
404
406
409
412 &secs,
413 &usecs);
414 if (secs <= 0)
415 timeout.tv_sec = 1; /* Always sleep at least 1 sec */
416 else
417 timeout.tv_sec = secs;
418 timeout.tv_usec = usecs;
420 }
421
423 if (r == 0 || (r < 0 && errno == EINTR))
424 {
425 /*
426 * Got a timeout or signal. Continue the loop and either
427 * deliver a status packet to the server or just go back into
428 * blocking.
429 */
430 continue;
431 }
432 else if (r < 0)
433 {
434 pg_log_error("%s() failed: %m", "select");
435 goto error;
436 }
437
438 /* Else there is actually data on the socket */
439 if (PQconsumeInput(conn) == 0)
440 {
441 pg_log_error("could not receive data from WAL stream: %s",
443 goto error;
444 }
445 continue;
446 }
447
448 /* End of copy stream */
449 if (r == -1)
450 break;
451
452 /* Failure while reading the copy stream */
453 if (r == -2)
454 {
455 pg_log_error("could not read COPY data: %s",
457 goto error;
458 }
459
460 /* Check the message type. */
462 {
463 int pos;
464 bool replyRequested;
466 bool endposReached = false;
467
468 /*
469 * Parse the keepalive message, enclosed in the CopyData message.
470 * We just check if the server requested a reply, and ignore the
471 * rest.
472 */
473 pos = 1; /* skip msgtype PqReplMsg_Keepalive */
474 walEnd = fe_recvint64(&copybuf[pos]);
476
477 pos += 8; /* read walEnd */
478
479 pos += 8; /* skip sendTime */
480
481 if (r < pos + 1)
482 {
483 pg_log_error("streaming header too small: %d", r);
484 goto error;
485 }
486 replyRequested = copybuf[pos];
487
489 {
490 /*
491 * If a keepalive is all there was to read on the socket, we
492 * know that the server has nothing more to send us; and if
493 * walEnd has passed endpos, we know nothing else can have
494 * committed before endpos. So we can bail out now.
495 */
496 endposReached = true;
497 }
498
499 /* Send a reply, if necessary */
501 {
503 goto error;
505 }
506
507 if (endposReached)
508 {
510 time_to_abort = true;
511 break;
512 }
513
514 continue;
515 }
516 else if (copybuf[0] != PqReplMsg_WALData)
517 {
518 pg_log_error("unrecognized streaming header: \"%c\"",
519 copybuf[0]);
520 goto error;
521 }
522
523 /*
524 * Read the header of the WALData message, enclosed in the CopyData
525 * message. We only need the WAL location field (dataStart), the rest
526 * of the header is ignored.
527 */
528 hdr_len = 1; /* msgtype PqReplMsg_WALData */
529 hdr_len += 8; /* dataStart */
530 hdr_len += 8; /* walEnd */
531 hdr_len += 8; /* sendTime */
532 if (r < hdr_len + 1)
533 {
534 pg_log_error("streaming header too small: %d", r);
535 goto error;
536 }
537
538 /* Extract WAL location for this block */
540
542 {
543 /*
544 * We've read past our endpoint, so prepare to go away being
545 * cautious about what happens to our output data.
546 */
548 goto error;
550 time_to_abort = true;
551 break;
552 }
553
555
556 bytes_left = r - hdr_len;
557 bytes_written = 0;
558
559 /* signal that a fsync is needed */
560 output_needs_fsync = true;
561
562 while (bytes_left)
563 {
564 int ret;
565
566 ret = write(outfd,
568 bytes_left);
569
570 if (ret < 0)
571 {
572 pg_log_error("could not write %d bytes to log file \"%s\": %m",
574 goto error;
575 }
576
577 /* Write was successful, advance our position */
578 bytes_written += ret;
579 bytes_left -= ret;
580 }
581
582 if (write(outfd, "\n", 1) != 1)
583 {
584 pg_log_error("could not write %d bytes to log file \"%s\": %m",
585 1, outfile);
586 goto error;
587 }
588
590 {
591 /* endpos was exactly the record we just processed, we're done */
593 goto error;
595 time_to_abort = true;
596 break;
597 }
598 }
599
600 /* Clean up connection state if stream has been aborted */
601 if (time_to_abort)
603
604 res = PQgetResult(conn);
605 if (PQresultStatus(res) == PGRES_COPY_OUT)
606 {
607 PQclear(res);
608
609 /*
610 * We're doing a client-initiated clean exit and have sent CopyDone to
611 * the server. Drain any messages, so we don't miss a last-minute
612 * ErrorResponse. The walsender stops generating WALData records once
613 * it sees CopyDone, so expect this to finish quickly. After CopyDone,
614 * it's too late for sendFeedback(), even if this were to take a long
615 * time. Hence, use synchronous-mode PQgetCopyData().
616 */
617 while (1)
618 {
619 int r;
620
621 if (copybuf != NULL)
622 {
624 copybuf = NULL;
625 }
626 r = PQgetCopyData(conn, &copybuf, 0);
627 if (r == -1)
628 break;
629 if (r == -2)
630 {
631 pg_log_error("could not read COPY data: %s",
633 time_to_abort = false; /* unclean exit */
634 goto error;
635 }
636 }
637
638 res = PQgetResult(conn);
639 }
641 {
642 pg_log_error("unexpected termination of replication stream: %s",
644 PQclear(res);
645 goto error;
646 }
647 PQclear(res);
648
649 if (outfd != -1 && strcmp(outfile, "-") != 0)
650 {
652
653 OutputFsync(t);
654 if (close(outfd) != 0)
655 pg_log_error("could not close file \"%s\": %m", outfile);
656 }
657 outfd = -1;
658error:
659 if (copybuf != NULL)
660 {
662 copybuf = NULL;
663 }
664 destroyPQExpBuffer(query);
665 PQfinish(conn);
666 conn = NULL;
667}
668
669/*
670 * Unfortunately we can't do sensible signal handling on Windows...
671 */
672#ifndef WIN32
673
674/*
675 * When SIGINT/SIGTERM are caught, just tell the system to exit at the next
676 * possible moment.
677 */
678static void
684
685/*
686 * Trigger the output file to be reopened.
687 */
688static void
693#endif
694
695
696int
697main(int argc, char **argv)
698{
699 static struct option long_options[] = {
700/* general options */
701 {"file", required_argument, NULL, 'f'},
702 {"fsync-interval", required_argument, NULL, 'F'},
703 {"no-loop", no_argument, NULL, 'n'},
704 {"enable-failover", no_argument, NULL, 5},
705 {"enable-two-phase", no_argument, NULL, 't'},
706 {"two-phase", no_argument, NULL, 't'}, /* deprecated */
707 {"verbose", no_argument, NULL, 'v'},
708 {"version", no_argument, NULL, 'V'},
709 {"help", no_argument, NULL, '?'},
710/* connection options */
711 {"dbname", required_argument, NULL, 'd'},
712 {"host", required_argument, NULL, 'h'},
713 {"port", required_argument, NULL, 'p'},
714 {"username", required_argument, NULL, 'U'},
715 {"no-password", no_argument, NULL, 'w'},
716 {"password", no_argument, NULL, 'W'},
717/* replication options */
718 {"startpos", required_argument, NULL, 'I'},
719 {"endpos", required_argument, NULL, 'E'},
720 {"option", required_argument, NULL, 'o'},
721 {"plugin", required_argument, NULL, 'P'},
722 {"status-interval", required_argument, NULL, 's'},
723 {"slot", required_argument, NULL, 'S'},
724/* action */
725 {"create-slot", no_argument, NULL, 1},
726 {"start", no_argument, NULL, 2},
727 {"drop-slot", no_argument, NULL, 3},
728 {"if-not-exists", no_argument, NULL, 4},
729 {NULL, 0, NULL, 0}
730 };
731 int c;
732 int option_index;
733 uint32 hi,
734 lo;
735 char *db_name;
736
737 pg_logging_init(argv[0]);
738 progname = get_progname(argv[0]);
739 set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
740
741 if (argc > 1)
742 {
743 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
744 {
745 usage();
746 exit(0);
747 }
748 else if (strcmp(argv[1], "-V") == 0 ||
749 strcmp(argv[1], "--version") == 0)
750 {
751 puts("pg_recvlogical (PostgreSQL) " PG_VERSION);
752 exit(0);
753 }
754 }
755
756 while ((c = getopt_long(argc, argv, "E:f:F:ntvd:h:p:U:wWI:o:P:s:S:",
757 long_options, &option_index)) != -1)
758 {
759 switch (c)
760 {
761/* general options */
762 case 'f':
764 break;
765 case 'F':
766 if (!option_parse_int(optarg, "-F/--fsync-interval", 0,
767 INT_MAX / 1000,
769 exit(1);
770 fsync_interval *= 1000;
771 break;
772 case 'n':
773 noloop = 1;
774 break;
775 case 't':
776 two_phase = true;
777 break;
778 case 'v':
779 verbose++;
780 break;
781 case 5:
782 failover = true;
783 break;
784/* connection options */
785 case 'd':
787 break;
788 case 'h':
790 break;
791 case 'p':
793 break;
794 case 'U':
796 break;
797 case 'w':
798 dbgetpassword = -1;
799 break;
800 case 'W':
801 dbgetpassword = 1;
802 break;
803/* replication options */
804 case 'I':
805 if (sscanf(optarg, "%X/%08X", &hi, &lo) != 2)
806 pg_fatal("could not parse start position \"%s\"", optarg);
807 startpos = ((uint64) hi) << 32 | lo;
808 break;
809 case 'E':
810 if (sscanf(optarg, "%X/%08X", &hi, &lo) != 2)
811 pg_fatal("could not parse end position \"%s\"", optarg);
812 endpos = ((uint64) hi) << 32 | lo;
813 break;
814 case 'o':
815 {
816 char *data = pg_strdup(optarg);
817 char *val = strchr(data, '=');
818
819 if (val != NULL)
820 {
821 /* remove =; separate data from val */
822 *val = '\0';
823 val++;
824 }
825
826 noptions += 1;
828
829 options[(noptions - 1) * 2] = data;
830 options[(noptions - 1) * 2 + 1] = val;
831 }
832
833 break;
834 case 'P':
836 break;
837 case 's':
838 if (!option_parse_int(optarg, "-s/--status-interval", 0,
839 INT_MAX / 1000,
841 exit(1);
843 break;
844 case 'S':
846 break;
847/* action */
848 case 1:
849 do_create_slot = true;
850 break;
851 case 2:
852 do_start_slot = true;
853 break;
854 case 3:
855 do_drop_slot = true;
856 break;
857 case 4:
858 slot_exists_ok = true;
859 break;
860
861 default:
862 /* getopt_long already emitted a complaint */
863 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
864 exit(1);
865 }
866 }
867
868 /*
869 * Any non-option arguments?
870 */
871 if (optind < argc)
872 {
873 pg_log_error("too many command-line arguments (first is \"%s\")",
874 argv[optind]);
875 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
876 exit(1);
877 }
878
879 /*
880 * Required arguments
881 */
882 if (replication_slot == NULL)
883 {
884 pg_log_error("no slot specified");
885 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
886 exit(1);
887 }
888
889 if (do_start_slot && outfile == NULL)
890 {
891 pg_log_error("no target file specified");
892 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
893 exit(1);
894 }
895
896 if (!do_drop_slot && dbname == NULL)
897 {
898 pg_log_error("no database specified");
899 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
900 exit(1);
901 }
902
904 {
905 pg_log_error("at least one action needs to be specified");
906 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
907 exit(1);
908 }
909
911 {
912 pg_log_error("cannot use --create-slot or --start together with --drop-slot");
913 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
914 exit(1);
915 }
916
918 {
919 pg_log_error("cannot use --create-slot or --drop-slot together with --startpos");
920 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
921 exit(1);
922 }
923
925 {
926 pg_log_error("--endpos may only be specified with --start");
927 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
928 exit(1);
929 }
930
931 if (!do_create_slot)
932 {
933 if (two_phase)
934 {
935 pg_log_error("%s may only be specified with --create-slot", "--enable-two-phase");
936 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
937 exit(1);
938 }
939
940 if (failover)
941 {
942 pg_log_error("%s may only be specified with --create-slot", "--enable-failover");
943 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
944 exit(1);
945 }
946 }
947
948 /*
949 * Obtain a connection to server. Notably, if we need a password, we want
950 * to collect it from the user immediately.
951 */
953 if (!conn)
954 /* Error message already written in GetConnection() */
955 exit(1);
957
958 /*
959 * Trap signals. (Don't do this until after the initial password prompt,
960 * if one is needed, in GetConnection.)
961 */
962#ifndef WIN32
966#endif
967
968 /*
969 * Run IDENTIFY_SYSTEM to check the connection type for each action.
970 * --create-slot and --start actions require a database-specific
971 * replication connection because they handle logical replication slots.
972 * --drop-slot can remove replication slots from any replication
973 * connection without this restriction.
974 */
975 if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
976 exit(1);
977
978 if (!do_drop_slot && db_name == NULL)
979 pg_fatal("could not establish database-specific replication connection");
980
981 /*
982 * Set umask so that directories/files are created with the same
983 * permissions as directories/files in the source data directory.
984 *
985 * pg_mode_mask is set to owner-only by default and then updated in
986 * GetConnection() where we get the mode from the server-side with
987 * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm().
988 */
990
991 /* Drop a replication slot. */
992 if (do_drop_slot)
993 {
994 if (verbose)
995 pg_log_info("dropping replication slot \"%s\"", replication_slot);
996
998 exit(1);
999 }
1000
1001 /* Create a replication slot. */
1002 if (do_create_slot)
1003 {
1004 if (verbose)
1005 pg_log_info("creating replication slot \"%s\"", replication_slot);
1006
1008 false, false, slot_exists_ok, two_phase,
1009 failover))
1010 exit(1);
1012 }
1013
1014 if (!do_start_slot)
1015 exit(0);
1016
1017 /* Stream loop */
1018 while (true)
1019 {
1021 if (time_to_abort)
1022 {
1023 /*
1024 * We've been Ctrl-C'ed or reached an exit limit condition. That's
1025 * not an error, so exit without an error code.
1026 */
1027 exit(0);
1028 }
1029
1030 /*
1031 * Ensure all written data is flushed to disk before exiting or
1032 * starting a new replication.
1033 */
1034 if (outfd != -1)
1036
1037 if (noloop)
1038 {
1039 pg_fatal("disconnected");
1040 }
1041 else
1042 {
1043 /* translator: check source for value for %d */
1044 pg_log_info("disconnected; waiting %d seconds to try again",
1047 }
1048 }
1049}
1050
1051/*
1052 * Fsync our output data, and send a feedback message to the server. Returns
1053 * true if successful, false otherwise.
1054 *
1055 * If successful, *now is updated to the current timestamp just before sending
1056 * feedback.
1057 */
1058static bool
1060{
1061 /* flush data to disk, so that we send a recent flush pointer */
1062 OutputFsync(*now);
1064 if (!sendFeedback(conn, *now, true, false))
1065 return false;
1066
1067 return true;
1068}
1069
1070/*
1071 * Try to inform the server about our upcoming demise, but don't wait around or
1072 * retry on failure.
1073 */
1074static void
1076 XLogRecPtr lsn)
1077{
1079 (void) PQflush(conn);
1080
1081 if (verbose)
1082 {
1083 switch (reason)
1084 {
1085 case STREAM_STOP_SIGNAL:
1086 pg_log_info("received interrupt signal, exiting");
1087 break;
1089 pg_log_info("end position %X/%08X reached by keepalive",
1091 break;
1094 pg_log_info("end position %X/%08X reached by WAL record at %X/%08X",
1096 break;
1097 case STREAM_STOP_NONE:
1098 Assert(false);
1099 break;
1100 }
1101 }
1102}
Datum now(PG_FUNCTION_ARGS)
Definition timestamp.c:1613
#define Max(x, y)
Definition c.h:1085
#define SIGNAL_ARGS
Definition c.h:1474
#define Assert(condition)
Definition c.h:943
#define PG_TEXTDOMAIN(domain)
Definition c.h:1303
int64_t int64
Definition c.h:621
#define PG_BINARY
Definition c.h:1386
uint64_t uint64
Definition c.h:625
uint32_t uint32
Definition c.h:624
void set_pglocale_pgservice(const char *argv0, const char *app)
Definition exec.c:430
int main(void)
int64 TimestampTz
Definition timestamp.h:39
#define _(x)
Definition elog.c:96
void PQfinish(PGconn *conn)
char * PQerrorMessage(const PGconn *conn)
int PQsocket(const PGconn *conn)
int PQflush(PGconn *conn)
Definition fe-exec.c:4036
void PQfreemem(void *ptr)
Definition fe-exec.c:4068
int PQputCopyEnd(PGconn *conn, const char *errormsg)
Definition fe-exec.c:2766
int PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
Definition fe-exec.c:2712
int PQconsumeInput(PGconn *conn)
Definition fe-exec.c:2001
PGresult * PQexec(PGconn *conn, const char *query)
Definition fe-exec.c:2279
int PQgetCopyData(PGconn *conn, char **buffer, int async)
Definition fe-exec.c:2833
char * pg_strdup(const char *in)
Definition fe_memutils.c:91
#define pg_realloc_array(pointer, type, count)
Definition fe_memutils.h:74
int pg_file_create_mode
Definition file_perm.c:19
int pg_mode_mask
Definition file_perm.c:25
int getopt_long(int argc, char *const argv[], const char *optstring, const struct option *longopts, int *longindex)
Definition getopt_long.c:60
#define no_argument
Definition getopt_long.h:25
#define required_argument
Definition getopt_long.h:26
long val
Definition informix.c:689
#define close(a)
Definition win32.h:12
#define write(a, b, c)
Definition win32.h:14
int i
Definition isn.c:77
#define PQresultErrorMessage
#define PQgetResult
#define PQclear
#define PQresultStatus
@ PGRES_COPY_BOTH
Definition libpq-fe.h:143
@ PGRES_COMMAND_OK
Definition libpq-fe.h:131
@ PGRES_COPY_OUT
Definition libpq-fe.h:137
void pg_logging_init(const char *argv0)
Definition logging.c:85
#define pg_log_error(...)
Definition logging.h:108
#define pg_log_error_hint(...)
Definition logging.h:114
#define pg_log_info(...)
Definition logging.h:126
const char * progname
Definition main.c:44
bool option_parse_int(const char *optarg, const char *optname, int min_range, int max_range, int *result)
#define pg_fatal(...)
const void size_t len
const void * data
PGDLLIMPORT int optind
Definition getopt.c:47
PGDLLIMPORT char * optarg
Definition getopt.c:49
StreamStopReason
@ STREAM_STOP_KEEPALIVE
@ STREAM_STOP_END_OF_WAL
@ STREAM_STOP_NONE
@ STREAM_STOP_SIGNAL
static volatile sig_atomic_t stop_reason
static int verbose
static int noloop
static bool do_start_slot
static void sighup_handler(SIGNAL_ARGS)
static bool two_phase
static void StreamLogicalLog(void)
static bool output_needs_fsync
static XLogRecPtr endpos
static bool failover
static const char * plugin
static volatile sig_atomic_t time_to_abort
static bool slot_exists_ok
static int fsync_interval
static bool do_drop_slot
static bool output_isfile
static char * replication_slot
static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now)
static XLogRecPtr output_written_lsn
static void sigexit_handler(SIGNAL_ARGS)
static int outfd
static TimestampTz output_last_fsync
static size_t noptions
static char * outfile
static XLogRecPtr output_fsync_lsn
static bool sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
static void usage(void)
static void OutputFsync(TimestampTz now)
static XLogRecPtr startpos
#define RECONNECT_SLEEP_TIME
static bool do_create_slot
static void disconnect_atexit(void)
static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos, StreamStopReason reason, XLogRecPtr lsn)
static int standby_message_timeout
static volatile sig_atomic_t output_reopen
static char ** options
#define pqsignal
Definition port.h:548
const char * get_progname(const char *argv0)
Definition path.c:669
#define printf(...)
Definition port.h:267
PQExpBuffer createPQExpBuffer(void)
Definition pqexpbuffer.c:72
void resetPQExpBuffer(PQExpBuffer str)
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
void destroyPQExpBuffer(PQExpBuffer str)
void appendPQExpBufferChar(PQExpBuffer str, char ch)
void appendPQExpBufferStr(PQExpBuffer str, const char *data)
char * c
static int fb(int x)
#define PqReplMsg_WALData
Definition protocol.h:77
#define PqReplMsg_Keepalive
Definition protocol.h:75
#define PqReplMsg_StandbyStatusUpdate
Definition protocol.h:84
void pg_usleep(long microsec)
Definition signal.c:53
static void error(void)
int dbgetpassword
Definition streamutil.c:50
PGconn * GetConnection(void)
Definition streamutil.c:60
char * dbhost
Definition streamutil.c:46
int64 fe_recvint64(char *buf)
Definition streamutil.c:895
TimestampTz feGetCurrentTimestamp(void)
Definition streamutil.c:830
char * dbport
Definition streamutil.c:48
void feTimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition streamutil.c:849
void fe_sendint64(int64 i, char *buf)
Definition streamutil.c:884
bool feTimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition streamutil.c:871
char * dbname
Definition streamutil.c:49
PGconn * conn
Definition streamutil.c:52
bool RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli, XLogRecPtr *startpos, char **db_name)
Definition streamutil.c:409
char * dbuser
Definition streamutil.c:47
#define AppendQuotedLiteral(b, s)
Definition streamutil.h:48
#define AppendQuotedIdentifier(b, s)
Definition streamutil.h:47
static StringInfo copybuf
Definition tablesync.c:129
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition walsender.c:1249
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition walsender.c:1463
#define fsync(fd)
Definition win32_port.h:83
#define SIGHUP
Definition win32_port.h:158
#define EINTR
Definition win32_port.h:361
#define fstat
Definition win32_port.h:73
#define S_ISREG(m)
Definition win32_port.h:318
#define select(n, r, w, e, timeout)
Definition win32_port.h:500
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
uint64 XLogRecPtr
Definition xlogdefs.h:21
#define InvalidXLogRecPtr
Definition xlogdefs.h:28