PostgreSQL Source Code  git master
pg_recvlogical.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * pg_recvlogical.c - receive data from a logical decoding slot in a streaming
4  * fashion and write it to a local file.
5  *
6  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * src/bin/pg_basebackup/pg_recvlogical.c
10  *-------------------------------------------------------------------------
11  */
12 
13 #include "postgres_fe.h"
14 
15 #include <dirent.h>
16 #include <limits.h>
17 #include <sys/select.h>
18 #include <sys/stat.h>
19 #include <unistd.h>
20 
21 #include "common/file_perm.h"
22 #include "common/logging.h"
23 #include "fe_utils/option_utils.h"
24 #include "getopt_long.h"
25 #include "libpq-fe.h"
26 #include "libpq/pqsignal.h"
27 #include "pqexpbuffer.h"
28 #include "streamutil.h"
29 
30 /* Time to sleep between reconnection attempts */
31 #define RECONNECT_SLEEP_TIME 5
32 
33 typedef enum
34 {
40 
41 /* Global Options */
42 static char *outfile = NULL;
43 static int verbose = 0;
44 static bool two_phase = false;
45 static int noloop = 0;
46 static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
47 static int fsync_interval = 10 * 1000; /* 10 sec = default */
50 static bool do_create_slot = false;
51 static bool slot_exists_ok = false;
52 static bool do_start_slot = false;
53 static bool do_drop_slot = false;
54 static char *replication_slot = NULL;
55 
56 /* filled pairwise with option, value. value may be NULL */
57 static char **options;
58 static size_t noptions = 0;
59 static const char *plugin = "test_decoding";
60 
61 /* Global State */
62 static int outfd = -1;
63 static volatile sig_atomic_t time_to_abort = false;
64 static volatile sig_atomic_t stop_reason = STREAM_STOP_NONE;
65 static volatile sig_atomic_t output_reopen = false;
66 static bool output_isfile;
68 static bool output_needs_fsync = false;
71 
72 static void usage(void);
73 static void StreamLogicalLog(void);
76  StreamStopReason reason,
77  XLogRecPtr lsn);
78 
79 static void
80 usage(void)
81 {
82  printf(_("%s controls PostgreSQL logical decoding streams.\n\n"),
83  progname);
84  printf(_("Usage:\n"));
85  printf(_(" %s [OPTION]...\n"), progname);
86  printf(_("\nAction to be performed:\n"));
87  printf(_(" --create-slot create a new replication slot (for the slot's name see --slot)\n"));
88  printf(_(" --drop-slot drop the replication slot (for the slot's name see --slot)\n"));
89  printf(_(" --start start streaming in a replication slot (for the slot's name see --slot)\n"));
90  printf(_("\nOptions:\n"));
91  printf(_(" -E, --endpos=LSN exit after receiving the specified LSN\n"));
92  printf(_(" -f, --file=FILE receive log into this file, - for stdout\n"));
93  printf(_(" -F --fsync-interval=SECS\n"
94  " time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
95  printf(_(" --if-not-exists do not error if slot already exists when creating a slot\n"));
96  printf(_(" -I, --startpos=LSN where in an existing slot should the streaming start\n"));
97  printf(_(" -n, --no-loop do not loop on connection lost\n"));
98  printf(_(" -o, --option=NAME[=VALUE]\n"
99  " pass option NAME with optional value VALUE to the\n"
100  " output plugin\n"));
101  printf(_(" -P, --plugin=PLUGIN use output plugin PLUGIN (default: %s)\n"), plugin);
102  printf(_(" -s, --status-interval=SECS\n"
103  " time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
104  printf(_(" -S, --slot=SLOTNAME name of the logical replication slot\n"));
105  printf(_(" -t, --two-phase enable decoding of prepared transactions when creating a slot\n"));
106  printf(_(" -v, --verbose output verbose messages\n"));
107  printf(_(" -V, --version output version information, then exit\n"));
108  printf(_(" -?, --help show this help, then exit\n"));
109  printf(_("\nConnection options:\n"));
110  printf(_(" -d, --dbname=DBNAME database to connect to\n"));
111  printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
112  printf(_(" -p, --port=PORT database server port number\n"));
113  printf(_(" -U, --username=NAME connect as specified database user\n"));
114  printf(_(" -w, --no-password never prompt for password\n"));
115  printf(_(" -W, --password force password prompt (should happen automatically)\n"));
116  printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
117  printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
118 }
119 
120 /*
121  * Send a Standby Status Update message to server.
122  */
123 static bool
124 sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
125 {
126  static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;
127  static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;
128 
129  char replybuf[1 + 8 + 8 + 8 + 8 + 1];
130  int len = 0;
131 
132  /*
133  * we normally don't want to send superfluous feedback, but if it's
134  * because of a timeout we need to, otherwise wal_sender_timeout will kill
135  * us.
136  */
137  if (!force &&
138  last_written_lsn == output_written_lsn &&
139  last_fsync_lsn == output_fsync_lsn)
140  return true;
141 
142  if (verbose)
143  pg_log_info("confirming write up to %X/%X, flush to %X/%X (slot %s)",
147 
148  replybuf[len] = 'r';
149  len += 1;
150  fe_sendint64(output_written_lsn, &replybuf[len]); /* write */
151  len += 8;
152  fe_sendint64(output_fsync_lsn, &replybuf[len]); /* flush */
153  len += 8;
154  fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
155  len += 8;
156  fe_sendint64(now, &replybuf[len]); /* sendTime */
157  len += 8;
158  replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
159  len += 1;
160 
162  last_written_lsn = output_written_lsn;
163  last_fsync_lsn = output_fsync_lsn;
164 
165  if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
166  {
167  pg_log_error("could not send feedback packet: %s",
169  return false;
170  }
171 
172  return true;
173 }
174 
175 static void
177 {
178  if (conn != NULL)
179  PQfinish(conn);
180 }
181 
182 static bool
184 {
186 
188 
189  if (fsync_interval <= 0)
190  return true;
191 
192  if (!output_needs_fsync)
193  return true;
194 
195  output_needs_fsync = false;
196 
197  /* can only fsync if it's a regular file */
198  if (!output_isfile)
199  return true;
200 
201  if (fsync(outfd) != 0)
202  pg_fatal("could not fsync file \"%s\": %m", outfile);
203 
204  return true;
205 }
206 
207 /*
208  * Start the log streaming
209  */
210 static void
212 {
213  PGresult *res;
214  char *copybuf = NULL;
215  TimestampTz last_status = -1;
216  int i;
217  PQExpBuffer query;
218  XLogRecPtr cur_record_lsn;
219 
222  cur_record_lsn = InvalidXLogRecPtr;
223 
224  /*
225  * Connect in replication mode to the server
226  */
227  if (!conn)
228  conn = GetConnection();
229  if (!conn)
230  /* Error message already written in GetConnection() */
231  return;
232 
233  /*
234  * Start the replication
235  */
236  if (verbose)
237  pg_log_info("starting log streaming at %X/%X (slot %s)",
240 
241  /* Initiate the replication stream at specified location */
242  query = createPQExpBuffer();
243  appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X",
245 
246  /* print options if there are any */
247  if (noptions)
248  appendPQExpBufferStr(query, " (");
249 
250  for (i = 0; i < noptions; i++)
251  {
252  /* separator */
253  if (i > 0)
254  appendPQExpBufferStr(query, ", ");
255 
256  /* write option name */
257  appendPQExpBuffer(query, "\"%s\"", options[(i * 2)]);
258 
259  /* write option value if specified */
260  if (options[(i * 2) + 1] != NULL)
261  appendPQExpBuffer(query, " '%s'", options[(i * 2) + 1]);
262  }
263 
264  if (noptions)
265  appendPQExpBufferChar(query, ')');
266 
267  res = PQexec(conn, query->data);
269  {
270  pg_log_error("could not send replication command \"%s\": %s",
271  query->data, PQresultErrorMessage(res));
272  PQclear(res);
273  goto error;
274  }
275  PQclear(res);
276  resetPQExpBuffer(query);
277 
278  if (verbose)
279  pg_log_info("streaming initiated");
280 
281  while (!time_to_abort)
282  {
283  int r;
284  int bytes_left;
285  int bytes_written;
287  int hdr_len;
288 
289  cur_record_lsn = InvalidXLogRecPtr;
290 
291  if (copybuf != NULL)
292  {
294  copybuf = NULL;
295  }
296 
297  /*
298  * Potentially send a status message to the primary.
299  */
301 
302  if (outfd != -1 &&
305  {
306  if (!OutputFsync(now))
307  goto error;
308  }
309 
310  if (standby_message_timeout > 0 &&
311  feTimestampDifferenceExceeds(last_status, now,
313  {
314  /* Time to send feedback! */
315  if (!sendFeedback(conn, now, true, false))
316  goto error;
317 
318  last_status = now;
319  }
320 
321  /* got SIGHUP, close output file */
322  if (outfd != -1 && output_reopen && strcmp(outfile, "-") != 0)
323  {
325  if (!OutputFsync(now))
326  goto error;
327  close(outfd);
328  outfd = -1;
329  }
330  output_reopen = false;
331 
332  /* open the output file, if not open yet */
333  if (outfd == -1)
334  {
335  struct stat statbuf;
336 
337  if (strcmp(outfile, "-") == 0)
338  outfd = fileno(stdout);
339  else
340  outfd = open(outfile, O_CREAT | O_APPEND | O_WRONLY | PG_BINARY,
341  S_IRUSR | S_IWUSR);
342  if (outfd == -1)
343  {
344  pg_log_error("could not open log file \"%s\": %m", outfile);
345  goto error;
346  }
347 
348  if (fstat(outfd, &statbuf) != 0)
349  {
350  pg_log_error("could not stat file \"%s\": %m", outfile);
351  goto error;
352  }
353 
354  output_isfile = S_ISREG(statbuf.st_mode) && !isatty(outfd);
355  }
356 
357  r = PQgetCopyData(conn, &copybuf, 1);
358  if (r == 0)
359  {
360  /*
361  * In async mode, and no data available. We block on reading but
362  * not more than the specified timeout, so that we can send a
363  * response back to the client.
364  */
365  fd_set input_mask;
366  TimestampTz message_target = 0;
367  TimestampTz fsync_target = 0;
368  struct timeval timeout;
369  struct timeval *timeoutptr = NULL;
370 
371  if (PQsocket(conn) < 0)
372  {
373  pg_log_error("invalid socket: %s", PQerrorMessage(conn));
374  goto error;
375  }
376 
377  FD_ZERO(&input_mask);
378  FD_SET(PQsocket(conn), &input_mask);
379 
380  /* Compute when we need to wakeup to send a keepalive message. */
382  message_target = last_status + (standby_message_timeout - 1) *
383  ((int64) 1000);
384 
385  /* Compute when we need to wakeup to fsync the output file. */
387  fsync_target = output_last_fsync + (fsync_interval - 1) *
388  ((int64) 1000);
389 
390  /* Now compute when to wakeup. */
391  if (message_target > 0 || fsync_target > 0)
392  {
393  TimestampTz targettime;
394  long secs;
395  int usecs;
396 
397  targettime = message_target;
398 
399  if (fsync_target > 0 && fsync_target < targettime)
400  targettime = fsync_target;
401 
403  targettime,
404  &secs,
405  &usecs);
406  if (secs <= 0)
407  timeout.tv_sec = 1; /* Always sleep at least 1 sec */
408  else
409  timeout.tv_sec = secs;
410  timeout.tv_usec = usecs;
411  timeoutptr = &timeout;
412  }
413 
414  r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
415  if (r == 0 || (r < 0 && errno == EINTR))
416  {
417  /*
418  * Got a timeout or signal. Continue the loop and either
419  * deliver a status packet to the server or just go back into
420  * blocking.
421  */
422  continue;
423  }
424  else if (r < 0)
425  {
426  pg_log_error("%s() failed: %m", "select");
427  goto error;
428  }
429 
430  /* Else there is actually data on the socket */
431  if (PQconsumeInput(conn) == 0)
432  {
433  pg_log_error("could not receive data from WAL stream: %s",
435  goto error;
436  }
437  continue;
438  }
439 
440  /* End of copy stream */
441  if (r == -1)
442  break;
443 
444  /* Failure while reading the copy stream */
445  if (r == -2)
446  {
447  pg_log_error("could not read COPY data: %s",
449  goto error;
450  }
451 
452  /* Check the message type. */
453  if (copybuf[0] == 'k')
454  {
455  int pos;
456  bool replyRequested;
457  XLogRecPtr walEnd;
458  bool endposReached = false;
459 
460  /*
461  * Parse the keepalive message, enclosed in the CopyData message.
462  * We just check if the server requested a reply, and ignore the
463  * rest.
464  */
465  pos = 1; /* skip msgtype 'k' */
466  walEnd = fe_recvint64(&copybuf[pos]);
468 
469  pos += 8; /* read walEnd */
470 
471  pos += 8; /* skip sendTime */
472 
473  if (r < pos + 1)
474  {
475  pg_log_error("streaming header too small: %d", r);
476  goto error;
477  }
478  replyRequested = copybuf[pos];
479 
480  if (endpos != InvalidXLogRecPtr && walEnd >= endpos)
481  {
482  /*
483  * If there's nothing to read on the socket until a keepalive
484  * we know that the server has nothing to send us; and if
485  * walEnd has passed endpos, we know nothing else can have
486  * committed before endpos. So we can bail out now.
487  */
488  endposReached = true;
489  }
490 
491  /* Send a reply, if necessary */
492  if (replyRequested || endposReached)
493  {
494  if (!flushAndSendFeedback(conn, &now))
495  goto error;
496  last_status = now;
497  }
498 
499  if (endposReached)
500  {
502  time_to_abort = true;
503  break;
504  }
505 
506  continue;
507  }
508  else if (copybuf[0] != 'w')
509  {
510  pg_log_error("unrecognized streaming header: \"%c\"",
511  copybuf[0]);
512  goto error;
513  }
514 
515  /*
516  * Read the header of the XLogData message, enclosed in the CopyData
517  * message. We only need the WAL location field (dataStart), the rest
518  * of the header is ignored.
519  */
520  hdr_len = 1; /* msgtype 'w' */
521  hdr_len += 8; /* dataStart */
522  hdr_len += 8; /* walEnd */
523  hdr_len += 8; /* sendTime */
524  if (r < hdr_len + 1)
525  {
526  pg_log_error("streaming header too small: %d", r);
527  goto error;
528  }
529 
530  /* Extract WAL location for this block */
531  cur_record_lsn = fe_recvint64(&copybuf[1]);
532 
533  if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos)
534  {
535  /*
536  * We've read past our endpoint, so prepare to go away being
537  * cautious about what happens to our output data.
538  */
539  if (!flushAndSendFeedback(conn, &now))
540  goto error;
542  time_to_abort = true;
543  break;
544  }
545 
546  output_written_lsn = Max(cur_record_lsn, output_written_lsn);
547 
548  bytes_left = r - hdr_len;
549  bytes_written = 0;
550 
551  /* signal that a fsync is needed */
552  output_needs_fsync = true;
553 
554  while (bytes_left)
555  {
556  int ret;
557 
558  ret = write(outfd,
559  copybuf + hdr_len + bytes_written,
560  bytes_left);
561 
562  if (ret < 0)
563  {
564  pg_log_error("could not write %d bytes to log file \"%s\": %m",
565  bytes_left, outfile);
566  goto error;
567  }
568 
569  /* Write was successful, advance our position */
570  bytes_written += ret;
571  bytes_left -= ret;
572  }
573 
574  if (write(outfd, "\n", 1) != 1)
575  {
576  pg_log_error("could not write %d bytes to log file \"%s\": %m",
577  1, outfile);
578  goto error;
579  }
580 
581  if (endpos != InvalidXLogRecPtr && cur_record_lsn == endpos)
582  {
583  /* endpos was exactly the record we just processed, we're done */
584  if (!flushAndSendFeedback(conn, &now))
585  goto error;
587  time_to_abort = true;
588  break;
589  }
590  }
591 
592  /* Clean up connection state if stream has been aborted */
593  if (time_to_abort)
594  prepareToTerminate(conn, endpos, stop_reason, cur_record_lsn);
595 
596  res = PQgetResult(conn);
598  {
599  PQclear(res);
600 
601  /*
602  * We're doing a client-initiated clean exit and have sent CopyDone to
603  * the server. Drain any messages, so we don't miss a last-minute
604  * ErrorResponse. The walsender stops generating XLogData records once
605  * it sees CopyDone, so expect this to finish quickly. After CopyDone,
606  * it's too late for sendFeedback(), even if this were to take a long
607  * time. Hence, use synchronous-mode PQgetCopyData().
608  */
609  while (1)
610  {
611  int r;
612 
613  if (copybuf != NULL)
614  {
616  copybuf = NULL;
617  }
618  r = PQgetCopyData(conn, &copybuf, 0);
619  if (r == -1)
620  break;
621  if (r == -2)
622  {
623  pg_log_error("could not read COPY data: %s",
625  time_to_abort = false; /* unclean exit */
626  goto error;
627  }
628  }
629 
630  res = PQgetResult(conn);
631  }
633  {
634  pg_log_error("unexpected termination of replication stream: %s",
636  goto error;
637  }
638  PQclear(res);
639 
640  if (outfd != -1 && strcmp(outfile, "-") != 0)
641  {
643 
644  /* no need to jump to error on failure here, we're finishing anyway */
645  OutputFsync(t);
646 
647  if (close(outfd) != 0)
648  pg_log_error("could not close file \"%s\": %m", outfile);
649  }
650  outfd = -1;
651 error:
652  if (copybuf != NULL)
653  {
655  copybuf = NULL;
656  }
657  destroyPQExpBuffer(query);
658  PQfinish(conn);
659  conn = NULL;
660 }
661 
662 /*
663  * Unfortunately we can't do sensible signal handling on windows...
664  */
665 #ifndef WIN32
666 
667 /*
668  * When SIGINT/SIGTERM are caught, just tell the system to exit at the next
669  * possible moment.
670  */
671 static void
673 {
675  time_to_abort = true;
676 }
677 
678 /*
679  * Trigger the output file to be reopened.
680  */
681 static void
683 {
684  output_reopen = true;
685 }
686 #endif
687 
688 
689 int
690 main(int argc, char **argv)
691 {
692  static struct option long_options[] = {
693 /* general options */
694  {"file", required_argument, NULL, 'f'},
695  {"fsync-interval", required_argument, NULL, 'F'},
696  {"no-loop", no_argument, NULL, 'n'},
697  {"verbose", no_argument, NULL, 'v'},
698  {"two-phase", no_argument, NULL, 't'},
699  {"version", no_argument, NULL, 'V'},
700  {"help", no_argument, NULL, '?'},
701 /* connection options */
702  {"dbname", required_argument, NULL, 'd'},
703  {"host", required_argument, NULL, 'h'},
704  {"port", required_argument, NULL, 'p'},
705  {"username", required_argument, NULL, 'U'},
706  {"no-password", no_argument, NULL, 'w'},
707  {"password", no_argument, NULL, 'W'},
708 /* replication options */
709  {"startpos", required_argument, NULL, 'I'},
710  {"endpos", required_argument, NULL, 'E'},
711  {"option", required_argument, NULL, 'o'},
712  {"plugin", required_argument, NULL, 'P'},
713  {"status-interval", required_argument, NULL, 's'},
714  {"slot", required_argument, NULL, 'S'},
715 /* action */
716  {"create-slot", no_argument, NULL, 1},
717  {"start", no_argument, NULL, 2},
718  {"drop-slot", no_argument, NULL, 3},
719  {"if-not-exists", no_argument, NULL, 4},
720  {NULL, 0, NULL, 0}
721  };
722  int c;
723  int option_index;
724  uint32 hi,
725  lo;
726  char *db_name;
727 
728  pg_logging_init(argv[0]);
729  progname = get_progname(argv[0]);
730  set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
731 
732  if (argc > 1)
733  {
734  if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
735  {
736  usage();
737  exit(0);
738  }
739  else if (strcmp(argv[1], "-V") == 0 ||
740  strcmp(argv[1], "--version") == 0)
741  {
742  puts("pg_recvlogical (PostgreSQL) " PG_VERSION);
743  exit(0);
744  }
745  }
746 
747  while ((c = getopt_long(argc, argv, "E:f:F:ntvd:h:p:U:wWI:o:P:s:S:",
748  long_options, &option_index)) != -1)
749  {
750  switch (c)
751  {
752 /* general options */
753  case 'f':
755  break;
756  case 'F':
757  if (!option_parse_int(optarg, "-F/--fsync-interval", 0,
758  INT_MAX / 1000,
759  &fsync_interval))
760  exit(1);
761  fsync_interval *= 1000;
762  break;
763  case 'n':
764  noloop = 1;
765  break;
766  case 't':
767  two_phase = true;
768  break;
769  case 'v':
770  verbose++;
771  break;
772 /* connection options */
773  case 'd':
775  break;
776  case 'h':
778  break;
779  case 'p':
781  break;
782  case 'U':
784  break;
785  case 'w':
786  dbgetpassword = -1;
787  break;
788  case 'W':
789  dbgetpassword = 1;
790  break;
791 /* replication options */
792  case 'I':
793  if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
794  pg_fatal("could not parse start position \"%s\"", optarg);
795  startpos = ((uint64) hi) << 32 | lo;
796  break;
797  case 'E':
798  if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
799  pg_fatal("could not parse end position \"%s\"", optarg);
800  endpos = ((uint64) hi) << 32 | lo;
801  break;
802  case 'o':
803  {
804  char *data = pg_strdup(optarg);
805  char *val = strchr(data, '=');
806 
807  if (val != NULL)
808  {
809  /* remove =; separate data from val */
810  *val = '\0';
811  val++;
812  }
813 
814  noptions += 1;
815  options = pg_realloc(options, sizeof(char *) * noptions * 2);
816 
817  options[(noptions - 1) * 2] = data;
818  options[(noptions - 1) * 2 + 1] = val;
819  }
820 
821  break;
822  case 'P':
824  break;
825  case 's':
826  if (!option_parse_int(optarg, "-s/--status-interval", 0,
827  INT_MAX / 1000,
829  exit(1);
830  standby_message_timeout *= 1000;
831  break;
832  case 'S':
834  break;
835 /* action */
836  case 1:
837  do_create_slot = true;
838  break;
839  case 2:
840  do_start_slot = true;
841  break;
842  case 3:
843  do_drop_slot = true;
844  break;
845  case 4:
846  slot_exists_ok = true;
847  break;
848 
849  default:
850  /* getopt_long already emitted a complaint */
851  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
852  exit(1);
853  }
854  }
855 
856  /*
857  * Any non-option arguments?
858  */
859  if (optind < argc)
860  {
861  pg_log_error("too many command-line arguments (first is \"%s\")",
862  argv[optind]);
863  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
864  exit(1);
865  }
866 
867  /*
868  * Required arguments
869  */
870  if (replication_slot == NULL)
871  {
872  pg_log_error("no slot specified");
873  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
874  exit(1);
875  }
876 
877  if (do_start_slot && outfile == NULL)
878  {
879  pg_log_error("no target file specified");
880  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
881  exit(1);
882  }
883 
884  if (!do_drop_slot && dbname == NULL)
885  {
886  pg_log_error("no database specified");
887  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
888  exit(1);
889  }
890 
892  {
893  pg_log_error("at least one action needs to be specified");
894  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
895  exit(1);
896  }
897 
899  {
900  pg_log_error("cannot use --create-slot or --start together with --drop-slot");
901  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
902  exit(1);
903  }
904 
906  {
907  pg_log_error("cannot use --create-slot or --drop-slot together with --startpos");
908  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
909  exit(1);
910  }
911 
913  {
914  pg_log_error("--endpos may only be specified with --start");
915  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
916  exit(1);
917  }
918 
919  if (two_phase && !do_create_slot)
920  {
921  pg_log_error("--two-phase may only be specified with --create-slot");
922  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
923  exit(1);
924  }
925 
926  /*
927  * Obtain a connection to server. Notably, if we need a password, we want
928  * to collect it from the user immediately.
929  */
930  conn = GetConnection();
931  if (!conn)
932  /* Error message already written in GetConnection() */
933  exit(1);
934  atexit(disconnect_atexit);
935 
936  /*
937  * Trap signals. (Don't do this until after the initial password prompt,
938  * if one is needed, in GetConnection.)
939  */
940 #ifndef WIN32
941  pqsignal(SIGINT, sigexit_handler);
942  pqsignal(SIGTERM, sigexit_handler);
944 #endif
945 
946  /*
947  * Run IDENTIFY_SYSTEM to make sure we connected using a database specific
948  * replication connection.
949  */
950  if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
951  exit(1);
952 
953  if (db_name == NULL)
954  pg_fatal("could not establish database-specific replication connection");
955 
956  /*
957  * Set umask so that directories/files are created with the same
958  * permissions as directories/files in the source data directory.
959  *
960  * pg_mode_mask is set to owner-only by default and then updated in
961  * GetConnection() where we get the mode from the server-side with
962  * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm().
963  */
964  umask(pg_mode_mask);
965 
966  /* Drop a replication slot. */
967  if (do_drop_slot)
968  {
969  if (verbose)
970  pg_log_info("dropping replication slot \"%s\"", replication_slot);
971 
973  exit(1);
974  }
975 
976  /* Create a replication slot. */
977  if (do_create_slot)
978  {
979  if (verbose)
980  pg_log_info("creating replication slot \"%s\"", replication_slot);
981 
983  false, false, slot_exists_ok, two_phase))
984  exit(1);
986  }
987 
988  if (!do_start_slot)
989  exit(0);
990 
991  /* Stream loop */
992  while (true)
993  {
995  if (time_to_abort)
996  {
997  /*
998  * We've been Ctrl-C'ed or reached an exit limit condition. That's
999  * not an error, so exit without an errorcode.
1000  */
1001  exit(0);
1002  }
1003  else if (noloop)
1004  pg_fatal("disconnected");
1005  else
1006  {
1007  /* translator: check source for value for %d */
1008  pg_log_info("disconnected; waiting %d seconds to try again",
1010  pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
1011  }
1012  }
1013 }
1014 
1015 /*
1016  * Fsync our output data, and send a feedback message to the server. Returns
1017  * true if successful, false otherwise.
1018  *
1019  * If successful, *now is updated to the current timestamp just before sending
1020  * feedback.
1021  */
1022 static bool
1024 {
1025  /* flush data to disk, so that we send a recent flush pointer */
1026  if (!OutputFsync(*now))
1027  return false;
1029  if (!sendFeedback(conn, *now, true, false))
1030  return false;
1031 
1032  return true;
1033 }
1034 
1035 /*
1036  * Try to inform the server about our upcoming demise, but don't wait around or
1037  * retry on failure.
1038  */
1039 static void
1041  XLogRecPtr lsn)
1042 {
1043  (void) PQputCopyEnd(conn, NULL);
1044  (void) PQflush(conn);
1045 
1046  if (verbose)
1047  {
1048  switch (reason)
1049  {
1050  case STREAM_STOP_SIGNAL:
1051  pg_log_info("received interrupt signal, exiting");
1052  break;
1053  case STREAM_STOP_KEEPALIVE:
1054  pg_log_info("end position %X/%X reached by keepalive",
1056  break;
1058  Assert(!XLogRecPtrIsInvalid(lsn));
1059  pg_log_info("end position %X/%X reached by WAL record at %X/%X",
1061  break;
1062  case STREAM_STOP_NONE:
1063  Assert(false);
1064  break;
1065  }
1066  }
1067 }
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1608
unsigned int uint32
Definition: c.h:518
#define Max(x, y)
Definition: c.h:1001
#define SIGNAL_ARGS
Definition: c.h:1348
#define Assert(condition)
Definition: c.h:861
#define PG_TEXTDOMAIN(domain)
Definition: c.h:1217
#define PG_BINARY
Definition: c.h:1276
void set_pglocale_pgservice(const char *argv0, const char *app)
Definition: exec.c:429
PGconn * GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
Definition: connection.c:191
int64 TimestampTz
Definition: timestamp.h:39
#define _(x)
Definition: elog.c:90
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7198
void PQfinish(PGconn *conn)
Definition: fe-connect.c:4879
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:7224
int PQflush(PGconn *conn)
Definition: fe-exec.c:4000
void PQfreemem(void *ptr)
Definition: fe-exec.c:4032
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3411
char * PQresultErrorMessage(const PGresult *res)
Definition: fe-exec.c:3427
int PQputCopyEnd(PGconn *conn, const char *errormsg)
Definition: fe-exec.c:2749
int PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
Definition: fe-exec.c:2695
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2262
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1984
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:2062
int PQgetCopyData(PGconn *conn, char **buffer, int async)
Definition: fe-exec.c:2816
void * pg_realloc(void *ptr, size_t size)
Definition: fe_memutils.c:65
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
int pg_mode_mask
Definition: file_perm.c:25
int getopt_long(int argc, char *const argv[], const char *optstring, const struct option *longopts, int *longindex)
Definition: getopt_long.c:60
#define no_argument
Definition: getopt_long.h:24
#define required_argument
Definition: getopt_long.h:25
long val
Definition: informix.c:689
#define close(a)
Definition: win32.h:12
#define write(a, b, c)
Definition: win32.h:14
int i
Definition: isn.c:72
@ PGRES_COPY_BOTH
Definition: libpq-fe.h:132
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:120
@ PGRES_COPY_OUT
Definition: libpq-fe.h:126
exit(1)
void pg_logging_init(const char *argv0)
Definition: logging.c:83
#define pg_log_error(...)
Definition: logging.h:106
#define pg_log_error_hint(...)
Definition: logging.h:112
#define pg_log_info(...)
Definition: logging.h:124
const char * progname
Definition: main.c:43
bool option_parse_int(const char *optarg, const char *optname, int min_range, int max_range, int *result)
Definition: option_utils.c:50
#define pg_fatal(...)
const void size_t len
const void * data
PGDLLIMPORT int optind
Definition: getopt.c:51
PGDLLIMPORT char * optarg
Definition: getopt.c:53
StreamStopReason
@ STREAM_STOP_KEEPALIVE
@ STREAM_STOP_END_OF_WAL
@ STREAM_STOP_NONE
@ STREAM_STOP_SIGNAL
static volatile sig_atomic_t stop_reason
static int verbose
static int noloop
static bool do_start_slot
static bool OutputFsync(TimestampTz now)
static void sighup_handler(SIGNAL_ARGS)
static bool two_phase
static void StreamLogicalLog(void)
static bool output_needs_fsync
static XLogRecPtr endpos
int main(int argc, char **argv)
static const char * plugin
static volatile sig_atomic_t time_to_abort
static bool slot_exists_ok
static int fsync_interval
static bool do_drop_slot
static bool output_isfile
static char * replication_slot
static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now)
static XLogRecPtr output_written_lsn
static void sigexit_handler(SIGNAL_ARGS)
static int outfd
static TimestampTz output_last_fsync
static size_t noptions
static char * outfile
static XLogRecPtr output_fsync_lsn
static bool sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
static void usage(void)
static XLogRecPtr startpos
#define RECONNECT_SLEEP_TIME
static bool do_create_slot
static void disconnect_atexit(void)
static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos, StreamStopReason reason, XLogRecPtr lsn)
static int standby_message_timeout
static volatile sig_atomic_t output_reopen
static char ** options
const char * get_progname(const char *argv0)
Definition: path.c:575
pqsigfunc pqsignal(int signo, pqsigfunc func)
#define printf(...)
Definition: port.h:244
PQExpBuffer createPQExpBuffer(void)
Definition: pqexpbuffer.c:72
void resetPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:146
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
Definition: pqexpbuffer.c:265
void destroyPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:114
void appendPQExpBufferChar(PQExpBuffer str, char ch)
Definition: pqexpbuffer.c:378
void appendPQExpBufferStr(PQExpBuffer str, const char *data)
Definition: pqexpbuffer.c:367
char * c
void pg_usleep(long microsec)
Definition: signal.c:53
static void error(void)
Definition: sql-dyntest.c:147
int dbgetpassword
Definition: streamutil.c:51
char * dbhost
Definition: streamutil.c:47
int64 fe_recvint64(char *buf)
Definition: streamutil.c:932
TimestampTz feGetCurrentTimestamp(void)
Definition: streamutil.c:867
char * dbport
Definition: streamutil.c:49
void feTimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: streamutil.c:886
void fe_sendint64(int64 i, char *buf)
Definition: streamutil.c:921
bool feTimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: streamutil.c:908
char * dbname
Definition: streamutil.c:50
PGconn * conn
Definition: streamutil.c:53
bool RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli, XLogRecPtr *startpos, char **db_name)
Definition: streamutil.c:478
char * dbuser
Definition: streamutil.c:48
unsigned short st_mode
Definition: win32_port.h:268
static StringInfo copybuf
Definition: tablesync.c:137
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:1171
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1376
#define fsync(fd)
Definition: win32_port.h:85
#define SIGHUP
Definition: win32_port.h:168
#define EINTR
Definition: win32_port.h:374
#define S_IRUSR
Definition: win32_port.h:289
#define fstat
Definition: win32_port.h:283
#define S_ISREG(m)
Definition: win32_port.h:328
#define S_IWUSR
Definition: win32_port.h:292
#define select(n, r, w, e, timeout)
Definition: win32_port.h:513
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28