PostgreSQL Source Code  git master
pg_recvlogical.c File Reference
#include "postgres_fe.h"
#include <dirent.h>
#include <limits.h>
#include <sys/stat.h>
#include <unistd.h>
#include "access/xlog_internal.h"
#include "common/fe_memutils.h"
#include "common/file_perm.h"
#include "common/logging.h"
#include "fe_utils/option_utils.h"
#include "getopt_long.h"
#include "libpq-fe.h"
#include "libpq/pqsignal.h"
#include "pqexpbuffer.h"
#include "streamutil.h"
Include dependency graph for pg_recvlogical.c:

Go to the source code of this file.

Macros

#define RECONNECT_SLEEP_TIME   5
 

Functions

static void usage (void)
 
static void StreamLogicalLog (void)
 
static bool flushAndSendFeedback (PGconn *conn, TimestampTz *now)
 
static void prepareToTerminate (PGconn *conn, XLogRecPtr endpos, bool keepalive, XLogRecPtr lsn)
 
static bool sendFeedback (PGconn *conn, TimestampTz now, bool force, bool replyRequested)
 
static void disconnect_atexit (void)
 
static bool OutputFsync (TimestampTz now)
 
static void sigint_handler (int signum)
 
static void sighup_handler (int signum)
 
int main (int argc, char **argv)
 

Variables

static char * outfile = NULL
 
static int verbose = 0
 
static bool two_phase = false
 
static int noloop = 0
 
static int standby_message_timeout = 10 * 1000
 
static int fsync_interval = 10 * 1000
 
static XLogRecPtr startpos = InvalidXLogRecPtr
 
static XLogRecPtr endpos = InvalidXLogRecPtr
 
static bool do_create_slot = false
 
static bool slot_exists_ok = false
 
static bool do_start_slot = false
 
static bool do_drop_slot = false
 
static char * replication_slot = NULL
 
static char ** options
 
static size_t noptions = 0
 
static const char * plugin = "test_decoding"
 
static int outfd = -1
 
static volatile sig_atomic_t time_to_abort = false
 
static volatile sig_atomic_t output_reopen = false
 
static bool output_isfile
 
static TimestampTz output_last_fsync = -1
 
static bool output_needs_fsync = false
 
static XLogRecPtr output_written_lsn = InvalidXLogRecPtr
 
static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr
 

Macro Definition Documentation

◆ RECONNECT_SLEEP_TIME

#define RECONNECT_SLEEP_TIME   5

Definition at line 35 of file pg_recvlogical.c.

Referenced by main().

Function Documentation

◆ disconnect_atexit()

static void disconnect_atexit ( void  )
static

Definition at line 170 of file pg_recvlogical.c.

References conn, and PQfinish().

Referenced by main().

171 {
172  if (conn != NULL)
173  PQfinish(conn);
174 }
void PQfinish(PGconn *conn)
Definition: fe-connect.c:4231
PGconn * conn
Definition: streamutil.c:54

◆ flushAndSendFeedback()

static bool flushAndSendFeedback ( PGconn * conn,
TimestampTz * now 
)
static

Definition at line 1035 of file pg_recvlogical.c.

References feGetCurrentTimestamp(), OutputFsync(), and sendFeedback().

Referenced by StreamLogicalLog().

1036 {
1037  /* flush data to disk, so that we send a recent flush pointer */
1038  if (!OutputFsync(*now))
1039  return false;
1040  *now = feGetCurrentTimestamp();
1041  if (!sendFeedback(conn, *now, true, false))
1042  return false;
1043 
1044  return true;
1045 }
TimestampTz feGetCurrentTimestamp(void)
Definition: streamutil.c:701
static bool OutputFsync(TimestampTz now)
static bool sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1544

◆ main()

int main ( int  argc,
char **  argv 
)

Definition at line 680 of file pg_recvlogical.c.

References _, conn, CreateReplicationSlot(), dbgetpassword, dbhost, dbname, dbport, dbuser, disconnect_atexit(), do_create_slot, do_drop_slot, do_start_slot, DropReplicationSlot(), endpos, fprintf, fsync_interval, get_progname(), GetConnection(), getopt_long(), InvalidXLogRecPtr, no_argument, noloop, noptions, optarg, optind, option_parse_int(), outfile, pg_log_error, pg_log_info, pg_logging_init(), pg_mode_mask, pg_realloc(), pg_strdup(), PG_TEXTDOMAIN, pg_usleep(), plugin, pqsignal(), progname, RECONNECT_SLEEP_TIME, replication_slot, required_argument, RunIdentifySystem(), set_pglocale_pgservice(), SIGHUP, sighup_handler(), sigint_handler(), slot_exists_ok, standby_message_timeout, startpos, StreamLogicalLog(), time_to_abort, two_phase, usage(), val, and verbose.

681 {
682  static struct option long_options[] = {
683 /* general options */
684  {"file", required_argument, NULL, 'f'},
685  {"fsync-interval", required_argument, NULL, 'F'},
686  {"no-loop", no_argument, NULL, 'n'},
687  {"verbose", no_argument, NULL, 'v'},
688  {"two-phase", no_argument, NULL, 't'},
689  {"version", no_argument, NULL, 'V'},
690  {"help", no_argument, NULL, '?'},
691 /* connection options */
692  {"dbname", required_argument, NULL, 'd'},
693  {"host", required_argument, NULL, 'h'},
694  {"port", required_argument, NULL, 'p'},
695  {"username", required_argument, NULL, 'U'},
696  {"no-password", no_argument, NULL, 'w'},
697  {"password", no_argument, NULL, 'W'},
698 /* replication options */
699  {"startpos", required_argument, NULL, 'I'},
700  {"endpos", required_argument, NULL, 'E'},
701  {"option", required_argument, NULL, 'o'},
702  {"plugin", required_argument, NULL, 'P'},
703  {"status-interval", required_argument, NULL, 's'},
704  {"slot", required_argument, NULL, 'S'},
705 /* action */
706  {"create-slot", no_argument, NULL, 1},
707  {"start", no_argument, NULL, 2},
708  {"drop-slot", no_argument, NULL, 3},
709  {"if-not-exists", no_argument, NULL, 4},
710  {NULL, 0, NULL, 0}
711  };
712  int c;
713  int option_index;
714  uint32 hi,
715  lo;
716  char *db_name;
717 
718  pg_logging_init(argv[0]);
719  progname = get_progname(argv[0]);
720  set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
721 
722  if (argc > 1)
723  {
724  if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
725  {
726  usage();
727  exit(0);
728  }
729  else if (strcmp(argv[1], "-V") == 0 ||
730  strcmp(argv[1], "--version") == 0)
731  {
732  puts("pg_recvlogical (PostgreSQL) " PG_VERSION);
733  exit(0);
734  }
735  }
736 
737  while ((c = getopt_long(argc, argv, "E:f:F:nvtd:h:p:U:wWI:o:P:s:S:",
738  long_options, &option_index)) != -1)
739  {
740  switch (c)
741  {
742 /* general options */
743  case 'f':
744  outfile = pg_strdup(optarg);
745  break;
746  case 'F':
747  if (!option_parse_int(optarg, "-F/--fsync-interval", 0,
748  INT_MAX / 1000,
749  &fsync_interval))
750  exit(1);
751  fsync_interval *= 1000;
752  break;
753  case 'n':
754  noloop = 1;
755  break;
756  case 'v':
757  verbose++;
758  break;
759  case 't':
760  two_phase = true;
761  break;
762 /* connection options */
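763  case 'd':
764  dbname = pg_strdup(optarg);
765  break;
766  case 'h':
767  dbhost = pg_strdup(optarg);
768  break;
769  case 'p':
770  dbport = pg_strdup(optarg);
771  break;
772  case 'U':
773  dbuser = pg_strdup(optarg);
774  break;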
775  case 'w':
776  dbgetpassword = -1;
777  break;
778  case 'W':
779  dbgetpassword = 1;
780  break;
781 /* replication options */
782  case 'I':
783  if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
784  {
785  pg_log_error("could not parse start position \"%s\"", optarg);
786  exit(1);
787  }
788  startpos = ((uint64) hi) << 32 | lo;
789  break;
790  case 'E':
791  if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
792  {
793  pg_log_error("could not parse end position \"%s\"", optarg);
794  exit(1);
795  }
796  endpos = ((uint64) hi) << 32 | lo;
797  break;
798  case 'o':
799  {
800  char *data = pg_strdup(optarg);
801  char *val = strchr(data, '=');
802 
803  if (val != NULL)
804  {
805  /* remove =; separate data from val */
806  *val = '\0';
807  val++;
808  }
809 
810  noptions += 1;
811  options = pg_realloc(options, sizeof(char *) * noptions * 2);
812 
813  options[(noptions - 1) * 2] = data;
814  options[(noptions - 1) * 2 + 1] = val;
815  }
816 
817  break;
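818  case 'P':
819  plugin = pg_strdup(optarg);
820  break;
821  case 's':
822  if (!option_parse_int(optarg, "-s/--status-interval", 0,
823  INT_MAX / 1000,
824  &standby_message_timeout))
825  exit(1);
826  standby_message_timeout *= 1000;
827  break;
828  case 'S':
829  replication_slot = pg_strdup(optarg);
830  break;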
831 /* action */
832  case 1:
833  do_create_slot = true;
834  break;
835  case 2:
836  do_start_slot = true;
837  break;
838  case 3:
839  do_drop_slot = true;
840  break;
841  case 4:
842  slot_exists_ok = true;
843  break;
844 
845  default:
846 
847  /*
848  * getopt_long already emitted a complaint
849  */
850  fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
851  progname);
852  exit(1);
853  }
854  }
855 
856  /*
857  * Any non-option arguments?
858  */
859  if (optind < argc)
860  {
861  pg_log_error("too many command-line arguments (first is \"%s\")",
862  argv[optind]);
863  fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
864  progname);
865  exit(1);
866  }
867 
868  /*
869  * Required arguments
870  */
871  if (replication_slot == NULL)
872  {
873  pg_log_error("no slot specified");
874  fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
875  progname);
876  exit(1);
877  }
878 
879  if (do_start_slot && outfile == NULL)
880  {
881  pg_log_error("no target file specified");
882  fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
883  progname);
884  exit(1);
885  }
886 
887  if (!do_drop_slot && dbname == NULL)
888  {
889  pg_log_error("no database specified");
890  fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
891  progname);
892  exit(1);
893  }
894 
895  if (!do_drop_slot && !do_create_slot && !do_start_slot)
896  {
897  pg_log_error("at least one action needs to be specified");
898  fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
899  progname);
900  exit(1);
901  }
902 
903  if (do_drop_slot && (do_create_slot || do_start_slot))
904  {
905  pg_log_error("cannot use --create-slot or --start together with --drop-slot");
906  fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
907  progname);
908  exit(1);
909  }
910 
911  if (startpos != InvalidXLogRecPtr && (do_create_slot || do_drop_slot))
912  {
913  pg_log_error("cannot use --create-slot or --drop-slot together with --startpos");
914  fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
915  progname);
916  exit(1);
917  }
918 
919  if (endpos != InvalidXLogRecPtr && !do_start_slot)
920  {
921  pg_log_error("--endpos may only be specified with --start");
922  fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
923  progname);
924  exit(1);
925  }
926 
927  if (two_phase && !do_create_slot)
928  {
929  pg_log_error("--two-phase may only be specified with --create-slot");
930  fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
931  progname);
932  exit(1);
933  }
934 
935 
936 #ifndef WIN32
937  pqsignal(SIGINT, sigint_handler);
938  pqsignal(SIGHUP, sighup_handler);
939 #endif
940 
941  /*
942  * Obtain a connection to server. This is not really necessary but it
943  * helps to get more precise error messages about authentication, required
944  * GUC parameters and such.
945  */
946  conn = GetConnection();
947  if (!conn)
948  /* Error message already written in GetConnection() */
949  exit(1);
950  atexit(disconnect_atexit);
951 
952  /*
953  * Run IDENTIFY_SYSTEM to make sure we connected using a database specific
954  * replication connection.
955  */
956  if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
957  exit(1);
958 
959  if (db_name == NULL)
960  {
961  pg_log_error("could not establish database-specific replication connection");
962  exit(1);
963  }
964 
965  /*
966  * Set umask so that directories/files are created with the same
967  * permissions as directories/files in the source data directory.
968  *
969  * pg_mode_mask is set to owner-only by default and then updated in
970  * GetConnection() where we get the mode from the server-side with
971  * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm().
972  */
973  umask(pg_mode_mask);
974 
975  /* Drop a replication slot. */
976  if (do_drop_slot)
977  {
978  if (verbose)
979  pg_log_info("dropping replication slot \"%s\"", replication_slot);
980 
981  if (!DropReplicationSlot(conn, replication_slot))
982  exit(1);
983  }
984 
985  /* Create a replication slot. */
986  if (do_create_slot)
987  {
988  if (verbose)
989  pg_log_info("creating replication slot \"%s\"", replication_slot);
990 
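991  if (!CreateReplicationSlot(conn, replication_slot, plugin, false,
992  false, false, slot_exists_ok, two_phase))
993  exit(1);
994  startpos = InvalidXLogRecPtr;
995  }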
996 
997  if (!do_start_slot)
998  exit(0);
999 
1000  /* Stream loop */
1001  while (true)
1002  {
1003  StreamLogicalLog();
1004  if (time_to_abort)
1005  {
1006  /*
1007  * We've been Ctrl-C'ed or reached an exit limit condition. That's
1008  * not an error, so exit without an errorcode.
1009  */
1010  exit(0);
1011  }
1012  else if (noloop)
1013  {
1014  pg_log_error("disconnected");
1015  exit(1);
1016  }
1017  else
1018  {
1019  /* translator: check source for value for %d */
1020  pg_log_info("disconnected; waiting %d seconds to try again",
1021  RECONNECT_SLEEP_TIME);
1022  pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
1023  }
1024  }
1025 }
static const char * plugin
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
const char * progname
Definition: main.c:46
static volatile sig_atomic_t time_to_abort
static void sigint_handler(int signum)
static int fsync_interval
#define RECONNECT_SLEEP_TIME
bool option_parse_int(const char *optarg, const char *optname, int min_range, int max_range, int *result)
Definition: option_utils.c:50
const char * get_progname(const char *argv0)
Definition: path.c:453
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1227
#define pg_log_error(...)
Definition: logging.h:80
int getopt_long(int argc, char *const argv[], const char *optstring, const struct option *longopts, int *longindex)
Definition: getopt_long.c:57
static void sighup_handler(int signum)
void pg_logging_init(const char *argv0)
Definition: logging.c:81
bool RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli, XLogRecPtr *startpos, char **db_name)
Definition: streamutil.c:409
static char * replication_slot
#define fprintf
Definition: port.h:221
static int noloop
static int verbose
static XLogRecPtr endpos
void pg_usleep(long microsec)
Definition: signal.c:53
#define required_argument
Definition: getopt_long.h:25
int optind
Definition: getopt.c:50
PGconn * GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
Definition: connection.c:127
PGconn * conn
Definition: streamutil.c:54
static bool slot_exists_ok
char * c
static void disconnect_atexit(void)
#define SIGHUP
Definition: win32_port.h:167
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
unsigned int uint32
Definition: c.h:441
void * pg_realloc(void *ptr, size_t size)
Definition: fe_memutils.c:65
int dbgetpassword
Definition: streamutil.c:52
#define no_argument
Definition: getopt_long.h:24
#define PG_TEXTDOMAIN(domain)
Definition: c.h:1215
static void usage(void)
static bool two_phase
char * dbport
Definition: streamutil.c:50
static bool do_drop_slot
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
char * dbhost
Definition: streamutil.c:48
char * dbname
Definition: streamutil.c:51
static XLogRecPtr startpos
static void StreamLogicalLog(void)
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:1027
void set_pglocale_pgservice(const char *argv0, const char *app)
Definition: exec.c:433
char * dbuser
Definition: streamutil.c:49
char * optarg
Definition: getopt.c:52
static bool do_start_slot
static char * outfile
static size_t noptions
#define _(x)
Definition: elog.c:89
int pg_mode_mask
Definition: file_perm.c:25
long val
Definition: informix.c:664
#define pg_log_info(...)
Definition: logging.h:88
static bool do_create_slot
static int standby_message_timeout

◆ OutputFsync()

static bool OutputFsync ( TimestampTz  now)
static

Definition at line 177 of file pg_recvlogical.c.

References fsync, fsync_interval, now(), outfd, outfile, output_fsync_lsn, output_isfile, output_last_fsync, output_needs_fsync, output_written_lsn, and pg_log_fatal.

Referenced by flushAndSendFeedback(), and StreamLogicalLog().

178 {
179  output_last_fsync = now;
180 
181  output_fsync_lsn = output_written_lsn;
182 
183  if (fsync_interval <= 0)
184  return true;
185 
186  if (!output_needs_fsync)
187  return true;
188 
189  output_needs_fsync = false;
190 
191  /* can only fsync if it's a regular file */
192  if (!output_isfile)
193  return true;
194 
195  if (fsync(outfd) != 0)
196  {
197  pg_log_fatal("could not fsync file \"%s\": %m", outfile);
198  exit(1);
199  }
200 
201  return true;
202 }
static XLogRecPtr output_fsync_lsn
static int fsync_interval
#define fsync(fd)
Definition: win32_port.h:76
static bool output_isfile
static int outfd
static XLogRecPtr output_written_lsn
static bool output_needs_fsync
static TimestampTz output_last_fsync
static char * outfile
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1544
#define pg_log_fatal(...)
Definition: logging.h:76

◆ prepareToTerminate()

static void prepareToTerminate ( PGconn * conn,
XLogRecPtr  endpos,
bool  keepalive,
XLogRecPtr  lsn 
)
static

Definition at line 1052 of file pg_recvlogical.c.

References LSN_FORMAT_ARGS, pg_log_info, PQflush(), PQputCopyEnd(), and verbose.

Referenced by StreamLogicalLog().

1053 {
1054  (void) PQputCopyEnd(conn, NULL);
1055  (void) PQflush(conn);
1056 
1057  if (verbose)
1058  {
1059  if (keepalive)
1060  pg_log_info("end position %X/%X reached by keepalive",
1061  LSN_FORMAT_ARGS(endpos));
1062  else
1063  pg_log_info("end position %X/%X reached by WAL record at %X/%X",
1064  LSN_FORMAT_ARGS(endpos), LSN_FORMAT_ARGS(lsn));
1065  }
1066 }
int PQputCopyEnd(PGconn *conn, const char *errormsg)
Definition: fe-exec.c:2600
static int verbose
static XLogRecPtr endpos
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
int PQflush(PGconn *conn)
Definition: fe-exec.c:3766
#define pg_log_info(...)
Definition: logging.h:88

◆ sendFeedback()

static bool sendFeedback ( PGconn * conn,
TimestampTz  now,
bool  force,
bool  replyRequested 
)
static

Definition at line 118 of file pg_recvlogical.c.

References fe_sendint64(), InvalidXLogRecPtr, LSN_FORMAT_ARGS, output_fsync_lsn, output_written_lsn, pg_log_error, pg_log_info, PQerrorMessage(), PQflush(), PQputCopyData(), replication_slot, startpos, and verbose.

Referenced by flushAndSendFeedback(), and StreamLogicalLog().

119 {
120  static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;
121  static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;
122 
123  char replybuf[1 + 8 + 8 + 8 + 8 + 1];
124  int len = 0;
125 
126  /*
127  * we normally don't want to send superfluous feedback, but if it's
128  * because of a timeout we need to, otherwise wal_sender_timeout will kill
129  * us.
130  */
131  if (!force &&
132  last_written_lsn == output_written_lsn &&
133  last_fsync_lsn == output_fsync_lsn)
134  return true;
135 
136  if (verbose)
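137  pg_log_info("confirming write up to %X/%X, flush to %X/%X (slot %s)",
138  LSN_FORMAT_ARGS(output_written_lsn),
139  LSN_FORMAT_ARGS(output_fsync_lsn),
140  replication_slot);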
141 
142  replybuf[len] = 'r';
143  len += 1;
144  fe_sendint64(output_written_lsn, &replybuf[len]); /* write */
145  len += 8;
146  fe_sendint64(output_fsync_lsn, &replybuf[len]); /* flush */
147  len += 8;
148  fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
149  len += 8;
150  fe_sendint64(now, &replybuf[len]); /* sendTime */
151  len += 8;
152  replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
153  len += 1;
154 
155  startpos = output_written_lsn;
156  last_written_lsn = output_written_lsn;
157  last_fsync_lsn = output_fsync_lsn;
158 
159  if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
160  {
161  pg_log_error("could not send feedback packet: %s",
162  PQerrorMessage(conn));
163  return false;
164  }
165 
166  return true;
167 }
int PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
Definition: fe-exec.c:2544
static XLogRecPtr output_fsync_lsn
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6744
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define pg_log_error(...)
Definition: logging.h:80
static char * replication_slot
static int verbose
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
int PQflush(PGconn *conn)
Definition: fe-exec.c:3766
static XLogRecPtr output_written_lsn
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static XLogRecPtr startpos
void fe_sendint64(int64 i, char *buf)
Definition: streamutil.c:755
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1544
#define pg_log_info(...)
Definition: logging.h:88

◆ sighup_handler()

static void sighup_handler ( int  signum)
static

Definition at line 672 of file pg_recvlogical.c.

References output_reopen.

Referenced by main().

673 {
674  output_reopen = true;
675 }
static volatile sig_atomic_t output_reopen

◆ sigint_handler()

static void sigint_handler ( int  signum)
static

Definition at line 663 of file pg_recvlogical.c.

References time_to_abort.

Referenced by main().

664 {
665  time_to_abort = true;
666 }
static volatile sig_atomic_t time_to_abort

◆ StreamLogicalLog()

static void StreamLogicalLog ( void  )
static

Definition at line 208 of file pg_recvlogical.c.

References appendPQExpBuffer(), appendPQExpBufferChar(), appendPQExpBufferStr(), close, conn, copybuf, createPQExpBuffer(), PQExpBufferData::data, destroyPQExpBuffer(), EINTR, endpos, error(), fe_recvint64(), feGetCurrentTimestamp(), feTimestampDifference(), feTimestampDifferenceExceeds(), flushAndSendFeedback(), fstat, fsync_interval, GetConnection(), i, InvalidXLogRecPtr, LSN_FORMAT_ARGS, Max, noptions, now(), outfd, outfile, output_fsync_lsn, output_isfile, output_last_fsync, output_needs_fsync, output_reopen, output_written_lsn, OutputFsync(), PG_BINARY, pg_log_error, pg_log_info, PGRES_COMMAND_OK, PGRES_COPY_BOTH, PGRES_COPY_OUT, PQclear(), PQconsumeInput(), PQerrorMessage(), PQexec(), PQfinish(), PQfreemem(), PQgetCopyData(), PQgetResult(), PQresultErrorMessage(), PQresultStatus(), PQsocket(), prepareToTerminate(), replication_slot, resetPQExpBuffer(), S_IRUSR, S_ISREG, S_IWUSR, select, sendFeedback(), stat::st_mode, standby_message_timeout, startpos, generate_unaccent_rules::stdout, time_to_abort, verbose, and write.

Referenced by main().

209 {
210  PGresult *res;
211  char *copybuf = NULL;
212  TimestampTz last_status = -1;
213  int i;
214  PQExpBuffer query;
215 
216  output_written_lsn = InvalidXLogRecPtr;
217  output_fsync_lsn = InvalidXLogRecPtr;
218 
219  query = createPQExpBuffer();
220 
221  /*
222  * Connect in replication mode to the server
223  */
224  if (!conn)
225  conn = GetConnection();
226  if (!conn)
227  /* Error message already written in GetConnection() */
228  return;
229 
230  /*
231  * Start the replication
232  */
233  if (verbose)
234  pg_log_info("starting log streaming at %X/%X (slot %s)",
235  LSN_FORMAT_ARGS(startpos),
236  replication_slot);
237 
238  /* Initiate the replication stream at specified location */
239  appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X",
240  replication_slot, LSN_FORMAT_ARGS(startpos));
241 
242  /* print options if there are any */
243  if (noptions)
244  appendPQExpBufferStr(query, " (");
245 
246  for (i = 0; i < noptions; i++)
247  {
248  /* separator */
249  if (i > 0)
250  appendPQExpBufferStr(query, ", ");
251 
252  /* write option name */
253  appendPQExpBuffer(query, "\"%s\"", options[(i * 2)]);
254 
255  /* write option value if specified */
256  if (options[(i * 2) + 1] != NULL)
257  appendPQExpBuffer(query, " '%s'", options[(i * 2) + 1]);
258  }
259 
260  if (noptions)
261  appendPQExpBufferChar(query, ')');
262 
263  res = PQexec(conn, query->data);
264  if (PQresultStatus(res) != PGRES_COPY_BOTH)
265  {
266  pg_log_error("could not send replication command \"%s\": %s",
267  query->data, PQresultErrorMessage(res));
268  PQclear(res);
269  goto error;
270  }
271  PQclear(res);
272  resetPQExpBuffer(query);
273 
274  if (verbose)
275  pg_log_info("streaming initiated");
276 
277  while (!time_to_abort)
278  {
279  int r;
280  int bytes_left;
281  int bytes_written;
282  TimestampTz now;
283  int hdr_len;
284  XLogRecPtr cur_record_lsn = InvalidXLogRecPtr;
285 
286  if (copybuf != NULL)
287  {
288  PQfreemem(copybuf);
289  copybuf = NULL;
290  }
291 
292  /*
293  * Potentially send a status message to the primary.
294  */
295  now = feGetCurrentTimestamp();
296 
297  if (outfd != -1 &&
298  feTimestampDifferenceExceeds(output_last_fsync, now,
299  fsync_interval))
300  {
301  if (!OutputFsync(now))
302  goto error;
303  }
304 
305  if (standby_message_timeout > 0 &&
306  feTimestampDifferenceExceeds(last_status, now,
307  standby_message_timeout))
308  {
309  /* Time to send feedback! */
310  if (!sendFeedback(conn, now, true, false))
311  goto error;
312 
313  last_status = now;
314  }
315 
316  /* got SIGHUP, close output file */
317  if (outfd != -1 && output_reopen && strcmp(outfile, "-") != 0)
318  {
319  now = feGetCurrentTimestamp();
320  if (!OutputFsync(now))
321  goto error;
322  close(outfd);
323  outfd = -1;
324  }
325  output_reopen = false;
326 
327  /* open the output file, if not open yet */
328  if (outfd == -1)
329  {
330  struct stat statbuf;
331 
332  if (strcmp(outfile, "-") == 0)
333  outfd = fileno(stdout);
334  else
335  outfd = open(outfile, O_CREAT | O_APPEND | O_WRONLY | PG_BINARY,
336  S_IRUSR | S_IWUSR);
337  if (outfd == -1)
338  {
339  pg_log_error("could not open log file \"%s\": %m", outfile);
340  goto error;
341  }
342 
343  if (fstat(outfd, &statbuf) != 0)
344  {
345  pg_log_error("could not stat file \"%s\": %m", outfile);
346  goto error;
347  }
348 
349  output_isfile = S_ISREG(statbuf.st_mode) && !isatty(outfd);
350  }
351 
352  r = PQgetCopyData(conn, &copybuf, 1);
353  if (r == 0)
354  {
355  /*
356  * In async mode, and no data available. We block on reading but
357  * not more than the specified timeout, so that we can send a
358  * response back to the client.
359  */
360  fd_set input_mask;
361  TimestampTz message_target = 0;
362  TimestampTz fsync_target = 0;
363  struct timeval timeout;
364  struct timeval *timeoutptr = NULL;
365 
366  if (PQsocket(conn) < 0)
367  {
368  pg_log_error("invalid socket: %s", PQerrorMessage(conn));
369  goto error;
370  }
371 
372  FD_ZERO(&input_mask);
373  FD_SET(PQsocket(conn), &input_mask);
374 
375  /* Compute when we need to wakeup to send a keepalive message. */
376  if (standby_message_timeout)
377  message_target = last_status + (standby_message_timeout - 1) *
378  ((int64) 1000);
379 
380  /* Compute when we need to wakeup to fsync the output file. */
381  if (fsync_interval > 0 && output_needs_fsync)
382  fsync_target = output_last_fsync + (fsync_interval - 1) *
383  ((int64) 1000);
384 
385  /* Now compute when to wakeup. */
386  if (message_target > 0 || fsync_target > 0)
387  {
388  TimestampTz targettime;
389  long secs;
390  int usecs;
391 
392  targettime = message_target;
393 
394  if (fsync_target > 0 && fsync_target < targettime)
395  targettime = fsync_target;
396 
397  feTimestampDifference(now,
398  targettime,
399  &secs,
400  &usecs);
401  if (secs <= 0)
402  timeout.tv_sec = 1; /* Always sleep at least 1 sec */
403  else
404  timeout.tv_sec = secs;
405  timeout.tv_usec = usecs;
406  timeoutptr = &timeout;
407  }
408 
409  r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
410  if (r == 0 || (r < 0 && errno == EINTR))
411  {
412  /*
413  * Got a timeout or signal. Continue the loop and either
414  * deliver a status packet to the server or just go back into
415  * blocking.
416  */
417  continue;
418  }
419  else if (r < 0)
420  {
421  pg_log_error("%s() failed: %m", "select");
422  goto error;
423  }
424 
425  /* Else there is actually data on the socket */
426  if (PQconsumeInput(conn) == 0)
427  {
428  pg_log_error("could not receive data from WAL stream: %s",
429  PQerrorMessage(conn));
430  goto error;
431  }
432  continue;
433  }
434 
435  /* End of copy stream */
436  if (r == -1)
437  break;
438 
439  /* Failure while reading the copy stream */
440  if (r == -2)
441  {
442  pg_log_error("could not read COPY data: %s",
443  PQerrorMessage(conn));
444  goto error;
445  }
446 
447  /* Check the message type. */
448  if (copybuf[0] == 'k')
449  {
450  int pos;
451  bool replyRequested;
452  XLogRecPtr walEnd;
453  bool endposReached = false;
454 
455  /*
456  * Parse the keepalive message, enclosed in the CopyData message.
457  * We just check if the server requested a reply, and ignore the
458  * rest.
459  */
460  pos = 1; /* skip msgtype 'k' */
461  walEnd = fe_recvint64(&copybuf[pos]);
462  output_written_lsn = Max(walEnd, output_written_lsn);
463 
464  pos += 8; /* read walEnd */
465 
466  pos += 8; /* skip sendTime */
467 
468  if (r < pos + 1)
469  {
470  pg_log_error("streaming header too small: %d", r);
471  goto error;
472  }
473  replyRequested = copybuf[pos];
474 
475  if (endpos != InvalidXLogRecPtr && walEnd >= endpos)
476  {
477  /*
478  * If there's nothing to read on the socket until a keepalive
479  * we know that the server has nothing to send us; and if
480  * walEnd has passed endpos, we know nothing else can have
481  * committed before endpos. So we can bail out now.
482  */
483  endposReached = true;
484  }
485 
486  /* Send a reply, if necessary */
487  if (replyRequested || endposReached)
488  {
489  if (!flushAndSendFeedback(conn, &now))
490  goto error;
491  last_status = now;
492  }
493 
494  if (endposReached)
495  {
496  prepareToTerminate(conn, endpos, true, InvalidXLogRecPtr);
497  time_to_abort = true;
498  break;
499  }
500 
501  continue;
502  }
503  else if (copybuf[0] != 'w')
504  {
505  pg_log_error("unrecognized streaming header: \"%c\"",
506  copybuf[0]);
507  goto error;
508  }
509 
510  /*
511  * Read the header of the XLogData message, enclosed in the CopyData
512  * message. We only need the WAL location field (dataStart), the rest
513  * of the header is ignored.
514  */
515  hdr_len = 1; /* msgtype 'w' */
516  hdr_len += 8; /* dataStart */
517  hdr_len += 8; /* walEnd */
518  hdr_len += 8; /* sendTime */
519  if (r < hdr_len + 1)
520  {
521  pg_log_error("streaming header too small: %d", r);
522  goto error;
523  }
524 
525  /* Extract WAL location for this block */
526  cur_record_lsn = fe_recvint64(&copybuf[1]);
527 
528  if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos)
529  {
530  /*
531  * We've read past our endpoint, so prepare to go away being
532  * cautious about what happens to our output data.
533  */
534  if (!flushAndSendFeedback(conn, &now))
535  goto error;
536  prepareToTerminate(conn, endpos, false, cur_record_lsn);
537  time_to_abort = true;
538  break;
539  }
540 
541  output_written_lsn = Max(cur_record_lsn, output_written_lsn);
542 
543  bytes_left = r - hdr_len;
544  bytes_written = 0;
545 
546  /* signal that a fsync is needed */
547  output_needs_fsync = true;
548 
549  while (bytes_left)
550  {
551  int ret;
552 
553  ret = write(outfd,
554  copybuf + hdr_len + bytes_written,
555  bytes_left);
556 
557  if (ret < 0)
558  {
559  pg_log_error("could not write %u bytes to log file \"%s\": %m",
560  bytes_left, outfile);
561  goto error;
562  }
563 
564  /* Write was successful, advance our position */
565  bytes_written += ret;
566  bytes_left -= ret;
567  }
568 
569  if (write(outfd, "\n", 1) != 1)
570  {
571  pg_log_error("could not write %u bytes to log file \"%s\": %m",
572  1, outfile);
573  goto error;
574  }
575 
576  if (endpos != InvalidXLogRecPtr && cur_record_lsn == endpos)
577  {
578  /* endpos was exactly the record we just processed, we're done */
579  if (!flushAndSendFeedback(conn, &now))
580  goto error;
581  prepareToTerminate(conn, endpos, false, cur_record_lsn);
582  time_to_abort = true;
583  break;
584  }
585  }
586 
587  res = PQgetResult(conn);
588  if (PQresultStatus(res) == PGRES_COPY_OUT)
589  {
590  PQclear(res);
591 
592  /*
593  * We're doing a client-initiated clean exit and have sent CopyDone to
594  * the server. Drain any messages, so we don't miss a last-minute
595  * ErrorResponse. The walsender stops generating XLogData records once
596  * it sees CopyDone, so expect this to finish quickly. After CopyDone,
597  * it's too late for sendFeedback(), even if this were to take a long
598  * time. Hence, use synchronous-mode PQgetCopyData().
599  */
600  while (1)
601  {
602  int r;
603 
604  if (copybuf != NULL)
605  {
606  PQfreemem(copybuf);
607  copybuf = NULL;
608  }
609  r = PQgetCopyData(conn, &copybuf, 0);
610  if (r == -1)
611  break;
612  if (r == -2)
613  {
614  pg_log_error("could not read COPY data: %s",
615  PQerrorMessage(conn));
616  time_to_abort = false; /* unclean exit */
617  goto error;
618  }
619  }
620 
621  res = PQgetResult(conn);
622  }
623  if (PQresultStatus(res) != PGRES_COMMAND_OK)
624  {
625  pg_log_error("unexpected termination of replication stream: %s",
626  PQresultErrorMessage(res));
627  goto error;
628  }
629  PQclear(res);
630 
631  if (outfd != -1 && strcmp(outfile, "-") != 0)
632  {
633  TimestampTz t = feGetCurrentTimestamp();
634 
635  /* no need to jump to error on failure here, we're finishing anyway */
636  OutputFsync(t);
637 
638  if (close(outfd) != 0)
639  pg_log_error("could not close file \"%s\": %m", outfile);
640  }
641  outfd = -1;
642 error:
643  if (copybuf != NULL)
644  {
645  PQfreemem(copybuf);
646  copybuf = NULL;
647  }
648  destroyPQExpBuffer(query);
649  PQfinish(conn);
650  conn = NULL;
651 }
static XLogRecPtr output_fsync_lsn
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6744
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
TimestampTz feGetCurrentTimestamp(void)
Definition: streamutil.c:701
static volatile sig_atomic_t time_to_abort
static void error(void)
Definition: sql-dyntest.c:147
static int fsync_interval
#define write(a, b, c)
Definition: win32.h:14
static bool OutputFsync(TimestampTz now)
int64 TimestampTz
Definition: timestamp.h:39
#define pg_log_error(...)
Definition: logging.h:80
void appendPQExpBufferStr(PQExpBuffer str, const char *data)
Definition: pqexpbuffer.c:369
void PQfinish(PGconn *conn)
Definition: fe-connect.c:4231
static char * replication_slot
static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now)
bool feTimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: streamutil.c:742
#define PG_BINARY
Definition: c.h:1271
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3178
int PQgetCopyData(PGconn *conn, char **buffer, int async)
Definition: fe-exec.c:2668
static int verbose
static XLogRecPtr endpos
void destroyPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:116
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define fstat
Definition: win32_port.h:282
static bool output_isfile
PGconn * GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
Definition: connection.c:127
#define S_IWUSR
Definition: win32_port.h:291
PGconn * conn
Definition: streamutil.c:54
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
Definition: pqexpbuffer.c:267
void feTimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: streamutil.c:720
static int outfd
#define select(n, r, w, e, timeout)
Definition: win32_port.h:474
static XLogRecPtr output_written_lsn
static bool output_needs_fsync
#define S_ISREG(m)
Definition: win32_port.h:327
static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos, bool keepalive, XLogRecPtr lsn)
PQExpBuffer createPQExpBuffer(void)
Definition: pqexpbuffer.c:74
static bool sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1904
void appendPQExpBufferChar(PQExpBuffer str, char ch)
Definition: pqexpbuffer.c:380
StringInfo copybuf
Definition: tablesync.c:124
void PQclear(PGresult *res)
Definition: fe-exec.c:694
#define Max(x, y)
Definition: c.h:980
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static XLogRecPtr startpos
#define S_IRUSR
Definition: win32_port.h:288
char * PQresultErrorMessage(const PGresult *res)
Definition: fe-exec.c:3194
int64 fe_recvint64(char *buf)
Definition: streamutil.c:766
int i
static TimestampTz output_last_fsync
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2193
static char * outfile
#define close(a)
Definition: win32.h:12
#define EINTR
Definition: win32_port.h:351
void resetPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:148
static size_t noptions
void PQfreemem(void *ptr)
Definition: fe-exec.c:3796
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:6770
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1544
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1978
#define pg_log_info(...)
Definition: logging.h:88
static volatile sig_atomic_t output_reopen
static int standby_message_timeout

◆ usage()

static void usage ( void  )
static

Definition at line 74 of file pg_recvlogical.c.

References _, fsync_interval, plugin, printf, progname, and standby_message_timeout.

Referenced by main().

75 {
76  printf(_("%s controls PostgreSQL logical decoding streams.\n\n"),
77  progname);
78  printf(_("Usage:\n"));
79  printf(_(" %s [OPTION]...\n"), progname);
80  printf(_("\nAction to be performed:\n"));
81  printf(_(" --create-slot create a new replication slot (for the slot's name see --slot)\n"));
82  printf(_(" --drop-slot drop the replication slot (for the slot's name see --slot)\n"));
83  printf(_(" --start start streaming in a replication slot (for the slot's name see --slot)\n"));
84  printf(_("\nOptions:\n"));
85  printf(_(" -E, --endpos=LSN exit after receiving the specified LSN\n"));
86  printf(_(" -f, --file=FILE receive log into this file, - for stdout\n"));
87  printf(_(" -F --fsync-interval=SECS\n"
88  " time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
89  printf(_(" --if-not-exists do not error if slot already exists when creating a slot\n"));
90  printf(_(" -I, --startpos=LSN where in an existing slot should the streaming start\n"));
91  printf(_(" -n, --no-loop do not loop on connection lost\n"));
92  printf(_(" -o, --option=NAME[=VALUE]\n"
93  " pass option NAME with optional value VALUE to the\n"
94  " output plugin\n"));
95  printf(_(" -P, --plugin=PLUGIN use output plugin PLUGIN (default: %s)\n"), plugin);
96  printf(_(" -s, --status-interval=SECS\n"
97  " time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
98  printf(_(" -S, --slot=SLOTNAME name of the logical replication slot\n"));
99  printf(_(" -t, --two-phase enable two-phase decoding when creating a slot\n"));
100  printf(_(" -v, --verbose output verbose messages\n"));
101  printf(_(" -V, --version output version information, then exit\n"));
102  printf(_(" -?, --help show this help, then exit\n"));
103  printf(_("\nConnection options:\n"));
104  printf(_(" -d, --dbname=DBNAME database to connect to\n"));
105  printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
106  printf(_(" -p, --port=PORT database server port number\n"));
107  printf(_(" -U, --username=NAME connect as specified database user\n"));
108  printf(_(" -w, --no-password never prompt for password\n"));
109  printf(_(" -W, --password force password prompt (should happen automatically)\n"));
110  printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
111  printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
112 }
static const char * plugin
const char * progname
Definition: main.c:46
static int fsync_interval
#define printf(...)
Definition: port.h:223
#define _(x)
Definition: elog.c:89
static int standby_message_timeout

Variable Documentation

◆ do_create_slot

bool do_create_slot = false
static

Definition at line 46 of file pg_recvlogical.c.

Referenced by main().

◆ do_drop_slot

bool do_drop_slot = false
static

Definition at line 49 of file pg_recvlogical.c.

Referenced by main().

◆ do_start_slot

bool do_start_slot = false
static

Definition at line 48 of file pg_recvlogical.c.

Referenced by main().

◆ endpos

XLogRecPtr endpos = InvalidXLogRecPtr
static

Definition at line 45 of file pg_recvlogical.c.

Referenced by main(), and StreamLogicalLog().

◆ fsync_interval

int fsync_interval = 10 * 1000
static

Definition at line 43 of file pg_recvlogical.c.

Referenced by main(), OutputFsync(), StreamLogicalLog(), and usage().

◆ noloop

int noloop = 0
static

Definition at line 41 of file pg_recvlogical.c.

Referenced by main().

◆ noptions

◆ options

◆ outfd

int outfd = -1
static

Definition at line 58 of file pg_recvlogical.c.

Referenced by OutputFsync(), and StreamLogicalLog().

◆ outfile

char* outfile = NULL
static

◆ output_fsync_lsn

XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr
static

Definition at line 65 of file pg_recvlogical.c.

Referenced by OutputFsync(), sendFeedback(), and StreamLogicalLog().

◆ output_isfile

bool output_isfile
static

Definition at line 61 of file pg_recvlogical.c.

Referenced by OutputFsync(), and StreamLogicalLog().

◆ output_last_fsync

TimestampTz output_last_fsync = -1
static

Definition at line 62 of file pg_recvlogical.c.

Referenced by OutputFsync(), and StreamLogicalLog().

◆ output_needs_fsync

bool output_needs_fsync = false
static

Definition at line 63 of file pg_recvlogical.c.

Referenced by OutputFsync(), and StreamLogicalLog().

◆ output_reopen

volatile sig_atomic_t output_reopen = false
static

Definition at line 60 of file pg_recvlogical.c.

Referenced by sighup_handler(), and StreamLogicalLog().

◆ output_written_lsn

XLogRecPtr output_written_lsn = InvalidXLogRecPtr
static

Definition at line 64 of file pg_recvlogical.c.

Referenced by OutputFsync(), sendFeedback(), and StreamLogicalLog().

◆ plugin

const char* plugin = "test_decoding"
static

◆ replication_slot

char* replication_slot = NULL
static

Definition at line 50 of file pg_recvlogical.c.

Referenced by main(), sendFeedback(), and StreamLogicalLog().

◆ slot_exists_ok

bool slot_exists_ok = false
static

Definition at line 47 of file pg_recvlogical.c.

Referenced by main().

◆ standby_message_timeout

int standby_message_timeout = 10 * 1000
static

Definition at line 42 of file pg_recvlogical.c.

Referenced by main(), StreamLogicalLog(), and usage().

◆ startpos

◆ time_to_abort

volatile sig_atomic_t time_to_abort = false
static

Definition at line 59 of file pg_recvlogical.c.

Referenced by main(), sigint_handler(), and StreamLogicalLog().

◆ two_phase

◆ verbose

int verbose = 0
static

Definition at line 39 of file pg_recvlogical.c.

Referenced by main(), prepareToTerminate(), sendFeedback(), and StreamLogicalLog().