PostgreSQL Source Code  git master
libpqwalreceiver.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/time.h>
#include "common/connect.h"
#include "funcapi.h"
#include "libpq-fe.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "pqexpbuffer.h"
#include "replication/walreceiver.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/tuplestore.h"
Include dependency graph for libpqwalreceiver.c:

Go to the source code of this file.

Data Structures

struct  WalReceiverConn
 

Functions

static WalReceiverConn * libpqrcv_connect (const char *conninfo, bool replication, bool logical, bool must_use_password, const char *appname, char **err)
 
static void libpqrcv_check_conninfo (const char *conninfo, bool must_use_password)
 
static char * libpqrcv_get_conninfo (WalReceiverConn *conn)
 
static void libpqrcv_get_senderinfo (WalReceiverConn *conn, char **sender_host, int *sender_port)
 
static char * libpqrcv_identify_system (WalReceiverConn *conn, TimeLineID *primary_tli)
 
static char * libpqrcv_get_dbname_from_conninfo (const char *conninfo)
 
static int libpqrcv_server_version (WalReceiverConn *conn)
 
static void libpqrcv_readtimelinehistoryfile (WalReceiverConn *conn, TimeLineID tli, char **filename, char **content, int *len)
 
static bool libpqrcv_startstreaming (WalReceiverConn *conn, const WalRcvStreamOptions *options)
 
static void libpqrcv_endstreaming (WalReceiverConn *conn, TimeLineID *next_tli)
 
static int libpqrcv_receive (WalReceiverConn *conn, char **buffer, pgsocket *wait_fd)
 
static void libpqrcv_send (WalReceiverConn *conn, const char *buffer, int nbytes)
 
static char * libpqrcv_create_slot (WalReceiverConn *conn, const char *slotname, bool temporary, bool two_phase, bool failover, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn)
 
static void libpqrcv_alter_slot (WalReceiverConn *conn, const char *slotname, bool failover)
 
static pid_t libpqrcv_get_backend_pid (WalReceiverConn *conn)
 
static WalRcvExecResult * libpqrcv_exec (WalReceiverConn *conn, const char *query, const int nRetTypes, const Oid *retTypes)
 
static void libpqrcv_disconnect (WalReceiverConn *conn)
 
static PGresult * libpqrcv_PQexec (PGconn *streamConn, const char *query)
 
static PGresult * libpqrcv_PQgetResult (PGconn *streamConn)
 
static char * stringlist_to_identifierstr (PGconn *conn, List *strings)
 
void _PG_init (void)
 
static void libpqrcv_processTuples (PGresult *pgres, WalRcvExecResult *walres, const int nRetTypes, const Oid *retTypes)
 

Variables

 PG_MODULE_MAGIC
 
static WalReceiverFunctionsType PQWalReceiverFunctions
 

Function Documentation

◆ _PG_init()

void _PG_init ( void  )

Definition at line 120 of file libpqwalreceiver.c.

121 {
122  if (WalReceiverFunctions != NULL)
123  elog(ERROR, "libpqwalreceiver already loaded");
124  WalReceiverFunctions = &PQWalReceiverFunctions;
125 }
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:224
static WalReceiverFunctionsType PQWalReceiverFunctions
WalReceiverFunctionsType * WalReceiverFunctions
Definition: walreceiver.c:93

References elog, ERROR, PQWalReceiverFunctions, and WalReceiverFunctions.

◆ libpqrcv_alter_slot()

static void libpqrcv_alter_slot ( WalReceiverConn *  conn,
const char *  slotname,
bool  failover 
)
static

Definition at line 1123 of file libpqwalreceiver.c.

1125 {
1126  StringInfoData cmd;
1127  PGresult *res;
1128 
1129  initStringInfo(&cmd);
1130  appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( FAILOVER %s )",
1131  quote_identifier(slotname),
1132  failover ? "true" : "false");
1133 
1134  res = libpqrcv_PQexec(conn->streamConn, cmd.data);
1135  pfree(cmd.data);
1136 
1137  if (PQresultStatus(res) != PGRES_COMMAND_OK)
1138  ereport(ERROR,
1139  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1140  errmsg("could not alter replication slot \"%s\": %s",
1141  slotname, pchomp(PQerrorMessage(conn->streamConn)))));
1142 
1143  PQclear(res);
1144 }
int errcode(int sqlerrcode)
Definition: elog.c:859
int errmsg(const char *fmt,...)
Definition: elog.c:1072
#define ereport(elevel,...)
Definition: elog.h:149
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7147
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3411
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:100
static PGresult * libpqrcv_PQexec(PGconn *streamConn, const char *query)
char * pchomp(const char *in)
Definition: mcxt.c:1723
void pfree(void *pointer)
Definition: mcxt.c:1520
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:12623
PGconn * conn
Definition: streamutil.c:55
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:97
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59

References appendStringInfo(), conn, StringInfoData::data, ereport, errcode(), errmsg(), ERROR, initStringInfo(), libpqrcv_PQexec(), pchomp(), pfree(), PGRES_COMMAND_OK, PQclear(), PQerrorMessage(), PQresultStatus(), quote_identifier(), and res.

◆ libpqrcv_check_conninfo()

static void libpqrcv_check_conninfo ( const char *  conninfo,
bool  must_use_password 
)
static

Definition at line 316 of file libpqwalreceiver.c.

317 {
318  PQconninfoOption *opts = NULL;
319  PQconninfoOption *opt;
320  char *err = NULL;
321 
322  opts = PQconninfoParse(conninfo, &err);
323  if (opts == NULL)
324  {
325  /* The error string is malloc'd, so we must free it explicitly */
326  char *errcopy = err ? pstrdup(err) : "out of memory";
327 
328  PQfreemem(err);
329  ereport(ERROR,
330  (errcode(ERRCODE_SYNTAX_ERROR),
331  errmsg("invalid connection string syntax: %s", errcopy)));
332  }
333 
334  if (must_use_password)
335  {
336  bool uses_password = false;
337 
338  for (opt = opts; opt->keyword != NULL; ++opt)
339  {
340  /* Ignore connection options that are not present. */
341  if (opt->val == NULL)
342  continue;
343 
344  if (strcmp(opt->keyword, "password") == 0 && opt->val[0] != '\0')
345  {
346  uses_password = true;
347  break;
348  }
349  }
350 
351  if (!uses_password)
352  {
353  /* malloc'd, so we must free it explicitly */
354  PQconninfoFree(opts);
355 
356  ereport(ERROR,
357  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
358  errmsg("password is required"),
359  errdetail("Non-superusers must provide a password in the connection string.")));
360  }
361  }
362 
363  PQconninfoFree(opts);
364 }
int errdetail(const char *fmt,...)
Definition: elog.c:1205
void err(int eval, const char *fmt,...)
Definition: err.c:43
PQconninfoOption * PQconninfoParse(const char *conninfo, char **errmsg)
Definition: fe-connect.c:5728
void PQconninfoFree(PQconninfoOption *connOptions)
Definition: fe-connect.c:6980
void PQfreemem(void *ptr)
Definition: fe-exec.c:4032
char * pstrdup(const char *in)
Definition: mcxt.c:1695
static AmcheckOptions opts
Definition: pg_amcheck.c:111

References ereport, err(), errcode(), errdetail(), errmsg(), ERROR, _PQconninfoOption::keyword, opts, PQconninfoFree(), PQconninfoParse(), PQfreemem(), pstrdup(), and _PQconninfoOption::val.

Referenced by libpqrcv_connect().

◆ libpqrcv_connect()

static WalReceiverConn * libpqrcv_connect ( const char *  conninfo,
bool  replication,
bool  logical,
bool  must_use_password,
const char *  appname,
char **  err 
)
static

Definition at line 143 of file libpqwalreceiver.c.

145 {
146  WalReceiverConn *conn;
147  PostgresPollingStatusType status;
148  const char *keys[6];
149  const char *vals[6];
150  int i = 0;
151 
152  /*
153  * Re-validate connection string. The validation already happened at DDL
154  * time, but the subscription owner may have changed. If we don't recheck
155  * with the correct must_use_password, it's possible that the connection
156  * will obtain the password from a different source, such as PGPASSFILE or
157  * PGPASSWORD.
158  */
159  libpqrcv_check_conninfo(conninfo, must_use_password);
160 
161  /*
162  * We use the expand_dbname parameter to process the connection string (or
163  * URI), and pass some extra options.
164  */
165  keys[i] = "dbname";
166  vals[i] = conninfo;
167 
168  /* We can not have logical without replication */
169  Assert(replication || !logical);
170 
171  if (replication)
172  {
173  keys[++i] = "replication";
174  vals[i] = logical ? "database" : "true";
175 
176  if (logical)
177  {
178  /* Tell the publisher to translate to our encoding */
179  keys[++i] = "client_encoding";
180  vals[i] = GetDatabaseEncodingName();
181 
182  /*
183  * Force assorted GUC parameters to settings that ensure that the
184  * publisher will output data values in a form that is unambiguous
185  * to the subscriber. (We don't want to modify the subscriber's
186  * GUC settings, since that might surprise user-defined code
187  * running in the subscriber, such as triggers.) This should
188  * match what pg_dump does.
189  */
190  keys[++i] = "options";
191  vals[i] = "-c datestyle=ISO -c intervalstyle=postgres -c extra_float_digits=3";
192  }
193  else
194  {
195  /*
196  * The database name is ignored by the server in replication mode,
197  * but specify "replication" for .pgpass lookup.
198  */
199  keys[++i] = "dbname";
200  vals[i] = "replication";
201  }
202  }
203 
204  keys[++i] = "fallback_application_name";
205  vals[i] = appname;
206 
207  keys[++i] = NULL;
208  vals[i] = NULL;
209 
210  Assert(i < sizeof(keys));
211 
212  conn = palloc0(sizeof(WalReceiverConn));
213  conn->streamConn = PQconnectStartParams(keys, vals,
214  /* expand_dbname = */ true);
215  if (PQstatus(conn->streamConn) == CONNECTION_BAD)
216  goto bad_connection_errmsg;
217 
218  /*
219  * Poll connection until we have OK or FAILED status.
220  *
221  * Per spec for PQconnectPoll, first wait till socket is write-ready.
222  */
223  status = PGRES_POLLING_WRITING;
224  do
225  {
226  int io_flag;
227  int rc;
228 
229  if (status == PGRES_POLLING_READING)
230  io_flag = WL_SOCKET_READABLE;
231 #ifdef WIN32
232  /* Windows needs a different test while waiting for connection-made */
233  else if (PQstatus(conn->streamConn) == CONNECTION_STARTED)
234  io_flag = WL_SOCKET_CONNECTED;
235 #endif
236  else
237  io_flag = WL_SOCKET_WRITEABLE;
238 
239  rc = WaitLatchOrSocket(MyLatch,
240  WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
241  PQsocket(conn->streamConn),
242  0,
243  WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
244 
245  /* Interrupted? */
246  if (rc & WL_LATCH_SET)
247  {
248  ResetLatch(MyLatch);
249  ProcessWalRcvInterrupts();
250  }
251 
252  /* If socket is ready, advance the libpq state machine */
253  if (rc & io_flag)
254  status = PQconnectPoll(conn->streamConn);
255  } while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
256 
257  if (PQstatus(conn->streamConn) != CONNECTION_OK)
258  goto bad_connection_errmsg;
259 
260  if (must_use_password && !PQconnectionUsedPassword(conn->streamConn))
261  {
262  PQfinish(conn->streamConn);
263  pfree(conn);
264 
265  ereport(ERROR,
266  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
267  errmsg("password is required"),
268  errdetail("Non-superuser cannot connect if the server does not request a password."),
269  errhint("Target server's authentication method must be changed, or set password_required=false in the subscription parameters.")));
270  }
271 
272  /*
273  * Set always-secure search path for the cases where the connection is
274  * used to run SQL queries, so malicious users can't get control.
275  */
276  if (!replication || logical)
277  {
278  PGresult *res;
279 
280  res = libpqrcv_PQexec(conn->streamConn,
281  ALWAYS_SECURE_SEARCH_PATH_SQL);
282  if (PQresultStatus(res) != PGRES_TUPLES_OK)
283  {
284  PQclear(res);
285  *err = psprintf(_("could not clear search path: %s"),
286  pchomp(PQerrorMessage(conn->streamConn)));
287  goto bad_connection;
288  }
289  PQclear(res);
290  }
291 
292  conn->logical = logical;
293 
294  return conn;
295 
296  /* error path, using libpq's error message */
297 bad_connection_errmsg:
298  *err = pchomp(PQerrorMessage(conn->streamConn));
299 
300  /* error path, error already set */
301 bad_connection:
302  PQfinish(conn->streamConn);
303  pfree(conn);
304  return NULL;
305 }
#define Assert(condition)
Definition: c.h:858
#define ALWAYS_SECURE_SEARCH_PATH_SQL
Definition: connect.h:25
int errhint(const char *fmt,...)
Definition: elog.c:1319
#define _(x)
Definition: elog.c:90
PGconn * PQconnectStartParams(const char *const *keywords, const char *const *values, int expand_dbname)
Definition: fe-connect.c:791
int PQconnectionUsedPassword(const PGconn *conn)
Definition: fe-connect.c:7213
PostgresPollingStatusType PQconnectPoll(PGconn *conn)
Definition: fe-connect.c:2591
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:7094
void PQfinish(PGconn *conn)
Definition: fe-connect.c:4868
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:7173
struct Latch * MyLatch
Definition: globals.c:60
int i
Definition: isn.c:73
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:565
void ResetLatch(Latch *latch)
Definition: latch.c:724
#define WL_SOCKET_READABLE
Definition: latch.h:128
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:132
#define WL_LATCH_SET
Definition: latch.h:127
#define WL_SOCKET_CONNECTED
Definition: latch.h:137
#define WL_SOCKET_WRITEABLE
Definition: latch.h:129
@ CONNECTION_STARTED
Definition: libpq-fe.h:69
@ CONNECTION_BAD
Definition: libpq-fe.h:62
@ CONNECTION_OK
Definition: libpq-fe.h:61
@ PGRES_TUPLES_OK
Definition: libpq-fe.h:103
PostgresPollingStatusType
Definition: libpq-fe.h:89
@ PGRES_POLLING_OK
Definition: libpq-fe.h:93
@ PGRES_POLLING_READING
Definition: libpq-fe.h:91
@ PGRES_POLLING_WRITING
Definition: libpq-fe.h:92
@ PGRES_POLLING_FAILED
Definition: libpq-fe.h:90
static void libpqrcv_check_conninfo(const char *conninfo, bool must_use_password)
const char * GetDatabaseEncodingName(void)
Definition: mbutils.c:1267
void * palloc0(Size size)
Definition: mcxt.c:1346
char * psprintf(const char *fmt,...)
Definition: psprintf.c:46
void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:162

References _, ALWAYS_SECURE_SEARCH_PATH_SQL, Assert, conn, CONNECTION_BAD, CONNECTION_OK, CONNECTION_STARTED, ereport, err(), errcode(), errdetail(), errhint(), errmsg(), ERROR, GetDatabaseEncodingName(), i, libpqrcv_check_conninfo(), libpqrcv_PQexec(), MyLatch, palloc0(), pchomp(), pfree(), PGRES_POLLING_FAILED, PGRES_POLLING_OK, PGRES_POLLING_READING, PGRES_POLLING_WRITING, PGRES_TUPLES_OK, PQclear(), PQconnectionUsedPassword(), PQconnectPoll(), PQconnectStartParams(), PQerrorMessage(), PQfinish(), PQresultStatus(), PQsocket(), PQstatus(), ProcessWalRcvInterrupts(), psprintf(), res, ResetLatch(), WaitLatchOrSocket(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_CONNECTED, WL_SOCKET_READABLE, and WL_SOCKET_WRITEABLE.

◆ libpqrcv_create_slot()

static char * libpqrcv_create_slot ( WalReceiverConn *  conn,
const char *  slotname,
bool  temporary,
bool  two_phase,
bool  failover,
CRSSnapshotAction  snapshot_action,
XLogRecPtr *  lsn 
)
static

Definition at line 1010 of file libpqwalreceiver.c.

1013 {
1014  PGresult *res;
1015  StringInfoData cmd;
1016  char *snapshot;
1017  int use_new_options_syntax;
1018 
1019  use_new_options_syntax = (PQserverVersion(conn->streamConn) >= 150000);
1020 
1021  initStringInfo(&cmd);
1022 
1023  appendStringInfo(&cmd, "CREATE_REPLICATION_SLOT \"%s\"", slotname);
1024 
1025  if (temporary)
1026  appendStringInfoString(&cmd, " TEMPORARY");
1027 
1028  if (conn->logical)
1029  {
1030  appendStringInfoString(&cmd, " LOGICAL pgoutput ");
1031  if (use_new_options_syntax)
1032  appendStringInfoChar(&cmd, '(');
1033  if (two_phase)
1034  {
1035  appendStringInfoString(&cmd, "TWO_PHASE");
1036  if (use_new_options_syntax)
1037  appendStringInfoString(&cmd, ", ");
1038  else
1039  appendStringInfoChar(&cmd, ' ');
1040  }
1041 
1042  if (failover)
1043  {
1044  appendStringInfoString(&cmd, "FAILOVER");
1045  if (use_new_options_syntax)
1046  appendStringInfoString(&cmd, ", ");
1047  else
1048  appendStringInfoChar(&cmd, ' ');
1049  }
1050 
1051  if (use_new_options_syntax)
1052  {
1053  switch (snapshot_action)
1054  {
1055  case CRS_EXPORT_SNAPSHOT:
1056  appendStringInfoString(&cmd, "SNAPSHOT 'export'");
1057  break;
1058  case CRS_NOEXPORT_SNAPSHOT:
1059  appendStringInfoString(&cmd, "SNAPSHOT 'nothing'");
1060  break;
1061  case CRS_USE_SNAPSHOT:
1062  appendStringInfoString(&cmd, "SNAPSHOT 'use'");
1063  break;
1064  }
1065  }
1066  else
1067  {
1068  switch (snapshot_action)
1069  {
1070  case CRS_EXPORT_SNAPSHOT:
1071  appendStringInfoString(&cmd, "EXPORT_SNAPSHOT");
1072  break;
1073  case CRS_NOEXPORT_SNAPSHOT:
1074  appendStringInfoString(&cmd, "NOEXPORT_SNAPSHOT");
1075  break;
1076  case CRS_USE_SNAPSHOT:
1077  appendStringInfoString(&cmd, "USE_SNAPSHOT");
1078  break;
1079  }
1080  }
1081 
1082  if (use_new_options_syntax)
1083  appendStringInfoChar(&cmd, ')');
1084  }
1085  else
1086  {
1087  if (use_new_options_syntax)
1088  appendStringInfoString(&cmd, " PHYSICAL (RESERVE_WAL)");
1089  else
1090  appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL");
1091  }
1092 
1093  res = libpqrcv_PQexec(conn->streamConn, cmd.data);
1094  pfree(cmd.data);
1095 
1096  if (PQresultStatus(res) != PGRES_TUPLES_OK)
1097  {
1098  PQclear(res);
1099  ereport(ERROR,
1100  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1101  errmsg("could not create replication slot \"%s\": %s",
1102  slotname, pchomp(PQerrorMessage(conn->streamConn)))));
1103  }
1104 
1105  if (lsn)
1106  *lsn = DatumGetLSN(DirectFunctionCall1Coll(pg_lsn_in, InvalidOid,
1107  CStringGetDatum(PQgetvalue(res, 0, 1))));
1108 
1109  if (!PQgetisnull(res, 0, 2))
1110  snapshot = pstrdup(PQgetvalue(res, 0, 2));
1111  else
1112  snapshot = NULL;
1113 
1114  PQclear(res);
1115 
1116  return snapshot;
1117 }
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:7137
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3876
int PQgetisnull(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3901
Datum DirectFunctionCall1Coll(PGFunction func, Oid collation, Datum arg1)
Definition: fmgr.c:792
Datum pg_lsn_in(PG_FUNCTION_ARGS)
Definition: pg_lsn.c:63
static XLogRecPtr DatumGetLSN(Datum X)
Definition: pg_lsn.h:22
static bool two_phase
static Datum CStringGetDatum(const char *X)
Definition: postgres.h:350
#define InvalidOid
Definition: postgres_ext.h:36
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:182
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:194
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
@ CRS_NOEXPORT_SNAPSHOT
Definition: walsender.h:23
@ CRS_EXPORT_SNAPSHOT
Definition: walsender.h:22

References appendStringInfo(), appendStringInfoChar(), appendStringInfoString(), conn, CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, CStringGetDatum(), StringInfoData::data, DatumGetLSN(), DirectFunctionCall1Coll(), ereport, errcode(), errmsg(), ERROR, initStringInfo(), InvalidOid, libpqrcv_PQexec(), pchomp(), pfree(), pg_lsn_in(), PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQgetisnull(), PQgetvalue(), PQresultStatus(), PQserverVersion(), pstrdup(), res, and two_phase.

◆ libpqrcv_disconnect()

static void libpqrcv_disconnect ( WalReceiverConn *  conn)
static

Definition at line 880 of file libpqwalreceiver.c.

881 {
882  PQfinish(conn->streamConn);
883  PQfreemem(conn->recvBuf);
884  pfree(conn);
885 }

References conn, pfree(), PQfinish(), and PQfreemem().

◆ libpqrcv_endstreaming()

static void libpqrcv_endstreaming ( WalReceiverConn *  conn,
TimeLineID *  next_tli 
)
static

Definition at line 655 of file libpqwalreceiver.c.

656 {
657  PGresult *res;
658 
659  /*
660  * Send copy-end message. As in libpqrcv_PQexec, this could theoretically
661  * block, but the risk seems small.
662  */
663  if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
664  PQflush(conn->streamConn))
665  ereport(ERROR,
666  (errcode(ERRCODE_CONNECTION_FAILURE),
667  errmsg("could not send end-of-streaming message to primary: %s",
668  pchomp(PQerrorMessage(conn->streamConn)))));
669 
670  *next_tli = 0;
671 
672  /*
673  * After COPY is finished, we should receive a result set indicating the
674  * next timeline's ID, or just CommandComplete if the server was shut
675  * down.
676  *
677  * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
678  * also possible in case we aborted the copy in mid-stream.
679  */
680  res = libpqrcv_PQgetResult(conn->streamConn);
681  if (PQresultStatus(res) == PGRES_TUPLES_OK)
682  {
683  /*
684  * Read the next timeline's ID. The server also sends the timeline's
685  * starting point, but it is ignored.
686  */
687  if (PQnfields(res) < 2 || PQntuples(res) != 1)
688  ereport(ERROR,
689  (errcode(ERRCODE_PROTOCOL_VIOLATION),
690  errmsg("unexpected result set after end-of-streaming")));
691  *next_tli = pg_strtoint32(PQgetvalue(res, 0, 0));
692  PQclear(res);
693 
694  /* the result set should be followed by CommandComplete */
695  res = libpqrcv_PQgetResult(conn->streamConn);
696  }
697  else if (PQresultStatus(res) == PGRES_COPY_OUT)
698  {
699  PQclear(res);
700 
701  /* End the copy */
702  if (PQendcopy(conn->streamConn))
703  ereport(ERROR,
704  (errcode(ERRCODE_CONNECTION_FAILURE),
705  errmsg("error while shutting down streaming COPY: %s",
706  pchomp(PQerrorMessage(conn->streamConn)))));
707 
708  /* CommandComplete should follow */
709  res = libpqrcv_PQgetResult(conn->streamConn);
710  }
711 
712  if (PQresultStatus(res) != PGRES_COMMAND_OK)
713  ereport(ERROR,
714  (errcode(ERRCODE_PROTOCOL_VIOLATION),
715  errmsg("error reading result of streaming command: %s",
716  pchomp(PQerrorMessage(conn->streamConn)))));
717  PQclear(res);
718 
719  /* Verify that there are no more results */
720  res = libpqrcv_PQgetResult(conn->streamConn);
721  if (res != NULL)
722  ereport(ERROR,
723  (errcode(ERRCODE_PROTOCOL_VIOLATION),
724  errmsg("unexpected result after CommandComplete: %s",
725  pchomp(PQerrorMessage(conn->streamConn)))));
726 }
int PQflush(PGconn *conn)
Definition: fe-exec.c:4000
int PQendcopy(PGconn *conn)
Definition: fe-exec.c:2949
int PQputCopyEnd(PGconn *conn, const char *errormsg)
Definition: fe-exec.c:2749
int PQntuples(const PGresult *res)
Definition: fe-exec.c:3481
int PQnfields(const PGresult *res)
Definition: fe-exec.c:3489
@ PGRES_COPY_OUT
Definition: libpq-fe.h:106
static PGresult * libpqrcv_PQgetResult(PGconn *streamConn)
int32 pg_strtoint32(const char *s)
Definition: numutils.c:383

References conn, ereport, errcode(), errmsg(), ERROR, libpqrcv_PQgetResult(), pchomp(), pg_strtoint32(), PGRES_COMMAND_OK, PGRES_COPY_OUT, PGRES_TUPLES_OK, PQclear(), PQendcopy(), PQerrorMessage(), PQflush(), PQgetvalue(), PQnfields(), PQntuples(), PQputCopyEnd(), PQresultStatus(), and res.

◆ libpqrcv_exec()

static WalRcvExecResult * libpqrcv_exec ( WalReceiverConn *  conn,
const char *  query,
const int  nRetTypes,
const Oid *  retTypes 
)
static

Definition at line 1235 of file libpqwalreceiver.c.

1237 {
1238  PGresult *pgres = NULL;
1239  WalRcvExecResult *walres = palloc0(sizeof(WalRcvExecResult));
1240  char *diag_sqlstate;
1241 
1242  if (MyDatabaseId == InvalidOid)
1243  ereport(ERROR,
1244  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1245  errmsg("the query interface requires a database connection")));
1246 
1247  pgres = libpqrcv_PQexec(conn->streamConn, query);
1248 
1249  switch (PQresultStatus(pgres))
1250  {
1251  case PGRES_TUPLES_OK:
1252  case PGRES_SINGLE_TUPLE:
1253  case PGRES_TUPLES_CHUNK:
1254  walres->status = WALRCV_OK_TUPLES;
1255  libpqrcv_processTuples(pgres, walres, nRetTypes, retTypes);
1256  break;
1257 
1258  case PGRES_COPY_IN:
1259  walres->status = WALRCV_OK_COPY_IN;
1260  break;
1261 
1262  case PGRES_COPY_OUT:
1263  walres->status = WALRCV_OK_COPY_OUT;
1264  break;
1265 
1266  case PGRES_COPY_BOTH:
1267  walres->status = WALRCV_OK_COPY_BOTH;
1268  break;
1269 
1270  case PGRES_COMMAND_OK:
1271  walres->status = WALRCV_OK_COMMAND;
1272  break;
1273 
1274  /* Empty query is considered error. */
1275  case PGRES_EMPTY_QUERY:
1276  walres->status = WALRCV_ERROR;
1277  walres->err = _("empty query");
1278  break;
1279 
1280  case PGRES_PIPELINE_SYNC:
1281  case PGRES_PIPELINE_ABORTED:
1282  walres->status = WALRCV_ERROR;
1283  walres->err = _("unexpected pipeline mode");
1284  break;
1285 
1286  case PGRES_NONFATAL_ERROR:
1287  case PGRES_FATAL_ERROR:
1288  case PGRES_BAD_RESPONSE:
1289  walres->status = WALRCV_ERROR;
1290  walres->err = pchomp(PQerrorMessage(conn->streamConn));
1291  diag_sqlstate = PQresultErrorField(pgres, PG_DIAG_SQLSTATE);
1292  if (diag_sqlstate)
1293  walres->sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
1294  diag_sqlstate[1],
1295  diag_sqlstate[2],
1296  diag_sqlstate[3],
1297  diag_sqlstate[4]);
1298  break;
1299  }
1300 
1301  PQclear(pgres);
1302 
1303  return walres;
1304 }
#define MAKE_SQLSTATE(ch1, ch2, ch3, ch4, ch5)
Definition: elog.h:56
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:3466
Oid MyDatabaseId
Definition: globals.c:91
@ PGRES_COPY_IN
Definition: libpq-fe.h:107
@ PGRES_COPY_BOTH
Definition: libpq-fe.h:112
@ PGRES_TUPLES_CHUNK
Definition: libpq-fe.h:117
@ PGRES_FATAL_ERROR
Definition: libpq-fe.h:111
@ PGRES_SINGLE_TUPLE
Definition: libpq-fe.h:113
@ PGRES_EMPTY_QUERY
Definition: libpq-fe.h:99
@ PGRES_PIPELINE_SYNC
Definition: libpq-fe.h:114
@ PGRES_BAD_RESPONSE
Definition: libpq-fe.h:108
@ PGRES_PIPELINE_ABORTED
Definition: libpq-fe.h:115
@ PGRES_NONFATAL_ERROR
Definition: libpq-fe.h:110
static void libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres, const int nRetTypes, const Oid *retTypes)
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:56
WalRcvExecStatus status
Definition: walreceiver.h:219
@ WALRCV_OK_COPY_IN
Definition: walreceiver.h:207
@ WALRCV_OK_COMMAND
Definition: walreceiver.h:204
@ WALRCV_ERROR
Definition: walreceiver.h:203
@ WALRCV_OK_TUPLES
Definition: walreceiver.h:206
@ WALRCV_OK_COPY_OUT
Definition: walreceiver.h:208
@ WALRCV_OK_COPY_BOTH
Definition: walreceiver.h:209

References _, conn, ereport, WalRcvExecResult::err, errcode(), errmsg(), ERROR, InvalidOid, libpqrcv_PQexec(), libpqrcv_processTuples(), MAKE_SQLSTATE, MyDatabaseId, palloc0(), pchomp(), PG_DIAG_SQLSTATE, PGRES_BAD_RESPONSE, PGRES_COMMAND_OK, PGRES_COPY_BOTH, PGRES_COPY_IN, PGRES_COPY_OUT, PGRES_EMPTY_QUERY, PGRES_FATAL_ERROR, PGRES_NONFATAL_ERROR, PGRES_PIPELINE_ABORTED, PGRES_PIPELINE_SYNC, PGRES_SINGLE_TUPLE, PGRES_TUPLES_CHUNK, PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQresultErrorField(), PQresultStatus(), WalRcvExecResult::sqlstate, WalRcvExecResult::status, WALRCV_ERROR, WALRCV_OK_COMMAND, WALRCV_OK_COPY_BOTH, WALRCV_OK_COPY_IN, WALRCV_OK_COPY_OUT, and WALRCV_OK_TUPLES.

◆ libpqrcv_get_backend_pid()

static pid_t libpqrcv_get_backend_pid ( WalReceiverConn *  conn)
static

Definition at line 1150 of file libpqwalreceiver.c.

1151 {
1152  return PQbackendPID(conn->streamConn);
1153 }
int PQbackendPID(const PGconn *conn)
Definition: fe-connect.c:7181

References conn, and PQbackendPID().

◆ libpqrcv_get_conninfo()

static char * libpqrcv_get_conninfo ( WalReceiverConn *  conn)
static

Definition at line 371 of file libpqwalreceiver.c.

372 {
373  PQconninfoOption *conn_opts;
374  PQconninfoOption *conn_opt;
375  PQExpBufferData buf;
376  char *retval;
377 
378  Assert(conn->streamConn != NULL);
379 
380  initPQExpBuffer(&buf);
381  conn_opts = PQconninfo(conn->streamConn);
382 
383  if (conn_opts == NULL)
384  ereport(ERROR,
385  (errcode(ERRCODE_OUT_OF_MEMORY),
386  errmsg("could not parse connection string: %s",
387  _("out of memory"))));
388 
389  /* build a clean connection string from pieces */
390  for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
391  {
392  bool obfuscate;
393 
394  /* Skip debug and empty options */
395  if (strchr(conn_opt->dispchar, 'D') ||
396  conn_opt->val == NULL ||
397  conn_opt->val[0] == '\0')
398  continue;
399 
400  /* Obfuscate security-sensitive options */
401  obfuscate = strchr(conn_opt->dispchar, '*') != NULL;
402 
403  appendPQExpBuffer(&buf, "%s%s=%s",
404  buf.len == 0 ? "" : " ",
405  conn_opt->keyword,
406  obfuscate ? "********" : conn_opt->val);
407  }
408 
409  PQconninfoFree(conn_opts);
410 
411  retval = PQExpBufferDataBroken(buf) ? NULL : pstrdup(buf.data);
412  termPQExpBuffer(&buf);
413  return retval;
414 }
PQconninfoOption * PQconninfo(PGconn *conn)
Definition: fe-connect.c:6936
static char * buf
Definition: pg_test_fsync.c:73
void initPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:90
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
Definition: pqexpbuffer.c:265
void termPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:129
#define PQExpBufferDataBroken(buf)
Definition: pqexpbuffer.h:67

References _, appendPQExpBuffer(), Assert, buf, conn, _PQconninfoOption::dispchar, ereport, errcode(), errmsg(), ERROR, initPQExpBuffer(), _PQconninfoOption::keyword, PQconninfo(), PQconninfoFree(), PQExpBufferDataBroken, pstrdup(), termPQExpBuffer(), and _PQconninfoOption::val.

◆ libpqrcv_get_dbname_from_conninfo()

static char * libpqrcv_get_dbname_from_conninfo ( const char *  conninfo)
static

Definition at line 502 of file libpqwalreceiver.c.

503 {
504  PQconninfoOption *opts;
505  char *dbname = NULL;
506  char *err = NULL;
507 
508  opts = PQconninfoParse(connInfo, &err);
509  if (opts == NULL)
510  {
511  /* The error string is malloc'd, so we must free it explicitly */
512  char *errcopy = err ? pstrdup(err) : "out of memory";
513 
514  PQfreemem(err);
515  ereport(ERROR,
516  (errcode(ERRCODE_SYNTAX_ERROR),
517  errmsg("invalid connection string syntax: %s", errcopy)));
518  }
519 
520  for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
521  {
522  /*
523  * If multiple dbnames are specified, then the last one will be
524  * returned
525  */
526  if (strcmp(opt->keyword, "dbname") == 0 && opt->val &&
527  *opt->val)
528  {
529  if (dbname)
530  pfree(dbname);
531 
532  dbname = pstrdup(opt->val);
533  }
534  }
535 
536  PQconninfoFree(opts);
537  return dbname;
538 }
char * dbname
Definition: streamutil.c:52

References dbname, ereport, err(), errcode(), errmsg(), ERROR, opts, pfree(), PQconninfoFree(), PQconninfoParse(), PQfreemem(), and pstrdup().

◆ libpqrcv_get_senderinfo()

static void libpqrcv_get_senderinfo ( WalReceiverConn *  conn,
char **  sender_host,
int *  sender_port 
)
static

Definition at line 420 of file libpqwalreceiver.c.

422 {
423  char *ret = NULL;
424 
425  *sender_host = NULL;
426  *sender_port = 0;
427 
428  Assert(conn->streamConn != NULL);
429 
430  ret = PQhost(conn->streamConn);
431  if (ret && strlen(ret) != 0)
432  *sender_host = pstrdup(ret);
433 
434  ret = PQport(conn->streamConn);
435  if (ret && strlen(ret) != 0)
436  *sender_port = atoi(ret);
437 }
char * PQhost(const PGconn *conn)
Definition: fe-connect.c:7026
char * PQport(const PGconn *conn)
Definition: fe-connect.c:7062

References Assert, conn, PQhost(), PQport(), and pstrdup().

◆ libpqrcv_identify_system()

static char * libpqrcv_identify_system ( WalReceiverConn *  conn,
TimeLineID *  primary_tli 
)
static

Definition at line 444 of file libpqwalreceiver.c.

445 {
446  PGresult *res;
447  char *primary_sysid;
448 
449  /*
450  * Get the system identifier and timeline ID as a DataRow message from the
451  * primary server.
452  */
453  res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM");
454  if (PQresultStatus(res) != PGRES_TUPLES_OK)
455  {
456  PQclear(res);
457  ereport(ERROR,
458  (errcode(ERRCODE_PROTOCOL_VIOLATION),
459  errmsg("could not receive database system identifier and timeline ID from "
460  "the primary server: %s",
461  pchomp(PQerrorMessage(conn->streamConn)))));
462  }
463 
464  /*
465  * IDENTIFY_SYSTEM returns 3 columns in 9.3 and earlier, and 4 columns in
466  * 9.4 and onwards.
467  */
468  if (PQnfields(res) < 3 || PQntuples(res) != 1)
469  {
470  int ntuples = PQntuples(res);
471  int nfields = PQnfields(res);
472 
473  PQclear(res);
474  ereport(ERROR,
475  (errcode(ERRCODE_PROTOCOL_VIOLATION),
476  errmsg("invalid response from primary server"),
477  errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.",
478  ntuples, nfields, 1, 3)));
479  }
480  primary_sysid = pstrdup(PQgetvalue(res, 0, 0));
481  *primary_tli = pg_strtoint32(PQgetvalue(res, 0, 1));
482  PQclear(res);
483 
484  return primary_sysid;
485 }

References conn, ereport, errcode(), errdetail(), errmsg(), ERROR, libpqrcv_PQexec(), pchomp(), pg_strtoint32(), PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQgetvalue(), PQnfields(), PQntuples(), PQresultStatus(), pstrdup(), and res.

◆ libpqrcv_PQexec()

static PGresult * libpqrcv_PQexec ( PGconn *  streamConn,
const char *  query 
)
static

Definition at line 786 of file libpqwalreceiver.c.

787 {
788  PGresult *lastResult = NULL;
789 
790  /*
791  * PQexec() silently discards any prior query results on the connection.
792  * This is not required for this function as it's expected that the caller
793  * (which is this library in all cases) will behave correctly and we don't
794  * have to be backwards compatible with old libpq.
795  */
796 
797  /*
798  * Submit the query. Since we don't use non-blocking mode, this could
799  * theoretically block. In practice, since we don't send very long query
800  * strings, the risk seems negligible.
801  */
802  if (!PQsendQuery(streamConn, query))
803  return NULL;
804 
805  for (;;)
806  {
807  /* Wait for, and collect, the next PGresult. */
808  PGresult *result;
809 
810  result = libpqrcv_PQgetResult(streamConn);
811  if (result == NULL)
812  break; /* query is complete, or failure */
813 
814  /*
815  * Emulate PQexec()'s behavior of returning the last result when there
816  * are many. We are fine with returning just last error message.
817  */
818  PQclear(lastResult);
819  lastResult = result;
820 
821  if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
822  PQresultStatus(lastResult) == PGRES_COPY_OUT ||
823  PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
824  PQstatus(streamConn) == CONNECTION_BAD)
825  break;
826  }
827 
828  return lastResult;
829 }
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1416

References CONNECTION_BAD, libpqrcv_PQgetResult(), PGRES_COPY_BOTH, PGRES_COPY_IN, PGRES_COPY_OUT, PQclear(), PQresultStatus(), PQsendQuery(), and PQstatus().

Referenced by libpqrcv_alter_slot(), libpqrcv_connect(), libpqrcv_create_slot(), libpqrcv_exec(), libpqrcv_identify_system(), libpqrcv_readtimelinehistoryfile(), and libpqrcv_startstreaming().

◆ libpqrcv_PQgetResult()

static PGresult * libpqrcv_PQgetResult ( PGconn *  streamConn)
static

Definition at line 835 of file libpqwalreceiver.c.

836 {
837  /*
838  * Collect data until PQgetResult is ready to get the result without
839  * blocking.
840  */
841  while (PQisBusy(streamConn))
842  {
843  int rc;
844 
845  /*
846  * We don't need to break down the sleep into smaller increments,
847  * since we'll get interrupted by signals and can handle any
848  * interrupts here.
849  */
850  rc = WaitLatchOrSocket(MyLatch,
851  WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
852  WL_LATCH_SET,
853  PQsocket(streamConn),
854  0,
855  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
856 
857  /* Interrupted? */
858  if (rc & WL_LATCH_SET)
859  {
860  ResetLatch(MyLatch);
861  ProcessWalRcvInterrupts();
862  }
863 
864  /* Consume whatever data is available from the socket */
865  if (PQconsumeInput(streamConn) == 0)
866  {
867  /* trouble; return NULL */
868  return NULL;
869  }
870  }
871 
872  /* Now we can collect and return the next PGresult */
873  return PQgetResult(streamConn);
874 }
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1984
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:2031
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:2062

References MyLatch, PQconsumeInput(), PQgetResult(), PQisBusy(), PQsocket(), ProcessWalRcvInterrupts(), ResetLatch(), WaitLatchOrSocket(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_SOCKET_READABLE.

Referenced by libpqrcv_endstreaming(), libpqrcv_PQexec(), and libpqrcv_receive().

◆ libpqrcv_processTuples()

static void libpqrcv_processTuples ( PGresult *  pgres,
WalRcvExecResult *  walres,
const int  nRetTypes,
const Oid *  retTypes 
)
static

Definition at line 1159 of file libpqwalreceiver.c.

1161 {
1162  int tupn;
1163  int coln;
1164  int nfields = PQnfields(pgres);
1165  HeapTuple tuple;
1166  AttInMetadata *attinmeta;
1167  MemoryContext rowcontext;
1168  MemoryContext oldcontext;
1169 
1170  /* Make sure we got expected number of fields. */
1171  if (nfields != nRetTypes)
1172  ereport(ERROR,
1173  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1174  errmsg("invalid query response"),
1175  errdetail("Expected %d fields, got %d fields.",
1176  nRetTypes, nfields)));
1177 
1178  walres->tuplestore = tuplestore_begin_heap(true, false, work_mem);
1179 
1180  /* Create tuple descriptor corresponding to expected result. */
1181  walres->tupledesc = CreateTemplateTupleDesc(nRetTypes);
1182  for (coln = 0; coln < nRetTypes; coln++)
1183  TupleDescInitEntry(walres->tupledesc, (AttrNumber) coln + 1,
1184  PQfname(pgres, coln), retTypes[coln], -1, 0);
1185  attinmeta = TupleDescGetAttInMetadata(walres->tupledesc);
1186 
1187  /* No point in doing more here if there were no tuples returned. */
1188  if (PQntuples(pgres) == 0)
1189  return;
1190 
1191  /* Create temporary context for local allocations. */
1192  rowcontext = AllocSetContextCreate(CurrentMemoryContext,
1193  "libpqrcv query result context",
1194  ALLOCSET_DEFAULT_SIZES);
1195 
1196  /* Process returned rows. */
1197  for (tupn = 0; tupn < PQntuples(pgres); tupn++)
1198  {
1199  char *cstrs[MaxTupleAttributeNumber];
1200 
1201  ProcessWalRcvInterrupts();
1202 
1203  /* Do the allocations in temporary context. */
1204  oldcontext = MemoryContextSwitchTo(rowcontext);
1205 
1206  /*
1207  * Fill cstrs with null-terminated strings of column values.
1208  */
1209  for (coln = 0; coln < nfields; coln++)
1210  {
1211  if (PQgetisnull(pgres, tupn, coln))
1212  cstrs[coln] = NULL;
1213  else
1214  cstrs[coln] = PQgetvalue(pgres, tupn, coln);
1215  }
1216 
1217  /* Convert row to a tuple, and add it to the tuplestore */
1218  tuple = BuildTupleFromCStrings(attinmeta, cstrs);
1219  tuplestore_puttuple(walres->tuplestore, tuple);
1220 
1221  /* Clean up */
1222  MemoryContextSwitchTo(oldcontext);
1223  MemoryContextReset(rowcontext);
1224  }
1225 
1226  MemoryContextDelete(rowcontext);
1227 }
int16 AttrNumber
Definition: attnum.h:21
HeapTuple BuildTupleFromCStrings(AttInMetadata *attinmeta, char **values)
Definition: execTuples.c:2222
AttInMetadata * TupleDescGetAttInMetadata(TupleDesc tupdesc)
Definition: execTuples.c:2173
char * PQfname(const PGresult *res, int field_num)
Definition: fe-exec.c:3567
int work_mem
Definition: globals.c:128
#define MaxTupleAttributeNumber
Definition: htup_details.h:34
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:383
MemoryContext CurrentMemoryContext
Definition: mcxt.c:143
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:454
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
MemoryContextSwitchTo(old_ctx)
Tuplestorestate * tuplestore
Definition: walreceiver.h:222
TupleDesc tupledesc
Definition: walreceiver.h:223
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:67
void TupleDescInitEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:651
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
void tuplestore_puttuple(Tuplestorestate *state, HeapTuple tuple)
Definition: tuplestore.c:730

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, BuildTupleFromCStrings(), CreateTemplateTupleDesc(), CurrentMemoryContext, ereport, errcode(), errdetail(), errmsg(), ERROR, MaxTupleAttributeNumber, MemoryContextDelete(), MemoryContextReset(), MemoryContextSwitchTo(), PQfname(), PQgetisnull(), PQgetvalue(), PQnfields(), PQntuples(), ProcessWalRcvInterrupts(), WalRcvExecResult::tupledesc, TupleDescGetAttInMetadata(), TupleDescInitEntry(), WalRcvExecResult::tuplestore, tuplestore_begin_heap(), tuplestore_puttuple(), and work_mem.

Referenced by libpqrcv_exec().

◆ libpqrcv_readtimelinehistoryfile()

static void libpqrcv_readtimelinehistoryfile ( WalReceiverConn *  conn,
TimeLineID  tli,
char **  filename,
char **  content,
int *  len 
)
static

Definition at line 732 of file libpqwalreceiver.c.

735 {
736  PGresult *res;
737  char cmd[64];
738 
739  Assert(!conn->logical);
740 
741  /*
742  * Request the primary to send over the history file for given timeline.
743  */
744  snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
745  res = libpqrcv_PQexec(conn->streamConn, cmd);
746  if (PQresultStatus(res) != PGRES_TUPLES_OK)
747  {
748  PQclear(res);
749  ereport(ERROR,
750  (errcode(ERRCODE_PROTOCOL_VIOLATION),
751  errmsg("could not receive timeline history file from "
752  "the primary server: %s",
753  pchomp(PQerrorMessage(conn->streamConn)))));
754  }
755  if (PQnfields(res) != 2 || PQntuples(res) != 1)
756  {
757  int ntuples = PQntuples(res);
758  int nfields = PQnfields(res);
759 
760  PQclear(res);
761  ereport(ERROR,
762  (errcode(ERRCODE_PROTOCOL_VIOLATION),
763  errmsg("invalid response from primary server"),
764  errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
765  ntuples, nfields)));
766  }
767  *filename = pstrdup(PQgetvalue(res, 0, 0));
768 
769  *len = PQgetlength(res, 0, 1);
770  *content = palloc(*len);
771  memcpy(*content, PQgetvalue(res, 0, 1), *len);
772  PQclear(res);
773 }
int PQgetlength(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3887
void * palloc(Size size)
Definition: mcxt.c:1316
const void size_t len
static char * filename
Definition: pg_dumpall.c:119
#define snprintf
Definition: port.h:238

References Assert, conn, ereport, errcode(), errdetail(), errmsg(), ERROR, filename, len, libpqrcv_PQexec(), palloc(), pchomp(), PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQgetlength(), PQgetvalue(), PQnfields(), PQntuples(), PQresultStatus(), pstrdup(), res, and snprintf.

◆ libpqrcv_receive()

static int libpqrcv_receive ( WalReceiverConn *  conn,
char **  buffer,
pgsocket *  wait_fd 
)
static

Definition at line 904 of file libpqwalreceiver.c.

906 {
907  int rawlen;
908 
909  PQfreemem(conn->recvBuf);
910  conn->recvBuf = NULL;
911 
912  /* Try to receive a CopyData message */
913  rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
914  if (rawlen == 0)
915  {
916  /* Try consuming some data. */
917  if (PQconsumeInput(conn->streamConn) == 0)
918  ereport(ERROR,
919  (errcode(ERRCODE_CONNECTION_FAILURE),
920  errmsg("could not receive data from WAL stream: %s",
921  pchomp(PQerrorMessage(conn->streamConn)))));
922 
923  /* Now that we've consumed some input, try again */
924  rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
925  if (rawlen == 0)
926  {
927  /* Tell caller to try again when our socket is ready. */
928  *wait_fd = PQsocket(conn->streamConn);
929  return 0;
930  }
931  }
932  if (rawlen == -1) /* end-of-streaming or error */
933  {
934  PGresult *res;
935 
936  res = libpqrcv_PQgetResult(conn->streamConn);
937  if (PQresultStatus(res) == PGRES_COMMAND_OK)
938  {
939  PQclear(res);
940 
941  /* Verify that there are no more results. */
942  res = libpqrcv_PQgetResult(conn->streamConn);
943  if (res != NULL)
944  {
945  PQclear(res);
946 
947  /*
948  * If the other side closed the connection orderly (otherwise
949  * we'd seen an error, or PGRES_COPY_IN) don't report an error
950  * here, but let callers deal with it.
951  */
952  if (PQstatus(conn->streamConn) == CONNECTION_BAD)
953  return -1;
954 
955  ereport(ERROR,
956  (errcode(ERRCODE_PROTOCOL_VIOLATION),
957  errmsg("unexpected result after CommandComplete: %s",
958  PQerrorMessage(conn->streamConn))));
959  }
960 
961  return -1;
962  }
963  else if (PQresultStatus(res) == PGRES_COPY_IN)
964  {
965  PQclear(res);
966  return -1;
967  }
968  else
969  {
970  PQclear(res);
971  ereport(ERROR,
972  (errcode(ERRCODE_PROTOCOL_VIOLATION),
973  errmsg("could not receive data from WAL stream: %s",
974  pchomp(PQerrorMessage(conn->streamConn)))));
975  }
976  }
977  if (rawlen < -1)
978  ereport(ERROR,
979  (errcode(ERRCODE_PROTOCOL_VIOLATION),
980  errmsg("could not receive data from WAL stream: %s",
981  pchomp(PQerrorMessage(conn->streamConn)))));
982 
983  /* Return received messages to caller */
984  *buffer = conn->recvBuf;
985  return rawlen;
986 }
int PQgetCopyData(PGconn *conn, char **buffer, int async)
Definition: fe-exec.c:2816

References conn, CONNECTION_BAD, ereport, errcode(), errmsg(), ERROR, libpqrcv_PQgetResult(), pchomp(), PGRES_COMMAND_OK, PGRES_COPY_IN, PQclear(), PQconsumeInput(), PQerrorMessage(), PQfreemem(), PQgetCopyData(), PQresultStatus(), PQsocket(), PQstatus(), and res.

◆ libpqrcv_send()

static void libpqrcv_send ( WalReceiverConn *  conn,
const char *  buffer,
int  nbytes 
)
static

Definition at line 994 of file libpqwalreceiver.c.

995 {
996  if (PQputCopyData(conn->streamConn, buffer, nbytes) <= 0 ||
997  PQflush(conn->streamConn))
998  ereport(ERROR,
999  (errcode(ERRCODE_CONNECTION_FAILURE),
1000  errmsg("could not send data to WAL stream: %s",
1001  pchomp(PQerrorMessage(conn->streamConn)))));
1002 }
int PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
Definition: fe-exec.c:2695

References conn, ereport, errcode(), errmsg(), ERROR, pchomp(), PQerrorMessage(), PQflush(), and PQputCopyData().

◆ libpqrcv_server_version()

static int libpqrcv_server_version ( WalReceiverConn *  conn)
static

Definition at line 491 of file libpqwalreceiver.c.

492 {
493  return PQserverVersion(conn->streamConn);
494 }

References conn, and PQserverVersion().

◆ libpqrcv_startstreaming()

static bool libpqrcv_startstreaming ( WalReceiverConn *  conn,
const WalRcvStreamOptions *  options 
)
static

Definition at line 551 of file libpqwalreceiver.c.

553 {
554  StringInfoData cmd;
555  PGresult *res;
556 
557  Assert(options->logical == conn->logical);
558  Assert(options->slotname || !options->logical);
559 
560  initStringInfo(&cmd);
561 
562  /* Build the command. */
563  appendStringInfoString(&cmd, "START_REPLICATION");
564  if (options->slotname != NULL)
565  appendStringInfo(&cmd, " SLOT \"%s\"",
566  options->slotname);
567 
568  if (options->logical)
569  appendStringInfoString(&cmd, " LOGICAL");
570 
571  appendStringInfo(&cmd, " %X/%X", LSN_FORMAT_ARGS(options->startpoint));
572 
573  /*
574  * Additional options are different depending on if we are doing logical
575  * or physical replication.
576  */
577  if (options->logical)
578  {
579  char *pubnames_str;
580  List *pubnames;
581  char *pubnames_literal;
582 
583  appendStringInfoString(&cmd, " (");
584 
585  appendStringInfo(&cmd, "proto_version '%u'",
586  options->proto.logical.proto_version);
587 
588  if (options->proto.logical.streaming_str)
589  appendStringInfo(&cmd, ", streaming '%s'",
590  options->proto.logical.streaming_str);
591 
592  if (options->proto.logical.twophase &&
593  PQserverVersion(conn->streamConn) >= 150000)
594  appendStringInfoString(&cmd, ", two_phase 'on'");
595 
596  if (options->proto.logical.origin &&
597  PQserverVersion(conn->streamConn) >= 160000)
598  appendStringInfo(&cmd, ", origin '%s'",
599  options->proto.logical.origin);
600 
601  pubnames = options->proto.logical.publication_names;
602  pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
603  if (!pubnames_str)
604  ereport(ERROR,
605  (errcode(ERRCODE_OUT_OF_MEMORY), /* likely guess */
606  errmsg("could not start WAL streaming: %s",
607  pchomp(PQerrorMessage(conn->streamConn)))));
608  pubnames_literal = PQescapeLiteral(conn->streamConn, pubnames_str,
609  strlen(pubnames_str));
610  if (!pubnames_literal)
611  ereport(ERROR,
612  (errcode(ERRCODE_OUT_OF_MEMORY), /* likely guess */
613  errmsg("could not start WAL streaming: %s",
614  pchomp(PQerrorMessage(conn->streamConn)))));
615  appendStringInfo(&cmd, ", publication_names %s", pubnames_literal);
616  PQfreemem(pubnames_literal);
617  pfree(pubnames_str);
618 
619  if (options->proto.logical.binary &&
620  PQserverVersion(conn->streamConn) >= 140000)
621  appendStringInfoString(&cmd, ", binary 'true'");
622 
623  appendStringInfoChar(&cmd, ')');
624  }
625  else
626  appendStringInfo(&cmd, " TIMELINE %u",
627  options->proto.physical.startpointTLI);
628 
629  /* Start streaming. */
630  res = libpqrcv_PQexec(conn->streamConn, cmd.data);
631  pfree(cmd.data);
632 
633  if (PQresultStatus(res) == PGRES_COMMAND_OK)
634  {
635  PQclear(res);
636  return false;
637  }
638  else if (PQresultStatus(res) != PGRES_COPY_BOTH)
639  {
640  PQclear(res);
641  ereport(ERROR,
642  (errcode(ERRCODE_PROTOCOL_VIOLATION),
643  errmsg("could not start WAL streaming: %s",
644  pchomp(PQerrorMessage(conn->streamConn)))));
645  }
646  PQclear(res);
647  return true;
648 }
char * PQescapeLiteral(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:4304
static char * stringlist_to_identifierstr(PGconn *conn, List *strings)
Definition: pg_list.h:54
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43

References appendStringInfo(), appendStringInfoChar(), appendStringInfoString(), Assert, conn, StringInfoData::data, ereport, errcode(), errmsg(), ERROR, initStringInfo(), libpqrcv_PQexec(), LSN_FORMAT_ARGS, pchomp(), pfree(), PGRES_COMMAND_OK, PGRES_COPY_BOTH, PQclear(), PQerrorMessage(), PQescapeLiteral(), PQfreemem(), PQresultStatus(), PQserverVersion(), res, and stringlist_to_identifierstr().

◆ stringlist_to_identifierstr()

static char * stringlist_to_identifierstr ( PGconn *  conn,
List *  strings 
)
static

Definition at line 1315 of file libpqwalreceiver.c.

1316 {
1317  ListCell *lc;
1318  StringInfoData res;
1319  bool first = true;
1320 
1321  initStringInfo(&res);
1322 
1323  foreach(lc, strings)
1324  {
1325  char *val = strVal(lfirst(lc));
1326  char *val_escaped;
1327 
1328  if (first)
1329  first = false;
1330  else
1331  appendStringInfoChar(&res, ',');
1332 
1333  val_escaped = PQescapeIdentifier(conn, val, strlen(val));
1334  if (!val_escaped)
1335  {
1336  free(res.data);
1337  return NULL;
1338  }
1339  appendStringInfoString(&res, val_escaped);
1340  PQfreemem(val_escaped);
1341  }
1342 
1343  return res.data;
1344 }
char * PQescapeIdentifier(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:4310
#define free(a)
Definition: header.h:65
long val
Definition: informix.c:670
#define lfirst(lc)
Definition: pg_list.h:172
#define strVal(v)
Definition: value.h:82

References appendStringInfoChar(), appendStringInfoString(), conn, free, initStringInfo(), lfirst, PQescapeIdentifier(), PQfreemem(), res, strVal, and val.

Referenced by libpqrcv_startstreaming().

Variable Documentation

◆ PG_MODULE_MAGIC

PG_MODULE_MAGIC

Definition at line 38 of file libpqwalreceiver.c.

◆ PQWalReceiverFunctions

WalReceiverFunctionsType PQWalReceiverFunctions
static
Initial value:
= {
.walrcv_connect = libpqrcv_connect,
.walrcv_check_conninfo = libpqrcv_check_conninfo,
.walrcv_get_conninfo = libpqrcv_get_conninfo,
.walrcv_get_senderinfo = libpqrcv_get_senderinfo,
.walrcv_identify_system = libpqrcv_identify_system,
.walrcv_server_version = libpqrcv_server_version,
.walrcv_readtimelinehistoryfile = libpqrcv_readtimelinehistoryfile,
.walrcv_startstreaming = libpqrcv_startstreaming,
.walrcv_endstreaming = libpqrcv_endstreaming,
.walrcv_receive = libpqrcv_receive,
.walrcv_send = libpqrcv_send,
.walrcv_create_slot = libpqrcv_create_slot,
.walrcv_alter_slot = libpqrcv_alter_slot,
.walrcv_get_dbname_from_conninfo = libpqrcv_get_dbname_from_conninfo,
.walrcv_get_backend_pid = libpqrcv_get_backend_pid,
.walrcv_exec = libpqrcv_exec,
.walrcv_disconnect = libpqrcv_disconnect
}
static WalReceiverConn * libpqrcv_connect(const char *conninfo, bool replication, bool logical, bool must_use_password, const char *appname, char **err)
static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, TimeLineID tli, char **filename, char **content, int *len)
static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn)
static void libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
static void libpqrcv_disconnect(WalReceiverConn *conn)
static char * libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
static char * libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, bool temporary, bool two_phase, bool failover, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn)
static WalRcvExecResult * libpqrcv_exec(WalReceiverConn *conn, const char *query, const int nRetTypes, const Oid *retTypes)
static void libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
static void libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host, int *sender_port)
static int libpqrcv_server_version(WalReceiverConn *conn)
static bool libpqrcv_startstreaming(WalReceiverConn *conn, const WalRcvStreamOptions *options)
static int libpqrcv_receive(WalReceiverConn *conn, char **buffer, pgsocket *wait_fd)
static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname, bool failover)
static char * libpqrcv_get_conninfo(WalReceiverConn *conn)
static char * libpqrcv_get_dbname_from_conninfo(const char *conninfo)

Definition at line 91 of file libpqwalreceiver.c.

Referenced by _PG_init().