31 #define RECONNECT_SLEEP_TIME 5
59 static const char *
plugin =
"test_decoding";
72 static void usage(
void);
82 printf(
_(
"%s controls PostgreSQL logical decoding streams.\n\n"),
86 printf(
_(
"\nAction to be performed:\n"));
87 printf(
_(
" --create-slot create a new replication slot (for the slot's name see --slot)\n"));
88 printf(
_(
" --drop-slot drop the replication slot (for the slot's name see --slot)\n"));
89 printf(
_(
" --start start streaming in a replication slot (for the slot's name see --slot)\n"));
91 printf(
_(
" -E, --endpos=LSN exit after receiving the specified LSN\n"));
92 printf(
_(
" -f, --file=FILE receive log into this file, - for stdout\n"));
93 printf(
_(
" -F --fsync-interval=SECS\n"
94 " time between fsyncs to the output file (default: %d)\n"), (
fsync_interval / 1000));
95 printf(
_(
" --if-not-exists do not error if slot already exists when creating a slot\n"));
96 printf(
_(
" -I, --startpos=LSN where in an existing slot should the streaming start\n"));
97 printf(
_(
" -n, --no-loop do not loop on connection lost\n"));
98 printf(
_(
" -o, --option=NAME[=VALUE]\n"
99 " pass option NAME with optional value VALUE to the\n"
100 " output plugin\n"));
101 printf(
_(
" -P, --plugin=PLUGIN use output plugin PLUGIN (default: %s)\n"),
plugin);
102 printf(
_(
" -s, --status-interval=SECS\n"
104 printf(
_(
" -S, --slot=SLOTNAME name of the logical replication slot\n"));
105 printf(
_(
" -t, --two-phase enable decoding of prepared transactions when creating a slot\n"));
106 printf(
_(
" -v, --verbose output verbose messages\n"));
107 printf(
_(
" -V, --version output version information, then exit\n"));
108 printf(
_(
" -?, --help show this help, then exit\n"));
109 printf(
_(
"\nConnection options:\n"));
110 printf(
_(
" -d, --dbname=DBNAME database to connect to\n"));
111 printf(
_(
" -h, --host=HOSTNAME database server host or socket directory\n"));
112 printf(
_(
" -p, --port=PORT database server port number\n"));
113 printf(
_(
" -U, --username=NAME connect as specified database user\n"));
114 printf(
_(
" -w, --no-password never prompt for password\n"));
115 printf(
_(
" -W, --password force password prompt (should happen automatically)\n"));
116 printf(
_(
"\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
117 printf(
_(
"%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
129 char replybuf[1 + 8 + 8 + 8 + 8 + 1];
143 pg_log_info(
"confirming write up to %X/%X, flush to %X/%X (slot %s)",
158 replybuf[
len] = replyRequested ? 1 : 0;
237 pg_log_info(
"starting log streaming at %X/%X (slot %s)",
270 pg_log_error(
"could not send replication command \"%s\": %s",
368 struct timeval timeout;
369 struct timeval *timeoutptr = NULL;
377 FD_ZERO(&input_mask);
391 if (message_target > 0 || fsync_target > 0)
397 targettime = message_target;
399 if (fsync_target > 0 && fsync_target < targettime)
400 targettime = fsync_target;
409 timeout.tv_sec = secs;
410 timeout.tv_usec = usecs;
411 timeoutptr = &timeout;
415 if (r == 0 || (r < 0 && errno ==
EINTR))
433 pg_log_error(
"could not receive data from WAL stream: %s",
458 bool endposReached =
false;
488 endposReached =
true;
492 if (replyRequested || endposReached)
548 bytes_left = r - hdr_len;
559 copybuf + hdr_len + bytes_written,
564 pg_log_error(
"could not write %d bytes to log file \"%s\": %m",
570 bytes_written += ret;
576 pg_log_error(
"could not write %d bytes to log file \"%s\": %m",
634 pg_log_error(
"unexpected termination of replication stream: %s",
692 static struct option long_options[] = {
734 if (strcmp(argv[1],
"--help") == 0 || strcmp(argv[1],
"-?") == 0)
739 else if (strcmp(argv[1],
"-V") == 0 ||
740 strcmp(argv[1],
"--version") == 0)
742 puts(
"pg_recvlogical (PostgreSQL) " PG_VERSION);
747 while ((
c =
getopt_long(argc, argv,
"E:f:F:ntvd:h:p:U:wWI:o:P:s:S:",
748 long_options, &option_index)) != -1)
793 if (sscanf(
optarg,
"%X/%X", &hi, &lo) != 2)
795 startpos = ((uint64) hi) << 32 | lo;
798 if (sscanf(
optarg,
"%X/%X", &hi, &lo) != 2)
800 endpos = ((uint64) hi) << 32 | lo;
861 pg_log_error(
"too many command-line arguments (first is \"%s\")",
893 pg_log_error(
"at least one action needs to be specified");
900 pg_log_error(
"cannot use --create-slot or --start together with --drop-slot");
907 pg_log_error(
"cannot use --create-slot or --drop-slot together with --startpos");
914 pg_log_error(
"--endpos may only be specified with --start");
921 pg_log_error(
"--two-phase may only be specified with --create-slot");
954 pg_fatal(
"could not establish database-specific replication connection");
1008 pg_log_info(
"disconnected; waiting %d seconds to try again",
1051 pg_log_info(
"received interrupt signal, exiting");
1054 pg_log_info(
"end position %X/%X reached by keepalive",
1059 pg_log_info(
"end position %X/%X reached by WAL record at %X/%X",
Datum now(PG_FUNCTION_ARGS)
#define Assert(condition)
#define PG_TEXTDOMAIN(domain)
void set_pglocale_pgservice(const char *argv0, const char *app)
PGconn * GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
static void PGresult * res
char * PQerrorMessage(const PGconn *conn)
void PQfinish(PGconn *conn)
int PQsocket(const PGconn *conn)
int PQflush(PGconn *conn)
void PQfreemem(void *ptr)
ExecStatusType PQresultStatus(const PGresult *res)
char * PQresultErrorMessage(const PGresult *res)
int PQputCopyEnd(PGconn *conn, const char *errormsg)
int PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
PGresult * PQexec(PGconn *conn, const char *query)
int PQconsumeInput(PGconn *conn)
PGresult * PQgetResult(PGconn *conn)
int PQgetCopyData(PGconn *conn, char **buffer, int async)
void * pg_realloc(void *ptr, size_t size)
char * pg_strdup(const char *in)
int getopt_long(int argc, char *const argv[], const char *optstring, const struct option *longopts, int *longindex)
#define required_argument
void pg_logging_init(const char *argv0)
#define pg_log_error(...)
#define pg_log_error_hint(...)
bool option_parse_int(const char *optarg, const char *optname, int min_range, int max_range, int *result)
PGDLLIMPORT char * optarg
static volatile sig_atomic_t stop_reason
static bool do_start_slot
static bool OutputFsync(TimestampTz now)
static void sighup_handler(SIGNAL_ARGS)
static void StreamLogicalLog(void)
static bool output_needs_fsync
int main(int argc, char **argv)
static const char * plugin
static volatile sig_atomic_t time_to_abort
static bool slot_exists_ok
static int fsync_interval
static bool output_isfile
static char * replication_slot
static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now)
static XLogRecPtr output_written_lsn
static void sigexit_handler(SIGNAL_ARGS)
static TimestampTz output_last_fsync
static XLogRecPtr output_fsync_lsn
static bool sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
static XLogRecPtr startpos
#define RECONNECT_SLEEP_TIME
static bool do_create_slot
static void disconnect_atexit(void)
static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos, StreamStopReason reason, XLogRecPtr lsn)
static int standby_message_timeout
static volatile sig_atomic_t output_reopen
const char * get_progname(const char *argv0)
pqsigfunc pqsignal(int signo, pqsigfunc func)
PQExpBuffer createPQExpBuffer(void)
void resetPQExpBuffer(PQExpBuffer str)
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
void destroyPQExpBuffer(PQExpBuffer str)
void appendPQExpBufferChar(PQExpBuffer str, char ch)
void appendPQExpBufferStr(PQExpBuffer str, const char *data)
void pg_usleep(long microsec)
int64 fe_recvint64(char *buf)
TimestampTz feGetCurrentTimestamp(void)
void feTimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
void fe_sendint64(int64 i, char *buf)
bool feTimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
bool RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli, XLogRecPtr *startpos, char **db_name)
static StringInfo copybuf
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
#define select(n, r, w, e, timeout)
#define LSN_FORMAT_ARGS(lsn)
#define XLogRecPtrIsInvalid(r)
#define InvalidXLogRecPtr