PostgreSQL Source Code  git master
pg_recvlogical.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * pg_recvlogical.c - receive data from a logical decoding slot in a streaming
4  * fashion and write it to a local file.
5  *
6  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * src/bin/pg_basebackup/pg_recvlogical.c
10  *-------------------------------------------------------------------------
11  */
12 
13 #include "postgres_fe.h"
14 
15 #include <dirent.h>
16 #include <limits.h>
17 #include <sys/select.h>
18 #include <sys/stat.h>
19 #include <unistd.h>
20 
21 #include "access/xlog_internal.h"
22 #include "common/fe_memutils.h"
23 #include "common/file_perm.h"
24 #include "common/logging.h"
25 #include "fe_utils/option_utils.h"
26 #include "getopt_long.h"
27 #include "libpq-fe.h"
28 #include "libpq/pqsignal.h"
29 #include "pqexpbuffer.h"
30 #include "streamutil.h"
31 
32 /* Time to sleep between reconnection attempts */
33 #define RECONNECT_SLEEP_TIME 5
34 
35 typedef enum
36 {
42 
43 /* Global Options */
/*
 * Command-line settings.  The two interval variables are held in
 * milliseconds: main() parses -F and -s as seconds and multiplies by 1000,
 * and usage() divides by 1000 again to print the defaults.
 */
44 static char *outfile = NULL;
45 static int verbose = 0; /* incremented once per -v */
46 static bool two_phase = false;
47 static int noloop = 0; /* set to 1 by -n; main() then exits instead of reconnecting */
48 static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
49 static int fsync_interval = 10 * 1000; /* 10 sec = default */
/* Requested actions; main() sets these from the long-only action options. */
52 static bool do_create_slot = false;
53 static bool slot_exists_ok = false;
54 static bool do_start_slot = false;
55 static bool do_drop_slot = false;
56 static char *replication_slot = NULL;
57 
58 /* filled pairwise with option, value. value may be NULL */
/* i.e. options[2*i] is the name and options[2*i + 1] the value of option i */
59 static char **options;
60 static size_t noptions = 0; /* number of pairs, not number of array slots */
61 static const char *plugin = "test_decoding";
62 
63 /* Global State */
64 static int outfd = -1; /* -1 while no output file is open */
/* The three sig_atomic_t flags below are the ones the signal handlers and the streaming loop share. */
65 static volatile sig_atomic_t time_to_abort = false;
66 static volatile sig_atomic_t stop_reason = STREAM_STOP_NONE;
67 static volatile sig_atomic_t output_reopen = false;
68 static bool output_isfile; /* set when the output is opened: regular file and not a tty */
70 static bool output_needs_fsync = false; /* set after data is written, cleared by OutputFsync() */
73 
74 static void usage(void);
75 static void StreamLogicalLog(void);
78  StreamStopReason reason,
79  XLogRecPtr lsn);
80 
/*
 * Print the command-line help text to stdout.
 *
 * The defaults shown for --fsync-interval and --status-interval are taken
 * from the current values of fsync_interval and standby_message_timeout,
 * which are held in milliseconds and converted to seconds for display.  The
 * default plugin name is taken from the plugin variable.
 */
81 static void
82 usage(void)
83 {
84  printf(_("%s controls PostgreSQL logical decoding streams.\n\n"),
85  progname);
86  printf(_("Usage:\n"));
87  printf(_(" %s [OPTION]...\n"), progname);
88  printf(_("\nAction to be performed:\n"));
89  printf(_(" --create-slot create a new replication slot (for the slot's name see --slot)\n"));
90  printf(_(" --drop-slot drop the replication slot (for the slot's name see --slot)\n"));
91  printf(_(" --start start streaming in a replication slot (for the slot's name see --slot)\n"));
92  printf(_("\nOptions:\n"));
93  printf(_(" -E, --endpos=LSN exit after receiving the specified LSN\n"));
94  printf(_(" -f, --file=FILE receive log into this file, - for stdout\n"));
 /* short and long spellings are comma-separated, like every other entry */
95  printf(_(" -F, --fsync-interval=SECS\n"
96  " time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
97  printf(_(" --if-not-exists do not error if slot already exists when creating a slot\n"));
98  printf(_(" -I, --startpos=LSN where in an existing slot should the streaming start\n"));
99  printf(_(" -n, --no-loop do not loop on connection lost\n"));
100  printf(_(" -o, --option=NAME[=VALUE]\n"
101  " pass option NAME with optional value VALUE to the\n"
102  " output plugin\n"));
103  printf(_(" -P, --plugin=PLUGIN use output plugin PLUGIN (default: %s)\n"), plugin);
104  printf(_(" -s, --status-interval=SECS\n"
105  " time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
106  printf(_(" -S, --slot=SLOTNAME name of the logical replication slot\n"));
107  printf(_(" -t, --two-phase enable decoding of prepared transactions when creating a slot\n"));
108  printf(_(" -v, --verbose output verbose messages\n"));
109  printf(_(" -V, --version output version information, then exit\n"));
110  printf(_(" -?, --help show this help, then exit\n"));
111  printf(_("\nConnection options:\n"));
112  printf(_(" -d, --dbname=DBNAME database to connect to\n"));
113  printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
114  printf(_(" -p, --port=PORT database server port number\n"));
115  printf(_(" -U, --username=NAME connect as specified database user\n"));
116  printf(_(" -w, --no-password never prompt for password\n"));
117  printf(_(" -W, --password force password prompt (should happen automatically)\n"));
118  printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
119  printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
120 }
121 
122 /*
123  * Send a Standby Status Update message to server.
 *
 * conn is the connection the message is queued on with PQputCopyData() and
 * pushed out with PQflush(); now goes into the message's sendTime field;
 * replyRequested becomes the final one-byte field (1 or 0).
 *
 * Unless force is set, nothing is sent when output_written_lsn and
 * output_fsync_lsn still equal the values remembered from the previous
 * call that got past this check, and true is returned immediately.
 *
 * Returns true on success or when the send is skipped, false (after logging
 * an error) when PQputCopyData() or PQflush() reports failure.
 *
 * NOTE(review): listing lines 146-148, 163 and 170 are absent from this
 * view (the remaining arguments of two log calls and one statement before
 * the bookkeeping assignments).
124  */
125 static bool
126 sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
127 {
 /* positions reported by the most recent call that got past the early return */
128  static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;
129  static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;
130 
 /* 1 type byte + four 8-byte fields (write, flush, apply, sendTime) + 1 flag byte */
131  char replybuf[1 + 8 + 8 + 8 + 8 + 1];
132  int len = 0;
133 
134  /*
135  * we normally don't want to send superfluous feedback, but if it's
136  * because of a timeout we need to, otherwise wal_sender_timeout will kill
137  * us.
138  */
139  if (!force &&
140  last_written_lsn == output_written_lsn &&
141  last_fsync_lsn == output_fsync_lsn)
142  return true;
143 
144  if (verbose)
145  pg_log_info("confirming write up to %X/%X, flush to %X/%X (slot %s)",
149 
 /* Assemble the message field by field; len tracks the write offset. */
150  replybuf[len] = 'r';
151  len += 1;
152  fe_sendint64(output_written_lsn, &replybuf[len]); /* write */
153  len += 8;
154  fe_sendint64(output_fsync_lsn, &replybuf[len]); /* flush */
155  len += 8;
156  fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
157  len += 8;
158  fe_sendint64(now, &replybuf[len]); /* sendTime */
159  len += 8;
160  replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
161  len += 1;
162 
 /*
  * Remember what is being reported for the no-change check above.  Note
  * this happens before the send below is known to have succeeded.
  */
164  last_written_lsn = output_written_lsn;
165  last_fsync_lsn = output_fsync_lsn;
166 
 /* PQputCopyData() <= 0 or a nonzero PQflush() result is treated as failure */
167  if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
168  {
169  pg_log_error("could not send feedback packet: %s",
171  return false;
172  }
173 
174  return true;
175 }
176 
/*
 * Exit-time cleanup: close the global connection if one is still open.
 *
 * main() registers a function named disconnect_atexit with atexit();
 * presumably this is it (the signature line is absent from this listing).
 */
177 static void
179 {
180  if (conn != NULL)
181  PQfinish(conn);
182 }
183 
/*
 * Flush the output file to disk if a flush is pending.  Callers in this
 * file invoke it as OutputFsync(<timestamp>); the signature line itself is
 * absent from this listing.
 *
 * Returns true without syncing when fsync_interval <= 0, when nothing has
 * set output_needs_fsync since the last call, or when the output is not a
 * regular file.  An fsync() failure is reported through pg_fatal(); every
 * return statement visible here returns true.
 *
 * NOTE(review): listing lines 185, 187 and 189 are missing from this view,
 * so whatever happens before the fsync_interval check cannot be seen here.
 */
184 static bool
186 {
188 
190 
191  if (fsync_interval <= 0)
192  return true;
193 
194  if (!output_needs_fsync)
195  return true;
196 
 /* cleared before the file-type check, so non-file outputs also reset it */
197  output_needs_fsync = false;
198 
199  /* can only fsync if it's a regular file */
200  if (!output_isfile)
201  return true;
202 
203  if (fsync(outfd) != 0)
204  pg_fatal("could not fsync file \"%s\": %m", outfile);
205 
206  return true;
207 }
208 
209 /*
210  * Start the log streaming
 *
 * Outline of the code below:
 *  - reuse the global connection, or obtain one with GetConnection();
 *  - build and send a START_REPLICATION ... LOGICAL command, appending the
 *    name/value pairs from options[] when there are any;
 *  - loop until time_to_abort is set: run the periodic fsync and feedback
 *    steps, (re)open the output, then fetch one CopyData message in async
 *    mode.  'k' (keepalive) messages are parsed for walEnd and the
 *    reply-requested flag; 'w' messages have their payload written to
 *    outfd followed by a newline;
 *  - afterwards finish the COPY, inspect the final result and close the
 *    output file unless it is stdout.
 *
 * Every failure after the query buffer is created jumps to the error label,
 * which the success path also falls into: it releases copybuf, destroys the
 * query buffer, closes the connection and resets conn to NULL.  The function
 * returns nothing; main() examines time_to_abort afterwards.
 *
 * NOTE(review): this listing lacks a number of lines (the signature at 213,
 * several condition and argument lines, and the statements that assign
 * stop_reason and "now"); comments below describe only what is visible.
211  */
212 static void
214 {
215  PGresult *res;
216  char *copybuf = NULL;
217  TimestampTz last_status = -1;
218  int i;
219  PQExpBuffer query;
220  XLogRecPtr cur_record_lsn;
221 
224  cur_record_lsn = InvalidXLogRecPtr;
225 
226  /*
227  * Connect in replication mode to the server
228  */
229  if (!conn)
230  conn = GetConnection();
231  if (!conn)
232  /* Error message already written in GetConnection() */
233  return;
234 
235  /*
236  * Start the replication
237  */
238  if (verbose)
239  pg_log_info("starting log streaming at %X/%X (slot %s)",
242 
243  /* Initiate the replication stream at specified location */
244  query = createPQExpBuffer();
245  appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X",
247 
248  /* print options if there are any */
249  if (noptions)
250  appendPQExpBufferStr(query, " (");
251 
 /*
  * NOTE(review): option names and values are inserted between quote
  * characters as-is; nothing here escapes an embedded " or '.  Also, i is
  * an int while noptions is a size_t.
  */
252  for (i = 0; i < noptions; i++)
253  {
254  /* separator */
255  if (i > 0)
256  appendPQExpBufferStr(query, ", ");
257 
258  /* write option name */
259  appendPQExpBuffer(query, "\"%s\"", options[(i * 2)]);
260 
261  /* write option value if specified */
262  if (options[(i * 2) + 1] != NULL)
263  appendPQExpBuffer(query, " '%s'", options[(i * 2) + 1]);
264  }
265 
266  if (noptions)
267  appendPQExpBufferChar(query, ')');
268 
269  res = PQexec(conn, query->data);
271  {
272  pg_log_error("could not send replication command \"%s\": %s",
273  query->data, PQresultErrorMessage(res));
274  PQclear(res);
275  goto error;
276  }
277  PQclear(res);
278  resetPQExpBuffer(query);
279 
280  if (verbose)
281  pg_log_info("streaming initiated");
282 
 /* Main receive loop; time_to_abort is set by the signal handler or on reaching endpos. */
283  while (!time_to_abort)
284  {
285  int r;
286  int bytes_left;
287  int bytes_written;
289  int hdr_len;
290 
291  cur_record_lsn = InvalidXLogRecPtr;
292 
 /* drop the buffer left over from the previous iteration */
293  if (copybuf != NULL)
294  {
296  copybuf = NULL;
297  }
298 
299  /*
300  * Potentially send a status message to the primary.
301  */
303 
304  if (outfd != -1 &&
307  {
308  if (!OutputFsync(now))
309  goto error;
310  }
311 
312  if (standby_message_timeout > 0 &&
313  feTimestampDifferenceExceeds(last_status, now,
315  {
316  /* Time to send feedback! */
317  if (!sendFeedback(conn, now, true, false))
318  goto error;
319 
320  last_status = now;
321  }
322 
323  /* got SIGHUP, close output file */
 /* stdout ("-") is never closed; the flag is cleared either way below */
324  if (outfd != -1 && output_reopen && strcmp(outfile, "-") != 0)
325  {
327  if (!OutputFsync(now))
328  goto error;
329  close(outfd);
330  outfd = -1;
331  }
332  output_reopen = false;
333 
334  /* open the output file, if not open yet */
335  if (outfd == -1)
336  {
337  struct stat statbuf;
338 
339  if (strcmp(outfile, "-") == 0)
340  outfd = fileno(stdout);
341  else
342  outfd = open(outfile, O_CREAT | O_APPEND | O_WRONLY | PG_BINARY,
343  S_IRUSR | S_IWUSR);
344  if (outfd == -1)
345  {
346  pg_log_error("could not open log file \"%s\": %m", outfile);
347  goto error;
348  }
349 
350  if (fstat(outfd, &statbuf) != 0)
351  {
352  pg_log_error("could not stat file \"%s\": %m", outfile);
353  goto error;
354  }
355 
 /* OutputFsync() only calls fsync() when this is true */
356  output_isfile = S_ISREG(statbuf.st_mode) && !isatty(outfd);
357  }
358 
 /* third argument 1 = async mode: a return of 0 means no complete message is available yet */
359  r = PQgetCopyData(conn, &copybuf, 1);
360  if (r == 0)
361  {
362  /*
363  * In async mode, and no data available. We block on reading but
364  * not more than the specified timeout, so that we can send a
365  * status update back to the server.
366  */
367  fd_set input_mask;
368  TimestampTz message_target = 0;
369  TimestampTz fsync_target = 0;
370  struct timeval timeout;
371  struct timeval *timeoutptr = NULL;
372 
373  if (PQsocket(conn) < 0)
374  {
375  pg_log_error("invalid socket: %s", PQerrorMessage(conn));
376  goto error;
377  }
378 
379  FD_ZERO(&input_mask);
380  FD_SET(PQsocket(conn), &input_mask);
381 
382  /* Compute when we need to wakeup to send a keepalive message. */
384  message_target = last_status + (standby_message_timeout - 1) *
385  ((int64) 1000);
386 
387  /* Compute when we need to wakeup to fsync the output file. */
389  fsync_target = output_last_fsync + (fsync_interval - 1) *
390  ((int64) 1000);
391 
392  /* Now compute when to wakeup. */
 /* with neither target set, timeoutptr stays NULL and select() blocks indefinitely */
393  if (message_target > 0 || fsync_target > 0)
394  {
395  TimestampTz targettime;
396  long secs;
397  int usecs;
398 
399  targettime = message_target;
400 
 /*
  * NOTE(review): if message_target is still 0 here while
  * fsync_target > 0, this comparison can never pick
  * fsync_target, so targettime stays 0 -- confirm that the
  * resulting wake-up time is what is intended in that case.
  */
401  if (fsync_target > 0 && fsync_target < targettime)
402  targettime = fsync_target;
403 
405  targettime,
406  &secs,
407  &usecs);
408  if (secs <= 0)
409  timeout.tv_sec = 1; /* Always sleep at least 1 sec */
410  else
411  timeout.tv_sec = secs;
412  timeout.tv_usec = usecs;
413  timeoutptr = &timeout;
414  }
415 
416  r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
417  if (r == 0 || (r < 0 && errno == EINTR))
418  {
419  /*
420  * Got a timeout or signal. Continue the loop and either
421  * deliver a status packet to the server or just go back into
422  * blocking.
423  */
424  continue;
425  }
426  else if (r < 0)
427  {
428  pg_log_error("%s() failed: %m", "select");
429  goto error;
430  }
431 
432  /* Else there is actually data on the socket */
433  if (PQconsumeInput(conn) == 0)
434  {
435  pg_log_error("could not receive data from WAL stream: %s",
437  goto error;
438  }
439  continue;
440  }
441 
442  /* End of copy stream */
443  if (r == -1)
444  break;
445 
446  /* Failure while reading the copy stream */
447  if (r == -2)
448  {
449  pg_log_error("could not read COPY data: %s",
451  goto error;
452  }
453 
 /* From here on r is the length of the message held in copybuf. */
454  /* Check the message type. */
455  if (copybuf[0] == 'k')
456  {
457  int pos;
458  bool replyRequested;
459  XLogRecPtr walEnd;
460  bool endposReached = false;
461 
462  /*
463  * Parse the keepalive message, enclosed in the CopyData message.
464  * We just check if the server requested a reply, and ignore the
465  * rest.
466  */
467  pos = 1; /* skip msgtype 'k' */
 /*
  * NOTE(review): walEnd is read from the buffer before the length
  * check a few lines below has confirmed the message is long
  * enough to contain it.
  */
468  walEnd = fe_recvint64(&copybuf[pos]);
470 
471  pos += 8; /* read walEnd */
472 
473  pos += 8; /* skip sendTime */
474 
475  if (r < pos + 1)
476  {
477  pg_log_error("streaming header too small: %d", r);
478  goto error;
479  }
480  replyRequested = copybuf[pos];
481 
482  if (endpos != InvalidXLogRecPtr && walEnd >= endpos)
483  {
484  /*
485  * If there's nothing to read on the socket until a keepalive
486  * we know that the server has nothing to send us; and if
487  * walEnd has passed endpos, we know nothing else can have
488  * committed before endpos. So we can bail out now.
489  */
490  endposReached = true;
491  }
492 
493  /* Send a reply, if necessary */
494  if (replyRequested || endposReached)
495  {
496  if (!flushAndSendFeedback(conn, &now))
497  goto error;
498  last_status = now;
499  }
500 
501  if (endposReached)
502  {
504  time_to_abort = true;
505  break;
506  }
507 
508  continue;
509  }
510  else if (copybuf[0] != 'w')
511  {
512  pg_log_error("unrecognized streaming header: \"%c\"",
513  copybuf[0]);
514  goto error;
515  }
516 
517  /*
518  * Read the header of the XLogData message, enclosed in the CopyData
519  * message. We only need the WAL location field (dataStart), the rest
520  * of the header is ignored.
521  */
522  hdr_len = 1; /* msgtype 'w' */
523  hdr_len += 8; /* dataStart */
524  hdr_len += 8; /* walEnd */
525  hdr_len += 8; /* sendTime */
 /* a message consisting of the header alone, with no payload byte, is rejected too */
526  if (r < hdr_len + 1)
527  {
528  pg_log_error("streaming header too small: %d", r);
529  goto error;
530  }
531 
532  /* Extract WAL location for this block */
533  cur_record_lsn = fe_recvint64(&copybuf[1]);
534 
 /* a record beyond endpos is not written to the output */
535  if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos)
536  {
537  /*
538  * We've read past our endpoint, so prepare to go away being
539  * cautious about what happens to our output data.
540  */
541  if (!flushAndSendFeedback(conn, &now))
542  goto error;
544  time_to_abort = true;
545  break;
546  }
547 
 /* Max() keeps the reported write position from moving backwards */
548  output_written_lsn = Max(cur_record_lsn, output_written_lsn);
549 
550  bytes_left = r - hdr_len;
551  bytes_written = 0;
552 
553  /* signal that a fsync is needed */
554  output_needs_fsync = true;
555 
 /* write() may accept fewer bytes than requested, so repeat until the payload is out */
556  while (bytes_left)
557  {
558  int ret;
559 
560  ret = write(outfd,
561  copybuf + hdr_len + bytes_written,
562  bytes_left);
563 
564  if (ret < 0)
565  {
566  pg_log_error("could not write %d bytes to log file \"%s\": %m",
567  bytes_left, outfile);
568  goto error;
569  }
570 
571  /* Write was successful, advance our position */
572  bytes_written += ret;
573  bytes_left -= ret;
574  }
575 
 /* each payload is followed by a single newline in the output */
576  if (write(outfd, "\n", 1) != 1)
577  {
578  pg_log_error("could not write %d bytes to log file \"%s\": %m",
579  1, outfile);
580  goto error;
581  }
582 
583  if (endpos != InvalidXLogRecPtr && cur_record_lsn == endpos)
584  {
585  /* endpos was exactly the record we just processed, we're done */
586  if (!flushAndSendFeedback(conn, &now))
587  goto error;
589  time_to_abort = true;
590  break;
591  }
592  }
593 
594  /* Clean up connection state if stream has been aborted */
595  if (time_to_abort)
596  prepareToTerminate(conn, endpos, stop_reason, cur_record_lsn);
597 
598  res = PQgetResult(conn);
600  {
601  PQclear(res);
602 
603  /*
604  * We're doing a client-initiated clean exit and have sent CopyDone to
605  * the server. Drain any messages, so we don't miss a last-minute
606  * ErrorResponse. The walsender stops generating XLogData records once
607  * it sees CopyDone, so expect this to finish quickly. After CopyDone,
608  * it's too late for sendFeedback(), even if this were to take a long
609  * time. Hence, use synchronous-mode PQgetCopyData().
610  */
611  while (1)
612  {
613  int r;
614 
615  if (copybuf != NULL)
616  {
618  copybuf = NULL;
619  }
620  r = PQgetCopyData(conn, &copybuf, 0);
621  if (r == -1)
622  break;
623  if (r == -2)
624  {
625  pg_log_error("could not read COPY data: %s",
627  time_to_abort = false; /* unclean exit */
628  goto error;
629  }
630  }
631 
632  res = PQgetResult(conn);
633  }
635  {
636  pg_log_error("unexpected termination of replication stream: %s",
638  goto error;
639  }
640  PQclear(res);
641 
 /* stdout is left open; a named output file is synced and closed */
642  if (outfd != -1 && strcmp(outfile, "-") != 0)
643  {
645 
646  /* no need to jump to error on failure here, we're finishing anyway */
647  OutputFsync(t);
648 
649  if (close(outfd) != 0)
650  pg_log_error("could not close file \"%s\": %m", outfile);
651  }
652  outfd = -1;
 /* Shared exit path: reached by falling through on success and by goto on failure. */
653 error:
654  if (copybuf != NULL)
655  {
657  copybuf = NULL;
658  }
659  destroyPQExpBuffer(query);
660  PQfinish(conn);
 /* a later call sees conn == NULL and obtains a fresh connection at the top */
661  conn = NULL;
662 }
663 
664 /*
665  * Unfortunately we can't do sensible signal handling on windows...
666  */
667 #ifndef WIN32
668 
669 /*
670  * When SIGINT/SIGTERM are caught, just tell the system to exit at the next
671  * possible moment.
 *
 * main() installs a handler named sigexit_handler for both signals with
 * pqsignal(); presumably this is it (the signature line is absent from this
 * listing).  The visible body only sets the time_to_abort flag, which the
 * streaming loop tests at the top of each iteration.
 *
 * NOTE(review): listing line 676 is missing from this view.
672  */
673 static void
675 {
677  time_to_abort = true;
678 }
679 
680 /*
681  * Trigger the output file to be reopened.
 *
 * Only sets the output_reopen flag.  The streaming loop acts on it: when
 * the flag is set and the output is not stdout it syncs and closes the
 * current file, then opens outfile again; the loop clears the flag.
 *
 * The signature line is absent from this listing; per the "got SIGHUP"
 * comment in the streaming loop this is presumably the SIGHUP handler.
682  */
683 static void
685 {
686  output_reopen = true;
687 }
688 #endif
689 
690 
/*
 * Program entry point.
 *
 * Steps visible below: initialise logging and locale, answer --help and
 * --version when given as the first argument, parse the remaining options
 * with getopt_long(), validate the option combination, connect, install
 * signal handlers (non-Windows only), run IDENTIFY_SYSTEM and require a
 * database-specific connection, apply the umask, perform the requested
 * drop/create slot actions, and finally run the streaming loop when --start
 * was given.
 *
 * Exit status: 0 after --help/--version, after the slot actions when
 * --start was not requested, and when streaming ends with time_to_abort
 * set; 1 for option errors and failed steps.  pg_fatal() is used for the
 * remaining failures.
 *
 * NOTE(review): this listing lacks a number of lines, including the bodies
 * of the -f, -d, -h, -p, -U, -P and -S cases, several validation
 * conditions, and the calls that drop/create the slot and start streaming.
 */
691 int
692 main(int argc, char **argv)
693 {
694  static struct option long_options[] = {
695 /* general options */
696  {"file", required_argument, NULL, 'f'},
697  {"fsync-interval", required_argument, NULL, 'F'},
698  {"no-loop", no_argument, NULL, 'n'},
699  {"verbose", no_argument, NULL, 'v'},
700  {"two-phase", no_argument, NULL, 't'},
701  {"version", no_argument, NULL, 'V'},
702  {"help", no_argument, NULL, '?'},
703 /* connection options */
704  {"dbname", required_argument, NULL, 'd'},
705  {"host", required_argument, NULL, 'h'},
706  {"port", required_argument, NULL, 'p'},
707  {"username", required_argument, NULL, 'U'},
708  {"no-password", no_argument, NULL, 'w'},
709  {"password", no_argument, NULL, 'W'},
710 /* replication options */
711  {"startpos", required_argument, NULL, 'I'},
712  {"endpos", required_argument, NULL, 'E'},
713  {"option", required_argument, NULL, 'o'},
714  {"plugin", required_argument, NULL, 'P'},
715  {"status-interval", required_argument, NULL, 's'},
716  {"slot", required_argument, NULL, 'S'},
717 /* action */
 /* long-only options: identified by the small integers 1..4 in the switch below */
718  {"create-slot", no_argument, NULL, 1},
719  {"start", no_argument, NULL, 2},
720  {"drop-slot", no_argument, NULL, 3},
721  {"if-not-exists", no_argument, NULL, 4},
722  {NULL, 0, NULL, 0}
723  };
724  int c;
725  int option_index;
726  uint32 hi,
727  lo;
728  char *db_name;
729 
730  pg_logging_init(argv[0]);
731  progname = get_progname(argv[0]);
732  set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
733 
 /* --help and --version are recognised here only as the first argument */
734  if (argc > 1)
735  {
736  if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
737  {
738  usage();
739  exit(0);
740  }
741  else if (strcmp(argv[1], "-V") == 0 ||
742  strcmp(argv[1], "--version") == 0)
743  {
744  puts("pg_recvlogical (PostgreSQL) " PG_VERSION);
745  exit(0);
746  }
747  }
748 
749  while ((c = getopt_long(argc, argv, "E:f:F:ntvd:h:p:U:wWI:o:P:s:S:",
750  long_options, &option_index)) != -1)
751  {
752  switch (c)
753  {
754 /* general options */
755  case 'f':
757  break;
758  case 'F':
 /* accepted in seconds; the INT_MAX / 1000 bound keeps the conversion to milliseconds below from overflowing */
759  if (!option_parse_int(optarg, "-F/--fsync-interval", 0,
760  INT_MAX / 1000,
761  &fsync_interval))
762  exit(1);
763  fsync_interval *= 1000;
764  break;
765  case 'n':
766  noloop = 1;
767  break;
768  case 't':
769  two_phase = true;
770  break;
771  case 'v':
772  verbose++;
773  break;
774 /* connection options */
775  case 'd':
777  break;
778  case 'h':
780  break;
781  case 'p':
783  break;
784  case 'U':
786  break;
787  case 'w':
788  dbgetpassword = -1;
789  break;
790  case 'W':
791  dbgetpassword = 1;
792  break;
793 /* replication options */
794  case 'I':
 /* an LSN is written as two hexadecimal halves, HIGH/LOW, joined into one 64-bit value */
795  if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
796  pg_fatal("could not parse start position \"%s\"", optarg);
797  startpos = ((uint64) hi) << 32 | lo;
798  break;
799  case 'E':
800  if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
801  pg_fatal("could not parse end position \"%s\"", optarg);
802  endpos = ((uint64) hi) << 32 | lo;
803  break;
804  case 'o':
805  {
 /*
  * Work on a private copy of the argument and split it at the
  * first '='.  val points into that copy, or is NULL when the
  * option was given without a value.
  */
806  char *data = pg_strdup(optarg);
807  char *val = strchr(data, '=');
808 
809  if (val != NULL)
810  {
811  /* remove =; separate data from val */
812  *val = '\0';
813  val++;
814  }
815 
 /* grow the array by one name/value pair and store the new pair in the last two slots */
816  noptions += 1;
817  options = pg_realloc(options, sizeof(char *) * noptions * 2);
818 
819  options[(noptions - 1) * 2] = data;
820  options[(noptions - 1) * 2 + 1] = val;
821  }
822 
823  break;
824  case 'P':
826  break;
827  case 's':
 /* same seconds-to-milliseconds handling as -F above */
828  if (!option_parse_int(optarg, "-s/--status-interval", 0,
829  INT_MAX / 1000,
831  exit(1);
832  standby_message_timeout *= 1000;
833  break;
834  case 'S':
836  break;
837 /* action */
838  case 1:
839  do_create_slot = true;
840  break;
841  case 2:
842  do_start_slot = true;
843  break;
844  case 3:
845  do_drop_slot = true;
846  break;
847  case 4:
848  slot_exists_ok = true;
849  break;
850 
851  default:
852  /* getopt_long already emitted a complaint */
853  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
854  exit(1);
855  }
856  }
857 
858  /*
859  * Any non-option arguments?
860  */
861  if (optind < argc)
862  {
863  pg_log_error("too many command-line arguments (first is \"%s\")",
864  argv[optind]);
865  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
866  exit(1);
867  }
868 
869  /*
870  * Required arguments
871  */
872  if (replication_slot == NULL)
873  {
874  pg_log_error("no slot specified");
875  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
876  exit(1);
877  }
878 
879  if (do_start_slot && outfile == NULL)
880  {
881  pg_log_error("no target file specified");
882  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
883  exit(1);
884  }
885 
 /* a database name is required for every action except dropping a slot */
886  if (!do_drop_slot && dbname == NULL)
887  {
888  pg_log_error("no database specified");
889  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
890  exit(1);
891  }
892 
 /*
  * NOTE(review): the if conditions guarding the next four checks are
  * absent from this listing; only their error messages are visible.
  */
894  {
895  pg_log_error("at least one action needs to be specified");
896  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
897  exit(1);
898  }
899 
901  {
902  pg_log_error("cannot use --create-slot or --start together with --drop-slot");
903  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
904  exit(1);
905  }
906 
908  {
909  pg_log_error("cannot use --create-slot or --drop-slot together with --startpos");
910  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
911  exit(1);
912  }
913 
915  {
916  pg_log_error("--endpos may only be specified with --start");
917  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
918  exit(1);
919  }
920 
921  if (two_phase && !do_create_slot)
922  {
923  pg_log_error("--two-phase may only be specified with --create-slot");
924  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
925  exit(1);
926  }
927 
928  /*
929  * Obtain a connection to server. Notably, if we need a password, we want
930  * to collect it from the user immediately.
931  */
932  conn = GetConnection();
933  if (!conn)
934  /* Error message already written in GetConnection() */
935  exit(1);
 /* registered only once a connection exists, so it is closed on any later exit() */
936  atexit(disconnect_atexit);
937 
938  /*
939  * Trap signals. (Don't do this until after the initial password prompt,
940  * if one is needed, in GetConnection.)
941  */
942 #ifndef WIN32
943  pqsignal(SIGINT, sigexit_handler);
944  pqsignal(SIGTERM, sigexit_handler);
946 #endif
947 
948  /*
949  * Run IDENTIFY_SYSTEM to make sure we connected using a database specific
950  * replication connection.
951  */
952  if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
953  exit(1);
954 
955  if (db_name == NULL)
956  pg_fatal("could not establish database-specific replication connection");
957 
958  /*
959  * Set umask so that directories/files are created with the same
960  * permissions as directories/files in the source data directory.
961  *
962  * pg_mode_mask is set to owner-only by default and then updated in
963  * GetConnection() where we get the mode from the server-side with
964  * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm().
965  */
966  umask(pg_mode_mask);
967 
968  /* Drop a replication slot. */
969  if (do_drop_slot)
970  {
971  if (verbose)
972  pg_log_info("dropping replication slot \"%s\"", replication_slot);
973 
975  exit(1);
976  }
977 
978  /* Create a replication slot. */
979  if (do_create_slot)
980  {
981  if (verbose)
982  pg_log_info("creating replication slot \"%s\"", replication_slot);
983 
985  false, false, slot_exists_ok, two_phase))
986  exit(1);
988  }
989 
 /* without --start the slot actions above were all that was requested */
990  if (!do_start_slot)
991  exit(0);
992 
993  /* Stream loop */
 /*
  * NOTE(review): listing line 996 is absent; presumably it is the call that
  * runs one streaming session before the checks below -- confirm.
  */
994  while (true)
995  {
997  if (time_to_abort)
998  {
999  /*
1000  * We've been Ctrl-C'ed or reached an exit limit condition. That's
1001  * not an error, so exit without an errorcode.
1002  */
1003  exit(0);
1004  }
1005  else if (noloop)
1006  pg_fatal("disconnected");
1007  else
1008  {
1009  /* translator: check source for value for %d */
1010  pg_log_info("disconnected; waiting %d seconds to try again",
 /* RECONNECT_SLEEP_TIME is in seconds; pg_usleep() takes microseconds */
1012  pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
1013  }
1014  }
1015 }
1016 
1017 /*
1018  * Fsync our output data, and send a feedback message to the server. Returns
1019  * true if successful, false otherwise.
1020  *
1021  * If successful, *now is updated to the current timestamp just before sending
1022  * feedback.
 *
 * The feedback is sent with force = true and replyRequested = false, so it
 * goes out even when the reported positions have not changed.
 *
 * NOTE(review): the signature line (1025) and listing line 1030 are absent
 * from this view; the latter is presumably where *now is refreshed, as the
 * paragraph above describes -- confirm.
1023  */
1024 static bool
1026 {
1027  /* flush data to disk, so that we send a recent flush pointer */
1028  if (!OutputFsync(*now))
1029  return false;
1031  if (!sendFeedback(conn, *now, true, false))
1032  return false;
1033 
1034  return true;
1035 }
1036 
1037 /*
1038  * Try to inform the server about our upcoming demise, but don't wait around or
1039  * retry on failure.
 *
 * Ends the COPY with PQputCopyEnd() and flushes the connection; both
 * results are deliberately discarded.  In verbose mode it then logs why
 * streaming stopped, chosen by reason; lsn is asserted to be valid in the
 * branch that reports a WAL record.
 *
 * NOTE(review): listing lines 1042, 1057, 1059 and 1062 are absent from
 * this view: the start of the parameter list, the arguments of two log
 * calls, and the case label preceding the Assert on lsn.
1040  */
1041 static void
1043  XLogRecPtr lsn)
1044 {
 /* best effort only: failures here are ignored on purpose */
1045  (void) PQputCopyEnd(conn, NULL);
1046  (void) PQflush(conn);
1047 
1048  if (verbose)
1049  {
1050  switch (reason)
1051  {
1052  case STREAM_STOP_SIGNAL:
1053  pg_log_info("received interrupt signal, exiting");
1054  break;
1055  case STREAM_STOP_KEEPALIVE:
1056  pg_log_info("end position %X/%X reached by keepalive",
1058  break;
1060  Assert(!XLogRecPtrIsInvalid(lsn));
1061  pg_log_info("end position %X/%X reached by WAL record at %X/%X",
1063  break;
1064  case STREAM_STOP_NONE:
 /* the caller only invokes this function after time_to_abort has been set */
1065  Assert(false);
1066  break;
1067  }
1068  }
1069 }
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1618
unsigned int uint32
Definition: c.h:506
#define Max(x, y)
Definition: c.h:998
#define SIGNAL_ARGS
Definition: c.h:1345
#define Assert(condition)
Definition: c.h:858
#define PG_TEXTDOMAIN(domain)
Definition: c.h:1214
#define PG_BINARY
Definition: c.h:1273
void set_pglocale_pgservice(const char *argv0, const char *app)
Definition: exec.c:448
PGconn * GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
Definition: connection.c:177
int64 TimestampTz
Definition: timestamp.h:39
#define _(x)
Definition: elog.c:90
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7141
void PQfinish(PGconn *conn)
Definition: fe-connect.c:4862
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:7167
int PQflush(PGconn *conn)
Definition: fe-exec.c:4000
void PQfreemem(void *ptr)
Definition: fe-exec.c:4032
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3411
char * PQresultErrorMessage(const PGresult *res)
Definition: fe-exec.c:3427
int PQputCopyEnd(PGconn *conn, const char *errormsg)
Definition: fe-exec.c:2749
int PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
Definition: fe-exec.c:2695
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2262
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1984
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:2062
int PQgetCopyData(PGconn *conn, char **buffer, int async)
Definition: fe-exec.c:2816
void * pg_realloc(void *ptr, size_t size)
Definition: fe_memutils.c:65
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
int pg_mode_mask
Definition: file_perm.c:25
int getopt_long(int argc, char *const argv[], const char *optstring, const struct option *longopts, int *longindex)
Definition: getopt_long.c:60
#define no_argument
Definition: getopt_long.h:24
#define required_argument
Definition: getopt_long.h:25
long val
Definition: informix.c:670
#define close(a)
Definition: win32.h:12
#define write(a, b, c)
Definition: win32.h:14
int i
Definition: isn.c:73
@ PGRES_COPY_BOTH
Definition: libpq-fe.h:112
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:100
@ PGRES_COPY_OUT
Definition: libpq-fe.h:106
exit(1)
void pg_logging_init(const char *argv0)
Definition: logging.c:83
#define pg_log_error(...)
Definition: logging.h:106
#define pg_log_error_hint(...)
Definition: logging.h:112
#define pg_log_info(...)
Definition: logging.h:124
const char * progname
Definition: main.c:44
bool option_parse_int(const char *optarg, const char *optname, int min_range, int max_range, int *result)
Definition: option_utils.c:50
#define pg_fatal(...)
const void size_t len
const void * data
PGDLLIMPORT int optind
Definition: getopt.c:50
PGDLLIMPORT char * optarg
Definition: getopt.c:52
StreamStopReason
@ STREAM_STOP_KEEPALIVE
@ STREAM_STOP_END_OF_WAL
@ STREAM_STOP_NONE
@ STREAM_STOP_SIGNAL
static volatile sig_atomic_t stop_reason
static int verbose
static int noloop
static bool do_start_slot
static bool OutputFsync(TimestampTz now)
static void sighup_handler(SIGNAL_ARGS)
static bool two_phase
static void StreamLogicalLog(void)
static bool output_needs_fsync
static XLogRecPtr endpos
int main(int argc, char **argv)
static const char * plugin
static volatile sig_atomic_t time_to_abort
static bool slot_exists_ok
static int fsync_interval
static bool do_drop_slot
static bool output_isfile
static char * replication_slot
static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now)
static XLogRecPtr output_written_lsn
static void sigexit_handler(SIGNAL_ARGS)
static int outfd
static TimestampTz output_last_fsync
static size_t noptions
static char * outfile
static XLogRecPtr output_fsync_lsn
static bool sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
static void usage(void)
static XLogRecPtr startpos
#define RECONNECT_SLEEP_TIME
static bool do_create_slot
static void disconnect_atexit(void)
static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos, StreamStopReason reason, XLogRecPtr lsn)
static int standby_message_timeout
static volatile sig_atomic_t output_reopen
static char ** options
const char * get_progname(const char *argv0)
Definition: path.c:574
pqsigfunc pqsignal(int signo, pqsigfunc func)
#define printf(...)
Definition: port.h:244
PQExpBuffer createPQExpBuffer(void)
Definition: pqexpbuffer.c:72
void resetPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:146
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
Definition: pqexpbuffer.c:265
void destroyPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:114
void appendPQExpBufferChar(PQExpBuffer str, char ch)
Definition: pqexpbuffer.c:378
void appendPQExpBufferStr(PQExpBuffer str, const char *data)
Definition: pqexpbuffer.c:367
char * c
void pg_usleep(long microsec)
Definition: signal.c:53
static void error(void)
Definition: sql-dyntest.c:147
int dbgetpassword
Definition: streamutil.c:53
char * dbhost
Definition: streamutil.c:49
int64 fe_recvint64(char *buf)
Definition: streamutil.c:931
TimestampTz feGetCurrentTimestamp(void)
Definition: streamutil.c:866
char * dbport
Definition: streamutil.c:51
void feTimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: streamutil.c:885
void fe_sendint64(int64 i, char *buf)
Definition: streamutil.c:920
bool feTimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: streamutil.c:907
char * dbname
Definition: streamutil.c:52
PGconn * conn
Definition: streamutil.c:55
bool RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli, XLogRecPtr *startpos, char **db_name)
Definition: streamutil.c:477
char * dbuser
Definition: streamutil.c:50
unsigned short st_mode
Definition: win32_port.h:268
static StringInfo copybuf
Definition: tablesync.c:130
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:1200
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1405
#define fsync(fd)
Definition: win32_port.h:85
#define SIGHUP
Definition: win32_port.h:168
#define EINTR
Definition: win32_port.h:374
#define S_IRUSR
Definition: win32_port.h:289
#define fstat
Definition: win32_port.h:283
#define S_ISREG(m)
Definition: win32_port.h:328
#define S_IWUSR
Definition: win32_port.h:292
#define select(n, r, w, e, timeout)
Definition: win32_port.h:495
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28