PostgreSQL Source Code  git master
libpqwalreceiver.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * libpqwalreceiver.c
4  *
5  * This file contains the libpq-specific parts of walreceiver. It's
6  * loaded as a dynamic module to avoid linking the main server binary with
7  * libpq.
8  *
9  * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
10  *
11  *
12  * IDENTIFICATION
13  * src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
14  *
15  *-------------------------------------------------------------------------
16  */
17 #include "postgres.h"
18 
19 #include <unistd.h>
20 #include <sys/time.h>
21 
22 #include "access/xlog.h"
23 #include "catalog/pg_type.h"
24 #include "common/connect.h"
25 #include "funcapi.h"
26 #include "libpq-fe.h"
27 #include "mb/pg_wchar.h"
28 #include "miscadmin.h"
29 #include "pgstat.h"
30 #include "pqexpbuffer.h"
32 #include "utils/builtins.h"
33 #include "utils/memutils.h"
34 #include "utils/pg_lsn.h"
35 #include "utils/tuplestore.h"
36 
38 
39 void _PG_init(void);
40 
42 {
43  /* Current connection to the primary, if any */
45  /* Used to remember if the connection is logical or physical */
46  bool logical;
47  /* Buffer for currently read records */
48  char *recvBuf;
49 };
50 
51 /* Prototypes for interface functions */
52 static WalReceiverConn *libpqrcv_connect(const char *conninfo,
53  bool logical, const char *appname,
54  char **err);
55 static void libpqrcv_check_conninfo(const char *conninfo);
58  char **sender_host, int *sender_port);
60  TimeLineID *primary_tli);
63  TimeLineID tli, char **filename,
64  char **content, int *len);
68  TimeLineID *next_tli);
69 static int libpqrcv_receive(WalReceiverConn *conn, char **buffer,
70  pgsocket *wait_fd);
71 static void libpqrcv_send(WalReceiverConn *conn, const char *buffer,
72  int nbytes);
74  const char *slotname,
75  bool temporary,
76  bool two_phase,
77  CRSSnapshotAction snapshot_action,
78  XLogRecPtr *lsn);
81  const char *query,
82  const int nRetTypes,
83  const Oid *retTypes);
85 
102 };
103 
104 /* Prototypes for private functions */
105 static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query);
106 static PGresult *libpqrcv_PQgetResult(PGconn *streamConn);
107 static char *stringlist_to_identifierstr(PGconn *conn, List *strings);
108 
109 /*
110  * Module initialization function
111  */
112 void
113 _PG_init(void)
114 {
115  if (WalReceiverFunctions != NULL)
116  elog(ERROR, "libpqwalreceiver already loaded");
118 }
119 
120 /*
121  * Establish the connection to the primary server for XLOG streaming
122  *
123  * Returns NULL on error and fills the err with palloc'ed error message.
124  */
125 static WalReceiverConn *
126 libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
127  char **err)
128 {
131  const char *keys[6];
132  const char *vals[6];
133  int i = 0;
134 
135  /*
136  * We use the expand_dbname parameter to process the connection string (or
137  * URI), and pass some extra options.
138  */
139  keys[i] = "dbname";
140  vals[i] = conninfo;
141  keys[++i] = "replication";
142  vals[i] = logical ? "database" : "true";
143  if (!logical)
144  {
145  /*
146  * The database name is ignored by the server in replication mode, but
147  * specify "replication" for .pgpass lookup.
148  */
149  keys[++i] = "dbname";
150  vals[i] = "replication";
151  }
152  keys[++i] = "fallback_application_name";
153  vals[i] = appname;
154  if (logical)
155  {
156  /* Tell the publisher to translate to our encoding */
157  keys[++i] = "client_encoding";
158  vals[i] = GetDatabaseEncodingName();
159 
160  /*
161  * Force assorted GUC parameters to settings that ensure that the
162  * publisher will output data values in a form that is unambiguous to
163  * the subscriber. (We don't want to modify the subscriber's GUC
164  * settings, since that might surprise user-defined code running in
165  * the subscriber, such as triggers.) This should match what pg_dump
166  * does.
167  */
168  keys[++i] = "options";
169  vals[i] = "-c datestyle=ISO -c intervalstyle=postgres -c extra_float_digits=3";
170  }
171  keys[++i] = NULL;
172  vals[i] = NULL;
173 
174  Assert(i < sizeof(keys));
175 
176  conn = palloc0(sizeof(WalReceiverConn));
177  conn->streamConn = PQconnectStartParams(keys, vals,
178  /* expand_dbname = */ true);
179  if (PQstatus(conn->streamConn) == CONNECTION_BAD)
180  {
181  *err = pchomp(PQerrorMessage(conn->streamConn));
182  return NULL;
183  }
184 
185  /*
186  * Poll connection until we have OK or FAILED status.
187  *
188  * Per spec for PQconnectPoll, first wait till socket is write-ready.
189  */
191  do
192  {
193  int io_flag;
194  int rc;
195 
197  io_flag = WL_SOCKET_READABLE;
198 #ifdef WIN32
199  /* Windows needs a different test while waiting for connection-made */
200  else if (PQstatus(conn->streamConn) == CONNECTION_STARTED)
201  io_flag = WL_SOCKET_CONNECTED;
202 #endif
203  else
204  io_flag = WL_SOCKET_WRITEABLE;
205 
207  WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
208  PQsocket(conn->streamConn),
209  0,
211 
212  /* Interrupted? */
213  if (rc & WL_LATCH_SET)
214  {
217  }
218 
219  /* If socket is ready, advance the libpq state machine */
220  if (rc & io_flag)
221  status = PQconnectPoll(conn->streamConn);
223 
224  if (PQstatus(conn->streamConn) != CONNECTION_OK)
225  {
226  *err = pchomp(PQerrorMessage(conn->streamConn));
227  return NULL;
228  }
229 
230  if (logical)
231  {
232  PGresult *res;
233 
234  res = libpqrcv_PQexec(conn->streamConn,
237  {
238  PQclear(res);
239  ereport(ERROR,
240  (errmsg("could not clear search path: %s",
241  pchomp(PQerrorMessage(conn->streamConn)))));
242  }
243  PQclear(res);
244  }
245 
246  conn->logical = logical;
247 
248  return conn;
249 }
250 
251 /*
252  * Validate connection info string (just try to parse it)
253  */
254 static void
255 libpqrcv_check_conninfo(const char *conninfo)
256 {
257  PQconninfoOption *opts = NULL;
258  char *err = NULL;
259 
260  opts = PQconninfoParse(conninfo, &err);
261  if (opts == NULL)
262  {
263  /* The error string is malloc'd, so we must free it explicitly */
264  char *errcopy = err ? pstrdup(err) : "out of memory";
265 
266  PQfreemem(err);
267  ereport(ERROR,
268  (errcode(ERRCODE_SYNTAX_ERROR),
269  errmsg("invalid connection string syntax: %s", errcopy)));
270  }
271 
273 }
274 
275 /*
276  * Return a user-displayable conninfo string. Any security-sensitive fields
277  * are obfuscated.
278  */
279 static char *
281 {
282  PQconninfoOption *conn_opts;
283  PQconninfoOption *conn_opt;
285  char *retval;
286 
287  Assert(conn->streamConn != NULL);
288 
290  conn_opts = PQconninfo(conn->streamConn);
291 
292  if (conn_opts == NULL)
293  ereport(ERROR,
294  (errcode(ERRCODE_OUT_OF_MEMORY),
295  errmsg("could not parse connection string: %s",
296  _("out of memory"))));
297 
298  /* build a clean connection string from pieces */
299  for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
300  {
301  bool obfuscate;
302 
303  /* Skip debug and empty options */
304  if (strchr(conn_opt->dispchar, 'D') ||
305  conn_opt->val == NULL ||
306  conn_opt->val[0] == '\0')
307  continue;
308 
309  /* Obfuscate security-sensitive options */
310  obfuscate = strchr(conn_opt->dispchar, '*') != NULL;
311 
312  appendPQExpBuffer(&buf, "%s%s=%s",
313  buf.len == 0 ? "" : " ",
314  conn_opt->keyword,
315  obfuscate ? "********" : conn_opt->val);
316  }
317 
318  PQconninfoFree(conn_opts);
319 
320  retval = PQExpBufferDataBroken(buf) ? NULL : pstrdup(buf.data);
322  return retval;
323 }
324 
325 /*
326  * Provides information of sender this WAL receiver is connected to.
327  */
328 static void
330  int *sender_port)
331 {
332  char *ret = NULL;
333 
334  *sender_host = NULL;
335  *sender_port = 0;
336 
337  Assert(conn->streamConn != NULL);
338 
339  ret = PQhost(conn->streamConn);
340  if (ret && strlen(ret) != 0)
341  *sender_host = pstrdup(ret);
342 
343  ret = PQport(conn->streamConn);
344  if (ret && strlen(ret) != 0)
345  *sender_port = atoi(ret);
346 }
347 
348 /*
349  * Check that primary's system identifier matches ours, and fetch the current
350  * timeline ID of the primary.
351  */
352 static char *
354 {
355  PGresult *res;
356  char *primary_sysid;
357 
358  /*
359  * Get the system identifier and timeline ID as a DataRow message from the
360  * primary server.
361  */
362  res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM");
364  {
365  PQclear(res);
366  ereport(ERROR,
367  (errcode(ERRCODE_PROTOCOL_VIOLATION),
368  errmsg("could not receive database system identifier and timeline ID from "
369  "the primary server: %s",
370  pchomp(PQerrorMessage(conn->streamConn)))));
371  }
372  if (PQnfields(res) < 3 || PQntuples(res) != 1)
373  {
374  int ntuples = PQntuples(res);
375  int nfields = PQnfields(res);
376 
377  PQclear(res);
378  ereport(ERROR,
379  (errcode(ERRCODE_PROTOCOL_VIOLATION),
380  errmsg("invalid response from primary server"),
381  errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.",
382  ntuples, nfields, 3, 1)));
383  }
384  primary_sysid = pstrdup(PQgetvalue(res, 0, 0));
385  *primary_tli = pg_strtoint32(PQgetvalue(res, 0, 1));
386  PQclear(res);
387 
388  return primary_sysid;
389 }
390 
391 /*
392  * Thin wrapper around libpq to obtain server version.
393  */
394 static int
396 {
397  return PQserverVersion(conn->streamConn);
398 }
399 
400 /*
401  * Start streaming WAL data from given streaming options.
402  *
403  * Returns true if we switched successfully to copy-both mode. False
404  * means the server received the command and executed it successfully, but
405  * didn't switch to copy-mode. That means that there was no WAL on the
406  * requested timeline and starting point, because the server switched to
407  * another timeline at or before the requested starting point. On failure,
408  * throws an ERROR.
409  */
410 static bool
413 {
414  StringInfoData cmd;
415  PGresult *res;
416 
417  Assert(options->logical == conn->logical);
418  Assert(options->slotname || !options->logical);
419 
420  initStringInfo(&cmd);
421 
422  /* Build the command. */
423  appendStringInfoString(&cmd, "START_REPLICATION");
424  if (options->slotname != NULL)
425  appendStringInfo(&cmd, " SLOT \"%s\"",
426  options->slotname);
427 
428  if (options->logical)
429  appendStringInfoString(&cmd, " LOGICAL");
430 
431  appendStringInfo(&cmd, " %X/%X", LSN_FORMAT_ARGS(options->startpoint));
432 
433  /*
434  * Additional options are different depending on if we are doing logical
435  * or physical replication.
436  */
437  if (options->logical)
438  {
439  char *pubnames_str;
440  List *pubnames;
441  char *pubnames_literal;
442 
443  appendStringInfoString(&cmd, " (");
444 
445  appendStringInfo(&cmd, "proto_version '%u'",
446  options->proto.logical.proto_version);
447 
448  if (options->proto.logical.streaming &&
449  PQserverVersion(conn->streamConn) >= 140000)
450  appendStringInfoString(&cmd, ", streaming 'on'");
451 
452  if (options->proto.logical.twophase &&
453  PQserverVersion(conn->streamConn) >= 150000)
454  appendStringInfoString(&cmd, ", two_phase 'on'");
455 
456  pubnames = options->proto.logical.publication_names;
457  pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
458  if (!pubnames_str)
459  ereport(ERROR,
460  (errcode(ERRCODE_OUT_OF_MEMORY), /* likely guess */
461  errmsg("could not start WAL streaming: %s",
462  pchomp(PQerrorMessage(conn->streamConn)))));
463  pubnames_literal = PQescapeLiteral(conn->streamConn, pubnames_str,
464  strlen(pubnames_str));
465  if (!pubnames_literal)
466  ereport(ERROR,
467  (errcode(ERRCODE_OUT_OF_MEMORY), /* likely guess */
468  errmsg("could not start WAL streaming: %s",
469  pchomp(PQerrorMessage(conn->streamConn)))));
470  appendStringInfo(&cmd, ", publication_names %s", pubnames_literal);
471  PQfreemem(pubnames_literal);
472  pfree(pubnames_str);
473 
474  if (options->proto.logical.binary &&
475  PQserverVersion(conn->streamConn) >= 140000)
476  appendStringInfoString(&cmd, ", binary 'true'");
477 
478  appendStringInfoChar(&cmd, ')');
479  }
480  else
481  appendStringInfo(&cmd, " TIMELINE %u",
482  options->proto.physical.startpointTLI);
483 
484  /* Start streaming. */
485  res = libpqrcv_PQexec(conn->streamConn, cmd.data);
486  pfree(cmd.data);
487 
489  {
490  PQclear(res);
491  return false;
492  }
493  else if (PQresultStatus(res) != PGRES_COPY_BOTH)
494  {
495  PQclear(res);
496  ereport(ERROR,
497  (errcode(ERRCODE_PROTOCOL_VIOLATION),
498  errmsg("could not start WAL streaming: %s",
499  pchomp(PQerrorMessage(conn->streamConn)))));
500  }
501  PQclear(res);
502  return true;
503 }
504 
505 /*
506  * Stop streaming WAL data. Returns the next timeline's ID in *next_tli, as
507  * reported by the server, or 0 if it did not report it.
508  */
509 static void
511 {
512  PGresult *res;
513 
514  /*
515  * Send copy-end message. As in libpqrcv_PQexec, this could theoretically
516  * block, but the risk seems small.
517  */
518  if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
519  PQflush(conn->streamConn))
520  ereport(ERROR,
521  (errcode(ERRCODE_CONNECTION_FAILURE),
522  errmsg("could not send end-of-streaming message to primary: %s",
523  pchomp(PQerrorMessage(conn->streamConn)))));
524 
525  *next_tli = 0;
526 
527  /*
528  * After COPY is finished, we should receive a result set indicating the
529  * next timeline's ID, or just CommandComplete if the server was shut
530  * down.
531  *
532  * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
533  * also possible in case we aborted the copy in mid-stream.
534  */
535  res = libpqrcv_PQgetResult(conn->streamConn);
537  {
538  /*
539  * Read the next timeline's ID. The server also sends the timeline's
540  * starting point, but it is ignored.
541  */
542  if (PQnfields(res) < 2 || PQntuples(res) != 1)
543  ereport(ERROR,
544  (errcode(ERRCODE_PROTOCOL_VIOLATION),
545  errmsg("unexpected result set after end-of-streaming")));
546  *next_tli = pg_strtoint32(PQgetvalue(res, 0, 0));
547  PQclear(res);
548 
549  /* the result set should be followed by CommandComplete */
550  res = libpqrcv_PQgetResult(conn->streamConn);
551  }
552  else if (PQresultStatus(res) == PGRES_COPY_OUT)
553  {
554  PQclear(res);
555 
556  /* End the copy */
557  if (PQendcopy(conn->streamConn))
558  ereport(ERROR,
559  (errcode(ERRCODE_CONNECTION_FAILURE),
560  errmsg("error while shutting down streaming COPY: %s",
561  pchomp(PQerrorMessage(conn->streamConn)))));
562 
563  /* CommandComplete should follow */
564  res = libpqrcv_PQgetResult(conn->streamConn);
565  }
566 
568  ereport(ERROR,
569  (errcode(ERRCODE_PROTOCOL_VIOLATION),
570  errmsg("error reading result of streaming command: %s",
571  pchomp(PQerrorMessage(conn->streamConn)))));
572  PQclear(res);
573 
574  /* Verify that there are no more results */
575  res = libpqrcv_PQgetResult(conn->streamConn);
576  if (res != NULL)
577  ereport(ERROR,
578  (errcode(ERRCODE_PROTOCOL_VIOLATION),
579  errmsg("unexpected result after CommandComplete: %s",
580  pchomp(PQerrorMessage(conn->streamConn)))));
581 }
582 
583 /*
584  * Fetch the timeline history file for 'tli' from primary.
585  */
586 static void
588  TimeLineID tli, char **filename,
589  char **content, int *len)
590 {
591  PGresult *res;
592  char cmd[64];
593 
594  Assert(!conn->logical);
595 
596  /*
597  * Request the primary to send over the history file for given timeline.
598  */
599  snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
600  res = libpqrcv_PQexec(conn->streamConn, cmd);
602  {
603  PQclear(res);
604  ereport(ERROR,
605  (errcode(ERRCODE_PROTOCOL_VIOLATION),
606  errmsg("could not receive timeline history file from "
607  "the primary server: %s",
608  pchomp(PQerrorMessage(conn->streamConn)))));
609  }
610  if (PQnfields(res) != 2 || PQntuples(res) != 1)
611  {
612  int ntuples = PQntuples(res);
613  int nfields = PQnfields(res);
614 
615  PQclear(res);
616  ereport(ERROR,
617  (errcode(ERRCODE_PROTOCOL_VIOLATION),
618  errmsg("invalid response from primary server"),
619  errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
620  ntuples, nfields)));
621  }
622  *filename = pstrdup(PQgetvalue(res, 0, 0));
623 
624  *len = PQgetlength(res, 0, 1);
625  *content = palloc(*len);
626  memcpy(*content, PQgetvalue(res, 0, 1), *len);
627  PQclear(res);
628 }
629 
630 /*
631  * Send a query and wait for the results by using the asynchronous libpq
632  * functions and socket readiness events.
633  *
634  * We must not use the regular blocking libpq functions like PQexec()
635  * since they are uninterruptible by signals on some platforms, such as
636  * Windows.
637  *
638  * The function is modeled on PQexec() in libpq, but only implements
639  * those parts that are in use in the walreceiver api.
640  *
641  * May return NULL, rather than an error result, on failure.
642  */
643 static PGresult *
644 libpqrcv_PQexec(PGconn *streamConn, const char *query)
645 {
646  PGresult *lastResult = NULL;
647 
648  /*
649  * PQexec() silently discards any prior query results on the connection.
650  * This is not required for this function as it's expected that the caller
651  * (which is this library in all cases) will behave correctly and we don't
652  * have to be backwards compatible with old libpq.
653  */
654 
655  /*
656  * Submit the query. Since we don't use non-blocking mode, this could
657  * theoretically block. In practice, since we don't send very long query
658  * strings, the risk seems negligible.
659  */
660  if (!PQsendQuery(streamConn, query))
661  return NULL;
662 
663  for (;;)
664  {
665  /* Wait for, and collect, the next PGresult. */
666  PGresult *result;
667 
668  result = libpqrcv_PQgetResult(streamConn);
669  if (result == NULL)
670  break; /* query is complete, or failure */
671 
672  /*
673  * Emulate PQexec()'s behavior of returning the last result when there
674  * are many. We are fine with returning just last error message.
675  */
676  PQclear(lastResult);
677  lastResult = result;
678 
679  if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
680  PQresultStatus(lastResult) == PGRES_COPY_OUT ||
681  PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
682  PQstatus(streamConn) == CONNECTION_BAD)
683  break;
684  }
685 
686  return lastResult;
687 }
688 
689 /*
690  * Perform the equivalent of PQgetResult(), but watch for interrupts.
691  */
692 static PGresult *
694 {
695  /*
696  * Collect data until PQgetResult is ready to get the result without
697  * blocking.
698  */
699  while (PQisBusy(streamConn))
700  {
701  int rc;
702 
703  /*
704  * We don't need to break down the sleep into smaller increments,
705  * since we'll get interrupted by signals and can handle any
706  * interrupts here.
707  */
710  WL_LATCH_SET,
711  PQsocket(streamConn),
712  0,
714 
715  /* Interrupted? */
716  if (rc & WL_LATCH_SET)
717  {
720  }
721 
722  /* Consume whatever data is available from the socket */
723  if (PQconsumeInput(streamConn) == 0)
724  {
725  /* trouble; return NULL */
726  return NULL;
727  }
728  }
729 
730  /* Now we can collect and return the next PGresult */
731  return PQgetResult(streamConn);
732 }
733 
734 /*
735  * Disconnect connection to primary, if any.
736  */
737 static void
739 {
740  PQfinish(conn->streamConn);
741  if (conn->recvBuf != NULL)
742  PQfreemem(conn->recvBuf);
743  pfree(conn);
744 }
745 
746 /*
747  * Receive a message available from XLOG stream.
748  *
749  * Returns:
750  *
751  * If data was received, returns the length of the data. *buffer is set to
752  * point to a buffer holding the received message. The buffer is only valid
753  * until the next libpqrcv_* call.
754  *
755  * If no data was available immediately, returns 0, and *wait_fd is set to a
756  * socket descriptor which can be waited on before trying again.
757  *
758  * -1 if the server ended the COPY.
759  *
760  * ereports on error.
761  */
762 static int
764  pgsocket *wait_fd)
765 {
766  int rawlen;
767 
768  if (conn->recvBuf != NULL)
769  PQfreemem(conn->recvBuf);
770  conn->recvBuf = NULL;
771 
772  /* Try to receive a CopyData message */
773  rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
774  if (rawlen == 0)
775  {
776  /* Try consuming some data. */
777  if (PQconsumeInput(conn->streamConn) == 0)
778  ereport(ERROR,
779  (errcode(ERRCODE_CONNECTION_FAILURE),
780  errmsg("could not receive data from WAL stream: %s",
781  pchomp(PQerrorMessage(conn->streamConn)))));
782 
783  /* Now that we've consumed some input, try again */
784  rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
785  if (rawlen == 0)
786  {
787  /* Tell caller to try again when our socket is ready. */
788  *wait_fd = PQsocket(conn->streamConn);
789  return 0;
790  }
791  }
792  if (rawlen == -1) /* end-of-streaming or error */
793  {
794  PGresult *res;
795 
796  res = libpqrcv_PQgetResult(conn->streamConn);
798  {
799  PQclear(res);
800 
801  /* Verify that there are no more results. */
802  res = libpqrcv_PQgetResult(conn->streamConn);
803  if (res != NULL)
804  {
805  PQclear(res);
806 
807  /*
808  * If the other side closed the connection orderly (otherwise
809  * we'd seen an error, or PGRES_COPY_IN) don't report an error
810  * here, but let callers deal with it.
811  */
812  if (PQstatus(conn->streamConn) == CONNECTION_BAD)
813  return -1;
814 
815  ereport(ERROR,
816  (errcode(ERRCODE_PROTOCOL_VIOLATION),
817  errmsg("unexpected result after CommandComplete: %s",
818  PQerrorMessage(conn->streamConn))));
819  }
820 
821  return -1;
822  }
823  else if (PQresultStatus(res) == PGRES_COPY_IN)
824  {
825  PQclear(res);
826  return -1;
827  }
828  else
829  {
830  PQclear(res);
831  ereport(ERROR,
832  (errcode(ERRCODE_PROTOCOL_VIOLATION),
833  errmsg("could not receive data from WAL stream: %s",
834  pchomp(PQerrorMessage(conn->streamConn)))));
835  }
836  }
837  if (rawlen < -1)
838  ereport(ERROR,
839  (errcode(ERRCODE_PROTOCOL_VIOLATION),
840  errmsg("could not receive data from WAL stream: %s",
841  pchomp(PQerrorMessage(conn->streamConn)))));
842 
843  /* Return received messages to caller */
844  *buffer = conn->recvBuf;
845  return rawlen;
846 }
847 
848 /*
849  * Send a message to XLOG stream.
850  *
851  * ereports on error.
852  */
853 static void
854 libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
855 {
856  if (PQputCopyData(conn->streamConn, buffer, nbytes) <= 0 ||
857  PQflush(conn->streamConn))
858  ereport(ERROR,
859  (errcode(ERRCODE_CONNECTION_FAILURE),
860  errmsg("could not send data to WAL stream: %s",
861  pchomp(PQerrorMessage(conn->streamConn)))));
862 }
863 
864 /*
865  * Create new replication slot.
866  * Returns the name of the exported snapshot for logical slot or NULL for
867  * physical slot.
868  */
869 static char *
871  bool temporary, bool two_phase, CRSSnapshotAction snapshot_action,
872  XLogRecPtr *lsn)
873 {
874  PGresult *res;
875  StringInfoData cmd;
876  char *snapshot;
877  int use_new_options_syntax;
878 
879  use_new_options_syntax = (PQserverVersion(conn->streamConn) >= 150000);
880 
881  initStringInfo(&cmd);
882 
883  appendStringInfo(&cmd, "CREATE_REPLICATION_SLOT \"%s\"", slotname);
884 
885  if (temporary)
886  appendStringInfoString(&cmd, " TEMPORARY");
887 
888  if (conn->logical)
889  {
890  appendStringInfoString(&cmd, " LOGICAL pgoutput ");
891  if (use_new_options_syntax)
892  appendStringInfoChar(&cmd, '(');
893  if (two_phase)
894  {
895  appendStringInfoString(&cmd, "TWO_PHASE");
896  if (use_new_options_syntax)
897  appendStringInfoString(&cmd, ", ");
898  else
899  appendStringInfoChar(&cmd, ' ');
900  }
901 
902  if (use_new_options_syntax)
903  {
904  switch (snapshot_action)
905  {
906  case CRS_EXPORT_SNAPSHOT:
907  appendStringInfoString(&cmd, "SNAPSHOT 'export'");
908  break;
910  appendStringInfoString(&cmd, "SNAPSHOT 'nothing'");
911  break;
912  case CRS_USE_SNAPSHOT:
913  appendStringInfoString(&cmd, "SNAPSHOT 'use'");
914  break;
915  }
916  }
917  else
918  {
919  switch (snapshot_action)
920  {
921  case CRS_EXPORT_SNAPSHOT:
922  appendStringInfoString(&cmd, "EXPORT_SNAPSHOT");
923  break;
925  appendStringInfoString(&cmd, "NOEXPORT_SNAPSHOT");
926  break;
927  case CRS_USE_SNAPSHOT:
928  appendStringInfoString(&cmd, "USE_SNAPSHOT");
929  break;
930  }
931  }
932 
933  if (use_new_options_syntax)
934  appendStringInfoChar(&cmd, ')');
935  }
936  else
937  {
938  if (use_new_options_syntax)
939  appendStringInfoString(&cmd, " PHYSICAL (RESERVE_WAL)");
940  else
941  appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL");
942  }
943 
944  res = libpqrcv_PQexec(conn->streamConn, cmd.data);
945  pfree(cmd.data);
946 
948  {
949  PQclear(res);
950  ereport(ERROR,
951  (errcode(ERRCODE_PROTOCOL_VIOLATION),
952  errmsg("could not create replication slot \"%s\": %s",
953  slotname, pchomp(PQerrorMessage(conn->streamConn)))));
954  }
955 
956  if (lsn)
958  CStringGetDatum(PQgetvalue(res, 0, 1))));
959 
960  if (!PQgetisnull(res, 0, 2))
961  snapshot = pstrdup(PQgetvalue(res, 0, 2));
962  else
963  snapshot = NULL;
964 
965  PQclear(res);
966 
967  return snapshot;
968 }
969 
970 /*
971  * Return PID of remote backend process.
972  */
973 static pid_t
975 {
976  return PQbackendPID(conn->streamConn);
977 }
978 
979 /*
980  * Convert tuple query result to tuplestore.
981  */
982 static void
984  const int nRetTypes, const Oid *retTypes)
985 {
986  int tupn;
987  int coln;
988  int nfields = PQnfields(pgres);
989  HeapTuple tuple;
990  AttInMetadata *attinmeta;
991  MemoryContext rowcontext;
992  MemoryContext oldcontext;
993 
994  /* Make sure we got expected number of fields. */
995  if (nfields != nRetTypes)
996  ereport(ERROR,
997  (errcode(ERRCODE_PROTOCOL_VIOLATION),
998  errmsg("invalid query response"),
999  errdetail("Expected %d fields, got %d fields.",
1000  nRetTypes, nfields)));
1001 
1002  walres->tuplestore = tuplestore_begin_heap(true, false, work_mem);
1003 
1004  /* Create tuple descriptor corresponding to expected result. */
1005  walres->tupledesc = CreateTemplateTupleDesc(nRetTypes);
1006  for (coln = 0; coln < nRetTypes; coln++)
1007  TupleDescInitEntry(walres->tupledesc, (AttrNumber) coln + 1,
1008  PQfname(pgres, coln), retTypes[coln], -1, 0);
1009  attinmeta = TupleDescGetAttInMetadata(walres->tupledesc);
1010 
1011  /* No point in doing more here if there were no tuples returned. */
1012  if (PQntuples(pgres) == 0)
1013  return;
1014 
1015  /* Create temporary context for local allocations. */
1017  "libpqrcv query result context",
1019 
1020  /* Process returned rows. */
1021  for (tupn = 0; tupn < PQntuples(pgres); tupn++)
1022  {
1023  char *cstrs[MaxTupleAttributeNumber];
1024 
1026 
1027  /* Do the allocations in temporary context. */
1028  oldcontext = MemoryContextSwitchTo(rowcontext);
1029 
1030  /*
1031  * Fill cstrs with null-terminated strings of column values.
1032  */
1033  for (coln = 0; coln < nfields; coln++)
1034  {
1035  if (PQgetisnull(pgres, tupn, coln))
1036  cstrs[coln] = NULL;
1037  else
1038  cstrs[coln] = PQgetvalue(pgres, tupn, coln);
1039  }
1040 
1041  /* Convert row to a tuple, and add it to the tuplestore */
1042  tuple = BuildTupleFromCStrings(attinmeta, cstrs);
1043  tuplestore_puttuple(walres->tuplestore, tuple);
1044 
1045  /* Clean up */
1046  MemoryContextSwitchTo(oldcontext);
1047  MemoryContextReset(rowcontext);
1048  }
1049 
1050  MemoryContextDelete(rowcontext);
1051 }
1052 
1053 /*
1054  * Public interface for sending generic queries (and commands).
1055  *
1056  * This can only be called from process connected to database.
1057  */
1058 static WalRcvExecResult *
1059 libpqrcv_exec(WalReceiverConn *conn, const char *query,
1060  const int nRetTypes, const Oid *retTypes)
1061 {
1062  PGresult *pgres = NULL;
1063  WalRcvExecResult *walres = palloc0(sizeof(WalRcvExecResult));
1064  char *diag_sqlstate;
1065 
1066  if (MyDatabaseId == InvalidOid)
1067  ereport(ERROR,
1068  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1069  errmsg("the query interface requires a database connection")));
1070 
1071  pgres = libpqrcv_PQexec(conn->streamConn, query);
1072 
1073  switch (PQresultStatus(pgres))
1074  {
1075  case PGRES_SINGLE_TUPLE:
1076  case PGRES_TUPLES_OK:
1077  walres->status = WALRCV_OK_TUPLES;
1078  libpqrcv_processTuples(pgres, walres, nRetTypes, retTypes);
1079  break;
1080 
1081  case PGRES_COPY_IN:
1082  walres->status = WALRCV_OK_COPY_IN;
1083  break;
1084 
1085  case PGRES_COPY_OUT:
1086  walres->status = WALRCV_OK_COPY_OUT;
1087  break;
1088 
1089  case PGRES_COPY_BOTH:
1090  walres->status = WALRCV_OK_COPY_BOTH;
1091  break;
1092 
1093  case PGRES_COMMAND_OK:
1094  walres->status = WALRCV_OK_COMMAND;
1095  break;
1096 
1097  /* Empty query is considered error. */
1098  case PGRES_EMPTY_QUERY:
1099  walres->status = WALRCV_ERROR;
1100  walres->err = _("empty query");
1101  break;
1102 
1103  case PGRES_PIPELINE_SYNC:
1105  walres->status = WALRCV_ERROR;
1106  walres->err = _("unexpected pipeline mode");
1107  break;
1108 
1109  case PGRES_NONFATAL_ERROR:
1110  case PGRES_FATAL_ERROR:
1111  case PGRES_BAD_RESPONSE:
1112  walres->status = WALRCV_ERROR;
1113  walres->err = pchomp(PQerrorMessage(conn->streamConn));
1114  diag_sqlstate = PQresultErrorField(pgres, PG_DIAG_SQLSTATE);
1115  if (diag_sqlstate)
1116  walres->sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
1117  diag_sqlstate[1],
1118  diag_sqlstate[2],
1119  diag_sqlstate[3],
1120  diag_sqlstate[4]);
1121  break;
1122  }
1123 
1124  PQclear(pgres);
1125 
1126  return walres;
1127 }
1128 
1129 /*
1130  * Given a List of strings, return it as single comma separated
1131  * string, quoting identifiers as needed.
1132  *
1133  * This is essentially the reverse of SplitIdentifierString.
1134  *
1135  * The caller should free the result.
1136  */
1137 static char *
1139 {
1140  ListCell *lc;
1142  bool first = true;
1143 
1144  initStringInfo(&res);
1145 
1146  foreach(lc, strings)
1147  {
1148  char *val = strVal(lfirst(lc));
1149  char *val_escaped;
1150 
1151  if (first)
1152  first = false;
1153  else
1154  appendStringInfoChar(&res, ',');
1155 
1156  val_escaped = PQescapeIdentifier(conn, val, strlen(val));
1157  if (!val_escaped)
1158  {
1159  free(res.data);
1160  return NULL;
1161  }
1162  appendStringInfoString(&res, val_escaped);
1163  PQfreemem(val_escaped);
1164  }
1165 
1166  return res.data;
1167 }
int16 AttrNumber
Definition: attnum.h:21
#define ALWAYS_SECURE_SEARCH_PATH_SQL
Definition: connect.h:25
int errdetail(const char *fmt,...)
Definition: elog.c:1037
int errcode(int sqlerrcode)
Definition: elog.c:693
int errmsg(const char *fmt,...)
Definition: elog.c:904
#define _(x)
Definition: elog.c:89
#define ERROR
Definition: elog.h:33
#define MAKE_SQLSTATE(ch1, ch2, ch3, ch4, ch5)
Definition: elog.h:50
#define elog(elevel,...)
Definition: elog.h:218
#define ereport(elevel,...)
Definition: elog.h:143
HeapTuple BuildTupleFromCStrings(AttInMetadata *attinmeta, char **values)
Definition: execTuples.c:2135
AttInMetadata * TupleDescGetAttInMetadata(TupleDesc tupdesc)
Definition: execTuples.c:2086
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:6898
PQconninfoOption * PQconninfoParse(const char *conninfo, char **errmsg)
Definition: fe-connect.c:5501
PGconn * PQconnectStartParams(const char *const *keywords, const char *const *values, int expand_dbname)
Definition: fe-connect.c:760
char * PQhost(const PGconn *conn)
Definition: fe-connect.c:6787
PostgresPollingStatusType PQconnectPoll(PGconn *conn)
Definition: fe-connect.c:2262
void PQconninfoFree(PQconninfoOption *connOptions)
Definition: fe-connect.c:6736
PQconninfoOption * PQconninfo(PGconn *conn)
Definition: fe-connect.c:6692
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6908
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:6855
void PQfinish(PGconn *conn)
Definition: fe-connect.c:4261
int PQbackendPID(const PGconn *conn)
Definition: fe-connect.c:6942
char * PQport(const PGconn *conn)
Definition: fe-connect.c:6823
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:6934
int PQgetlength(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3746
int PQflush(PGconn *conn)
Definition: fe-exec.c:3861
void PQfreemem(void *ptr)
Definition: fe-exec.c:3891
char * PQescapeIdentifier(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:4172
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3270
void PQclear(PGresult *res)
Definition: fe-exec.c:718
int PQendcopy(PGconn *conn)
Definition: fe-exec.c:2885
int PQputCopyEnd(PGconn *conn, const char *errormsg)
Definition: fe-exec.c:2683
int PQntuples(const PGresult *res)
Definition: fe-exec.c:3340
int PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
Definition: fe-exec.c:2627
char * PQfname(const PGresult *res, int field_num)
Definition: fe-exec.c:3426
char * PQescapeLiteral(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:4166
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:2004
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3735
int PQgetisnull(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3760
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1424
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:2051
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:3325
int PQnfields(const PGresult *res)
Definition: fe-exec.c:3348
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:2082
int PQgetCopyData(PGconn *conn, char **buffer, int async)
Definition: fe-exec.c:2751
Datum DirectFunctionCall1Coll(PGFunction func, Oid collation, Datum arg1)
Definition: fmgr.c:777
int work_mem
Definition: globals.c:125
struct Latch * MyLatch
Definition: globals.c:58
Oid MyDatabaseId
Definition: globals.c:89
#define free(a)
Definition: header.h:65
#define MaxTupleAttributeNumber
Definition: htup_details.h:33
long val
Definition: informix.c:664
int i
Definition: isn.c:73
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:524
void ResetLatch(Latch *latch)
Definition: latch.c:683
#define WL_SOCKET_READABLE
Definition: latch.h:126
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130
#define WL_LATCH_SET
Definition: latch.h:125
#define WL_SOCKET_CONNECTED
Definition: latch.h:135
#define WL_SOCKET_WRITEABLE
Definition: latch.h:127
@ CONNECTION_STARTED
Definition: libpq-fe.h:68
@ CONNECTION_BAD
Definition: libpq-fe.h:61
@ CONNECTION_OK
Definition: libpq-fe.h:60
@ PGRES_COPY_IN
Definition: libpq-fe.h:104
@ PGRES_COPY_BOTH
Definition: libpq-fe.h:109
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:97
@ PGRES_FATAL_ERROR
Definition: libpq-fe.h:108
@ PGRES_SINGLE_TUPLE
Definition: libpq-fe.h:110
@ PGRES_COPY_OUT
Definition: libpq-fe.h:103
@ PGRES_EMPTY_QUERY
Definition: libpq-fe.h:96
@ PGRES_PIPELINE_SYNC
Definition: libpq-fe.h:111
@ PGRES_BAD_RESPONSE
Definition: libpq-fe.h:105
@ PGRES_PIPELINE_ABORTED
Definition: libpq-fe.h:112
@ PGRES_NONFATAL_ERROR
Definition: libpq-fe.h:107
@ PGRES_TUPLES_OK
Definition: libpq-fe.h:100
PostgresPollingStatusType
Definition: libpq-fe.h:85
@ PGRES_POLLING_OK
Definition: libpq-fe.h:89
@ PGRES_POLLING_READING
Definition: libpq-fe.h:87
@ PGRES_POLLING_WRITING
Definition: libpq-fe.h:88
@ PGRES_POLLING_FAILED
Definition: libpq-fe.h:86
Assert(fmt[strlen(fmt) - 1] !='\n')
static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, TimeLineID tli, char **filename, char **content, int *len)
static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn)
static void libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
void _PG_init(void)
static void libpqrcv_disconnect(WalReceiverConn *conn)
static char * libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
static PGresult * libpqrcv_PQgetResult(PGconn *streamConn)
static WalRcvExecResult * libpqrcv_exec(WalReceiverConn *conn, const char *query, const int nRetTypes, const Oid *retTypes)
PG_MODULE_MAGIC
static void libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
static void libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host, int *sender_port)
static int libpqrcv_server_version(WalReceiverConn *conn)
static char * stringlist_to_identifierstr(PGconn *conn, List *strings)
static WalReceiverConn * libpqrcv_connect(const char *conninfo, bool logical, const char *appname, char **err)
static bool libpqrcv_startstreaming(WalReceiverConn *conn, const WalRcvStreamOptions *options)
static int libpqrcv_receive(WalReceiverConn *conn, char **buffer, pgsocket *wait_fd)
static char * libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, bool temporary, bool two_phase, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn)
static void libpqrcv_check_conninfo(const char *conninfo)
static WalReceiverFunctionsType PQWalReceiverFunctions
static char * libpqrcv_get_conninfo(WalReceiverConn *conn)
static void libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres, const int nRetTypes, const Oid *retTypes)
static PGresult * libpqrcv_PQexec(PGconn *streamConn, const char *query)
const char * GetDatabaseEncodingName(void)
Definition: mbutils.c:1216
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:143
char * pchomp(const char *in)
Definition: mcxt.c:1333
char * pstrdup(const char *in)
Definition: mcxt.c:1305
void pfree(void *pointer)
Definition: mcxt.c:1175
void * palloc0(Size size)
Definition: mcxt.c:1099
MemoryContext CurrentMemoryContext
Definition: mcxt.c:42
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:218
void * palloc(Size size)
Definition: mcxt.c:1068
#define AllocSetContextCreate
Definition: memutils.h:173
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:197
int32 pg_strtoint32(const char *s)
Definition: numutils.c:175
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
static AmcheckOptions opts
Definition: pg_amcheck.c:110
const void size_t len
static char * filename
Definition: pg_dumpall.c:94
#define lfirst(lc)
Definition: pg_list.h:169
Datum pg_lsn_in(PG_FUNCTION_ARGS)
Definition: pg_lsn.c:64
#define DatumGetLSN(X)
Definition: pg_lsn.h:21
static bool two_phase
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:229
static char * buf
Definition: pg_test_fsync.c:67
int pgsocket
Definition: port.h:29
#define snprintf
Definition: port.h:225
#define CStringGetDatum(X)
Definition: postgres.h:622
#define InvalidOid
Definition: postgres_ext.h:36
unsigned int Oid
Definition: postgres_ext.h:31
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:57
void initPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:92
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
Definition: pqexpbuffer.c:267
void termPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:131
#define PQExpBufferDataBroken(buf)
Definition: pqexpbuffer.h:67
PGconn * conn
Definition: streamutil.c:54
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:188
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
Definition: pg_list.h:51
Tuplestorestate * tuplestore
Definition: walreceiver.h:219
TupleDesc tupledesc
Definition: walreceiver.h:220
WalRcvExecStatus status
Definition: walreceiver.h:216
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:45
void TupleDescInitEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:583
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
void tuplestore_puttuple(Tuplestorestate *state, HeapTuple tuple)
Definition: tuplestore.c:730
#define strVal(v)
Definition: value.h:72
@ WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE
Definition: wait_event.h:66
@ WAIT_EVENT_LIBPQWALRECEIVER_CONNECT
Definition: wait_event.h:65
void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:150
WalReceiverFunctionsType * WalReceiverFunctions
Definition: walreceiver.c:96
@ WALRCV_OK_COPY_IN
Definition: walreceiver.h:204
@ WALRCV_OK_COMMAND
Definition: walreceiver.h:201
@ WALRCV_ERROR
Definition: walreceiver.h:200
@ WALRCV_OK_TUPLES
Definition: walreceiver.h:203
@ WALRCV_OK_COPY_OUT
Definition: walreceiver.h:205
@ WALRCV_OK_COPY_BOTH
Definition: walreceiver.h:206
CRSSnapshotAction
Definition: walsender.h:21
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
@ CRS_NOEXPORT_SNAPSHOT
Definition: walsender.h:23
@ CRS_EXPORT_SNAPSHOT
Definition: walsender.h:22
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint32 TimeLineID
Definition: xlogdefs.h:59