19 #ifdef HAVE_SYS_SELECT_H
20 #include <sys/select.h>
35 #define RECONNECT_SLEEP_TIME 5
55 static const char *
plugin =
"test_decoding";
67 static void usage(
void);
76 printf(
_(
"%s controls PostgreSQL logical decoding streams.\n\n"),
80 printf(
_(
"\nAction to be performed:\n"));
81 printf(
_(
" --create-slot create a new replication slot (for the slot's name see --slot)\n"));
82 printf(
_(
" --drop-slot drop the replication slot (for the slot's name see --slot)\n"));
83 printf(
_(
" --start start streaming in a replication slot (for the slot's name see --slot)\n"));
85 printf(
_(
" -E, --endpos=LSN exit after receiving the specified LSN\n"));
86 printf(
_(
" -f, --file=FILE receive log into this file, - for stdout\n"));
87 printf(
_(
" -F --fsync-interval=SECS\n"
88 " time between fsyncs to the output file (default: %d)\n"), (
fsync_interval / 1000));
89 printf(
_(
" --if-not-exists do not error if slot already exists when creating a slot\n"));
90 printf(
_(
" -I, --startpos=LSN where in an existing slot should the streaming start\n"));
91 printf(
_(
" -n, --no-loop do not loop on connection lost\n"));
92 printf(
_(
" -o, --option=NAME[=VALUE]\n"
93 " pass option NAME with optional value VALUE to the\n"
95 printf(
_(
" -P, --plugin=PLUGIN use output plugin PLUGIN (default: %s)\n"),
plugin);
96 printf(
_(
" -s, --status-interval=SECS\n"
98 printf(
_(
" -S, --slot=SLOTNAME name of the logical replication slot\n"));
99 printf(
_(
" -t, --two-phase enable two-phase decoding when creating a slot\n"));
100 printf(
_(
" -v, --verbose output verbose messages\n"));
101 printf(
_(
" -V, --version output version information, then exit\n"));
102 printf(
_(
" -?, --help show this help, then exit\n"));
103 printf(
_(
"\nConnection options:\n"));
104 printf(
_(
" -d, --dbname=DBNAME database to connect to\n"));
105 printf(
_(
" -h, --host=HOSTNAME database server host or socket directory\n"));
106 printf(
_(
" -p, --port=PORT database server port number\n"));
107 printf(
_(
" -U, --username=NAME connect as specified database user\n"));
108 printf(
_(
" -w, --no-password never prompt for password\n"));
109 printf(
_(
" -W, --password force password prompt (should happen automatically)\n"));
110 printf(
_(
"\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
111 printf(
_(
"%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
123 char replybuf[1 + 8 + 8 + 8 + 8 + 1];
137 pg_log_info(
"confirming write up to %X/%X, flush to %X/%X (slot %s)",
152 replybuf[
len] = replyRequested ? 1 : 0;
229 pg_log_info(
"starting log streaming at %X/%X (slot %s)",
262 pg_log_error(
"could not send replication command \"%s\": %s",
359 struct timeval timeout;
360 struct timeval *timeoutptr = NULL;
368 FD_ZERO(&input_mask);
382 if (message_target > 0 || fsync_target > 0)
388 targettime = message_target;
390 if (fsync_target > 0 && fsync_target < targettime)
391 targettime = fsync_target;
400 timeout.tv_sec = secs;
401 timeout.tv_usec = usecs;
402 timeoutptr = &timeout;
406 if (r == 0 || (r < 0 && errno ==
EINTR))
424 pg_log_error(
"could not receive data from WAL stream: %s",
449 bool endposReached =
false;
479 endposReached =
true;
483 if (replyRequested || endposReached)
539 bytes_left = r - hdr_len;
550 copybuf + hdr_len + bytes_written,
555 pg_log_error(
"could not write %d bytes to log file \"%s\": %m",
561 bytes_written += ret;
567 pg_log_error(
"could not write %d bytes to log file \"%s\": %m",
621 pg_log_error(
"unexpected termination of replication stream: %s",
678 static struct option long_options[] = {
720 if (strcmp(argv[1],
"--help") == 0 || strcmp(argv[1],
"-?") == 0)
725 else if (strcmp(argv[1],
"-V") == 0 ||
726 strcmp(argv[1],
"--version") == 0)
728 puts(
"pg_recvlogical (PostgreSQL) " PG_VERSION);
733 while ((
c =
getopt_long(argc, argv,
"E:f:F:nvtd:h:p:U:wWI:o:P:s:S:",
734 long_options, &option_index)) != -1)
779 if (sscanf(
optarg,
"%X/%X", &hi, &lo) != 2)
781 startpos = ((uint64) hi) << 32 | lo;
784 if (sscanf(
optarg,
"%X/%X", &hi, &lo) != 2)
786 endpos = ((uint64) hi) << 32 | lo;
847 pg_log_error(
"too many command-line arguments (first is \"%s\")",
879 pg_log_error(
"at least one action needs to be specified");
886 pg_log_error(
"cannot use --create-slot or --start together with --drop-slot");
893 pg_log_error(
"cannot use --create-slot or --drop-slot together with --startpos");
900 pg_log_error(
"--endpos may only be specified with --start");
907 pg_log_error(
"--two-phase may only be specified with --create-slot");
939 pg_fatal(
"could not establish database-specific replication connection");
993 pg_log_info(
"disconnected; waiting %d seconds to try again",
1033 pg_log_info(
"end position %X/%X reached by keepalive",
1036 pg_log_info(
"end position %X/%X reached by WAL record at %X/%X",
Datum now(PG_FUNCTION_ARGS)
#define PG_TEXTDOMAIN(domain)
void set_pglocale_pgservice(const char *argv0, const char *app)
PGconn * GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
static void PGresult * res
char * PQerrorMessage(const PGconn *conn)
void PQfinish(PGconn *conn)
int PQsocket(const PGconn *conn)
int PQflush(PGconn *conn)
void PQfreemem(void *ptr)
ExecStatusType PQresultStatus(const PGresult *res)
void PQclear(PGresult *res)
char * PQresultErrorMessage(const PGresult *res)
int PQputCopyEnd(PGconn *conn, const char *errormsg)
int PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
PGresult * PQexec(PGconn *conn, const char *query)
int PQconsumeInput(PGconn *conn)
PGresult * PQgetResult(PGconn *conn)
int PQgetCopyData(PGconn *conn, char **buffer, int async)
void * pg_realloc(void *ptr, size_t size)
char * pg_strdup(const char *in)
int getopt_long(int argc, char *const argv[], const char *optstring, const struct option *longopts, int *longindex)
#define required_argument
void pg_logging_init(const char *argv0)
#define pg_log_error(...)
#define pg_log_error_hint(...)
bool option_parse_int(const char *optarg, const char *optname, int min_range, int max_range, int *result)
PGDLLIMPORT char * optarg
static bool do_start_slot
static bool OutputFsync(TimestampTz now)
static void StreamLogicalLog(void)
static bool output_needs_fsync
static void sigint_handler(int signum)
int main(int argc, char **argv)
static const char * plugin
static volatile sig_atomic_t time_to_abort
static bool slot_exists_ok
static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos, bool keepalive, XLogRecPtr lsn)
static int fsync_interval
static bool output_isfile
static char * replication_slot
static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now)
static XLogRecPtr output_written_lsn
static void sighup_handler(int signum)
static TimestampTz output_last_fsync
static XLogRecPtr output_fsync_lsn
static bool sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
static XLogRecPtr startpos
#define RECONNECT_SLEEP_TIME
static bool do_create_slot
static void disconnect_atexit(void)
static int standby_message_timeout
static volatile sig_atomic_t output_reopen
const char * get_progname(const char *argv0)
PQExpBuffer createPQExpBuffer(void)
void resetPQExpBuffer(PQExpBuffer str)
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
void destroyPQExpBuffer(PQExpBuffer str)
void appendPQExpBufferChar(PQExpBuffer str, char ch)
void appendPQExpBufferStr(PQExpBuffer str, const char *data)
void pg_usleep(long microsec)
pqsigfunc pqsignal(int signum, pqsigfunc handler)
int64 fe_recvint64(char *buf)
TimestampTz feGetCurrentTimestamp(void)
void feTimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
void fe_sendint64(int64 i, char *buf)
bool feTimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
bool RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli, XLogRecPtr *startpos, char **db_name)
static StringInfo copybuf
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
#define select(n, r, w, e, timeout)
#define LSN_FORMAT_ARGS(lsn)
#define InvalidXLogRecPtr