PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
pg_recvlogical.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * pg_recvlogical.c - receive data from a logical decoding slot in a streaming
4 * fashion and write it to a local file.
5 *
6 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * src/bin/pg_basebackup/pg_recvlogical.c
10 *-------------------------------------------------------------------------
11 */
12
13#include "postgres_fe.h"
14
15#include <dirent.h>
16#include <limits.h>
17#include <sys/select.h>
18#include <sys/stat.h>
19#include <unistd.h>
20
21#include "common/file_perm.h"
22#include "common/logging.h"
24#include "getopt_long.h"
25#include "libpq-fe.h"
26#include "libpq/pqsignal.h"
27#include "pqexpbuffer.h"
28#include "streamutil.h"
29
30/* Time to sleep between reconnection attempts */
31#define RECONNECT_SLEEP_TIME 5
32
33typedef enum
34{
40
41/* Global Options */
42static char *outfile = NULL;
43static int verbose = 0;
44static bool two_phase = false;
45static bool failover = false;
46static int noloop = 0;
47static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
48static int fsync_interval = 10 * 1000; /* 10 sec = default */
51static bool do_create_slot = false;
52static bool slot_exists_ok = false;
53static bool do_start_slot = false;
54static bool do_drop_slot = false;
55static char *replication_slot = NULL;
56
57/* filled pairwise with option, value. value may be NULL */
58static char **options;
59static size_t noptions = 0;
60static const char *plugin = "test_decoding";
61
62/* Global State */
63static int outfd = -1;
64static volatile sig_atomic_t time_to_abort = false;
65static volatile sig_atomic_t stop_reason = STREAM_STOP_NONE;
66static volatile sig_atomic_t output_reopen = false;
67static bool output_isfile;
69static bool output_needs_fsync = false;
72
73static void usage(void);
74static void StreamLogicalLog(void);
77 StreamStopReason reason,
78 XLogRecPtr lsn);
79
80static void
81usage(void)
82{
83 printf(_("%s controls PostgreSQL logical decoding streams.\n\n"),
84 progname);
85 printf(_("Usage:\n"));
86 printf(_(" %s [OPTION]...\n"), progname);
87 printf(_("\nAction to be performed:\n"));
88 printf(_(" --create-slot create a new replication slot (for the slot's name see --slot)\n"));
89 printf(_(" --drop-slot drop the replication slot (for the slot's name see --slot)\n"));
90 printf(_(" --start start streaming in a replication slot (for the slot's name see --slot)\n"));
91 printf(_("\nOptions:\n"));
92 printf(_(" -E, --endpos=LSN exit after receiving the specified LSN\n"));
93 printf(_(" --failover enable replication slot synchronization to standby servers when\n"
94 " creating a slot\n"));
95 printf(_(" -f, --file=FILE receive log into this file, - for stdout\n"));
96 printf(_(" -F --fsync-interval=SECS\n"
97 " time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
98 printf(_(" --if-not-exists do not error if slot already exists when creating a slot\n"));
99 printf(_(" -I, --startpos=LSN where in an existing slot should the streaming start\n"));
100 printf(_(" -n, --no-loop do not loop on connection lost\n"));
101 printf(_(" -o, --option=NAME[=VALUE]\n"
102 " pass option NAME with optional value VALUE to the\n"
103 " output plugin\n"));
104 printf(_(" -P, --plugin=PLUGIN use output plugin PLUGIN (default: %s)\n"), plugin);
105 printf(_(" -s, --status-interval=SECS\n"
106 " time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
107 printf(_(" -S, --slot=SLOTNAME name of the logical replication slot\n"));
108 printf(_(" -t, --two-phase enable decoding of prepared transactions when creating a slot\n"));
109 printf(_(" -v, --verbose output verbose messages\n"));
110 printf(_(" -V, --version output version information, then exit\n"));
111 printf(_(" -?, --help show this help, then exit\n"));
112 printf(_("\nConnection options:\n"));
113 printf(_(" -d, --dbname=DBNAME database to connect to\n"));
114 printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
115 printf(_(" -p, --port=PORT database server port number\n"));
116 printf(_(" -U, --username=NAME connect as specified database user\n"));
117 printf(_(" -w, --no-password never prompt for password\n"));
118 printf(_(" -W, --password force password prompt (should happen automatically)\n"));
119 printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
120 printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
121}
122
123/*
124 * Send a Standby Status Update message to server.
125 */
126static bool
127sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
128{
129 static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;
130 static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;
131
132 char replybuf[1 + 8 + 8 + 8 + 8 + 1];
133 int len = 0;
134
135 /*
136 * we normally don't want to send superfluous feedback, but if it's
137 * because of a timeout we need to, otherwise wal_sender_timeout will kill
138 * us.
139 */
140 if (!force &&
141 last_written_lsn == output_written_lsn &&
142 last_fsync_lsn == output_fsync_lsn)
143 return true;
144
145 if (verbose)
146 pg_log_info("confirming write up to %X/%X, flush to %X/%X (slot %s)",
150
151 replybuf[len] = 'r';
152 len += 1;
153 fe_sendint64(output_written_lsn, &replybuf[len]); /* write */
154 len += 8;
155 fe_sendint64(output_fsync_lsn, &replybuf[len]); /* flush */
156 len += 8;
157 fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
158 len += 8;
159 fe_sendint64(now, &replybuf[len]); /* sendTime */
160 len += 8;
161 replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
162 len += 1;
163
165 last_written_lsn = output_written_lsn;
166 last_fsync_lsn = output_fsync_lsn;
167
168 if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
169 {
170 pg_log_error("could not send feedback packet: %s",
172 return false;
173 }
174
175 return true;
176}
177
178static void
180{
181 if (conn != NULL)
182 PQfinish(conn);
183}
184
185static bool
187{
189
191
192 if (fsync_interval <= 0)
193 return true;
194
196 return true;
197
198 output_needs_fsync = false;
199
200 /* can only fsync if it's a regular file */
201 if (!output_isfile)
202 return true;
203
204 if (fsync(outfd) != 0)
205 pg_fatal("could not fsync file \"%s\": %m", outfile);
206
207 return true;
208}
209
210/*
211 * Start the log streaming
212 */
213static void
215{
216 PGresult *res;
217 char *copybuf = NULL;
218 TimestampTz last_status = -1;
219 int i;
220 PQExpBuffer query;
221 XLogRecPtr cur_record_lsn;
222
225 cur_record_lsn = InvalidXLogRecPtr;
226
227 /*
228 * Connect in replication mode to the server
229 */
230 if (!conn)
232 if (!conn)
233 /* Error message already written in GetConnection() */
234 return;
235
236 /*
237 * Start the replication
238 */
239 if (verbose)
240 pg_log_info("starting log streaming at %X/%X (slot %s)",
243
244 /* Initiate the replication stream at specified location */
245 query = createPQExpBuffer();
246 appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X",
248
249 /* print options if there are any */
250 if (noptions)
251 appendPQExpBufferStr(query, " (");
252
253 for (i = 0; i < noptions; i++)
254 {
255 /* separator */
256 if (i > 0)
257 appendPQExpBufferStr(query, ", ");
258
259 /* write option name */
260 appendPQExpBuffer(query, "\"%s\"", options[(i * 2)]);
261
262 /* write option value if specified */
263 if (options[(i * 2) + 1] != NULL)
264 appendPQExpBuffer(query, " '%s'", options[(i * 2) + 1]);
265 }
266
267 if (noptions)
268 appendPQExpBufferChar(query, ')');
269
270 res = PQexec(conn, query->data);
272 {
273 pg_log_error("could not send replication command \"%s\": %s",
274 query->data, PQresultErrorMessage(res));
275 PQclear(res);
276 goto error;
277 }
278 PQclear(res);
279 resetPQExpBuffer(query);
280
281 if (verbose)
282 pg_log_info("streaming initiated");
283
284 while (!time_to_abort)
285 {
286 int r;
287 int bytes_left;
288 int bytes_written;
290 int hdr_len;
291
292 cur_record_lsn = InvalidXLogRecPtr;
293
294 if (copybuf != NULL)
295 {
297 copybuf = NULL;
298 }
299
300 /*
301 * Potentially send a status message to the primary.
302 */
304
305 if (outfd != -1 &&
308 {
309 if (!OutputFsync(now))
310 goto error;
311 }
312
313 if (standby_message_timeout > 0 &&
316 {
317 /* Time to send feedback! */
318 if (!sendFeedback(conn, now, true, false))
319 goto error;
320
321 last_status = now;
322 }
323
324 /* got SIGHUP, close output file */
325 if (outfd != -1 && output_reopen && strcmp(outfile, "-") != 0)
326 {
328 if (!OutputFsync(now))
329 goto error;
330 close(outfd);
331 outfd = -1;
332 }
333 output_reopen = false;
334
335 /* open the output file, if not open yet */
336 if (outfd == -1)
337 {
338 struct stat statbuf;
339
340 if (strcmp(outfile, "-") == 0)
341 outfd = fileno(stdout);
342 else
343 outfd = open(outfile, O_CREAT | O_APPEND | O_WRONLY | PG_BINARY,
344 S_IRUSR | S_IWUSR);
345 if (outfd == -1)
346 {
347 pg_log_error("could not open log file \"%s\": %m", outfile);
348 goto error;
349 }
350
351 if (fstat(outfd, &statbuf) != 0)
352 {
353 pg_log_error("could not stat file \"%s\": %m", outfile);
354 goto error;
355 }
356
357 output_isfile = S_ISREG(statbuf.st_mode) && !isatty(outfd);
358 }
359
360 r = PQgetCopyData(conn, &copybuf, 1);
361 if (r == 0)
362 {
363 /*
364 * In async mode, and no data available. We block on reading but
365 * not more than the specified timeout, so that we can send a
366 * response back to the client.
367 */
368 fd_set input_mask;
369 TimestampTz message_target = 0;
370 TimestampTz fsync_target = 0;
371 struct timeval timeout;
372 struct timeval *timeoutptr = NULL;
373
374 if (PQsocket(conn) < 0)
375 {
376 pg_log_error("invalid socket: %s", PQerrorMessage(conn));
377 goto error;
378 }
379
380 FD_ZERO(&input_mask);
381 FD_SET(PQsocket(conn), &input_mask);
382
383 /* Compute when we need to wakeup to send a keepalive message. */
385 message_target = last_status + (standby_message_timeout - 1) *
386 ((int64) 1000);
387
388 /* Compute when we need to wakeup to fsync the output file. */
390 fsync_target = output_last_fsync + (fsync_interval - 1) *
391 ((int64) 1000);
392
393 /* Now compute when to wakeup. */
394 if (message_target > 0 || fsync_target > 0)
395 {
396 TimestampTz targettime;
397 long secs;
398 int usecs;
399
400 targettime = message_target;
401
402 if (fsync_target > 0 && fsync_target < targettime)
403 targettime = fsync_target;
404
406 targettime,
407 &secs,
408 &usecs);
409 if (secs <= 0)
410 timeout.tv_sec = 1; /* Always sleep at least 1 sec */
411 else
412 timeout.tv_sec = secs;
413 timeout.tv_usec = usecs;
414 timeoutptr = &timeout;
415 }
416
417 r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
418 if (r == 0 || (r < 0 && errno == EINTR))
419 {
420 /*
421 * Got a timeout or signal. Continue the loop and either
422 * deliver a status packet to the server or just go back into
423 * blocking.
424 */
425 continue;
426 }
427 else if (r < 0)
428 {
429 pg_log_error("%s() failed: %m", "select");
430 goto error;
431 }
432
433 /* Else there is actually data on the socket */
434 if (PQconsumeInput(conn) == 0)
435 {
436 pg_log_error("could not receive data from WAL stream: %s",
438 goto error;
439 }
440 continue;
441 }
442
443 /* End of copy stream */
444 if (r == -1)
445 break;
446
447 /* Failure while reading the copy stream */
448 if (r == -2)
449 {
450 pg_log_error("could not read COPY data: %s",
452 goto error;
453 }
454
455 /* Check the message type. */
456 if (copybuf[0] == 'k')
457 {
458 int pos;
459 bool replyRequested;
460 XLogRecPtr walEnd;
461 bool endposReached = false;
462
463 /*
464 * Parse the keepalive message, enclosed in the CopyData message.
465 * We just check if the server requested a reply, and ignore the
466 * rest.
467 */
468 pos = 1; /* skip msgtype 'k' */
469 walEnd = fe_recvint64(&copybuf[pos]);
471
472 pos += 8; /* read walEnd */
473
474 pos += 8; /* skip sendTime */
475
476 if (r < pos + 1)
477 {
478 pg_log_error("streaming header too small: %d", r);
479 goto error;
480 }
481 replyRequested = copybuf[pos];
482
483 if (endpos != InvalidXLogRecPtr && walEnd >= endpos)
484 {
485 /*
486 * If there's nothing to read on the socket until a keepalive
487 * we know that the server has nothing to send us; and if
488 * walEnd has passed endpos, we know nothing else can have
489 * committed before endpos. So we can bail out now.
490 */
491 endposReached = true;
492 }
493
494 /* Send a reply, if necessary */
495 if (replyRequested || endposReached)
496 {
498 goto error;
499 last_status = now;
500 }
501
502 if (endposReached)
503 {
505 time_to_abort = true;
506 break;
507 }
508
509 continue;
510 }
511 else if (copybuf[0] != 'w')
512 {
513 pg_log_error("unrecognized streaming header: \"%c\"",
514 copybuf[0]);
515 goto error;
516 }
517
518 /*
519 * Read the header of the XLogData message, enclosed in the CopyData
520 * message. We only need the WAL location field (dataStart), the rest
521 * of the header is ignored.
522 */
523 hdr_len = 1; /* msgtype 'w' */
524 hdr_len += 8; /* dataStart */
525 hdr_len += 8; /* walEnd */
526 hdr_len += 8; /* sendTime */
527 if (r < hdr_len + 1)
528 {
529 pg_log_error("streaming header too small: %d", r);
530 goto error;
531 }
532
533 /* Extract WAL location for this block */
534 cur_record_lsn = fe_recvint64(&copybuf[1]);
535
536 if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos)
537 {
538 /*
539 * We've read past our endpoint, so prepare to go away being
540 * cautious about what happens to our output data.
541 */
543 goto error;
545 time_to_abort = true;
546 break;
547 }
548
549 output_written_lsn = Max(cur_record_lsn, output_written_lsn);
550
551 bytes_left = r - hdr_len;
552 bytes_written = 0;
553
554 /* signal that a fsync is needed */
555 output_needs_fsync = true;
556
557 while (bytes_left)
558 {
559 int ret;
560
561 ret = write(outfd,
562 copybuf + hdr_len + bytes_written,
563 bytes_left);
564
565 if (ret < 0)
566 {
567 pg_log_error("could not write %d bytes to log file \"%s\": %m",
568 bytes_left, outfile);
569 goto error;
570 }
571
572 /* Write was successful, advance our position */
573 bytes_written += ret;
574 bytes_left -= ret;
575 }
576
577 if (write(outfd, "\n", 1) != 1)
578 {
579 pg_log_error("could not write %d bytes to log file \"%s\": %m",
580 1, outfile);
581 goto error;
582 }
583
584 if (endpos != InvalidXLogRecPtr && cur_record_lsn == endpos)
585 {
586 /* endpos was exactly the record we just processed, we're done */
588 goto error;
590 time_to_abort = true;
591 break;
592 }
593 }
594
595 /* Clean up connection state if stream has been aborted */
596 if (time_to_abort)
597 prepareToTerminate(conn, endpos, stop_reason, cur_record_lsn);
598
599 res = PQgetResult(conn);
600 if (PQresultStatus(res) == PGRES_COPY_OUT)
601 {
602 PQclear(res);
603
604 /*
605 * We're doing a client-initiated clean exit and have sent CopyDone to
606 * the server. Drain any messages, so we don't miss a last-minute
607 * ErrorResponse. The walsender stops generating XLogData records once
608 * it sees CopyDone, so expect this to finish quickly. After CopyDone,
609 * it's too late for sendFeedback(), even if this were to take a long
610 * time. Hence, use synchronous-mode PQgetCopyData().
611 */
612 while (1)
613 {
614 int r;
615
616 if (copybuf != NULL)
617 {
619 copybuf = NULL;
620 }
621 r = PQgetCopyData(conn, &copybuf, 0);
622 if (r == -1)
623 break;
624 if (r == -2)
625 {
626 pg_log_error("could not read COPY data: %s",
628 time_to_abort = false; /* unclean exit */
629 goto error;
630 }
631 }
632
633 res = PQgetResult(conn);
634 }
636 {
637 pg_log_error("unexpected termination of replication stream: %s",
639 PQclear(res);
640 goto error;
641 }
642 PQclear(res);
643
644 if (outfd != -1 && strcmp(outfile, "-") != 0)
645 {
647
648 /* no need to jump to error on failure here, we're finishing anyway */
649 OutputFsync(t);
650
651 if (close(outfd) != 0)
652 pg_log_error("could not close file \"%s\": %m", outfile);
653 }
654 outfd = -1;
655error:
656 if (copybuf != NULL)
657 {
659 copybuf = NULL;
660 }
661 destroyPQExpBuffer(query);
662 PQfinish(conn);
663 conn = NULL;
664}
665
666/*
667 * Unfortunately we can't do sensible signal handling on Windows...
668 */
669#ifndef WIN32
670
671/*
672 * When SIGINT/SIGTERM are caught, just tell the system to exit at the next
673 * possible moment.
674 */
675static void
677{
679 time_to_abort = true;
680}
681
682/*
683 * Trigger the output file to be reopened.
684 */
685static void
687{
688 output_reopen = true;
689}
690#endif
691
692
693int
694main(int argc, char **argv)
695{
696 static struct option long_options[] = {
697/* general options */
698 {"file", required_argument, NULL, 'f'},
699 {"fsync-interval", required_argument, NULL, 'F'},
700 {"no-loop", no_argument, NULL, 'n'},
701 {"failover", no_argument, NULL, 5},
702 {"verbose", no_argument, NULL, 'v'},
703 {"two-phase", no_argument, NULL, 't'},
704 {"version", no_argument, NULL, 'V'},
705 {"help", no_argument, NULL, '?'},
706/* connection options */
707 {"dbname", required_argument, NULL, 'd'},
708 {"host", required_argument, NULL, 'h'},
709 {"port", required_argument, NULL, 'p'},
710 {"username", required_argument, NULL, 'U'},
711 {"no-password", no_argument, NULL, 'w'},
712 {"password", no_argument, NULL, 'W'},
713/* replication options */
714 {"startpos", required_argument, NULL, 'I'},
715 {"endpos", required_argument, NULL, 'E'},
716 {"option", required_argument, NULL, 'o'},
717 {"plugin", required_argument, NULL, 'P'},
718 {"status-interval", required_argument, NULL, 's'},
719 {"slot", required_argument, NULL, 'S'},
720/* action */
721 {"create-slot", no_argument, NULL, 1},
722 {"start", no_argument, NULL, 2},
723 {"drop-slot", no_argument, NULL, 3},
724 {"if-not-exists", no_argument, NULL, 4},
725 {NULL, 0, NULL, 0}
726 };
727 int c;
728 int option_index;
729 uint32 hi,
730 lo;
731 char *db_name;
732
733 pg_logging_init(argv[0]);
734 progname = get_progname(argv[0]);
735 set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
736
737 if (argc > 1)
738 {
739 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
740 {
741 usage();
742 exit(0);
743 }
744 else if (strcmp(argv[1], "-V") == 0 ||
745 strcmp(argv[1], "--version") == 0)
746 {
747 puts("pg_recvlogical (PostgreSQL) " PG_VERSION);
748 exit(0);
749 }
750 }
751
752 while ((c = getopt_long(argc, argv, "E:f:F:ntvd:h:p:U:wWI:o:P:s:S:",
753 long_options, &option_index)) != -1)
754 {
755 switch (c)
756 {
757/* general options */
758 case 'f':
760 break;
761 case 'F':
762 if (!option_parse_int(optarg, "-F/--fsync-interval", 0,
763 INT_MAX / 1000,
765 exit(1);
766 fsync_interval *= 1000;
767 break;
768 case 'n':
769 noloop = 1;
770 break;
771 case 't':
772 two_phase = true;
773 break;
774 case 'v':
775 verbose++;
776 break;
777 case 5:
778 failover = true;
779 break;
780/* connection options */
781 case 'd':
783 break;
784 case 'h':
786 break;
787 case 'p':
789 break;
790 case 'U':
792 break;
793 case 'w':
794 dbgetpassword = -1;
795 break;
796 case 'W':
797 dbgetpassword = 1;
798 break;
799/* replication options */
800 case 'I':
801 if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
802 pg_fatal("could not parse start position \"%s\"", optarg);
803 startpos = ((uint64) hi) << 32 | lo;
804 break;
805 case 'E':
806 if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
807 pg_fatal("could not parse end position \"%s\"", optarg);
808 endpos = ((uint64) hi) << 32 | lo;
809 break;
810 case 'o':
811 {
812 char *data = pg_strdup(optarg);
813 char *val = strchr(data, '=');
814
815 if (val != NULL)
816 {
817 /* remove =; separate data from val */
818 *val = '\0';
819 val++;
820 }
821
822 noptions += 1;
823 options = pg_realloc(options, sizeof(char *) * noptions * 2);
824
825 options[(noptions - 1) * 2] = data;
826 options[(noptions - 1) * 2 + 1] = val;
827 }
828
829 break;
830 case 'P':
832 break;
833 case 's':
834 if (!option_parse_int(optarg, "-s/--status-interval", 0,
835 INT_MAX / 1000,
837 exit(1);
839 break;
840 case 'S':
842 break;
843/* action */
844 case 1:
845 do_create_slot = true;
846 break;
847 case 2:
848 do_start_slot = true;
849 break;
850 case 3:
851 do_drop_slot = true;
852 break;
853 case 4:
854 slot_exists_ok = true;
855 break;
856
857 default:
858 /* getopt_long already emitted a complaint */
859 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
860 exit(1);
861 }
862 }
863
864 /*
865 * Any non-option arguments?
866 */
867 if (optind < argc)
868 {
869 pg_log_error("too many command-line arguments (first is \"%s\")",
870 argv[optind]);
871 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
872 exit(1);
873 }
874
875 /*
876 * Required arguments
877 */
878 if (replication_slot == NULL)
879 {
880 pg_log_error("no slot specified");
881 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
882 exit(1);
883 }
884
885 if (do_start_slot && outfile == NULL)
886 {
887 pg_log_error("no target file specified");
888 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
889 exit(1);
890 }
891
892 if (!do_drop_slot && dbname == NULL)
893 {
894 pg_log_error("no database specified");
895 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
896 exit(1);
897 }
898
900 {
901 pg_log_error("at least one action needs to be specified");
902 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
903 exit(1);
904 }
905
907 {
908 pg_log_error("cannot use --create-slot or --start together with --drop-slot");
909 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
910 exit(1);
911 }
912
914 {
915 pg_log_error("cannot use --create-slot or --drop-slot together with --startpos");
916 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
917 exit(1);
918 }
919
921 {
922 pg_log_error("--endpos may only be specified with --start");
923 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
924 exit(1);
925 }
926
927 if (!do_create_slot)
928 {
929 if (two_phase)
930 {
931 pg_log_error("--two-phase may only be specified with --create-slot");
932 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
933 exit(1);
934 }
935
936 if (failover)
937 {
938 pg_log_error("--failover may only be specified with --create-slot");
939 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
940 exit(1);
941 }
942 }
943
944 /*
945 * Obtain a connection to server. Notably, if we need a password, we want
946 * to collect it from the user immediately.
947 */
949 if (!conn)
950 /* Error message already written in GetConnection() */
951 exit(1);
952 atexit(disconnect_atexit);
953
954 /*
955 * Trap signals. (Don't do this until after the initial password prompt,
956 * if one is needed, in GetConnection.)
957 */
958#ifndef WIN32
959 pqsignal(SIGINT, sigexit_handler);
960 pqsignal(SIGTERM, sigexit_handler);
962#endif
963
964 /*
965 * Run IDENTIFY_SYSTEM to check the connection type for each action.
966 * --create-slot and --start actions require a database-specific
967 * replication connection because they handle logical replication slots.
968 * --drop-slot can remove replication slots from any replication
969 * connection without this restriction.
970 */
971 if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
972 exit(1);
973
974 if (!do_drop_slot && db_name == NULL)
975 pg_fatal("could not establish database-specific replication connection");
976
977 /*
978 * Set umask so that directories/files are created with the same
979 * permissions as directories/files in the source data directory.
980 *
981 * pg_mode_mask is set to owner-only by default and then updated in
982 * GetConnection() where we get the mode from the server-side with
983 * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm().
984 */
985 umask(pg_mode_mask);
986
987 /* Drop a replication slot. */
988 if (do_drop_slot)
989 {
990 if (verbose)
991 pg_log_info("dropping replication slot \"%s\"", replication_slot);
992
994 exit(1);
995 }
996
997 /* Create a replication slot. */
998 if (do_create_slot)
999 {
1000 if (verbose)
1001 pg_log_info("creating replication slot \"%s\"", replication_slot);
1002
1004 false, false, slot_exists_ok, two_phase,
1005 failover))
1006 exit(1);
1008 }
1009
1010 if (!do_start_slot)
1011 exit(0);
1012
1013 /* Stream loop */
1014 while (true)
1015 {
1017 if (time_to_abort)
1018 {
1019 /*
1020 * We've been Ctrl-C'ed or reached an exit limit condition. That's
1021 * not an error, so exit without an errorcode.
1022 */
1023 exit(0);
1024 }
1025 else if (noloop)
1026 pg_fatal("disconnected");
1027 else
1028 {
1029 /* translator: check source for value for %d */
1030 pg_log_info("disconnected; waiting %d seconds to try again",
1033 }
1034 }
1035}
1036
1037/*
1038 * Fsync our output data, and send a feedback message to the server. Returns
1039 * true if successful, false otherwise.
1040 *
1041 * If successful, *now is updated to the current timestamp just before sending
1042 * feedback.
1043 */
1044static bool
1046{
1047 /* flush data to disk, so that we send a recent flush pointer */
1048 if (!OutputFsync(*now))
1049 return false;
1051 if (!sendFeedback(conn, *now, true, false))
1052 return false;
1053
1054 return true;
1055}
1056
1057/*
1058 * Try to inform the server about our upcoming demise, but don't wait around or
1059 * retry on failure.
1060 */
1061static void
1063 XLogRecPtr lsn)
1064{
1065 (void) PQputCopyEnd(conn, NULL);
1066 (void) PQflush(conn);
1067
1068 if (verbose)
1069 {
1070 switch (reason)
1071 {
1072 case STREAM_STOP_SIGNAL:
1073 pg_log_info("received interrupt signal, exiting");
1074 break;
1076 pg_log_info("end position %X/%X reached by keepalive",
1078 break;
1081 pg_log_info("end position %X/%X reached by WAL record at %X/%X",
1083 break;
1084 case STREAM_STOP_NONE:
1085 Assert(false);
1086 break;
1087 }
1088 }
1089}
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1609
#define Max(x, y)
Definition: c.h:969
#define SIGNAL_ARGS
Definition: c.h:1320
#define PG_TEXTDOMAIN(domain)
Definition: c.h:1185
int64_t int64
Definition: c.h:499
#define PG_BINARY
Definition: c.h:1244
uint64_t uint64
Definition: c.h:503
uint32_t uint32
Definition: c.h:502
void set_pglocale_pgservice(const char *argv0, const char *app)
Definition: exec.c:429
PGconn * GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
Definition: connection.c:203
int64 TimestampTz
Definition: timestamp.h:39
#define _(x)
Definition: elog.c:91
void PQfinish(PGconn *conn)
Definition: fe-connect.c:5290
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7619
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:7645
int PQflush(PGconn *conn)
Definition: fe-exec.c:4000
void PQfreemem(void *ptr)
Definition: fe-exec.c:4032
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:2062
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3411
void PQclear(PGresult *res)
Definition: fe-exec.c:721
int PQputCopyEnd(PGconn *conn, const char *errormsg)
Definition: fe-exec.c:2749
int PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
Definition: fe-exec.c:2695
char * PQresultErrorMessage(const PGresult *res)
Definition: fe-exec.c:3427
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1984
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2262
int PQgetCopyData(PGconn *conn, char **buffer, int async)
Definition: fe-exec.c:2816
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
void * pg_realloc(void *ptr, size_t size)
Definition: fe_memutils.c:65
int pg_mode_mask
Definition: file_perm.c:25
int getopt_long(int argc, char *const argv[], const char *optstring, const struct option *longopts, int *longindex)
Definition: getopt_long.c:60
#define no_argument
Definition: getopt_long.h:25
#define required_argument
Definition: getopt_long.h:26
Assert(PointerIsAligned(start, uint64))
long val
Definition: informix.c:689
#define close(a)
Definition: win32.h:12
#define write(a, b, c)
Definition: win32.h:14
int i
Definition: isn.c:77
@ PGRES_COPY_BOTH
Definition: libpq-fe.h:137
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:125
@ PGRES_COPY_OUT
Definition: libpq-fe.h:131
void pg_logging_init(const char *argv0)
Definition: logging.c:83
#define pg_log_error(...)
Definition: logging.h:106
#define pg_log_error_hint(...)
Definition: logging.h:112
#define pg_log_info(...)
Definition: logging.h:124
const char * progname
Definition: main.c:44
bool option_parse_int(const char *optarg, const char *optname, int min_range, int max_range, int *result)
Definition: option_utils.c:50
#define pg_fatal(...)
const void size_t len
const void * data
PGDLLIMPORT int optind
Definition: getopt.c:51
PGDLLIMPORT char * optarg
Definition: getopt.c:53
StreamStopReason
@ STREAM_STOP_KEEPALIVE
@ STREAM_STOP_END_OF_WAL
@ STREAM_STOP_NONE
@ STREAM_STOP_SIGNAL
static volatile sig_atomic_t stop_reason
static int verbose
static int noloop
static bool do_start_slot
static bool OutputFsync(TimestampTz now)
static void sighup_handler(SIGNAL_ARGS)
static bool two_phase
static void StreamLogicalLog(void)
static bool output_needs_fsync
static XLogRecPtr endpos
int main(int argc, char **argv)
static bool failover
static const char * plugin
static volatile sig_atomic_t time_to_abort
static bool slot_exists_ok
static int fsync_interval
static bool do_drop_slot
static bool output_isfile
static char * replication_slot
static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now)
static XLogRecPtr output_written_lsn
static void sigexit_handler(SIGNAL_ARGS)
static int outfd
static TimestampTz output_last_fsync
static size_t noptions
static char * outfile
static XLogRecPtr output_fsync_lsn
static bool sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
static void usage(void)
static XLogRecPtr startpos
#define RECONNECT_SLEEP_TIME
static bool do_create_slot
static void disconnect_atexit(void)
static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos, StreamStopReason reason, XLogRecPtr lsn)
static int standby_message_timeout
static volatile sig_atomic_t output_reopen
static char ** options
#define pqsignal
Definition: port.h:531
const char * get_progname(const char *argv0)
Definition: path.c:652
#define printf(...)
Definition: port.h:245
PQExpBuffer createPQExpBuffer(void)
Definition: pqexpbuffer.c:72
void resetPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:146
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
Definition: pqexpbuffer.c:265
void destroyPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:114
void appendPQExpBufferChar(PQExpBuffer str, char ch)
Definition: pqexpbuffer.c:378
void appendPQExpBufferStr(PQExpBuffer str, const char *data)
Definition: pqexpbuffer.c:367
char * c
void pg_usleep(long microsec)
Definition: signal.c:53
static void error(void)
Definition: sql-dyntest.c:147
int dbgetpassword
Definition: streamutil.c:50
char * dbhost
Definition: streamutil.c:46
int64 fe_recvint64(char *buf)
Definition: streamutil.c:868
TimestampTz feGetCurrentTimestamp(void)
Definition: streamutil.c:803
char * dbport
Definition: streamutil.c:48
void feTimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: streamutil.c:822
void fe_sendint64(int64 i, char *buf)
Definition: streamutil.c:857
bool feTimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: streamutil.c:844
char * dbname
Definition: streamutil.c:49
PGconn * conn
Definition: streamutil.c:52
bool RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli, XLogRecPtr *startpos, char **db_name)
Definition: streamutil.c:409
char * dbuser
Definition: streamutil.c:47
unsigned short st_mode
Definition: win32_port.h:258
static StringInfo copybuf
Definition: tablesync.c:137
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:1177
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1382
#define fsync(fd)
Definition: win32_port.h:83
#define SIGHUP
Definition: win32_port.h:158
#define EINTR
Definition: win32_port.h:364
#define S_IRUSR
Definition: win32_port.h:279
#define fstat
Definition: win32_port.h:273
#define S_ISREG(m)
Definition: win32_port.h:318
#define S_IWUSR
Definition: win32_port.h:282
#define select(n, r, w, e, timeout)
Definition: win32_port.h:503
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28