PostgreSQL Source Code git master
libpqwalreceiver.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * libpqwalreceiver.c
4 *
5 * This file contains the libpq-specific parts of walreceiver. It's
6 * loaded as a dynamic module to avoid linking the main server binary with
7 * libpq.
8 *
9 * Apart from walreceiver, the libpq-specific routines are now being used by
10 * logical replication workers and slot synchronization.
11 *
12 * Portions Copyright (c) 2010-2025, PostgreSQL Global Development Group
13 *
14 *
15 * IDENTIFICATION
16 * src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
17 *
18 *-------------------------------------------------------------------------
19 */
20#include "postgres.h"
21
22#include <unistd.h>
23#include <sys/time.h>
24
25#include "common/connect.h"
26#include "funcapi.h"
27#include "libpq-fe.h"
28#include "mb/pg_wchar.h"
29#include "miscadmin.h"
30#include "pgstat.h"
31#include "pqexpbuffer.h"
33#include "storage/latch.h"
34#include "utils/builtins.h"
35#include "utils/memutils.h"
36#include "utils/pg_lsn.h"
37#include "utils/tuplestore.h"
38
40
42{
43 /* Current connection to the primary, if any */
45 /* Used to remember if the connection is logical or physical */
46 bool logical;
47 /* Buffer for currently read records */
48 char *recvBuf;
49};
50
51/* Prototypes for interface functions */
52static WalReceiverConn *libpqrcv_connect(const char *conninfo,
53 bool replication, bool logical,
54 bool must_use_password,
55 const char *appname, char **err);
56static void libpqrcv_check_conninfo(const char *conninfo,
57 bool must_use_password);
60 char **sender_host, int *sender_port);
62 TimeLineID *primary_tli);
63static char *libpqrcv_get_dbname_from_conninfo(const char *connInfo);
66 TimeLineID tli, char **filename,
67 char **content, int *len);
71 TimeLineID *next_tli);
72static int libpqrcv_receive(WalReceiverConn *conn, char **buffer,
73 pgsocket *wait_fd);
74static void libpqrcv_send(WalReceiverConn *conn, const char *buffer,
75 int nbytes);
77 const char *slotname,
78 bool temporary,
79 bool two_phase,
80 bool failover,
81 CRSSnapshotAction snapshot_action,
82 XLogRecPtr *lsn);
83static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
84 const bool *failover, const bool *two_phase);
87 const char *query,
88 const int nRetTypes,
89 const Oid *retTypes);
91
94 .walrcv_check_conninfo = libpqrcv_check_conninfo,
95 .walrcv_get_conninfo = libpqrcv_get_conninfo,
96 .walrcv_get_senderinfo = libpqrcv_get_senderinfo,
97 .walrcv_identify_system = libpqrcv_identify_system,
98 .walrcv_server_version = libpqrcv_server_version,
99 .walrcv_readtimelinehistoryfile = libpqrcv_readtimelinehistoryfile,
100 .walrcv_startstreaming = libpqrcv_startstreaming,
101 .walrcv_endstreaming = libpqrcv_endstreaming,
102 .walrcv_receive = libpqrcv_receive,
103 .walrcv_send = libpqrcv_send,
104 .walrcv_create_slot = libpqrcv_create_slot,
105 .walrcv_alter_slot = libpqrcv_alter_slot,
106 .walrcv_get_dbname_from_conninfo = libpqrcv_get_dbname_from_conninfo,
107 .walrcv_get_backend_pid = libpqrcv_get_backend_pid,
108 .walrcv_exec = libpqrcv_exec,
109 .walrcv_disconnect = libpqrcv_disconnect
110};
111
112/* Prototypes for private functions */
113static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query);
114static PGresult *libpqrcv_PQgetResult(PGconn *streamConn);
115static char *stringlist_to_identifierstr(PGconn *conn, List *strings);
116
117/*
118 * Module initialization function
119 */
120void
122{
123 if (WalReceiverFunctions != NULL)
124 elog(ERROR, "libpqwalreceiver already loaded");
126}
127
128/*
129 * Establish the connection to the primary server.
130 *
131 * This function can be used for both replication and regular connections.
132 * If it is a replication connection, it could be either logical or physical
133 * based on input argument 'logical'.
134 *
135 * If an error occurs, this function will normally return NULL and set *err
136 * to a palloc'ed error message. However, if must_use_password is true and
137 * the connection fails to use the password, this function will ereport(ERROR).
138 * We do this because in that case the error includes a detail and a hint for
139 * consistency with other parts of the system, and it's not worth adding the
140 * machinery to pass all of those back to the caller just to cover this one
141 * case.
142 */
143static WalReceiverConn *
144libpqrcv_connect(const char *conninfo, bool replication, bool logical,
145 bool must_use_password, const char *appname, char **err)
146{
149 const char *keys[6];
150 const char *vals[6];
151 int i = 0;
152
153 /*
154 * Re-validate connection string. The validation already happened at DDL
155 * time, but the subscription owner may have changed. If we don't recheck
156 * with the correct must_use_password, it's possible that the connection
157 * will obtain the password from a different source, such as PGPASSFILE or
158 * PGPASSWORD.
159 */
160 libpqrcv_check_conninfo(conninfo, must_use_password);
161
162 /*
163 * We use the expand_dbname parameter to process the connection string (or
164 * URI), and pass some extra options.
165 */
166 keys[i] = "dbname";
167 vals[i] = conninfo;
168
169 /* We can not have logical without replication */
170 Assert(replication || !logical);
171
172 if (replication)
173 {
174 keys[++i] = "replication";
175 vals[i] = logical ? "database" : "true";
176
177 if (logical)
178 {
179 /* Tell the publisher to translate to our encoding */
180 keys[++i] = "client_encoding";
181 vals[i] = GetDatabaseEncodingName();
182
183 /*
184 * Force assorted GUC parameters to settings that ensure that the
185 * publisher will output data values in a form that is unambiguous
186 * to the subscriber. (We don't want to modify the subscriber's
187 * GUC settings, since that might surprise user-defined code
188 * running in the subscriber, such as triggers.) This should
189 * match what pg_dump does.
190 */
191 keys[++i] = "options";
192 vals[i] = "-c datestyle=ISO -c intervalstyle=postgres -c extra_float_digits=3";
193 }
194 else
195 {
196 /*
197 * The database name is ignored by the server in replication mode,
198 * but specify "replication" for .pgpass lookup.
199 */
200 keys[++i] = "dbname";
201 vals[i] = "replication";
202 }
203 }
204
205 keys[++i] = "fallback_application_name";
206 vals[i] = appname;
207
208 keys[++i] = NULL;
209 vals[i] = NULL;
210
211 Assert(i < sizeof(keys));
212
213 conn = palloc0(sizeof(WalReceiverConn));
214 conn->streamConn = PQconnectStartParams(keys, vals,
215 /* expand_dbname = */ true);
216 if (PQstatus(conn->streamConn) == CONNECTION_BAD)
217 goto bad_connection_errmsg;
218
219 /*
220 * Poll connection until we have OK or FAILED status.
221 *
222 * Per spec for PQconnectPoll, first wait till socket is write-ready.
223 */
224 status = PGRES_POLLING_WRITING;
225 do
226 {
227 int io_flag;
228 int rc;
229
230 if (status == PGRES_POLLING_READING)
231 io_flag = WL_SOCKET_READABLE;
232#ifdef WIN32
233 /* Windows needs a different test while waiting for connection-made */
234 else if (PQstatus(conn->streamConn) == CONNECTION_STARTED)
235 io_flag = WL_SOCKET_CONNECTED;
236#endif
237 else
238 io_flag = WL_SOCKET_WRITEABLE;
239
242 PQsocket(conn->streamConn),
243 0,
244 WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
245
246 /* Interrupted? */
247 if (rc & WL_LATCH_SET)
248 {
251 }
252
253 /* If socket is ready, advance the libpq state machine */
254 if (rc & io_flag)
255 status = PQconnectPoll(conn->streamConn);
256 } while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
257
258 if (PQstatus(conn->streamConn) != CONNECTION_OK)
259 goto bad_connection_errmsg;
260
261 if (must_use_password && !PQconnectionUsedPassword(conn->streamConn))
262 {
263 PQfinish(conn->streamConn);
264 pfree(conn);
265
267 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
268 errmsg("password is required"),
269 errdetail("Non-superuser cannot connect if the server does not request a password."),
270 errhint("Target server's authentication method must be changed, or set password_required=false in the subscription parameters.")));
271 }
272
273 /*
274 * Set always-secure search path for the cases where the connection is
275 * used to run SQL queries, so malicious users can't get control.
276 */
277 if (!replication || logical)
278 {
279 PGresult *res;
280
281 res = libpqrcv_PQexec(conn->streamConn,
284 {
285 PQclear(res);
286 *err = psprintf(_("could not clear search path: %s"),
287 pchomp(PQerrorMessage(conn->streamConn)));
288 goto bad_connection;
289 }
290 PQclear(res);
291 }
292
293 conn->logical = logical;
294
295 return conn;
296
297 /* error path, using libpq's error message */
298bad_connection_errmsg:
299 *err = pchomp(PQerrorMessage(conn->streamConn));
300
301 /* error path, error already set */
302bad_connection:
303 PQfinish(conn->streamConn);
304 pfree(conn);
305 return NULL;
306}
307
308/*
309 * Validate connection info string.
310 *
311 * If the connection string can't be parsed, this function will raise
312 * an error. If must_use_password is true, the function raises an error
313 * if no password is provided in the connection string. In any other case
314 * it successfully completes.
315 */
316static void
317libpqrcv_check_conninfo(const char *conninfo, bool must_use_password)
318{
319 PQconninfoOption *opts = NULL;
320 PQconninfoOption *opt;
321 char *err = NULL;
322
323 opts = PQconninfoParse(conninfo, &err);
324 if (opts == NULL)
325 {
326 /* The error string is malloc'd, so we must free it explicitly */
327 char *errcopy = err ? pstrdup(err) : "out of memory";
328
329 PQfreemem(err);
331 (errcode(ERRCODE_SYNTAX_ERROR),
332 errmsg("invalid connection string syntax: %s", errcopy)));
333 }
334
335 if (must_use_password)
336 {
337 bool uses_password = false;
338
339 for (opt = opts; opt->keyword != NULL; ++opt)
340 {
341 /* Ignore connection options that are not present. */
342 if (opt->val == NULL)
343 continue;
344
345 if (strcmp(opt->keyword, "password") == 0 && opt->val[0] != '\0')
346 {
347 uses_password = true;
348 break;
349 }
350 }
351
352 if (!uses_password)
353 {
354 /* malloc'd, so we must free it explicitly */
356
358 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
359 errmsg("password is required"),
360 errdetail("Non-superusers must provide a password in the connection string.")));
361 }
362 }
363
365}
366
367/*
368 * Return a user-displayable conninfo string. Any security-sensitive fields
369 * are obfuscated.
370 */
371static char *
373{
374 PQconninfoOption *conn_opts;
375 PQconninfoOption *conn_opt;
377 char *retval;
378
379 Assert(conn->streamConn != NULL);
380
382 conn_opts = PQconninfo(conn->streamConn);
383
384 if (conn_opts == NULL)
386 (errcode(ERRCODE_OUT_OF_MEMORY),
387 errmsg("could not parse connection string: %s",
388 _("out of memory"))));
389
390 /* build a clean connection string from pieces */
391 for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
392 {
393 bool obfuscate;
394
395 /* Skip debug and empty options */
396 if (strchr(conn_opt->dispchar, 'D') ||
397 conn_opt->val == NULL ||
398 conn_opt->val[0] == '\0')
399 continue;
400
401 /* Obfuscate security-sensitive options */
402 obfuscate = strchr(conn_opt->dispchar, '*') != NULL;
403
404 appendPQExpBuffer(&buf, "%s%s=%s",
405 buf.len == 0 ? "" : " ",
406 conn_opt->keyword,
407 obfuscate ? "********" : conn_opt->val);
408 }
409
410 PQconninfoFree(conn_opts);
411
412 retval = PQExpBufferDataBroken(buf) ? NULL : pstrdup(buf.data);
414 return retval;
415}
416
417/*
418 * Provides information about the sender this WAL receiver is connected to.
419 */
420static void
422 int *sender_port)
423{
424 char *ret = NULL;
425
426 *sender_host = NULL;
427 *sender_port = 0;
428
429 Assert(conn->streamConn != NULL);
430
431 ret = PQhost(conn->streamConn);
432 if (ret && strlen(ret) != 0)
433 *sender_host = pstrdup(ret);
434
435 ret = PQport(conn->streamConn);
436 if (ret && strlen(ret) != 0)
437 *sender_port = atoi(ret);
438}
439
440/*
441 * Check that primary's system identifier matches ours, and fetch the current
442 * timeline ID of the primary.
443 */
444static char *
446{
447 PGresult *res;
448 char *primary_sysid;
449
450 /*
451 * Get the system identifier and timeline ID as a DataRow message from the
452 * primary server.
453 */
454 res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM");
456 {
457 PQclear(res);
459 (errcode(ERRCODE_PROTOCOL_VIOLATION),
460 errmsg("could not receive database system identifier and timeline ID from "
461 "the primary server: %s",
462 pchomp(PQerrorMessage(conn->streamConn)))));
463 }
464
465 /*
466 * IDENTIFY_SYSTEM returns 3 columns in 9.3 and earlier, and 4 columns in
467 * 9.4 and onwards.
468 */
469 if (PQnfields(res) < 3 || PQntuples(res) != 1)
470 {
471 int ntuples = PQntuples(res);
472 int nfields = PQnfields(res);
473
474 PQclear(res);
476 (errcode(ERRCODE_PROTOCOL_VIOLATION),
477 errmsg("invalid response from primary server"),
478 errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.",
479 ntuples, nfields, 1, 3)));
480 }
481 primary_sysid = pstrdup(PQgetvalue(res, 0, 0));
482 *primary_tli = pg_strtoint32(PQgetvalue(res, 0, 1));
483 PQclear(res);
484
485 return primary_sysid;
486}
487
488/*
489 * Thin wrapper around libpq to obtain server version.
490 */
491static int
493{
494 return PQserverVersion(conn->streamConn);
495}
496
497/*
498 * Get database name from the primary server's conninfo.
499 *
500 * If dbname is not found in connInfo, return NULL.
501 */
502static char *
504{
506 char *dbname = NULL;
507 char *err = NULL;
508
509 opts = PQconninfoParse(connInfo, &err);
510 if (opts == NULL)
511 {
512 /* The error string is malloc'd, so we must free it explicitly */
513 char *errcopy = err ? pstrdup(err) : "out of memory";
514
515 PQfreemem(err);
517 (errcode(ERRCODE_SYNTAX_ERROR),
518 errmsg("invalid connection string syntax: %s", errcopy)));
519 }
520
521 for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
522 {
523 /*
524 * If multiple dbnames are specified, then the last one will be
525 * returned
526 */
527 if (strcmp(opt->keyword, "dbname") == 0 && opt->val &&
528 *opt->val)
529 {
530 if (dbname)
531 pfree(dbname);
532
533 dbname = pstrdup(opt->val);
534 }
535 }
536
538 return dbname;
539}
540
541/*
542 * Start streaming WAL data from given streaming options.
543 *
544 * Returns true if we switched successfully to copy-both mode. False
545 * means the server received the command and executed it successfully, but
546 * didn't switch to copy-mode. That means that there was no WAL on the
547 * requested timeline and starting point, because the server switched to
548 * another timeline at or before the requested starting point. On failure,
549 * throws an ERROR.
550 */
551static bool
554{
555 StringInfoData cmd;
556 PGresult *res;
557
558 Assert(options->logical == conn->logical);
559 Assert(options->slotname || !options->logical);
560
561 initStringInfo(&cmd);
562
563 /* Build the command. */
564 appendStringInfoString(&cmd, "START_REPLICATION");
565 if (options->slotname != NULL)
566 appendStringInfo(&cmd, " SLOT \"%s\"",
567 options->slotname);
568
569 if (options->logical)
570 appendStringInfoString(&cmd, " LOGICAL");
571
572 appendStringInfo(&cmd, " %X/%X", LSN_FORMAT_ARGS(options->startpoint));
573
574 /*
575 * Additional options are different depending on if we are doing logical
576 * or physical replication.
577 */
578 if (options->logical)
579 {
580 char *pubnames_str;
581 List *pubnames;
582 char *pubnames_literal;
583
584 appendStringInfoString(&cmd, " (");
585
586 appendStringInfo(&cmd, "proto_version '%u'",
587 options->proto.logical.proto_version);
588
589 if (options->proto.logical.streaming_str)
590 appendStringInfo(&cmd, ", streaming '%s'",
591 options->proto.logical.streaming_str);
592
593 if (options->proto.logical.twophase &&
594 PQserverVersion(conn->streamConn) >= 150000)
595 appendStringInfoString(&cmd, ", two_phase 'on'");
596
597 if (options->proto.logical.origin &&
598 PQserverVersion(conn->streamConn) >= 160000)
599 appendStringInfo(&cmd, ", origin '%s'",
600 options->proto.logical.origin);
601
602 pubnames = options->proto.logical.publication_names;
603 pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
604 if (!pubnames_str)
606 (errcode(ERRCODE_OUT_OF_MEMORY), /* likely guess */
607 errmsg("could not start WAL streaming: %s",
608 pchomp(PQerrorMessage(conn->streamConn)))));
609 pubnames_literal = PQescapeLiteral(conn->streamConn, pubnames_str,
610 strlen(pubnames_str));
611 if (!pubnames_literal)
613 (errcode(ERRCODE_OUT_OF_MEMORY), /* likely guess */
614 errmsg("could not start WAL streaming: %s",
615 pchomp(PQerrorMessage(conn->streamConn)))));
616 appendStringInfo(&cmd, ", publication_names %s", pubnames_literal);
617 PQfreemem(pubnames_literal);
618 pfree(pubnames_str);
619
620 if (options->proto.logical.binary &&
621 PQserverVersion(conn->streamConn) >= 140000)
622 appendStringInfoString(&cmd, ", binary 'true'");
623
624 appendStringInfoChar(&cmd, ')');
625 }
626 else
627 appendStringInfo(&cmd, " TIMELINE %u",
628 options->proto.physical.startpointTLI);
629
630 /* Start streaming. */
631 res = libpqrcv_PQexec(conn->streamConn, cmd.data);
632 pfree(cmd.data);
633
635 {
636 PQclear(res);
637 return false;
638 }
640 {
641 PQclear(res);
643 (errcode(ERRCODE_PROTOCOL_VIOLATION),
644 errmsg("could not start WAL streaming: %s",
645 pchomp(PQerrorMessage(conn->streamConn)))));
646 }
647 PQclear(res);
648 return true;
649}
650
651/*
652 * Stop streaming WAL data. Returns the next timeline's ID in *next_tli, as
653 * reported by the server, or 0 if it did not report it.
654 */
655static void
657{
658 PGresult *res;
659
660 /*
661 * Send copy-end message. As in libpqrcv_PQexec, this could theoretically
662 * block, but the risk seems small.
663 */
664 if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
665 PQflush(conn->streamConn))
667 (errcode(ERRCODE_CONNECTION_FAILURE),
668 errmsg("could not send end-of-streaming message to primary: %s",
669 pchomp(PQerrorMessage(conn->streamConn)))));
670
671 *next_tli = 0;
672
673 /*
674 * After COPY is finished, we should receive a result set indicating the
675 * next timeline's ID, or just CommandComplete if the server was shut
676 * down.
677 *
678 * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
679 * also possible in case we aborted the copy in mid-stream.
680 */
681 res = libpqrcv_PQgetResult(conn->streamConn);
683 {
684 /*
685 * Read the next timeline's ID. The server also sends the timeline's
686 * starting point, but it is ignored.
687 */
688 if (PQnfields(res) < 2 || PQntuples(res) != 1)
690 (errcode(ERRCODE_PROTOCOL_VIOLATION),
691 errmsg("unexpected result set after end-of-streaming")));
692 *next_tli = pg_strtoint32(PQgetvalue(res, 0, 0));
693 PQclear(res);
694
695 /* the result set should be followed by CommandComplete */
696 res = libpqrcv_PQgetResult(conn->streamConn);
697 }
698 else if (PQresultStatus(res) == PGRES_COPY_OUT)
699 {
700 PQclear(res);
701
702 /* End the copy */
703 if (PQendcopy(conn->streamConn))
705 (errcode(ERRCODE_CONNECTION_FAILURE),
706 errmsg("error while shutting down streaming COPY: %s",
707 pchomp(PQerrorMessage(conn->streamConn)))));
708
709 /* CommandComplete should follow */
710 res = libpqrcv_PQgetResult(conn->streamConn);
711 }
712
715 (errcode(ERRCODE_PROTOCOL_VIOLATION),
716 errmsg("error reading result of streaming command: %s",
717 pchomp(PQerrorMessage(conn->streamConn)))));
718 PQclear(res);
719
720 /* Verify that there are no more results */
721 res = libpqrcv_PQgetResult(conn->streamConn);
722 if (res != NULL)
724 (errcode(ERRCODE_PROTOCOL_VIOLATION),
725 errmsg("unexpected result after CommandComplete: %s",
726 pchomp(PQerrorMessage(conn->streamConn)))));
727}
728
729/*
730 * Fetch the timeline history file for 'tli' from primary.
731 */
732static void
734 TimeLineID tli, char **filename,
735 char **content, int *len)
736{
737 PGresult *res;
738 char cmd[64];
739
740 Assert(!conn->logical);
741
742 /*
743 * Request the primary to send over the history file for given timeline.
744 */
745 snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
746 res = libpqrcv_PQexec(conn->streamConn, cmd);
748 {
749 PQclear(res);
751 (errcode(ERRCODE_PROTOCOL_VIOLATION),
752 errmsg("could not receive timeline history file from "
753 "the primary server: %s",
754 pchomp(PQerrorMessage(conn->streamConn)))));
755 }
756 if (PQnfields(res) != 2 || PQntuples(res) != 1)
757 {
758 int ntuples = PQntuples(res);
759 int nfields = PQnfields(res);
760
761 PQclear(res);
763 (errcode(ERRCODE_PROTOCOL_VIOLATION),
764 errmsg("invalid response from primary server"),
765 errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
766 ntuples, nfields)));
767 }
768 *filename = pstrdup(PQgetvalue(res, 0, 0));
769
770 *len = PQgetlength(res, 0, 1);
771 *content = palloc(*len);
772 memcpy(*content, PQgetvalue(res, 0, 1), *len);
773 PQclear(res);
774}
775
776/*
777 * Send a query and wait for the results by using the asynchronous libpq
778 * functions and socket readiness events.
779 *
780 * The function is modeled on libpqsrv_exec(), with the behavior difference
781 * being that it calls ProcessWalRcvInterrupts(). As an optimization, it
782 * skips try/catch, since all errors terminate the process.
783 *
784 * May return NULL, rather than an error result, on failure.
785 */
786static PGresult *
787libpqrcv_PQexec(PGconn *streamConn, const char *query)
788{
789 PGresult *lastResult = NULL;
790
791 /*
792 * PQexec() silently discards any prior query results on the connection.
793 * This is not required for this function as it's expected that the caller
794 * (which is this library in all cases) will behave correctly and we don't
795 * have to be backwards compatible with old libpq.
796 */
797
798 /*
799 * Submit the query. Since we don't use non-blocking mode, this could
800 * theoretically block. In practice, since we don't send very long query
801 * strings, the risk seems negligible.
802 */
803 if (!PQsendQuery(streamConn, query))
804 return NULL;
805
806 for (;;)
807 {
808 /* Wait for, and collect, the next PGresult. */
809 PGresult *result;
810
811 result = libpqrcv_PQgetResult(streamConn);
812 if (result == NULL)
813 break; /* query is complete, or failure */
814
815 /*
816 * Emulate PQexec()'s behavior of returning the last result when there
817 * are many. We are fine with returning just last error message.
818 */
819 PQclear(lastResult);
820 lastResult = result;
821
822 if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
823 PQresultStatus(lastResult) == PGRES_COPY_OUT ||
824 PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
825 PQstatus(streamConn) == CONNECTION_BAD)
826 break;
827 }
828
829 return lastResult;
830}
831
832/*
833 * Perform the equivalent of PQgetResult(), but watch for interrupts.
834 */
835static PGresult *
837{
838 /*
839 * Collect data until PQgetResult is ready to get the result without
840 * blocking.
841 */
842 while (PQisBusy(streamConn))
843 {
844 int rc;
845
846 /*
847 * We don't need to break down the sleep into smaller increments,
848 * since we'll get interrupted by signals and can handle any
849 * interrupts here.
850 */
854 PQsocket(streamConn),
855 0,
856 WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
857
858 /* Interrupted? */
859 if (rc & WL_LATCH_SET)
860 {
863 }
864
865 /* Consume whatever data is available from the socket */
866 if (PQconsumeInput(streamConn) == 0)
867 {
868 /* trouble; return NULL */
869 return NULL;
870 }
871 }
872
873 /* Now we can collect and return the next PGresult */
874 return PQgetResult(streamConn);
875}
876
877/*
878 * Disconnect connection to primary, if any.
879 */
880static void
882{
883 PQfinish(conn->streamConn);
884 PQfreemem(conn->recvBuf);
885 pfree(conn);
886}
887
888/*
889 * Receive a message available from XLOG stream.
890 *
891 * Returns:
892 *
893 * If data was received, returns the length of the data. *buffer is set to
894 * point to a buffer holding the received message. The buffer is only valid
895 * until the next libpqrcv_* call.
896 *
897 * If no data was available immediately, returns 0, and *wait_fd is set to a
898 * socket descriptor which can be waited on before trying again.
899 *
900 * -1 if the server ended the COPY.
901 *
902 * ereports on error.
903 */
904static int
906 pgsocket *wait_fd)
907{
908 int rawlen;
909
910 PQfreemem(conn->recvBuf);
911 conn->recvBuf = NULL;
912
913 /* Try to receive a CopyData message */
914 rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
915 if (rawlen == 0)
916 {
917 /* Try consuming some data. */
918 if (PQconsumeInput(conn->streamConn) == 0)
920 (errcode(ERRCODE_CONNECTION_FAILURE),
921 errmsg("could not receive data from WAL stream: %s",
922 pchomp(PQerrorMessage(conn->streamConn)))));
923
924 /* Now that we've consumed some input, try again */
925 rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
926 if (rawlen == 0)
927 {
928 /* Tell caller to try again when our socket is ready. */
929 *wait_fd = PQsocket(conn->streamConn);
930 return 0;
931 }
932 }
933 if (rawlen == -1) /* end-of-streaming or error */
934 {
935 PGresult *res;
936
937 res = libpqrcv_PQgetResult(conn->streamConn);
939 {
940 PQclear(res);
941
942 /* Verify that there are no more results. */
943 res = libpqrcv_PQgetResult(conn->streamConn);
944 if (res != NULL)
945 {
946 PQclear(res);
947
948 /*
949 * If the other side closed the connection orderly (otherwise
950 * we'd seen an error, or PGRES_COPY_IN) don't report an error
951 * here, but let callers deal with it.
952 */
953 if (PQstatus(conn->streamConn) == CONNECTION_BAD)
954 return -1;
955
957 (errcode(ERRCODE_PROTOCOL_VIOLATION),
958 errmsg("unexpected result after CommandComplete: %s",
959 PQerrorMessage(conn->streamConn))));
960 }
961
962 return -1;
963 }
964 else if (PQresultStatus(res) == PGRES_COPY_IN)
965 {
966 PQclear(res);
967 return -1;
968 }
969 else
970 {
971 PQclear(res);
973 (errcode(ERRCODE_PROTOCOL_VIOLATION),
974 errmsg("could not receive data from WAL stream: %s",
975 pchomp(PQerrorMessage(conn->streamConn)))));
976 }
977 }
978 if (rawlen < -1)
980 (errcode(ERRCODE_PROTOCOL_VIOLATION),
981 errmsg("could not receive data from WAL stream: %s",
982 pchomp(PQerrorMessage(conn->streamConn)))));
983
984 /* Return received messages to caller */
985 *buffer = conn->recvBuf;
986 return rawlen;
987}
988
989/*
990 * Send a message to XLOG stream.
991 *
992 * ereports on error.
993 */
994static void
995libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
996{
997 if (PQputCopyData(conn->streamConn, buffer, nbytes) <= 0 ||
998 PQflush(conn->streamConn))
1000 (errcode(ERRCODE_CONNECTION_FAILURE),
1001 errmsg("could not send data to WAL stream: %s",
1002 pchomp(PQerrorMessage(conn->streamConn)))));
1003}
1004
1005/*
1006 * Create new replication slot.
1007 * Returns the name of the exported snapshot for logical slot or NULL for
1008 * physical slot.
1009 */
1010static char *
1012 bool temporary, bool two_phase, bool failover,
1013 CRSSnapshotAction snapshot_action, XLogRecPtr *lsn)
1014{
1015 PGresult *res;
1016 StringInfoData cmd;
1017 char *snapshot;
1018 int use_new_options_syntax;
1019
1020 use_new_options_syntax = (PQserverVersion(conn->streamConn) >= 150000);
1021
1022 initStringInfo(&cmd);
1023
1024 appendStringInfo(&cmd, "CREATE_REPLICATION_SLOT \"%s\"", slotname);
1025
1026 if (temporary)
1027 appendStringInfoString(&cmd, " TEMPORARY");
1028
1029 if (conn->logical)
1030 {
1031 appendStringInfoString(&cmd, " LOGICAL pgoutput ");
1032 if (use_new_options_syntax)
1033 appendStringInfoChar(&cmd, '(');
1034 if (two_phase)
1035 {
1036 appendStringInfoString(&cmd, "TWO_PHASE");
1037 if (use_new_options_syntax)
1038 appendStringInfoString(&cmd, ", ");
1039 else
1040 appendStringInfoChar(&cmd, ' ');
1041 }
1042
1043 if (failover)
1044 {
1045 appendStringInfoString(&cmd, "FAILOVER");
1046 if (use_new_options_syntax)
1047 appendStringInfoString(&cmd, ", ");
1048 else
1049 appendStringInfoChar(&cmd, ' ');
1050 }
1051
1052 if (use_new_options_syntax)
1053 {
1054 switch (snapshot_action)
1055 {
1057 appendStringInfoString(&cmd, "SNAPSHOT 'export'");
1058 break;
1060 appendStringInfoString(&cmd, "SNAPSHOT 'nothing'");
1061 break;
1062 case CRS_USE_SNAPSHOT:
1063 appendStringInfoString(&cmd, "SNAPSHOT 'use'");
1064 break;
1065 }
1066 }
1067 else
1068 {
1069 switch (snapshot_action)
1070 {
1072 appendStringInfoString(&cmd, "EXPORT_SNAPSHOT");
1073 break;
1075 appendStringInfoString(&cmd, "NOEXPORT_SNAPSHOT");
1076 break;
1077 case CRS_USE_SNAPSHOT:
1078 appendStringInfoString(&cmd, "USE_SNAPSHOT");
1079 break;
1080 }
1081 }
1082
1083 if (use_new_options_syntax)
1084 appendStringInfoChar(&cmd, ')');
1085 }
1086 else
1087 {
1088 if (use_new_options_syntax)
1089 appendStringInfoString(&cmd, " PHYSICAL (RESERVE_WAL)");
1090 else
1091 appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL");
1092 }
1093
1094 res = libpqrcv_PQexec(conn->streamConn, cmd.data);
1095 pfree(cmd.data);
1096
1098 {
1099 PQclear(res);
1100 ereport(ERROR,
1101 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1102 errmsg("could not create replication slot \"%s\": %s",
1103 slotname, pchomp(PQerrorMessage(conn->streamConn)))));
1104 }
1105
1106 if (lsn)
1108 CStringGetDatum(PQgetvalue(res, 0, 1))));
1109
1110 if (!PQgetisnull(res, 0, 2))
1111 snapshot = pstrdup(PQgetvalue(res, 0, 2));
1112 else
1113 snapshot = NULL;
1114
1115 PQclear(res);
1116
1117 return snapshot;
1118}
1119
1120/*
1121 * Change the definition of the replication slot.
1122 */
1123static void
1125 const bool *failover, const bool *two_phase)
1126{
1127 StringInfoData cmd;
1128 PGresult *res;
1129
1130 initStringInfo(&cmd);
1131 appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( ",
1132 quote_identifier(slotname));
1133
1134 if (failover)
1135 appendStringInfo(&cmd, "FAILOVER %s",
1136 *failover ? "true" : "false");
1137
1138 if (failover && two_phase)
1139 appendStringInfo(&cmd, ", ");
1140
1141 if (two_phase)
1142 appendStringInfo(&cmd, "TWO_PHASE %s",
1143 *two_phase ? "true" : "false");
1144
1145 appendStringInfoString(&cmd, " );");
1146
1147 res = libpqrcv_PQexec(conn->streamConn, cmd.data);
1148 pfree(cmd.data);
1149
1151 ereport(ERROR,
1152 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1153 errmsg("could not alter replication slot \"%s\": %s",
1154 slotname, pchomp(PQerrorMessage(conn->streamConn)))));
1155
1156 PQclear(res);
1157}
1158
1159/*
1160 * Return PID of remote backend process.
1161 */
1162static pid_t
1164{
1165 return PQbackendPID(conn->streamConn);
1166}
1167
1168/*
1169 * Convert tuple query result to tuplestore.
1170 */
1171static void
1173 const int nRetTypes, const Oid *retTypes)
1174{
1175 int tupn;
1176 int coln;
1177 int nfields = PQnfields(pgres);
1178 HeapTuple tuple;
1179 AttInMetadata *attinmeta;
1180 MemoryContext rowcontext;
1181 MemoryContext oldcontext;
1182
1183 /* Make sure we got expected number of fields. */
1184 if (nfields != nRetTypes)
1185 ereport(ERROR,
1186 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1187 errmsg("invalid query response"),
1188 errdetail("Expected %d fields, got %d fields.",
1189 nRetTypes, nfields)));
1190
1191 walres->tuplestore = tuplestore_begin_heap(true, false, work_mem);
1192
1193 /* Create tuple descriptor corresponding to expected result. */
1194 walres->tupledesc = CreateTemplateTupleDesc(nRetTypes);
1195 for (coln = 0; coln < nRetTypes; coln++)
1196 TupleDescInitEntry(walres->tupledesc, (AttrNumber) coln + 1,
1197 PQfname(pgres, coln), retTypes[coln], -1, 0);
1198 attinmeta = TupleDescGetAttInMetadata(walres->tupledesc);
1199
1200 /* No point in doing more here if there were no tuples returned. */
1201 if (PQntuples(pgres) == 0)
1202 return;
1203
1204 /* Create temporary context for local allocations. */
1206 "libpqrcv query result context",
1208
1209 /* Process returned rows. */
1210 for (tupn = 0; tupn < PQntuples(pgres); tupn++)
1211 {
1212 char *cstrs[MaxTupleAttributeNumber];
1213
1215
1216 /* Do the allocations in temporary context. */
1217 oldcontext = MemoryContextSwitchTo(rowcontext);
1218
1219 /*
1220 * Fill cstrs with null-terminated strings of column values.
1221 */
1222 for (coln = 0; coln < nfields; coln++)
1223 {
1224 if (PQgetisnull(pgres, tupn, coln))
1225 cstrs[coln] = NULL;
1226 else
1227 cstrs[coln] = PQgetvalue(pgres, tupn, coln);
1228 }
1229
1230 /* Convert row to a tuple, and add it to the tuplestore */
1231 tuple = BuildTupleFromCStrings(attinmeta, cstrs);
1232 tuplestore_puttuple(walres->tuplestore, tuple);
1233
1234 /* Clean up */
1235 MemoryContextSwitchTo(oldcontext);
1236 MemoryContextReset(rowcontext);
1237 }
1238
1239 MemoryContextDelete(rowcontext);
1240}
1241
1242/*
1243 * Public interface for sending generic queries (and commands).
1244 *
1245 * This can only be called from a process connected to a database.
1246 */
1247static WalRcvExecResult *
1249 const int nRetTypes, const Oid *retTypes)
1250{
1251 PGresult *pgres = NULL;
1252 WalRcvExecResult *walres = palloc0(sizeof(WalRcvExecResult));
1253 char *diag_sqlstate;
1254
1255 if (MyDatabaseId == InvalidOid)
1256 ereport(ERROR,
1257 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1258 errmsg("the query interface requires a database connection")));
1259
1260 pgres = libpqrcv_PQexec(conn->streamConn, query);
1261
1262 switch (PQresultStatus(pgres))
1263 {
1264 case PGRES_TUPLES_OK:
1265 case PGRES_SINGLE_TUPLE:
1266 case PGRES_TUPLES_CHUNK:
1267 walres->status = WALRCV_OK_TUPLES;
1268 libpqrcv_processTuples(pgres, walres, nRetTypes, retTypes);
1269 break;
1270
1271 case PGRES_COPY_IN:
1272 walres->status = WALRCV_OK_COPY_IN;
1273 break;
1274
1275 case PGRES_COPY_OUT:
1276 walres->status = WALRCV_OK_COPY_OUT;
1277 break;
1278
1279 case PGRES_COPY_BOTH:
1280 walres->status = WALRCV_OK_COPY_BOTH;
1281 break;
1282
1283 case PGRES_COMMAND_OK:
1284 walres->status = WALRCV_OK_COMMAND;
1285 break;
1286
1287 /* Empty query is considered error. */
1288 case PGRES_EMPTY_QUERY:
1289 walres->status = WALRCV_ERROR;
1290 walres->err = _("empty query");
1291 break;
1292
1295 walres->status = WALRCV_ERROR;
1296 walres->err = _("unexpected pipeline mode");
1297 break;
1298
1300 case PGRES_FATAL_ERROR:
1301 case PGRES_BAD_RESPONSE:
1302 walres->status = WALRCV_ERROR;
1303 walres->err = pchomp(PQerrorMessage(conn->streamConn));
1304 diag_sqlstate = PQresultErrorField(pgres, PG_DIAG_SQLSTATE);
1305 if (diag_sqlstate)
1306 walres->sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
1307 diag_sqlstate[1],
1308 diag_sqlstate[2],
1309 diag_sqlstate[3],
1310 diag_sqlstate[4]);
1311 break;
1312 }
1313
1314 PQclear(pgres);
1315
1316 return walres;
1317}
1318
1319/*
1320 * Given a List of strings, return it as single comma separated
1321 * string, quoting identifiers as needed.
1322 *
1323 * This is essentially the reverse of SplitIdentifierString.
1324 *
1325 * The caller should free the result.
1326 */
1327static char *
1329{
1330 ListCell *lc;
1332 bool first = true;
1333
1335
1336 foreach(lc, strings)
1337 {
1338 char *val = strVal(lfirst(lc));
1339 char *val_escaped;
1340
1341 if (first)
1342 first = false;
1343 else
1345
1346 val_escaped = PQescapeIdentifier(conn, val, strlen(val));
1347 if (!val_escaped)
1348 {
1349 free(res.data);
1350 return NULL;
1351 }
1352 appendStringInfoString(&res, val_escaped);
1353 PQfreemem(val_escaped);
1354 }
1355
1356 return res.data;
1357}
int16 AttrNumber
Definition: attnum.h:21
#define Assert(condition)
Definition: c.h:815
#define ALWAYS_SECURE_SEARCH_PATH_SQL
Definition: connect.h:25
int errdetail(const char *fmt,...)
Definition: elog.c:1203
int errhint(const char *fmt,...)
Definition: elog.c:1317
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define _(x)
Definition: elog.c:90
#define ERROR
Definition: elog.h:39
#define MAKE_SQLSTATE(ch1, ch2, ch3, ch4, ch5)
Definition: elog.h:56
#define elog(elevel,...)
Definition: elog.h:225
#define ereport(elevel,...)
Definition: elog.h:149
void err(int eval, const char *fmt,...)
Definition: err.c:43
HeapTuple BuildTupleFromCStrings(AttInMetadata *attinmeta, char **values)
Definition: execTuples.c:2322
AttInMetadata * TupleDescGetAttInMetadata(TupleDesc tupdesc)
Definition: execTuples.c:2273
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:7497
char * PQport(const PGconn *conn)
Definition: fe-connect.c:7412
char * PQhost(const PGconn *conn)
Definition: fe-connect.c:7376
int PQconnectionUsedPassword(const PGconn *conn)
Definition: fe-connect.c:7575
PQconninfoOption * PQconninfo(PGconn *conn)
Definition: fe-connect.c:7278
PostgresPollingStatusType PQconnectPoll(PGconn *conn)
Definition: fe-connect.c:2818
void PQconninfoFree(PQconninfoOption *connOptions)
Definition: fe-connect.c:7322
PQconninfoOption * PQconninfoParse(const char *conninfo, char **errmsg)
Definition: fe-connect.c:6038
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:7444
PGconn * PQconnectStartParams(const char *const *keywords, const char *const *values, int expand_dbname)
Definition: fe-connect.c:819
void PQfinish(PGconn *conn)
Definition: fe-connect.c:5178
int PQbackendPID(const PGconn *conn)
Definition: fe-connect.c:7543
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7507
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:7533
int PQgetlength(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3887
int PQflush(PGconn *conn)
Definition: fe-exec.c:4000
void PQfreemem(void *ptr)
Definition: fe-exec.c:4032
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3876
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:2062
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3411
int PQendcopy(PGconn *conn)
Definition: fe-exec.c:2949
int PQputCopyEnd(PGconn *conn, const char *errormsg)
Definition: fe-exec.c:2749
int PQntuples(const PGresult *res)
Definition: fe-exec.c:3481
int PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
Definition: fe-exec.c:2695
char * PQfname(const PGresult *res, int field_num)
Definition: fe-exec.c:3567
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1984
char * PQescapeLiteral(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:4363
int PQgetisnull(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3901
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:3466
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1416
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:2031
char * PQescapeIdentifier(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:4369
int PQnfields(const PGresult *res)
Definition: fe-exec.c:3489
int PQgetCopyData(PGconn *conn, char **buffer, int async)
Definition: fe-exec.c:2816
Datum DirectFunctionCall1Coll(PGFunction func, Oid collation, Datum arg1)
Definition: fmgr.c:792
int work_mem
Definition: globals.c:130
struct Latch * MyLatch
Definition: globals.c:62
Oid MyDatabaseId
Definition: globals.c:93
#define free(a)
Definition: header.h:65
#define MaxTupleAttributeNumber
Definition: htup_details.h:34
long val
Definition: informix.c:689
int i
Definition: isn.c:72
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:565
void ResetLatch(Latch *latch)
Definition: latch.c:724
#define WL_SOCKET_READABLE
Definition: latch.h:128
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:132
#define WL_LATCH_SET
Definition: latch.h:127
#define WL_SOCKET_CONNECTED
Definition: latch.h:137
#define WL_SOCKET_WRITEABLE
Definition: latch.h:129
@ CONNECTION_STARTED
Definition: libpq-fe.h:89
@ CONNECTION_BAD
Definition: libpq-fe.h:82
@ CONNECTION_OK
Definition: libpq-fe.h:81
@ PGRES_COPY_IN
Definition: libpq-fe.h:129
@ PGRES_COPY_BOTH
Definition: libpq-fe.h:134
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:122
@ PGRES_TUPLES_CHUNK
Definition: libpq-fe.h:139
@ PGRES_FATAL_ERROR
Definition: libpq-fe.h:133
@ PGRES_SINGLE_TUPLE
Definition: libpq-fe.h:135
@ PGRES_COPY_OUT
Definition: libpq-fe.h:128
@ PGRES_EMPTY_QUERY
Definition: libpq-fe.h:121
@ PGRES_PIPELINE_SYNC
Definition: libpq-fe.h:136
@ PGRES_BAD_RESPONSE
Definition: libpq-fe.h:130
@ PGRES_PIPELINE_ABORTED
Definition: libpq-fe.h:137
@ PGRES_NONFATAL_ERROR
Definition: libpq-fe.h:132
@ PGRES_TUPLES_OK
Definition: libpq-fe.h:125
PostgresPollingStatusType
Definition: libpq-fe.h:111
@ PGRES_POLLING_OK
Definition: libpq-fe.h:115
@ PGRES_POLLING_READING
Definition: libpq-fe.h:113
@ PGRES_POLLING_WRITING
Definition: libpq-fe.h:114
@ PGRES_POLLING_FAILED
Definition: libpq-fe.h:112
static WalReceiverConn * libpqrcv_connect(const char *conninfo, bool replication, bool logical, bool must_use_password, const char *appname, char **err)
static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, TimeLineID tli, char **filename, char **content, int *len)
static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn)
static void libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
void _PG_init(void)
static void libpqrcv_disconnect(WalReceiverConn *conn)
static char * libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
static char * libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, bool temporary, bool two_phase, bool failover, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn)
static PGresult * libpqrcv_PQgetResult(PGconn *streamConn)
static WalRcvExecResult * libpqrcv_exec(WalReceiverConn *conn, const char *query, const int nRetTypes, const Oid *retTypes)
PG_MODULE_MAGIC
static void libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
static void libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host, int *sender_port)
static int libpqrcv_server_version(WalReceiverConn *conn)
static char * stringlist_to_identifierstr(PGconn *conn, List *strings)
static void libpqrcv_check_conninfo(const char *conninfo, bool must_use_password)
static bool libpqrcv_startstreaming(WalReceiverConn *conn, const WalRcvStreamOptions *options)
static int libpqrcv_receive(WalReceiverConn *conn, char **buffer, pgsocket *wait_fd)
static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname, const bool *failover, const bool *two_phase)
static WalReceiverFunctionsType PQWalReceiverFunctions
static char * libpqrcv_get_conninfo(WalReceiverConn *conn)
static void libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres, const int nRetTypes, const Oid *retTypes)
static char * libpqrcv_get_dbname_from_conninfo(const char *connInfo)
static PGresult * libpqrcv_PQexec(PGconn *streamConn, const char *query)
const char * GetDatabaseEncodingName(void)
Definition: mbutils.c:1267
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:383
char * pstrdup(const char *in)
Definition: mcxt.c:1696
void pfree(void *pointer)
Definition: mcxt.c:1521
void * palloc0(Size size)
Definition: mcxt.c:1347
char * pchomp(const char *in)
Definition: mcxt.c:1724
void * palloc(Size size)
Definition: mcxt.c:1317
MemoryContext CurrentMemoryContext
Definition: mcxt.c:143
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:454
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
int32 pg_strtoint32(const char *s)
Definition: numutils.c:383
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
static AmcheckOptions opts
Definition: pg_amcheck.c:112
const void size_t len
static char * filename
Definition: pg_dumpall.c:119
#define lfirst(lc)
Definition: pg_list.h:172
Datum pg_lsn_in(PG_FUNCTION_ARGS)
Definition: pg_lsn.c:63
static XLogRecPtr DatumGetLSN(Datum X)
Definition: pg_lsn.h:22
static bool two_phase
static char * buf
Definition: pg_test_fsync.c:72
int pgsocket
Definition: port.h:29
#define snprintf
Definition: port.h:239
static Datum CStringGetDatum(const char *X)
Definition: postgres.h:355
#define InvalidOid
Definition: postgres_ext.h:37
unsigned int Oid
Definition: postgres_ext.h:32
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:57
void initPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:90
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
Definition: pqexpbuffer.c:265
void termPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:129
#define PQExpBufferDataBroken(buf)
Definition: pqexpbuffer.h:67
char * psprintf(const char *fmt,...)
Definition: psprintf.c:43
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:12940
char * dbname
Definition: streamutil.c:50
PGconn * conn
Definition: streamutil.c:53
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:145
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:230
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:242
void initStringInfo(StringInfo str)
Definition: stringinfo.c:97
Definition: pg_list.h:54
Tuplestorestate * tuplestore
Definition: walreceiver.h:223
TupleDesc tupledesc
Definition: walreceiver.h:224
WalRcvExecStatus status
Definition: walreceiver.h:220
walrcv_connect_fn walrcv_connect
Definition: walreceiver.h:414
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:164
void TupleDescInitEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:801
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:330
void tuplestore_puttuple(Tuplestorestate *state, HeapTuple tuple)
Definition: tuplestore.c:764
#define strVal(v)
Definition: value.h:82
void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:162
WalReceiverFunctionsType * WalReceiverFunctions
Definition: walreceiver.c:93
@ WALRCV_OK_COPY_IN
Definition: walreceiver.h:208
@ WALRCV_OK_COMMAND
Definition: walreceiver.h:205
@ WALRCV_ERROR
Definition: walreceiver.h:204
@ WALRCV_OK_TUPLES
Definition: walreceiver.h:207
@ WALRCV_OK_COPY_OUT
Definition: walreceiver.h:209
@ WALRCV_OK_COPY_BOTH
Definition: walreceiver.h:210
CRSSnapshotAction
Definition: walsender.h:21
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
@ CRS_NOEXPORT_SNAPSHOT
Definition: walsender.h:23
@ CRS_EXPORT_SNAPSHOT
Definition: walsender.h:22
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint32 TimeLineID
Definition: xlogdefs.h:59