PostgreSQL Source Code  git master
libpqwalreceiver.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * libpqwalreceiver.c
4  *
5  * This file contains the libpq-specific parts of walreceiver. It's
6  * loaded as a dynamic module to avoid linking the main server binary with
7  * libpq.
8  *
9  * Apart from walreceiver, the libpq-specific routines are now being used by
10  * logical replication workers and slot synchronization.
11  *
12  * Portions Copyright (c) 2010-2024, PostgreSQL Global Development Group
13  *
14  *
15  * IDENTIFICATION
16  * src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
17  *
18  *-------------------------------------------------------------------------
19  */
20 #include "postgres.h"
21 
22 #include <unistd.h>
23 #include <sys/time.h>
24 
25 #include "common/connect.h"
26 #include "funcapi.h"
27 #include "libpq-fe.h"
28 #include "mb/pg_wchar.h"
29 #include "miscadmin.h"
30 #include "pgstat.h"
31 #include "pqexpbuffer.h"
33 #include "utils/builtins.h"
34 #include "utils/memutils.h"
35 #include "utils/pg_lsn.h"
36 #include "utils/tuplestore.h"
37 
39 
41 {
42  /* Current connection to the primary, if any */
44  /* Used to remember if the connection is logical or physical */
45  bool logical;
46  /* Buffer for currently read records */
47  char *recvBuf;
48 };
49 
50 /* Prototypes for interface functions */
51 static WalReceiverConn *libpqrcv_connect(const char *conninfo,
52  bool replication, bool logical,
53  bool must_use_password,
54  const char *appname, char **err);
55 static void libpqrcv_check_conninfo(const char *conninfo,
56  bool must_use_password);
59  char **sender_host, int *sender_port);
61  TimeLineID *primary_tli);
62 static char *libpqrcv_get_dbname_from_conninfo(const char *conninfo);
65  TimeLineID tli, char **filename,
66  char **content, int *len);
70  TimeLineID *next_tli);
71 static int libpqrcv_receive(WalReceiverConn *conn, char **buffer,
72  pgsocket *wait_fd);
73 static void libpqrcv_send(WalReceiverConn *conn, const char *buffer,
74  int nbytes);
76  const char *slotname,
77  bool temporary,
78  bool two_phase,
79  bool failover,
80  CRSSnapshotAction snapshot_action,
81  XLogRecPtr *lsn);
82 static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
83  bool failover);
86  const char *query,
87  const int nRetTypes,
88  const Oid *retTypes);
90 
93  .walrcv_check_conninfo = libpqrcv_check_conninfo,
94  .walrcv_get_conninfo = libpqrcv_get_conninfo,
95  .walrcv_get_senderinfo = libpqrcv_get_senderinfo,
96  .walrcv_identify_system = libpqrcv_identify_system,
97  .walrcv_server_version = libpqrcv_server_version,
98  .walrcv_readtimelinehistoryfile = libpqrcv_readtimelinehistoryfile,
99  .walrcv_startstreaming = libpqrcv_startstreaming,
100  .walrcv_endstreaming = libpqrcv_endstreaming,
101  .walrcv_receive = libpqrcv_receive,
102  .walrcv_send = libpqrcv_send,
103  .walrcv_create_slot = libpqrcv_create_slot,
104  .walrcv_alter_slot = libpqrcv_alter_slot,
105  .walrcv_get_dbname_from_conninfo = libpqrcv_get_dbname_from_conninfo,
106  .walrcv_get_backend_pid = libpqrcv_get_backend_pid,
107  .walrcv_exec = libpqrcv_exec,
108  .walrcv_disconnect = libpqrcv_disconnect
109 };
110 
111 /* Prototypes for private functions */
112 static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query);
113 static PGresult *libpqrcv_PQgetResult(PGconn *streamConn);
114 static char *stringlist_to_identifierstr(PGconn *conn, List *strings);
115 
116 /*
117  * Module initialization function
118  */
119 void
120 _PG_init(void)
121 {
    /*
     * Guard against double loading: a non-NULL WalReceiverFunctions is
     * treated as "already loaded" and reported as an ERROR.
     *
     * NOTE(review): listing line 124 is absent from this extract; presumably
     * it registers this module's function table in WalReceiverFunctions --
     * confirm against the complete file.
     */
122  if (WalReceiverFunctions != NULL)
123  elog(ERROR, "libpqwalreceiver already loaded");
125 }
126 
127 /*
128  * Establish the connection to the primary server.
129  *
130  * This function can be used for both replication and regular connections.
131  * If it is a replication connection, it could be either logical or physical
132  * based on input argument 'logical'.
133  *
134  * If an error occurs, this function will normally return NULL and set *err
135  * to a palloc'ed error message. However, if must_use_password is true and
136  * the connection fails to use the password, this function will ereport(ERROR).
137  * We do this because in that case the error includes a detail and a hint for
138  * consistency with other parts of the system, and it's not worth adding the
139  * machinery to pass all of those back to the caller just to cover this one
140  * case.
141  */
142 static WalReceiverConn *
143 libpqrcv_connect(const char *conninfo, bool replication, bool logical,
144  bool must_use_password, const char *appname, char **err)
145 {
    /*
     * NOTE(review): listing lines 146-147 are absent from this extract; the
     * declarations of 'conn' and 'status', both used below, presumably live
     * there -- confirm against the complete file.
     */

    /*
     * Parallel keyword/value arrays handed to PQconnectStartParams().  The
     * fullest case below (logical replication) stores dbname, replication,
     * client_encoding, options, fallback_application_name and the NULL
     * terminator, i.e. exactly 6 entries.
     */
148  const char *keys[6];
149  const char *vals[6];
150  int i = 0;
151 
152  /*
153  * Re-validate connection string. The validation already happened at DDL
154  * time, but the subscription owner may have changed. If we don't recheck
155  * with the correct must_use_password, it's possible that the connection
156  * will obtain the password from a different source, such as PGPASSFILE or
157  * PGPASSWORD.
158  */
159  libpqrcv_check_conninfo(conninfo, must_use_password);
160 
161  /*
162  * We use the expand_dbname parameter to process the connection string (or
163  * URI), and pass some extra options.
164  */
165  keys[i] = "dbname";
166  vals[i] = conninfo;
167 
168  /* We can not have logical without replication */
169  Assert(replication || !logical);
170 
171  if (replication)
172  {
173  keys[++i] = "replication";
174  vals[i] = logical ? "database" : "true";
175 
176  if (logical)
177  {
178  /* Tell the publisher to translate to our encoding */
179  keys[++i] = "client_encoding";
180  vals[i] = GetDatabaseEncodingName();
181 
182  /*
183  * Force assorted GUC parameters to settings that ensure that the
184  * publisher will output data values in a form that is unambiguous
185  * to the subscriber. (We don't want to modify the subscriber's
186  * GUC settings, since that might surprise user-defined code
187  * running in the subscriber, such as triggers.) This should
188  * match what pg_dump does.
189  */
190  keys[++i] = "options";
191  vals[i] = "-c datestyle=ISO -c intervalstyle=postgres -c extra_float_digits=3";
192  }
193  else
194  {
195  /*
196  * The database name is ignored by the server in replication mode,
197  * but specify "replication" for .pgpass lookup.
198  */
199  keys[++i] = "dbname";
200  vals[i] = "replication";
201  }
202  }
203 
204  keys[++i] = "fallback_application_name";
205  vals[i] = appname;
206 
    /* Terminate both arrays; 'i' now indexes the terminator slot. */
207  keys[++i] = NULL;
208  vals[i] = NULL;
209 
    /*
     * NOTE(review): sizeof(keys) is the array's size in bytes, not its
     * element count (6), so this check is looser than the array bound it
     * appears to guard.
     */
210  Assert(i < sizeof(keys));
211 
212  conn = palloc0(sizeof(WalReceiverConn));
213  conn->streamConn = PQconnectStartParams(keys, vals,
214  /* expand_dbname = */ true);
215  if (PQstatus(conn->streamConn) == CONNECTION_BAD)
216  goto bad_connection_errmsg;
217 
218  /*
219  * Poll connection until we have OK or FAILED status.
220  *
221  * Per spec for PQconnectPoll, first wait till socket is write-ready.
222  */
223  status = PGRES_POLLING_WRITING;
224  do
225  {
226  int io_flag;
227  int rc;
228 
    /* Pick the socket event to wait for from the last polling status. */
229  if (status == PGRES_POLLING_READING)
230  io_flag = WL_SOCKET_READABLE;
231 #ifdef WIN32
232  /* Windows needs a different test while waiting for connection-made */
233  else if (PQstatus(conn->streamConn) == CONNECTION_STARTED)
234  io_flag = WL_SOCKET_CONNECTED;
235 #endif
236  else
237  io_flag = WL_SOCKET_WRITEABLE;
238 
    /*
     * NOTE(review): listing line 239 is absent from this extract; presumably
     * it opens the wait call whose result is stored in 'rc' and whose
     * remaining arguments follow -- confirm.
     */
240  WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
241  PQsocket(conn->streamConn),
242  0,
243  WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
244 
245  /* Interrupted? */
246  if (rc & WL_LATCH_SET)
247  {
    /* NOTE(review): listing lines 248-249 (this branch's body) are absent. */
250  }
251 
252  /* If socket is ready, advance the libpq state machine */
253  if (rc & io_flag)
254  status = PQconnectPoll(conn->streamConn);
255  } while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
256 
257  if (PQstatus(conn->streamConn) != CONNECTION_OK)
258  goto bad_connection_errmsg;
259 
    /*
     * A password was demanded but the established connection did not use
     * one: close and free the connection here, then raise the error directly
     * instead of reporting through *err.
     */
260  if (must_use_password && !PQconnectionUsedPassword(conn->streamConn))
261  {
262  PQfinish(conn->streamConn);
263  pfree(conn);
264 
265  ereport(ERROR,
266  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
267  errmsg("password is required"),
268  errdetail("Non-superuser cannot connect if the server does not request a password."),
269  errhint("Target server's authentication method must be changed, or set password_required=false in the subscription parameters.")));
270  }
271 
272  /*
273  * Set always-secure search path for the cases where the connection is
274  * used to run SQL queries, so malicious users can't get control.
275  */
276  if (!replication || logical)
277  {
278  PGresult *res;
279 
    /*
     * NOTE(review): listing lines 281-282 are absent from this extract;
     * presumably they supply the query text and the result-status test that
     * guards the error branch below -- confirm.
     */
280  res = libpqrcv_PQexec(conn->streamConn,
283  {
284  PQclear(res);
285  *err = psprintf(_("could not clear search path: %s"),
286  pchomp(PQerrorMessage(conn->streamConn)));
287  goto bad_connection;
288  }
289  PQclear(res);
290  }
291 
292  conn->logical = logical;
293 
294  return conn;
295 
296  /* error path, using libpq's error message */
297 bad_connection_errmsg:
298  *err = pchomp(PQerrorMessage(conn->streamConn));
    /* deliberately falls through into the shared cleanup below */
299 
300  /* error path, error already set */
301 bad_connection:
302  PQfinish(conn->streamConn);
303  pfree(conn);
304  return NULL;
305 }
306 
307 /*
308  * Validate connection info string and, if must_use_password is true,
309  * verify that it specifies a non-empty password.
310  *
311  * If the connection string can't be parsed, or a required password is
312  * missing, this function will raise an error and will not return.
313  * Otherwise it returns normally; there is no return value.
314  */
315 static void
316 libpqrcv_check_conninfo(const char *conninfo, bool must_use_password)
317 {
318  PQconninfoOption *opts = NULL;
319  PQconninfoOption *opt;
320  char *err = NULL;
321 
    /* A NULL result means the string could not be parsed; 'err' may hold why. */
322  opts = PQconninfoParse(conninfo, &err);
323  if (opts == NULL)
324  {
325  /* The error string is malloc'd, so we must free it explicitly */
326  char *errcopy = err ? pstrdup(err) : "out of memory";
327 
328  PQfreemem(err);
329  ereport(ERROR,
330  (errcode(ERRCODE_SYNTAX_ERROR),
331  errmsg("invalid connection string syntax: %s", errcopy)));
332  }
333 
    /*
     * When a password is mandatory, scan the parsed options for a "password"
     * keyword carrying a non-empty value; anything else is rejected below.
     */
334  if (must_use_password)
335  {
336  bool uses_password = false;
337 
338  for (opt = opts; opt->keyword != NULL; ++opt)
339  {
340  /* Ignore connection options that are not present. */
341  if (opt->val == NULL)
342  continue;
343 
344  if (strcmp(opt->keyword, "password") == 0 && opt->val[0] != '\0')
345  {
346  uses_password = true;
347  break;
348  }
349  }
350 
351  if (!uses_password)
352  {
353  /* malloc'd, so we must free it explicitly */
    /*
     * NOTE(review): listing line 354 is absent from this extract; going by
     * the comment above it presumably frees 'opts' before the error is
     * raised -- confirm.
     */
355 
356  ereport(ERROR,
357  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
358  errmsg("password is required"),
359  errdetail("Non-superusers must provide a password in the connection string.")));
360  }
361  }
362 
    /*
     * NOTE(review): listing line 363 is absent from this extract; presumably
     * it frees 'opts' on the success path -- confirm.
     */
364 }
365 
366 /*
367  * Return a user-displayable conninfo string. Any security-sensitive fields
368  * are obfuscated.
369  */
370 static char *
372 {
    /*
     * NOTE(review): the signature line (listing 371), the declaration of
     * 'buf' (listing 375) and its initialisation (listing 380) are absent
     * from this extract; 'conn' and 'buf' are used below as if declared
     * there -- confirm against the complete file.
     */
373  PQconninfoOption *conn_opts;
374  PQconninfoOption *conn_opt;
376  char *retval;
377 
378  Assert(conn->streamConn != NULL);
379 
381  conn_opts = PQconninfo(conn->streamConn);
382 
383  if (conn_opts == NULL)
384  ereport(ERROR,
385  (errcode(ERRCODE_OUT_OF_MEMORY),
386  errmsg("could not parse connection string: %s",
387  _("out of memory"))));
388 
389  /* build a clean connection string from pieces */
390  for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
391  {
392  bool obfuscate;
393 
394  /* Skip debug and empty options */
395  if (strchr(conn_opt->dispchar, 'D') ||
396  conn_opt->val == NULL ||
397  conn_opt->val[0] == '\0')
398  continue;
399 
400  /* Obfuscate security-sensitive options */
401  obfuscate = strchr(conn_opt->dispchar, '*') != NULL;
402 
    /*
     * Emit "keyword=value", separated from the previous pair by a single
     * space; obfuscated values are replaced by a fixed run of asterisks.
     */
403  appendPQExpBuffer(&buf, "%s%s=%s",
404  buf.len == 0 ? "" : " ",
405  conn_opt->keyword,
406  obfuscate ? "********" : conn_opt->val);
407  }
408 
409  PQconninfoFree(conn_opts);
410 
    /* A broken buffer yields NULL; otherwise return a pstrdup'd copy. */
411  retval = PQExpBufferDataBroken(buf) ? NULL : pstrdup(buf.data);
    /*
     * NOTE(review): listing line 412 is absent from this extract; presumably
     * it releases 'buf' -- confirm.
     */
413  return retval;
414 }
415 
416 /*
417  * Provides information of sender this WAL receiver is connected to.
418  */
419 static void
421  int *sender_port)
422 {
    /*
     * NOTE(review): the first signature line (listing 420) is absent from
     * this extract; 'conn' and 'sender_host' are used below as parameters.
     */
423  char *ret = NULL;
424 
    /* Defaults reported when libpq has no host or port to offer. */
425  *sender_host = NULL;
426  *sender_port = 0;
427 
428  Assert(conn->streamConn != NULL);
429 
    /* The host is handed back as a pstrdup'd copy, only if non-empty. */
430  ret = PQhost(conn->streamConn);
431  if (ret && strlen(ret) != 0)
432  *sender_host = pstrdup(ret);
433 
    /*
     * NOTE(review): atoi() offers no error reporting, so a malformed port
     * string cannot be told apart from a valid conversion here.
     */
434  ret = PQport(conn->streamConn);
435  if (ret && strlen(ret) != 0)
436  *sender_port = atoi(ret);
437 }
438 
439 /*
440  * Check that primary's system identifier matches ours, and fetch the current
441  * timeline ID of the primary.
442  */
443 static char *
445 {
    /*
     * NOTE(review): the signature line (listing 444) is absent from this
     * extract; 'conn' and 'primary_tli' are used below as parameters.
     */
446  PGresult *res;
447  char *primary_sysid;
448 
449  /*
450  * Get the system identifier and timeline ID as a DataRow message from the
451  * primary server.
452  */
453  res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM");
    /*
     * NOTE(review): listing line 454 is absent from this extract; presumably
     * it tests the result status to guard the error branch below -- confirm.
     */
455  {
456  PQclear(res);
457  ereport(ERROR,
458  (errcode(ERRCODE_PROTOCOL_VIOLATION),
459  errmsg("could not receive database system identifier and timeline ID from "
460  "the primary server: %s",
461  pchomp(PQerrorMessage(conn->streamConn)))));
462  }
463 
464  /*
465  * IDENTIFY_SYSTEM returns 3 columns in 9.3 and earlier, and 4 columns in
466  * 9.4 and onwards.
467  */
468  if (PQnfields(res) < 3 || PQntuples(res) != 1)
469  {
    /* Capture the counts first: they are reported after res is cleared. */
470  int ntuples = PQntuples(res);
471  int nfields = PQnfields(res);
472 
473  PQclear(res);
474  ereport(ERROR,
475  (errcode(ERRCODE_PROTOCOL_VIOLATION),
476  errmsg("invalid response from primary server"),
477  errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.",
478  ntuples, nfields, 1, 3)));
479  }
    /*
     * Column 0 is copied out as the returned system identifier; column 1 is
     * parsed as the timeline ID and stored through primary_tli.
     */
480  primary_sysid = pstrdup(PQgetvalue(res, 0, 0));
481  *primary_tli = pg_strtoint32(PQgetvalue(res, 0, 1));
482  PQclear(res);
483 
484  return primary_sysid;
485 }
486 
487 /*
488  * Thin wrapper around libpq to obtain server version.
489  */
490 static int
492 {
    /*
     * NOTE(review): the signature line (listing 491) is absent from this
     * extract.  The result is whatever PQserverVersion() reports for
     * conn->streamConn, passed through unchanged.
     */
493  return PQserverVersion(conn->streamConn);
494 }
495 
496 /*
497  * Get database name from the primary server's conninfo.
498  *
499  * If dbname is not found in connInfo, return NULL value.
500  */
501 static char *
503 {
    /*
     * NOTE(review): the signature line (listing 502) and the declaration of
     * 'opts' (listing 504) are absent from this extract; 'connInfo' and
     * 'opts' are used below as if declared there.
     */
505  char *dbname = NULL;
506  char *err = NULL;
507 
508  opts = PQconninfoParse(connInfo, &err);
509  if (opts == NULL)
510  {
511  /* The error string is malloc'd, so we must free it explicitly */
512  char *errcopy = err ? pstrdup(err) : "out of memory";
513 
514  PQfreemem(err);
515  ereport(ERROR,
516  (errcode(ERRCODE_SYNTAX_ERROR),
517  errmsg("invalid connection string syntax: %s", errcopy)));
518  }
519 
520  for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
521  {
522  /*
523  * If multiple dbnames are specified, then the last one will be
524  * returned
525  */
526  if (strcmp(opt->keyword, "dbname") == 0 && opt->val &&
527  *opt->val)
528  {
    /* Drop the copy made for an earlier match before replacing it. */
529  if (dbname)
530  pfree(dbname);
531 
532  dbname = pstrdup(opt->val);
533  }
534  }
535 
    /*
     * NOTE(review): listing line 536 is absent from this extract; presumably
     * it frees 'opts' -- confirm.  'dbname' is still NULL here when no
     * non-empty dbname option was found.
     */
537  return dbname;
538 }
539 
540 /*
541  * Start streaming WAL data from given streaming options.
542  *
543  * Returns true if we switched successfully to copy-both mode. False
544  * means the server received the command and executed it successfully, but
545  * didn't switch to copy-mode. That means that there was no WAL on the
546  * requested timeline and starting point, because the server switched to
547  * another timeline at or before the requested starting point. On failure,
548  * throws an ERROR.
549  */
550 static bool
553 {
    /*
     * NOTE(review): the signature lines (listing 551-552) are absent from
     * this extract; 'conn' and 'options' are used below as parameters.
     */
554  StringInfoData cmd;
555  PGresult *res;
556 
557  Assert(options->logical == conn->logical);
558  Assert(options->slotname || !options->logical);
559 
560  initStringInfo(&cmd);
561 
562  /* Build the command. */
    /*
     * NOTE(review): slotname here, and streaming_str / origin further down,
     * are interpolated between quotes without any escaping; presumably
     * callers guarantee they contain no quote characters -- confirm.
     */
563  appendStringInfoString(&cmd, "START_REPLICATION");
564  if (options->slotname != NULL)
565  appendStringInfo(&cmd, " SLOT \"%s\"",
566  options->slotname);
567 
568  if (options->logical)
569  appendStringInfoString(&cmd, " LOGICAL");
570 
571  appendStringInfo(&cmd, " %X/%X", LSN_FORMAT_ARGS(options->startpoint));
572 
573  /*
574  * Additional options are different depending on if we are doing logical
575  * or physical replication.
576  */
577  if (options->logical)
578  {
579  char *pubnames_str;
580  List *pubnames;
581  char *pubnames_literal;
582 
583  appendStringInfoString(&cmd, " (");
584 
585  appendStringInfo(&cmd, "proto_version '%u'",
586  options->proto.logical.proto_version);
587 
588  if (options->proto.logical.streaming_str)
589  appendStringInfo(&cmd, ", streaming '%s'",
590  options->proto.logical.streaming_str);
591 
    /* two_phase is only sent to servers reporting version >= 150000. */
592  if (options->proto.logical.twophase &&
593  PQserverVersion(conn->streamConn) >= 150000)
594  appendStringInfoString(&cmd, ", two_phase 'on'");
595 
    /* origin is only sent to servers reporting version >= 160000. */
596  if (options->proto.logical.origin &&
597  PQserverVersion(conn->streamConn) >= 160000)
598  appendStringInfo(&cmd, ", origin '%s'",
599  options->proto.logical.origin);
600 
    /*
     * The publication list is first flattened into a single string by
     * stringlist_to_identifierstr() and then escaped as one literal.
     */
601  pubnames = options->proto.logical.publication_names;
602  pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
603  if (!pubnames_str)
604  ereport(ERROR,
605  (errcode(ERRCODE_OUT_OF_MEMORY), /* likely guess */
606  errmsg("could not start WAL streaming: %s",
607  pchomp(PQerrorMessage(conn->streamConn)))));
608  pubnames_literal = PQescapeLiteral(conn->streamConn, pubnames_str,
609  strlen(pubnames_str));
610  if (!pubnames_literal)
611  ereport(ERROR,
612  (errcode(ERRCODE_OUT_OF_MEMORY), /* likely guess */
613  errmsg("could not start WAL streaming: %s",
614  pchomp(PQerrorMessage(conn->streamConn)))));
615  appendStringInfo(&cmd, ", publication_names %s", pubnames_literal);
    /* The literal came from libpq, the string from this file's helper. */
616  PQfreemem(pubnames_literal);
617  pfree(pubnames_str);
618 
    /* binary is only sent to servers reporting version >= 140000. */
619  if (options->proto.logical.binary &&
620  PQserverVersion(conn->streamConn) >= 140000)
621  appendStringInfoString(&cmd, ", binary 'true'");
622 
623  appendStringInfoChar(&cmd, ')');
624  }
625  else
626  appendStringInfo(&cmd, " TIMELINE %u",
627  options->proto.physical.startpointTLI);
628 
629  /* Start streaming. */
630  res = libpqrcv_PQexec(conn->streamConn, cmd.data);
631  pfree(cmd.data);
632 
    /*
     * NOTE(review): listing line 633 is absent from this extract; presumably
     * it tests for the "command succeeded but no copy mode" result that the
     * false return below corresponds to -- confirm.
     */
634  {
635  PQclear(res);
636  return false;
637  }
638  else if (PQresultStatus(res) != PGRES_COPY_BOTH)
639  {
640  PQclear(res);
641  ereport(ERROR,
642  (errcode(ERRCODE_PROTOCOL_VIOLATION),
643  errmsg("could not start WAL streaming: %s",
644  pchomp(PQerrorMessage(conn->streamConn)))));
645  }
646  PQclear(res);
647  return true;
648 }
649 
650 /*
651  * Stop streaming WAL data. Returns the next timeline's ID in *next_tli, as
652  * reported by the server, or 0 if it did not report it.
653  */
654 static void
656 {
    /*
     * NOTE(review): the signature line (listing 655) is absent from this
     * extract; 'conn' and 'next_tli' are used below as parameters.
     */
657  PGresult *res;
658 
659  /*
660  * Send copy-end message. As in libpqrcv_PQexec, this could theoretically
661  * block, but the risk seems small.
662  */
663  if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
664  PQflush(conn->streamConn))
665  ereport(ERROR,
666  (errcode(ERRCODE_CONNECTION_FAILURE),
667  errmsg("could not send end-of-streaming message to primary: %s",
668  pchomp(PQerrorMessage(conn->streamConn)))));
669 
    /* Stays 0 unless a result set below supplies a timeline ID. */
670  *next_tli = 0;
671 
672  /*
673  * After COPY is finished, we should receive a result set indicating the
674  * next timeline's ID, or just CommandComplete if the server was shut
675  * down.
676  *
677  * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
678  * also possible in case we aborted the copy in mid-stream.
679  */
680  res = libpqrcv_PQgetResult(conn->streamConn);
    /*
     * NOTE(review): listing line 681 is absent from this extract; presumably
     * it tests for a tuple-bearing result to guard the branch below --
     * confirm.
     */
682  {
683  /*
684  * Read the next timeline's ID. The server also sends the timeline's
685  * starting point, but it is ignored.
686  */
687  if (PQnfields(res) < 2 || PQntuples(res) != 1)
688  ereport(ERROR,
689  (errcode(ERRCODE_PROTOCOL_VIOLATION),
690  errmsg("unexpected result set after end-of-streaming")));
691  *next_tli = pg_strtoint32(PQgetvalue(res, 0, 0));
692  PQclear(res);
693 
694  /* the result set should be followed by CommandComplete */
695  res = libpqrcv_PQgetResult(conn->streamConn);
696  }
697  else if (PQresultStatus(res) == PGRES_COPY_OUT)
698  {
699  PQclear(res);
700 
701  /* End the copy */
702  if (PQendcopy(conn->streamConn))
703  ereport(ERROR,
704  (errcode(ERRCODE_CONNECTION_FAILURE),
705  errmsg("error while shutting down streaming COPY: %s",
706  pchomp(PQerrorMessage(conn->streamConn)))));
707 
708  /* CommandComplete should follow */
709  res = libpqrcv_PQgetResult(conn->streamConn);
710  }
711 
    /*
     * NOTE(review): listing line 712 is absent from this extract; presumably
     * it tests that 'res' is the expected CommandComplete before the error
     * report below -- confirm.
     */
713  ereport(ERROR,
714  (errcode(ERRCODE_PROTOCOL_VIOLATION),
715  errmsg("error reading result of streaming command: %s",
716  pchomp(PQerrorMessage(conn->streamConn)))));
717  PQclear(res);
718 
719  /* Verify that there are no more results */
720  res = libpqrcv_PQgetResult(conn->streamConn);
721  if (res != NULL)
722  ereport(ERROR,
723  (errcode(ERRCODE_PROTOCOL_VIOLATION),
724  errmsg("unexpected result after CommandComplete: %s",
725  pchomp(PQerrorMessage(conn->streamConn)))));
726 }
727 
728 /*
729  * Fetch the timeline history file for 'tli' from primary.
730  */
731 static void
733  TimeLineID tli, char **filename,
734  char **content, int *len)
735 {
    /*
     * NOTE(review): the first signature line (listing 732) is absent from
     * this extract; 'conn' is used below as a parameter.
     */
736  PGresult *res;
737  char cmd[64];
738 
    /* Only valid on a physical (non-logical) connection. */
739  Assert(!conn->logical);
740 
741  /*
742  * Request the primary to send over the history file for given timeline.
743  */
744  snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
745  res = libpqrcv_PQexec(conn->streamConn, cmd);
    /*
     * NOTE(review): listing line 746 is absent from this extract; presumably
     * it tests the result status to guard the error branch below -- confirm.
     */
747  {
748  PQclear(res);
749  ereport(ERROR,
750  (errcode(ERRCODE_PROTOCOL_VIOLATION),
751  errmsg("could not receive timeline history file from "
752  "the primary server: %s",
753  pchomp(PQerrorMessage(conn->streamConn)))));
754  }
755  if (PQnfields(res) != 2 || PQntuples(res) != 1)
756  {
    /* Capture the counts first: they are reported after res is cleared. */
757  int ntuples = PQntuples(res);
758  int nfields = PQnfields(res);
759 
760  PQclear(res);
761  ereport(ERROR,
762  (errcode(ERRCODE_PROTOCOL_VIOLATION),
763  errmsg("invalid response from primary server"),
764  errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
765  ntuples, nfields)));
766  }
    /* Column 0 is returned as a pstrdup'd file name. */
767  *filename = pstrdup(PQgetvalue(res, 0, 0));
768 
    /*
     * Column 1 is copied byte-for-byte: exactly *len bytes are allocated and
     * copied, and no terminating NUL is appended to *content.
     */
769  *len = PQgetlength(res, 0, 1);
770  *content = palloc(*len);
771  memcpy(*content, PQgetvalue(res, 0, 1), *len);
772  PQclear(res);
773 }
774 
775 /*
776  * Send a query and wait for the results by using the asynchronous libpq
777  * functions and socket readiness events.
778  *
779  * The function is modeled on libpqsrv_exec(), with the behavior difference
780  * being that it calls ProcessWalRcvInterrupts(). As an optimization, it
781  * skips try/catch, since all errors terminate the process.
782  *
783  * May return NULL, rather than an error result, on failure.
784  */
785 static PGresult *
786 libpqrcv_PQexec(PGconn *streamConn, const char *query)
787 {
788  PGresult *lastResult = NULL;
789 
790  /*
791  * PQexec() silently discards any prior query results on the connection.
792  * This is not required for this function as it's expected that the caller
793  * (which is this library in all cases) will behave correctly and we don't
794  * have to be backwards compatible with old libpq.
795  */
796 
797  /*
798  * Submit the query. Since we don't use non-blocking mode, this could
799  * theoretically block. In practice, since we don't send very long query
800  * strings, the risk seems negligible.
801  */
802  if (!PQsendQuery(streamConn, query))
803  return NULL;
804 
805  for (;;)
806  {
807  /* Wait for, and collect, the next PGresult. */
808  PGresult *result;
809 
810  result = libpqrcv_PQgetResult(streamConn);
811  if (result == NULL)
812  break; /* query is complete, or failure */
813 
814  /*
815  * Emulate PQexec()'s behavior of returning the last result when there
816  * are many. We are fine with returning just last error message.
817  */
818  PQclear(lastResult);
819  lastResult = result;
820 
821  if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
822  PQresultStatus(lastResult) == PGRES_COPY_OUT ||
823  PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
824  PQstatus(streamConn) == CONNECTION_BAD)
825  break;
826  }
827 
828  return lastResult;
829 }
830 
831 /*
832  * Perform the equivalent of PQgetResult(), but watch for interrupts.
833  */
834 static PGresult *
836 {
    /*
     * NOTE(review): the signature line (listing 835) is absent from this
     * extract; 'streamConn' is used below as a parameter.
     */
837  /*
838  * Collect data until PQgetResult is ready to get the result without
839  * blocking.
840  */
841  while (PQisBusy(streamConn))
842  {
843  int rc;
844 
845  /*
846  * We don't need to break down the sleep into smaller increments,
847  * since we'll get interrupted by signals and can handle any
848  * interrupts here.
849  */
    /*
     * NOTE(review): listing lines 850-851 are absent from this extract;
     * presumably they open the wait call whose result is stored in 'rc' and
     * supply the leading event flags -- confirm.
     */
852  WL_LATCH_SET,
853  PQsocket(streamConn),
854  0,
855  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
856 
857  /* Interrupted? */
858  if (rc & WL_LATCH_SET)
859  {
    /* NOTE(review): listing lines 860-861 (this branch's body) are absent. */
862  }
863 
864  /* Consume whatever data is available from the socket */
865  if (PQconsumeInput(streamConn) == 0)
866  {
867  /* trouble; return NULL */
868  return NULL;
869  }
870  }
871 
872  /* Now we can collect and return the next PGresult */
873  return PQgetResult(streamConn);
874 }
875 
876 /*
877  * Disconnect connection to primary, if any.
878  */
879 static void
881 {
    /*
     * NOTE(review): the signature line (listing 880) is absent from this
     * extract; 'conn' is used below as a parameter.
     *
     * Teardown order: close the libpq connection, release the receive buffer
     * with PQfreemem(), then pfree the WalReceiverConn itself.
     */
882  PQfinish(conn->streamConn);
883  PQfreemem(conn->recvBuf);
884  pfree(conn);
885 }
886 
887 /*
888  * Receive a message available from XLOG stream.
889  *
890  * Returns:
891  *
892  * If data was received, returns the length of the data. *buffer is set to
893  * point to a buffer holding the received message. The buffer is only valid
894  * until the next libpqrcv_* call.
895  *
896  * If no data was available immediately, returns 0, and *wait_fd is set to a
897  * socket descriptor which can be waited on before trying again.
898  *
899  * -1 if the server ended the COPY.
900  *
901  * ereports on error.
902  */
903 static int
905  pgsocket *wait_fd)
906 {
    /*
     * NOTE(review): the first signature line (listing 904) is absent from
     * this extract; 'conn' and 'buffer' are used below as parameters.
     */
907  int rawlen;
908 
    /* Release the message returned by the previous call, if any. */
909  PQfreemem(conn->recvBuf);
910  conn->recvBuf = NULL;
911 
912  /* Try to receive a CopyData message */
913  rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
914  if (rawlen == 0)
915  {
916  /* Try consuming some data. */
917  if (PQconsumeInput(conn->streamConn) == 0)
918  ereport(ERROR,
919  (errcode(ERRCODE_CONNECTION_FAILURE),
920  errmsg("could not receive data from WAL stream: %s",
921  pchomp(PQerrorMessage(conn->streamConn)))));
922 
923  /* Now that we've consumed some input, try again */
924  rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
925  if (rawlen == 0)
926  {
927  /* Tell caller to try again when our socket is ready. */
928  *wait_fd = PQsocket(conn->streamConn);
929  return 0;
930  }
931  }
932  if (rawlen == -1) /* end-of-streaming or error */
933  {
934  PGresult *res;
935 
936  res = libpqrcv_PQgetResult(conn->streamConn);
    /*
     * NOTE(review): listing line 937 is absent from this extract; presumably
     * it tests for a successful command-complete result to guard the branch
     * below -- confirm.
     */
938  {
939  PQclear(res);
940 
941  /* Verify that there are no more results. */
942  res = libpqrcv_PQgetResult(conn->streamConn);
943  if (res != NULL)
944  {
945  PQclear(res);
946 
947  /*
948  * If the other side closed the connection orderly (otherwise
949  * we'd have seen an error, or PGRES_COPY_IN) don't report an
950  * error here, but let callers deal with it.
951  */
952  if (PQstatus(conn->streamConn) == CONNECTION_BAD)
953  return -1;
954 
    /*
     * NOTE(review): unlike the other error reports in this function, this
     * one passes PQerrorMessage() through without pchomp().
     */
955  ereport(ERROR,
956  (errcode(ERRCODE_PROTOCOL_VIOLATION),
957  errmsg("unexpected result after CommandComplete: %s",
958  PQerrorMessage(conn->streamConn))));
959  }
960 
961  return -1;
962  }
963  else if (PQresultStatus(res) == PGRES_COPY_IN)
964  {
965  PQclear(res);
966  return -1;
967  }
968  else
969  {
970  PQclear(res);
971  ereport(ERROR,
972  (errcode(ERRCODE_PROTOCOL_VIOLATION),
973  errmsg("could not receive data from WAL stream: %s",
974  pchomp(PQerrorMessage(conn->streamConn)))));
975  }
976  }
977  if (rawlen < -1)
978  ereport(ERROR,
979  (errcode(ERRCODE_PROTOCOL_VIOLATION),
980  errmsg("could not receive data from WAL stream: %s",
981  pchomp(PQerrorMessage(conn->streamConn)))));
982 
983  /* Return received messages to caller */
984  *buffer = conn->recvBuf;
985  return rawlen;
986 }
987 
988 /*
989  * Send a message to XLOG stream.
990  *
991  * ereports on error.
992  */
993 static void
994 libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
995 {
996  if (PQputCopyData(conn->streamConn, buffer, nbytes) <= 0 ||
997  PQflush(conn->streamConn))
998  ereport(ERROR,
999  (errcode(ERRCODE_CONNECTION_FAILURE),
1000  errmsg("could not send data to WAL stream: %s",
1001  pchomp(PQerrorMessage(conn->streamConn)))));
1002 }
1003 
1004 /*
1005  * Create new replication slot.
1006  * Returns the name of the exported snapshot for logical slot or NULL for
1007  * physical slot.
1008  */
1009 static char *
1011  bool temporary, bool two_phase, bool failover,
1012  CRSSnapshotAction snapshot_action, XLogRecPtr *lsn)
1013 {
    /*
     * NOTE(review): the first signature line (listing 1010) is absent from
     * this extract; 'conn' and 'slotname' are used below as parameters.
     */
1014  PGresult *res;
1015  StringInfoData cmd;
1016  char *snapshot;
1017  int use_new_options_syntax;
1018 
    /*
     * Servers reporting version >= 150000 get the parenthesized,
     * comma-separated option list; older ones get space-separated keywords.
     */
1019  use_new_options_syntax = (PQserverVersion(conn->streamConn) >= 150000);
1020 
1021  initStringInfo(&cmd);
1022 
    /*
     * NOTE(review): slotname is wrapped in literal double quotes here without
     * escaping, whereas libpqrcv_alter_slot() passes it through
     * quote_identifier(); presumably slot names cannot contain quote
     * characters -- confirm.
     */
1023  appendStringInfo(&cmd, "CREATE_REPLICATION_SLOT \"%s\"", slotname);
1024 
1025  if (temporary)
1026  appendStringInfoString(&cmd, " TEMPORARY");
1027 
1028  if (conn->logical)
1029  {
1030  appendStringInfoString(&cmd, " LOGICAL pgoutput ");
1031  if (use_new_options_syntax)
1032  appendStringInfoChar(&cmd, '(');
    /*
     * Each optional keyword below leaves a trailing separator (", " or ' ')
     * behind it, relying on one of the snapshot options being appended
     * afterwards.
     */
1033  if (two_phase)
1034  {
1035  appendStringInfoString(&cmd, "TWO_PHASE");
1036  if (use_new_options_syntax)
1037  appendStringInfoString(&cmd, ", ");
1038  else
1039  appendStringInfoChar(&cmd, ' ');
1040  }
1041 
1042  if (failover)
1043  {
1044  appendStringInfoString(&cmd, "FAILOVER");
1045  if (use_new_options_syntax)
1046  appendStringInfoString(&cmd, ", ");
1047  else
1048  appendStringInfoChar(&cmd, ' ');
1049  }
1050 
    /*
     * NOTE(review): neither switch has a default case, so a snapshot_action
     * outside the three listed values would append nothing and leave any
     * trailing separator from above in place.
     */
1051  if (use_new_options_syntax)
1052  {
1053  switch (snapshot_action)
1054  {
1055  case CRS_EXPORT_SNAPSHOT:
1056  appendStringInfoString(&cmd, "SNAPSHOT 'export'");
1057  break;
1058  case CRS_NOEXPORT_SNAPSHOT:
1059  appendStringInfoString(&cmd, "SNAPSHOT 'nothing'");
1060  break;
1061  case CRS_USE_SNAPSHOT:
1062  appendStringInfoString(&cmd, "SNAPSHOT 'use'");
1063  break;
1064  }
1065  }
1066  else
1067  {
1068  switch (snapshot_action)
1069  {
1070  case CRS_EXPORT_SNAPSHOT:
1071  appendStringInfoString(&cmd, "EXPORT_SNAPSHOT");
1072  break;
1073  case CRS_NOEXPORT_SNAPSHOT:
1074  appendStringInfoString(&cmd, "NOEXPORT_SNAPSHOT");
1075  break;
1076  case CRS_USE_SNAPSHOT:
1077  appendStringInfoString(&cmd, "USE_SNAPSHOT");
1078  break;
1079  }
1080  }
1081 
1082  if (use_new_options_syntax)
1083  appendStringInfoChar(&cmd, ')');
1084  }
1085  else
1086  {
1087  if (use_new_options_syntax)
1088  appendStringInfoString(&cmd, " PHYSICAL (RESERVE_WAL)");
1089  else
1090  appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL");
1091  }
1092 
1093  res = libpqrcv_PQexec(conn->streamConn, cmd.data);
1094  pfree(cmd.data);
1095 
    /*
     * NOTE(review): listing line 1096 is absent from this extract; presumably
     * it tests the result status to guard the error branch below -- confirm.
     */
1097  {
1098  PQclear(res);
1099  ereport(ERROR,
1100  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1101  errmsg("could not create replication slot \"%s\": %s",
1102  slotname, pchomp(PQerrorMessage(conn->streamConn)))));
1103  }
1104 
    /*
     * NOTE(review): listing line 1106 is absent from this extract; presumably
     * it begins the conversion of column 1 into *lsn, of which only the
     * trailing argument is visible -- confirm.
     */
1105  if (lsn)
1107  CStringGetDatum(PQgetvalue(res, 0, 1))));
1108 
    /* Column 2 carries the snapshot name; a NULL there is returned as NULL. */
1109  if (!PQgetisnull(res, 0, 2))
1110  snapshot = pstrdup(PQgetvalue(res, 0, 2));
1111  else
1112  snapshot = NULL;
1113 
1114  PQclear(res);
1115 
1116  return snapshot;
1117 }
1118 
1119 /*
1120  * Change the definition of the replication slot.
1121  */
1122 static void
1124  bool failover)
1125 {
    /*
     * NOTE(review): the first signature line (listing 1123) is absent from
     * this extract; 'conn' and 'slotname' are used below as parameters.
     */
1126  StringInfoData cmd;
1127  PGresult *res;
1128 
    /* The slot name goes through quote_identifier() before interpolation. */
1129  initStringInfo(&cmd);
1130  appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( FAILOVER %s )",
1131  quote_identifier(slotname),
1132  failover ? "true" : "false");
1133 
1134  res = libpqrcv_PQexec(conn->streamConn, cmd.data);
1135  pfree(cmd.data);
1136 
    /*
     * NOTE(review): listing line 1137 is absent from this extract; presumably
     * it tests the result status to guard the error report below -- confirm.
     * Unlike libpqrcv_create_slot(), this error path does not PQclear(res)
     * before reporting.
     */
1138  ereport(ERROR,
1139  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1140  errmsg("could not alter replication slot \"%s\": %s",
1141  slotname, pchomp(PQerrorMessage(conn->streamConn)))));
1142 
1143  PQclear(res);
1144 }
1145 
1146 /*
1147  * Return PID of remote backend process.
1148  */
1149 static pid_t
1151 {
1152  return PQbackendPID(conn->streamConn);
1153 }
1154 
1155 /*
1156  * Convert tuple query result to tuplestore.
1157  */
1158 static void
1160  const int nRetTypes, const Oid *retTypes)
1161 {
1162  int tupn;
1163  int coln;
1164  int nfields = PQnfields(pgres);
1165  HeapTuple tuple;
1166  AttInMetadata *attinmeta;
1167  MemoryContext rowcontext;
1168  MemoryContext oldcontext;
1169 
1170  /* Make sure we got expected number of fields. */
1171  if (nfields != nRetTypes)
1172  ereport(ERROR,
1173  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1174  errmsg("invalid query response"),
1175  errdetail("Expected %d fields, got %d fields.",
1176  nRetTypes, nfields)));
1177 
1178  walres->tuplestore = tuplestore_begin_heap(true, false, work_mem);
1179 
1180  /* Create tuple descriptor corresponding to expected result. */
1181  walres->tupledesc = CreateTemplateTupleDesc(nRetTypes);
1182  for (coln = 0; coln < nRetTypes; coln++)
1183  TupleDescInitEntry(walres->tupledesc, (AttrNumber) coln + 1,
1184  PQfname(pgres, coln), retTypes[coln], -1, 0);
1185  attinmeta = TupleDescGetAttInMetadata(walres->tupledesc);
1186 
1187  /* No point in doing more here if there were no tuples returned. */
1188  if (PQntuples(pgres) == 0)
1189  return;
1190 
1191  /* Create temporary context for local allocations. */
1193  "libpqrcv query result context",
1195 
1196  /* Process returned rows. */
1197  for (tupn = 0; tupn < PQntuples(pgres); tupn++)
1198  {
1199  char *cstrs[MaxTupleAttributeNumber];
1200 
1202 
1203  /* Do the allocations in temporary context. */
1204  oldcontext = MemoryContextSwitchTo(rowcontext);
1205 
1206  /*
1207  * Fill cstrs with null-terminated strings of column values.
1208  */
1209  for (coln = 0; coln < nfields; coln++)
1210  {
1211  if (PQgetisnull(pgres, tupn, coln))
1212  cstrs[coln] = NULL;
1213  else
1214  cstrs[coln] = PQgetvalue(pgres, tupn, coln);
1215  }
1216 
1217  /* Convert row to a tuple, and add it to the tuplestore */
1218  tuple = BuildTupleFromCStrings(attinmeta, cstrs);
1219  tuplestore_puttuple(walres->tuplestore, tuple);
1220 
1221  /* Clean up */
1222  MemoryContextSwitchTo(oldcontext);
1223  MemoryContextReset(rowcontext);
1224  }
1225 
1226  MemoryContextDelete(rowcontext);
1227 }
1228 
1229 /*
1230  * Public interface for sending generic queries (and commands).
1231  *
1232  * This can only be called from process connected to database.
1233  */
1234 static WalRcvExecResult *
1235 libpqrcv_exec(WalReceiverConn *conn, const char *query,
1236  const int nRetTypes, const Oid *retTypes)
1237 {
1238  PGresult *pgres = NULL;
1239  WalRcvExecResult *walres = palloc0(sizeof(WalRcvExecResult));
1240  char *diag_sqlstate;
1241 
1242  if (MyDatabaseId == InvalidOid)
1243  ereport(ERROR,
1244  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1245  errmsg("the query interface requires a database connection")));
1246 
1247  pgres = libpqrcv_PQexec(conn->streamConn, query);
1248 
1249  switch (PQresultStatus(pgres))
1250  {
1251  case PGRES_TUPLES_OK:
1252  case PGRES_SINGLE_TUPLE:
1253  case PGRES_TUPLES_CHUNK:
1254  walres->status = WALRCV_OK_TUPLES;
1255  libpqrcv_processTuples(pgres, walres, nRetTypes, retTypes);
1256  break;
1257 
1258  case PGRES_COPY_IN:
1259  walres->status = WALRCV_OK_COPY_IN;
1260  break;
1261 
1262  case PGRES_COPY_OUT:
1263  walres->status = WALRCV_OK_COPY_OUT;
1264  break;
1265 
1266  case PGRES_COPY_BOTH:
1267  walres->status = WALRCV_OK_COPY_BOTH;
1268  break;
1269 
1270  case PGRES_COMMAND_OK:
1271  walres->status = WALRCV_OK_COMMAND;
1272  break;
1273 
1274  /* Empty query is considered error. */
1275  case PGRES_EMPTY_QUERY:
1276  walres->status = WALRCV_ERROR;
1277  walres->err = _("empty query");
1278  break;
1279 
1280  case PGRES_PIPELINE_SYNC:
1282  walres->status = WALRCV_ERROR;
1283  walres->err = _("unexpected pipeline mode");
1284  break;
1285 
1286  case PGRES_NONFATAL_ERROR:
1287  case PGRES_FATAL_ERROR:
1288  case PGRES_BAD_RESPONSE:
1289  walres->status = WALRCV_ERROR;
1290  walres->err = pchomp(PQerrorMessage(conn->streamConn));
1291  diag_sqlstate = PQresultErrorField(pgres, PG_DIAG_SQLSTATE);
1292  if (diag_sqlstate)
1293  walres->sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
1294  diag_sqlstate[1],
1295  diag_sqlstate[2],
1296  diag_sqlstate[3],
1297  diag_sqlstate[4]);
1298  break;
1299  }
1300 
1301  PQclear(pgres);
1302 
1303  return walres;
1304 }
1305 
1306 /*
1307  * Given a List of strings, return it as single comma separated
1308  * string, quoting identifiers as needed.
1309  *
1310  * This is essentially the reverse of SplitIdentifierString.
1311  *
1312  * The caller should free the result.
1313  */
1314 static char *
1316 {
1317  ListCell *lc;
1319  bool first = true;
1320 
1321  initStringInfo(&res);
1322 
1323  foreach(lc, strings)
1324  {
1325  char *val = strVal(lfirst(lc));
1326  char *val_escaped;
1327 
1328  if (first)
1329  first = false;
1330  else
1331  appendStringInfoChar(&res, ',');
1332 
1333  val_escaped = PQescapeIdentifier(conn, val, strlen(val));
1334  if (!val_escaped)
1335  {
1336  free(res.data);
1337  return NULL;
1338  }
1339  appendStringInfoString(&res, val_escaped);
1340  PQfreemem(val_escaped);
1341  }
1342 
1343  return res.data;
1344 }
int16 AttrNumber
Definition: attnum.h:21
#define Assert(condition)
Definition: c.h:858
#define ALWAYS_SECURE_SEARCH_PATH_SQL
Definition: connect.h:25
int errdetail(const char *fmt,...)
Definition: elog.c:1205
int errhint(const char *fmt,...)
Definition: elog.c:1319
int errcode(int sqlerrcode)
Definition: elog.c:859
int errmsg(const char *fmt,...)
Definition: elog.c:1072
#define _(x)
Definition: elog.c:90
#define ERROR
Definition: elog.h:39
#define MAKE_SQLSTATE(ch1, ch2, ch3, ch4, ch5)
Definition: elog.h:56
#define elog(elevel,...)
Definition: elog.h:224
#define ereport(elevel,...)
Definition: elog.h:149
void err(int eval, const char *fmt,...)
Definition: err.c:43
HeapTuple BuildTupleFromCStrings(AttInMetadata *attinmeta, char **values)
Definition: execTuples.c:2222
AttInMetadata * TupleDescGetAttInMetadata(TupleDesc tupdesc)
Definition: execTuples.c:2173
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:7137
PQconninfoOption * PQconninfoParse(const char *conninfo, char **errmsg)
Definition: fe-connect.c:5728
PGconn * PQconnectStartParams(const char *const *keywords, const char *const *values, int expand_dbname)
Definition: fe-connect.c:791
char * PQhost(const PGconn *conn)
Definition: fe-connect.c:7026
int PQconnectionUsedPassword(const PGconn *conn)
Definition: fe-connect.c:7213
PostgresPollingStatusType PQconnectPoll(PGconn *conn)
Definition: fe-connect.c:2591
void PQconninfoFree(PQconninfoOption *connOptions)
Definition: fe-connect.c:6980
PQconninfoOption * PQconninfo(PGconn *conn)
Definition: fe-connect.c:6936
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7147
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:7094
void PQfinish(PGconn *conn)
Definition: fe-connect.c:4868
int PQbackendPID(const PGconn *conn)
Definition: fe-connect.c:7181
char * PQport(const PGconn *conn)
Definition: fe-connect.c:7062
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:7173
int PQgetlength(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3887
int PQflush(PGconn *conn)
Definition: fe-exec.c:4000
void PQfreemem(void *ptr)
Definition: fe-exec.c:4032
char * PQescapeIdentifier(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:4310
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3411
int PQendcopy(PGconn *conn)
Definition: fe-exec.c:2949
int PQputCopyEnd(PGconn *conn, const char *errormsg)
Definition: fe-exec.c:2749
int PQntuples(const PGresult *res)
Definition: fe-exec.c:3481
int PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
Definition: fe-exec.c:2695
char * PQfname(const PGresult *res, int field_num)
Definition: fe-exec.c:3567
char * PQescapeLiteral(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:4304
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1984
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3876
int PQgetisnull(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3901
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1416
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:2031
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:3466
int PQnfields(const PGresult *res)
Definition: fe-exec.c:3489
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:2062
int PQgetCopyData(PGconn *conn, char **buffer, int async)
Definition: fe-exec.c:2816
Datum DirectFunctionCall1Coll(PGFunction func, Oid collation, Datum arg1)
Definition: fmgr.c:792
int work_mem
Definition: globals.c:128
struct Latch * MyLatch
Definition: globals.c:60
Oid MyDatabaseId
Definition: globals.c:91
#define free(a)
Definition: header.h:65
#define MaxTupleAttributeNumber
Definition: htup_details.h:34
long val
Definition: informix.c:670
int i
Definition: isn.c:73
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:565
void ResetLatch(Latch *latch)
Definition: latch.c:724
#define WL_SOCKET_READABLE
Definition: latch.h:128
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:132
#define WL_LATCH_SET
Definition: latch.h:127
#define WL_SOCKET_CONNECTED
Definition: latch.h:137
#define WL_SOCKET_WRITEABLE
Definition: latch.h:129
@ CONNECTION_STARTED
Definition: libpq-fe.h:69
@ CONNECTION_BAD
Definition: libpq-fe.h:62
@ CONNECTION_OK
Definition: libpq-fe.h:61
@ PGRES_COPY_IN
Definition: libpq-fe.h:107
@ PGRES_COPY_BOTH
Definition: libpq-fe.h:112
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:100
@ PGRES_TUPLES_CHUNK
Definition: libpq-fe.h:117
@ PGRES_FATAL_ERROR
Definition: libpq-fe.h:111
@ PGRES_SINGLE_TUPLE
Definition: libpq-fe.h:113
@ PGRES_COPY_OUT
Definition: libpq-fe.h:106
@ PGRES_EMPTY_QUERY
Definition: libpq-fe.h:99
@ PGRES_PIPELINE_SYNC
Definition: libpq-fe.h:114
@ PGRES_BAD_RESPONSE
Definition: libpq-fe.h:108
@ PGRES_PIPELINE_ABORTED
Definition: libpq-fe.h:115
@ PGRES_NONFATAL_ERROR
Definition: libpq-fe.h:110
@ PGRES_TUPLES_OK
Definition: libpq-fe.h:103
PostgresPollingStatusType
Definition: libpq-fe.h:89
@ PGRES_POLLING_OK
Definition: libpq-fe.h:93
@ PGRES_POLLING_READING
Definition: libpq-fe.h:91
@ PGRES_POLLING_WRITING
Definition: libpq-fe.h:92
@ PGRES_POLLING_FAILED
Definition: libpq-fe.h:90
static WalReceiverConn * libpqrcv_connect(const char *conninfo, bool replication, bool logical, bool must_use_password, const char *appname, char **err)
static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, TimeLineID tli, char **filename, char **content, int *len)
static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn)
static void libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
void _PG_init(void)
static void libpqrcv_disconnect(WalReceiverConn *conn)
static char * libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
static char * libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, bool temporary, bool two_phase, bool failover, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn)
static PGresult * libpqrcv_PQgetResult(PGconn *streamConn)
static WalRcvExecResult * libpqrcv_exec(WalReceiverConn *conn, const char *query, const int nRetTypes, const Oid *retTypes)
PG_MODULE_MAGIC
static void libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
static void libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host, int *sender_port)
static int libpqrcv_server_version(WalReceiverConn *conn)
static char * stringlist_to_identifierstr(PGconn *conn, List *strings)
static void libpqrcv_check_conninfo(const char *conninfo, bool must_use_password)
static bool libpqrcv_startstreaming(WalReceiverConn *conn, const WalRcvStreamOptions *options)
static int libpqrcv_receive(WalReceiverConn *conn, char **buffer, pgsocket *wait_fd)
static WalReceiverFunctionsType PQWalReceiverFunctions
static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname, bool failover)
static char * libpqrcv_get_conninfo(WalReceiverConn *conn)
static void libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres, const int nRetTypes, const Oid *retTypes)
static PGresult * libpqrcv_PQexec(PGconn *streamConn, const char *query)
static char * libpqrcv_get_dbname_from_conninfo(const char *conninfo)
const char * GetDatabaseEncodingName(void)
Definition: mbutils.c:1267
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:383
char * pchomp(const char *in)
Definition: mcxt.c:1723
char * pstrdup(const char *in)
Definition: mcxt.c:1695
void pfree(void *pointer)
Definition: mcxt.c:1520
void * palloc0(Size size)
Definition: mcxt.c:1346
MemoryContext CurrentMemoryContext
Definition: mcxt.c:143
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:454
void * palloc(Size size)
Definition: mcxt.c:1316
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
int32 pg_strtoint32(const char *s)
Definition: numutils.c:383
static AmcheckOptions opts
Definition: pg_amcheck.c:111
const void size_t len
static char * filename
Definition: pg_dumpall.c:119
#define lfirst(lc)
Definition: pg_list.h:172
Datum pg_lsn_in(PG_FUNCTION_ARGS)
Definition: pg_lsn.c:63
static XLogRecPtr DatumGetLSN(Datum X)
Definition: pg_lsn.h:22
static bool two_phase
static char * buf
Definition: pg_test_fsync.c:73
int pgsocket
Definition: port.h:29
#define snprintf
Definition: port.h:238
static Datum CStringGetDatum(const char *X)
Definition: postgres.h:350
#define InvalidOid
Definition: postgres_ext.h:36
unsigned int Oid
Definition: postgres_ext.h:31
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:56
void initPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:90
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
Definition: pqexpbuffer.c:265
void termPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:129
#define PQExpBufferDataBroken(buf)
Definition: pqexpbuffer.h:67
char * psprintf(const char *fmt,...)
Definition: psprintf.c:46
MemoryContextSwitchTo(old_ctx)
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:12623
char * dbname
Definition: streamutil.c:52
PGconn * conn
Definition: streamutil.c:55
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:97
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:182
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:194
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
Definition: pg_list.h:54
Tuplestorestate * tuplestore
Definition: walreceiver.h:222
TupleDesc tupledesc
Definition: walreceiver.h:223
WalRcvExecStatus status
Definition: walreceiver.h:219
walrcv_connect_fn walrcv_connect
Definition: walreceiver.h:411
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:67
void TupleDescInitEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:651
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
void tuplestore_puttuple(Tuplestorestate *state, HeapTuple tuple)
Definition: tuplestore.c:730
#define strVal(v)
Definition: value.h:82
void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:162
WalReceiverFunctionsType * WalReceiverFunctions
Definition: walreceiver.c:93
@ WALRCV_OK_COPY_IN
Definition: walreceiver.h:207
@ WALRCV_OK_COMMAND
Definition: walreceiver.h:204
@ WALRCV_ERROR
Definition: walreceiver.h:203
@ WALRCV_OK_TUPLES
Definition: walreceiver.h:206
@ WALRCV_OK_COPY_OUT
Definition: walreceiver.h:208
@ WALRCV_OK_COPY_BOTH
Definition: walreceiver.h:209
CRSSnapshotAction
Definition: walsender.h:21
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
@ CRS_NOEXPORT_SNAPSHOT
Definition: walsender.h:23
@ CRS_EXPORT_SNAPSHOT
Definition: walsender.h:22
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint32 TimeLineID
Definition: xlogdefs.h:59