53 bool logical,
const char *appname,
58 char **sender_host,
int *sender_port);
64 char **content,
int *len);
115 elog(
ERROR,
"libpqwalreceiver already loaded");
140 keys[++
i] =
"replication";
141 vals[
i] = logical ?
"database" :
"true";
148 keys[++
i] =
"dbname";
149 vals[
i] =
"replication";
151 keys[++
i] =
"fallback_application_name";
155 keys[++
i] =
"client_encoding";
227 (
errmsg(
"could not clear search path: %s",
250 (
errcode(ERRCODE_SYNTAX_ERROR),
251 errmsg(
"invalid connection string syntax: %s", err)));
273 if (conn_opts == NULL)
275 (
errmsg(
"could not parse connection string: %s",
276 _(
"out of memory"))));
279 for (conn_opt = conn_opts; conn_opt->
keyword != NULL; conn_opt++)
284 if (strchr(conn_opt->
dispchar,
'D') ||
285 conn_opt->
val == NULL ||
286 conn_opt->
val[0] ==
'\0')
290 obfuscate = strchr(conn_opt->
dispchar,
'*') != NULL;
293 buf.
len == 0 ?
"" :
" ",
295 obfuscate ?
"********" : conn_opt->
val);
320 if (ret && strlen(ret) != 0)
324 if (ret && strlen(ret) != 0)
325 *sender_port = atoi(ret);
347 (
errmsg(
"could not receive database system identifier and timeline ID from " 348 "the primary server: %s",
358 (
errmsg(
"invalid response from primary server"),
359 errdetail(
"Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.",
360 ntuples, nfields, 3, 1)));
366 return primary_sysid;
421 char *pubnames_literal;
436 (
errmsg(
"could not start WAL streaming: %s",
439 strlen(pubnames_str));
440 if (!pubnames_literal)
442 (
errmsg(
"could not start WAL streaming: %s",
471 (
errmsg(
"could not start WAL streaming: %s",
494 (
errmsg(
"could not send end-of-streaming message to primary: %s",
516 (
errmsg(
"unexpected result set after end-of-streaming")));
530 (
errmsg(
"error while shutting down streaming COPY: %s",
539 (
errmsg(
"error reading result of streaming command: %s",
547 (
errmsg(
"unexpected result after CommandComplete: %s",
557 char **content,
int *len)
567 snprintf(cmd,
sizeof(cmd),
"TIMELINE_HISTORY %u", tli);
573 (
errmsg(
"could not receive timeline history file from " 574 "the primary server: %s",
584 (
errmsg(
"invalid response from primary server"),
585 errdetail(
"Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
592 memcpy(*content,
PQgetvalue(res, 0, 1), *len);
745 (
errmsg(
"could not receive data from WAL stream: %s",
781 (
errmsg(
"unexpected result after CommandComplete: %s",
796 (
errmsg(
"could not receive data from WAL stream: %s",
802 (
errmsg(
"could not receive data from WAL stream: %s",
821 (
errmsg(
"could not send data to WAL stream: %s",
849 switch (snapshot_action)
874 (
errmsg(
"could not create replication slot \"%s\": %s",
906 const int nRetTypes,
const Oid *retTypes)
917 if (nfields != nRetTypes)
919 (
errmsg(
"invalid query response"),
920 errdetail(
"Expected %d fields, got %d fields.",
921 nRetTypes, nfields)));
927 for (coln = 0; coln < nRetTypes; coln++)
929 PQfname(pgres, coln), retTypes[coln], -1, 0);
938 "libpqrcv query result context",
942 for (tupn = 0; tupn <
PQntuples(pgres); tupn++)
954 for (coln = 0; coln < nfields; coln++)
981 const int nRetTypes,
const Oid *retTypes)
988 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
989 errmsg(
"the query interface requires a database connection")));
1020 walres->
err =
_(
"empty query");
1053 foreach(lc, strings)
int PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
int PQgetlength(const PGresult *res, int tup_num, int field_num)
static bool libpqrcv_startstreaming(WalReceiverConn *conn, const WalRcvStreamOptions *options)
int PQnfields(const PGresult *res)
#define WL_SOCKET_WRITEABLE
char * PQerrorMessage(const PGconn *conn)
void MemoryContextDelete(MemoryContext context)
#define AllocSetContextCreate
static void libpqrcv_check_conninfo(const char *conninfo)
static int libpqrcv_receive(WalReceiverConn *conn, char **buffer, pgsocket *wait_fd)
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
#define MaxTupleAttributeNumber
TupleDesc CreateTemplateTupleDesc(int natts)
char * PQfname(const PGresult *res, int field_num)
void termPQExpBuffer(PQExpBuffer str)
char * pstrdup(const char *in)
void ProcessWalRcvInterrupts(void)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
int errcode(int sqlerrcode)
void PQfinish(PGconn *conn)
char * PQport(const PGconn *conn)
int PQputCopyEnd(PGconn *conn, const char *errormsg)
#define WL_SOCKET_READABLE
void MemoryContextReset(MemoryContext context)
int PQserverVersion(const PGconn *conn)
static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, TimeLineID tli, char **filename, char **content, int *len)
int PQntuples(const PGresult *res)
ExecStatusType PQresultStatus(const PGresult *res)
Datum pg_lsn_in(PG_FUNCTION_ARGS)
void ResetLatch(Latch *latch)
int PQgetCopyData(PGconn *conn, char **buffer, int async)
HeapTuple BuildTupleFromCStrings(AttInMetadata *attinmeta, char **values)
static PGresult * libpqrcv_PQgetResult(PGconn *streamConn)
char * pchomp(const char *in)
int PQsendQuery(PGconn *conn, const char *query)
char * PQescapeIdentifier(PGconn *conn, const char *str, size_t len)
PQconninfoOption * PQconninfoParse(const char *conninfo, char **errmsg)
void pfree(void *pointer)
void appendStringInfo(StringInfo str, const char *fmt,...)
static char * libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
static void libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres, const int nRetTypes, const Oid *retTypes)
static void libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
#define ALLOCSET_DEFAULT_SIZES
void tuplestore_puttuple(Tuplestorestate *state, HeapTuple tuple)
int PQflush(PGconn *conn)
void appendStringInfoString(StringInfo str, const char *s)
static WalRcvExecResult * libpqrcv_exec(WalReceiverConn *conn, const char *query, const int nRetTypes, const Oid *retTypes)
int errdetail(const char *fmt,...)
void PQconninfoFree(PQconninfoOption *connOptions)
#define CStringGetDatum(X)
Datum DirectFunctionCall1Coll(PGFunction func, Oid collation, Datum arg1)
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
PQconninfoOption * PQconninfo(PGconn *conn)
MemoryContext CurrentMemoryContext
void TupleDescInitEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
static WalReceiverFunctionsType PQWalReceiverFunctions
static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn)
void appendStringInfoChar(StringInfo str, char ch)
void initStringInfo(StringInfo str)
static char * libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, bool temporary, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn)
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
void * palloc0(Size size)
int PQbackendPID(const PGconn *conn)
WalReceiverFunctionsType * WalReceiverFunctions
AttInMetadata * TupleDescGetAttInMetadata(TupleDesc tupdesc)
int PQconsumeInput(PGconn *conn)
static void libpqrcv_disconnect(WalReceiverConn *conn)
PostgresPollingStatusType PQconnectPoll(PGconn *conn)
static int libpqrcv_server_version(WalReceiverConn *conn)
char * PQescapeLiteral(PGconn *conn, const char *str, size_t len)
static PGresult * libpqrcv_PQexec(PGconn *streamConn, const char *query)
#define ereport(elevel,...)
char * PQhost(const PGconn *conn)
void PQclear(PGresult *res)
#define PQExpBufferDataBroken(buf)
int PQendcopy(PGconn *conn)
Tuplestorestate * tuplestore
#define Assert(condition)
const char * GetDatabaseEncodingName(void)
int PQisBusy(PGconn *conn)
#define ALWAYS_SECURE_SEARCH_PATH_SQL
static char * stringlist_to_identifierstr(PGconn *conn, List *strings)
int32 pg_strtoint32(const char *s)
PostgresPollingStatusType
static WalReceiverConn * libpqrcv_connect(const char *conninfo, bool logical, const char *appname, char **err)
int errmsg(const char *fmt,...)
union WalRcvStreamOptions::@105 proto
static char * libpqrcv_get_conninfo(WalReceiverConn *conn)
PGconn * PQconnectStartParams(const char *const *keywords, const char *const *values, int expand_dbname)
struct WalRcvStreamOptions::@105::@106 physical
static void libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
static void static void status(const char *fmt,...) pg_attribute_printf(1
static void libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host, int *sender_port)
int PQgetisnull(const PGresult *res, int tup_num, int field_num)
ConnStatusType PQstatus(const PGconn *conn)
#define WL_SOCKET_CONNECTED
void PQfreemem(void *ptr)
int PQsocket(const PGconn *conn)
PGresult * PQgetResult(PGconn *conn)
void initPQExpBuffer(PQExpBuffer str)
#define WL_EXIT_ON_PM_DEATH