PostgreSQL Source Code  git master
libpqwalreceiver.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/time.h>
#include "access/xlog.h"
#include "catalog/pg_type.h"
#include "common/connect.h"
#include "funcapi.h"
#include "libpq-fe.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "pqexpbuffer.h"
#include "replication/walreceiver.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/tuplestore.h"
Include dependency graph for libpqwalreceiver.c:

Go to the source code of this file.

Data Structures

struct  WalReceiverConn
 

Functions

static WalReceiverConn * libpqrcv_connect (const char *conninfo, bool logical, const char *appname, char **err)
 
static void libpqrcv_check_conninfo (const char *conninfo)
 
static char * libpqrcv_get_conninfo (WalReceiverConn *conn)
 
static void libpqrcv_get_senderinfo (WalReceiverConn *conn, char **sender_host, int *sender_port)
 
static char * libpqrcv_identify_system (WalReceiverConn *conn, TimeLineID *primary_tli)
 
static int libpqrcv_server_version (WalReceiverConn *conn)
 
static void libpqrcv_readtimelinehistoryfile (WalReceiverConn *conn, TimeLineID tli, char **filename, char **content, int *len)
 
static bool libpqrcv_startstreaming (WalReceiverConn *conn, const WalRcvStreamOptions *options)
 
static void libpqrcv_endstreaming (WalReceiverConn *conn, TimeLineID *next_tli)
 
static int libpqrcv_receive (WalReceiverConn *conn, char **buffer, pgsocket *wait_fd)
 
static void libpqrcv_send (WalReceiverConn *conn, const char *buffer, int nbytes)
 
static char * libpqrcv_create_slot (WalReceiverConn *conn, const char *slotname, bool temporary, bool two_phase, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn)
 
static pid_t libpqrcv_get_backend_pid (WalReceiverConn *conn)
 
static WalRcvExecResult * libpqrcv_exec (WalReceiverConn *conn, const char *query, const int nRetTypes, const Oid *retTypes)
 
static void libpqrcv_disconnect (WalReceiverConn *conn)
 
static PGresult * libpqrcv_PQexec (PGconn *streamConn, const char *query)
 
static PGresult * libpqrcv_PQgetResult (PGconn *streamConn)
 
static char * stringlist_to_identifierstr (PGconn *conn, List *strings)
 
void _PG_init (void)
 
static void libpqrcv_processTuples (PGresult *pgres, WalRcvExecResult *walres, const int nRetTypes, const Oid *retTypes)
 

Variables

 PG_MODULE_MAGIC
 
static WalReceiverFunctionsType PQWalReceiverFunctions
 

Function Documentation

◆ _PG_init()

void _PG_init ( void  )

Definition at line 111 of file libpqwalreceiver.c.

112 {
113  if (WalReceiverFunctions != NULL)
114  elog(ERROR, "libpqwalreceiver already loaded");
115  WalReceiverFunctions = &PQWalReceiverFunctions;
116 }
#define ERROR
Definition: elog.h:35
static WalReceiverFunctionsType PQWalReceiverFunctions
WalReceiverFunctionsType * WalReceiverFunctions
Definition: walreceiver.c:96

References elog(), ERROR, PQWalReceiverFunctions, and WalReceiverFunctions.

◆ libpqrcv_check_conninfo()

static void libpqrcv_check_conninfo ( const char *  conninfo)
static

Definition at line 253 of file libpqwalreceiver.c.

254 {
255  PQconninfoOption *opts = NULL;
256  char *err = NULL;
257 
258  opts = PQconninfoParse(conninfo, &err);
259  if (opts == NULL)
260  {
261  /* The error string is malloc'd, so we must free it explicitly */
262  char *errcopy = err ? pstrdup(err) : "out of memory";
263 
264  PQfreemem(err);
265  ereport(ERROR,
266  (errcode(ERRCODE_SYNTAX_ERROR),
267  errmsg("invalid connection string syntax: %s", errcopy)));
268  }
269 
270  PQconninfoFree(opts);
271 }
int errcode(int sqlerrcode)
Definition: elog.c:695
int errmsg(const char *fmt,...)
Definition: elog.c:906
#define ereport(elevel,...)
Definition: elog.h:145
PQconninfoOption * PQconninfoParse(const char *conninfo, char **errmsg)
Definition: fe-connect.c:5357
void PQconninfoFree(PQconninfoOption *connOptions)
Definition: fe-connect.c:6576
void PQfreemem(void *ptr)
Definition: fe-exec.c:3865
char * pstrdup(const char *in)
Definition: mcxt.c:1483
static AmcheckOptions opts
Definition: pg_amcheck.c:110

References ereport, errcode(), errmsg(), ERROR, opts, PQconninfoFree(), PQconninfoParse(), PQfreemem(), and pstrdup().

◆ libpqrcv_connect()

static WalReceiverConn * libpqrcv_connect ( const char *  conninfo,
bool  logical,
const char *  appname,
char **  err 
)
static

Definition at line 124 of file libpqwalreceiver.c.

126 {
127  WalReceiverConn *conn;
128  PostgresPollingStatusType status;
129  const char *keys[6];
130  const char *vals[6];
131  int i = 0;
132 
133  /*
134  * We use the expand_dbname parameter to process the connection string (or
135  * URI), and pass some extra options.
136  */
137  keys[i] = "dbname";
138  vals[i] = conninfo;
139  keys[++i] = "replication";
140  vals[i] = logical ? "database" : "true";
141  if (!logical)
142  {
143  /*
144  * The database name is ignored by the server in replication mode, but
145  * specify "replication" for .pgpass lookup.
146  */
147  keys[++i] = "dbname";
148  vals[i] = "replication";
149  }
150  keys[++i] = "fallback_application_name";
151  vals[i] = appname;
152  if (logical)
153  {
154  /* Tell the publisher to translate to our encoding */
155  keys[++i] = "client_encoding";
156  vals[i] = GetDatabaseEncodingName();
157 
158  /*
159  * Force assorted GUC parameters to settings that ensure that the
160  * publisher will output data values in a form that is unambiguous to
161  * the subscriber. (We don't want to modify the subscriber's GUC
162  * settings, since that might surprise user-defined code running in
163  * the subscriber, such as triggers.) This should match what pg_dump
164  * does.
165  */
166  keys[++i] = "options";
167  vals[i] = "-c datestyle=ISO -c intervalstyle=postgres -c extra_float_digits=3";
168  }
169  keys[++i] = NULL;
170  vals[i] = NULL;
171 
172  Assert(i < sizeof(keys));
173 
174  conn = palloc0(sizeof(WalReceiverConn));
175  conn->streamConn = PQconnectStartParams(keys, vals,
176  /* expand_dbname = */ true);
177  if (PQstatus(conn->streamConn) == CONNECTION_BAD)
178  {
179  *err = pchomp(PQerrorMessage(conn->streamConn));
180  return NULL;
181  }
182 
183  /*
184  * Poll connection until we have OK or FAILED status.
185  *
186  * Per spec for PQconnectPoll, first wait till socket is write-ready.
187  */
188  status = PGRES_POLLING_WRITING;
189  do
190  {
191  int io_flag;
192  int rc;
193 
194  if (status == PGRES_POLLING_READING)
195  io_flag = WL_SOCKET_READABLE;
196 #ifdef WIN32
197  /* Windows needs a different test while waiting for connection-made */
198  else if (PQstatus(conn->streamConn) == CONNECTION_STARTED)
199  io_flag = WL_SOCKET_CONNECTED;
200 #endif
201  else
202  io_flag = WL_SOCKET_WRITEABLE;
203 
204  rc = WaitLatchOrSocket(MyLatch,
205  WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
206  PQsocket(conn->streamConn),
207  0,
208  WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
209 
210  /* Interrupted? */
211  if (rc & WL_LATCH_SET)
212  {
213  ResetLatch(MyLatch);
214  ProcessWalRcvInterrupts();
215  }
216 
217  /* If socket is ready, advance the libpq state machine */
218  if (rc & io_flag)
219  status = PQconnectPoll(conn->streamConn);
220  } while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
221 
222  if (PQstatus(conn->streamConn) != CONNECTION_OK)
223  {
224  *err = pchomp(PQerrorMessage(conn->streamConn));
225  return NULL;
226  }
227 
228  if (logical)
229  {
230  PGresult *res;
231 
232  res = libpqrcv_PQexec(conn->streamConn,
233  ALWAYS_SECURE_SEARCH_PATH_SQL);
234  if (PQresultStatus(res) != PGRES_TUPLES_OK)
235  {
236  PQclear(res);
237  ereport(ERROR,
238  (errmsg("could not clear search path: %s",
239  pchomp(PQerrorMessage(conn->streamConn)))));
240  }
241  PQclear(res);
242  }
243 
244  conn->logical = logical;
245 
246  return conn;
247 }
#define ALWAYS_SECURE_SEARCH_PATH_SQL
Definition: connect.h:25
PGconn * PQconnectStartParams(const char *const *keywords, const char *const *values, int expand_dbname)
Definition: fe-connect.c:754
PostgresPollingStatusType PQconnectPoll(PGconn *conn)
Definition: fe-connect.c:2210
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6743
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:6690
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:6769
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3240
struct Latch * MyLatch
Definition: globals.c:58
int i
Definition: isn.c:73
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:524
void ResetLatch(Latch *latch)
Definition: latch.c:683
#define WL_SOCKET_READABLE
Definition: latch.h:126
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130
#define WL_LATCH_SET
Definition: latch.h:125
#define WL_SOCKET_CONNECTED
Definition: latch.h:135
#define WL_SOCKET_WRITEABLE
Definition: latch.h:127
@ CONNECTION_STARTED
Definition: libpq-fe.h:68
@ CONNECTION_BAD
Definition: libpq-fe.h:61
@ CONNECTION_OK
Definition: libpq-fe.h:60
@ PGRES_TUPLES_OK
Definition: libpq-fe.h:100
PostgresPollingStatusType
Definition: libpq-fe.h:85
@ PGRES_POLLING_OK
Definition: libpq-fe.h:89
@ PGRES_POLLING_READING
Definition: libpq-fe.h:87
@ PGRES_POLLING_WRITING
Definition: libpq-fe.h:88
@ PGRES_POLLING_FAILED
Definition: libpq-fe.h:86
Assert(fmt[strlen(fmt) - 1] !='\n')
static PGresult * libpqrcv_PQexec(PGconn *streamConn, const char *query)
const char * GetDatabaseEncodingName(void)
Definition: mbutils.c:1216
char * pchomp(const char *in)
Definition: mcxt.c:1511
void * palloc0(Size size)
Definition: mcxt.c:1230
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:225
PGconn * conn
Definition: streamutil.c:54
@ WAIT_EVENT_LIBPQWALRECEIVER_CONNECT
Definition: wait_event.h:65
void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:166

References ALWAYS_SECURE_SEARCH_PATH_SQL, Assert(), conn, CONNECTION_BAD, CONNECTION_OK, CONNECTION_STARTED, ereport, errmsg(), ERROR, GetDatabaseEncodingName(), i, libpqrcv_PQexec(), MyLatch, palloc0(), pchomp(), PGRES_POLLING_FAILED, PGRES_POLLING_OK, PGRES_POLLING_READING, PGRES_POLLING_WRITING, PGRES_TUPLES_OK, PQclear(), PQconnectPoll(), PQconnectStartParams(), PQerrorMessage(), PQresultStatus(), PQsocket(), PQstatus(), ProcessWalRcvInterrupts(), res, ResetLatch(), status(), WAIT_EVENT_LIBPQWALRECEIVER_CONNECT, WaitLatchOrSocket(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_CONNECTED, WL_SOCKET_READABLE, and WL_SOCKET_WRITEABLE.

◆ libpqrcv_create_slot()

static char * libpqrcv_create_slot ( WalReceiverConn *  conn,
const char *  slotname,
bool  temporary,
bool  two_phase,
CRSSnapshotAction  snapshot_action,
XLogRecPtr *  lsn 
)
static

Definition at line 871 of file libpqwalreceiver.c.

874 {
875  PGresult *res;
876  StringInfoData cmd;
877  char *snapshot;
878  int use_new_options_syntax;
879 
880  use_new_options_syntax = (PQserverVersion(conn->streamConn) >= 150000);
881 
882  initStringInfo(&cmd);
883 
884  appendStringInfo(&cmd, "CREATE_REPLICATION_SLOT \"%s\"", slotname);
885 
886  if (temporary)
887  appendStringInfoString(&cmd, " TEMPORARY");
888 
889  if (conn->logical)
890  {
891  appendStringInfoString(&cmd, " LOGICAL pgoutput ");
892  if (use_new_options_syntax)
893  appendStringInfoChar(&cmd, '(');
894  if (two_phase)
895  {
896  appendStringInfoString(&cmd, "TWO_PHASE");
897  if (use_new_options_syntax)
898  appendStringInfoString(&cmd, ", ");
899  else
900  appendStringInfoChar(&cmd, ' ');
901  }
902 
903  if (use_new_options_syntax)
904  {
905  switch (snapshot_action)
906  {
907  case CRS_EXPORT_SNAPSHOT:
908  appendStringInfoString(&cmd, "SNAPSHOT 'export'");
909  break;
910  case CRS_NOEXPORT_SNAPSHOT:
911  appendStringInfoString(&cmd, "SNAPSHOT 'nothing'");
912  break;
913  case CRS_USE_SNAPSHOT:
914  appendStringInfoString(&cmd, "SNAPSHOT 'use'");
915  break;
916  }
917  }
918  else
919  {
920  switch (snapshot_action)
921  {
922  case CRS_EXPORT_SNAPSHOT:
923  appendStringInfoString(&cmd, "EXPORT_SNAPSHOT");
924  break;
925  case CRS_NOEXPORT_SNAPSHOT:
926  appendStringInfoString(&cmd, "NOEXPORT_SNAPSHOT");
927  break;
928  case CRS_USE_SNAPSHOT:
929  appendStringInfoString(&cmd, "USE_SNAPSHOT");
930  break;
931  }
932  }
933 
934  if (use_new_options_syntax)
935  appendStringInfoChar(&cmd, ')');
936  }
937  else
938  {
939  if (use_new_options_syntax)
940  appendStringInfoString(&cmd, " PHYSICAL (RESERVE_WAL)");
941  else
942  appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL");
943  }
944 
945  res = libpqrcv_PQexec(conn->streamConn, cmd.data);
946  pfree(cmd.data);
947 
948  if (PQresultStatus(res) != PGRES_TUPLES_OK)
949  {
950  PQclear(res);
951  ereport(ERROR,
952  (errcode(ERRCODE_PROTOCOL_VIOLATION),
953  errmsg("could not create replication slot \"%s\": %s",
954  slotname, pchomp(PQerrorMessage(conn->streamConn)))));
955  }
956 
957  if (lsn)
958  *lsn = DatumGetLSN(DirectFunctionCall1Coll(pg_lsn_in, InvalidOid,
959  CStringGetDatum(PQgetvalue(res, 0, 1))));
960 
961  if (!PQgetisnull(res, 0, 2))
962  snapshot = pstrdup(PQgetvalue(res, 0, 2));
963  else
964  snapshot = NULL;
965 
966  PQclear(res);
967 
968  return snapshot;
969 }
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:6733
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3705
int PQgetisnull(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3730
Datum DirectFunctionCall1Coll(PGFunction func, Oid collation, Datum arg1)
Definition: fmgr.c:777
void pfree(void *pointer)
Definition: mcxt.c:1306
Datum pg_lsn_in(PG_FUNCTION_ARGS)
Definition: pg_lsn.c:64
static XLogRecPtr DatumGetLSN(Datum X)
Definition: pg_lsn.h:22
static bool two_phase
static Datum CStringGetDatum(const char *X)
Definition: postgres.h:698
#define InvalidOid
Definition: postgres_ext.h:36
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:188
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
@ CRS_NOEXPORT_SNAPSHOT
Definition: walsender.h:23
@ CRS_EXPORT_SNAPSHOT
Definition: walsender.h:22

References appendStringInfo(), appendStringInfoChar(), appendStringInfoString(), conn, CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, CStringGetDatum(), StringInfoData::data, DatumGetLSN(), DirectFunctionCall1Coll(), ereport, errcode(), errmsg(), ERROR, initStringInfo(), InvalidOid, libpqrcv_PQexec(), pchomp(), pfree(), pg_lsn_in(), PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQgetisnull(), PQgetvalue(), PQresultStatus(), PQserverVersion(), pstrdup(), res, and two_phase.

◆ libpqrcv_disconnect()

static void libpqrcv_disconnect ( WalReceiverConn *  conn)
static

Definition at line 741 of file libpqwalreceiver.c.

742 {
743  PQfinish(conn->streamConn);
744  PQfreemem(conn->recvBuf);
745  pfree(conn);
746 }
void PQfinish(PGconn *conn)
Definition: fe-connect.c:4130

References conn, pfree(), PQfinish(), and PQfreemem().

◆ libpqrcv_endstreaming()

static void libpqrcv_endstreaming ( WalReceiverConn *  conn,
TimeLineID *  next_tli 
)
static

Definition at line 513 of file libpqwalreceiver.c.

514 {
515  PGresult *res;
516 
517  /*
518  * Send copy-end message. As in libpqrcv_PQexec, this could theoretically
519  * block, but the risk seems small.
520  */
521  if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
522  PQflush(conn->streamConn))
523  ereport(ERROR,
524  (errcode(ERRCODE_CONNECTION_FAILURE),
525  errmsg("could not send end-of-streaming message to primary: %s",
526  pchomp(PQerrorMessage(conn->streamConn)))));
527 
528  *next_tli = 0;
529 
530  /*
531  * After COPY is finished, we should receive a result set indicating the
532  * next timeline's ID, or just CommandComplete if the server was shut
533  * down.
534  *
535  * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
536  * also possible in case we aborted the copy in mid-stream.
537  */
538  res = libpqrcv_PQgetResult(conn->streamConn);
539  if (PQresultStatus(res) == PGRES_TUPLES_OK)
540  {
541  /*
542  * Read the next timeline's ID. The server also sends the timeline's
543  * starting point, but it is ignored.
544  */
545  if (PQnfields(res) < 2 || PQntuples(res) != 1)
546  ereport(ERROR,
547  (errcode(ERRCODE_PROTOCOL_VIOLATION),
548  errmsg("unexpected result set after end-of-streaming")));
549  *next_tli = pg_strtoint32(PQgetvalue(res, 0, 0));
550  PQclear(res);
551 
552  /* the result set should be followed by CommandComplete */
553  res = libpqrcv_PQgetResult(conn->streamConn);
554  }
555  else if (PQresultStatus(res) == PGRES_COPY_OUT)
556  {
557  PQclear(res);
558 
559  /* End the copy */
560  if (PQendcopy(conn->streamConn))
561  ereport(ERROR,
562  (errcode(ERRCODE_CONNECTION_FAILURE),
563  errmsg("error while shutting down streaming COPY: %s",
564  pchomp(PQerrorMessage(conn->streamConn)))));
565 
566  /* CommandComplete should follow */
567  res = libpqrcv_PQgetResult(conn->streamConn);
568  }
569 
570  if (PQresultStatus(res) != PGRES_COMMAND_OK)
571  ereport(ERROR,
572  (errcode(ERRCODE_PROTOCOL_VIOLATION),
573  errmsg("error reading result of streaming command: %s",
574  pchomp(PQerrorMessage(conn->streamConn)))));
575  PQclear(res);
576 
577  /* Verify that there are no more results */
578  res = libpqrcv_PQgetResult(conn->streamConn);
579  if (res != NULL)
580  ereport(ERROR,
581  (errcode(ERRCODE_PROTOCOL_VIOLATION),
582  errmsg("unexpected result after CommandComplete: %s",
583  pchomp(PQerrorMessage(conn->streamConn)))));
584 }
int PQflush(PGconn *conn)
Definition: fe-exec.c:3833
int PQendcopy(PGconn *conn)
Definition: fe-exec.c:2831
int PQputCopyEnd(PGconn *conn, const char *errormsg)
Definition: fe-exec.c:2631
int PQntuples(const PGresult *res)
Definition: fe-exec.c:3310
int PQnfields(const PGresult *res)
Definition: fe-exec.c:3318
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:97
@ PGRES_COPY_OUT
Definition: libpq-fe.h:103
static PGresult * libpqrcv_PQgetResult(PGconn *streamConn)
int32 pg_strtoint32(const char *s)
Definition: numutils.c:177

References conn, ereport, errcode(), errmsg(), ERROR, libpqrcv_PQgetResult(), pchomp(), pg_strtoint32(), PGRES_COMMAND_OK, PGRES_COPY_OUT, PGRES_TUPLES_OK, PQclear(), PQendcopy(), PQerrorMessage(), PQflush(), PQgetvalue(), PQnfields(), PQntuples(), PQputCopyEnd(), PQresultStatus(), and res.

◆ libpqrcv_exec()

static WalRcvExecResult * libpqrcv_exec ( WalReceiverConn *  conn,
const char *  query,
const int  nRetTypes,
const Oid *  retTypes 
)
static

Definition at line 1060 of file libpqwalreceiver.c.

1062 {
1063  PGresult *pgres = NULL;
1064  WalRcvExecResult *walres = palloc0(sizeof(WalRcvExecResult));
1065  char *diag_sqlstate;
1066 
1067  if (MyDatabaseId == InvalidOid)
1068  ereport(ERROR,
1069  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1070  errmsg("the query interface requires a database connection")));
1071 
1072  pgres = libpqrcv_PQexec(conn->streamConn, query);
1073 
1074  switch (PQresultStatus(pgres))
1075  {
1076  case PGRES_SINGLE_TUPLE:
1077  case PGRES_TUPLES_OK:
1078  walres->status = WALRCV_OK_TUPLES;
1079  libpqrcv_processTuples(pgres, walres, nRetTypes, retTypes);
1080  break;
1081 
1082  case PGRES_COPY_IN:
1083  walres->status = WALRCV_OK_COPY_IN;
1084  break;
1085 
1086  case PGRES_COPY_OUT:
1087  walres->status = WALRCV_OK_COPY_OUT;
1088  break;
1089 
1090  case PGRES_COPY_BOTH:
1091  walres->status = WALRCV_OK_COPY_BOTH;
1092  break;
1093 
1094  case PGRES_COMMAND_OK:
1095  walres->status = WALRCV_OK_COMMAND;
1096  break;
1097 
1098  /* Empty query is considered error. */
1099  case PGRES_EMPTY_QUERY:
1100  walres->status = WALRCV_ERROR;
1101  walres->err = _("empty query");
1102  break;
1103 
1104  case PGRES_PIPELINE_SYNC:
1105  case PGRES_PIPELINE_ABORTED:
1106  walres->status = WALRCV_ERROR;
1107  walres->err = _("unexpected pipeline mode");
1108  break;
1109 
1110  case PGRES_NONFATAL_ERROR:
1111  case PGRES_FATAL_ERROR:
1112  case PGRES_BAD_RESPONSE:
1113  walres->status = WALRCV_ERROR;
1114  walres->err = pchomp(PQerrorMessage(conn->streamConn));
1115  diag_sqlstate = PQresultErrorField(pgres, PG_DIAG_SQLSTATE);
1116  if (diag_sqlstate)
1117  walres->sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
1118  diag_sqlstate[1],
1119  diag_sqlstate[2],
1120  diag_sqlstate[3],
1121  diag_sqlstate[4]);
1122  break;
1123  }
1124 
1125  PQclear(pgres);
1126 
1127  return walres;
1128 }
#define _(x)
Definition: elog.c:90
#define MAKE_SQLSTATE(ch1, ch2, ch3, ch4, ch5)
Definition: elog.h:52
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:3295
Oid MyDatabaseId
Definition: globals.c:89
@ PGRES_COPY_IN
Definition: libpq-fe.h:104
@ PGRES_COPY_BOTH
Definition: libpq-fe.h:109
@ PGRES_FATAL_ERROR
Definition: libpq-fe.h:108
@ PGRES_SINGLE_TUPLE
Definition: libpq-fe.h:110
@ PGRES_EMPTY_QUERY
Definition: libpq-fe.h:96
@ PGRES_PIPELINE_SYNC
Definition: libpq-fe.h:111
@ PGRES_BAD_RESPONSE
Definition: libpq-fe.h:105
@ PGRES_PIPELINE_ABORTED
Definition: libpq-fe.h:112
@ PGRES_NONFATAL_ERROR
Definition: libpq-fe.h:107
static void libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres, const int nRetTypes, const Oid *retTypes)
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:56
WalRcvExecStatus status
Definition: walreceiver.h:220
@ WALRCV_OK_COPY_IN
Definition: walreceiver.h:208
@ WALRCV_OK_COMMAND
Definition: walreceiver.h:205
@ WALRCV_ERROR
Definition: walreceiver.h:204
@ WALRCV_OK_TUPLES
Definition: walreceiver.h:207
@ WALRCV_OK_COPY_OUT
Definition: walreceiver.h:209
@ WALRCV_OK_COPY_BOTH
Definition: walreceiver.h:210

References _, conn, ereport, WalRcvExecResult::err, errcode(), errmsg(), ERROR, InvalidOid, libpqrcv_PQexec(), libpqrcv_processTuples(), MAKE_SQLSTATE, MyDatabaseId, palloc0(), pchomp(), PG_DIAG_SQLSTATE, PGRES_BAD_RESPONSE, PGRES_COMMAND_OK, PGRES_COPY_BOTH, PGRES_COPY_IN, PGRES_COPY_OUT, PGRES_EMPTY_QUERY, PGRES_FATAL_ERROR, PGRES_NONFATAL_ERROR, PGRES_PIPELINE_ABORTED, PGRES_PIPELINE_SYNC, PGRES_SINGLE_TUPLE, PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQresultErrorField(), PQresultStatus(), WalRcvExecResult::sqlstate, WalRcvExecResult::status, WALRCV_ERROR, WALRCV_OK_COMMAND, WALRCV_OK_COPY_BOTH, WALRCV_OK_COPY_IN, WALRCV_OK_COPY_OUT, and WALRCV_OK_TUPLES.

◆ libpqrcv_get_backend_pid()

static pid_t libpqrcv_get_backend_pid ( WalReceiverConn *  conn)
static

Definition at line 975 of file libpqwalreceiver.c.

976 {
977  return PQbackendPID(conn->streamConn);
978 }
int PQbackendPID(const PGconn *conn)
Definition: fe-connect.c:6777

References conn, and PQbackendPID().

◆ libpqrcv_get_conninfo()

static char * libpqrcv_get_conninfo ( WalReceiverConn *  conn)
static

Definition at line 278 of file libpqwalreceiver.c.

279 {
280  PQconninfoOption *conn_opts;
281  PQconninfoOption *conn_opt;
282  PQExpBufferData buf;
283  char *retval;
284 
285  Assert(conn->streamConn != NULL);
286 
287  initPQExpBuffer(&buf);
288  conn_opts = PQconninfo(conn->streamConn);
289 
290  if (conn_opts == NULL)
291  ereport(ERROR,
292  (errcode(ERRCODE_OUT_OF_MEMORY),
293  errmsg("could not parse connection string: %s",
294  _("out of memory"))));
295 
296  /* build a clean connection string from pieces */
297  for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
298  {
299  bool obfuscate;
300 
301  /* Skip debug and empty options */
302  if (strchr(conn_opt->dispchar, 'D') ||
303  conn_opt->val == NULL ||
304  conn_opt->val[0] == '\0')
305  continue;
306 
307  /* Obfuscate security-sensitive options */
308  obfuscate = strchr(conn_opt->dispchar, '*') != NULL;
309 
310  appendPQExpBuffer(&buf, "%s%s=%s",
311  buf.len == 0 ? "" : " ",
312  conn_opt->keyword,
313  obfuscate ? "********" : conn_opt->val);
314  }
315 
316  PQconninfoFree(conn_opts);
317 
318  retval = PQExpBufferDataBroken(buf) ? NULL : pstrdup(buf.data);
319  termPQExpBuffer(&buf);
320  return retval;
321 }
PQconninfoOption * PQconninfo(PGconn *conn)
Definition: fe-connect.c:6532
static char * buf
Definition: pg_test_fsync.c:67
void initPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:90
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
Definition: pqexpbuffer.c:265
void termPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:129
#define PQExpBufferDataBroken(buf)
Definition: pqexpbuffer.h:67

References _, appendPQExpBuffer(), Assert(), buf, conn, _PQconninfoOption::dispchar, ereport, errcode(), errmsg(), ERROR, initPQExpBuffer(), _PQconninfoOption::keyword, PQconninfo(), PQconninfoFree(), PQExpBufferDataBroken, pstrdup(), termPQExpBuffer(), and _PQconninfoOption::val.

◆ libpqrcv_get_senderinfo()

static void libpqrcv_get_senderinfo ( WalReceiverConn *  conn,
char **  sender_host,
int *  sender_port 
)
static

Definition at line 327 of file libpqwalreceiver.c.

329 {
330  char *ret = NULL;
331 
332  *sender_host = NULL;
333  *sender_port = 0;
334 
335  Assert(conn->streamConn != NULL);
336 
337  ret = PQhost(conn->streamConn);
338  if (ret && strlen(ret) != 0)
339  *sender_host = pstrdup(ret);
340 
341  ret = PQport(conn->streamConn);
342  if (ret && strlen(ret) != 0)
343  *sender_port = atoi(ret);
344 }
char * PQhost(const PGconn *conn)
Definition: fe-connect.c:6622
char * PQport(const PGconn *conn)
Definition: fe-connect.c:6658

References Assert(), conn, PQhost(), PQport(), and pstrdup().

◆ libpqrcv_identify_system()

static char * libpqrcv_identify_system ( WalReceiverConn *  conn,
TimeLineID *  primary_tli 
)
static

Definition at line 351 of file libpqwalreceiver.c.

352 {
353  PGresult *res;
354  char *primary_sysid;
355 
356  /*
357  * Get the system identifier and timeline ID as a DataRow message from the
358  * primary server.
359  */
360  res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM");
361  if (PQresultStatus(res) != PGRES_TUPLES_OK)
362  {
363  PQclear(res);
364  ereport(ERROR,
365  (errcode(ERRCODE_PROTOCOL_VIOLATION),
366  errmsg("could not receive database system identifier and timeline ID from "
367  "the primary server: %s",
368  pchomp(PQerrorMessage(conn->streamConn)))));
369  }
370  if (PQnfields(res) < 3 || PQntuples(res) != 1)
371  {
372  int ntuples = PQntuples(res);
373  int nfields = PQnfields(res);
374 
375  PQclear(res);
376  ereport(ERROR,
377  (errcode(ERRCODE_PROTOCOL_VIOLATION),
378  errmsg("invalid response from primary server"),
379  errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.",
380  ntuples, nfields, 3, 1)));
381  }
382  primary_sysid = pstrdup(PQgetvalue(res, 0, 0));
383  *primary_tli = pg_strtoint32(PQgetvalue(res, 0, 1));
384  PQclear(res);
385 
386  return primary_sysid;
387 }
int errdetail(const char *fmt,...)
Definition: elog.c:1039

References conn, ereport, errcode(), errdetail(), errmsg(), ERROR, libpqrcv_PQexec(), pchomp(), pg_strtoint32(), PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQgetvalue(), PQnfields(), PQntuples(), PQresultStatus(), pstrdup(), and res.

◆ libpqrcv_PQexec()

static PGresult * libpqrcv_PQexec ( PGconn *  streamConn,
const char *  query 
)
static

Definition at line 647 of file libpqwalreceiver.c.

648 {
649  PGresult *lastResult = NULL;
650 
651  /*
652  * PQexec() silently discards any prior query results on the connection.
653  * This is not required for this function as it's expected that the caller
654  * (which is this library in all cases) will behave correctly and we don't
655  * have to be backwards compatible with old libpq.
656  */
657 
658  /*
659  * Submit the query. Since we don't use non-blocking mode, this could
660  * theoretically block. In practice, since we don't send very long query
661  * strings, the risk seems negligible.
662  */
663  if (!PQsendQuery(streamConn, query))
664  return NULL;
665 
666  for (;;)
667  {
668  /* Wait for, and collect, the next PGresult. */
669  PGresult *result;
670 
671  result = libpqrcv_PQgetResult(streamConn);
672  if (result == NULL)
673  break; /* query is complete, or failure */
674 
675  /*
676  * Emulate PQexec()'s behavior of returning the last result when there
677  * are many. We are fine with returning just last error message.
678  */
679  PQclear(lastResult);
680  lastResult = result;
681 
682  if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
683  PQresultStatus(lastResult) == PGRES_COPY_OUT ||
684  PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
685  PQstatus(streamConn) == CONNECTION_BAD)
686  break;
687  }
688 
689  return lastResult;
690 }
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1418

References CONNECTION_BAD, libpqrcv_PQgetResult(), PGRES_COPY_BOTH, PGRES_COPY_IN, PGRES_COPY_OUT, PQclear(), PQresultStatus(), PQsendQuery(), and PQstatus().

Referenced by libpqrcv_connect(), libpqrcv_create_slot(), libpqrcv_exec(), libpqrcv_identify_system(), libpqrcv_readtimelinehistoryfile(), and libpqrcv_startstreaming().

◆ libpqrcv_PQgetResult()

static PGresult * libpqrcv_PQgetResult ( PGconn *  streamConn)
static

Definition at line 696 of file libpqwalreceiver.c.

697 {
698  /*
699  * Collect data until PQgetResult is ready to get the result without
700  * blocking.
701  */
702  while (PQisBusy(streamConn))
703  {
704  int rc;
705 
706  /*
707  * We don't need to break down the sleep into smaller increments,
708  * since we'll get interrupted by signals and can handle any
709  * interrupts here.
710  */
711  rc = WaitLatchOrSocket(MyLatch,
712  WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
713  WL_LATCH_SET,
714  PQsocket(streamConn),
715  0,
716  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
717 
718  /* Interrupted? */
719  if (rc & WL_LATCH_SET)
720  {
721  ResetLatch(MyLatch);
722  ProcessWalRcvInterrupts();
723  }
724 
725  /* Consume whatever data is available from the socket */
726  if (PQconsumeInput(streamConn) == 0)
727  {
728  /* trouble; return NULL */
729  return NULL;
730  }
731  }
732 
733  /* Now we can collect and return the next PGresult */
734  return PQgetResult(streamConn);
735 }
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1953
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:2000
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:2031
@ WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE
Definition: wait_event.h:66

References MyLatch, PQconsumeInput(), PQgetResult(), PQisBusy(), PQsocket(), ProcessWalRcvInterrupts(), ResetLatch(), WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE, WaitLatchOrSocket(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_SOCKET_READABLE.

Referenced by libpqrcv_endstreaming(), libpqrcv_PQexec(), and libpqrcv_receive().

◆ libpqrcv_processTuples()

static void libpqrcv_processTuples ( PGresult *  pgres,
WalRcvExecResult *  walres,
const int  nRetTypes,
const Oid *  retTypes 
)
static

Definition at line 984 of file libpqwalreceiver.c.

986 {
987  int tupn;
988  int coln;
989  int nfields = PQnfields(pgres);
990  HeapTuple tuple;
991  AttInMetadata *attinmeta;
992  MemoryContext rowcontext;
993  MemoryContext oldcontext;
994 
995  /* Make sure we got expected number of fields. */
996  if (nfields != nRetTypes)
997  ereport(ERROR,
998  (errcode(ERRCODE_PROTOCOL_VIOLATION),
999  errmsg("invalid query response"),
1000  errdetail("Expected %d fields, got %d fields.",
1001  nRetTypes, nfields)));
1002 
1003  walres->tuplestore = tuplestore_begin_heap(true, false, work_mem);
1004 
1005  /* Create tuple descriptor corresponding to expected result. */
1006  walres->tupledesc = CreateTemplateTupleDesc(nRetTypes);
1007  for (coln = 0; coln < nRetTypes; coln++)
1008  TupleDescInitEntry(walres->tupledesc, (AttrNumber) coln + 1,
1009  PQfname(pgres, coln), retTypes[coln], -1, 0);
1010  attinmeta = TupleDescGetAttInMetadata(walres->tupledesc);
1011 
1012  /* No point in doing more here if there were no tuples returned. */
1013  if (PQntuples(pgres) == 0)
1014  return;
1015 
1016  /* Create temporary context for local allocations. */
1017  rowcontext = AllocSetContextCreate(CurrentMemoryContext,
1018  "libpqrcv query result context",
1019  ALLOCSET_DEFAULT_SIZES);
1020 
1021  /* Process returned rows. */
1022  for (tupn = 0; tupn < PQntuples(pgres); tupn++)
1023  {
1024  char *cstrs[MaxTupleAttributeNumber];
1025 
1026  ProcessWalRcvInterrupts();
1027 
1028  /* Do the allocations in temporary context. */
1029  oldcontext = MemoryContextSwitchTo(rowcontext);
1030 
1031  /*
1032  * Fill cstrs with null-terminated strings of column values.
1033  */
1034  for (coln = 0; coln < nfields; coln++)
1035  {
1036  if (PQgetisnull(pgres, tupn, coln))
1037  cstrs[coln] = NULL;
1038  else
1039  cstrs[coln] = PQgetvalue(pgres, tupn, coln);
1040  }
1041 
1042  /* Convert row to a tuple, and add it to the tuplestore */
1043  tuple = BuildTupleFromCStrings(attinmeta, cstrs);
1044  tuplestore_puttuple(walres->tuplestore, tuple);
1045 
1046  /* Clean up */
1047  MemoryContextSwitchTo(oldcontext);
1048  MemoryContextReset(rowcontext);
1049  }
1050 
1051  MemoryContextDelete(rowcontext);
1052 }
int16 AttrNumber
Definition: attnum.h:21
HeapTuple BuildTupleFromCStrings(AttInMetadata *attinmeta, char **values)
Definition: execTuples.c:2135
AttInMetadata * TupleDescGetAttInMetadata(TupleDesc tupdesc)
Definition: execTuples.c:2086
char * PQfname(const PGresult *res, int field_num)
Definition: fe-exec.c:3396
int work_mem
Definition: globals.c:125
#define MaxTupleAttributeNumber
Definition: htup_details.h:33
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:303
MemoryContext CurrentMemoryContext
Definition: mcxt.c:124
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:376
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:153
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:135
Tuplestorestate * tuplestore
Definition: walreceiver.h:223
TupleDesc tupledesc
Definition: walreceiver.h:224
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:45
void TupleDescInitEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:583
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
void tuplestore_puttuple(Tuplestorestate *state, HeapTuple tuple)
Definition: tuplestore.c:730

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, BuildTupleFromCStrings(), CreateTemplateTupleDesc(), CurrentMemoryContext, ereport, errcode(), errdetail(), errmsg(), ERROR, MaxTupleAttributeNumber, MemoryContextDelete(), MemoryContextReset(), MemoryContextSwitchTo(), PQfname(), PQgetisnull(), PQgetvalue(), PQnfields(), PQntuples(), ProcessWalRcvInterrupts(), WalRcvExecResult::tupledesc, TupleDescGetAttInMetadata(), TupleDescInitEntry(), WalRcvExecResult::tuplestore, tuplestore_begin_heap(), tuplestore_puttuple(), and work_mem.

Referenced by libpqrcv_exec().

◆ libpqrcv_readtimelinehistoryfile()

static void libpqrcv_readtimelinehistoryfile ( WalReceiverConn *  conn,
TimeLineID  tli,
char **  filename,
char **  content,
int *  len 
)
static

Definition at line 590 of file libpqwalreceiver.c.

593 {
594  PGresult *res;
595  char cmd[64];
596 
597  Assert(!conn->logical);
598 
599  /*
600  * Request the primary to send over the history file for given timeline.
601  */
602  snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
603  res = libpqrcv_PQexec(conn->streamConn, cmd);
605  {
606  PQclear(res);
607  ereport(ERROR,
608  (errcode(ERRCODE_PROTOCOL_VIOLATION),
609  errmsg("could not receive timeline history file from "
610  "the primary server: %s",
611  pchomp(PQerrorMessage(conn->streamConn)))));
612  }
613  if (PQnfields(res) != 2 || PQntuples(res) != 1)
614  {
615  int ntuples = PQntuples(res);
616  int nfields = PQnfields(res);
617 
618  PQclear(res);
619  ereport(ERROR,
620  (errcode(ERRCODE_PROTOCOL_VIOLATION),
621  errmsg("invalid response from primary server"),
622  errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
623  ntuples, nfields)));
624  }
625  *filename = pstrdup(PQgetvalue(res, 0, 0));
626 
627  *len = PQgetlength(res, 0, 1);
628  *content = palloc(*len);
629  memcpy(*content, PQgetvalue(res, 0, 1), *len);
630  PQclear(res);
631 }
int PQgetlength(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3716
void * palloc(Size size)
Definition: mcxt.c:1199
const void size_t len
static char * filename
Definition: pg_dumpall.c:119
#define snprintf
Definition: port.h:238

References Assert(), conn, ereport, errcode(), errdetail(), errmsg(), ERROR, filename, len, libpqrcv_PQexec(), palloc(), pchomp(), PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQgetlength(), PQgetvalue(), PQnfields(), PQntuples(), PQresultStatus(), pstrdup(), res, and snprintf.

◆ libpqrcv_receive()

static int libpqrcv_receive ( WalReceiverConn *  conn,
char **  buffer,
pgsocket *  wait_fd 
)
static

Definition at line 765 of file libpqwalreceiver.c.

767 {
768  int rawlen;
769 
770  PQfreemem(conn->recvBuf);
771  conn->recvBuf = NULL;
772 
773  /* Try to receive a CopyData message */
774  rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
775  if (rawlen == 0)
776  {
777  /* Try consuming some data. */
778  if (PQconsumeInput(conn->streamConn) == 0)
779  ereport(ERROR,
780  (errcode(ERRCODE_CONNECTION_FAILURE),
781  errmsg("could not receive data from WAL stream: %s",
782  pchomp(PQerrorMessage(conn->streamConn)))));
783 
784  /* Now that we've consumed some input, try again */
785  rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
786  if (rawlen == 0)
787  {
788  /* Tell caller to try again when our socket is ready. */
789  *wait_fd = PQsocket(conn->streamConn);
790  return 0;
791  }
792  }
793  if (rawlen == -1) /* end-of-streaming or error */
794  {
795  PGresult *res;
796 
797  res = libpqrcv_PQgetResult(conn->streamConn);
799  {
800  PQclear(res);
801 
802  /* Verify that there are no more results. */
803  res = libpqrcv_PQgetResult(conn->streamConn);
804  if (res != NULL)
805  {
806  PQclear(res);
807 
808  /*
809  * If the other side closed the connection orderly (otherwise
810  * we'd seen an error, or PGRES_COPY_IN) don't report an error
811  * here, but let callers deal with it.
812  */
813  if (PQstatus(conn->streamConn) == CONNECTION_BAD)
814  return -1;
815 
816  ereport(ERROR,
817  (errcode(ERRCODE_PROTOCOL_VIOLATION),
818  errmsg("unexpected result after CommandComplete: %s",
819  PQerrorMessage(conn->streamConn))));
820  }
821 
822  return -1;
823  }
824  else if (PQresultStatus(res) == PGRES_COPY_IN)
825  {
826  PQclear(res);
827  return -1;
828  }
829  else
830  {
831  PQclear(res);
832  ereport(ERROR,
833  (errcode(ERRCODE_PROTOCOL_VIOLATION),
834  errmsg("could not receive data from WAL stream: %s",
835  pchomp(PQerrorMessage(conn->streamConn)))));
836  }
837  }
838  if (rawlen < -1)
839  ereport(ERROR,
840  (errcode(ERRCODE_PROTOCOL_VIOLATION),
841  errmsg("could not receive data from WAL stream: %s",
842  pchomp(PQerrorMessage(conn->streamConn)))));
843 
844  /* Return received messages to caller */
845  *buffer = conn->recvBuf;
846  return rawlen;
847 }
int PQgetCopyData(PGconn *conn, char **buffer, int async)
Definition: fe-exec.c:2698

References conn, CONNECTION_BAD, ereport, errcode(), errmsg(), ERROR, libpqrcv_PQgetResult(), pchomp(), PGRES_COMMAND_OK, PGRES_COPY_IN, PQclear(), PQconsumeInput(), PQerrorMessage(), PQfreemem(), PQgetCopyData(), PQresultStatus(), PQsocket(), PQstatus(), and res.

◆ libpqrcv_send()

static void libpqrcv_send ( WalReceiverConn *  conn,
const char *  buffer,
int  nbytes 
)
static

Definition at line 855 of file libpqwalreceiver.c.

856 {
857  if (PQputCopyData(conn->streamConn, buffer, nbytes) <= 0 ||
858  PQflush(conn->streamConn))
859  ereport(ERROR,
860  (errcode(ERRCODE_CONNECTION_FAILURE),
861  errmsg("could not send data to WAL stream: %s",
862  pchomp(PQerrorMessage(conn->streamConn)))));
863 }
int PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
Definition: fe-exec.c:2576

References conn, ereport, errcode(), errmsg(), ERROR, pchomp(), PQerrorMessage(), PQflush(), and PQputCopyData().

◆ libpqrcv_server_version()

static int libpqrcv_server_version ( WalReceiverConn *  conn)
static

Definition at line 393 of file libpqwalreceiver.c.

394 {
395  return PQserverVersion(conn->streamConn);
396 }

References conn, and PQserverVersion().

◆ libpqrcv_startstreaming()

static bool libpqrcv_startstreaming ( WalReceiverConn *  conn,
const WalRcvStreamOptions *  options 
)
static

Definition at line 409 of file libpqwalreceiver.c.

411 {
412  StringInfoData cmd;
413  PGresult *res;
414 
415  Assert(options->logical == conn->logical);
416  Assert(options->slotname || !options->logical);
417 
418  initStringInfo(&cmd);
419 
420  /* Build the command. */
421  appendStringInfoString(&cmd, "START_REPLICATION");
422  if (options->slotname != NULL)
423  appendStringInfo(&cmd, " SLOT \"%s\"",
424  options->slotname);
425 
426  if (options->logical)
427  appendStringInfoString(&cmd, " LOGICAL");
428 
429  appendStringInfo(&cmd, " %X/%X", LSN_FORMAT_ARGS(options->startpoint));
430 
431  /*
432  * Additional options are different depending on if we are doing logical
433  * or physical replication.
434  */
435  if (options->logical)
436  {
437  char *pubnames_str;
438  List *pubnames;
439  char *pubnames_literal;
440 
441  appendStringInfoString(&cmd, " (");
442 
443  appendStringInfo(&cmd, "proto_version '%u'",
444  options->proto.logical.proto_version);
445 
446  if (options->proto.logical.streaming &&
447  PQserverVersion(conn->streamConn) >= 140000)
448  appendStringInfoString(&cmd, ", streaming 'on'");
449 
450  if (options->proto.logical.twophase &&
451  PQserverVersion(conn->streamConn) >= 150000)
452  appendStringInfoString(&cmd, ", two_phase 'on'");
453 
454  if (options->proto.logical.origin &&
455  PQserverVersion(conn->streamConn) >= 160000)
456  appendStringInfo(&cmd, ", origin '%s'",
457  options->proto.logical.origin);
458 
459  pubnames = options->proto.logical.publication_names;
460  pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
461  if (!pubnames_str)
462  ereport(ERROR,
463  (errcode(ERRCODE_OUT_OF_MEMORY), /* likely guess */
464  errmsg("could not start WAL streaming: %s",
465  pchomp(PQerrorMessage(conn->streamConn)))));
466  pubnames_literal = PQescapeLiteral(conn->streamConn, pubnames_str,
467  strlen(pubnames_str));
468  if (!pubnames_literal)
469  ereport(ERROR,
470  (errcode(ERRCODE_OUT_OF_MEMORY), /* likely guess */
471  errmsg("could not start WAL streaming: %s",
472  pchomp(PQerrorMessage(conn->streamConn)))));
473  appendStringInfo(&cmd, ", publication_names %s", pubnames_literal);
474  PQfreemem(pubnames_literal);
475  pfree(pubnames_str);
476 
477  if (options->proto.logical.binary &&
478  PQserverVersion(conn->streamConn) >= 140000)
479  appendStringInfoString(&cmd, ", binary 'true'");
480 
481  appendStringInfoChar(&cmd, ')');
482  }
483  else
484  appendStringInfo(&cmd, " TIMELINE %u",
485  options->proto.physical.startpointTLI);
486 
487  /* Start streaming. */
488  res = libpqrcv_PQexec(conn->streamConn, cmd.data);
489  pfree(cmd.data);
490 
492  {
493  PQclear(res);
494  return false;
495  }
496  else if (PQresultStatus(res) != PGRES_COPY_BOTH)
497  {
498  PQclear(res);
499  ereport(ERROR,
500  (errcode(ERRCODE_PROTOCOL_VIOLATION),
501  errmsg("could not start WAL streaming: %s",
502  pchomp(PQerrorMessage(conn->streamConn)))));
503  }
504  PQclear(res);
505  return true;
506 }
char * PQescapeLiteral(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:4137
static char * stringlist_to_identifierstr(PGconn *conn, List *strings)
Definition: pg_list.h:52
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43

References appendStringInfo(), appendStringInfoChar(), appendStringInfoString(), Assert(), conn, StringInfoData::data, ereport, errcode(), errmsg(), ERROR, initStringInfo(), libpqrcv_PQexec(), LSN_FORMAT_ARGS, pchomp(), pfree(), PGRES_COMMAND_OK, PGRES_COPY_BOTH, PQclear(), PQerrorMessage(), PQescapeLiteral(), PQfreemem(), PQresultStatus(), PQserverVersion(), res, and stringlist_to_identifierstr().

◆ stringlist_to_identifierstr()

static char * stringlist_to_identifierstr ( PGconn *  conn,
List *  strings 
)
static

Definition at line 1139 of file libpqwalreceiver.c.

1140 {
1141  ListCell *lc;
1143  bool first = true;
1144 
1145  initStringInfo(&res);
1146 
1147  foreach(lc, strings)
1148  {
1149  char *val = strVal(lfirst(lc));
1150  char *val_escaped;
1151 
1152  if (first)
1153  first = false;
1154  else
1155  appendStringInfoChar(&res, ',');
1156 
1157  val_escaped = PQescapeIdentifier(conn, val, strlen(val));
1158  if (!val_escaped)
1159  {
1160  free(res.data);
1161  return NULL;
1162  }
1163  appendStringInfoString(&res, val_escaped);
1164  PQfreemem(val_escaped);
1165  }
1166 
1167  return res.data;
1168 }
char * PQescapeIdentifier(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:4143
#define free(a)
Definition: header.h:65
long val
Definition: informix.c:664
#define lfirst(lc)
Definition: pg_list.h:170
#define strVal(v)
Definition: value.h:82

References appendStringInfoChar(), appendStringInfoString(), conn, free, initStringInfo(), lfirst, PQescapeIdentifier(), PQfreemem(), res, strVal, and val.

Referenced by libpqrcv_startstreaming().

Variable Documentation

◆ PG_MODULE_MAGIC

PG_MODULE_MAGIC

Definition at line 37 of file libpqwalreceiver.c.

◆ PQWalReceiverFunctions

WalReceiverFunctionsType PQWalReceiverFunctions
static
Initial value:
= {
/*
 * Callback table: each walrcv_* slot is bound to the libpqrcv_* function of
 * the corresponding name from this file.
 */
.walrcv_connect = libpqrcv_connect,
.walrcv_check_conninfo = libpqrcv_check_conninfo,
.walrcv_get_conninfo = libpqrcv_get_conninfo,
.walrcv_get_senderinfo = libpqrcv_get_senderinfo,
.walrcv_identify_system = libpqrcv_identify_system,
.walrcv_server_version = libpqrcv_server_version,
.walrcv_readtimelinehistoryfile = libpqrcv_readtimelinehistoryfile,
.walrcv_startstreaming = libpqrcv_startstreaming,
.walrcv_endstreaming = libpqrcv_endstreaming,
.walrcv_receive = libpqrcv_receive,
.walrcv_send = libpqrcv_send,
.walrcv_create_slot = libpqrcv_create_slot,
.walrcv_get_backend_pid = libpqrcv_get_backend_pid,
.walrcv_exec = libpqrcv_exec,
.walrcv_disconnect = libpqrcv_disconnect
}
static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, TimeLineID tli, char **filename, char **content, int *len)
static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn)
static void libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
static void libpqrcv_disconnect(WalReceiverConn *conn)
static char * libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
static WalRcvExecResult * libpqrcv_exec(WalReceiverConn *conn, const char *query, const int nRetTypes, const Oid *retTypes)
static void libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
static void libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host, int *sender_port)
static int libpqrcv_server_version(WalReceiverConn *conn)
static WalReceiverConn * libpqrcv_connect(const char *conninfo, bool logical, const char *appname, char **err)
static bool libpqrcv_startstreaming(WalReceiverConn *conn, const WalRcvStreamOptions *options)
static int libpqrcv_receive(WalReceiverConn *conn, char **buffer, pgsocket *wait_fd)
static char * libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, bool temporary, bool two_phase, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn)
static void libpqrcv_check_conninfo(const char *conninfo)
static char * libpqrcv_get_conninfo(WalReceiverConn *conn)

Definition at line 84 of file libpqwalreceiver.c.

Referenced by _PG_init().