PostgreSQL Source Code  git master
pg_receivewal.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * pg_receivewal.c - receive streaming WAL data and write it
4  * to a local file.
5  *
6  * Author: Magnus Hagander <magnus@hagander.net>
7  *
8  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
9  *
10  * IDENTIFICATION
11  * src/bin/pg_basebackup/pg_receivewal.c
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres_fe.h"
16 
17 #include <dirent.h>
18 #include <limits.h>
19 #include <signal.h>
20 #include <sys/stat.h>
21 #include <unistd.h>
22 
23 #ifdef USE_LZ4
24 #include <lz4frame.h>
25 #endif
26 #ifdef HAVE_LIBZ
27 #include <zlib.h>
28 #endif
29 
30 #include "access/xlog_internal.h"
31 #include "common/file_perm.h"
32 #include "common/logging.h"
33 #include "fe_utils/option_utils.h"
34 #include "getopt_long.h"
35 #include "libpq-fe.h"
36 #include "receivelog.h"
37 #include "streamutil.h"
38 
39 /* Time to sleep between reconnection attempts */
40 #define RECONNECT_SLEEP_TIME 5
41 
42 /* Global options */
43 static char *basedir = NULL;
44 static int verbose = 0;
45 static int compresslevel = 0;
46 static bool noloop = false;
47 static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
48 static volatile sig_atomic_t time_to_stop = false;
49 static bool do_create_slot = false;
50 static bool slot_exists_ok = false;
51 static bool do_drop_slot = false;
52 static bool do_sync = true;
53 static bool synchronous = false;
54 static char *replication_slot = NULL;
57 
58 
59 static void usage(void);
60 static DIR *get_destination_dir(char *dest_folder);
61 static void close_destination_dir(DIR *dest_dir, char *dest_folder);
63 static void StreamLog(void);
64 static bool stop_streaming(XLogRecPtr xlogpos, uint32 timeline,
65  bool segment_finished);
66 
67 static void
69 {
70  if (conn != NULL)
71  PQfinish(conn);
72 }
73 
74 static void
75 usage(void)
76 {
77  printf(_("%s receives PostgreSQL streaming write-ahead logs.\n\n"),
78  progname);
79  printf(_("Usage:\n"));
80  printf(_(" %s [OPTION]...\n"), progname);
81  printf(_("\nOptions:\n"));
82  printf(_(" -D, --directory=DIR receive write-ahead log files into this directory\n"));
83  printf(_(" -E, --endpos=LSN exit after receiving the specified LSN\n"));
84  printf(_(" --if-not-exists do not error if slot already exists when creating a slot\n"));
85  printf(_(" -n, --no-loop do not loop on connection lost\n"));
86  printf(_(" --no-sync do not wait for changes to be written safely to disk\n"));
87  printf(_(" -s, --status-interval=SECS\n"
88  " time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
89  printf(_(" -S, --slot=SLOTNAME replication slot to use\n"));
90  printf(_(" --synchronous flush write-ahead log immediately after writing\n"));
91  printf(_(" -v, --verbose output verbose messages\n"));
92  printf(_(" -V, --version output version information, then exit\n"));
93  printf(_(" -Z, --compress=METHOD[:DETAIL]\n"
94  " compress as specified\n"));
95  printf(_(" -?, --help show this help, then exit\n"));
96  printf(_("\nConnection options:\n"));
97  printf(_(" -d, --dbname=CONNSTR connection string\n"));
98  printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
99  printf(_(" -p, --port=PORT database server port number\n"));
100  printf(_(" -U, --username=NAME connect as specified database user\n"));
101  printf(_(" -w, --no-password never prompt for password\n"));
102  printf(_(" -W, --password force password prompt (should happen automatically)\n"));
103  printf(_("\nOptional actions:\n"));
104  printf(_(" --create-slot create a new replication slot (for the slot's name see --slot)\n"));
105  printf(_(" --drop-slot drop the replication slot (for the slot's name see --slot)\n"));
106  printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
107  printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
108 }
109 
110 
111 /*
112  * Check if the filename looks like a WAL file, letting caller know if this
113  * WAL segment is partial and/or compressed.
114  */
115 static bool
116 is_xlogfilename(const char *filename, bool *ispartial,
117  pg_compress_algorithm *wal_compression_algorithm)
118 {
119  size_t fname_len = strlen(filename);
120  size_t xlog_pattern_len = strspn(filename, "0123456789ABCDEF");
121 
122  /* File does not look like a WAL file */
123  if (xlog_pattern_len != XLOG_FNAME_LEN)
124  return false;
125 
126  /* File looks like a completed uncompressed WAL file */
127  if (fname_len == XLOG_FNAME_LEN)
128  {
129  *ispartial = false;
130  *wal_compression_algorithm = PG_COMPRESSION_NONE;
131  return true;
132  }
133 
134  /* File looks like a completed gzip-compressed WAL file */
135  if (fname_len == XLOG_FNAME_LEN + strlen(".gz") &&
136  strcmp(filename + XLOG_FNAME_LEN, ".gz") == 0)
137  {
138  *ispartial = false;
139  *wal_compression_algorithm = PG_COMPRESSION_GZIP;
140  return true;
141  }
142 
143  /* File looks like a completed LZ4-compressed WAL file */
144  if (fname_len == XLOG_FNAME_LEN + strlen(".lz4") &&
145  strcmp(filename + XLOG_FNAME_LEN, ".lz4") == 0)
146  {
147  *ispartial = false;
148  *wal_compression_algorithm = PG_COMPRESSION_LZ4;
149  return true;
150  }
151 
152  /* File looks like a partial uncompressed WAL file */
153  if (fname_len == XLOG_FNAME_LEN + strlen(".partial") &&
154  strcmp(filename + XLOG_FNAME_LEN, ".partial") == 0)
155  {
156  *ispartial = true;
157  *wal_compression_algorithm = PG_COMPRESSION_NONE;
158  return true;
159  }
160 
161  /* File looks like a partial gzip-compressed WAL file */
162  if (fname_len == XLOG_FNAME_LEN + strlen(".gz.partial") &&
163  strcmp(filename + XLOG_FNAME_LEN, ".gz.partial") == 0)
164  {
165  *ispartial = true;
166  *wal_compression_algorithm = PG_COMPRESSION_GZIP;
167  return true;
168  }
169 
170  /* File looks like a partial LZ4-compressed WAL file */
171  if (fname_len == XLOG_FNAME_LEN + strlen(".lz4.partial") &&
172  strcmp(filename + XLOG_FNAME_LEN, ".lz4.partial") == 0)
173  {
174  *ispartial = true;
175  *wal_compression_algorithm = PG_COMPRESSION_LZ4;
176  return true;
177  }
178 
179  /* File does not look like something we know */
180  return false;
181 }
182 
183 static bool
184 stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished)
185 {
186  static uint32 prevtimeline = 0;
187  static XLogRecPtr prevpos = InvalidXLogRecPtr;
188 
189  /* we assume that we get called once at the end of each segment */
190  if (verbose && segment_finished)
191  pg_log_info("finished segment at %X/%X (timeline %u)",
192  LSN_FORMAT_ARGS(xlogpos),
193  timeline);
194 
195  if (!XLogRecPtrIsInvalid(endpos) && endpos < xlogpos)
196  {
197  if (verbose)
198  pg_log_info("stopped log streaming at %X/%X (timeline %u)",
199  LSN_FORMAT_ARGS(xlogpos),
200  timeline);
201  time_to_stop = true;
202  return true;
203  }
204 
205  /*
206  * Note that we report the previous, not current, position here. After a
207  * timeline switch, xlogpos points to the beginning of the segment because
208  * that's where we always begin streaming. Reporting the end of previous
209  * timeline isn't totally accurate, because the next timeline can begin
210  * slightly before the end of the WAL that we received on the previous
211  * timeline, but it's close enough for reporting purposes.
212  */
213  if (verbose && prevtimeline != 0 && prevtimeline != timeline)
214  pg_log_info("switched to timeline %u at %X/%X",
215  timeline,
216  LSN_FORMAT_ARGS(prevpos));
217 
218  prevtimeline = timeline;
219  prevpos = xlogpos;
220 
221  if (time_to_stop)
222  {
223  if (verbose)
224  pg_log_info("received interrupt signal, exiting");
225  return true;
226  }
227  return false;
228 }
229 
230 
/*
 * Open the directory "dest_folder" and return its handle.  Failure to open
 * it is reported through pg_fatal().
 */
static DIR *
get_destination_dir(char *dest_folder)
{
	DIR		   *handle = NULL;

	Assert(dest_folder != NULL);

	handle = opendir(dest_folder);
	if (!handle)
		pg_fatal("could not open directory \"%s\": %m", dest_folder);

	return handle;
}
246 
247 
/*
 * Close the directory handle "dest_dir".  "dest_folder" is the directory's
 * name and is used only in the error message.  Failure to close is reported
 * through pg_fatal().
 */
static void
close_destination_dir(DIR *dest_dir, char *dest_folder)
{
	Assert(dest_dir != NULL);
	Assert(dest_folder != NULL);

	if (closedir(dest_dir) != 0)
		pg_fatal("could not close directory \"%s\": %m", dest_folder);
}
258 
259 
260 /*
261  * Determine starting location for streaming, based on any existing xlog
262  * segments in the directory. We start at the end of the last one that is
263  * complete (size matches wal segment size), on the timeline with highest ID.
264  *
265  * If there are no WAL files in the directory, returns InvalidXLogRecPtr.
266  */
267 static XLogRecPtr
269 {
270  DIR *dir;
271  struct dirent *dirent;
272  XLogSegNo high_segno = 0;
273  uint32 high_tli = 0;
274  bool high_ispartial = false;
275 
277 
278  while (errno = 0, (dirent = readdir(dir)) != NULL)
279  {
280  uint32 tli;
281  XLogSegNo segno;
282  pg_compress_algorithm wal_compression_algorithm;
283  bool ispartial;
284 
286  &ispartial, &wal_compression_algorithm))
287  continue;
288 
289  /*
290  * Looks like an xlog file. Parse its position.
291  */
292  XLogFromFileName(dirent->d_name, &tli, &segno, WalSegSz);
293 
294  /*
295  * Check that the segment has the right size, if it's supposed to be
296  * completed. For non-compressed segments just check the on-disk size
297  * and see if it matches a completed segment. For gzip-compressed
298  * segments, look at the last 4 bytes of the compressed file, which is
299  * where the uncompressed size is located for files with a size lower
300  * than 4GB, and then compare it to the size of a completed segment.
301  * The 4 last bytes correspond to the ISIZE member according to
302  * http://www.zlib.org/rfc-gzip.html.
303  *
304  * For LZ4-compressed segments, uncompress the file in a throw-away
305  * buffer keeping track of the uncompressed size, then compare it to
306  * the size of a completed segment. Per its protocol, LZ4 does not
307  * store the uncompressed size of an object by default. contentSize
308  * is one possible way to do that, but we need to rely on a method
309  * where WAL segments could have been compressed by a different source
310  * than pg_receivewal, like an archive_command with lz4.
311  */
312  if (!ispartial && wal_compression_algorithm == PG_COMPRESSION_NONE)
313  {
314  struct stat statbuf;
315  char fullpath[MAXPGPATH * 2];
316 
317  snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
318  if (stat(fullpath, &statbuf) != 0)
319  pg_fatal("could not stat file \"%s\": %m", fullpath);
320 
321  if (statbuf.st_size != WalSegSz)
322  {
323  pg_log_warning("segment file \"%s\" has incorrect size %lld, skipping",
324  dirent->d_name, (long long int) statbuf.st_size);
325  continue;
326  }
327  }
328  else if (!ispartial && wal_compression_algorithm == PG_COMPRESSION_GZIP)
329  {
330  int fd;
331  char buf[4];
332  int bytes_out;
333  char fullpath[MAXPGPATH * 2];
334  int r;
335 
336  snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
337 
338  fd = open(fullpath, O_RDONLY | PG_BINARY, 0);
339  if (fd < 0)
340  pg_fatal("could not open compressed file \"%s\": %m",
341  fullpath);
342  if (lseek(fd, (off_t) (-4), SEEK_END) < 0)
343  pg_fatal("could not seek in compressed file \"%s\": %m",
344  fullpath);
345  r = read(fd, (char *) buf, sizeof(buf));
346  if (r != sizeof(buf))
347  {
348  if (r < 0)
349  pg_fatal("could not read compressed file \"%s\": %m",
350  fullpath);
351  else
352  pg_fatal("could not read compressed file \"%s\": read %d of %zu",
353  fullpath, r, sizeof(buf));
354  }
355 
356  close(fd);
357  bytes_out = (buf[3] << 24) | (buf[2] << 16) |
358  (buf[1] << 8) | buf[0];
359 
360  if (bytes_out != WalSegSz)
361  {
362  pg_log_warning("compressed segment file \"%s\" has incorrect uncompressed size %d, skipping",
363  dirent->d_name, bytes_out);
364  continue;
365  }
366  }
367  else if (!ispartial && wal_compression_algorithm == PG_COMPRESSION_LZ4)
368  {
369 #ifdef USE_LZ4
370 #define LZ4_CHUNK_SZ 64 * 1024 /* 64kB as maximum chunk size read */
371  int fd;
372  ssize_t r;
373  size_t uncompressed_size = 0;
374  char fullpath[MAXPGPATH * 2];
375  char *outbuf;
376  char *readbuf;
377  LZ4F_decompressionContext_t ctx = NULL;
378  LZ4F_decompressOptions_t dec_opt;
379  LZ4F_errorCode_t status;
380 
381  memset(&dec_opt, 0, sizeof(dec_opt));
382  snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
383 
384  fd = open(fullpath, O_RDONLY | PG_BINARY, 0);
385  if (fd < 0)
386  pg_fatal("could not open file \"%s\": %m", fullpath);
387 
388  status = LZ4F_createDecompressionContext(&ctx, LZ4F_VERSION);
389  if (LZ4F_isError(status))
390  pg_fatal("could not create LZ4 decompression context: %s",
391  LZ4F_getErrorName(status));
392 
393  outbuf = pg_malloc0(LZ4_CHUNK_SZ);
394  readbuf = pg_malloc0(LZ4_CHUNK_SZ);
395  do
396  {
397  char *readp;
398  char *readend;
399 
400  r = read(fd, readbuf, LZ4_CHUNK_SZ);
401  if (r < 0)
402  pg_fatal("could not read file \"%s\": %m", fullpath);
403 
404  /* Done reading the file */
405  if (r == 0)
406  break;
407 
408  /* Process one chunk */
409  readp = readbuf;
410  readend = readbuf + r;
411  while (readp < readend)
412  {
413  size_t out_size = LZ4_CHUNK_SZ;
414  size_t read_size = readend - readp;
415 
416  memset(outbuf, 0, LZ4_CHUNK_SZ);
417  status = LZ4F_decompress(ctx, outbuf, &out_size,
418  readp, &read_size, &dec_opt);
419  if (LZ4F_isError(status))
420  pg_fatal("could not decompress file \"%s\": %s",
421  fullpath,
422  LZ4F_getErrorName(status));
423 
424  readp += read_size;
425  uncompressed_size += out_size;
426  }
427 
428  /*
429  * No need to continue reading the file when the
430  * uncompressed_size exceeds WalSegSz, even if there are still
431  * data left to read. However, if uncompressed_size is equal
432  * to WalSegSz, it should verify that there is no more data to
433  * read.
434  */
435  } while (uncompressed_size <= WalSegSz && r > 0);
436 
437  close(fd);
438  pg_free(outbuf);
439  pg_free(readbuf);
440 
441  status = LZ4F_freeDecompressionContext(ctx);
442  if (LZ4F_isError(status))
443  pg_fatal("could not free LZ4 decompression context: %s",
444  LZ4F_getErrorName(status));
445 
446  if (uncompressed_size != WalSegSz)
447  {
448  pg_log_warning("compressed segment file \"%s\" has incorrect uncompressed size %zu, skipping",
449  dirent->d_name, uncompressed_size);
450  continue;
451  }
452 #else
453  pg_log_error("cannot check file \"%s\": compression with %s not supported by this build",
454  dirent->d_name, "LZ4");
455  exit(1);
456 #endif
457  }
458 
459  /* Looks like a valid segment. Remember that we saw it. */
460  if ((segno > high_segno) ||
461  (segno == high_segno && tli > high_tli) ||
462  (segno == high_segno && tli == high_tli && high_ispartial && !ispartial))
463  {
464  high_segno = segno;
465  high_tli = tli;
466  high_ispartial = ispartial;
467  }
468  }
469 
470  if (errno)
471  pg_fatal("could not read directory \"%s\": %m", basedir);
472 
474 
475  if (high_segno > 0)
476  {
477  XLogRecPtr high_ptr;
478 
479  /*
480  * Move the starting pointer to the start of the next segment, if the
481  * highest one we saw was completed. Otherwise start streaming from
482  * the beginning of the .partial segment.
483  */
484  if (!high_ispartial)
485  high_segno++;
486 
487  XLogSegNoOffsetToRecPtr(high_segno, 0, WalSegSz, high_ptr);
488 
489  *tli = high_tli;
490  return high_ptr;
491  }
492  else
493  return InvalidXLogRecPtr;
494 }
495 
496 /*
497  * Start the log streaming
498  */
499 static void
501 {
502  XLogRecPtr serverpos;
503  TimeLineID servertli;
504  StreamCtl stream = {0};
505  char *sysidentifier;
506 
507  /*
508  * Connect in replication mode to the server
509  */
510  if (conn == NULL)
511  conn = GetConnection();
512  if (!conn)
513  /* Error message already written in GetConnection() */
514  return;
515 
517  {
518  /*
519  * Error message already written in CheckServerVersionForStreaming().
520  * There's no hope of recovering from a version mismatch, so don't
521  * retry.
522  */
523  exit(1);
524  }
525 
526  /*
527  * Identify server, obtaining start LSN position and current timeline ID
528  * at the same time, necessary if not valid data can be found in the
529  * existing output directory.
530  */
531  if (!RunIdentifySystem(conn, &sysidentifier, &servertli, &serverpos, NULL))
532  exit(1);
533 
534  /*
535  * Figure out where to start streaming. First scan the local directory.
536  */
537  stream.startpos = FindStreamingStart(&stream.timeline);
538  if (stream.startpos == InvalidXLogRecPtr)
539  {
540  /*
541  * Try to get the starting point from the slot if any. This is
542  * supported in PostgreSQL 15 and newer.
543  */
544  if (replication_slot != NULL &&
545  PQserverVersion(conn) >= 150000)
546  {
548  &stream.timeline))
549  {
550  /* Error is logged by GetSlotInformation() */
551  return;
552  }
553  }
554 
555  /*
556  * If it the starting point is still not known, use the current WAL
557  * flush value as last resort.
558  */
559  if (stream.startpos == InvalidXLogRecPtr)
560  {
561  stream.startpos = serverpos;
562  stream.timeline = servertli;
563  }
564  }
565 
566  Assert(stream.startpos != InvalidXLogRecPtr &&
567  stream.timeline != 0);
568 
569  /*
570  * Always start streaming at the beginning of a segment
571  */
572  stream.startpos -= XLogSegmentOffset(stream.startpos, WalSegSz);
573 
574  /*
575  * Start the replication
576  */
577  if (verbose)
578  pg_log_info("starting log streaming at %X/%X (timeline %u)",
579  LSN_FORMAT_ARGS(stream.startpos),
580  stream.timeline);
581 
582  stream.stream_stop = stop_streaming;
583  stream.stop_socket = PGINVALID_SOCKET;
585  stream.synchronous = synchronous;
586  stream.do_sync = do_sync;
587  stream.mark_done = false;
591  stream.do_sync);
592  stream.partial_suffix = ".partial";
594  stream.sysidentifier = sysidentifier;
595 
596  ReceiveXlogStream(conn, &stream);
597 
598  if (!stream.walmethod->ops->finish(stream.walmethod))
599  {
600  pg_log_info("could not finish writing WAL files: %m");
601  return;
602  }
603 
604  PQfinish(conn);
605  conn = NULL;
606 
607  stream.walmethod->ops->free(stream.walmethod);
608 }
609 
610 /*
611  * When SIGINT/SIGTERM are caught, just tell the system to exit at the next
612  * possible moment.
613  */
614 #ifndef WIN32
615 
616 static void
618 {
619  time_to_stop = true;
620 }
621 #endif
622 
623 int
624 main(int argc, char **argv)
625 {
626  static struct option long_options[] = {
627  {"help", no_argument, NULL, '?'},
628  {"version", no_argument, NULL, 'V'},
629  {"directory", required_argument, NULL, 'D'},
630  {"dbname", required_argument, NULL, 'd'},
631  {"endpos", required_argument, NULL, 'E'},
632  {"host", required_argument, NULL, 'h'},
633  {"port", required_argument, NULL, 'p'},
634  {"username", required_argument, NULL, 'U'},
635  {"no-loop", no_argument, NULL, 'n'},
636  {"no-password", no_argument, NULL, 'w'},
637  {"password", no_argument, NULL, 'W'},
638  {"status-interval", required_argument, NULL, 's'},
639  {"slot", required_argument, NULL, 'S'},
640  {"verbose", no_argument, NULL, 'v'},
641  {"compress", required_argument, NULL, 'Z'},
642 /* action */
643  {"create-slot", no_argument, NULL, 1},
644  {"drop-slot", no_argument, NULL, 2},
645  {"if-not-exists", no_argument, NULL, 3},
646  {"synchronous", no_argument, NULL, 4},
647  {"no-sync", no_argument, NULL, 5},
648  {NULL, 0, NULL, 0}
649  };
650 
651  int c;
652  int option_index;
653  char *db_name;
654  uint32 hi,
655  lo;
656  pg_compress_specification compression_spec;
657  char *compression_detail = NULL;
658  char *compression_algorithm_str = "none";
659  char *error_detail = NULL;
660 
661  pg_logging_init(argv[0]);
662  progname = get_progname(argv[0]);
663  set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
664 
665  if (argc > 1)
666  {
667  if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
668  {
669  usage();
670  exit(0);
671  }
672  else if (strcmp(argv[1], "-V") == 0 ||
673  strcmp(argv[1], "--version") == 0)
674  {
675  puts("pg_receivewal (PostgreSQL) " PG_VERSION);
676  exit(0);
677  }
678  }
679 
680  while ((c = getopt_long(argc, argv, "d:D:E:h:np:s:S:U:vwWZ:",
681  long_options, &option_index)) != -1)
682  {
683  switch (c)
684  {
685  case 'd':
687  break;
688  case 'D':
690  break;
691  case 'E':
692  if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
693  pg_fatal("could not parse end position \"%s\"", optarg);
694  endpos = ((uint64) hi) << 32 | lo;
695  break;
696  case 'h':
698  break;
699  case 'n':
700  noloop = true;
701  break;
702  case 'p':
704  break;
705  case 's':
706  if (!option_parse_int(optarg, "-s/--status-interval", 0,
707  INT_MAX / 1000,
709  exit(1);
710  standby_message_timeout *= 1000;
711  break;
712  case 'S':
714  break;
715  case 'U':
717  break;
718  case 'v':
719  verbose++;
720  break;
721  case 'w':
722  dbgetpassword = -1;
723  break;
724  case 'W':
725  dbgetpassword = 1;
726  break;
727  case 'Z':
728  parse_compress_options(optarg, &compression_algorithm_str,
729  &compression_detail);
730  break;
731  case 1:
732  do_create_slot = true;
733  break;
734  case 2:
735  do_drop_slot = true;
736  break;
737  case 3:
738  slot_exists_ok = true;
739  break;
740  case 4:
741  synchronous = true;
742  break;
743  case 5:
744  do_sync = false;
745  break;
746  default:
747  /* getopt_long already emitted a complaint */
748  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
749  exit(1);
750  }
751  }
752 
753  /*
754  * Any non-option arguments?
755  */
756  if (optind < argc)
757  {
758  pg_log_error("too many command-line arguments (first is \"%s\")",
759  argv[optind]);
760  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
761  exit(1);
762  }
763 
765  {
766  pg_log_error("cannot use --create-slot together with --drop-slot");
767  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
768  exit(1);
769  }
770 
771  if (replication_slot == NULL && (do_drop_slot || do_create_slot))
772  {
773  /* translator: second %s is an option name */
774  pg_log_error("%s needs a slot to be specified using --slot",
775  do_drop_slot ? "--drop-slot" : "--create-slot");
776  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
777  exit(1);
778  }
779 
780  if (synchronous && !do_sync)
781  {
782  pg_log_error("cannot use --synchronous together with --no-sync");
783  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
784  exit(1);
785  }
786 
787  /*
788  * Required arguments
789  */
790  if (basedir == NULL && !do_drop_slot && !do_create_slot)
791  {
792  pg_log_error("no target directory specified");
793  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
794  exit(1);
795  }
796 
797  /*
798  * Compression options
799  */
800  if (!parse_compress_algorithm(compression_algorithm_str,
802  pg_fatal("unrecognized compression algorithm: \"%s\"",
803  compression_algorithm_str);
804 
806  &compression_spec);
807  error_detail = validate_compress_specification(&compression_spec);
808  if (error_detail != NULL)
809  pg_fatal("invalid compression specification: %s",
810  error_detail);
811 
812  /* Extract the compression level */
813  compresslevel = compression_spec.level;
814 
816  pg_fatal("compression with %s is not yet supported", "ZSTD");
817 
818  /*
819  * Check existence of destination folder.
820  */
821  if (!do_drop_slot && !do_create_slot)
822  {
824 
826  }
827 
828  /*
829  * Obtain a connection before doing anything.
830  */
831  conn = GetConnection();
832  if (!conn)
833  /* error message already written in GetConnection() */
834  exit(1);
835  atexit(disconnect_atexit);
836 
837  /*
838  * Trap signals. (Don't do this until after the initial password prompt,
839  * if one is needed, in GetConnection.)
840  */
841 #ifndef WIN32
842  pqsignal(SIGINT, sigexit_handler);
843  pqsignal(SIGTERM, sigexit_handler);
844 #endif
845 
846  /*
847  * Run IDENTIFY_SYSTEM to make sure we've successfully have established a
848  * replication connection and haven't connected using a database specific
849  * connection.
850  */
851  if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
852  exit(1);
853 
854  /*
855  * Check that there is a database associated with connection, none should
856  * be defined in this context.
857  */
858  if (db_name)
859  pg_fatal("replication connection using slot \"%s\" is unexpectedly database specific",
861 
862  /*
863  * Set umask so that directories/files are created with the same
864  * permissions as directories/files in the source data directory.
865  *
866  * pg_mode_mask is set to owner-only by default and then updated in
867  * GetConnection() where we get the mode from the server-side with
868  * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm().
869  */
870  umask(pg_mode_mask);
871 
872  /*
873  * Drop a replication slot.
874  */
875  if (do_drop_slot)
876  {
877  if (verbose)
878  pg_log_info("dropping replication slot \"%s\"", replication_slot);
879 
881  exit(1);
882  exit(0);
883  }
884 
885  /* Create a replication slot */
886  if (do_create_slot)
887  {
888  if (verbose)
889  pg_log_info("creating replication slot \"%s\"", replication_slot);
890 
891  if (!CreateReplicationSlot(conn, replication_slot, NULL, false, true, false,
892  slot_exists_ok, false))
893  exit(1);
894  exit(0);
895  }
896 
897  /* determine remote server's xlog segment size */
898  if (!RetrieveWalSegSize(conn))
899  exit(1);
900 
901  /*
902  * Don't close the connection here so that subsequent StreamLog() can
903  * reuse it.
904  */
905 
906  while (true)
907  {
908  StreamLog();
909  if (time_to_stop)
910  {
911  /*
912  * We've been Ctrl-C'ed or end of streaming position has been
913  * willingly reached, so exit without an error code.
914  */
915  exit(0);
916  }
917  else if (noloop)
918  pg_fatal("disconnected");
919  else
920  {
921  /* translator: check source for value for %d */
922  pg_log_info("disconnected; waiting %d seconds to try again",
924  pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
925  }
926  }
927 }
unsigned int uint32
Definition: c.h:506
#define SIGNAL_ARGS
Definition: c.h:1345
#define Assert(condition)
Definition: c.h:858
#define PG_TEXTDOMAIN(domain)
Definition: c.h:1214
#define PG_BINARY
Definition: c.h:1273
void set_pglocale_pgservice(const char *argv0, const char *app)
Definition: exec.c:429
bool parse_compress_algorithm(char *name, pg_compress_algorithm *algorithm)
Definition: compression.c:49
void parse_compress_specification(pg_compress_algorithm algorithm, char *specification, pg_compress_specification *result)
Definition: compression.c:107
char * validate_compress_specification(pg_compress_specification *spec)
Definition: compression.c:344
pg_compress_algorithm
Definition: compression.h:22
@ PG_COMPRESSION_GZIP
Definition: compression.h:24
@ PG_COMPRESSION_LZ4
Definition: compression.h:25
@ PG_COMPRESSION_NONE
Definition: compression.h:23
@ PG_COMPRESSION_ZSTD
Definition: compression.h:26
void parse_compress_options(const char *option, char **algorithm, char **detail)
PGconn * GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
Definition: connection.c:195
int closedir(DIR *)
Definition: dirent.c:127
struct dirent * readdir(DIR *)
Definition: dirent.c:78
DIR * opendir(const char *)
Definition: dirent.c:33
#define _(x)
Definition: elog.c:90
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:7172
void PQfinish(PGconn *conn)
Definition: fe-connect.c:4893
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
void pg_free(void *ptr)
Definition: fe_memutils.c:105
int pg_mode_mask
Definition: file_perm.c:25
int getopt_long(int argc, char *const argv[], const char *optstring, const struct option *longopts, int *longindex)
Definition: getopt_long.c:60
#define no_argument
Definition: getopt_long.h:24
#define required_argument
Definition: getopt_long.h:25
#define close(a)
Definition: win32.h:12
#define read(a, b, c)
Definition: win32.h:13
exit(1)
void pg_logging_init(const char *argv0)
Definition: logging.c:83
#define pg_log_error(...)
Definition: logging.h:106
#define pg_log_error_hint(...)
Definition: logging.h:112
#define pg_log_info(...)
Definition: logging.h:124
const char * progname
Definition: main.c:44
bool option_parse_int(const char *optarg, const char *optname, int min_range, int max_range, int *result)
Definition: option_utils.c:50
#define pg_fatal(...)
#define MAXPGPATH
static char * filename
Definition: pg_dumpall.c:119
PGDLLIMPORT int optind
Definition: getopt.c:50
PGDLLIMPORT char * optarg
Definition: getopt.c:52
static int verbose
Definition: pg_receivewal.c:44
static void close_destination_dir(DIR *dest_dir, char *dest_folder)
static pg_compress_algorithm compression_algorithm
Definition: pg_receivewal.c:55
static XLogRecPtr endpos
Definition: pg_receivewal.c:56
int main(int argc, char **argv)
static volatile sig_atomic_t time_to_stop
Definition: pg_receivewal.c:48
static bool slot_exists_ok
Definition: pg_receivewal.c:50
static bool synchronous
Definition: pg_receivewal.c:53
static DIR * get_destination_dir(char *dest_folder)
static bool stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished)
static int compresslevel
Definition: pg_receivewal.c:45
static bool do_drop_slot
Definition: pg_receivewal.c:51
static char * replication_slot
Definition: pg_receivewal.c:54
static XLogRecPtr FindStreamingStart(uint32 *tli)
static bool do_sync
Definition: pg_receivewal.c:52
static void sigexit_handler(SIGNAL_ARGS)
static void usage(void)
Definition: pg_receivewal.c:75
#define RECONNECT_SLEEP_TIME
Definition: pg_receivewal.c:40
static bool do_create_slot
Definition: pg_receivewal.c:49
static char * basedir
Definition: pg_receivewal.c:43
static void disconnect_atexit(void)
Definition: pg_receivewal.c:68
static bool noloop
Definition: pg_receivewal.c:46
static int standby_message_timeout
Definition: pg_receivewal.c:47
static bool is_xlogfilename(const char *filename, bool *ispartial, pg_compress_algorithm *wal_compression_algorithm)
static void StreamLog(void)
static char * buf
Definition: pg_test_fsync.c:73
#define pg_log_warning(...)
Definition: pgfnames.c:24
const char * get_progname(const char *argv0)
Definition: path.c:575
pqsigfunc pqsignal(int signo, pqsigfunc func)
#define snprintf
Definition: port.h:238
#define PGINVALID_SOCKET
Definition: port.h:31
#define printf(...)
Definition: port.h:244
char * c
static int fd(const char *x, int i)
Definition: preproc-init.c:105
bool ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
Definition: receivelog.c:453
bool CheckServerVersionForStreaming(PGconn *conn)
Definition: receivelog.c:375
void pg_usleep(long microsec)
Definition: signal.c:53
int dbgetpassword
Definition: streamutil.c:53
bool RetrieveWalSegSize(PGconn *conn)
Definition: streamutil.c:344
int WalSegSz
Definition: streamutil.c:34
char * dbhost
Definition: streamutil.c:49
char * dbport
Definition: streamutil.c:51
char * connection_string
Definition: streamutil.c:48
bool GetSlotInformation(PGconn *conn, const char *slot_name, XLogRecPtr *restart_lsn, TimeLineID *restart_tli)
Definition: streamutil.c:558
PGconn * conn
Definition: streamutil.c:55
bool RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli, XLogRecPtr *startpos, char **db_name)
Definition: streamutil.c:477
char * dbuser
Definition: streamutil.c:50
Definition: dirent.c:26
char * sysidentifier
Definition: receivelog.h:33
TimeLineID timeline
Definition: receivelog.h:32
stream_stop_callback stream_stop
Definition: receivelog.h:41
char * replication_slot
Definition: receivelog.h:48
XLogRecPtr startpos
Definition: receivelog.h:31
bool do_sync
Definition: receivelog.h:38
pgsocket stop_socket
Definition: receivelog.h:43
int standby_message_timeout
Definition: receivelog.h:35
WalWriteMethod * walmethod
Definition: receivelog.h:46
bool mark_done
Definition: receivelog.h:37
char * partial_suffix
Definition: receivelog.h:47
bool synchronous
Definition: receivelog.h:36
void(* free)(WalWriteMethod *wwmethod)
Definition: walmethods.h:92
bool(* finish)(WalWriteMethod *wwmethod)
Definition: walmethods.h:86
const WalWriteMethodOps * ops
Definition: walmethods.h:105
Definition: dirent.h:10
char d_name[MAX_PATH]
Definition: dirent.h:15
__int64 st_size
Definition: win32_port.h:273
WalWriteMethod * CreateWalDirectoryMethod(const char *basedir, pg_compress_algorithm compression_algorithm, int compression_level, bool sync)
Definition: walmethods.c:640
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:1199
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1404
#define stat
Definition: win32_port.h:284
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
static void XLogFromFileName(const char *fname, TimeLineID *tli, XLogSegNo *logSegNo, int wal_segsz_bytes)
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define XLOG_FNAME_LEN
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
uint32 TimeLineID
Definition: xlogdefs.h:59
uint64 XLogSegNo
Definition: xlogdefs.h:48