PostgreSQL Source Code  git master
libpqwalreceiver.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/time.h>
#include "access/xlog.h"
#include "catalog/pg_type.h"
#include "common/connect.h"
#include "funcapi.h"
#include "libpq-fe.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "pqexpbuffer.h"
#include "replication/walreceiver.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/tuplestore.h"
Include dependency graph for libpqwalreceiver.c:

Go to the source code of this file.

Data Structures

struct  WalReceiverConn
 

Functions

void _PG_init (void)
 
static WalReceiverConnlibpqrcv_connect (const char *conninfo, bool logical, const char *appname, char **err)
 
static void libpqrcv_check_conninfo (const char *conninfo)
 
static char * libpqrcv_get_conninfo (WalReceiverConn *conn)
 
static void libpqrcv_get_senderinfo (WalReceiverConn *conn, char **sender_host, int *sender_port)
 
static char * libpqrcv_identify_system (WalReceiverConn *conn, TimeLineID *primary_tli)
 
static int libpqrcv_server_version (WalReceiverConn *conn)
 
static void libpqrcv_readtimelinehistoryfile (WalReceiverConn *conn, TimeLineID tli, char **filename, char **content, int *len)
 
static bool libpqrcv_startstreaming (WalReceiverConn *conn, const WalRcvStreamOptions *options)
 
static void libpqrcv_endstreaming (WalReceiverConn *conn, TimeLineID *next_tli)
 
static int libpqrcv_receive (WalReceiverConn *conn, char **buffer, pgsocket *wait_fd)
 
static void libpqrcv_send (WalReceiverConn *conn, const char *buffer, int nbytes)
 
static char * libpqrcv_create_slot (WalReceiverConn *conn, const char *slotname, bool temporary, bool two_phase, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn)
 
static pid_t libpqrcv_get_backend_pid (WalReceiverConn *conn)
 
static WalRcvExecResultlibpqrcv_exec (WalReceiverConn *conn, const char *query, const int nRetTypes, const Oid *retTypes)
 
static void libpqrcv_disconnect (WalReceiverConn *conn)
 
static PGresultlibpqrcv_PQexec (PGconn *streamConn, const char *query)
 
static PGresultlibpqrcv_PQgetResult (PGconn *streamConn)
 
static char * stringlist_to_identifierstr (PGconn *conn, List *strings)
 
static void libpqrcv_processTuples (PGresult *pgres, WalRcvExecResult *walres, const int nRetTypes, const Oid *retTypes)
 

Variables

 PG_MODULE_MAGIC
 
static WalReceiverFunctionsType PQWalReceiverFunctions
 

Function Documentation

◆ _PG_init()

void _PG_init ( void  )

Definition at line 113 of file libpqwalreceiver.c.

114 {
115  if (WalReceiverFunctions != NULL)
116  elog(ERROR, "libpqwalreceiver already loaded");
118 }
#define ERROR
Definition: elog.h:33
#define elog(elevel,...)
Definition: elog.h:218
static WalReceiverFunctionsType PQWalReceiverFunctions
WalReceiverFunctionsType * WalReceiverFunctions
Definition: walreceiver.c:96

References elog, ERROR, PQWalReceiverFunctions, and WalReceiverFunctions.

◆ libpqrcv_check_conninfo()

static void libpqrcv_check_conninfo ( const char *  conninfo)
static

Definition at line 255 of file libpqwalreceiver.c.

256 {
257  PQconninfoOption *opts = NULL;
258  char *err = NULL;
259 
260  opts = PQconninfoParse(conninfo, &err);
261  if (opts == NULL)
262  {
263  /* The error string is malloc'd, so we must free it explicitly */
264  char *errcopy = err ? pstrdup(err) : "out of memory";
265 
266  PQfreemem(err);
267  ereport(ERROR,
268  (errcode(ERRCODE_SYNTAX_ERROR),
269  errmsg("invalid connection string syntax: %s", errcopy)));
270  }
271 
273 }
int errcode(int sqlerrcode)
Definition: elog.c:693
int errmsg(const char *fmt,...)
Definition: elog.c:904
#define ereport(elevel,...)
Definition: elog.h:143
PQconninfoOption * PQconninfoParse(const char *conninfo, char **errmsg)
Definition: fe-connect.c:5501
void PQconninfoFree(PQconninfoOption *connOptions)
Definition: fe-connect.c:6736
void PQfreemem(void *ptr)
Definition: fe-exec.c:3891
char * pstrdup(const char *in)
Definition: mcxt.c:1305
static AmcheckOptions opts
Definition: pg_amcheck.c:110

References ereport, errcode(), errmsg(), ERROR, opts, PQconninfoFree(), PQconninfoParse(), PQfreemem(), and pstrdup().

◆ libpqrcv_connect()

static WalReceiverConn * libpqrcv_connect ( const char *  conninfo,
bool  logical,
const char *  appname,
char **  err 
)
static

Definition at line 126 of file libpqwalreceiver.c.

128 {
131  const char *keys[6];
132  const char *vals[6];
133  int i = 0;
134 
135  /*
136  * We use the expand_dbname parameter to process the connection string (or
137  * URI), and pass some extra options.
138  */
139  keys[i] = "dbname";
140  vals[i] = conninfo;
141  keys[++i] = "replication";
142  vals[i] = logical ? "database" : "true";
143  if (!logical)
144  {
145  /*
146  * The database name is ignored by the server in replication mode, but
147  * specify "replication" for .pgpass lookup.
148  */
149  keys[++i] = "dbname";
150  vals[i] = "replication";
151  }
152  keys[++i] = "fallback_application_name";
153  vals[i] = appname;
154  if (logical)
155  {
156  /* Tell the publisher to translate to our encoding */
157  keys[++i] = "client_encoding";
158  vals[i] = GetDatabaseEncodingName();
159 
160  /*
161  * Force assorted GUC parameters to settings that ensure that the
162  * publisher will output data values in a form that is unambiguous to
163  * the subscriber. (We don't want to modify the subscriber's GUC
164  * settings, since that might surprise user-defined code running in
165  * the subscriber, such as triggers.) This should match what pg_dump
166  * does.
167  */
168  keys[++i] = "options";
169  vals[i] = "-c datestyle=ISO -c intervalstyle=postgres -c extra_float_digits=3";
170  }
171  keys[++i] = NULL;
172  vals[i] = NULL;
173 
174  Assert(i < sizeof(keys));
175 
176  conn = palloc0(sizeof(WalReceiverConn));
177  conn->streamConn = PQconnectStartParams(keys, vals,
178  /* expand_dbname = */ true);
179  if (PQstatus(conn->streamConn) == CONNECTION_BAD)
180  {
181  *err = pchomp(PQerrorMessage(conn->streamConn));
182  return NULL;
183  }
184 
185  /*
186  * Poll connection until we have OK or FAILED status.
187  *
188  * Per spec for PQconnectPoll, first wait till socket is write-ready.
189  */
191  do
192  {
193  int io_flag;
194  int rc;
195 
197  io_flag = WL_SOCKET_READABLE;
198 #ifdef WIN32
199  /* Windows needs a different test while waiting for connection-made */
200  else if (PQstatus(conn->streamConn) == CONNECTION_STARTED)
201  io_flag = WL_SOCKET_CONNECTED;
202 #endif
203  else
204  io_flag = WL_SOCKET_WRITEABLE;
205 
207  WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
208  PQsocket(conn->streamConn),
209  0,
211 
212  /* Interrupted? */
213  if (rc & WL_LATCH_SET)
214  {
217  }
218 
219  /* If socket is ready, advance the libpq state machine */
220  if (rc & io_flag)
221  status = PQconnectPoll(conn->streamConn);
223 
224  if (PQstatus(conn->streamConn) != CONNECTION_OK)
225  {
226  *err = pchomp(PQerrorMessage(conn->streamConn));
227  return NULL;
228  }
229 
230  if (logical)
231  {
232  PGresult *res;
233 
234  res = libpqrcv_PQexec(conn->streamConn,
237  {
238  PQclear(res);
239  ereport(ERROR,
240  (errmsg("could not clear search path: %s",
241  pchomp(PQerrorMessage(conn->streamConn)))));
242  }
243  PQclear(res);
244  }
245 
246  conn->logical = logical;
247 
248  return conn;
249 }
#define ALWAYS_SECURE_SEARCH_PATH_SQL
Definition: connect.h:25
PGconn * PQconnectStartParams(const char *const *keywords, const char *const *values, int expand_dbname)
Definition: fe-connect.c:760
PostgresPollingStatusType PQconnectPoll(PGconn *conn)
Definition: fe-connect.c:2262
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6908
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:6855
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:6934
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3270
void PQclear(PGresult *res)
Definition: fe-exec.c:718
struct Latch * MyLatch
Definition: globals.c:58
int i
Definition: isn.c:73
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:499
void ResetLatch(Latch *latch)
Definition: latch.c:658
#define WL_SOCKET_READABLE
Definition: latch.h:126
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130
#define WL_LATCH_SET
Definition: latch.h:125
#define WL_SOCKET_CONNECTED
Definition: latch.h:135
#define WL_SOCKET_WRITEABLE
Definition: latch.h:127
@ CONNECTION_STARTED
Definition: libpq-fe.h:68
@ CONNECTION_BAD
Definition: libpq-fe.h:61
@ CONNECTION_OK
Definition: libpq-fe.h:60
@ PGRES_TUPLES_OK
Definition: libpq-fe.h:100
PostgresPollingStatusType
Definition: libpq-fe.h:85
@ PGRES_POLLING_OK
Definition: libpq-fe.h:89
@ PGRES_POLLING_READING
Definition: libpq-fe.h:87
@ PGRES_POLLING_WRITING
Definition: libpq-fe.h:88
@ PGRES_POLLING_FAILED
Definition: libpq-fe.h:86
Assert(fmt[strlen(fmt) - 1] !='\n')
static PGresult * libpqrcv_PQexec(PGconn *streamConn, const char *query)
const char * GetDatabaseEncodingName(void)
Definition: mbutils.c:1216
char * pchomp(const char *in)
Definition: mcxt.c:1333
void * palloc0(Size size)
Definition: mcxt.c:1099
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:229
PGconn * conn
Definition: streamutil.c:54
@ WAIT_EVENT_LIBPQWALRECEIVER_CONNECT
Definition: wait_event.h:65
void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:150

References ALWAYS_SECURE_SEARCH_PATH_SQL, Assert(), conn, CONNECTION_BAD, CONNECTION_OK, CONNECTION_STARTED, ereport, errmsg(), ERROR, GetDatabaseEncodingName(), i, libpqrcv_PQexec(), MyLatch, palloc0(), pchomp(), PGRES_POLLING_FAILED, PGRES_POLLING_OK, PGRES_POLLING_READING, PGRES_POLLING_WRITING, PGRES_TUPLES_OK, PQclear(), PQconnectPoll(), PQconnectStartParams(), PQerrorMessage(), PQresultStatus(), PQsocket(), PQstatus(), ProcessWalRcvInterrupts(), res, ResetLatch(), status(), WAIT_EVENT_LIBPQWALRECEIVER_CONNECT, WaitLatchOrSocket(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_CONNECTED, WL_SOCKET_READABLE, and WL_SOCKET_WRITEABLE.

◆ libpqrcv_create_slot()

static char * libpqrcv_create_slot ( WalReceiverConn conn,
const char *  slotname,
bool  temporary,
bool  two_phase,
CRSSnapshotAction  snapshot_action,
XLogRecPtr lsn 
)
static

Definition at line 870 of file libpqwalreceiver.c.

873 {
874  PGresult *res;
875  StringInfoData cmd;
876  char *snapshot;
877  int use_new_options_syntax;
878 
879  use_new_options_syntax = (PQserverVersion(conn->streamConn) >= 150000);
880 
881  initStringInfo(&cmd);
882 
883  appendStringInfo(&cmd, "CREATE_REPLICATION_SLOT \"%s\"", slotname);
884 
885  if (temporary)
886  appendStringInfoString(&cmd, " TEMPORARY");
887 
888  if (conn->logical)
889  {
890  appendStringInfoString(&cmd, " LOGICAL pgoutput ");
891  if (use_new_options_syntax)
892  appendStringInfoChar(&cmd, '(');
893  if (two_phase)
894  {
895  appendStringInfoString(&cmd, "TWO_PHASE");
896  if (use_new_options_syntax)
897  appendStringInfoString(&cmd, ", ");
898  else
899  appendStringInfoChar(&cmd, ' ');
900  }
901 
902  if (use_new_options_syntax)
903  {
904  switch (snapshot_action)
905  {
906  case CRS_EXPORT_SNAPSHOT:
907  appendStringInfoString(&cmd, "SNAPSHOT 'export'");
908  break;
910  appendStringInfoString(&cmd, "SNAPSHOT 'nothing'");
911  break;
912  case CRS_USE_SNAPSHOT:
913  appendStringInfoString(&cmd, "SNAPSHOT 'use'");
914  break;
915  }
916  }
917  else
918  {
919  switch (snapshot_action)
920  {
921  case CRS_EXPORT_SNAPSHOT:
922  appendStringInfoString(&cmd, "EXPORT_SNAPSHOT");
923  break;
925  appendStringInfoString(&cmd, "NOEXPORT_SNAPSHOT");
926  break;
927  case CRS_USE_SNAPSHOT:
928  appendStringInfoString(&cmd, "USE_SNAPSHOT");
929  break;
930  }
931  }
932 
933  if (use_new_options_syntax)
934  appendStringInfoChar(&cmd, ')');
935  }
936  else
937  {
938  if (use_new_options_syntax)
939  appendStringInfoString(&cmd, " PHYSICAL (RESERVE_WAL)");
940  else
941  appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL");
942  }
943 
944  res = libpqrcv_PQexec(conn->streamConn, cmd.data);
945  pfree(cmd.data);
946 
948  {
949  PQclear(res);
950  ereport(ERROR,
951  (errcode(ERRCODE_PROTOCOL_VIOLATION),
952  errmsg("could not create replication slot \"%s\": %s",
953  slotname, pchomp(PQerrorMessage(conn->streamConn)))));
954  }
955 
956  if (lsn)
958  CStringGetDatum(PQgetvalue(res, 0, 1))));
959 
960  if (!PQgetisnull(res, 0, 2))
961  snapshot = pstrdup(PQgetvalue(res, 0, 2));
962  else
963  snapshot = NULL;
964 
965  PQclear(res);
966 
967  return snapshot;
968 }
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:6898
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3735
int PQgetisnull(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3760
Datum DirectFunctionCall1Coll(PGFunction func, Oid collation, Datum arg1)
Definition: fmgr.c:777
void pfree(void *pointer)
Definition: mcxt.c:1175
Datum pg_lsn_in(PG_FUNCTION_ARGS)
Definition: pg_lsn.c:64
#define DatumGetLSN(X)
Definition: pg_lsn.h:21
static bool two_phase
#define CStringGetDatum(X)
Definition: postgres.h:622
#define InvalidOid
Definition: postgres_ext.h:36
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:188
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
@ CRS_NOEXPORT_SNAPSHOT
Definition: walsender.h:23
@ CRS_EXPORT_SNAPSHOT
Definition: walsender.h:22

References appendStringInfo(), appendStringInfoChar(), appendStringInfoString(), conn, CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, CStringGetDatum, StringInfoData::data, DatumGetLSN, DirectFunctionCall1Coll(), ereport, errcode(), errmsg(), ERROR, initStringInfo(), InvalidOid, libpqrcv_PQexec(), pchomp(), pfree(), pg_lsn_in(), PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQgetisnull(), PQgetvalue(), PQresultStatus(), PQserverVersion(), pstrdup(), res, and two_phase.

◆ libpqrcv_disconnect()

static void libpqrcv_disconnect ( WalReceiverConn conn)
static

Definition at line 738 of file libpqwalreceiver.c.

739 {
740  PQfinish(conn->streamConn);
741  if (conn->recvBuf != NULL)
742  PQfreemem(conn->recvBuf);
743  pfree(conn);
744 }
void PQfinish(PGconn *conn)
Definition: fe-connect.c:4261

References conn, pfree(), PQfinish(), and PQfreemem().

◆ libpqrcv_endstreaming()

static void libpqrcv_endstreaming ( WalReceiverConn conn,
TimeLineID next_tli 
)
static

Definition at line 510 of file libpqwalreceiver.c.

511 {
512  PGresult *res;
513 
514  /*
515  * Send copy-end message. As in libpqrcv_PQexec, this could theoretically
516  * block, but the risk seems small.
517  */
518  if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
519  PQflush(conn->streamConn))
520  ereport(ERROR,
521  (errcode(ERRCODE_CONNECTION_FAILURE),
522  errmsg("could not send end-of-streaming message to primary: %s",
523  pchomp(PQerrorMessage(conn->streamConn)))));
524 
525  *next_tli = 0;
526 
527  /*
528  * After COPY is finished, we should receive a result set indicating the
529  * next timeline's ID, or just CommandComplete if the server was shut
530  * down.
531  *
532  * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
533  * also possible in case we aborted the copy in mid-stream.
534  */
535  res = libpqrcv_PQgetResult(conn->streamConn);
537  {
538  /*
539  * Read the next timeline's ID. The server also sends the timeline's
540  * starting point, but it is ignored.
541  */
542  if (PQnfields(res) < 2 || PQntuples(res) != 1)
543  ereport(ERROR,
544  (errcode(ERRCODE_PROTOCOL_VIOLATION),
545  errmsg("unexpected result set after end-of-streaming")));
546  *next_tli = pg_strtoint32(PQgetvalue(res, 0, 0));
547  PQclear(res);
548 
549  /* the result set should be followed by CommandComplete */
550  res = libpqrcv_PQgetResult(conn->streamConn);
551  }
552  else if (PQresultStatus(res) == PGRES_COPY_OUT)
553  {
554  PQclear(res);
555 
556  /* End the copy */
557  if (PQendcopy(conn->streamConn))
558  ereport(ERROR,
559  (errcode(ERRCODE_CONNECTION_FAILURE),
560  errmsg("error while shutting down streaming COPY: %s",
561  pchomp(PQerrorMessage(conn->streamConn)))));
562 
563  /* CommandComplete should follow */
564  res = libpqrcv_PQgetResult(conn->streamConn);
565  }
566 
568  ereport(ERROR,
569  (errcode(ERRCODE_PROTOCOL_VIOLATION),
570  errmsg("error reading result of streaming command: %s",
571  pchomp(PQerrorMessage(conn->streamConn)))));
572  PQclear(res);
573 
574  /* Verify that there are no more results */
575  res = libpqrcv_PQgetResult(conn->streamConn);
576  if (res != NULL)
577  ereport(ERROR,
578  (errcode(ERRCODE_PROTOCOL_VIOLATION),
579  errmsg("unexpected result after CommandComplete: %s",
580  pchomp(PQerrorMessage(conn->streamConn)))));
581 }
int PQflush(PGconn *conn)
Definition: fe-exec.c:3861
int PQendcopy(PGconn *conn)
Definition: fe-exec.c:2885
int PQputCopyEnd(PGconn *conn, const char *errormsg)
Definition: fe-exec.c:2683
int PQntuples(const PGresult *res)
Definition: fe-exec.c:3340
int PQnfields(const PGresult *res)
Definition: fe-exec.c:3348
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:97
@ PGRES_COPY_OUT
Definition: libpq-fe.h:103
static PGresult * libpqrcv_PQgetResult(PGconn *streamConn)
int32 pg_strtoint32(const char *s)
Definition: numutils.c:175

References conn, ereport, errcode(), errmsg(), ERROR, libpqrcv_PQgetResult(), pchomp(), pg_strtoint32(), PGRES_COMMAND_OK, PGRES_COPY_OUT, PGRES_TUPLES_OK, PQclear(), PQendcopy(), PQerrorMessage(), PQflush(), PQgetvalue(), PQnfields(), PQntuples(), PQputCopyEnd(), PQresultStatus(), and res.

◆ libpqrcv_exec()

static WalRcvExecResult * libpqrcv_exec ( WalReceiverConn conn,
const char *  query,
const int  nRetTypes,
const Oid retTypes 
)
static

Definition at line 1059 of file libpqwalreceiver.c.

1061 {
1062  PGresult *pgres = NULL;
1063  WalRcvExecResult *walres = palloc0(sizeof(WalRcvExecResult));
1064  char *diag_sqlstate;
1065 
1066  if (MyDatabaseId == InvalidOid)
1067  ereport(ERROR,
1068  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1069  errmsg("the query interface requires a database connection")));
1070 
1071  pgres = libpqrcv_PQexec(conn->streamConn, query);
1072 
1073  switch (PQresultStatus(pgres))
1074  {
1075  case PGRES_SINGLE_TUPLE:
1076  case PGRES_TUPLES_OK:
1077  walres->status = WALRCV_OK_TUPLES;
1078  libpqrcv_processTuples(pgres, walres, nRetTypes, retTypes);
1079  break;
1080 
1081  case PGRES_COPY_IN:
1082  walres->status = WALRCV_OK_COPY_IN;
1083  break;
1084 
1085  case PGRES_COPY_OUT:
1086  walres->status = WALRCV_OK_COPY_OUT;
1087  break;
1088 
1089  case PGRES_COPY_BOTH:
1090  walres->status = WALRCV_OK_COPY_BOTH;
1091  break;
1092 
1093  case PGRES_COMMAND_OK:
1094  walres->status = WALRCV_OK_COMMAND;
1095  break;
1096 
1097  /* Empty query is considered error. */
1098  case PGRES_EMPTY_QUERY:
1099  walres->status = WALRCV_ERROR;
1100  walres->err = _("empty query");
1101  break;
1102 
1103  case PGRES_PIPELINE_SYNC:
1105  walres->status = WALRCV_ERROR;
1106  walres->err = _("unexpected pipeline mode");
1107  break;
1108 
1109  case PGRES_NONFATAL_ERROR:
1110  case PGRES_FATAL_ERROR:
1111  case PGRES_BAD_RESPONSE:
1112  walres->status = WALRCV_ERROR;
1113  walres->err = pchomp(PQerrorMessage(conn->streamConn));
1114  diag_sqlstate = PQresultErrorField(pgres, PG_DIAG_SQLSTATE);
1115  if (diag_sqlstate)
1116  walres->sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
1117  diag_sqlstate[1],
1118  diag_sqlstate[2],
1119  diag_sqlstate[3],
1120  diag_sqlstate[4]);
1121  break;
1122  }
1123 
1124  PQclear(pgres);
1125 
1126  return walres;
1127 }
#define _(x)
Definition: elog.c:89
#define MAKE_SQLSTATE(ch1, ch2, ch3, ch4, ch5)
Definition: elog.h:50
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:3325
Oid MyDatabaseId
Definition: globals.c:89
@ PGRES_COPY_IN
Definition: libpq-fe.h:104
@ PGRES_COPY_BOTH
Definition: libpq-fe.h:109
@ PGRES_FATAL_ERROR
Definition: libpq-fe.h:108
@ PGRES_SINGLE_TUPLE
Definition: libpq-fe.h:110
@ PGRES_EMPTY_QUERY
Definition: libpq-fe.h:96
@ PGRES_PIPELINE_SYNC
Definition: libpq-fe.h:111
@ PGRES_BAD_RESPONSE
Definition: libpq-fe.h:105
@ PGRES_PIPELINE_ABORTED
Definition: libpq-fe.h:112
@ PGRES_NONFATAL_ERROR
Definition: libpq-fe.h:107
static void libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres, const int nRetTypes, const Oid *retTypes)
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:57
WalRcvExecStatus status
Definition: walreceiver.h:216
@ WALRCV_OK_COPY_IN
Definition: walreceiver.h:204
@ WALRCV_OK_COMMAND
Definition: walreceiver.h:201
@ WALRCV_ERROR
Definition: walreceiver.h:200
@ WALRCV_OK_TUPLES
Definition: walreceiver.h:203
@ WALRCV_OK_COPY_OUT
Definition: walreceiver.h:205
@ WALRCV_OK_COPY_BOTH
Definition: walreceiver.h:206

References _, conn, ereport, WalRcvExecResult::err, errcode(), errmsg(), ERROR, InvalidOid, libpqrcv_PQexec(), libpqrcv_processTuples(), MAKE_SQLSTATE, MyDatabaseId, palloc0(), pchomp(), PG_DIAG_SQLSTATE, PGRES_BAD_RESPONSE, PGRES_COMMAND_OK, PGRES_COPY_BOTH, PGRES_COPY_IN, PGRES_COPY_OUT, PGRES_EMPTY_QUERY, PGRES_FATAL_ERROR, PGRES_NONFATAL_ERROR, PGRES_PIPELINE_ABORTED, PGRES_PIPELINE_SYNC, PGRES_SINGLE_TUPLE, PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQresultErrorField(), PQresultStatus(), WalRcvExecResult::sqlstate, WalRcvExecResult::status, WALRCV_ERROR, WALRCV_OK_COMMAND, WALRCV_OK_COPY_BOTH, WALRCV_OK_COPY_IN, WALRCV_OK_COPY_OUT, and WALRCV_OK_TUPLES.

◆ libpqrcv_get_backend_pid()

static pid_t libpqrcv_get_backend_pid ( WalReceiverConn conn)
static

Definition at line 974 of file libpqwalreceiver.c.

975 {
976  return PQbackendPID(conn->streamConn);
977 }
int PQbackendPID(const PGconn *conn)
Definition: fe-connect.c:6942

References conn, and PQbackendPID().

◆ libpqrcv_get_conninfo()

static char * libpqrcv_get_conninfo ( WalReceiverConn conn)
static

Definition at line 280 of file libpqwalreceiver.c.

281 {
282  PQconninfoOption *conn_opts;
283  PQconninfoOption *conn_opt;
285  char *retval;
286 
287  Assert(conn->streamConn != NULL);
288 
290  conn_opts = PQconninfo(conn->streamConn);
291 
292  if (conn_opts == NULL)
293  ereport(ERROR,
294  (errcode(ERRCODE_OUT_OF_MEMORY),
295  errmsg("could not parse connection string: %s",
296  _("out of memory"))));
297 
298  /* build a clean connection string from pieces */
299  for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
300  {
301  bool obfuscate;
302 
303  /* Skip debug and empty options */
304  if (strchr(conn_opt->dispchar, 'D') ||
305  conn_opt->val == NULL ||
306  conn_opt->val[0] == '\0')
307  continue;
308 
309  /* Obfuscate security-sensitive options */
310  obfuscate = strchr(conn_opt->dispchar, '*') != NULL;
311 
312  appendPQExpBuffer(&buf, "%s%s=%s",
313  buf.len == 0 ? "" : " ",
314  conn_opt->keyword,
315  obfuscate ? "********" : conn_opt->val);
316  }
317 
318  PQconninfoFree(conn_opts);
319 
320  retval = PQExpBufferDataBroken(buf) ? NULL : pstrdup(buf.data);
322  return retval;
323 }
PQconninfoOption * PQconninfo(PGconn *conn)
Definition: fe-connect.c:6692
static char * buf
Definition: pg_test_fsync.c:67
void initPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:92
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
Definition: pqexpbuffer.c:267
void termPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:131
#define PQExpBufferDataBroken(buf)
Definition: pqexpbuffer.h:67

References _, appendPQExpBuffer(), Assert(), buf, conn, _PQconninfoOption::dispchar, ereport, errcode(), errmsg(), ERROR, initPQExpBuffer(), _PQconninfoOption::keyword, PQconninfo(), PQconninfoFree(), PQExpBufferDataBroken, pstrdup(), termPQExpBuffer(), and _PQconninfoOption::val.

◆ libpqrcv_get_senderinfo()

static void libpqrcv_get_senderinfo ( WalReceiverConn conn,
char **  sender_host,
int *  sender_port 
)
static

Definition at line 329 of file libpqwalreceiver.c.

331 {
332  char *ret = NULL;
333 
334  *sender_host = NULL;
335  *sender_port = 0;
336 
337  Assert(conn->streamConn != NULL);
338 
339  ret = PQhost(conn->streamConn);
340  if (ret && strlen(ret) != 0)
341  *sender_host = pstrdup(ret);
342 
343  ret = PQport(conn->streamConn);
344  if (ret && strlen(ret) != 0)
345  *sender_port = atoi(ret);
346 }
char * PQhost(const PGconn *conn)
Definition: fe-connect.c:6787
char * PQport(const PGconn *conn)
Definition: fe-connect.c:6823

References Assert(), conn, PQhost(), PQport(), and pstrdup().

◆ libpqrcv_identify_system()

static char * libpqrcv_identify_system ( WalReceiverConn conn,
TimeLineID primary_tli 
)
static

Definition at line 353 of file libpqwalreceiver.c.

354 {
355  PGresult *res;
356  char *primary_sysid;
357 
358  /*
359  * Get the system identifier and timeline ID as a DataRow message from the
360  * primary server.
361  */
362  res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM");
364  {
365  PQclear(res);
366  ereport(ERROR,
367  (errcode(ERRCODE_PROTOCOL_VIOLATION),
368  errmsg("could not receive database system identifier and timeline ID from "
369  "the primary server: %s",
370  pchomp(PQerrorMessage(conn->streamConn)))));
371  }
372  if (PQnfields(res) < 3 || PQntuples(res) != 1)
373  {
374  int ntuples = PQntuples(res);
375  int nfields = PQnfields(res);
376 
377  PQclear(res);
378  ereport(ERROR,
379  (errcode(ERRCODE_PROTOCOL_VIOLATION),
380  errmsg("invalid response from primary server"),
381  errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.",
382  ntuples, nfields, 3, 1)));
383  }
384  primary_sysid = pstrdup(PQgetvalue(res, 0, 0));
385  *primary_tli = pg_strtoint32(PQgetvalue(res, 0, 1));
386  PQclear(res);
387 
388  return primary_sysid;
389 }
int errdetail(const char *fmt,...)
Definition: elog.c:1037

References conn, ereport, errcode(), errdetail(), errmsg(), ERROR, libpqrcv_PQexec(), pchomp(), pg_strtoint32(), PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQgetvalue(), PQnfields(), PQntuples(), PQresultStatus(), pstrdup(), and res.

◆ libpqrcv_PQexec()

static PGresult * libpqrcv_PQexec ( PGconn streamConn,
const char *  query 
)
static

Definition at line 644 of file libpqwalreceiver.c.

645 {
646  PGresult *lastResult = NULL;
647 
648  /*
649  * PQexec() silently discards any prior query results on the connection.
650  * This is not required for this function as it's expected that the caller
651  * (which is this library in all cases) will behave correctly and we don't
652  * have to be backwards compatible with old libpq.
653  */
654 
655  /*
656  * Submit the query. Since we don't use non-blocking mode, this could
657  * theoretically block. In practice, since we don't send very long query
658  * strings, the risk seems negligible.
659  */
660  if (!PQsendQuery(streamConn, query))
661  return NULL;
662 
663  for (;;)
664  {
665  /* Wait for, and collect, the next PGresult. */
666  PGresult *result;
667 
668  result = libpqrcv_PQgetResult(streamConn);
669  if (result == NULL)
670  break; /* query is complete, or failure */
671 
672  /*
673  * Emulate PQexec()'s behavior of returning the last result when there
674  * are many. We are fine with returning just last error message.
675  */
676  PQclear(lastResult);
677  lastResult = result;
678 
679  if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
680  PQresultStatus(lastResult) == PGRES_COPY_OUT ||
681  PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
682  PQstatus(streamConn) == CONNECTION_BAD)
683  break;
684  }
685 
686  return lastResult;
687 }
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1424

References CONNECTION_BAD, libpqrcv_PQgetResult(), PGRES_COPY_BOTH, PGRES_COPY_IN, PGRES_COPY_OUT, PQclear(), PQresultStatus(), PQsendQuery(), and PQstatus().

Referenced by libpqrcv_connect(), libpqrcv_create_slot(), libpqrcv_exec(), libpqrcv_identify_system(), libpqrcv_readtimelinehistoryfile(), and libpqrcv_startstreaming().

◆ libpqrcv_PQgetResult()

static PGresult * libpqrcv_PQgetResult ( PGconn streamConn)
static

Definition at line 693 of file libpqwalreceiver.c.

694 {
695  /*
696  * Collect data until PQgetResult is ready to get the result without
697  * blocking.
698  */
699  while (PQisBusy(streamConn))
700  {
701  int rc;
702 
703  /*
704  * We don't need to break down the sleep into smaller increments,
705  * since we'll get interrupted by signals and can handle any
706  * interrupts here.
707  */
710  WL_LATCH_SET,
711  PQsocket(streamConn),
712  0,
714 
715  /* Interrupted? */
716  if (rc & WL_LATCH_SET)
717  {
720  }
721 
722  /* Consume whatever data is available from the socket */
723  if (PQconsumeInput(streamConn) == 0)
724  {
725  /* trouble; return NULL */
726  return NULL;
727  }
728  }
729 
730  /* Now we can collect and return the next PGresult */
731  return PQgetResult(streamConn);
732 }
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:2004
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:2051
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:2082
@ WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE
Definition: wait_event.h:66

References MyLatch, PQconsumeInput(), PQgetResult(), PQisBusy(), PQsocket(), ProcessWalRcvInterrupts(), ResetLatch(), WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE, WaitLatchOrSocket(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_SOCKET_READABLE.

Referenced by libpqrcv_endstreaming(), libpqrcv_PQexec(), and libpqrcv_receive().

◆ libpqrcv_processTuples()

static void libpqrcv_processTuples ( PGresult pgres,
WalRcvExecResult walres,
const int  nRetTypes,
const Oid retTypes 
)
static

Definition at line 983 of file libpqwalreceiver.c.

985 {
986  int tupn;
987  int coln;
988  int nfields = PQnfields(pgres);
989  HeapTuple tuple;
990  AttInMetadata *attinmeta;
991  MemoryContext rowcontext;
992  MemoryContext oldcontext;
993 
994  /* Make sure we got expected number of fields. */
995  if (nfields != nRetTypes)
996  ereport(ERROR,
997  (errcode(ERRCODE_PROTOCOL_VIOLATION),
998  errmsg("invalid query response"),
999  errdetail("Expected %d fields, got %d fields.",
1000  nRetTypes, nfields)));
1001 
1002  walres->tuplestore = tuplestore_begin_heap(true, false, work_mem);
1003 
1004  /* Create tuple descriptor corresponding to expected result. */
1005  walres->tupledesc = CreateTemplateTupleDesc(nRetTypes);
1006  for (coln = 0; coln < nRetTypes; coln++)
1007  TupleDescInitEntry(walres->tupledesc, (AttrNumber) coln + 1,
1008  PQfname(pgres, coln), retTypes[coln], -1, 0);
1009  attinmeta = TupleDescGetAttInMetadata(walres->tupledesc);
1010 
1011  /* No point in doing more here if there were no tuples returned. */
1012  if (PQntuples(pgres) == 0)
1013  return;
1014 
1015  /* Create temporary context for local allocations. */
1017  "libpqrcv query result context",
1019 
1020  /* Process returned rows. */
1021  for (tupn = 0; tupn < PQntuples(pgres); tupn++)
1022  {
1023  char *cstrs[MaxTupleAttributeNumber];
1024 
1026 
1027  /* Do the allocations in temporary context. */
1028  oldcontext = MemoryContextSwitchTo(rowcontext);
1029 
1030  /*
1031  * Fill cstrs with null-terminated strings of column values.
1032  */
1033  for (coln = 0; coln < nfields; coln++)
1034  {
1035  if (PQgetisnull(pgres, tupn, coln))
1036  cstrs[coln] = NULL;
1037  else
1038  cstrs[coln] = PQgetvalue(pgres, tupn, coln);
1039  }
1040 
1041  /* Convert row to a tuple, and add it to the tuplestore */
1042  tuple = BuildTupleFromCStrings(attinmeta, cstrs);
1043  tuplestore_puttuple(walres->tuplestore, tuple);
1044 
1045  /* Clean up */
1046  MemoryContextSwitchTo(oldcontext);
1047  MemoryContextReset(rowcontext);
1048  }
1049 
1050  MemoryContextDelete(rowcontext);
1051 }
int16 AttrNumber
Definition: attnum.h:21
HeapTuple BuildTupleFromCStrings(AttInMetadata *attinmeta, char **values)
Definition: execTuples.c:2135
AttInMetadata * TupleDescGetAttInMetadata(TupleDesc tupdesc)
Definition: execTuples.c:2086
char * PQfname(const PGresult *res, int field_num)
Definition: fe-exec.c:3426
int work_mem
Definition: globals.c:125
#define MaxTupleAttributeNumber
Definition: htup_details.h:33
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:143
MemoryContext CurrentMemoryContext
Definition: mcxt.c:42
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:218
#define AllocSetContextCreate
Definition: memutils.h:173
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:197
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Tuplestorestate * tuplestore
Definition: walreceiver.h:219
TupleDesc tupledesc
Definition: walreceiver.h:220
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:45
void TupleDescInitEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:583
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
void tuplestore_puttuple(Tuplestorestate *state, HeapTuple tuple)
Definition: tuplestore.c:730

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, BuildTupleFromCStrings(), CreateTemplateTupleDesc(), CurrentMemoryContext, ereport, errcode(), errdetail(), errmsg(), ERROR, MaxTupleAttributeNumber, MemoryContextDelete(), MemoryContextReset(), MemoryContextSwitchTo(), PQfname(), PQgetisnull(), PQgetvalue(), PQnfields(), PQntuples(), ProcessWalRcvInterrupts(), WalRcvExecResult::tupledesc, TupleDescGetAttInMetadata(), TupleDescInitEntry(), WalRcvExecResult::tuplestore, tuplestore_begin_heap(), tuplestore_puttuple(), and work_mem.

Referenced by libpqrcv_exec().

◆ libpqrcv_readtimelinehistoryfile()

static void libpqrcv_readtimelinehistoryfile ( WalReceiverConn conn,
TimeLineID  tli,
char **  filename,
char **  content,
int *  len 
)
static

Definition at line 587 of file libpqwalreceiver.c.

590 {
591  PGresult *res;
592  char cmd[64];
593 
594  Assert(!conn->logical);
595 
596  /*
597  * Request the primary to send over the history file for given timeline.
598  */
599  snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
600  res = libpqrcv_PQexec(conn->streamConn, cmd);
602  {
603  PQclear(res);
604  ereport(ERROR,
605  (errcode(ERRCODE_PROTOCOL_VIOLATION),
606  errmsg("could not receive timeline history file from "
607  "the primary server: %s",
608  pchomp(PQerrorMessage(conn->streamConn)))));
609  }
610  if (PQnfields(res) != 2 || PQntuples(res) != 1)
611  {
612  int ntuples = PQntuples(res);
613  int nfields = PQnfields(res);
614 
615  PQclear(res);
616  ereport(ERROR,
617  (errcode(ERRCODE_PROTOCOL_VIOLATION),
618  errmsg("invalid response from primary server"),
619  errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
620  ntuples, nfields)));
621  }
622  *filename = pstrdup(PQgetvalue(res, 0, 0));
623 
624  *len = PQgetlength(res, 0, 1);
625  *content = palloc(*len);
626  memcpy(*content, PQgetvalue(res, 0, 1), *len);
627  PQclear(res);
628 }
int PQgetlength(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3746
void * palloc(Size size)
Definition: mcxt.c:1068
const void size_t len
static char * filename
Definition: pg_dumpall.c:94
#define snprintf
Definition: port.h:225

References Assert(), conn, ereport, errcode(), errdetail(), errmsg(), ERROR, filename, len, libpqrcv_PQexec(), palloc(), pchomp(), PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQgetlength(), PQgetvalue(), PQnfields(), PQntuples(), PQresultStatus(), pstrdup(), res, and snprintf.

◆ libpqrcv_receive()

static int libpqrcv_receive ( WalReceiverConn conn,
char **  buffer,
pgsocket wait_fd 
)
static

Definition at line 763 of file libpqwalreceiver.c.

765 {
766  int rawlen;
767 
768  if (conn->recvBuf != NULL)
769  PQfreemem(conn->recvBuf);
770  conn->recvBuf = NULL;
771 
772  /* Try to receive a CopyData message */
773  rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
774  if (rawlen == 0)
775  {
776  /* Try consuming some data. */
777  if (PQconsumeInput(conn->streamConn) == 0)
778  ereport(ERROR,
779  (errcode(ERRCODE_CONNECTION_FAILURE),
780  errmsg("could not receive data from WAL stream: %s",
781  pchomp(PQerrorMessage(conn->streamConn)))));
782 
783  /* Now that we've consumed some input, try again */
784  rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
785  if (rawlen == 0)
786  {
787  /* Tell caller to try again when our socket is ready. */
788  *wait_fd = PQsocket(conn->streamConn);
789  return 0;
790  }
791  }
792  if (rawlen == -1) /* end-of-streaming or error */
793  {
794  PGresult *res;
795 
796  res = libpqrcv_PQgetResult(conn->streamConn);
798  {
799  PQclear(res);
800 
801  /* Verify that there are no more results. */
802  res = libpqrcv_PQgetResult(conn->streamConn);
803  if (res != NULL)
804  {
805  PQclear(res);
806 
807  /*
808  * If the other side closed the connection orderly (otherwise
809  * we'd seen an error, or PGRES_COPY_IN) don't report an error
810  * here, but let callers deal with it.
811  */
812  if (PQstatus(conn->streamConn) == CONNECTION_BAD)
813  return -1;
814 
815  ereport(ERROR,
816  (errcode(ERRCODE_PROTOCOL_VIOLATION),
817  errmsg("unexpected result after CommandComplete: %s",
818  PQerrorMessage(conn->streamConn))));
819  }
820 
821  return -1;
822  }
823  else if (PQresultStatus(res) == PGRES_COPY_IN)
824  {
825  PQclear(res);
826  return -1;
827  }
828  else
829  {
830  PQclear(res);
831  ereport(ERROR,
832  (errcode(ERRCODE_PROTOCOL_VIOLATION),
833  errmsg("could not receive data from WAL stream: %s",
834  pchomp(PQerrorMessage(conn->streamConn)))));
835  }
836  }
837  if (rawlen < -1)
838  ereport(ERROR,
839  (errcode(ERRCODE_PROTOCOL_VIOLATION),
840  errmsg("could not receive data from WAL stream: %s",
841  pchomp(PQerrorMessage(conn->streamConn)))));
842 
843  /* Return received messages to caller */
844  *buffer = conn->recvBuf;
845  return rawlen;
846 }
int PQgetCopyData(PGconn *conn, char **buffer, int async)
Definition: fe-exec.c:2751

References conn, CONNECTION_BAD, ereport, errcode(), errmsg(), ERROR, libpqrcv_PQgetResult(), pchomp(), PGRES_COMMAND_OK, PGRES_COPY_IN, PQclear(), PQconsumeInput(), PQerrorMessage(), PQfreemem(), PQgetCopyData(), PQresultStatus(), PQsocket(), PQstatus(), and res.

◆ libpqrcv_send()

static void libpqrcv_send ( WalReceiverConn conn,
const char *  buffer,
int  nbytes 
)
static

Definition at line 854 of file libpqwalreceiver.c.

855 {
856  if (PQputCopyData(conn->streamConn, buffer, nbytes) <= 0 ||
857  PQflush(conn->streamConn))
858  ereport(ERROR,
859  (errcode(ERRCODE_CONNECTION_FAILURE),
860  errmsg("could not send data to WAL stream: %s",
861  pchomp(PQerrorMessage(conn->streamConn)))));
862 }
int PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
Definition: fe-exec.c:2627

References conn, ereport, errcode(), errmsg(), ERROR, pchomp(), PQerrorMessage(), PQflush(), and PQputCopyData().

◆ libpqrcv_server_version()

static int libpqrcv_server_version ( WalReceiverConn conn)
static

Definition at line 395 of file libpqwalreceiver.c.

396 {
397  return PQserverVersion(conn->streamConn);
398 }

References conn, and PQserverVersion().

◆ libpqrcv_startstreaming()

static bool libpqrcv_startstreaming ( WalReceiverConn conn,
const WalRcvStreamOptions options 
)
static

Definition at line 411 of file libpqwalreceiver.c.

413 {
414  StringInfoData cmd;
415  PGresult *res;
416 
417  Assert(options->logical == conn->logical);
418  Assert(options->slotname || !options->logical);
419 
420  initStringInfo(&cmd);
421 
422  /* Build the command. */
423  appendStringInfoString(&cmd, "START_REPLICATION");
424  if (options->slotname != NULL)
425  appendStringInfo(&cmd, " SLOT \"%s\"",
426  options->slotname);
427 
428  if (options->logical)
429  appendStringInfoString(&cmd, " LOGICAL");
430 
431  appendStringInfo(&cmd, " %X/%X", LSN_FORMAT_ARGS(options->startpoint));
432 
433  /*
434  * Additional options are different depending on if we are doing logical
435  * or physical replication.
436  */
437  if (options->logical)
438  {
439  char *pubnames_str;
440  List *pubnames;
441  char *pubnames_literal;
442 
443  appendStringInfoString(&cmd, " (");
444 
445  appendStringInfo(&cmd, "proto_version '%u'",
446  options->proto.logical.proto_version);
447 
448  if (options->proto.logical.streaming &&
449  PQserverVersion(conn->streamConn) >= 140000)
450  appendStringInfoString(&cmd, ", streaming 'on'");
451 
452  if (options->proto.logical.twophase &&
453  PQserverVersion(conn->streamConn) >= 150000)
454  appendStringInfoString(&cmd, ", two_phase 'on'");
455 
456  pubnames = options->proto.logical.publication_names;
457  pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
458  if (!pubnames_str)
459  ereport(ERROR,
460  (errcode(ERRCODE_OUT_OF_MEMORY), /* likely guess */
461  errmsg("could not start WAL streaming: %s",
462  pchomp(PQerrorMessage(conn->streamConn)))));
463  pubnames_literal = PQescapeLiteral(conn->streamConn, pubnames_str,
464  strlen(pubnames_str));
465  if (!pubnames_literal)
466  ereport(ERROR,
467  (errcode(ERRCODE_OUT_OF_MEMORY), /* likely guess */
468  errmsg("could not start WAL streaming: %s",
469  pchomp(PQerrorMessage(conn->streamConn)))));
470  appendStringInfo(&cmd, ", publication_names %s", pubnames_literal);
471  PQfreemem(pubnames_literal);
472  pfree(pubnames_str);
473 
474  if (options->proto.logical.binary &&
475  PQserverVersion(conn->streamConn) >= 140000)
476  appendStringInfoString(&cmd, ", binary 'true'");
477 
478  appendStringInfoChar(&cmd, ')');
479  }
480  else
481  appendStringInfo(&cmd, " TIMELINE %u",
482  options->proto.physical.startpointTLI);
483 
484  /* Start streaming. */
485  res = libpqrcv_PQexec(conn->streamConn, cmd.data);
486  pfree(cmd.data);
487 
489  {
490  PQclear(res);
491  return false;
492  }
493  else if (PQresultStatus(res) != PGRES_COPY_BOTH)
494  {
495  PQclear(res);
496  ereport(ERROR,
497  (errcode(ERRCODE_PROTOCOL_VIOLATION),
498  errmsg("could not start WAL streaming: %s",
499  pchomp(PQerrorMessage(conn->streamConn)))));
500  }
501  PQclear(res);
502  return true;
503 }
char * PQescapeLiteral(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:4166
static char * stringlist_to_identifierstr(PGconn *conn, List *strings)
Definition: pg_list.h:51
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43

References appendStringInfo(), appendStringInfoChar(), appendStringInfoString(), Assert(), conn, StringInfoData::data, ereport, errcode(), errmsg(), ERROR, initStringInfo(), libpqrcv_PQexec(), LSN_FORMAT_ARGS, pchomp(), pfree(), PGRES_COMMAND_OK, PGRES_COPY_BOTH, PQclear(), PQerrorMessage(), PQescapeLiteral(), PQfreemem(), PQresultStatus(), PQserverVersion(), res, and stringlist_to_identifierstr().

◆ stringlist_to_identifierstr()

static char * stringlist_to_identifierstr ( PGconn conn,
List strings 
)
static

Definition at line 1138 of file libpqwalreceiver.c.

1139 {
1140  ListCell *lc;
1142  bool first = true;
1143 
1144  initStringInfo(&res);
1145 
1146  foreach(lc, strings)
1147  {
1148  char *val = strVal(lfirst(lc));
1149  char *val_escaped;
1150 
1151  if (first)
1152  first = false;
1153  else
1154  appendStringInfoChar(&res, ',');
1155 
1156  val_escaped = PQescapeIdentifier(conn, val, strlen(val));
1157  if (!val_escaped)
1158  {
1159  free(res.data);
1160  return NULL;
1161  }
1162  appendStringInfoString(&res, val_escaped);
1163  PQfreemem(val_escaped);
1164  }
1165 
1166  return res.data;
1167 }
char * PQescapeIdentifier(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:4172
#define free(a)
Definition: header.h:65
long val
Definition: informix.c:664
#define lfirst(lc)
Definition: pg_list.h:169
#define strVal(v)
Definition: value.h:72

References appendStringInfoChar(), appendStringInfoString(), conn, free, initStringInfo(), lfirst, PQescapeIdentifier(), PQfreemem(), res, strVal, and val.

Referenced by libpqrcv_startstreaming().

Variable Documentation

◆ PG_MODULE_MAGIC

PG_MODULE_MAGIC

Definition at line 37 of file libpqwalreceiver.c.

◆ PQWalReceiverFunctions

WalReceiverFunctionsType PQWalReceiverFunctions
static
Initial value:
= {
}
static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, TimeLineID tli, char **filename, char **content, int *len)
static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn)
static void libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
static void libpqrcv_disconnect(WalReceiverConn *conn)
static char * libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
static WalRcvExecResult * libpqrcv_exec(WalReceiverConn *conn, const char *query, const int nRetTypes, const Oid *retTypes)
static void libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
static void libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host, int *sender_port)
static int libpqrcv_server_version(WalReceiverConn *conn)
static WalReceiverConn * libpqrcv_connect(const char *conninfo, bool logical, const char *appname, char **err)
static bool libpqrcv_startstreaming(WalReceiverConn *conn, const WalRcvStreamOptions *options)
static int libpqrcv_receive(WalReceiverConn *conn, char **buffer, pgsocket *wait_fd)
static char * libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, bool temporary, bool two_phase, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn)
static void libpqrcv_check_conninfo(const char *conninfo)
static char * libpqrcv_get_conninfo(WalReceiverConn *conn)

Definition at line 86 of file libpqwalreceiver.c.

Referenced by _PG_init().