PostgreSQL Source Code  git master
libpqwalreceiver.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * libpqwalreceiver.c
4  *
5  * This file contains the libpq-specific parts of walreceiver. It's
6  * loaded as a dynamic module to avoid linking the main server binary with
7  * libpq.
8  *
9  * Portions Copyright (c) 2010-2021, PostgreSQL Global Development Group
10  *
11  *
12  * IDENTIFICATION
13  * src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
14  *
15  *-------------------------------------------------------------------------
16  */
17 #include "postgres.h"
18 
19 #include <unistd.h>
20 #include <sys/time.h>
21 
22 #include "access/xlog.h"
23 #include "catalog/pg_type.h"
24 #include "common/connect.h"
25 #include "funcapi.h"
26 #include "libpq-fe.h"
27 #include "mb/pg_wchar.h"
28 #include "miscadmin.h"
29 #include "pgstat.h"
30 #include "pqexpbuffer.h"
32 #include "utils/builtins.h"
33 #include "utils/memutils.h"
34 #include "utils/pg_lsn.h"
35 #include "utils/tuplestore.h"
36 
38 
39 void _PG_init(void);
40 
/*
 * Per-connection state for this module (WalReceiverConn).  An instance is
 * allocated zeroed and returned by libpqrcv_connect().
 *
 * NOTE(review): the struct header line and the declaration of the
 * "streamConn" member (the libpq connection handle dereferenced throughout
 * this file) are not visible in this listing -- confirm against the real
 * source.
 */
42 {
43  /* Current connection to the primary, if any */
45  /* Used to remember if the connection is logical or physical */
46  bool logical;
47  /*
  * Buffer for currently read records.  It is filled in by PQgetCopyData()
  * in libpqrcv_receive() and released with PQfreemem().
  */
48  char *recvBuf;
49 };
50 
51 /* Prototypes for interface functions */
/*
 * NOTE(review): several declaration lines in this group are not visible in
 * this listing; some prototypes appear only as their trailing parameter
 * lines.  The same is true of the body of the initializer that ends at the
 * "};" further down (presumably the table of interface functions --
 * confirm against the real source).
 */
52 static WalReceiverConn *libpqrcv_connect(const char *conninfo,
53  bool logical, const char *appname,
54  char **err);
55 static void libpqrcv_check_conninfo(const char *conninfo);
58  char **sender_host, int *sender_port);
60  TimeLineID *primary_tli);
63  TimeLineID tli, char **filename,
64  char **content, int *len);
68  TimeLineID *next_tli);
69 static int libpqrcv_receive(WalReceiverConn *conn, char **buffer,
70  pgsocket *wait_fd);
71 static void libpqrcv_send(WalReceiverConn *conn, const char *buffer,
72  int nbytes);
74  const char *slotname,
75  bool temporary,
76  CRSSnapshotAction snapshot_action,
77  XLogRecPtr *lsn);
80  const char *query,
81  const int nRetTypes,
82  const Oid *retTypes);
84 
101 };
102 
103 /* Prototypes for private functions (helpers used only inside this file) */
104 static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query);
105 static PGresult *libpqrcv_PQgetResult(PGconn *streamConn);
106 static char *stringlist_to_identifierstr(PGconn *conn, List *strings);
107 
108 /*
109  * Module initialization function
 *
 * Raises an ERROR if WalReceiverFunctions is already set, i.e. the module
 * has been loaded before.
 *
 * NOTE(review): the statement that follows the check is not visible in
 * this listing; presumably it points WalReceiverFunctions at this module's
 * function table -- confirm against the real source.
110  */
111 void
112 _PG_init(void)
113 {
114  if (WalReceiverFunctions != NULL)
115  elog(ERROR, "libpqwalreceiver already loaded");
117 }
118 
119 /*
120  * Establish the connection to the primary server for XLOG streaming
121  *
122  * Returns NULL on error and fills the err with palloc'ed error message.
 *
 * conninfo is handed to libpq under the "dbname" keyword with expand_dbname
 * enabled, together with a "replication" option ("database" for logical
 * connections, "true" otherwise) and appname as fallback_application_name.
 * Logical connections additionally set client_encoding to the database
 * encoding and, once connected, run a query whose failure is reported as
 * "could not clear search path".
 *
 * NOTE(review): on both "return NULL" paths neither conn nor its libpq
 * connection is released here (no PQfinish()/pfree() is visible) -- confirm
 * whether that is intentional.
123  */
124 static WalReceiverConn *
125 libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
126  char **err)
127 {
 /*
  * NOTE(review): the declarations of "conn" and "status" are not visible
  * in this listing.
  */
130  const char *keys[5];
131  const char *vals[5];
132  int i = 0;
133 
134  /*
135  * We use the expand_dbname parameter to process the connection string (or
136  * URI), and pass some extra options.
137  */
138  keys[i] = "dbname";
139  vals[i] = conninfo;
140  keys[++i] = "replication";
141  vals[i] = logical ? "database" : "true";
142  if (!logical)
143  {
144  /*
145  * The database name is ignored by the server in replication mode, but
146  * specify "replication" for .pgpass lookup.
147  */
148  keys[++i] = "dbname";
149  vals[i] = "replication";
150  }
151  keys[++i] = "fallback_application_name";
152  vals[i] = appname;
153  if (logical)
154  {
155  keys[++i] = "client_encoding";
156  vals[i] = GetDatabaseEncodingName();
157  }
 /* Terminate both arrays; either path above fills exactly five slots. */
158  keys[++i] = NULL;
159  vals[i] = NULL;
160 
 /*
  * NOTE(review): sizeof(keys) is the array's size in bytes, not its element
  * count, so this assertion is far weaker than it looks; an element-count
  * check (e.g. lengthof(keys)) appears to be what was intended.
  */
161  Assert(i < sizeof(keys));
162 
163  conn = palloc0(sizeof(WalReceiverConn));
164  conn->streamConn = PQconnectStartParams(keys, vals,
165  /* expand_dbname = */ true);
166  if (PQstatus(conn->streamConn) == CONNECTION_BAD)
167  {
168  *err = pchomp(PQerrorMessage(conn->streamConn));
169  return NULL;
170  }
171 
172  /*
173  * Poll connection until we have OK or FAILED status.
174  *
175  * Per spec for PQconnectPoll, first wait till socket is write-ready.
176  */
177  status = PGRES_POLLING_WRITING;
178  do
179  {
180  int io_flag;
181  int rc;
182 
 /* Pick the socket event to wait for from the last polling status. */
183  if (status == PGRES_POLLING_READING)
184  io_flag = WL_SOCKET_READABLE;
185 #ifdef WIN32
186  /* Windows needs a different test while waiting for connection-made */
187  else if (PQstatus(conn->streamConn) == CONNECTION_STARTED)
188  io_flag = WL_SOCKET_CONNECTED;
189 #endif
190  else
191  io_flag = WL_SOCKET_WRITEABLE;
192 
 /*
  * NOTE(review): the first and last lines of this wait call (presumably
  * the assignment to rc and the wait-event argument) are not visible in
  * this listing.
  */
194  WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
195  PQsocket(conn->streamConn),
196  0,
198 
199  /* Interrupted? */
200  if (rc & WL_LATCH_SET)
201  {
 /* NOTE(review): the body of this block is not visible in this listing. */
204  }
205 
206  /* If socket is ready, advance the libpq state machine */
207  if (rc & io_flag)
208  status = PQconnectPoll(conn->streamConn);
209  } while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
210 
211  if (PQstatus(conn->streamConn) != CONNECTION_OK)
212  {
213  *err = pchomp(PQerrorMessage(conn->streamConn));
214  return NULL;
215  }
216 
217  if (logical)
218  {
219  PGresult *res;
220 
 /*
  * NOTE(review): the query argument of this call is not visible in this
  * listing.  Unlike the connection failures above, a failure here is
  * raised as an ERROR rather than reported through *err.
  */
221  res = libpqrcv_PQexec(conn->streamConn,
223  if (PQresultStatus(res) != PGRES_TUPLES_OK)
224  {
225  PQclear(res);
226  ereport(ERROR,
227  (errmsg("could not clear search path: %s",
228  pchomp(PQerrorMessage(conn->streamConn)))));
229  }
230  PQclear(res);
231  }
232 
 /* Remember the connection type; libpqrcv_create_slot() reads it. */
233  conn->logical = logical;
234 
235  return conn;
236 }
237 
238 /*
239  * Validate connection info string (just try to parse it)
240  */
241 static void
242 libpqrcv_check_conninfo(const char *conninfo)
243 {
244  PQconninfoOption *opts = NULL;
245  char *err = NULL;
246 
247  opts = PQconninfoParse(conninfo, &err);
248  if (opts == NULL)
249  {
250  /* The error string is malloc'd, so we must free it explicitly */
251  char *errcopy = err ? pstrdup(err) : "out of memory";
252 
253  PQfreemem(err);
254  ereport(ERROR,
255  (errcode(ERRCODE_SYNTAX_ERROR),
256  errmsg("invalid connection string syntax: %s", errcopy)));
257  }
258 
259  PQconninfoFree(opts);
260 }
261 
262 /*
263  * Return a user-displayable conninfo string. Any security-sensitive fields
264  * are obfuscated.
 *
 * The result is a palloc'd string of space-separated "keyword=value" pairs
 * built from the options of the live connection, or NULL if the expandable
 * buffer ran out of memory while it was being built.  Raises an ERROR if
 * libpq cannot return the option list.
 *
 * NOTE(review): the line carrying the function name and parameter list,
 * and the declaration of "buf", are not visible in this listing.
265  */
266 static char *
268 {
269  PQconninfoOption *conn_opts;
270  PQconninfoOption *conn_opt;
272  char *retval;
273 
274  Assert(conn->streamConn != NULL);
275 
276  initPQExpBuffer(&buf);
277  conn_opts = PQconninfo(conn->streamConn);
278 
279  if (conn_opts == NULL)
280  ereport(ERROR,
281  (errmsg("could not parse connection string: %s",
282  _("out of memory"))));
283 
284  /* build a clean connection string from pieces */
285  for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
286  {
287  bool obfuscate;
288 
289  /* Skip debug and empty options */
290  if (strchr(conn_opt->dispchar, 'D') ||
291  conn_opt->val == NULL ||
292  conn_opt->val[0] == '\0')
293  continue;
294 
295  /* Obfuscate security-sensitive options */
296  obfuscate = strchr(conn_opt->dispchar, '*') != NULL;
297 
 /* Separate entries with a single space; no leading space on the first. */
298  appendPQExpBuffer(&buf, "%s%s=%s",
299  buf.len == 0 ? "" : " ",
300  conn_opt->keyword,
301  obfuscate ? "********" : conn_opt->val);
302  }
303 
304  PQconninfoFree(conn_opts);
305 
 /* Copy the text into palloc'd memory before releasing libpq's buffer. */
306  retval = PQExpBufferDataBroken(buf) ? NULL : pstrdup(buf.data);
307  termPQExpBuffer(&buf);
308  return retval;
309 }
310 
311 /*
312  * Provides information of sender this WAL receiver is connected to.
313  */
314 static void
315 libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host,
316  int *sender_port)
317 {
318  char *ret = NULL;
319 
320  *sender_host = NULL;
321  *sender_port = 0;
322 
323  Assert(conn->streamConn != NULL);
324 
325  ret = PQhost(conn->streamConn);
326  if (ret && strlen(ret) != 0)
327  *sender_host = pstrdup(ret);
328 
329  ret = PQport(conn->streamConn);
330  if (ret && strlen(ret) != 0)
331  *sender_port = atoi(ret);
332 }
333 
334 /*
335  * Check that primary's system identifier matches ours, and fetch the current
336  * timeline ID of the primary.
337  */
338 static char *
340 {
341  PGresult *res;
342  char *primary_sysid;
343 
344  /*
345  * Get the system identifier and timeline ID as a DataRow message from the
346  * primary server.
347  */
348  res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM");
349  if (PQresultStatus(res) != PGRES_TUPLES_OK)
350  {
351  PQclear(res);
352  ereport(ERROR,
353  (errmsg("could not receive database system identifier and timeline ID from "
354  "the primary server: %s",
355  pchomp(PQerrorMessage(conn->streamConn)))));
356  }
357  if (PQnfields(res) < 3 || PQntuples(res) != 1)
358  {
359  int ntuples = PQntuples(res);
360  int nfields = PQnfields(res);
361 
362  PQclear(res);
363  ereport(ERROR,
364  (errmsg("invalid response from primary server"),
365  errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.",
366  ntuples, nfields, 3, 1)));
367  }
368  primary_sysid = pstrdup(PQgetvalue(res, 0, 0));
369  *primary_tli = pg_strtoint32(PQgetvalue(res, 0, 1));
370  PQclear(res);
371 
372  return primary_sysid;
373 }
374 
375 /*
376  * Thin wrapper around libpq to obtain server version.
 *
 * Returns whatever PQserverVersion() reports for the connection.
 *
 * NOTE(review): the line carrying the function name and parameter list is
 * not visible in this listing.
377  */
378 static int
380 {
381  return PQserverVersion(conn->streamConn);
382 }
383 
384 /*
385  * Start streaming WAL data from given streaming options.
386  *
387  * Returns true if we switched successfully to copy-both mode. False
388  * means the server received the command and executed it successfully, but
389  * didn't switch to copy-mode. That means that there was no WAL on the
390  * requested timeline and starting point, because the server switched to
391  * another timeline at or before the requested starting point. On failure,
392  * throws an ERROR.
393  */
394 static bool
397 {
398  StringInfoData cmd;
399  PGresult *res;
400 
401  Assert(options->logical == conn->logical);
402  Assert(options->slotname || !options->logical);
403 
404  initStringInfo(&cmd);
405 
406  /* Build the command. */
407  appendStringInfoString(&cmd, "START_REPLICATION");
408  if (options->slotname != NULL)
409  appendStringInfo(&cmd, " SLOT \"%s\"",
410  options->slotname);
411 
412  if (options->logical)
413  appendStringInfoString(&cmd, " LOGICAL");
414 
415  appendStringInfo(&cmd, " %X/%X", LSN_FORMAT_ARGS(options->startpoint));
416 
417  /*
418  * Additional options are different depending on if we are doing logical
419  * or physical replication.
420  */
421  if (options->logical)
422  {
423  char *pubnames_str;
424  List *pubnames;
425  char *pubnames_literal;
426 
427  appendStringInfoString(&cmd, " (");
428 
429  appendStringInfo(&cmd, "proto_version '%u'",
430  options->proto.logical.proto_version);
431 
432  if (options->proto.logical.streaming &&
433  PQserverVersion(conn->streamConn) >= 140000)
434  appendStringInfoString(&cmd, ", streaming 'on'");
435 
436  pubnames = options->proto.logical.publication_names;
437  pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
438  if (!pubnames_str)
439  ereport(ERROR,
440  (errmsg("could not start WAL streaming: %s",
441  pchomp(PQerrorMessage(conn->streamConn)))));
442  pubnames_literal = PQescapeLiteral(conn->streamConn, pubnames_str,
443  strlen(pubnames_str));
444  if (!pubnames_literal)
445  ereport(ERROR,
446  (errmsg("could not start WAL streaming: %s",
447  pchomp(PQerrorMessage(conn->streamConn)))));
448  appendStringInfo(&cmd, ", publication_names %s", pubnames_literal);
449  PQfreemem(pubnames_literal);
450  pfree(pubnames_str);
451 
452  if (options->proto.logical.binary &&
453  PQserverVersion(conn->streamConn) >= 140000)
454  appendStringInfoString(&cmd, ", binary 'true'");
455 
456  appendStringInfoChar(&cmd, ')');
457  }
458  else
459  appendStringInfo(&cmd, " TIMELINE %u",
460  options->proto.physical.startpointTLI);
461 
462  /* Start streaming. */
463  res = libpqrcv_PQexec(conn->streamConn, cmd.data);
464  pfree(cmd.data);
465 
466  if (PQresultStatus(res) == PGRES_COMMAND_OK)
467  {
468  PQclear(res);
469  return false;
470  }
471  else if (PQresultStatus(res) != PGRES_COPY_BOTH)
472  {
473  PQclear(res);
474  ereport(ERROR,
475  (errmsg("could not start WAL streaming: %s",
476  pchomp(PQerrorMessage(conn->streamConn)))));
477  }
478  PQclear(res);
479  return true;
480 }
481 
482 /*
483  * Stop streaming WAL data. Returns the next timeline's ID in *next_tli, as
484  * reported by the server, or 0 if it did not report it.
485  */
486 static void
488 {
489  PGresult *res;
490 
491  /*
492  * Send copy-end message. As in libpqrcv_PQexec, this could theoretically
493  * block, but the risk seems small.
494  */
495  if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
496  PQflush(conn->streamConn))
497  ereport(ERROR,
498  (errmsg("could not send end-of-streaming message to primary: %s",
499  pchomp(PQerrorMessage(conn->streamConn)))));
500 
501  *next_tli = 0;
502 
503  /*
504  * After COPY is finished, we should receive a result set indicating the
505  * next timeline's ID, or just CommandComplete if the server was shut
506  * down.
507  *
508  * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
509  * also possible in case we aborted the copy in mid-stream.
510  */
511  res = libpqrcv_PQgetResult(conn->streamConn);
512  if (PQresultStatus(res) == PGRES_TUPLES_OK)
513  {
514  /*
515  * Read the next timeline's ID. The server also sends the timeline's
516  * starting point, but it is ignored.
517  */
518  if (PQnfields(res) < 2 || PQntuples(res) != 1)
519  ereport(ERROR,
520  (errmsg("unexpected result set after end-of-streaming")));
521  *next_tli = pg_strtoint32(PQgetvalue(res, 0, 0));
522  PQclear(res);
523 
524  /* the result set should be followed by CommandComplete */
525  res = libpqrcv_PQgetResult(conn->streamConn);
526  }
527  else if (PQresultStatus(res) == PGRES_COPY_OUT)
528  {
529  PQclear(res);
530 
531  /* End the copy */
532  if (PQendcopy(conn->streamConn))
533  ereport(ERROR,
534  (errmsg("error while shutting down streaming COPY: %s",
535  pchomp(PQerrorMessage(conn->streamConn)))));
536 
537  /* CommandComplete should follow */
538  res = libpqrcv_PQgetResult(conn->streamConn);
539  }
540 
541  if (PQresultStatus(res) != PGRES_COMMAND_OK)
542  ereport(ERROR,
543  (errmsg("error reading result of streaming command: %s",
544  pchomp(PQerrorMessage(conn->streamConn)))));
545  PQclear(res);
546 
547  /* Verify that there are no more results */
548  res = libpqrcv_PQgetResult(conn->streamConn);
549  if (res != NULL)
550  ereport(ERROR,
551  (errmsg("unexpected result after CommandComplete: %s",
552  pchomp(PQerrorMessage(conn->streamConn)))));
553 }
554 
555 /*
556  * Fetch the timeline history file for 'tli' from primary.
557  */
558 static void
560  TimeLineID tli, char **filename,
561  char **content, int *len)
562 {
563  PGresult *res;
564  char cmd[64];
565 
566  Assert(!conn->logical);
567 
568  /*
569  * Request the primary to send over the history file for given timeline.
570  */
571  snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
572  res = libpqrcv_PQexec(conn->streamConn, cmd);
573  if (PQresultStatus(res) != PGRES_TUPLES_OK)
574  {
575  PQclear(res);
576  ereport(ERROR,
577  (errmsg("could not receive timeline history file from "
578  "the primary server: %s",
579  pchomp(PQerrorMessage(conn->streamConn)))));
580  }
581  if (PQnfields(res) != 2 || PQntuples(res) != 1)
582  {
583  int ntuples = PQntuples(res);
584  int nfields = PQnfields(res);
585 
586  PQclear(res);
587  ereport(ERROR,
588  (errmsg("invalid response from primary server"),
589  errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
590  ntuples, nfields)));
591  }
592  *filename = pstrdup(PQgetvalue(res, 0, 0));
593 
594  *len = PQgetlength(res, 0, 1);
595  *content = palloc(*len);
596  memcpy(*content, PQgetvalue(res, 0, 1), *len);
597  PQclear(res);
598 }
599 
600 /*
601  * Send a query and wait for the results by using the asynchronous libpq
602  * functions and socket readiness events.
603  *
604  * We must not use the regular blocking libpq functions like PQexec()
605  * since they are uninterruptible by signals on some platforms, such as
606  * Windows.
607  *
608  * The function is modeled on PQexec() in libpq, but only implements
609  * those parts that are in use in the walreceiver api.
610  *
611  * May return NULL, rather than an error result, on failure.
612  */
613 static PGresult *
614 libpqrcv_PQexec(PGconn *streamConn, const char *query)
615 {
616  PGresult *lastResult = NULL;
617 
618  /*
619  * PQexec() silently discards any prior query results on the connection.
620  * This is not required for this function as it's expected that the caller
621  * (which is this library in all cases) will behave correctly and we don't
622  * have to be backwards compatible with old libpq.
623  */
624 
625  /*
626  * Submit the query. Since we don't use non-blocking mode, this could
627  * theoretically block. In practice, since we don't send very long query
628  * strings, the risk seems negligible.
629  */
630  if (!PQsendQuery(streamConn, query))
631  return NULL;
632 
633  for (;;)
634  {
635  /* Wait for, and collect, the next PGresult. */
636  PGresult *result;
637 
638  result = libpqrcv_PQgetResult(streamConn);
639  if (result == NULL)
640  break; /* query is complete, or failure */
641 
642  /*
643  * Emulate PQexec()'s behavior of returning the last result when there
644  * are many. We are fine with returning just last error message.
645  */
646  PQclear(lastResult);
647  lastResult = result;
648 
649  if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
650  PQresultStatus(lastResult) == PGRES_COPY_OUT ||
651  PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
652  PQstatus(streamConn) == CONNECTION_BAD)
653  break;
654  }
655 
656  return lastResult;
657 }
658 
659 /*
660  * Perform the equivalent of PQgetResult(), but watch for interrupts.
 *
 * Returns the next PGresult for the connection, or NULL either when there
 * are no more results or when consuming input from the socket fails.
 *
 * NOTE(review): the line carrying the function name and parameter list is
 * not visible in this listing.
661  */
662 static PGresult *
664 {
665  /*
666  * Collect data until PQgetResult is ready to get the result without
667  * blocking.
668  */
669  while (PQisBusy(streamConn))
670  {
671  int rc;
672 
673  /*
674  * We don't need to break down the sleep into smaller increments,
675  * since we'll get interrupted by signals and can handle any
676  * interrupts here.
677  */
 /*
  * NOTE(review): the first and last lines of this wait call (presumably
  * the assignment to rc and the wait-event argument) are not visible in
  * this listing.
  */
680  WL_LATCH_SET,
681  PQsocket(streamConn),
682  0,
684 
685  /* Interrupted? */
686  if (rc & WL_LATCH_SET)
687  {
 /* NOTE(review): the body of this block is not visible in this listing. */
690  }
691 
692  /* Consume whatever data is available from the socket */
693  if (PQconsumeInput(streamConn) == 0)
694  {
695  /* trouble; return NULL */
696  return NULL;
697  }
698  }
699 
700  /* Now we can collect and return the next PGresult */
701  return PQgetResult(streamConn);
702 }
703 
704 /*
705  * Disconnect connection to primary, if any.
 *
 * Closes the libpq connection, releases the receive buffer if one is held,
 * and frees the connection object itself, so conn must not be used again.
 *
 * NOTE(review): the line carrying the function name and parameter list is
 * not visible in this listing.
706  */
707 static void
709 {
710  PQfinish(conn->streamConn);
 /* recvBuf is handed out by libpq, so it goes back through PQfreemem(). */
711  if (conn->recvBuf != NULL)
712  PQfreemem(conn->recvBuf);
713  pfree(conn);
714 }
715 
716 /*
717  * Receive a message available from XLOG stream.
718  *
719  * Returns:
720  *
721  * If data was received, returns the length of the data. *buffer is set to
722  * point to a buffer holding the received message. The buffer is only valid
723  * until the next libpqrcv_* call.
724  *
725  * If no data was available immediately, returns 0, and *wait_fd is set to a
726  * socket descriptor which can be waited on before trying again.
727  *
728  * -1 if the server ended the COPY.
729  *
730  * ereports on error.
731  */
732 static int
733 libpqrcv_receive(WalReceiverConn *conn, char **buffer,
734  pgsocket *wait_fd)
735 {
736  int rawlen;
737 
738  if (conn->recvBuf != NULL)
739  PQfreemem(conn->recvBuf);
740  conn->recvBuf = NULL;
741 
742  /* Try to receive a CopyData message */
743  rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
744  if (rawlen == 0)
745  {
746  /* Try consuming some data. */
747  if (PQconsumeInput(conn->streamConn) == 0)
748  ereport(ERROR,
749  (errmsg("could not receive data from WAL stream: %s",
750  pchomp(PQerrorMessage(conn->streamConn)))));
751 
752  /* Now that we've consumed some input, try again */
753  rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
754  if (rawlen == 0)
755  {
756  /* Tell caller to try again when our socket is ready. */
757  *wait_fd = PQsocket(conn->streamConn);
758  return 0;
759  }
760  }
761  if (rawlen == -1) /* end-of-streaming or error */
762  {
763  PGresult *res;
764 
765  res = libpqrcv_PQgetResult(conn->streamConn);
766  if (PQresultStatus(res) == PGRES_COMMAND_OK)
767  {
768  PQclear(res);
769 
770  /* Verify that there are no more results. */
771  res = libpqrcv_PQgetResult(conn->streamConn);
772  if (res != NULL)
773  {
774  PQclear(res);
775 
776  /*
777  * If the other side closed the connection orderly (otherwise
778  * we'd seen an error, or PGRES_COPY_IN) don't report an error
779  * here, but let callers deal with it.
780  */
781  if (PQstatus(conn->streamConn) == CONNECTION_BAD)
782  return -1;
783 
784  ereport(ERROR,
785  (errmsg("unexpected result after CommandComplete: %s",
786  PQerrorMessage(conn->streamConn))));
787  }
788 
789  return -1;
790  }
791  else if (PQresultStatus(res) == PGRES_COPY_IN)
792  {
793  PQclear(res);
794  return -1;
795  }
796  else
797  {
798  PQclear(res);
799  ereport(ERROR,
800  (errmsg("could not receive data from WAL stream: %s",
801  pchomp(PQerrorMessage(conn->streamConn)))));
802  }
803  }
804  if (rawlen < -1)
805  ereport(ERROR,
806  (errmsg("could not receive data from WAL stream: %s",
807  pchomp(PQerrorMessage(conn->streamConn)))));
808 
809  /* Return received messages to caller */
810  *buffer = conn->recvBuf;
811  return rawlen;
812 }
813 
814 /*
815  * Send a message to XLOG stream.
816  *
817  * ereports on error.
818  */
819 static void
820 libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
821 {
822  if (PQputCopyData(conn->streamConn, buffer, nbytes) <= 0 ||
823  PQflush(conn->streamConn))
824  ereport(ERROR,
825  (errmsg("could not send data to WAL stream: %s",
826  pchomp(PQerrorMessage(conn->streamConn)))));
827 }
828 
829 /*
830  * Create new replication slot.
831  * Returns the name of the exported snapshot for logical slot or NULL for
832  * physical slot.
 *
 * The snapshot name is a palloc'd copy of the third column of the reply;
 * NULL is returned when that column is null.  When lsn is not NULL it is
 * filled in from the second column of the reply.  Raises an ERROR if the
 * command does not return tuples.
 *
 * NOTE(review): slotname is placed between double quotes in the command
 * without any escaping -- presumably callers only pass validated slot
 * names; confirm.
833  */
834 static char *
835 libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
836  bool temporary, CRSSnapshotAction snapshot_action,
837  XLogRecPtr *lsn)
838 {
839  PGresult *res;
840  StringInfoData cmd;
841  char *snapshot;
842 
843  initStringInfo(&cmd);
844 
845  appendStringInfo(&cmd, "CREATE_REPLICATION_SLOT \"%s\"", slotname);
846 
847  if (temporary)
848  appendStringInfoString(&cmd, " TEMPORARY");
849 
 /* Logical slots use the pgoutput plugin plus a snapshot option. */
850  if (conn->logical)
851  {
852  appendStringInfoString(&cmd, " LOGICAL pgoutput");
853  switch (snapshot_action)
854  {
855  case CRS_EXPORT_SNAPSHOT:
856  appendStringInfoString(&cmd, " EXPORT_SNAPSHOT");
857  break;
 /*
  * NOTE(review): the case label for this branch is not visible in this
  * listing.
  */
859  appendStringInfoString(&cmd, " NOEXPORT_SNAPSHOT");
860  break;
861  case CRS_USE_SNAPSHOT:
862  appendStringInfoString(&cmd, " USE_SNAPSHOT");
863  break;
864  }
865  }
866  else
867  {
868  appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL");
869  }
870 
871  res = libpqrcv_PQexec(conn->streamConn, cmd.data);
872  pfree(cmd.data);
873 
874  if (PQresultStatus(res) != PGRES_TUPLES_OK)
875  {
876  PQclear(res);
877  ereport(ERROR,
878  (errmsg("could not create replication slot \"%s\": %s",
879  slotname, pchomp(PQerrorMessage(conn->streamConn)))));
880  }
881 
 /*
  * NOTE(review): the first line of the statement under this "if"
  * (presumably the assignment to *lsn that converts the text value) is not
  * visible in this listing.  The reply's row and column counts are not
  * checked before the values are read.
  */
882  if (lsn)
884  CStringGetDatum(PQgetvalue(res, 0, 1))));
885 
 /* Copy the snapshot name out of libpq's memory before clearing res. */
886  if (!PQgetisnull(res, 0, 2))
887  snapshot = pstrdup(PQgetvalue(res, 0, 2));
888  else
889  snapshot = NULL;
890 
891  PQclear(res);
892 
893  return snapshot;
894 }
895 
896 /*
897  * Return PID of remote backend process.
898  */
899 static pid_t
901 {
902  return PQbackendPID(conn->streamConn);
903 }
904 
905 /*
906  * Convert tuple query result to tuplestore.
 *
 * Fills walres->tuplestore and walres->tupledesc from pgres.  Column names
 * come from the result and column types from retTypes.  Raises an ERROR if
 * the result does not have exactly nRetTypes fields.
 *
 * NOTE(review): the line carrying the function name and first parameters
 * is not visible in this listing.
907  */
908 static void
910  const int nRetTypes, const Oid *retTypes)
911 {
912  int tupn;
913  int coln;
914  int nfields = PQnfields(pgres);
915  HeapTuple tuple;
916  AttInMetadata *attinmeta;
917  MemoryContext rowcontext;
918  MemoryContext oldcontext;
919 
920  /* Make sure we got expected number of fields. */
921  if (nfields != nRetTypes)
922  ereport(ERROR,
923  (errmsg("invalid query response"),
924  errdetail("Expected %d fields, got %d fields.",
925  nRetTypes, nfields)));
926 
927  walres->tuplestore = tuplestore_begin_heap(true, false, work_mem);
928 
929  /* Create tuple descriptor corresponding to expected result. */
930  walres->tupledesc = CreateTemplateTupleDesc(nRetTypes);
931  for (coln = 0; coln < nRetTypes; coln++)
932  TupleDescInitEntry(walres->tupledesc, (AttrNumber) coln + 1,
933  PQfname(pgres, coln), retTypes[coln], -1, 0);
934  attinmeta = TupleDescGetAttInMetadata(walres->tupledesc);
935 
936  /* No point in doing more here if there were no tuples returned. */
937  if (PQntuples(pgres) == 0)
938  return;
939 
940  /* Create temporary context for local allocations. */
 /*
  * NOTE(review): the first and last lines of the call that creates
  * rowcontext are not visible in this listing.
  */
942  "libpqrcv query result context",
944 
945  /* Process returned rows. */
946  for (tupn = 0; tupn < PQntuples(pgres); tupn++)
947  {
 /*
  * NOTE(review): this array is sized for MaxTupleAttributeNumber entries
  * rather than nfields, and only the first nfields entries are written
  * below -- worth confirming the stack usage is acceptable.
  */
948  char *cstrs[MaxTupleAttributeNumber];
949 
 /* NOTE(review): one statement here is not visible in this listing. */
951 
952  /* Do the allocations in temporary context. */
953  oldcontext = MemoryContextSwitchTo(rowcontext);
954 
955  /*
956  * Fill cstrs with null-terminated strings of column values.
957  */
958  for (coln = 0; coln < nfields; coln++)
959  {
960  if (PQgetisnull(pgres, tupn, coln))
961  cstrs[coln] = NULL;
962  else
963  cstrs[coln] = PQgetvalue(pgres, tupn, coln);
964  }
965 
966  /* Convert row to a tuple, and add it to the tuplestore */
967  tuple = BuildTupleFromCStrings(attinmeta, cstrs);
968  tuplestore_puttuple(walres->tuplestore, tuple);
969 
970  /* Clean up */
971  MemoryContextSwitchTo(oldcontext);
972  MemoryContextReset(rowcontext);
973  }
974 
975  MemoryContextDelete(rowcontext);
976 }
977 
978 /*
979  * Public interface for sending generic queries (and commands).
980  *
981  * This can only be called from process connected to database.
 *
 * Returns a palloc'd WalRcvExecResult whose status reflects the libpq
 * result status.  For tuple results the rows are converted into a
 * tuplestore, and must have exactly nRetTypes columns of the types in
 * retTypes.  For error results, err holds the message and sqlstate holds
 * the SQLSTATE when the server reported one.  Raises an ERROR if the
 * process is not connected to a database.
982  */
983 static WalRcvExecResult *
984 libpqrcv_exec(WalReceiverConn *conn, const char *query,
985  const int nRetTypes, const Oid *retTypes)
986 {
987  PGresult *pgres = NULL;
988  WalRcvExecResult *walres = palloc0(sizeof(WalRcvExecResult));
989  char *diag_sqlstate;
990 
991  if (MyDatabaseId == InvalidOid)
992  ereport(ERROR,
993  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
994  errmsg("the query interface requires a database connection")));
995 
996  pgres = libpqrcv_PQexec(conn->streamConn, query);
997 
 /* Map the libpq result status onto the walreceiver result status. */
998  switch (PQresultStatus(pgres))
999  {
1000  case PGRES_SINGLE_TUPLE:
1001  case PGRES_TUPLES_OK:
1002  walres->status = WALRCV_OK_TUPLES;
1003  libpqrcv_processTuples(pgres, walres, nRetTypes, retTypes);
1004  break;
1005 
1006  case PGRES_COPY_IN:
1007  walres->status = WALRCV_OK_COPY_IN;
1008  break;
1009 
1010  case PGRES_COPY_OUT:
1011  walres->status = WALRCV_OK_COPY_OUT;
1012  break;
1013 
1014  case PGRES_COPY_BOTH:
1015  walres->status = WALRCV_OK_COPY_BOTH;
1016  break;
1017 
1018  case PGRES_COMMAND_OK:
1019  walres->status = WALRCV_OK_COMMAND;
1020  break;
1021 
1022  /* Empty query is considered error. */
1023  case PGRES_EMPTY_QUERY:
1024  walres->status = WALRCV_ERROR;
1025  walres->err = _("empty query");
1026  break;
1027 
 /*
  * NOTE(review): a second case label belonging to this branch is not
  * visible in this listing.
  */
1028  case PGRES_PIPELINE_SYNC:
1030  walres->status = WALRCV_ERROR;
1031  walres->err = _("unexpected pipeline mode");
1032  break;
1033 
1034  case PGRES_NONFATAL_ERROR:
1035  case PGRES_FATAL_ERROR:
1036  case PGRES_BAD_RESPONSE:
1037  walres->status = WALRCV_ERROR;
 /* Unlike the branches above, this message is a palloc'd copy. */
1038  walres->err = pchomp(PQerrorMessage(conn->streamConn));
1039  diag_sqlstate = PQresultErrorField(pgres, PG_DIAG_SQLSTATE);
1040  if (diag_sqlstate)
1041  walres->sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
1042  diag_sqlstate[1],
1043  diag_sqlstate[2],
1044  diag_sqlstate[3],
1045  diag_sqlstate[4]);
1046  break;
1047  }
1048 
1049  PQclear(pgres);
1050 
1051  return walres;
1052 }
1053 
1054 /*
1055  * Given a List of strings, return it as single comma separated
1056  * string, quoting identifiers as needed.
1057  *
1058  * This is essentially the reverse of SplitIdentifierString.
1059  *
1060  * The caller should free the result.
1061  */
1062 static char *
1064 {
1065  ListCell *lc;
1066  StringInfoData res;
1067  bool first = true;
1068 
1069  initStringInfo(&res);
1070 
1071  foreach(lc, strings)
1072  {
1073  char *val = strVal(lfirst(lc));
1074  char *val_escaped;
1075 
1076  if (first)
1077  first = false;
1078  else
1079  appendStringInfoChar(&res, ',');
1080 
1081  val_escaped = PQescapeIdentifier(conn, val, strlen(val));
1082  if (!val_escaped)
1083  {
1084  free(res.data);
1085  return NULL;
1086  }
1087  appendStringInfoString(&res, val_escaped);
1088  PQfreemem(val_escaped);
1089  }
1090 
1091  return res.data;
1092 }
int PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
Definition: fe-exec.c:2494
int PQgetlength(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3572
static bool libpqrcv_startstreaming(WalReceiverConn *conn, const WalRcvStreamOptions *options)
int PQnfields(const PGresult *res)
Definition: fe-exec.c:3175
#define WL_SOCKET_WRITEABLE
Definition: latch.h:127
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6735
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:218
#define AllocSetContextCreate
Definition: memutils.h:173
static void libpqrcv_check_conninfo(const char *conninfo)
static int libpqrcv_receive(WalReceiverConn *conn, char **buffer, pgsocket *wait_fd)
uint32 TimeLineID
Definition: xlogdefs.h:59
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3561
#define MaxTupleAttributeNumber
Definition: htup_details.h:33
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:45
char * PQfname(const PGresult *res, int field_num)
Definition: fe-exec.c:3253
void termPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:131
#define MAKE_SQLSTATE(ch1, ch2, ch3, ch4, ch5)
Definition: elog.h:64
void _PG_init(void)
char * pstrdup(const char *in)
Definition: mcxt.c:1299
void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:146
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define strVal(v)
Definition: value.h:54
int errcode(int sqlerrcode)
Definition: elog.c:698
void PQfinish(PGconn *conn)
Definition: fe-connect.c:4229
char * PQport(const PGconn *conn)
Definition: fe-connect.c:6650
int PQputCopyEnd(PGconn *conn, const char *errormsg)
Definition: fe-exec.c:2550
#define WL_SOCKET_READABLE
Definition: latch.h:126
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:143
unsigned int Oid
Definition: postgres_ext.h:31
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:6725
static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, TimeLineID tli, char **filename, char **content, int *len)
int PQntuples(const PGresult *res)
Definition: fe-exec.c:3167
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:57
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3097
Datum pg_lsn_in(PG_FUNCTION_ARGS)
Definition: pg_lsn.c:64
void ResetLatch(Latch *latch)
Definition: latch.c:660
int PQgetCopyData(PGconn *conn, char **buffer, int async)
Definition: fe-exec.c:2618
HeapTuple BuildTupleFromCStrings(AttInMetadata *attinmeta, char **values)
Definition: execTuples.c:2146
static PGresult * libpqrcv_PQgetResult(PGconn *streamConn)
char * pchomp(const char *in)
Definition: mcxt.c:1327
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1279
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
char * PQescapeIdentifier(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:3994
PQconninfoOption * PQconninfoParse(const char *conninfo, char **errmsg)
Definition: fe-connect.c:5328
void pfree(void *pointer)
Definition: mcxt.c:1169
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
TupleDesc tupledesc
Definition: walreceiver.h:218
static char * libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
#define ERROR
Definition: elog.h:46
static void libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres, const int nRetTypes, const Oid *retTypes)
static void libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
PG_MODULE_MAGIC
PGconn * conn
Definition: streamutil.c:54
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
Definition: pqexpbuffer.c:267
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:195
void tuplestore_puttuple(Tuplestorestate *state, HeapTuple tuple)
Definition: tuplestore.c:730
int PQflush(PGconn *conn)
Definition: fe-exec.c:3685
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
static char * buf
Definition: pg_test_fsync.c:68
static WalRcvExecResult * libpqrcv_exec(WalReceiverConn *conn, const char *query, const int nRetTypes, const Oid *retTypes)
int errdetail(const char *fmt,...)
Definition: elog.c:1042
void PQconninfoFree(PQconninfoOption *connOptions)
Definition: fe-connect.c:6563
#define CStringGetDatum(X)
Definition: postgres.h:622
Datum DirectFunctionCall1Coll(PGFunction func, Oid collation, Datum arg1)
Definition: fmgr.c:791
XLogRecPtr startpoint
Definition: walreceiver.h:170
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:500
int pgsocket
Definition: port.h:31
PQconninfoOption * PQconninfo(PGconn *conn)
Definition: fe-connect.c:6519
MemoryContext CurrentMemoryContext
Definition: mcxt.c:42
void TupleDescInitEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:580
static WalReceiverFunctionsType PQWalReceiverFunctions
static AmcheckOptions opts
Definition: pg_amcheck.c:110
static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn)
struct WalRcvStreamOptions::@103::@104 physical
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:188
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
static char * libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, bool temporary, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn)
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
void * palloc0(Size size)
Definition: mcxt.c:1093
int PQbackendPID(const PGconn *conn)
Definition: fe-connect.c:6761
WalReceiverFunctionsType * WalReceiverFunctions
Definition: walreceiver.c:95
Oid MyDatabaseId
Definition: globals.c:88
AttInMetadata * TupleDescGetAttInMetadata(TupleDesc tupdesc)
Definition: execTuples.c:2097
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1853
int work_mem
Definition: globals.c:124
static void libpqrcv_disconnect(WalReceiverConn *conn)
PostgresPollingStatusType PQconnectPoll(PGconn *conn)
Definition: fe-connect.c:2251
static int libpqrcv_server_version(WalReceiverConn *conn)
char * PQescapeLiteral(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:3988
static PGresult * libpqrcv_PQexec(PGconn *streamConn, const char *query)
#define InvalidOid
Definition: postgres_ext.h:36
#define DatumGetLSN(X)
Definition: pg_lsn.h:21
#define ereport(elevel,...)
Definition: elog.h:157
char * PQhost(const PGconn *conn)
Definition: fe-connect.c:6614
void PQclear(PGresult *res)
Definition: fe-exec.c:680
#define free(a)
Definition: header.h:65
#define PQExpBufferDataBroken(buf)
Definition: pqexpbuffer.h:67
int PQendcopy(PGconn *conn)
Definition: fe-exec.c:2752
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:3152
Tuplestorestate * tuplestore
Definition: walreceiver.h:217
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:804
#define lfirst(lc)
Definition: pg_list.h:169
WalRcvExecStatus status
Definition: walreceiver.h:214
const char * GetDatabaseEncodingName(void)
Definition: mbutils.c:1216
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:1900
#define ALWAYS_SECURE_SEARCH_PATH_SQL
Definition: connect.h:25
static char * stringlist_to_identifierstr(PGconn *conn, List *strings)
int32 pg_strtoint32(const char *s)
Definition: numutils.c:263
PostgresPollingStatusType
Definition: libpq-fe.h:73
static WalReceiverConn * libpqrcv_connect(const char *conninfo, bool logical, const char *appname, char **err)
static char * filename
Definition: pg_dumpall.c:91
void * palloc(Size size)
Definition: mcxt.c:1062
int errmsg(const char *fmt,...)
Definition: elog.c:909
CRSSnapshotAction
Definition: walsender.h:20
static char * libpqrcv_get_conninfo(WalReceiverConn *conn)
#define elog(elevel,...)
Definition: elog.h:232
int i
PGconn * PQconnectStartParams(const char *const *keywords, const char *const *values, int expand_dbname)
Definition: fe-connect.c:765
struct Latch * MyLatch
Definition: globals.c:57
union WalRcvStreamOptions::@103 proto
static void libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:227
static void libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host, int *sender_port)
int PQgetisnull(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3586
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:6682
#define WL_SOCKET_CONNECTED
Definition: latch.h:135
Definition: pg_list.h:50
#define snprintf
Definition: port.h:216
#define WL_LATCH_SET
Definition: latch.h:125
int16 AttrNumber
Definition: attnum.h:21
#define _(x)
Definition: elog.c:89
void PQfreemem(void *ptr)
Definition: fe-exec.c:3715
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:6753
long val
Definition: informix.c:664
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1927
void initPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:92
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130