PostgreSQL Source Code  git master
libpqwalreceiver.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/time.h>
#include "libpq-fe.h"
#include "pqexpbuffer.h"
#include "access/xlog.h"
#include "catalog/pg_type.h"
#include "funcapi.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/walreceiver.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/tuplestore.h"
Include dependency graph for libpqwalreceiver.c:

Go to the source code of this file.

Data Structures

struct  WalReceiverConn
 

Functions

void _PG_init (void)
 
static WalReceiverConn * libpqrcv_connect (const char *conninfo, bool logical, const char *appname, char **err)
 
static void libpqrcv_check_conninfo (const char *conninfo)
 
static char * libpqrcv_get_conninfo (WalReceiverConn *conn)
 
static void libpqrcv_get_senderinfo (WalReceiverConn *conn, char **sender_host, int *sender_port)
 
static char * libpqrcv_identify_system (WalReceiverConn *conn, TimeLineID *primary_tli)
 
static int libpqrcv_server_version (WalReceiverConn *conn)
 
static void libpqrcv_readtimelinehistoryfile (WalReceiverConn *conn, TimeLineID tli, char **filename, char **content, int *len)
 
static bool libpqrcv_startstreaming (WalReceiverConn *conn, const WalRcvStreamOptions *options)
 
static void libpqrcv_endstreaming (WalReceiverConn *conn, TimeLineID *next_tli)
 
static int libpqrcv_receive (WalReceiverConn *conn, char **buffer, pgsocket *wait_fd)
 
static void libpqrcv_send (WalReceiverConn *conn, const char *buffer, int nbytes)
 
static char * libpqrcv_create_slot (WalReceiverConn *conn, const char *slotname, bool temporary, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn)
 
static WalRcvExecResult * libpqrcv_exec (WalReceiverConn *conn, const char *query, const int nRetTypes, const Oid *retTypes)
 
static void libpqrcv_disconnect (WalReceiverConn *conn)
 
static PGresult * libpqrcv_PQexec (PGconn *streamConn, const char *query)
 
static PGresult * libpqrcv_PQgetResult (PGconn *streamConn)
 
static char * stringlist_to_identifierstr (PGconn *conn, List *strings)
 
static void libpqrcv_processTuples (PGresult *pgres, WalRcvExecResult *walres, const int nRetTypes, const Oid *retTypes)
 

Variables

 PG_MODULE_MAGIC
 
static WalReceiverFunctionsType PQWalReceiverFunctions
 

Function Documentation

◆ _PG_init()

void _PG_init ( void  )

Definition at line 109 of file libpqwalreceiver.c.

References elog, ERROR, PQWalReceiverFunctions, and WalReceiverFunctions.

110 {
111  if (WalReceiverFunctions != NULL)
112  elog(ERROR, "libpqwalreceiver already loaded");
113  WalReceiverFunctions = &PQWalReceiverFunctions;
114 }
#define ERROR
Definition: elog.h:43
static WalReceiverFunctionsType PQWalReceiverFunctions
WalReceiverFunctionsType * WalReceiverFunctions
Definition: walreceiver.c:81
#define elog(elevel,...)
Definition: elog.h:226

◆ libpqrcv_check_conninfo()

static void libpqrcv_check_conninfo ( const char *  conninfo)
static

Definition at line 223 of file libpqwalreceiver.c.

References ereport, errcode(), errmsg(), ERROR, PQconninfoFree(), and PQconninfoParse().

224 {
225  PQconninfoOption *opts = NULL;
226  char *err = NULL;
227 
228  opts = PQconninfoParse(conninfo, &err);
229  if (opts == NULL)
230  ereport(ERROR,
231  (errcode(ERRCODE_SYNTAX_ERROR),
232  errmsg("invalid connection string syntax: %s", err)));
233 
234  PQconninfoFree(opts);
235 }
int errcode(int sqlerrcode)
Definition: elog.c:570
PQconninfoOption * PQconninfoParse(const char *conninfo, char **errmsg)
Definition: fe-connect.c:5180
#define ERROR
Definition: elog.h:43
void PQconninfoFree(PQconninfoOption *connOptions)
Definition: fe-connect.c:6410
#define ereport(elevel, rest)
Definition: elog.h:141
int errmsg(const char *fmt,...)
Definition: elog.c:784

◆ libpqrcv_connect()

static WalReceiverConn * libpqrcv_connect ( const char *  conninfo,
bool  logical,
const char *  appname,
char **  err 
)
static

Definition at line 122 of file libpqwalreceiver.c.

References Assert, conn, CONNECTION_BAD, CONNECTION_OK, CONNECTION_STARTED, GetDatabaseEncodingName(), i, WalReceiverConn::logical, MyLatch, palloc0(), pchomp(), PGRES_POLLING_FAILED, PGRES_POLLING_OK, PGRES_POLLING_READING, PGRES_POLLING_WRITING, PQconnectPoll(), PQconnectStartParams(), PQerrorMessage(), PQsocket(), PQstatus(), ProcessWalRcvInterrupts(), ResetLatch(), status(), WalReceiverConn::streamConn, WAIT_EVENT_LIBPQWALRECEIVER_CONNECT, WaitLatchOrSocket(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_CONNECTED, WL_SOCKET_READABLE, and WL_SOCKET_WRITEABLE.

124 {
125  WalReceiverConn *conn;
126  PostgresPollingStatusType status;
127  const char *keys[5];
128  const char *vals[5];
129  int i = 0;
130 
131  /*
132  * We use the expand_dbname parameter to process the connection string (or
133  * URI), and pass some extra options.
134  */
135  keys[i] = "dbname";
136  vals[i] = conninfo;
137  keys[++i] = "replication";
138  vals[i] = logical ? "database" : "true";
139  if (!logical)
140  {
141  /*
142  * The database name is ignored by the server in replication mode, but
143  * specify "replication" for .pgpass lookup.
144  */
145  keys[++i] = "dbname";
146  vals[i] = "replication";
147  }
148  keys[++i] = "fallback_application_name";
149  vals[i] = appname;
150  if (logical)
151  {
152  keys[++i] = "client_encoding";
153  vals[i] = GetDatabaseEncodingName();
154  }
155  keys[++i] = NULL;
156  vals[i] = NULL;
157 
158  Assert(i < sizeof(keys));
159 
160  conn = palloc0(sizeof(WalReceiverConn));
161  conn->streamConn = PQconnectStartParams(keys, vals,
162  /* expand_dbname = */ true);
163  if (PQstatus(conn->streamConn) == CONNECTION_BAD)
164  {
165  *err = pchomp(PQerrorMessage(conn->streamConn));
166  return NULL;
167  }
168 
169  /*
170  * Poll connection until we have OK or FAILED status.
171  *
172  * Per spec for PQconnectPoll, first wait till socket is write-ready.
173  */
174  status = PGRES_POLLING_WRITING;
175  do
176  {
177  int io_flag;
178  int rc;
179 
180  if (status == PGRES_POLLING_READING)
181  io_flag = WL_SOCKET_READABLE;
182 #ifdef WIN32
183  /* Windows needs a different test while waiting for connection-made */
184  else if (PQstatus(conn->streamConn) == CONNECTION_STARTED)
185  io_flag = WL_SOCKET_CONNECTED;
186 #endif
187  else
188  io_flag = WL_SOCKET_WRITEABLE;
189 
190  rc = WaitLatchOrSocket(MyLatch,
191  WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
192  PQsocket(conn->streamConn),
193  0,
194  WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
195 
196  /* Interrupted? */
197  if (rc & WL_LATCH_SET)
198  {
199  ResetLatch(MyLatch);
200  ProcessWalRcvInterrupts();
201  }
202 
203  /* If socket is ready, advance the libpq state machine */
204  if (rc & io_flag)
205  status = PQconnectPoll(conn->streamConn);
206  } while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
207 
208  if (PQstatus(conn->streamConn) != CONNECTION_OK)
209  {
210  *err = pchomp(PQerrorMessage(conn->streamConn));
211  return NULL;
212  }
213 
214  conn->logical = logical;
215 
216  return conn;
217 }
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6578
void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:147
#define WL_SOCKET_READABLE
Definition: latch.h:125
void ResetLatch(Latch *latch)
Definition: latch.c:519
char * pchomp(const char *in)
Definition: mcxt.c:1189
PGconn * conn
Definition: streamutil.c:56
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:369
void * palloc0(Size size)
Definition: mcxt.c:955
PostgresPollingStatusType PQconnectPoll(PGconn *conn)
Definition: fe-connect.c:2139
#define Assert(condition)
Definition: c.h:732
const char * GetDatabaseEncodingName(void)
Definition: mbutils.c:1002
PostgresPollingStatusType
Definition: libpq-fe.h:73
int i
PGconn * PQconnectStartParams(const char *const *keywords, const char *const *values, int expand_dbname)
Definition: fe-connect.c:716
struct Latch * MyLatch
Definition: globals.c:54
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:227
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:6525
#define WL_SOCKET_CONNECTED
Definition: latch.h:134
#define WL_LATCH_SET
Definition: latch.h:124
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:6596
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129

◆ libpqrcv_create_slot()

static char * libpqrcv_create_slot ( WalReceiverConn *  conn,
const char *  slotname,
bool  temporary,
CRSSnapshotAction  snapshot_action,
XLogRecPtr *  lsn 
)
static

Definition at line 804 of file libpqwalreceiver.c.

References appendStringInfo(), appendStringInfoString(), CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, CStringGetDatum, StringInfoData::data, DatumGetLSN, DirectFunctionCall1Coll(), ereport, errmsg(), ERROR, initStringInfo(), InvalidOid, libpqrcv_PQexec(), WalReceiverConn::logical, pchomp(), pfree(), pg_lsn_in(), PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQgetisnull(), PQgetvalue(), PQresultStatus(), pstrdup(), and WalReceiverConn::streamConn.

807 {
808  PGresult *res;
809  StringInfoData cmd;
810  char *snapshot;
811 
812  initStringInfo(&cmd);
813 
814  appendStringInfo(&cmd, "CREATE_REPLICATION_SLOT \"%s\"", slotname);
815 
816  if (temporary)
817  appendStringInfoString(&cmd, " TEMPORARY");
818 
819  if (conn->logical)
820  {
821  appendStringInfoString(&cmd, " LOGICAL pgoutput");
822  switch (snapshot_action)
823  {
824  case CRS_EXPORT_SNAPSHOT:
825  appendStringInfoString(&cmd, " EXPORT_SNAPSHOT");
826  break;
827  case CRS_NOEXPORT_SNAPSHOT:
828  appendStringInfoString(&cmd, " NOEXPORT_SNAPSHOT");
829  break;
830  case CRS_USE_SNAPSHOT:
831  appendStringInfoString(&cmd, " USE_SNAPSHOT");
832  break;
833  }
834  }
835 
836  res = libpqrcv_PQexec(conn->streamConn, cmd.data);
837  pfree(cmd.data);
838 
839  if (PQresultStatus(res) != PGRES_TUPLES_OK)
840  {
841  PQclear(res);
842  ereport(ERROR,
843  (errmsg("could not create replication slot \"%s\": %s",
844  slotname, pchomp(PQerrorMessage(conn->streamConn)))));
845  }
846 
847  *lsn = DatumGetLSN(DirectFunctionCall1Coll(pg_lsn_in, InvalidOid,
848  CStringGetDatum(PQgetvalue(res, 0, 1))));
849  if (!PQgetisnull(res, 0, 2))
850  snapshot = pstrdup(PQgetvalue(res, 0, 2));
851  else
852  snapshot = NULL;
853 
854  PQclear(res);
855 
856  return snapshot;
857 }
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6578
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3164
char * pstrdup(const char *in)
Definition: mcxt.c:1161
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2693
Datum pg_lsn_in(PG_FUNCTION_ARGS)
Definition: pg_lsn.c:63
char * pchomp(const char *in)
Definition: mcxt.c:1189
void pfree(void *pointer)
Definition: mcxt.c:1031
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:78
#define ERROR
Definition: elog.h:43
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:163
#define CStringGetDatum(X)
Definition: postgres.h:578
Datum DirectFunctionCall1Coll(PGFunction func, Oid collation, Datum arg1)
Definition: fmgr.c:793
#define ereport(elevel, rest)
Definition: elog.h:141
void initStringInfo(StringInfo str)
Definition: stringinfo.c:46
static PGresult * libpqrcv_PQexec(PGconn *streamConn, const char *query)
#define InvalidOid
Definition: postgres_ext.h:36
#define DatumGetLSN(X)
Definition: pg_lsn.h:21
void PQclear(PGresult *res)
Definition: fe-exec.c:695
int errmsg(const char *fmt,...)
Definition: elog.c:784
int PQgetisnull(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3189

◆ libpqrcv_disconnect()

static void libpqrcv_disconnect ( WalReceiverConn *  conn)
static

Definition at line 677 of file libpqwalreceiver.c.

References pfree(), PQfinish(), PQfreemem(), WalReceiverConn::recvBuf, and WalReceiverConn::streamConn.

678 {
679  PQfinish(conn->streamConn);
680  if (conn->recvBuf != NULL)
681  PQfreemem(conn->recvBuf);
682  pfree(conn);
683 }
void PQfinish(PGconn *conn)
Definition: fe-connect.c:4080
void pfree(void *pointer)
Definition: mcxt.c:1031
void PQfreemem(void *ptr)
Definition: fe-exec.c:3297

◆ libpqrcv_endstreaming()

static void libpqrcv_endstreaming ( WalReceiverConn *  conn,
TimeLineID *  next_tli 
)
static

Definition at line 456 of file libpqwalreceiver.c.

References ereport, errmsg(), ERROR, libpqrcv_PQgetResult(), pchomp(), pg_strtoint32(), PGRES_COMMAND_OK, PGRES_COPY_OUT, PGRES_TUPLES_OK, PQclear(), PQendcopy(), PQerrorMessage(), PQflush(), PQgetvalue(), PQnfields(), PQntuples(), PQputCopyEnd(), PQresultStatus(), and WalReceiverConn::streamConn.

457 {
458  PGresult *res;
459 
460  /*
461  * Send copy-end message. As in libpqrcv_PQexec, this could theoretically
462  * block, but the risk seems small.
463  */
464  if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
465  PQflush(conn->streamConn))
466  ereport(ERROR,
467  (errmsg("could not send end-of-streaming message to primary: %s",
468  pchomp(PQerrorMessage(conn->streamConn)))));
469 
470  *next_tli = 0;
471 
472  /*
473  * After COPY is finished, we should receive a result set indicating the
474  * next timeline's ID, or just CommandComplete if the server was shut
475  * down.
476  *
477  * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
478  * also possible in case we aborted the copy in mid-stream.
479  */
480  res = libpqrcv_PQgetResult(conn->streamConn);
481  if (PQresultStatus(res) == PGRES_TUPLES_OK)
482  {
483  /*
484  * Read the next timeline's ID. The server also sends the timeline's
485  * starting point, but it is ignored.
486  */
487  if (PQnfields(res) < 2 || PQntuples(res) != 1)
488  ereport(ERROR,
489  (errmsg("unexpected result set after end-of-streaming")));
490  *next_tli = pg_strtoint32(PQgetvalue(res, 0, 0));
491  PQclear(res);
492 
493  /* the result set should be followed by CommandComplete */
494  res = libpqrcv_PQgetResult(conn->streamConn);
495  }
496  else if (PQresultStatus(res) == PGRES_COPY_OUT)
497  {
498  PQclear(res);
499 
500  /* End the copy */
501  if (PQendcopy(conn->streamConn))
502  ereport(ERROR,
503  (errmsg("error while shutting down streaming COPY: %s",
504  pchomp(PQerrorMessage(conn->streamConn)))));
505 
506  /* CommandComplete should follow */
507  res = libpqrcv_PQgetResult(conn->streamConn);
508  }
509 
510  if (PQresultStatus(res) != PGRES_COMMAND_OK)
511  ereport(ERROR,
512  (errmsg("error reading result of streaming command: %s",
513  pchomp(PQerrorMessage(conn->streamConn)))));
514  PQclear(res);
515 
516  /* Verify that there are no more results */
517  res = libpqrcv_PQgetResult(conn->streamConn);
518  if (res != NULL)
519  ereport(ERROR,
520  (errmsg("unexpected result after CommandComplete: %s",
521  pchomp(PQerrorMessage(conn->streamConn)))));
522 }
int PQnfields(const PGresult *res)
Definition: fe-exec.c:2778
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6578
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3164
int PQputCopyEnd(PGconn *conn, const char *errormsg)
Definition: fe-exec.c:2385
int PQntuples(const PGresult *res)
Definition: fe-exec.c:2770
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2693
static PGresult * libpqrcv_PQgetResult(PGconn *streamConn)
char * pchomp(const char *in)
Definition: mcxt.c:1189
#define ERROR
Definition: elog.h:43
int PQflush(PGconn *conn)
Definition: fe-exec.c:3284
#define ereport(elevel, rest)
Definition: elog.h:141
void PQclear(PGresult *res)
Definition: fe-exec.c:695
int PQendcopy(PGconn *conn)
Definition: fe-exec.c:2619
int32 pg_strtoint32(const char *s)
Definition: numutils.c:199
int errmsg(const char *fmt,...)
Definition: elog.c:784

◆ libpqrcv_exec()

static WalRcvExecResult * libpqrcv_exec ( WalReceiverConn *  conn,
const char *  query,
const int  nRetTypes,
const Oid *  retTypes 
)
static

Definition at line 938 of file libpqwalreceiver.c.

References _, ereport, WalRcvExecResult::err, errcode(), errmsg(), ERROR, InvalidOid, libpqrcv_PQexec(), libpqrcv_processTuples(), MyDatabaseId, palloc0(), pchomp(), PGRES_BAD_RESPONSE, PGRES_COMMAND_OK, PGRES_COPY_BOTH, PGRES_COPY_IN, PGRES_COPY_OUT, PGRES_EMPTY_QUERY, PGRES_FATAL_ERROR, PGRES_NONFATAL_ERROR, PGRES_SINGLE_TUPLE, PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQresultStatus(), WalRcvExecResult::status, WalReceiverConn::streamConn, WALRCV_ERROR, WALRCV_OK_COMMAND, WALRCV_OK_COPY_BOTH, WALRCV_OK_COPY_IN, WALRCV_OK_COPY_OUT, and WALRCV_OK_TUPLES.

940 {
941  PGresult *pgres = NULL;
942  WalRcvExecResult *walres = palloc0(sizeof(WalRcvExecResult));
943 
944  if (MyDatabaseId == InvalidOid)
945  ereport(ERROR,
946  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
947  errmsg("the query interface requires a database connection")));
948 
949  pgres = libpqrcv_PQexec(conn->streamConn, query);
950 
951  switch (PQresultStatus(pgres))
952  {
953  case PGRES_SINGLE_TUPLE:
954  case PGRES_TUPLES_OK:
955  walres->status = WALRCV_OK_TUPLES;
956  libpqrcv_processTuples(pgres, walres, nRetTypes, retTypes);
957  break;
958 
959  case PGRES_COPY_IN:
960  walres->status = WALRCV_OK_COPY_IN;
961  break;
962 
963  case PGRES_COPY_OUT:
964  walres->status = WALRCV_OK_COPY_OUT;
965  break;
966 
967  case PGRES_COPY_BOTH:
968  walres->status = WALRCV_OK_COPY_BOTH;
969  break;
970 
971  case PGRES_COMMAND_OK:
972  walres->status = WALRCV_OK_COMMAND;
973  break;
974 
975  /* Empty query is considered error. */
976  case PGRES_EMPTY_QUERY:
977  walres->status = WALRCV_ERROR;
978  walres->err = _("empty query");
979  break;
980 
981  case PGRES_NONFATAL_ERROR:
982  case PGRES_FATAL_ERROR:
983  case PGRES_BAD_RESPONSE:
984  walres->status = WALRCV_ERROR;
985  walres->err = pchomp(PQerrorMessage(conn->streamConn));
986  break;
987  }
988 
989  PQclear(pgres);
990 
991  return walres;
992 }
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6578
int errcode(int sqlerrcode)
Definition: elog.c:570
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2693
char * pchomp(const char *in)
Definition: mcxt.c:1189
#define ERROR
Definition: elog.h:43
static void libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres, const int nRetTypes, const Oid *retTypes)
#define ereport(elevel, rest)
Definition: elog.h:141
void * palloc0(Size size)
Definition: mcxt.c:955
Oid MyDatabaseId
Definition: globals.c:85
static PGresult * libpqrcv_PQexec(PGconn *streamConn, const char *query)
#define InvalidOid
Definition: postgres_ext.h:36
void PQclear(PGresult *res)
Definition: fe-exec.c:695
WalRcvExecStatus status
Definition: walreceiver.h:195
int errmsg(const char *fmt,...)
Definition: elog.c:784
#define _(x)
Definition: elog.c:84

◆ libpqrcv_get_conninfo()

static char * libpqrcv_get_conninfo ( WalReceiverConn *  conn)
static

Definition at line 242 of file libpqwalreceiver.c.

References _, appendPQExpBuffer(), Assert, buf, PQExpBufferData::data, _PQconninfoOption::dispchar, ereport, errmsg(), ERROR, initPQExpBuffer(), _PQconninfoOption::keyword, PQExpBufferData::len, PQconninfo(), PQconninfoFree(), PQExpBufferDataBroken, pstrdup(), WalReceiverConn::streamConn, termPQExpBuffer(), and _PQconninfoOption::val.

243 {
244  PQconninfoOption *conn_opts;
245  PQconninfoOption *conn_opt;
246  PQExpBufferData buf;
247  char *retval;
248 
249  Assert(conn->streamConn != NULL);
250 
251  initPQExpBuffer(&buf);
252  conn_opts = PQconninfo(conn->streamConn);
253 
254  if (conn_opts == NULL)
255  ereport(ERROR,
256  (errmsg("could not parse connection string: %s",
257  _("out of memory"))));
258 
259  /* build a clean connection string from pieces */
260  for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
261  {
262  bool obfuscate;
263 
264  /* Skip debug and empty options */
265  if (strchr(conn_opt->dispchar, 'D') ||
266  conn_opt->val == NULL ||
267  conn_opt->val[0] == '\0')
268  continue;
269 
270  /* Obfuscate security-sensitive options */
271  obfuscate = strchr(conn_opt->dispchar, '*') != NULL;
272 
273  appendPQExpBuffer(&buf, "%s%s=%s",
274  buf.len == 0 ? "" : " ",
275  conn_opt->keyword,
276  obfuscate ? "********" : conn_opt->val);
277  }
278 
279  PQconninfoFree(conn_opts);
280 
281  retval = PQExpBufferDataBroken(buf) ? NULL : pstrdup(buf.data);
282  termPQExpBuffer(&buf);
283  return retval;
284 }
void termPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:131
char * pstrdup(const char *in)
Definition: mcxt.c:1161
#define ERROR
Definition: elog.h:43
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
Definition: pqexpbuffer.c:267
static char * buf
Definition: pg_test_fsync.c:68
void PQconninfoFree(PQconninfoOption *connOptions)
Definition: fe-connect.c:6410
PQconninfoOption * PQconninfo(PGconn *conn)
Definition: fe-connect.c:6369
#define ereport(elevel, rest)
Definition: elog.h:141
#define PQExpBufferDataBroken(buf)
Definition: pqexpbuffer.h:67
#define Assert(condition)
Definition: c.h:732
int errmsg(const char *fmt,...)
Definition: elog.c:784
#define _(x)
Definition: elog.c:84
void initPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:92

◆ libpqrcv_get_senderinfo()

static void libpqrcv_get_senderinfo ( WalReceiverConn *  conn,
char **  sender_host,
int *  sender_port 
)
static

Definition at line 290 of file libpqwalreceiver.c.

References Assert, PQhost(), PQport(), pstrdup(), and WalReceiverConn::streamConn.

292 {
293  char *ret = NULL;
294 
295  *sender_host = NULL;
296  *sender_port = 0;
297 
298  Assert(conn->streamConn != NULL);
299 
300  ret = PQhost(conn->streamConn);
301  if (ret && strlen(ret) != 0)
302  *sender_host = pstrdup(ret);
303 
304  ret = PQport(conn->streamConn);
305  if (ret && strlen(ret) != 0)
306  *sender_port = atoi(ret);
307 }
char * pstrdup(const char *in)
Definition: mcxt.c:1161
char * PQport(const PGconn *conn)
Definition: fe-connect.c:6497
char * PQhost(const PGconn *conn)
Definition: fe-connect.c:6461
#define Assert(condition)
Definition: c.h:732

◆ libpqrcv_identify_system()

static char * libpqrcv_identify_system ( WalReceiverConn *  conn,
TimeLineID *  primary_tli 
)
static

Definition at line 314 of file libpqwalreceiver.c.

References ereport, errdetail(), errmsg(), ERROR, libpqrcv_PQexec(), pchomp(), pg_strtoint32(), PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQgetvalue(), PQnfields(), PQntuples(), PQresultStatus(), pstrdup(), and WalReceiverConn::streamConn.

315 {
316  PGresult *res;
317  char *primary_sysid;
318 
319  /*
320  * Get the system identifier and timeline ID as a DataRow message from the
321  * primary server.
322  */
323  res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM");
324  if (PQresultStatus(res) != PGRES_TUPLES_OK)
325  {
326  PQclear(res);
327  ereport(ERROR,
328  (errmsg("could not receive database system identifier and timeline ID from "
329  "the primary server: %s",
330  pchomp(PQerrorMessage(conn->streamConn)))));
331  }
332  if (PQnfields(res) < 3 || PQntuples(res) != 1)
333  {
334  int ntuples = PQntuples(res);
335  int nfields = PQnfields(res);
336 
337  PQclear(res);
338  ereport(ERROR,
339  (errmsg("invalid response from primary server"),
340  errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.",
341  ntuples, nfields, 3, 1)));
342  }
343  primary_sysid = pstrdup(PQgetvalue(res, 0, 0));
344  *primary_tli = pg_strtoint32(PQgetvalue(res, 0, 1));
345  PQclear(res);
346 
347  return primary_sysid;
348 }
int PQnfields(const PGresult *res)
Definition: fe-exec.c:2778
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6578
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3164
char * pstrdup(const char *in)
Definition: mcxt.c:1161
int PQntuples(const PGresult *res)
Definition: fe-exec.c:2770
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2693
char * pchomp(const char *in)
Definition: mcxt.c:1189
#define ERROR
Definition: elog.h:43
int errdetail(const char *fmt,...)
Definition: elog.c:860
#define ereport(elevel, rest)
Definition: elog.h:141
static PGresult * libpqrcv_PQexec(PGconn *streamConn, const char *query)
void PQclear(PGresult *res)
Definition: fe-exec.c:695
int32 pg_strtoint32(const char *s)
Definition: numutils.c:199
int errmsg(const char *fmt,...)
Definition: elog.c:784

◆ libpqrcv_PQexec()

static PGresult * libpqrcv_PQexec ( PGconn *  streamConn,
const char *  query 
)
static

Definition at line 583 of file libpqwalreceiver.c.

References CONNECTION_BAD, libpqrcv_PQgetResult(), PGRES_COPY_BOTH, PGRES_COPY_IN, PGRES_COPY_OUT, PQclear(), PQresultStatus(), PQsendQuery(), and PQstatus().

Referenced by libpqrcv_create_slot(), libpqrcv_exec(), libpqrcv_identify_system(), libpqrcv_readtimelinehistoryfile(), and libpqrcv_startstreaming().

584 {
585  PGresult *lastResult = NULL;
586 
587  /*
588  * PQexec() silently discards any prior query results on the connection.
589  * This is not required for this function as it's expected that the caller
590  * (which is this library in all cases) will behave correctly and we don't
591  * have to be backwards compatible with old libpq.
592  */
593 
594  /*
595  * Submit the query. Since we don't use non-blocking mode, this could
596  * theoretically block. In practice, since we don't send very long query
597  * strings, the risk seems negligible.
598  */
599  if (!PQsendQuery(streamConn, query))
600  return NULL;
601 
602  for (;;)
603  {
604  /* Wait for, and collect, the next PGresult. */
605  PGresult *result;
606 
607  result = libpqrcv_PQgetResult(streamConn);
608  if (result == NULL)
609  break; /* query is complete, or failure */
610 
611  /*
612  * Emulate PQexec()'s behavior of returning the last result when there
613  * are many. We are fine with returning just last error message.
614  */
615  PQclear(lastResult);
616  lastResult = result;
617 
618  if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
619  PQresultStatus(lastResult) == PGRES_COPY_OUT ||
620  PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
621  PQstatus(streamConn) == CONNECTION_BAD)
622  break;
623  }
624 
625  return lastResult;
626 }
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2693
static PGresult * libpqrcv_PQgetResult(PGconn *streamConn)
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1235
void PQclear(PGresult *res)
Definition: fe-exec.c:695
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:6525

◆ libpqrcv_PQgetResult()

static PGresult * libpqrcv_PQgetResult ( PGconn *  streamConn)
static

Definition at line 632 of file libpqwalreceiver.c.

References MyLatch, PQconsumeInput(), PQgetResult(), PQisBusy(), PQsocket(), ProcessWalRcvInterrupts(), ResetLatch(), WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE, WaitLatchOrSocket(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_SOCKET_READABLE.

Referenced by libpqrcv_endstreaming(), libpqrcv_PQexec(), and libpqrcv_receive().

633 {
634  /*
635  * Collect data until PQgetResult is ready to get the result without
636  * blocking.
637  */
638  while (PQisBusy(streamConn))
639  {
640  int rc;
641 
642  /*
643  * We don't need to break down the sleep into smaller increments,
644  * since we'll get interrupted by signals and can handle any
645  * interrupts here.
646  */
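647  rc = WaitLatchOrSocket(MyLatch,
648  WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
649  WL_LATCH_SET,
650  PQsocket(streamConn),
651  0,
652  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);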
653 
654  /* Interrupted? */
655  if (rc & WL_LATCH_SET)
656  {
657  ResetLatch(MyLatch);
658  ProcessWalRcvInterrupts();
659  }
660 
661  /* Consume whatever data is available from the socket */
662  if (PQconsumeInput(streamConn) == 0)
663  {
664  /* trouble; return NULL */
665  return NULL;
666  }
667  }
668 
669  /* Now we can collect and return the next PGresult */
670  return PQgetResult(streamConn);
671 }
void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:147
#define WL_SOCKET_READABLE
Definition: latch.h:125
void ResetLatch(Latch *latch)
Definition: latch.c:519
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:369
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1705
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:1755
struct Latch * MyLatch
Definition: globals.c:54
#define WL_LATCH_SET
Definition: latch.h:124
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:6596
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1779
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129

◆ libpqrcv_processTuples()

static void libpqrcv_processTuples ( PGresult *  pgres,
WalRcvExecResult *  walres,
const int  nRetTypes,
const Oid *  retTypes 
)
static

Definition at line 863 of file libpqwalreceiver.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, BuildTupleFromCStrings(), CreateTemplateTupleDesc(), CurrentMemoryContext, ereport, errdetail(), errmsg(), ERROR, MaxTupleAttributeNumber, MemoryContextDelete(), MemoryContextReset(), MemoryContextSwitchTo(), PQfname(), PQgetisnull(), PQgetvalue(), PQnfields(), PQntuples(), ProcessWalRcvInterrupts(), WalRcvExecResult::tupledesc, TupleDescGetAttInMetadata(), TupleDescInitEntry(), WalRcvExecResult::tuplestore, tuplestore_begin_heap(), tuplestore_puttuple(), and work_mem.

Referenced by libpqrcv_exec().

865 {
866  int tupn;
867  int coln;
868  int nfields = PQnfields(pgres);
869  HeapTuple tuple;
870  AttInMetadata *attinmeta;
871  MemoryContext rowcontext;
872  MemoryContext oldcontext;
873 
874  /* Make sure we got expected number of fields. */
875  if (nfields != nRetTypes)
876  ereport(ERROR,
877  (errmsg("invalid query response"),
878  errdetail("Expected %d fields, got %d fields.",
879  nRetTypes, nfields)));
880 
881  walres->tuplestore = tuplestore_begin_heap(true, false, work_mem);
882 
883  /* Create tuple descriptor corresponding to expected result. */
884  walres->tupledesc = CreateTemplateTupleDesc(nRetTypes);
885  for (coln = 0; coln < nRetTypes; coln++)
886  TupleDescInitEntry(walres->tupledesc, (AttrNumber) coln + 1,
887  PQfname(pgres, coln), retTypes[coln], -1, 0);
888  attinmeta = TupleDescGetAttInMetadata(walres->tupledesc);
889 
890  /* No point in doing more here if there were no tuples returned. */
891  if (PQntuples(pgres) == 0)
892  return;
893 
894  /* Create temporary context for local allocations. */
895  rowcontext = AllocSetContextCreate(CurrentMemoryContext,
896  "libpqrcv query result context",
897  ALLOCSET_DEFAULT_SIZES);
898 
899  /* Process returned rows. */
900  for (tupn = 0; tupn < PQntuples(pgres); tupn++)
901  {
902  char *cstrs[MaxTupleAttributeNumber];
903 
905 
906  /* Do the allocations in temporary context. */
907  oldcontext = MemoryContextSwitchTo(rowcontext);
908 
909  /*
910  * Fill cstrs with null-terminated strings of column values.
911  */
912  for (coln = 0; coln < nfields; coln++)
913  {
914  if (PQgetisnull(pgres, tupn, coln))
915  cstrs[coln] = NULL;
916  else
917  cstrs[coln] = PQgetvalue(pgres, tupn, coln);
918  }
919 
920  /* Convert row to a tuple, and add it to the tuplestore */
921  tuple = BuildTupleFromCStrings(attinmeta, cstrs);
922  tuplestore_puttuple(walres->tuplestore, tuple);
923 
924  /* Clean up */
925  MemoryContextSwitchTo(oldcontext);
926  MemoryContextReset(rowcontext);
927  }
928 
929  MemoryContextDelete(rowcontext);
930 }
int PQnfields(const PGresult *res)
Definition: fe-exec.c:2778
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:211
#define AllocSetContextCreate
Definition: memutils.h:169
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3164
#define MaxTupleAttributeNumber
Definition: htup_details.h:33
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:44
char * PQfname(const PGresult *res, int field_num)
Definition: fe-exec.c:2856
void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:147
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:136
int PQntuples(const PGresult *res)
Definition: fe-exec.c:2770
HeapTuple BuildTupleFromCStrings(AttInMetadata *attinmeta, char **values)
Definition: execTuples.c:2112
TupleDesc tupledesc
Definition: walreceiver.h:198
#define ERROR
Definition: elog.h:43
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:191
void tuplestore_puttuple(Tuplestorestate *state, HeapTuple tuple)
Definition: tuplestore.c:730
int errdetail(const char *fmt,...)
Definition: elog.c:860
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
void TupleDescInitEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:603
#define ereport(elevel, rest)
Definition: elog.h:141
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
AttInMetadata * TupleDescGetAttInMetadata(TupleDesc tupdesc)
Definition: execTuples.c:2063
int work_mem
Definition: globals.c:121
Tuplestorestate * tuplestore
Definition: walreceiver.h:197
int errmsg(const char *fmt,...)
Definition: elog.c:784
int PQgetisnull(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3189
int16 AttrNumber
Definition: attnum.h:21

◆ libpqrcv_readtimelinehistoryfile()

static void libpqrcv_readtimelinehistoryfile ( WalReceiverConn * conn,
TimeLineID  tli,
char **  filename,
char **  content,
int *  len 
)
static

Definition at line 528 of file libpqwalreceiver.c.

References Assert, ereport, errdetail(), errmsg(), ERROR, libpqrcv_PQexec(), WalReceiverConn::logical, palloc(), pchomp(), PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQgetlength(), PQgetvalue(), PQnfields(), PQntuples(), PQresultStatus(), pstrdup(), snprintf, and WalReceiverConn::streamConn.

531 {
532  PGresult *res;
533  char cmd[64];
534 
535  Assert(!conn->logical);
536 
537  /*
538  * Request the primary to send over the history file for given timeline.
539  */
540  snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
541  res = libpqrcv_PQexec(conn->streamConn, cmd);
542  if (PQresultStatus(res) != PGRES_TUPLES_OK)
543  {
544  PQclear(res);
545  ereport(ERROR,
546  (errmsg("could not receive timeline history file from "
547  "the primary server: %s",
548  pchomp(PQerrorMessage(conn->streamConn)))));
549  }
550  if (PQnfields(res) != 2 || PQntuples(res) != 1)
551  {
552  int ntuples = PQntuples(res);
553  int nfields = PQnfields(res);
554 
555  PQclear(res);
556  ereport(ERROR,
557  (errmsg("invalid response from primary server"),
558  errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
559  ntuples, nfields)));
560  }
561  *filename = pstrdup(PQgetvalue(res, 0, 0));
562 
563  *len = PQgetlength(res, 0, 1);
564  *content = palloc(*len);
565  memcpy(*content, PQgetvalue(res, 0, 1), *len);
566  PQclear(res);
567 }
int PQgetlength(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3175
int PQnfields(const PGresult *res)
Definition: fe-exec.c:2778
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6578
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3164
char * pstrdup(const char *in)
Definition: mcxt.c:1161
int PQntuples(const PGresult *res)
Definition: fe-exec.c:2770
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2693
char * pchomp(const char *in)
Definition: mcxt.c:1189
#define ERROR
Definition: elog.h:43
int errdetail(const char *fmt,...)
Definition: elog.c:860
#define ereport(elevel, rest)
Definition: elog.h:141
static PGresult * libpqrcv_PQexec(PGconn *streamConn, const char *query)
void PQclear(PGresult *res)
Definition: fe-exec.c:695
#define Assert(condition)
Definition: c.h:732
static char * filename
Definition: pg_dumpall.c:91
void * palloc(Size size)
Definition: mcxt.c:924
int errmsg(const char *fmt,...)
Definition: elog.c:784
#define snprintf
Definition: port.h:192

◆ libpqrcv_receive()

static int libpqrcv_receive ( WalReceiverConn * conn,
char **  buffer,
pgsocket * wait_fd 
)
static

Definition at line 702 of file libpqwalreceiver.c.

References CONNECTION_BAD, ereport, errmsg(), ERROR, libpqrcv_PQgetResult(), pchomp(), PGRES_COMMAND_OK, PGRES_COPY_IN, PQclear(), PQconsumeInput(), PQerrorMessage(), PQfreemem(), PQgetCopyData(), PQresultStatus(), PQsocket(), PQstatus(), WalReceiverConn::recvBuf, and WalReceiverConn::streamConn.

704 {
705  int rawlen;
706 
707  if (conn->recvBuf != NULL)
708  PQfreemem(conn->recvBuf);
709  conn->recvBuf = NULL;
710 
711  /* Try to receive a CopyData message */
712  rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
713  if (rawlen == 0)
714  {
715  /* Try consuming some data. */
716  if (PQconsumeInput(conn->streamConn) == 0)
717  ereport(ERROR,
718  (errmsg("could not receive data from WAL stream: %s",
719  pchomp(PQerrorMessage(conn->streamConn)))));
720 
721  /* Now that we've consumed some input, try again */
722  rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
723  if (rawlen == 0)
724  {
725  /* Tell caller to try again when our socket is ready. */
726  *wait_fd = PQsocket(conn->streamConn);
727  return 0;
728  }
729  }
730  if (rawlen == -1) /* end-of-streaming or error */
731  {
732  PGresult *res;
733 
734  res = libpqrcv_PQgetResult(conn->streamConn);
735  if (PQresultStatus(res) == PGRES_COMMAND_OK)
736  {
737  PQclear(res);
738 
739  /* Verify that there are no more results. */
740  res = libpqrcv_PQgetResult(conn->streamConn);
741  if (res != NULL)
742  {
743  PQclear(res);
744 
745  /*
746  * If the other side closed the connection orderly (otherwise
747  * we'd seen an error, or PGRES_COPY_IN) don't report an error
748  * here, but let callers deal with it.
749  */
750  if (PQstatus(conn->streamConn) == CONNECTION_BAD)
751  return -1;
752 
753  ereport(ERROR,
754  (errmsg("unexpected result after CommandComplete: %s",
755  PQerrorMessage(conn->streamConn))));
756  }
757 
758  return -1;
759  }
760  else if (PQresultStatus(res) == PGRES_COPY_IN)
761  {
762  PQclear(res);
763  return -1;
764  }
765  else
766  {
767  PQclear(res);
768  ereport(ERROR,
769  (errmsg("could not receive data from WAL stream: %s",
770  pchomp(PQerrorMessage(conn->streamConn)))));
771  }
772  }
773  if (rawlen < -1)
774  ereport(ERROR,
775  (errmsg("could not receive data from WAL stream: %s",
776  pchomp(PQerrorMessage(conn->streamConn)))));
777 
778  /* Return received messages to caller */
779  *buffer = conn->recvBuf;
780  return rawlen;
781 }
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6578
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2693
int PQgetCopyData(PGconn *conn, char **buffer, int async)
Definition: fe-exec.c:2474
static PGresult * libpqrcv_PQgetResult(PGconn *streamConn)
char * pchomp(const char *in)
Definition: mcxt.c:1189
#define ERROR
Definition: elog.h:43
#define ereport(elevel, rest)
Definition: elog.h:141
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1705
void PQclear(PGresult *res)
Definition: fe-exec.c:695
int errmsg(const char *fmt,...)
Definition: elog.c:784
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:6525
void PQfreemem(void *ptr)
Definition: fe-exec.c:3297
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:6596

◆ libpqrcv_send()

static void libpqrcv_send ( WalReceiverConn * conn,
const char *  buffer,
int  nbytes 
)
static

Definition at line 789 of file libpqwalreceiver.c.

References ereport, errmsg(), ERROR, pchomp(), PQerrorMessage(), PQflush(), PQputCopyData(), and WalReceiverConn::streamConn.

790 {
791  if (PQputCopyData(conn->streamConn, buffer, nbytes) <= 0 ||
792  PQflush(conn->streamConn))
793  ereport(ERROR,
794  (errmsg("could not send data to WAL stream: %s",
795  pchomp(PQerrorMessage(conn->streamConn)))));
796 }
int PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
Definition: fe-exec.c:2318
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6578
char * pchomp(const char *in)
Definition: mcxt.c:1189
#define ERROR
Definition: elog.h:43
int PQflush(PGconn *conn)
Definition: fe-exec.c:3284
#define ereport(elevel, rest)
Definition: elog.h:141
int errmsg(const char *fmt,...)
Definition: elog.c:784

◆ libpqrcv_server_version()

static int libpqrcv_server_version ( WalReceiverConn * conn)
static

Definition at line 354 of file libpqwalreceiver.c.

References PQserverVersion(), and WalReceiverConn::streamConn.

355 {
356  return PQserverVersion(conn->streamConn);
357 }
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:6568

◆ libpqrcv_startstreaming()

static bool libpqrcv_startstreaming ( WalReceiverConn * conn,
const WalRcvStreamOptions * options 
)
static

Definition at line 370 of file libpqwalreceiver.c.

References appendStringInfo(), appendStringInfoChar(), appendStringInfoString(), Assert, StringInfoData::data, ereport, errmsg(), ERROR, initStringInfo(), libpqrcv_PQexec(), WalReceiverConn::logical, WalRcvStreamOptions::logical, pchomp(), pfree(), PGRES_COMMAND_OK, PGRES_COPY_BOTH, WalRcvStreamOptions::physical, PQclear(), PQerrorMessage(), PQescapeLiteral(), PQfreemem(), PQresultStatus(), WalRcvStreamOptions::proto, WalRcvStreamOptions::slotname, WalRcvStreamOptions::startpoint, WalReceiverConn::streamConn, and stringlist_to_identifierstr().

372 {
373  StringInfoData cmd;
374  PGresult *res;
375 
376  Assert(options->logical == conn->logical);
377  Assert(options->slotname || !options->logical);
378 
379  initStringInfo(&cmd);
380 
381  /* Build the command. */
382  appendStringInfoString(&cmd, "START_REPLICATION");
383  if (options->slotname != NULL)
384  appendStringInfo(&cmd, " SLOT \"%s\"",
385  options->slotname);
386 
387  if (options->logical)
388  appendStringInfoString(&cmd, " LOGICAL");
389 
390  appendStringInfo(&cmd, " %X/%X",
391  (uint32) (options->startpoint >> 32),
392  (uint32) options->startpoint);
393 
394  /*
395  * Additional options are different depending on if we are doing logical
396  * or physical replication.
397  */
398  if (options->logical)
399  {
400  char *pubnames_str;
401  List *pubnames;
402  char *pubnames_literal;
403 
404  appendStringInfoString(&cmd, " (");
405 
406  appendStringInfo(&cmd, "proto_version '%u'",
407  options->proto.logical.proto_version);
408 
409  pubnames = options->proto.logical.publication_names;
410  pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
411  if (!pubnames_str)
412  ereport(ERROR,
413  (errmsg("could not start WAL streaming: %s",
414  pchomp(PQerrorMessage(conn->streamConn)))));
415  pubnames_literal = PQescapeLiteral(conn->streamConn, pubnames_str,
416  strlen(pubnames_str));
417  if (!pubnames_literal)
418  ereport(ERROR,
419  (errmsg("could not start WAL streaming: %s",
420  pchomp(PQerrorMessage(conn->streamConn)))));
421  appendStringInfo(&cmd, ", publication_names %s", pubnames_literal);
422  PQfreemem(pubnames_literal);
423  pfree(pubnames_str);
424 
425  appendStringInfoChar(&cmd, ')');
426  }
427  else
428  appendStringInfo(&cmd, " TIMELINE %u",
429  options->proto.physical.startpointTLI);
430 
431  /* Start streaming. */
432  res = libpqrcv_PQexec(conn->streamConn, cmd.data);
433  pfree(cmd.data);
434 
435  if (PQresultStatus(res) == PGRES_COMMAND_OK)
436  {
437  PQclear(res);
438  return false;
439  }
440  else if (PQresultStatus(res) != PGRES_COPY_BOTH)
441  {
442  PQclear(res);
443  ereport(ERROR,
444  (errmsg("could not start WAL streaming: %s",
445  pchomp(PQerrorMessage(conn->streamConn)))));
446  }
447  PQclear(res);
448  return true;
449 }
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6578
union WalRcvStreamOptions::@106 proto
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2693
char * pchomp(const char *in)
Definition: mcxt.c:1189
void pfree(void *pointer)
Definition: mcxt.c:1031
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:78
#define ERROR
Definition: elog.h:43
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:163
XLogRecPtr startpoint
Definition: walreceiver.h:153
unsigned int uint32
Definition: c.h:358
struct WalRcvStreamOptions::@106::@107 physical
#define ereport(elevel, rest)
Definition: elog.h:141
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:175
void initStringInfo(StringInfo str)
Definition: stringinfo.c:46
char * PQescapeLiteral(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:3565
static PGresult * libpqrcv_PQexec(PGconn *streamConn, const char *query)
void PQclear(PGresult *res)
Definition: fe-exec.c:695
#define Assert(condition)
Definition: c.h:732
static char * stringlist_to_identifierstr(PGconn *conn, List *strings)
int errmsg(const char *fmt,...)
Definition: elog.c:784
Definition: pg_list.h:50
void PQfreemem(void *ptr)
Definition: fe-exec.c:3297

◆ stringlist_to_identifierstr()

static char * stringlist_to_identifierstr ( PGconn * conn,
List * strings 
)
static

Definition at line 1003 of file libpqwalreceiver.c.

References appendStringInfoChar(), appendStringInfoString(), StringInfoData::data, free, initStringInfo(), lfirst, PQescapeIdentifier(), PQfreemem(), strVal, and val.

Referenced by libpqrcv_startstreaming().

1004 {
1005  ListCell *lc;
1006  StringInfoData res;
1007  bool first = true;
1008 
1009  initStringInfo(&res);
1010 
1011  foreach(lc, strings)
1012  {
1013  char *val = strVal(lfirst(lc));
1014  char *val_escaped;
1015 
1016  if (first)
1017  first = false;
1018  else
1019  appendStringInfoChar(&res, ',');
1020 
1021  val_escaped = PQescapeIdentifier(conn, val, strlen(val));
1022  if (!val_escaped)
1023  {
1024  free(res.data);
1025  return NULL;
1026  }
1027  appendStringInfoString(&res, val_escaped);
1028  PQfreemem(val_escaped);
1029  }
1030 
1031  return res.data;
1032 }
#define strVal(v)
Definition: value.h:54
char * PQescapeIdentifier(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:3571
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:163
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:175
void initStringInfo(StringInfo str)
Definition: stringinfo.c:46
#define free(a)
Definition: header.h:65
#define lfirst(lc)
Definition: pg_list.h:190
void PQfreemem(void *ptr)
Definition: fe-exec.c:3297
long val
Definition: informix.c:684

Variable Documentation

◆ PG_MODULE_MAGIC

PG_MODULE_MAGIC

Definition at line 36 of file libpqwalreceiver.c.

◆ PQWalReceiverFunctions

WalReceiverFunctionsType PQWalReceiverFunctions
static
Initial value:
= {
}
static bool libpqrcv_startstreaming(WalReceiverConn *conn, const WalRcvStreamOptions *options)
static void libpqrcv_check_conninfo(const char *conninfo)
static int libpqrcv_receive(WalReceiverConn *conn, char **buffer, pgsocket *wait_fd)
static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, TimeLineID tli, char **filename, char **content, int *len)
static char * libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
static void libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
static WalRcvExecResult * libpqrcv_exec(WalReceiverConn *conn, const char *query, const int nRetTypes, const Oid *retTypes)
static char * libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, bool temporary, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn)
static void libpqrcv_disconnect(WalReceiverConn *conn)
static int libpqrcv_server_version(WalReceiverConn *conn)
static WalReceiverConn * libpqrcv_connect(const char *conninfo, bool logical, const char *appname, char **err)
static char * libpqrcv_get_conninfo(WalReceiverConn *conn)
static void libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
static void libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host, int *sender_port)

Definition at line 83 of file libpqwalreceiver.c.

Referenced by _PG_init().