PostgreSQL Source Code git master
libpqwalreceiver.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/time.h>
#include "common/connect.h"
#include "funcapi.h"
#include "libpq-fe.h"
#include "libpq/libpq-be-fe-helpers.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "pqexpbuffer.h"
#include "replication/walreceiver.h"
#include "storage/latch.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/tuplestore.h"
Include dependency graph for libpqwalreceiver.c:

Go to the source code of this file.

Data Structures

struct  WalReceiverConn
 

Functions

 PG_MODULE_MAGIC_EXT (.name="libpqwalreceiver",.version=PG_VERSION)
 
static WalReceiverConnlibpqrcv_connect (const char *conninfo, bool replication, bool logical, bool must_use_password, const char *appname, char **err)
 
static void libpqrcv_check_conninfo (const char *conninfo, bool must_use_password)
 
static char * libpqrcv_get_conninfo (WalReceiverConn *conn)
 
static void libpqrcv_get_senderinfo (WalReceiverConn *conn, char **sender_host, int *sender_port)
 
static char * libpqrcv_identify_system (WalReceiverConn *conn, TimeLineID *primary_tli)
 
static char * libpqrcv_get_dbname_from_conninfo (const char *connInfo)
 
static int libpqrcv_server_version (WalReceiverConn *conn)
 
static void libpqrcv_readtimelinehistoryfile (WalReceiverConn *conn, TimeLineID tli, char **filename, char **content, int *len)
 
static bool libpqrcv_startstreaming (WalReceiverConn *conn, const WalRcvStreamOptions *options)
 
static void libpqrcv_endstreaming (WalReceiverConn *conn, TimeLineID *next_tli)
 
static int libpqrcv_receive (WalReceiverConn *conn, char **buffer, pgsocket *wait_fd)
 
static void libpqrcv_send (WalReceiverConn *conn, const char *buffer, int nbytes)
 
static char * libpqrcv_create_slot (WalReceiverConn *conn, const char *slotname, bool temporary, bool two_phase, bool failover, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn)
 
static void libpqrcv_alter_slot (WalReceiverConn *conn, const char *slotname, const bool *failover, const bool *two_phase)
 
static pid_t libpqrcv_get_backend_pid (WalReceiverConn *conn)
 
static WalRcvExecResultlibpqrcv_exec (WalReceiverConn *conn, const char *query, const int nRetTypes, const Oid *retTypes)
 
static void libpqrcv_disconnect (WalReceiverConn *conn)
 
static char * stringlist_to_identifierstr (PGconn *conn, List *strings)
 
void _PG_init (void)
 
static void libpqrcv_processTuples (PGresult *pgres, WalRcvExecResult *walres, const int nRetTypes, const Oid *retTypes)
 

Variables

static WalReceiverFunctionsType PQWalReceiverFunctions
 

Function Documentation

◆ _PG_init()

void _PG_init ( void  )

Definition at line 123 of file libpqwalreceiver.c.

124{
125 if (WalReceiverFunctions != NULL)
126 elog(ERROR, "libpqwalreceiver already loaded");
128}
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:226
static WalReceiverFunctionsType PQWalReceiverFunctions
WalReceiverFunctionsType * WalReceiverFunctions
Definition: walreceiver.c:94

References elog, ERROR, PQWalReceiverFunctions, and WalReceiverFunctions.

◆ libpqrcv_alter_slot()

static void libpqrcv_alter_slot ( WalReceiverConn conn,
const char *  slotname,
const bool *  failover,
const bool *  two_phase 
)
static

Definition at line 976 of file libpqwalreceiver.c.

978{
979 StringInfoData cmd;
980 PGresult *res;
981
982 initStringInfo(&cmd);
983 appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( ",
984 quote_identifier(slotname));
985
986 if (failover)
987 appendStringInfo(&cmd, "FAILOVER %s",
988 *failover ? "true" : "false");
989
990 if (failover && two_phase)
991 appendStringInfoString(&cmd, ", ");
992
993 if (two_phase)
994 appendStringInfo(&cmd, "TWO_PHASE %s",
995 *two_phase ? "true" : "false");
996
997 appendStringInfoString(&cmd, " );");
998
999 res = libpqsrv_exec(conn->streamConn, cmd.data,
1000 WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
1001 pfree(cmd.data);
1002
1003 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1004 ereport(ERROR,
1005 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1006 errmsg("could not alter replication slot \"%s\": %s",
1007 slotname, pchomp(PQerrorMessage(conn->streamConn)))));
1008
1009 PQclear(res);
1010}
int errcode(int sqlerrcode)
Definition: elog.c:863
int errmsg(const char *fmt,...)
Definition: elog.c:1080
#define ereport(elevel,...)
Definition: elog.h:150
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7704
static PGresult * libpqsrv_exec(PGconn *conn, const char *query, uint32 wait_event_info)
#define PQclear
Definition: libpq-be-fe.h:245
#define PQresultStatus
Definition: libpq-be-fe.h:247
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:125
void pfree(void *pointer)
Definition: mcxt.c:1594
char * pchomp(const char *in)
Definition: mcxt.c:1787
static bool two_phase
static bool failover
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:13062
PGconn * conn
Definition: streamutil.c:52
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:145
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:230
void initStringInfo(StringInfo str)
Definition: stringinfo.c:97

References appendStringInfo(), appendStringInfoString(), conn, StringInfoData::data, ereport, errcode(), errmsg(), ERROR, failover, initStringInfo(), libpqsrv_exec(), pchomp(), pfree(), PGRES_COMMAND_OK, PQclear, PQerrorMessage(), PQresultStatus, quote_identifier(), and two_phase.

◆ libpqrcv_check_conninfo()

static void libpqrcv_check_conninfo ( const char *  conninfo,
bool  must_use_password 
)
static

Definition at line 283 of file libpqwalreceiver.c.

284{
285 PQconninfoOption *opts = NULL;
286 PQconninfoOption *opt;
287 char *err = NULL;
288
289 opts = PQconninfoParse(conninfo, &err);
290 if (opts == NULL)
291 {
292 /* The error string is malloc'd, so we must free it explicitly */
293 char *errcopy = err ? pstrdup(err) : "out of memory";
294
295 PQfreemem(err);
297 (errcode(ERRCODE_SYNTAX_ERROR),
298 errmsg("invalid connection string syntax: %s", errcopy)));
299 }
300
301 if (must_use_password)
302 {
303 bool uses_password = false;
304
305 for (opt = opts; opt->keyword != NULL; ++opt)
306 {
307 /* Ignore connection options that are not present. */
308 if (opt->val == NULL)
309 continue;
310
311 if (strcmp(opt->keyword, "password") == 0 && opt->val[0] != '\0')
312 {
313 uses_password = true;
314 break;
315 }
316 }
317
318 if (!uses_password)
319 {
320 /* malloc'd, so we must free it explicitly */
322
324 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
325 errmsg("password is required"),
326 errdetail("Non-superusers must provide a password in the connection string.")));
327 }
328 }
329
331}
int errdetail(const char *fmt,...)
Definition: elog.c:1216
void err(int eval, const char *fmt,...)
Definition: err.c:43
void PQconninfoFree(PQconninfoOption *connOptions)
Definition: fe-connect.c:7525
PQconninfoOption * PQconninfoParse(const char *conninfo, char **errmsg)
Definition: fe-connect.c:6241
void PQfreemem(void *ptr)
Definition: fe-exec.c:4049
char * pstrdup(const char *in)
Definition: mcxt.c:1759
static AmcheckOptions opts
Definition: pg_amcheck.c:112

References ereport, err(), errcode(), errdetail(), errmsg(), ERROR, _PQconninfoOption::keyword, opts, PQconninfoFree(), PQconninfoParse(), PQfreemem(), pstrdup(), and _PQconninfoOption::val.

Referenced by libpqrcv_connect().

◆ libpqrcv_connect()

static WalReceiverConn * libpqrcv_connect ( const char *  conninfo,
bool  replication,
bool  logical,
bool  must_use_password,
const char *  appname,
char **  err 
)
static

Definition at line 146 of file libpqwalreceiver.c.

148{
150 const char *keys[6];
151 const char *vals[6];
152 int i = 0;
153
154 /*
155 * Re-validate connection string. The validation already happened at DDL
156 * time, but the subscription owner may have changed. If we don't recheck
157 * with the correct must_use_password, it's possible that the connection
158 * will obtain the password from a different source, such as PGPASSFILE or
159 * PGPASSWORD.
160 */
161 libpqrcv_check_conninfo(conninfo, must_use_password);
162
163 /*
164 * We use the expand_dbname parameter to process the connection string (or
165 * URI), and pass some extra options.
166 */
167 keys[i] = "dbname";
168 vals[i] = conninfo;
169
170 /* We can not have logical without replication */
171 Assert(replication || !logical);
172
173 if (replication)
174 {
175 keys[++i] = "replication";
176 vals[i] = logical ? "database" : "true";
177
178 if (logical)
179 {
180 /* Tell the publisher to translate to our encoding */
181 keys[++i] = "client_encoding";
182 vals[i] = GetDatabaseEncodingName();
183
184 /*
185 * Force assorted GUC parameters to settings that ensure that the
186 * publisher will output data values in a form that is unambiguous
187 * to the subscriber. (We don't want to modify the subscriber's
188 * GUC settings, since that might surprise user-defined code
189 * running in the subscriber, such as triggers.) This should
190 * match what pg_dump does.
191 */
192 keys[++i] = "options";
193 vals[i] = "-c datestyle=ISO -c intervalstyle=postgres -c extra_float_digits=3";
194 }
195 else
196 {
197 /*
198 * The database name is ignored by the server in replication mode,
199 * but specify "replication" for .pgpass lookup.
200 */
201 keys[++i] = "dbname";
202 vals[i] = "replication";
203 }
204 }
205
206 keys[++i] = "fallback_application_name";
207 vals[i] = appname;
208
209 keys[++i] = NULL;
210 vals[i] = NULL;
211
212 Assert(i < lengthof(keys));
213
214 conn = palloc0(sizeof(WalReceiverConn));
215 conn->streamConn =
216 libpqsrv_connect_params(keys, vals,
217 /* expand_dbname = */ true,
218 WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
219
220 if (PQstatus(conn->streamConn) != CONNECTION_OK)
221 goto bad_connection_errmsg;
222
223 if (must_use_password && !PQconnectionUsedPassword(conn->streamConn))
224 {
225 libpqsrv_disconnect(conn->streamConn);
226 pfree(conn);
227
229 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
230 errmsg("password is required"),
231 errdetail("Non-superuser cannot connect if the server does not request a password."),
232 errhint("Target server's authentication method must be changed, or set password_required=false in the subscription parameters.")));
233 }
234
236 "received message via replication");
237
238 /*
239 * Set always-secure search path for the cases where the connection is
240 * used to run SQL queries, so malicious users can't get control.
241 */
242 if (!replication || logical)
243 {
244 PGresult *res;
245
246 res = libpqsrv_exec(conn->streamConn,
248 WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
250 {
251 PQclear(res);
252 *err = psprintf(_("could not clear search path: %s"),
253 pchomp(PQerrorMessage(conn->streamConn)));
254 goto bad_connection;
255 }
256 PQclear(res);
257 }
258
259 conn->logical = logical;
260
261 return conn;
262
263 /* error path, using libpq's error message */
264bad_connection_errmsg:
265 *err = pchomp(PQerrorMessage(conn->streamConn));
266
267 /* error path, error already set */
268bad_connection:
269 libpqsrv_disconnect(conn->streamConn);
270 pfree(conn);
271 return NULL;
272}
#define lengthof(array)
Definition: c.h:790
#define ALWAYS_SECURE_SEARCH_PATH_SQL
Definition: connect.h:25
int errhint(const char *fmt,...)
Definition: elog.c:1330
#define _(x)
Definition: elog.c:91
int PQconnectionUsedPassword(const PGconn *conn)
Definition: fe-connect.c:7772
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:7641
PQnoticeReceiver PQsetNoticeReceiver(PGconn *conn, PQnoticeReceiver proc, void *arg)
Definition: fe-connect.c:7868
Assert(PointerIsAligned(start, uint64))
int i
Definition: isn.c:77
static PGconn * libpqsrv_connect_params(const char *const *keywords, const char *const *values, int expand_dbname, uint32 wait_event_info)
static void libpqsrv_notice_receiver(void *arg, const PGresult *res)
static void libpqsrv_disconnect(PGconn *conn)
@ CONNECTION_OK
Definition: libpq-fe.h:84
@ PGRES_TUPLES_OK
Definition: libpq-fe.h:128
static void libpqrcv_check_conninfo(const char *conninfo, bool must_use_password)
const char * GetDatabaseEncodingName(void)
Definition: mbutils.c:1268
void * palloc0(Size size)
Definition: mcxt.c:1395
char * psprintf(const char *fmt,...)
Definition: psprintf.c:43

References _, ALWAYS_SECURE_SEARCH_PATH_SQL, Assert(), conn, CONNECTION_OK, ereport, err(), errcode(), errdetail(), errhint(), errmsg(), ERROR, GetDatabaseEncodingName(), i, lengthof, libpqrcv_check_conninfo(), libpqsrv_connect_params(), libpqsrv_disconnect(), libpqsrv_exec(), libpqsrv_notice_receiver(), palloc0(), pchomp(), pfree(), PGRES_TUPLES_OK, PQclear, PQconnectionUsedPassword(), PQerrorMessage(), PQresultStatus, PQsetNoticeReceiver(), PQstatus(), and psprintf().

◆ libpqrcv_create_slot()

static char * libpqrcv_create_slot ( WalReceiverConn conn,
const char *  slotname,
bool  temporary,
bool  two_phase,
bool  failover,
CRSSnapshotAction  snapshot_action,
XLogRecPtr lsn 
)
static

Definition at line 864 of file libpqwalreceiver.c.

867{
868 PGresult *res;
869 StringInfoData cmd;
870 char *snapshot;
871 int use_new_options_syntax;
872
873 use_new_options_syntax = (PQserverVersion(conn->streamConn) >= 150000);
874
875 initStringInfo(&cmd);
876
877 appendStringInfo(&cmd, "CREATE_REPLICATION_SLOT \"%s\"", slotname);
878
879 if (temporary)
880 appendStringInfoString(&cmd, " TEMPORARY");
881
882 if (conn->logical)
883 {
884 appendStringInfoString(&cmd, " LOGICAL pgoutput ");
885 if (use_new_options_syntax)
886 appendStringInfoChar(&cmd, '(');
887 if (two_phase)
888 {
889 appendStringInfoString(&cmd, "TWO_PHASE");
890 if (use_new_options_syntax)
891 appendStringInfoString(&cmd, ", ");
892 else
893 appendStringInfoChar(&cmd, ' ');
894 }
895
896 if (failover)
897 {
898 appendStringInfoString(&cmd, "FAILOVER");
899 if (use_new_options_syntax)
900 appendStringInfoString(&cmd, ", ");
901 else
902 appendStringInfoChar(&cmd, ' ');
903 }
904
905 if (use_new_options_syntax)
906 {
907 switch (snapshot_action)
908 {
910 appendStringInfoString(&cmd, "SNAPSHOT 'export'");
911 break;
913 appendStringInfoString(&cmd, "SNAPSHOT 'nothing'");
914 break;
915 case CRS_USE_SNAPSHOT:
916 appendStringInfoString(&cmd, "SNAPSHOT 'use'");
917 break;
918 }
919 }
920 else
921 {
922 switch (snapshot_action)
923 {
925 appendStringInfoString(&cmd, "EXPORT_SNAPSHOT");
926 break;
928 appendStringInfoString(&cmd, "NOEXPORT_SNAPSHOT");
929 break;
930 case CRS_USE_SNAPSHOT:
931 appendStringInfoString(&cmd, "USE_SNAPSHOT");
932 break;
933 }
934 }
935
936 if (use_new_options_syntax)
937 appendStringInfoChar(&cmd, ')');
938 }
939 else
940 {
941 if (use_new_options_syntax)
942 appendStringInfoString(&cmd, " PHYSICAL (RESERVE_WAL)");
943 else
944 appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL");
945 }
946
947 res = libpqsrv_exec(conn->streamConn,
948 cmd.data,
949 WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
950 pfree(cmd.data);
951
954 (errcode(ERRCODE_PROTOCOL_VIOLATION),
955 errmsg("could not create replication slot \"%s\": %s",
956 slotname, pchomp(PQerrorMessage(conn->streamConn)))));
957
958 if (lsn)
960 CStringGetDatum(PQgetvalue(res, 0, 1))));
961
962 if (!PQgetisnull(res, 0, 2))
963 snapshot = pstrdup(PQgetvalue(res, 0, 2));
964 else
965 snapshot = NULL;
966
967 PQclear(res);
968
969 return snapshot;
970}
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:7694
Datum DirectFunctionCall1Coll(PGFunction func, Oid collation, Datum arg1)
Definition: fmgr.c:793
#define PQgetvalue
Definition: libpq-be-fe.h:253
#define PQgetisnull
Definition: libpq-be-fe.h:255
Datum pg_lsn_in(PG_FUNCTION_ARGS)
Definition: pg_lsn.c:64
static XLogRecPtr DatumGetLSN(Datum X)
Definition: pg_lsn.h:25
static Datum CStringGetDatum(const char *X)
Definition: postgres.h:360
#define InvalidOid
Definition: postgres_ext.h:37
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:242
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
@ CRS_NOEXPORT_SNAPSHOT
Definition: walsender.h:23
@ CRS_EXPORT_SNAPSHOT
Definition: walsender.h:22

References appendStringInfo(), appendStringInfoChar(), appendStringInfoString(), conn, CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, CStringGetDatum(), StringInfoData::data, DatumGetLSN(), DirectFunctionCall1Coll(), ereport, errcode(), errmsg(), ERROR, failover, initStringInfo(), InvalidOid, libpqsrv_exec(), pchomp(), pfree(), pg_lsn_in(), PGRES_TUPLES_OK, PQclear, PQerrorMessage(), PQgetisnull, PQgetvalue, PQresultStatus, PQserverVersion(), pstrdup(), and two_phase.

◆ libpqrcv_disconnect()

static void libpqrcv_disconnect ( WalReceiverConn conn)
static

Definition at line 735 of file libpqwalreceiver.c.

736{
737 libpqsrv_disconnect(conn->streamConn);
738 PQfreemem(conn->recvBuf);
739 pfree(conn);
740}

References conn, libpqsrv_disconnect(), pfree(), and PQfreemem().

◆ libpqrcv_endstreaming()

static void libpqrcv_endstreaming ( WalReceiverConn conn,
TimeLineID next_tli 
)
static

Definition at line 614 of file libpqwalreceiver.c.

615{
616 PGresult *res;
617
618 /*
619 * Send copy-end message. As in libpqsrv_exec, this could theoretically
620 * block, but the risk seems small.
621 */
622 if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
623 PQflush(conn->streamConn))
625 (errcode(ERRCODE_CONNECTION_FAILURE),
626 errmsg("could not send end-of-streaming message to primary: %s",
627 pchomp(PQerrorMessage(conn->streamConn)))));
628
629 *next_tli = 0;
630
631 /*
632 * After COPY is finished, we should receive a result set indicating the
633 * next timeline's ID, or just CommandComplete if the server was shut
634 * down.
635 *
636 * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
637 * also possible in case we aborted the copy in mid-stream.
638 */
639 res = libpqsrv_get_result(conn->streamConn,
640 WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
642 {
643 /*
644 * Read the next timeline's ID. The server also sends the timeline's
645 * starting point, but it is ignored.
646 */
647 if (PQnfields(res) < 2 || PQntuples(res) != 1)
649 (errcode(ERRCODE_PROTOCOL_VIOLATION),
650 errmsg("unexpected result set after end-of-streaming")));
651 *next_tli = pg_strtoint32(PQgetvalue(res, 0, 0));
652 PQclear(res);
653
654 /* the result set should be followed by CommandComplete */
655 res = libpqsrv_get_result(conn->streamConn,
656 WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
657 }
658 else if (PQresultStatus(res) == PGRES_COPY_OUT)
659 {
660 PQclear(res);
661
662 /* End the copy */
663 if (PQendcopy(conn->streamConn))
665 (errcode(ERRCODE_CONNECTION_FAILURE),
666 errmsg("error while shutting down streaming COPY: %s",
667 pchomp(PQerrorMessage(conn->streamConn)))));
668
669 /* CommandComplete should follow */
670 res = libpqsrv_get_result(conn->streamConn,
671 WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
672 }
673
676 (errcode(ERRCODE_PROTOCOL_VIOLATION),
677 errmsg("error reading result of streaming command: %s",
678 pchomp(PQerrorMessage(conn->streamConn)))));
679 PQclear(res);
680
681 /* Verify that there are no more results */
682 res = libpqsrv_get_result(conn->streamConn,
683 WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
684 if (res != NULL)
686 (errcode(ERRCODE_PROTOCOL_VIOLATION),
687 errmsg("unexpected result after CommandComplete: %s",
688 pchomp(PQerrorMessage(conn->streamConn)))));
689}
int PQflush(PGconn *conn)
Definition: fe-exec.c:4017
int PQendcopy(PGconn *conn)
Definition: fe-exec.c:2966
int PQputCopyEnd(PGconn *conn, const char *errormsg)
Definition: fe-exec.c:2766
static PGresult * libpqsrv_get_result(PGconn *conn, uint32 wait_event_info)
#define PQnfields
Definition: libpq-be-fe.h:252
#define PQntuples
Definition: libpq-be-fe.h:251
@ PGRES_COPY_OUT
Definition: libpq-fe.h:131
int32 pg_strtoint32(const char *s)
Definition: numutils.c:383

References conn, ereport, errcode(), errmsg(), ERROR, libpqsrv_get_result(), pchomp(), pg_strtoint32(), PGRES_COMMAND_OK, PGRES_COPY_OUT, PGRES_TUPLES_OK, PQclear, PQendcopy(), PQerrorMessage(), PQflush(), PQgetvalue, PQnfields, PQntuples, PQputCopyEnd(), and PQresultStatus.

◆ libpqrcv_exec()

static WalRcvExecResult * libpqrcv_exec ( WalReceiverConn conn,
const char *  query,
const int  nRetTypes,
const Oid retTypes 
)
static

Definition at line 1101 of file libpqwalreceiver.c.

1103{
1104 PGresult *pgres = NULL;
1105 WalRcvExecResult *walres = palloc0(sizeof(WalRcvExecResult));
1106 char *diag_sqlstate;
1107
1108 if (MyDatabaseId == InvalidOid)
1109 ereport(ERROR,
1110 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1111 errmsg("the query interface requires a database connection")));
1112
1113 pgres = libpqsrv_exec(conn->streamConn,
1114 query,
1115 WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
1116
1117 switch (PQresultStatus(pgres))
1118 {
1119 case PGRES_TUPLES_OK:
1120 case PGRES_SINGLE_TUPLE:
1121 case PGRES_TUPLES_CHUNK:
1122 walres->status = WALRCV_OK_TUPLES;
1123 libpqrcv_processTuples(pgres, walres, nRetTypes, retTypes);
1124 break;
1125
1126 case PGRES_COPY_IN:
1127 walres->status = WALRCV_OK_COPY_IN;
1128 break;
1129
1130 case PGRES_COPY_OUT:
1131 walres->status = WALRCV_OK_COPY_OUT;
1132 break;
1133
1134 case PGRES_COPY_BOTH:
1135 walres->status = WALRCV_OK_COPY_BOTH;
1136 break;
1137
1138 case PGRES_COMMAND_OK:
1139 walres->status = WALRCV_OK_COMMAND;
1140 break;
1141
1142 /* Empty query is considered error. */
1143 case PGRES_EMPTY_QUERY:
1144 walres->status = WALRCV_ERROR;
1145 walres->err = _("empty query");
1146 break;
1147
1150 walres->status = WALRCV_ERROR;
1151 walres->err = _("unexpected pipeline mode");
1152 break;
1153
1155 case PGRES_FATAL_ERROR:
1156 case PGRES_BAD_RESPONSE:
1157 walres->status = WALRCV_ERROR;
1158 walres->err = pchomp(PQerrorMessage(conn->streamConn));
1159 diag_sqlstate = PQresultErrorField(pgres, PG_DIAG_SQLSTATE);
1160 if (diag_sqlstate)
1161 walres->sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
1162 diag_sqlstate[1],
1163 diag_sqlstate[2],
1164 diag_sqlstate[3],
1165 diag_sqlstate[4]);
1166 break;
1167 }
1168
1169 PQclear(pgres);
1170
1171 return walres;
1172}
#define MAKE_SQLSTATE(ch1, ch2, ch3, ch4, ch5)
Definition: elog.h:56
Oid MyDatabaseId
Definition: globals.c:94
#define PQresultErrorField
Definition: libpq-be-fe.h:249
@ PGRES_COPY_IN
Definition: libpq-fe.h:132
@ PGRES_COPY_BOTH
Definition: libpq-fe.h:137
@ PGRES_TUPLES_CHUNK
Definition: libpq-fe.h:142
@ PGRES_FATAL_ERROR
Definition: libpq-fe.h:136
@ PGRES_SINGLE_TUPLE
Definition: libpq-fe.h:138
@ PGRES_EMPTY_QUERY
Definition: libpq-fe.h:124
@ PGRES_PIPELINE_SYNC
Definition: libpq-fe.h:139
@ PGRES_BAD_RESPONSE
Definition: libpq-fe.h:133
@ PGRES_PIPELINE_ABORTED
Definition: libpq-fe.h:140
@ PGRES_NONFATAL_ERROR
Definition: libpq-fe.h:135
static void libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres, const int nRetTypes, const Oid *retTypes)
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:57
WalRcvExecStatus status
Definition: walreceiver.h:220
@ WALRCV_OK_COPY_IN
Definition: walreceiver.h:208
@ WALRCV_OK_COMMAND
Definition: walreceiver.h:205
@ WALRCV_ERROR
Definition: walreceiver.h:204
@ WALRCV_OK_TUPLES
Definition: walreceiver.h:207
@ WALRCV_OK_COPY_OUT
Definition: walreceiver.h:209
@ WALRCV_OK_COPY_BOTH
Definition: walreceiver.h:210

References _, conn, ereport, WalRcvExecResult::err, errcode(), errmsg(), ERROR, InvalidOid, libpqrcv_processTuples(), libpqsrv_exec(), MAKE_SQLSTATE, MyDatabaseId, palloc0(), pchomp(), PG_DIAG_SQLSTATE, PGRES_BAD_RESPONSE, PGRES_COMMAND_OK, PGRES_COPY_BOTH, PGRES_COPY_IN, PGRES_COPY_OUT, PGRES_EMPTY_QUERY, PGRES_FATAL_ERROR, PGRES_NONFATAL_ERROR, PGRES_PIPELINE_ABORTED, PGRES_PIPELINE_SYNC, PGRES_SINGLE_TUPLE, PGRES_TUPLES_CHUNK, PGRES_TUPLES_OK, PQclear, PQerrorMessage(), PQresultErrorField, PQresultStatus, WalRcvExecResult::sqlstate, WalRcvExecResult::status, WALRCV_ERROR, WALRCV_OK_COMMAND, WALRCV_OK_COPY_BOTH, WALRCV_OK_COPY_IN, WALRCV_OK_COPY_OUT, and WALRCV_OK_TUPLES.

◆ libpqrcv_get_backend_pid()

static pid_t libpqrcv_get_backend_pid ( WalReceiverConn conn)
static

Definition at line 1016 of file libpqwalreceiver.c.

1017{
1018 return PQbackendPID(conn->streamConn);
1019}
int PQbackendPID(const PGconn *conn)
Definition: fe-connect.c:7740

References conn, and PQbackendPID().

◆ libpqrcv_get_conninfo()

static char * libpqrcv_get_conninfo ( WalReceiverConn conn)
static

Definition at line 338 of file libpqwalreceiver.c.

339{
340 PQconninfoOption *conn_opts;
341 PQconninfoOption *conn_opt;
343 char *retval;
344
345 Assert(conn->streamConn != NULL);
346
348 conn_opts = PQconninfo(conn->streamConn);
349
350 if (conn_opts == NULL)
352 (errcode(ERRCODE_OUT_OF_MEMORY),
353 errmsg("could not parse connection string: %s",
354 _("out of memory"))));
355
356 /* build a clean connection string from pieces */
357 for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
358 {
359 bool obfuscate;
360
361 /* Skip debug and empty options */
362 if (strchr(conn_opt->dispchar, 'D') ||
363 conn_opt->val == NULL ||
364 conn_opt->val[0] == '\0')
365 continue;
366
367 /* Obfuscate security-sensitive options */
368 obfuscate = strchr(conn_opt->dispchar, '*') != NULL;
369
370 appendPQExpBuffer(&buf, "%s%s=%s",
371 buf.len == 0 ? "" : " ",
372 conn_opt->keyword,
373 obfuscate ? "********" : conn_opt->val);
374 }
375
376 PQconninfoFree(conn_opts);
377
378 retval = PQExpBufferDataBroken(buf) ? NULL : pstrdup(buf.data);
380 return retval;
381}
PQconninfoOption * PQconninfo(PGconn *conn)
Definition: fe-connect.c:7481
static char * buf
Definition: pg_test_fsync.c:72
void initPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:90
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
Definition: pqexpbuffer.c:265
void termPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:129
#define PQExpBufferDataBroken(buf)
Definition: pqexpbuffer.h:67

References _, appendPQExpBuffer(), Assert(), buf, conn, _PQconninfoOption::dispchar, ereport, errcode(), errmsg(), ERROR, initPQExpBuffer(), _PQconninfoOption::keyword, PQconninfo(), PQconninfoFree(), PQExpBufferDataBroken, pstrdup(), termPQExpBuffer(), and _PQconninfoOption::val.

◆ libpqrcv_get_dbname_from_conninfo()

static char * libpqrcv_get_dbname_from_conninfo ( const char *  connInfo)
static

Definition at line 462 of file libpqwalreceiver.c.

463{
465 char *dbname = NULL;
466 char *err = NULL;
467
468 opts = PQconninfoParse(connInfo, &err);
469 if (opts == NULL)
470 {
471 /* The error string is malloc'd, so we must free it explicitly */
472 char *errcopy = err ? pstrdup(err) : "out of memory";
473
474 PQfreemem(err);
476 (errcode(ERRCODE_SYNTAX_ERROR),
477 errmsg("invalid connection string syntax: %s", errcopy)));
478 }
479
480 for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
481 {
482 /*
483 * If multiple dbnames are specified, then the last one will be
484 * returned
485 */
486 if (strcmp(opt->keyword, "dbname") == 0 && opt->val &&
487 *opt->val)
488 {
489 if (dbname)
490 pfree(dbname);
491
492 dbname = pstrdup(opt->val);
493 }
494 }
495
497 return dbname;
498}
char * dbname
Definition: streamutil.c:49

References dbname, ereport, err(), errcode(), errmsg(), ERROR, opts, pfree(), PQconninfoFree(), PQconninfoParse(), PQfreemem(), and pstrdup().

◆ libpqrcv_get_senderinfo()

static void libpqrcv_get_senderinfo ( WalReceiverConn conn,
char **  sender_host,
int *  sender_port 
)
static

Definition at line 387 of file libpqwalreceiver.c.

389{
390 char *ret = NULL;
391
392 *sender_host = NULL;
393 *sender_port = 0;
394
395 Assert(conn->streamConn != NULL);
396
397 ret = PQhost(conn->streamConn);
398 if (ret && strlen(ret) != 0)
399 *sender_host = pstrdup(ret);
400
401 ret = PQport(conn->streamConn);
402 if (ret && strlen(ret) != 0)
403 *sender_port = atoi(ret);
404}
char * PQport(const PGconn *conn)
Definition: fe-connect.c:7607
char * PQhost(const PGconn *conn)
Definition: fe-connect.c:7571

References Assert(), conn, PQhost(), PQport(), and pstrdup().

◆ libpqrcv_identify_system()

static char * libpqrcv_identify_system ( WalReceiverConn conn,
TimeLineID primary_tli 
)
static

Definition at line 411 of file libpqwalreceiver.c.

412{
413 PGresult *res;
414 char *primary_sysid;
415
416 /*
417 * Get the system identifier and timeline ID as a DataRow message from the
418 * primary server.
419 */
420 res = libpqsrv_exec(conn->streamConn,
421 "IDENTIFY_SYSTEM",
422 WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
425 (errcode(ERRCODE_PROTOCOL_VIOLATION),
426 errmsg("could not receive database system identifier and timeline ID from "
427 "the primary server: %s",
428 pchomp(PQerrorMessage(conn->streamConn)))));
429
430 /*
431 * IDENTIFY_SYSTEM returns 3 columns in 9.3 and earlier, and 4 columns in
432 * 9.4 and onwards.
433 */
434 if (PQnfields(res) < 3 || PQntuples(res) != 1)
436 (errcode(ERRCODE_PROTOCOL_VIOLATION),
437 errmsg("invalid response from primary server"),
438 errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.",
439 PQntuples(res), PQnfields(res), 1, 3)));
440 primary_sysid = pstrdup(PQgetvalue(res, 0, 0));
441 *primary_tli = pg_strtoint32(PQgetvalue(res, 0, 1));
442 PQclear(res);
443
444 return primary_sysid;
445}

References conn, ereport, errcode(), errdetail(), errmsg(), ERROR, libpqsrv_exec(), pchomp(), pg_strtoint32(), PGRES_TUPLES_OK, PQclear, PQerrorMessage(), PQgetvalue, PQnfields, PQntuples, PQresultStatus, and pstrdup().

◆ libpqrcv_processTuples()

static void libpqrcv_processTuples ( PGresult pgres,
WalRcvExecResult walres,
const int  nRetTypes,
const Oid retTypes 
)
static

Definition at line 1025 of file libpqwalreceiver.c.

1027{
1028 int tupn;
1029 int coln;
1030 int nfields = PQnfields(pgres);
1031 HeapTuple tuple;
1032 AttInMetadata *attinmeta;
1033 MemoryContext rowcontext;
1034 MemoryContext oldcontext;
1035
1036 /* Make sure we got expected number of fields. */
1037 if (nfields != nRetTypes)
1038 ereport(ERROR,
1039 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1040 errmsg("invalid query response"),
1041 errdetail("Expected %d fields, got %d fields.",
1042 nRetTypes, nfields)));
1043
1044 walres->tuplestore = tuplestore_begin_heap(true, false, work_mem);
1045
1046 /* Create tuple descriptor corresponding to expected result. */
1047 walres->tupledesc = CreateTemplateTupleDesc(nRetTypes);
1048 for (coln = 0; coln < nRetTypes; coln++)
1049 TupleDescInitEntry(walres->tupledesc, (AttrNumber) coln + 1,
1050 PQfname(pgres, coln), retTypes[coln], -1, 0);
1051 attinmeta = TupleDescGetAttInMetadata(walres->tupledesc);
1052
1053 /* No point in doing more here if there were no tuples returned. */
1054 if (PQntuples(pgres) == 0)
1055 return;
1056
1057 /* Create temporary context for local allocations. */
1059 "libpqrcv query result context",
1061
1062 /* Process returned rows. */
1063 for (tupn = 0; tupn < PQntuples(pgres); tupn++)
1064 {
1065 char *cstrs[MaxTupleAttributeNumber];
1066
1068
1069 /* Do the allocations in temporary context. */
1070 oldcontext = MemoryContextSwitchTo(rowcontext);
1071
1072 /*
1073 * Fill cstrs with null-terminated strings of column values.
1074 */
1075 for (coln = 0; coln < nfields; coln++)
1076 {
1077 if (PQgetisnull(pgres, tupn, coln))
1078 cstrs[coln] = NULL;
1079 else
1080 cstrs[coln] = PQgetvalue(pgres, tupn, coln);
1081 }
1082
1083 /* Convert row to a tuple, and add it to the tuplestore */
1084 tuple = BuildTupleFromCStrings(attinmeta, cstrs);
1085 tuplestore_puttuple(walres->tuplestore, tuple);
1086
1087 /* Clean up */
1088 MemoryContextSwitchTo(oldcontext);
1089 MemoryContextReset(rowcontext);
1090 }
1091
1092 MemoryContextDelete(rowcontext);
1093}
int16 AttrNumber
Definition: attnum.h:21
HeapTuple BuildTupleFromCStrings(AttInMetadata *attinmeta, char **values)
Definition: execTuples.c:2324
AttInMetadata * TupleDescGetAttInMetadata(TupleDesc tupdesc)
Definition: execTuples.c:2275
int work_mem
Definition: globals.c:131
#define MaxTupleAttributeNumber
Definition: htup_details.h:34
#define PQfname
Definition: libpq-be-fe.h:256
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:400
MemoryContext CurrentMemoryContext
Definition: mcxt.c:160
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:469
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:123
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
Tuplestorestate * tuplestore
Definition: walreceiver.h:223
TupleDesc tupledesc
Definition: walreceiver.h:224
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:182
void TupleDescInitEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:842
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:330
void tuplestore_puttuple(Tuplestorestate *state, HeapTuple tuple)
Definition: tuplestore.c:764

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, BuildTupleFromCStrings(), CHECK_FOR_INTERRUPTS, CreateTemplateTupleDesc(), CurrentMemoryContext, ereport, errcode(), errdetail(), errmsg(), ERROR, MaxTupleAttributeNumber, MemoryContextDelete(), MemoryContextReset(), MemoryContextSwitchTo(), PQfname, PQgetisnull, PQgetvalue, PQnfields, PQntuples, WalRcvExecResult::tupledesc, TupleDescGetAttInMetadata(), TupleDescInitEntry(), WalRcvExecResult::tuplestore, tuplestore_begin_heap(), tuplestore_puttuple(), and work_mem.

Referenced by libpqrcv_exec().

◆ libpqrcv_readtimelinehistoryfile()

static void libpqrcv_readtimelinehistoryfile ( WalReceiverConn conn,
TimeLineID  tli,
char **  filename,
char **  content,
int *  len 
)
static

Definition at line 695 of file libpqwalreceiver.c.

698{
699 PGresult *res;
700 char cmd[64];
701
702 Assert(!conn->logical);
703
704 /*
705 * Request the primary to send over the history file for given timeline.
706 */
707 snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
708 res = libpqsrv_exec(conn->streamConn,
709 cmd,
710 WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
713 (errcode(ERRCODE_PROTOCOL_VIOLATION),
714 errmsg("could not receive timeline history file from "
715 "the primary server: %s",
716 pchomp(PQerrorMessage(conn->streamConn)))));
717 if (PQnfields(res) != 2 || PQntuples(res) != 1)
719 (errcode(ERRCODE_PROTOCOL_VIOLATION),
720 errmsg("invalid response from primary server"),
721 errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
722 PQntuples(res), PQnfields(res))));
723 *filename = pstrdup(PQgetvalue(res, 0, 0));
724
725 *len = PQgetlength(res, 0, 1);
726 *content = palloc(*len);
727 memcpy(*content, PQgetvalue(res, 0, 1), *len);
728 PQclear(res);
729}
#define PQgetlength
Definition: libpq-be-fe.h:254
void * palloc(Size size)
Definition: mcxt.c:1365
const void size_t len
static char * filename
Definition: pg_dumpall.c:120
#define snprintf
Definition: port.h:260

References Assert(), conn, ereport, errcode(), errdetail(), errmsg(), ERROR, filename, len, libpqsrv_exec(), palloc(), pchomp(), PGRES_TUPLES_OK, PQclear, PQerrorMessage(), PQgetlength, PQgetvalue, PQnfields, PQntuples, PQresultStatus, pstrdup(), and snprintf.

◆ libpqrcv_receive()

static int libpqrcv_receive ( WalReceiverConn conn,
char **  buffer,
pgsocket wait_fd 
)
static

Definition at line 759 of file libpqwalreceiver.c.

761{
762 int rawlen;
763
764 PQfreemem(conn->recvBuf);
765 conn->recvBuf = NULL;
766
767 /* Try to receive a CopyData message */
768 rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
769 if (rawlen == 0)
770 {
771 /* Try consuming some data. */
772 if (PQconsumeInput(conn->streamConn) == 0)
774 (errcode(ERRCODE_CONNECTION_FAILURE),
775 errmsg("could not receive data from WAL stream: %s",
776 pchomp(PQerrorMessage(conn->streamConn)))));
777
778 /* Now that we've consumed some input, try again */
779 rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
780 if (rawlen == 0)
781 {
782 /* Tell caller to try again when our socket is ready. */
783 *wait_fd = PQsocket(conn->streamConn);
784 return 0;
785 }
786 }
787 if (rawlen == -1) /* end-of-streaming or error */
788 {
789 PGresult *res;
790
791 res = libpqsrv_get_result(conn->streamConn,
792 WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
794 {
795 PQclear(res);
796
797 /* Verify that there are no more results. */
798 res = libpqsrv_get_result(conn->streamConn,
799 WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
800 if (res != NULL)
801 {
802 PQclear(res);
803
804 /*
805 * If the other side closed the connection orderly (otherwise
806 * we'd seen an error, or PGRES_COPY_IN) don't report an error
807 * here, but let callers deal with it.
808 */
809 if (PQstatus(conn->streamConn) == CONNECTION_BAD)
810 return -1;
811
813 (errcode(ERRCODE_PROTOCOL_VIOLATION),
814 errmsg("unexpected result after CommandComplete: %s",
815 PQerrorMessage(conn->streamConn))));
816 }
817
818 return -1;
819 }
820 else if (PQresultStatus(res) == PGRES_COPY_IN)
821 {
822 PQclear(res);
823 return -1;
824 }
825 else
827 (errcode(ERRCODE_PROTOCOL_VIOLATION),
828 errmsg("could not receive data from WAL stream: %s",
829 pchomp(PQerrorMessage(conn->streamConn)))));
830 }
831 if (rawlen < -1)
833 (errcode(ERRCODE_PROTOCOL_VIOLATION),
834 errmsg("could not receive data from WAL stream: %s",
835 pchomp(PQerrorMessage(conn->streamConn)))));
836
837 /* Return received messages to caller */
838 *buffer = conn->recvBuf;
839 return rawlen;
840}
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:7730
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:2001
int PQgetCopyData(PGconn *conn, char **buffer, int async)
Definition: fe-exec.c:2833
@ CONNECTION_BAD
Definition: libpq-fe.h:85

References conn, CONNECTION_BAD, ereport, errcode(), errmsg(), ERROR, libpqsrv_get_result(), pchomp(), PGRES_COMMAND_OK, PGRES_COPY_IN, PQclear, PQconsumeInput(), PQerrorMessage(), PQfreemem(), PQgetCopyData(), PQresultStatus, PQsocket(), and PQstatus().

◆ libpqrcv_send()

static void libpqrcv_send ( WalReceiverConn conn,
const char *  buffer,
int  nbytes 
)
static

Definition at line 848 of file libpqwalreceiver.c.

849{
850 if (PQputCopyData(conn->streamConn, buffer, nbytes) <= 0 ||
851 PQflush(conn->streamConn))
853 (errcode(ERRCODE_CONNECTION_FAILURE),
854 errmsg("could not send data to WAL stream: %s",
855 pchomp(PQerrorMessage(conn->streamConn)))));
856}
int PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
Definition: fe-exec.c:2712

References conn, ereport, errcode(), errmsg(), ERROR, pchomp(), PQerrorMessage(), PQflush(), and PQputCopyData().

◆ libpqrcv_server_version()

static int libpqrcv_server_version ( WalReceiverConn conn)
static

Definition at line 451 of file libpqwalreceiver.c.

452{
453 return PQserverVersion(conn->streamConn);
454}

References conn, and PQserverVersion().

◆ libpqrcv_startstreaming()

static bool libpqrcv_startstreaming ( WalReceiverConn conn,
const WalRcvStreamOptions options 
)
static

Definition at line 511 of file libpqwalreceiver.c.

513{
514 StringInfoData cmd;
515 PGresult *res;
516
517 Assert(options->logical == conn->logical);
518 Assert(options->slotname || !options->logical);
519
520 initStringInfo(&cmd);
521
522 /* Build the command. */
523 appendStringInfoString(&cmd, "START_REPLICATION");
524 if (options->slotname != NULL)
525 appendStringInfo(&cmd, " SLOT \"%s\"",
526 options->slotname);
527
528 if (options->logical)
529 appendStringInfoString(&cmd, " LOGICAL");
530
531 appendStringInfo(&cmd, " %X/%08X", LSN_FORMAT_ARGS(options->startpoint));
532
533 /*
534 * Additional options are different depending on if we are doing logical
535 * or physical replication.
536 */
537 if (options->logical)
538 {
539 char *pubnames_str;
540 List *pubnames;
541 char *pubnames_literal;
542
543 appendStringInfoString(&cmd, " (");
544
545 appendStringInfo(&cmd, "proto_version '%u'",
546 options->proto.logical.proto_version);
547
548 if (options->proto.logical.streaming_str)
549 appendStringInfo(&cmd, ", streaming '%s'",
550 options->proto.logical.streaming_str);
551
552 if (options->proto.logical.twophase &&
553 PQserverVersion(conn->streamConn) >= 150000)
554 appendStringInfoString(&cmd, ", two_phase 'on'");
555
556 if (options->proto.logical.origin &&
557 PQserverVersion(conn->streamConn) >= 160000)
558 appendStringInfo(&cmd, ", origin '%s'",
559 options->proto.logical.origin);
560
561 pubnames = options->proto.logical.publication_names;
562 pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
563 if (!pubnames_str)
565 (errcode(ERRCODE_OUT_OF_MEMORY), /* likely guess */
566 errmsg("could not start WAL streaming: %s",
567 pchomp(PQerrorMessage(conn->streamConn)))));
568 pubnames_literal = PQescapeLiteral(conn->streamConn, pubnames_str,
569 strlen(pubnames_str));
570 if (!pubnames_literal)
572 (errcode(ERRCODE_OUT_OF_MEMORY), /* likely guess */
573 errmsg("could not start WAL streaming: %s",
574 pchomp(PQerrorMessage(conn->streamConn)))));
575 appendStringInfo(&cmd, ", publication_names %s", pubnames_literal);
576 PQfreemem(pubnames_literal);
577 pfree(pubnames_str);
578
579 if (options->proto.logical.binary &&
580 PQserverVersion(conn->streamConn) >= 140000)
581 appendStringInfoString(&cmd, ", binary 'true'");
582
583 appendStringInfoChar(&cmd, ')');
584 }
585 else
586 appendStringInfo(&cmd, " TIMELINE %u",
587 options->proto.physical.startpointTLI);
588
589 /* Start streaming. */
590 res = libpqsrv_exec(conn->streamConn,
591 cmd.data,
592 WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
593 pfree(cmd.data);
594
596 {
597 PQclear(res);
598 return false;
599 }
600 else if (PQresultStatus(res) != PGRES_COPY_BOTH)
602 (errcode(ERRCODE_PROTOCOL_VIOLATION),
603 errmsg("could not start WAL streaming: %s",
604 pchomp(PQerrorMessage(conn->streamConn)))));
605 PQclear(res);
606 return true;
607}
char * PQescapeLiteral(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:4399
static char * stringlist_to_identifierstr(PGconn *conn, List *strings)
Definition: pg_list.h:54
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:47

References appendStringInfo(), appendStringInfoChar(), appendStringInfoString(), Assert(), conn, StringInfoData::data, ereport, errcode(), errmsg(), ERROR, initStringInfo(), libpqsrv_exec(), LSN_FORMAT_ARGS, pchomp(), pfree(), PGRES_COMMAND_OK, PGRES_COPY_BOTH, PQclear, PQerrorMessage(), PQescapeLiteral(), PQfreemem(), PQresultStatus, PQserverVersion(), and stringlist_to_identifierstr().

◆ PG_MODULE_MAGIC_EXT()

PG_MODULE_MAGIC_EXT ( name = "libpqwalreceiver",
version = PG_VERSION 
)

◆ stringlist_to_identifierstr()

static char * stringlist_to_identifierstr ( PGconn conn,
List strings 
)
static

Definition at line 1183 of file libpqwalreceiver.c.

1184{
1185 ListCell *lc;
1186 StringInfoData res;
1187 bool first = true;
1188
1189 initStringInfo(&res);
1190
1191 foreach(lc, strings)
1192 {
1193 char *val = strVal(lfirst(lc));
1194 char *val_escaped;
1195
1196 if (first)
1197 first = false;
1198 else
1199 appendStringInfoChar(&res, ',');
1200
1201 val_escaped = PQescapeIdentifier(conn, val, strlen(val));
1202 if (!val_escaped)
1203 {
1204 free(res.data);
1205 return NULL;
1206 }
1207 appendStringInfoString(&res, val_escaped);
1208 PQfreemem(val_escaped);
1209 }
1210
1211 return res.data;
1212}
char * PQescapeIdentifier(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:4405
#define free(a)
Definition: header.h:65
long val
Definition: informix.c:689
#define lfirst(lc)
Definition: pg_list.h:172
#define strVal(v)
Definition: value.h:82

References appendStringInfoChar(), appendStringInfoString(), conn, StringInfoData::data, free, initStringInfo(), lfirst, PQescapeIdentifier(), PQfreemem(), strVal, and val.

Referenced by libpqrcv_startstreaming().

Variable Documentation

◆ PQWalReceiverFunctions

WalReceiverFunctionsType PQWalReceiverFunctions
static
Initial value:
= {
.walrcv_connect = libpqrcv_connect,
.walrcv_check_conninfo = libpqrcv_check_conninfo,
.walrcv_get_conninfo = libpqrcv_get_conninfo,
.walrcv_get_senderinfo = libpqrcv_get_senderinfo,
.walrcv_identify_system = libpqrcv_identify_system,
.walrcv_server_version = libpqrcv_server_version,
.walrcv_readtimelinehistoryfile = libpqrcv_readtimelinehistoryfile,
.walrcv_startstreaming = libpqrcv_startstreaming,
.walrcv_endstreaming = libpqrcv_endstreaming,
.walrcv_receive = libpqrcv_receive,
.walrcv_send = libpqrcv_send,
.walrcv_create_slot = libpqrcv_create_slot,
.walrcv_alter_slot = libpqrcv_alter_slot,
.walrcv_get_dbname_from_conninfo = libpqrcv_get_dbname_from_conninfo,
.walrcv_get_backend_pid = libpqrcv_get_backend_pid,
.walrcv_exec = libpqrcv_exec,
.walrcv_disconnect = libpqrcv_disconnect
}
static WalReceiverConn * libpqrcv_connect(const char *conninfo, bool replication, bool logical, bool must_use_password, const char *appname, char **err)
static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, TimeLineID tli, char **filename, char **content, int *len)
static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn)
static void libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
static void libpqrcv_disconnect(WalReceiverConn *conn)
static char * libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
static char * libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, bool temporary, bool two_phase, bool failover, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn)
static WalRcvExecResult * libpqrcv_exec(WalReceiverConn *conn, const char *query, const int nRetTypes, const Oid *retTypes)
static void libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
static void libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host, int *sender_port)
static int libpqrcv_server_version(WalReceiverConn *conn)
static bool libpqrcv_startstreaming(WalReceiverConn *conn, const WalRcvStreamOptions *options)
static int libpqrcv_receive(WalReceiverConn *conn, char **buffer, pgsocket *wait_fd)
static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname, const bool *failover, const bool *two_phase)
static char * libpqrcv_get_conninfo(WalReceiverConn *conn)
static char * libpqrcv_get_dbname_from_conninfo(const char *connInfo)

Definition at line 96 of file libpqwalreceiver.c.

Referenced by _PG_init().