PostgreSQL Source Code  git master
libpqwalreceiver.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/time.h>
#include "access/xlog.h"
#include "catalog/pg_type.h"
#include "common/connect.h"
#include "funcapi.h"
#include "libpq-fe.h"
#include "libpq/libpq-be-fe-helpers.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "pqexpbuffer.h"
#include "replication/walreceiver.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/tuplestore.h"
Include dependency graph for libpqwalreceiver.c:

Go to the source code of this file.

Data Structures

struct  WalReceiverConn
 

Functions

static WalReceiverConnlibpqrcv_connect (const char *conninfo, bool logical, bool must_use_password, const char *appname, char **err)
 
static void libpqrcv_check_conninfo (const char *conninfo, bool must_use_password)
 
static char * libpqrcv_get_conninfo (WalReceiverConn *conn)
 
static void libpqrcv_get_senderinfo (WalReceiverConn *conn, char **sender_host, int *sender_port)
 
static char * libpqrcv_identify_system (WalReceiverConn *conn, TimeLineID *primary_tli)
 
static int libpqrcv_server_version (WalReceiverConn *conn)
 
static void libpqrcv_readtimelinehistoryfile (WalReceiverConn *conn, TimeLineID tli, char **filename, char **content, int *len)
 
static bool libpqrcv_startstreaming (WalReceiverConn *conn, const WalRcvStreamOptions *options)
 
static void libpqrcv_endstreaming (WalReceiverConn *conn, TimeLineID *next_tli)
 
static int libpqrcv_receive (WalReceiverConn *conn, char **buffer, pgsocket *wait_fd)
 
static void libpqrcv_send (WalReceiverConn *conn, const char *buffer, int nbytes)
 
static char * libpqrcv_create_slot (WalReceiverConn *conn, const char *slotname, bool temporary, bool two_phase, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn)
 
static pid_t libpqrcv_get_backend_pid (WalReceiverConn *conn)
 
static WalRcvExecResultlibpqrcv_exec (WalReceiverConn *conn, const char *query, const int nRetTypes, const Oid *retTypes)
 
static void libpqrcv_disconnect (WalReceiverConn *conn)
 
static PGresultlibpqrcv_PQexec (PGconn *streamConn, const char *query)
 
static PGresultlibpqrcv_PQgetResult (PGconn *streamConn)
 
static char * stringlist_to_identifierstr (PGconn *conn, List *strings)
 
void _PG_init (void)
 
static void libpqrcv_processTuples (PGresult *pgres, WalRcvExecResult *walres, const int nRetTypes, const Oid *retTypes)
 

Variables

 PG_MODULE_MAGIC
 
static WalReceiverFunctionsType PQWalReceiverFunctions
 

Function Documentation

◆ _PG_init()

void _PG_init ( void  )

Definition at line 113 of file libpqwalreceiver.c.

114 {
115  if (WalReceiverFunctions != NULL)
116  elog(ERROR, "libpqwalreceiver already loaded");
118 }
#define ERROR
Definition: elog.h:39
static WalReceiverFunctionsType PQWalReceiverFunctions
WalReceiverFunctionsType * WalReceiverFunctions
Definition: walreceiver.c:96

References elog(), ERROR, PQWalReceiverFunctions, and WalReceiverFunctions.

◆ libpqrcv_check_conninfo()

static void libpqrcv_check_conninfo ( const char *  conninfo,
bool  must_use_password 
)
static

Definition at line 242 of file libpqwalreceiver.c.

243 {
244  PQconninfoOption *opts = NULL;
245  PQconninfoOption *opt;
246  char *err = NULL;
247 
248  opts = PQconninfoParse(conninfo, &err);
249  if (opts == NULL)
250  {
251  /* The error string is malloc'd, so we must free it explicitly */
252  char *errcopy = err ? pstrdup(err) : "out of memory";
253 
254  PQfreemem(err);
255  ereport(ERROR,
256  (errcode(ERRCODE_SYNTAX_ERROR),
257  errmsg("invalid connection string syntax: %s", errcopy)));
258  }
259 
260  if (must_use_password)
261  {
262  bool uses_password = false;
263 
264  for (opt = opts; opt->keyword != NULL; ++opt)
265  {
266  /* Ignore connection options that are not present. */
267  if (opt->val == NULL)
268  continue;
269 
270  if (strcmp(opt->keyword, "password") == 0 && opt->val[0] != '\0')
271  {
272  uses_password = true;
273  break;
274  }
275  }
276 
277  if (!uses_password)
278  ereport(ERROR,
279  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
280  errmsg("password is required"),
281  errdetail("Non-superusers must provide a password in the connection string.")));
282  }
283 
285 }
int errdetail(const char *fmt,...)
Definition: elog.c:1202
int errcode(int sqlerrcode)
Definition: elog.c:858
int errmsg(const char *fmt,...)
Definition: elog.c:1069
#define ereport(elevel,...)
Definition: elog.h:149
void err(int eval, const char *fmt,...)
Definition: err.c:43
PQconninfoOption * PQconninfoParse(const char *conninfo, char **errmsg)
Definition: fe-connect.c:5829
void PQconninfoFree(PQconninfoOption *connOptions)
Definition: fe-connect.c:7081
void PQfreemem(void *ptr)
Definition: fe-exec.c:3946
char * pstrdup(const char *in)
Definition: mcxt.c:1644
static AmcheckOptions opts
Definition: pg_amcheck.c:110

References ereport, err(), errcode(), errdetail(), errmsg(), ERROR, _PQconninfoOption::keyword, opts, PQconninfoFree(), PQconninfoParse(), PQfreemem(), pstrdup(), and _PQconninfoOption::val.

◆ libpqrcv_connect()

static WalReceiverConn * libpqrcv_connect ( const char *  conninfo,
bool  logical,
bool  must_use_password,
const char *  appname,
char **  err 
)
static

Definition at line 132 of file libpqwalreceiver.c.

134 {
136  const char *keys[6];
137  const char *vals[6];
138  int i = 0;
139 
140  /*
141  * We use the expand_dbname parameter to process the connection string (or
142  * URI), and pass some extra options.
143  */
144  keys[i] = "dbname";
145  vals[i] = conninfo;
146  keys[++i] = "replication";
147  vals[i] = logical ? "database" : "true";
148  if (!logical)
149  {
150  /*
151  * The database name is ignored by the server in replication mode, but
152  * specify "replication" for .pgpass lookup.
153  */
154  keys[++i] = "dbname";
155  vals[i] = "replication";
156  }
157  keys[++i] = "fallback_application_name";
158  vals[i] = appname;
159  if (logical)
160  {
161  /* Tell the publisher to translate to our encoding */
162  keys[++i] = "client_encoding";
163  vals[i] = GetDatabaseEncodingName();
164 
165  /*
166  * Force assorted GUC parameters to settings that ensure that the
167  * publisher will output data values in a form that is unambiguous to
168  * the subscriber. (We don't want to modify the subscriber's GUC
169  * settings, since that might surprise user-defined code running in
170  * the subscriber, such as triggers.) This should match what pg_dump
171  * does.
172  */
173  keys[++i] = "options";
174  vals[i] = "-c datestyle=ISO -c intervalstyle=postgres -c extra_float_digits=3";
175  }
176  keys[++i] = NULL;
177  vals[i] = NULL;
178 
179  Assert(i < sizeof(keys));
180 
181  conn = palloc0(sizeof(WalReceiverConn));
182  conn->streamConn =
183  libpqsrv_connect_params(keys, vals,
184  /* expand_dbname = */ true,
185  WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
186 
187  if (PQstatus(conn->streamConn) != CONNECTION_OK)
188  goto bad_connection_errmsg;
189 
190  if (must_use_password && !PQconnectionUsedPassword(conn->streamConn))
191  {
192  libpqsrv_disconnect(conn->streamConn);
193  pfree(conn);
194 
195  ereport(ERROR,
196  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
197  errmsg("password is required"),
198  errdetail("Non-superuser cannot connect if the server does not request a password."),
199  errhint("Target server's authentication method must be changed, or set password_required=false in the subscription parameters.")));
200  }
201 
202  if (logical)
203  {
204  PGresult *res;
205 
206  res = libpqrcv_PQexec(conn->streamConn,
209  {
210  PQclear(res);
211  *err = psprintf(_("could not clear search path: %s"),
212  pchomp(PQerrorMessage(conn->streamConn)));
213  goto bad_connection;
214  }
215  PQclear(res);
216  }
217 
218  conn->logical = logical;
219 
220  return conn;
221 
222  /* error path, using libpq's error message */
223 bad_connection_errmsg:
224  *err = pchomp(PQerrorMessage(conn->streamConn));
225 
226  /* error path, error already set */
227 bad_connection:
228  libpqsrv_disconnect(conn->streamConn);
229  pfree(conn);
230  return NULL;
231 }
#define ALWAYS_SECURE_SEARCH_PATH_SQL
Definition: connect.h:25
int errhint(const char *fmt,...)
Definition: elog.c:1316
#define _(x)
Definition: elog.c:91
int PQconnectionUsedPassword(const PGconn *conn)
Definition: fe-connect.c:7314
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7248
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:7195
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3325
int i
Definition: isn.c:73
static void libpqsrv_disconnect(PGconn *conn)
static PGconn * libpqsrv_connect_params(const char *const *keywords, const char *const *values, int expand_dbname, uint32 wait_event_info)
@ CONNECTION_OK
Definition: libpq-fe.h:60
@ PGRES_TUPLES_OK
Definition: libpq-fe.h:100
Assert(fmt[strlen(fmt) - 1] !='\n')
static PGresult * libpqrcv_PQexec(PGconn *streamConn, const char *query)
const char * GetDatabaseEncodingName(void)
Definition: mbutils.c:1274
char * pchomp(const char *in)
Definition: mcxt.c:1672
void pfree(void *pointer)
Definition: mcxt.c:1456
void * palloc0(Size size)
Definition: mcxt.c:1257
char * psprintf(const char *fmt,...)
Definition: psprintf.c:46
PGconn * conn
Definition: streamutil.c:54

References _, ALWAYS_SECURE_SEARCH_PATH_SQL, Assert(), conn, CONNECTION_OK, ereport, err(), errcode(), errdetail(), errhint(), errmsg(), ERROR, GetDatabaseEncodingName(), i, libpqrcv_PQexec(), libpqsrv_connect_params(), libpqsrv_disconnect(), palloc0(), pchomp(), pfree(), PGRES_TUPLES_OK, PQclear(), PQconnectionUsedPassword(), PQerrorMessage(), PQresultStatus(), PQstatus(), psprintf(), and res.

◆ libpqrcv_create_slot()

static char * libpqrcv_create_slot ( WalReceiverConn conn,
const char *  slotname,
bool  temporary,
bool  two_phase,
CRSSnapshotAction  snapshot_action,
XLogRecPtr lsn 
)
static

Definition at line 885 of file libpqwalreceiver.c.

888 {
889  PGresult *res;
890  StringInfoData cmd;
891  char *snapshot;
892  int use_new_options_syntax;
893 
894  use_new_options_syntax = (PQserverVersion(conn->streamConn) >= 150000);
895 
896  initStringInfo(&cmd);
897 
898  appendStringInfo(&cmd, "CREATE_REPLICATION_SLOT \"%s\"", slotname);
899 
900  if (temporary)
901  appendStringInfoString(&cmd, " TEMPORARY");
902 
903  if (conn->logical)
904  {
905  appendStringInfoString(&cmd, " LOGICAL pgoutput ");
906  if (use_new_options_syntax)
907  appendStringInfoChar(&cmd, '(');
908  if (two_phase)
909  {
910  appendStringInfoString(&cmd, "TWO_PHASE");
911  if (use_new_options_syntax)
912  appendStringInfoString(&cmd, ", ");
913  else
914  appendStringInfoChar(&cmd, ' ');
915  }
916 
917  if (use_new_options_syntax)
918  {
919  switch (snapshot_action)
920  {
921  case CRS_EXPORT_SNAPSHOT:
922  appendStringInfoString(&cmd, "SNAPSHOT 'export'");
923  break;
925  appendStringInfoString(&cmd, "SNAPSHOT 'nothing'");
926  break;
927  case CRS_USE_SNAPSHOT:
928  appendStringInfoString(&cmd, "SNAPSHOT 'use'");
929  break;
930  }
931  }
932  else
933  {
934  switch (snapshot_action)
935  {
936  case CRS_EXPORT_SNAPSHOT:
937  appendStringInfoString(&cmd, "EXPORT_SNAPSHOT");
938  break;
940  appendStringInfoString(&cmd, "NOEXPORT_SNAPSHOT");
941  break;
942  case CRS_USE_SNAPSHOT:
943  appendStringInfoString(&cmd, "USE_SNAPSHOT");
944  break;
945  }
946  }
947 
948  if (use_new_options_syntax)
949  appendStringInfoChar(&cmd, ')');
950  }
951  else
952  {
953  if (use_new_options_syntax)
954  appendStringInfoString(&cmd, " PHYSICAL (RESERVE_WAL)");
955  else
956  appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL");
957  }
958 
959  res = libpqrcv_PQexec(conn->streamConn, cmd.data);
960  pfree(cmd.data);
961 
963  {
964  PQclear(res);
965  ereport(ERROR,
966  (errcode(ERRCODE_PROTOCOL_VIOLATION),
967  errmsg("could not create replication slot \"%s\": %s",
968  slotname, pchomp(PQerrorMessage(conn->streamConn)))));
969  }
970 
971  if (lsn)
973  CStringGetDatum(PQgetvalue(res, 0, 1))));
974 
975  if (!PQgetisnull(res, 0, 2))
976  snapshot = pstrdup(PQgetvalue(res, 0, 2));
977  else
978  snapshot = NULL;
979 
980  PQclear(res);
981 
982  return snapshot;
983 }
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:7238
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3790
int PQgetisnull(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3815
Datum DirectFunctionCall1Coll(PGFunction func, Oid collation, Datum arg1)
Definition: fmgr.c:775
Datum pg_lsn_in(PG_FUNCTION_ARGS)
Definition: pg_lsn.c:64
static XLogRecPtr DatumGetLSN(Datum X)
Definition: pg_lsn.h:22
static bool two_phase
static Datum CStringGetDatum(const char *X)
Definition: postgres.h:350
#define InvalidOid
Definition: postgres_ext.h:36
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:188
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
@ CRS_NOEXPORT_SNAPSHOT
Definition: walsender.h:23
@ CRS_EXPORT_SNAPSHOT
Definition: walsender.h:22

References appendStringInfo(), appendStringInfoChar(), appendStringInfoString(), conn, CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, CStringGetDatum(), StringInfoData::data, DatumGetLSN(), DirectFunctionCall1Coll(), ereport, errcode(), errmsg(), ERROR, initStringInfo(), InvalidOid, libpqrcv_PQexec(), pchomp(), pfree(), pg_lsn_in(), PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQgetisnull(), PQgetvalue(), PQresultStatus(), PQserverVersion(), pstrdup(), res, and two_phase.

◆ libpqrcv_disconnect()

static void libpqrcv_disconnect ( WalReceiverConn conn)
static

Definition at line 755 of file libpqwalreceiver.c.

756 {
757  libpqsrv_disconnect(conn->streamConn);
758  PQfreemem(conn->recvBuf);
759  pfree(conn);
760 }

References conn, libpqsrv_disconnect(), pfree(), and PQfreemem().

◆ libpqrcv_endstreaming()

static void libpqrcv_endstreaming ( WalReceiverConn conn,
TimeLineID next_tli 
)
static

Definition at line 527 of file libpqwalreceiver.c.

528 {
529  PGresult *res;
530 
531  /*
532  * Send copy-end message. As in libpqrcv_PQexec, this could theoretically
533  * block, but the risk seems small.
534  */
535  if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
536  PQflush(conn->streamConn))
537  ereport(ERROR,
538  (errcode(ERRCODE_CONNECTION_FAILURE),
539  errmsg("could not send end-of-streaming message to primary: %s",
540  pchomp(PQerrorMessage(conn->streamConn)))));
541 
542  *next_tli = 0;
543 
544  /*
545  * After COPY is finished, we should receive a result set indicating the
546  * next timeline's ID, or just CommandComplete if the server was shut
547  * down.
548  *
549  * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
550  * also possible in case we aborted the copy in mid-stream.
551  */
552  res = libpqrcv_PQgetResult(conn->streamConn);
554  {
555  /*
556  * Read the next timeline's ID. The server also sends the timeline's
557  * starting point, but it is ignored.
558  */
559  if (PQnfields(res) < 2 || PQntuples(res) != 1)
560  ereport(ERROR,
561  (errcode(ERRCODE_PROTOCOL_VIOLATION),
562  errmsg("unexpected result set after end-of-streaming")));
563  *next_tli = pg_strtoint32(PQgetvalue(res, 0, 0));
564  PQclear(res);
565 
566  /* the result set should be followed by CommandComplete */
567  res = libpqrcv_PQgetResult(conn->streamConn);
568  }
569  else if (PQresultStatus(res) == PGRES_COPY_OUT)
570  {
571  PQclear(res);
572 
573  /* End the copy */
574  if (PQendcopy(conn->streamConn))
575  ereport(ERROR,
576  (errcode(ERRCODE_CONNECTION_FAILURE),
577  errmsg("error while shutting down streaming COPY: %s",
578  pchomp(PQerrorMessage(conn->streamConn)))));
579 
580  /* CommandComplete should follow */
581  res = libpqrcv_PQgetResult(conn->streamConn);
582  }
583 
585  ereport(ERROR,
586  (errcode(ERRCODE_PROTOCOL_VIOLATION),
587  errmsg("error reading result of streaming command: %s",
588  pchomp(PQerrorMessage(conn->streamConn)))));
589  PQclear(res);
590 
591  /* Verify that there are no more results */
592  res = libpqrcv_PQgetResult(conn->streamConn);
593  if (res != NULL)
594  ereport(ERROR,
595  (errcode(ERRCODE_PROTOCOL_VIOLATION),
596  errmsg("unexpected result after CommandComplete: %s",
597  pchomp(PQerrorMessage(conn->streamConn)))));
598 }
int PQflush(PGconn *conn)
Definition: fe-exec.c:3914
int PQendcopy(PGconn *conn)
Definition: fe-exec.c:2915
int PQputCopyEnd(PGconn *conn, const char *errormsg)
Definition: fe-exec.c:2715
int PQntuples(const PGresult *res)
Definition: fe-exec.c:3395
int PQnfields(const PGresult *res)
Definition: fe-exec.c:3403
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:97
@ PGRES_COPY_OUT
Definition: libpq-fe.h:103
static PGresult * libpqrcv_PQgetResult(PGconn *streamConn)
int32 pg_strtoint32(const char *s)
Definition: numutils.c:384

References conn, ereport, errcode(), errmsg(), ERROR, libpqrcv_PQgetResult(), pchomp(), pg_strtoint32(), PGRES_COMMAND_OK, PGRES_COPY_OUT, PGRES_TUPLES_OK, PQclear(), PQendcopy(), PQerrorMessage(), PQflush(), PQgetvalue(), PQnfields(), PQntuples(), PQputCopyEnd(), PQresultStatus(), and res.

◆ libpqrcv_exec()

static WalRcvExecResult * libpqrcv_exec ( WalReceiverConn conn,
const char *  query,
const int  nRetTypes,
const Oid retTypes 
)
static

Definition at line 1074 of file libpqwalreceiver.c.

1076 {
1077  PGresult *pgres = NULL;
1078  WalRcvExecResult *walres = palloc0(sizeof(WalRcvExecResult));
1079  char *diag_sqlstate;
1080 
1081  if (MyDatabaseId == InvalidOid)
1082  ereport(ERROR,
1083  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1084  errmsg("the query interface requires a database connection")));
1085 
1086  pgres = libpqrcv_PQexec(conn->streamConn, query);
1087 
1088  switch (PQresultStatus(pgres))
1089  {
1090  case PGRES_SINGLE_TUPLE:
1091  case PGRES_TUPLES_OK:
1092  walres->status = WALRCV_OK_TUPLES;
1093  libpqrcv_processTuples(pgres, walres, nRetTypes, retTypes);
1094  break;
1095 
1096  case PGRES_COPY_IN:
1097  walres->status = WALRCV_OK_COPY_IN;
1098  break;
1099 
1100  case PGRES_COPY_OUT:
1101  walres->status = WALRCV_OK_COPY_OUT;
1102  break;
1103 
1104  case PGRES_COPY_BOTH:
1105  walres->status = WALRCV_OK_COPY_BOTH;
1106  break;
1107 
1108  case PGRES_COMMAND_OK:
1109  walres->status = WALRCV_OK_COMMAND;
1110  break;
1111 
1112  /* Empty query is considered error. */
1113  case PGRES_EMPTY_QUERY:
1114  walres->status = WALRCV_ERROR;
1115  walres->err = _("empty query");
1116  break;
1117 
1118  case PGRES_PIPELINE_SYNC:
1120  walres->status = WALRCV_ERROR;
1121  walres->err = _("unexpected pipeline mode");
1122  break;
1123 
1124  case PGRES_NONFATAL_ERROR:
1125  case PGRES_FATAL_ERROR:
1126  case PGRES_BAD_RESPONSE:
1127  walres->status = WALRCV_ERROR;
1128  walres->err = pchomp(PQerrorMessage(conn->streamConn));
1129  diag_sqlstate = PQresultErrorField(pgres, PG_DIAG_SQLSTATE);
1130  if (diag_sqlstate)
1131  walres->sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
1132  diag_sqlstate[1],
1133  diag_sqlstate[2],
1134  diag_sqlstate[3],
1135  diag_sqlstate[4]);
1136  break;
1137  }
1138 
1139  PQclear(pgres);
1140 
1141  return walres;
1142 }
#define MAKE_SQLSTATE(ch1, ch2, ch3, ch4, ch5)
Definition: elog.h:56
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:3380
Oid MyDatabaseId
Definition: globals.c:89
@ PGRES_COPY_IN
Definition: libpq-fe.h:104
@ PGRES_COPY_BOTH
Definition: libpq-fe.h:109
@ PGRES_FATAL_ERROR
Definition: libpq-fe.h:108
@ PGRES_SINGLE_TUPLE
Definition: libpq-fe.h:110
@ PGRES_EMPTY_QUERY
Definition: libpq-fe.h:96
@ PGRES_PIPELINE_SYNC
Definition: libpq-fe.h:111
@ PGRES_BAD_RESPONSE
Definition: libpq-fe.h:105
@ PGRES_PIPELINE_ABORTED
Definition: libpq-fe.h:112
@ PGRES_NONFATAL_ERROR
Definition: libpq-fe.h:107
static void libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres, const int nRetTypes, const Oid *retTypes)
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:56
WalRcvExecStatus status
Definition: walreceiver.h:220
@ WALRCV_OK_COPY_IN
Definition: walreceiver.h:208
@ WALRCV_OK_COMMAND
Definition: walreceiver.h:205
@ WALRCV_ERROR
Definition: walreceiver.h:204
@ WALRCV_OK_TUPLES
Definition: walreceiver.h:207
@ WALRCV_OK_COPY_OUT
Definition: walreceiver.h:209
@ WALRCV_OK_COPY_BOTH
Definition: walreceiver.h:210

References _, conn, ereport, WalRcvExecResult::err, errcode(), errmsg(), ERROR, InvalidOid, libpqrcv_PQexec(), libpqrcv_processTuples(), MAKE_SQLSTATE, MyDatabaseId, palloc0(), pchomp(), PG_DIAG_SQLSTATE, PGRES_BAD_RESPONSE, PGRES_COMMAND_OK, PGRES_COPY_BOTH, PGRES_COPY_IN, PGRES_COPY_OUT, PGRES_EMPTY_QUERY, PGRES_FATAL_ERROR, PGRES_NONFATAL_ERROR, PGRES_PIPELINE_ABORTED, PGRES_PIPELINE_SYNC, PGRES_SINGLE_TUPLE, PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQresultErrorField(), PQresultStatus(), WalRcvExecResult::sqlstate, WalRcvExecResult::status, WALRCV_ERROR, WALRCV_OK_COMMAND, WALRCV_OK_COPY_BOTH, WALRCV_OK_COPY_IN, WALRCV_OK_COPY_OUT, and WALRCV_OK_TUPLES.

◆ libpqrcv_get_backend_pid()

static pid_t libpqrcv_get_backend_pid ( WalReceiverConn conn)
static

Definition at line 989 of file libpqwalreceiver.c.

990 {
991  return PQbackendPID(conn->streamConn);
992 }
int PQbackendPID(const PGconn *conn)
Definition: fe-connect.c:7282

References conn, and PQbackendPID().

◆ libpqrcv_get_conninfo()

static char * libpqrcv_get_conninfo ( WalReceiverConn conn)
static

Definition at line 292 of file libpqwalreceiver.c.

293 {
294  PQconninfoOption *conn_opts;
295  PQconninfoOption *conn_opt;
297  char *retval;
298 
299  Assert(conn->streamConn != NULL);
300 
302  conn_opts = PQconninfo(conn->streamConn);
303 
304  if (conn_opts == NULL)
305  ereport(ERROR,
306  (errcode(ERRCODE_OUT_OF_MEMORY),
307  errmsg("could not parse connection string: %s",
308  _("out of memory"))));
309 
310  /* build a clean connection string from pieces */
311  for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
312  {
313  bool obfuscate;
314 
315  /* Skip debug and empty options */
316  if (strchr(conn_opt->dispchar, 'D') ||
317  conn_opt->val == NULL ||
318  conn_opt->val[0] == '\0')
319  continue;
320 
321  /* Obfuscate security-sensitive options */
322  obfuscate = strchr(conn_opt->dispchar, '*') != NULL;
323 
324  appendPQExpBuffer(&buf, "%s%s=%s",
325  buf.len == 0 ? "" : " ",
326  conn_opt->keyword,
327  obfuscate ? "********" : conn_opt->val);
328  }
329 
330  PQconninfoFree(conn_opts);
331 
332  retval = PQExpBufferDataBroken(buf) ? NULL : pstrdup(buf.data);
334  return retval;
335 }
PQconninfoOption * PQconninfo(PGconn *conn)
Definition: fe-connect.c:7037
static char * buf
Definition: pg_test_fsync.c:67
void initPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:90
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
Definition: pqexpbuffer.c:265
void termPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:129
#define PQExpBufferDataBroken(buf)
Definition: pqexpbuffer.h:67

References _, appendPQExpBuffer(), Assert(), buf, conn, _PQconninfoOption::dispchar, ereport, errcode(), errmsg(), ERROR, initPQExpBuffer(), _PQconninfoOption::keyword, PQconninfo(), PQconninfoFree(), PQExpBufferDataBroken, pstrdup(), termPQExpBuffer(), and _PQconninfoOption::val.

◆ libpqrcv_get_senderinfo()

static void libpqrcv_get_senderinfo ( WalReceiverConn conn,
char **  sender_host,
int *  sender_port 
)
static

Definition at line 341 of file libpqwalreceiver.c.

343 {
344  char *ret = NULL;
345 
346  *sender_host = NULL;
347  *sender_port = 0;
348 
349  Assert(conn->streamConn != NULL);
350 
351  ret = PQhost(conn->streamConn);
352  if (ret && strlen(ret) != 0)
353  *sender_host = pstrdup(ret);
354 
355  ret = PQport(conn->streamConn);
356  if (ret && strlen(ret) != 0)
357  *sender_port = atoi(ret);
358 }
char * PQhost(const PGconn *conn)
Definition: fe-connect.c:7127
char * PQport(const PGconn *conn)
Definition: fe-connect.c:7163

References Assert(), conn, PQhost(), PQport(), and pstrdup().

◆ libpqrcv_identify_system()

static char * libpqrcv_identify_system ( WalReceiverConn conn,
TimeLineID primary_tli 
)
static

Definition at line 365 of file libpqwalreceiver.c.

366 {
367  PGresult *res;
368  char *primary_sysid;
369 
370  /*
371  * Get the system identifier and timeline ID as a DataRow message from the
372  * primary server.
373  */
374  res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM");
376  {
377  PQclear(res);
378  ereport(ERROR,
379  (errcode(ERRCODE_PROTOCOL_VIOLATION),
380  errmsg("could not receive database system identifier and timeline ID from "
381  "the primary server: %s",
382  pchomp(PQerrorMessage(conn->streamConn)))));
383  }
384  if (PQnfields(res) < 3 || PQntuples(res) != 1)
385  {
386  int ntuples = PQntuples(res);
387  int nfields = PQnfields(res);
388 
389  PQclear(res);
390  ereport(ERROR,
391  (errcode(ERRCODE_PROTOCOL_VIOLATION),
392  errmsg("invalid response from primary server"),
393  errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.",
394  ntuples, nfields, 3, 1)));
395  }
396  primary_sysid = pstrdup(PQgetvalue(res, 0, 0));
397  *primary_tli = pg_strtoint32(PQgetvalue(res, 0, 1));
398  PQclear(res);
399 
400  return primary_sysid;
401 }

References conn, ereport, errcode(), errdetail(), errmsg(), ERROR, libpqrcv_PQexec(), pchomp(), pg_strtoint32(), PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQgetvalue(), PQnfields(), PQntuples(), PQresultStatus(), pstrdup(), and res.

◆ libpqrcv_PQexec()

static PGresult * libpqrcv_PQexec ( PGconn streamConn,
const char *  query 
)
static

Definition at line 661 of file libpqwalreceiver.c.

662 {
663  PGresult *lastResult = NULL;
664 
665  /*
666  * PQexec() silently discards any prior query results on the connection.
667  * This is not required for this function as it's expected that the caller
668  * (which is this library in all cases) will behave correctly and we don't
669  * have to be backwards compatible with old libpq.
670  */
671 
672  /*
673  * Submit the query. Since we don't use non-blocking mode, this could
674  * theoretically block. In practice, since we don't send very long query
675  * strings, the risk seems negligible.
676  */
677  if (!PQsendQuery(streamConn, query))
678  return NULL;
679 
680  for (;;)
681  {
682  /* Wait for, and collect, the next PGresult. */
683  PGresult *result;
684 
685  result = libpqrcv_PQgetResult(streamConn);
686  if (result == NULL)
687  break; /* query is complete, or failure */
688 
689  /*
690  * Emulate PQexec()'s behavior of returning the last result when there
691  * are many. We are fine with returning just last error message.
692  */
693  PQclear(lastResult);
694  lastResult = result;
695 
696  if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
697  PQresultStatus(lastResult) == PGRES_COPY_OUT ||
698  PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
699  PQstatus(streamConn) == CONNECTION_BAD)
700  break;
701  }
702 
703  return lastResult;
704 }
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1422
@ CONNECTION_BAD
Definition: libpq-fe.h:61

References CONNECTION_BAD, libpqrcv_PQgetResult(), PGRES_COPY_BOTH, PGRES_COPY_IN, PGRES_COPY_OUT, PQclear(), PQresultStatus(), PQsendQuery(), and PQstatus().

Referenced by libpqrcv_connect(), libpqrcv_create_slot(), libpqrcv_exec(), libpqrcv_identify_system(), libpqrcv_readtimelinehistoryfile(), and libpqrcv_startstreaming().

◆ libpqrcv_PQgetResult()

static PGresult * libpqrcv_PQgetResult ( PGconn streamConn)
static

Definition at line 710 of file libpqwalreceiver.c.

711 {
712  /*
713  * Collect data until PQgetResult is ready to get the result without
714  * blocking.
715  */
716  while (PQisBusy(streamConn))
717  {
718  int rc;
719 
720  /*
721  * We don't need to break down the sleep into smaller increments,
722  * since we'll get interrupted by signals and can handle any
723  * interrupts here.
724  */
727  WL_LATCH_SET,
728  PQsocket(streamConn),
729  0,
730  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
731 
732  /* Interrupted? */
733  if (rc & WL_LATCH_SET)
734  {
737  }
738 
739  /* Consume whatever data is available from the socket */
740  if (PQconsumeInput(streamConn) == 0)
741  {
742  /* trouble; return NULL */
743  return NULL;
744  }
745  }
746 
747  /* Now we can collect and return the next PGresult */
748  return PQgetResult(streamConn);
749 }
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:7274
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1957
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:2004
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:2035
struct Latch * MyLatch
Definition: globals.c:58
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:538
void ResetLatch(Latch *latch)
Definition: latch.c:697
#define WL_SOCKET_READABLE
Definition: latch.h:126
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130
#define WL_LATCH_SET
Definition: latch.h:125
void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:166

References MyLatch, PQconsumeInput(), PQgetResult(), PQisBusy(), PQsocket(), ProcessWalRcvInterrupts(), ResetLatch(), WaitLatchOrSocket(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_SOCKET_READABLE.

Referenced by libpqrcv_endstreaming(), libpqrcv_PQexec(), and libpqrcv_receive().

◆ libpqrcv_processTuples()

static void libpqrcv_processTuples ( PGresult pgres,
WalRcvExecResult walres,
const int  nRetTypes,
const Oid retTypes 
)
static

Definition at line 998 of file libpqwalreceiver.c.

1000 {
1001  int tupn;
1002  int coln;
1003  int nfields = PQnfields(pgres);
1004  HeapTuple tuple;
1005  AttInMetadata *attinmeta;
1006  MemoryContext rowcontext;
1007  MemoryContext oldcontext;
1008 
1009  /* Make sure we got expected number of fields. */
1010  if (nfields != nRetTypes)
1011  ereport(ERROR,
1012  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1013  errmsg("invalid query response"),
1014  errdetail("Expected %d fields, got %d fields.",
1015  nRetTypes, nfields)));
1016 
1017  walres->tuplestore = tuplestore_begin_heap(true, false, work_mem);
1018 
1019  /* Create tuple descriptor corresponding to expected result. */
1020  walres->tupledesc = CreateTemplateTupleDesc(nRetTypes);
1021  for (coln = 0; coln < nRetTypes; coln++)
1022  TupleDescInitEntry(walres->tupledesc, (AttrNumber) coln + 1,
1023  PQfname(pgres, coln), retTypes[coln], -1, 0);
1024  attinmeta = TupleDescGetAttInMetadata(walres->tupledesc);
1025 
1026  /* No point in doing more here if there were no tuples returned. */
1027  if (PQntuples(pgres) == 0)
1028  return;
1029 
1030  /* Create temporary context for local allocations. */
1032  "libpqrcv query result context",
1034 
1035  /* Process returned rows. */
1036  for (tupn = 0; tupn < PQntuples(pgres); tupn++)
1037  {
1038  char *cstrs[MaxTupleAttributeNumber];
1039 
1041 
1042  /* Do the allocations in temporary context. */
1043  oldcontext = MemoryContextSwitchTo(rowcontext);
1044 
1045  /*
1046  * Fill cstrs with null-terminated strings of column values.
1047  */
1048  for (coln = 0; coln < nfields; coln++)
1049  {
1050  if (PQgetisnull(pgres, tupn, coln))
1051  cstrs[coln] = NULL;
1052  else
1053  cstrs[coln] = PQgetvalue(pgres, tupn, coln);
1054  }
1055 
1056  /* Convert row to a tuple, and add it to the tuplestore */
1057  tuple = BuildTupleFromCStrings(attinmeta, cstrs);
1058  tuplestore_puttuple(walres->tuplestore, tuple);
1059 
1060  /* Clean up */
1061  MemoryContextSwitchTo(oldcontext);
1062  MemoryContextReset(rowcontext);
1063  }
1064 
1065  MemoryContextDelete(rowcontext);
1066 }
int16 AttrNumber
Definition: attnum.h:21
HeapTuple BuildTupleFromCStrings(AttInMetadata *attinmeta, char **values)
Definition: execTuples.c:2136
AttInMetadata * TupleDescGetAttInMetadata(TupleDesc tupdesc)
Definition: execTuples.c:2087
char * PQfname(const PGresult *res, int field_num)
Definition: fe-exec.c:3481
int work_mem
Definition: globals.c:125
#define MaxTupleAttributeNumber
Definition: htup_details.h:34
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:330
MemoryContext CurrentMemoryContext
Definition: mcxt.c:135
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:403
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:153
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:138
Tuplestorestate * tuplestore
Definition: walreceiver.h:223
TupleDesc tupledesc
Definition: walreceiver.h:224
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:45
void TupleDescInitEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:583
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
void tuplestore_puttuple(Tuplestorestate *state, HeapTuple tuple)
Definition: tuplestore.c:730

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, BuildTupleFromCStrings(), CreateTemplateTupleDesc(), CurrentMemoryContext, ereport, errcode(), errdetail(), errmsg(), ERROR, MaxTupleAttributeNumber, MemoryContextDelete(), MemoryContextReset(), MemoryContextSwitchTo(), PQfname(), PQgetisnull(), PQgetvalue(), PQnfields(), PQntuples(), ProcessWalRcvInterrupts(), WalRcvExecResult::tupledesc, TupleDescGetAttInMetadata(), TupleDescInitEntry(), WalRcvExecResult::tuplestore, tuplestore_begin_heap(), tuplestore_puttuple(), and work_mem.

Referenced by libpqrcv_exec().

◆ libpqrcv_readtimelinehistoryfile()

static void libpqrcv_readtimelinehistoryfile ( WalReceiverConn conn,
TimeLineID  tli,
char **  filename,
char **  content,
int *  len 
)
static

Definition at line 604 of file libpqwalreceiver.c.

607 {
608  PGresult *res;
609  char cmd[64];
610 
611  Assert(!conn->logical);
612 
613  /*
614  * Request the primary to send over the history file for given timeline.
615  */
616  snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
617  res = libpqrcv_PQexec(conn->streamConn, cmd);
619  {
620  PQclear(res);
621  ereport(ERROR,
622  (errcode(ERRCODE_PROTOCOL_VIOLATION),
623  errmsg("could not receive timeline history file from "
624  "the primary server: %s",
625  pchomp(PQerrorMessage(conn->streamConn)))));
626  }
627  if (PQnfields(res) != 2 || PQntuples(res) != 1)
628  {
629  int ntuples = PQntuples(res);
630  int nfields = PQnfields(res);
631 
632  PQclear(res);
633  ereport(ERROR,
634  (errcode(ERRCODE_PROTOCOL_VIOLATION),
635  errmsg("invalid response from primary server"),
636  errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
637  ntuples, nfields)));
638  }
639  *filename = pstrdup(PQgetvalue(res, 0, 0));
640 
641  *len = PQgetlength(res, 0, 1);
642  *content = palloc(*len);
643  memcpy(*content, PQgetvalue(res, 0, 1), *len);
644  PQclear(res);
645 }
int PQgetlength(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3801
void * palloc(Size size)
Definition: mcxt.c:1226
const void size_t len
static char * filename
Definition: pg_dumpall.c:119
#define snprintf
Definition: port.h:238

References Assert(), conn, ereport, errcode(), errdetail(), errmsg(), ERROR, filename, len, libpqrcv_PQexec(), palloc(), pchomp(), PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQgetlength(), PQgetvalue(), PQnfields(), PQntuples(), PQresultStatus(), pstrdup(), res, and snprintf.

◆ libpqrcv_receive()

static int libpqrcv_receive ( WalReceiverConn conn,
char **  buffer,
pgsocket wait_fd 
)
static

Definition at line 779 of file libpqwalreceiver.c.

781 {
782  int rawlen;
783 
784  PQfreemem(conn->recvBuf);
785  conn->recvBuf = NULL;
786 
787  /* Try to receive a CopyData message */
788  rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
789  if (rawlen == 0)
790  {
791  /* Try consuming some data. */
792  if (PQconsumeInput(conn->streamConn) == 0)
793  ereport(ERROR,
794  (errcode(ERRCODE_CONNECTION_FAILURE),
795  errmsg("could not receive data from WAL stream: %s",
796  pchomp(PQerrorMessage(conn->streamConn)))));
797 
798  /* Now that we've consumed some input, try again */
799  rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
800  if (rawlen == 0)
801  {
802  /* Tell caller to try again when our socket is ready. */
803  *wait_fd = PQsocket(conn->streamConn);
804  return 0;
805  }
806  }
807  if (rawlen == -1) /* end-of-streaming or error */
808  {
809  PGresult *res;
810 
811  res = libpqrcv_PQgetResult(conn->streamConn);
813  {
814  PQclear(res);
815 
816  /* Verify that there are no more results. */
817  res = libpqrcv_PQgetResult(conn->streamConn);
818  if (res != NULL)
819  {
820  PQclear(res);
821 
822  /*
823  * If the other side closed the connection orderly (otherwise
824  * we'd seen an error, or PGRES_COPY_IN) don't report an error
825  * here, but let callers deal with it.
826  */
827  if (PQstatus(conn->streamConn) == CONNECTION_BAD)
828  return -1;
829 
830  ereport(ERROR,
831  (errcode(ERRCODE_PROTOCOL_VIOLATION),
832  errmsg("unexpected result after CommandComplete: %s",
833  PQerrorMessage(conn->streamConn))));
834  }
835 
836  return -1;
837  }
838  else if (PQresultStatus(res) == PGRES_COPY_IN)
839  {
840  PQclear(res);
841  return -1;
842  }
843  else
844  {
845  PQclear(res);
846  ereport(ERROR,
847  (errcode(ERRCODE_PROTOCOL_VIOLATION),
848  errmsg("could not receive data from WAL stream: %s",
849  pchomp(PQerrorMessage(conn->streamConn)))));
850  }
851  }
852  if (rawlen < -1)
853  ereport(ERROR,
854  (errcode(ERRCODE_PROTOCOL_VIOLATION),
855  errmsg("could not receive data from WAL stream: %s",
856  pchomp(PQerrorMessage(conn->streamConn)))));
857 
858  /* Return received messages to caller */
859  *buffer = conn->recvBuf;
860  return rawlen;
861 }
int PQgetCopyData(PGconn *conn, char **buffer, int async)
Definition: fe-exec.c:2782

References conn, CONNECTION_BAD, ereport, errcode(), errmsg(), ERROR, libpqrcv_PQgetResult(), pchomp(), PGRES_COMMAND_OK, PGRES_COPY_IN, PQclear(), PQconsumeInput(), PQerrorMessage(), PQfreemem(), PQgetCopyData(), PQresultStatus(), PQsocket(), PQstatus(), and res.

◆ libpqrcv_send()

static void libpqrcv_send ( WalReceiverConn conn,
const char *  buffer,
int  nbytes 
)
static

Definition at line 869 of file libpqwalreceiver.c.

870 {
871  if (PQputCopyData(conn->streamConn, buffer, nbytes) <= 0 ||
872  PQflush(conn->streamConn))
873  ereport(ERROR,
874  (errcode(ERRCODE_CONNECTION_FAILURE),
875  errmsg("could not send data to WAL stream: %s",
876  pchomp(PQerrorMessage(conn->streamConn)))));
877 }
int PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
Definition: fe-exec.c:2661

References conn, ereport, errcode(), errmsg(), ERROR, pchomp(), PQerrorMessage(), PQflush(), and PQputCopyData().

◆ libpqrcv_server_version()

static int libpqrcv_server_version ( WalReceiverConn conn)
static

Definition at line 407 of file libpqwalreceiver.c.

408 {
409  return PQserverVersion(conn->streamConn);
410 }

References conn, and PQserverVersion().

◆ libpqrcv_startstreaming()

static bool libpqrcv_startstreaming ( WalReceiverConn conn,
const WalRcvStreamOptions options 
)
static

Definition at line 423 of file libpqwalreceiver.c.

425 {
426  StringInfoData cmd;
427  PGresult *res;
428 
429  Assert(options->logical == conn->logical);
430  Assert(options->slotname || !options->logical);
431 
432  initStringInfo(&cmd);
433 
434  /* Build the command. */
435  appendStringInfoString(&cmd, "START_REPLICATION");
436  if (options->slotname != NULL)
437  appendStringInfo(&cmd, " SLOT \"%s\"",
438  options->slotname);
439 
440  if (options->logical)
441  appendStringInfoString(&cmd, " LOGICAL");
442 
443  appendStringInfo(&cmd, " %X/%X", LSN_FORMAT_ARGS(options->startpoint));
444 
445  /*
446  * Additional options are different depending on if we are doing logical
447  * or physical replication.
448  */
449  if (options->logical)
450  {
451  char *pubnames_str;
452  List *pubnames;
453  char *pubnames_literal;
454 
455  appendStringInfoString(&cmd, " (");
456 
457  appendStringInfo(&cmd, "proto_version '%u'",
458  options->proto.logical.proto_version);
459 
460  if (options->proto.logical.streaming_str)
461  appendStringInfo(&cmd, ", streaming '%s'",
462  options->proto.logical.streaming_str);
463 
464  if (options->proto.logical.twophase &&
465  PQserverVersion(conn->streamConn) >= 150000)
466  appendStringInfoString(&cmd, ", two_phase 'on'");
467 
468  if (options->proto.logical.origin &&
469  PQserverVersion(conn->streamConn) >= 160000)
470  appendStringInfo(&cmd, ", origin '%s'",
471  options->proto.logical.origin);
472 
473  pubnames = options->proto.logical.publication_names;
474  pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
475  if (!pubnames_str)
476  ereport(ERROR,
477  (errcode(ERRCODE_OUT_OF_MEMORY), /* likely guess */
478  errmsg("could not start WAL streaming: %s",
479  pchomp(PQerrorMessage(conn->streamConn)))));
480  pubnames_literal = PQescapeLiteral(conn->streamConn, pubnames_str,
481  strlen(pubnames_str));
482  if (!pubnames_literal)
483  ereport(ERROR,
484  (errcode(ERRCODE_OUT_OF_MEMORY), /* likely guess */
485  errmsg("could not start WAL streaming: %s",
486  pchomp(PQerrorMessage(conn->streamConn)))));
487  appendStringInfo(&cmd, ", publication_names %s", pubnames_literal);
488  PQfreemem(pubnames_literal);
489  pfree(pubnames_str);
490 
491  if (options->proto.logical.binary &&
492  PQserverVersion(conn->streamConn) >= 140000)
493  appendStringInfoString(&cmd, ", binary 'true'");
494 
495  appendStringInfoChar(&cmd, ')');
496  }
497  else
498  appendStringInfo(&cmd, " TIMELINE %u",
499  options->proto.physical.startpointTLI);
500 
501  /* Start streaming. */
502  res = libpqrcv_PQexec(conn->streamConn, cmd.data);
503  pfree(cmd.data);
504 
506  {
507  PQclear(res);
508  return false;
509  }
510  else if (PQresultStatus(res) != PGRES_COPY_BOTH)
511  {
512  PQclear(res);
513  ereport(ERROR,
514  (errcode(ERRCODE_PROTOCOL_VIOLATION),
515  errmsg("could not start WAL streaming: %s",
516  pchomp(PQerrorMessage(conn->streamConn)))));
517  }
518  PQclear(res);
519  return true;
520 }
char * PQescapeLiteral(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:4218
static char * stringlist_to_identifierstr(PGconn *conn, List *strings)
Definition: pg_list.h:54
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43

References appendStringInfo(), appendStringInfoChar(), appendStringInfoString(), Assert(), conn, StringInfoData::data, ereport, errcode(), errmsg(), ERROR, initStringInfo(), libpqrcv_PQexec(), LSN_FORMAT_ARGS, pchomp(), pfree(), PGRES_COMMAND_OK, PGRES_COPY_BOTH, PQclear(), PQerrorMessage(), PQescapeLiteral(), PQfreemem(), PQresultStatus(), PQserverVersion(), res, and stringlist_to_identifierstr().

◆ stringlist_to_identifierstr()

static char * stringlist_to_identifierstr ( PGconn conn,
List strings 
)
static

Definition at line 1153 of file libpqwalreceiver.c.

1154 {
1155  ListCell *lc;
1157  bool first = true;
1158 
1159  initStringInfo(&res);
1160 
1161  foreach(lc, strings)
1162  {
1163  char *val = strVal(lfirst(lc));
1164  char *val_escaped;
1165 
1166  if (first)
1167  first = false;
1168  else
1169  appendStringInfoChar(&res, ',');
1170 
1171  val_escaped = PQescapeIdentifier(conn, val, strlen(val));
1172  if (!val_escaped)
1173  {
1174  free(res.data);
1175  return NULL;
1176  }
1177  appendStringInfoString(&res, val_escaped);
1178  PQfreemem(val_escaped);
1179  }
1180 
1181  return res.data;
1182 }
char * PQescapeIdentifier(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:4224
#define free(a)
Definition: header.h:65
long val
Definition: informix.c:664
#define lfirst(lc)
Definition: pg_list.h:172
#define strVal(v)
Definition: value.h:82

References appendStringInfoChar(), appendStringInfoString(), conn, free, initStringInfo(), lfirst, PQescapeIdentifier(), PQfreemem(), res, strVal, and val.

Referenced by libpqrcv_startstreaming().

Variable Documentation

◆ PG_MODULE_MAGIC

PG_MODULE_MAGIC

Definition at line 38 of file libpqwalreceiver.c.

◆ PQWalReceiverFunctions

WalReceiverFunctionsType PQWalReceiverFunctions
static
Initial value:
= {
.walrcv_connect = libpqrcv_connect,
.walrcv_check_conninfo = libpqrcv_check_conninfo,
.walrcv_get_conninfo = libpqrcv_get_conninfo,
.walrcv_get_senderinfo = libpqrcv_get_senderinfo,
.walrcv_identify_system = libpqrcv_identify_system,
.walrcv_server_version = libpqrcv_server_version,
.walrcv_readtimelinehistoryfile = libpqrcv_readtimelinehistoryfile,
.walrcv_startstreaming = libpqrcv_startstreaming,
.walrcv_endstreaming = libpqrcv_endstreaming,
.walrcv_receive = libpqrcv_receive,
.walrcv_send = libpqrcv_send,
.walrcv_create_slot = libpqrcv_create_slot,
.walrcv_get_backend_pid = libpqrcv_get_backend_pid,
.walrcv_exec = libpqrcv_exec,
.walrcv_disconnect = libpqrcv_disconnect
}
static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, TimeLineID tli, char **filename, char **content, int *len)
static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn)
static void libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
static void libpqrcv_disconnect(WalReceiverConn *conn)
static char * libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
static WalRcvExecResult * libpqrcv_exec(WalReceiverConn *conn, const char *query, const int nRetTypes, const Oid *retTypes)
static void libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
static void libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host, int *sender_port)
static int libpqrcv_server_version(WalReceiverConn *conn)
static void libpqrcv_check_conninfo(const char *conninfo, bool must_use_password)
static bool libpqrcv_startstreaming(WalReceiverConn *conn, const WalRcvStreamOptions *options)
static int libpqrcv_receive(WalReceiverConn *conn, char **buffer, pgsocket *wait_fd)
static char * libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, bool temporary, bool two_phase, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn)
static char * libpqrcv_get_conninfo(WalReceiverConn *conn)
static WalReceiverConn * libpqrcv_connect(const char *conninfo, bool logical, bool must_use_password, const char *appname, char **err)

Definition at line 86 of file libpqwalreceiver.c.

Referenced by _PG_init().