PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
libpqwalreceiver.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * libpqwalreceiver.c
4  *
5  * This file contains the libpq-specific parts of walreceiver. It's
6  * loaded as a dynamic module to avoid linking the main server binary with
7  * libpq.
8  *
9  * Portions Copyright (c) 2010-2017, PostgreSQL Global Development Group
10  *
11  *
12  * IDENTIFICATION
13  * src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
14  *
15  *-------------------------------------------------------------------------
16  */
17 #include "postgres.h"
18 
19 #include <unistd.h>
20 #include <sys/time.h>
21 
22 #include "libpq-fe.h"
23 #include "pqexpbuffer.h"
24 #include "access/xlog.h"
25 #include "mb/pg_wchar.h"
26 #include "miscadmin.h"
27 #include "pgstat.h"
30 #include "storage/proc.h"
31 #include "utils/builtins.h"
32 #include "utils/pg_lsn.h"
33 
35 
36 void _PG_init(void);
37 
39 {
40  /* Current connection to the primary, if any */
42  /* Used to remember if the connection is logical or physical */
43  bool logical;
44  /* Buffer for currently read records */
45  char *recvBuf;
46 };
47 
48 /* Prototypes for interface functions */
49 static WalReceiverConn *libpqrcv_connect(const char *conninfo,
50  bool logical, const char *appname,
51  char **err);
52 static void libpqrcv_check_conninfo(const char *conninfo);
55  TimeLineID *primary_tli,
56  int *server_version);
58  TimeLineID tli, char **filename,
59  char **content, int *len);
63  TimeLineID *next_tli);
64 static int libpqrcv_receive(WalReceiverConn *conn, char **buffer,
65  pgsocket *wait_fd);
66 static void libpqrcv_send(WalReceiverConn *conn, const char *buffer,
67  int nbytes);
69  const char *slotname,
70  bool temporary,
71  XLogRecPtr *lsn);
73  const char *cmd, char **err);
75 
89 };
90 
91 /* Prototypes for private functions */
92 static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query);
93 static char *stringlist_to_identifierstr(PGconn *conn, List *strings);
94 
95 /*
96  * Module initialization function
97  */
98 void
99 _PG_init(void)
100 {
101  if (WalReceiverFunctions != NULL)
102  elog(ERROR, "libpqwalreceiver already loaded");
104 }
105 
106 /*
107  * Establish the connection to the primary server for XLOG streaming
108  *
109  * Returns NULL on error and fills the err with palloc'ed error message.
110  */
111 static WalReceiverConn *
112 libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
113  char **err)
114 {
116  const char *keys[5];
117  const char *vals[5];
118  int i = 0;
119 
120  /*
121  * We use the expand_dbname parameter to process the connection string (or
122  * URI), and pass some extra options. The deliberately undocumented
123  * parameter "replication=true" makes it a replication connection. The
124  * database name is ignored by the server in replication mode, but specify
125  * "replication" for .pgpass lookup.
126  */
127  keys[i] = "dbname";
128  vals[i] = conninfo;
129  keys[++i] = "replication";
130  vals[i] = logical ? "database" : "true";
131  if (!logical)
132  {
133  keys[++i] = "dbname";
134  vals[i] = "replication";
135  }
136  keys[++i] = "fallback_application_name";
137  vals[i] = appname;
138  if (logical)
139  {
140  keys[++i] = "client_encoding";
141  vals[i] = GetDatabaseEncodingName();
142  }
143  keys[++i] = NULL;
144  vals[i] = NULL;
145 
146  Assert(i < sizeof(keys));
147 
148  conn = palloc0(sizeof(WalReceiverConn));
149  conn->streamConn = PQconnectdbParams(keys, vals, /* expand_dbname = */ true);
150  if (PQstatus(conn->streamConn) != CONNECTION_OK)
151  {
152  *err = pstrdup(PQerrorMessage(conn->streamConn));
153  return NULL;
154  }
155 
156  conn->logical = logical;
157 
158  return conn;
159 }
160 
161 /*
162  * Validate connection info string (just try to parse it)
163  */
164 static void
165 libpqrcv_check_conninfo(const char *conninfo)
166 {
167  PQconninfoOption *opts = NULL;
168  char *err = NULL;
169 
170  opts = PQconninfoParse(conninfo, &err);
171  if (opts == NULL)
172  ereport(ERROR,
173  (errcode(ERRCODE_SYNTAX_ERROR),
174  errmsg("invalid connection string syntax: %s", err)));
175 
176  PQconninfoFree(opts);
177 }
178 
179 /*
180  * Return a user-displayable conninfo string. Any security-sensitive fields
181  * are obfuscated.
182  */
183 static char *
185 {
186  PQconninfoOption *conn_opts;
187  PQconninfoOption *conn_opt;
189  char *retval;
190 
191  Assert(conn->streamConn != NULL);
192 
193  initPQExpBuffer(&buf);
194  conn_opts = PQconninfo(conn->streamConn);
195 
196  if (conn_opts == NULL)
197  ereport(ERROR,
198  (errmsg("could not parse connection string: %s",
199  _("out of memory"))));
200 
201  /* build a clean connection string from pieces */
202  for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
203  {
204  bool obfuscate;
205 
206  /* Skip debug and empty options */
207  if (strchr(conn_opt->dispchar, 'D') ||
208  conn_opt->val == NULL ||
209  conn_opt->val[0] == '\0')
210  continue;
211 
212  /* Obfuscate security-sensitive options */
213  obfuscate = strchr(conn_opt->dispchar, '*') != NULL;
214 
215  appendPQExpBuffer(&buf, "%s%s=%s",
216  buf.len == 0 ? "" : " ",
217  conn_opt->keyword,
218  obfuscate ? "********" : conn_opt->val);
219  }
220 
221  PQconninfoFree(conn_opts);
222 
223  retval = PQExpBufferDataBroken(buf) ? NULL : pstrdup(buf.data);
224  termPQExpBuffer(&buf);
225  return retval;
226 }
227 
228 /*
229  * Check that primary's system identifier matches ours, and fetch the current
230  * timeline ID of the primary.
231  */
232 static char *
234  int *server_version)
235 {
236  PGresult *res;
237  char *primary_sysid;
238 
239  /*
240  * Get the system identifier and timeline ID as a DataRow message from the
241  * primary server.
242  */
243  res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM");
244  if (PQresultStatus(res) != PGRES_TUPLES_OK)
245  {
246  PQclear(res);
247  ereport(ERROR,
248  (errmsg("could not receive database system identifier and timeline ID from "
249  "the primary server: %s",
250  PQerrorMessage(conn->streamConn))));
251  }
252  if (PQnfields(res) < 3 || PQntuples(res) != 1)
253  {
254  int ntuples = PQntuples(res);
255  int nfields = PQnfields(res);
256 
257  PQclear(res);
258  ereport(ERROR,
259  (errmsg("invalid response from primary server"),
260  errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.",
261  ntuples, nfields, 3, 1)));
262  }
263  primary_sysid = pstrdup(PQgetvalue(res, 0, 0));
264  *primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);
265  PQclear(res);
266 
267  *server_version = PQserverVersion(conn->streamConn);
268 
269  return primary_sysid;
270 }
271 
272 /*
273  * Start streaming WAL data from given streaming options.
274  *
275  * Returns true if we switched successfully to copy-both mode. False
276  * means the server received the command and executed it successfully, but
277  * didn't switch to copy-mode. That means that there was no WAL on the
278  * requested timeline and starting point, because the server switched to
279  * another timeline at or before the requested starting point. On failure,
280  * throws an ERROR.
281  */
282 static bool
285 {
286  StringInfoData cmd;
287  PGresult *res;
288 
289  Assert(options->logical == conn->logical);
290  Assert(options->slotname || !options->logical);
291 
292  initStringInfo(&cmd);
293 
294  /* Build the command. */
295  appendStringInfoString(&cmd, "START_REPLICATION");
296  if (options->slotname != NULL)
297  appendStringInfo(&cmd, " SLOT \"%s\"",
298  options->slotname);
299 
300  if (options->logical)
301  appendStringInfo(&cmd, " LOGICAL");
302 
303  appendStringInfo(&cmd, " %X/%X",
304  (uint32) (options->startpoint >> 32),
305  (uint32) options->startpoint);
306 
307  /*
308  * Additional options are different depending on if we are doing logical
309  * or physical replication.
310  */
311  if (options->logical)
312  {
313  char *pubnames_str;
314  List *pubnames;
315  char *pubnames_literal;
316 
317  appendStringInfoString(&cmd, " (");
318 
319  appendStringInfo(&cmd, "proto_version '%u'",
320  options->proto.logical.proto_version);
321 
322  pubnames = options->proto.logical.publication_names;
323  pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
324  if (!pubnames_str)
325  ereport(ERROR,
326  (errmsg("could not start WAL streaming: %s",
327  PQerrorMessage(conn->streamConn))));
328  pubnames_literal = PQescapeLiteral(conn->streamConn, pubnames_str,
329  strlen(pubnames_str));
330  if (!pubnames_literal)
331  ereport(ERROR,
332  (errmsg("could not start WAL streaming: %s",
333  PQerrorMessage(conn->streamConn))));
334  appendStringInfo(&cmd, ", publication_names %s", pubnames_literal);
335  PQfreemem(pubnames_literal);
336  pfree(pubnames_str);
337 
338  appendStringInfoChar(&cmd, ')');
339  }
340  else
341  appendStringInfo(&cmd, " TIMELINE %u",
342  options->proto.physical.startpointTLI);
343 
344  /* Start streaming. */
345  res = libpqrcv_PQexec(conn->streamConn, cmd.data);
346  pfree(cmd.data);
347 
348  if (PQresultStatus(res) == PGRES_COMMAND_OK)
349  {
350  PQclear(res);
351  return false;
352  }
353  else if (PQresultStatus(res) != PGRES_COPY_BOTH)
354  {
355  PQclear(res);
356  ereport(ERROR,
357  (errmsg("could not start WAL streaming: %s",
358  PQerrorMessage(conn->streamConn))));
359  }
360  PQclear(res);
361  return true;
362 }
363 
364 /*
365  * Stop streaming WAL data. Returns the next timeline's ID in *next_tli, as
366  * reported by the server, or 0 if it did not report it.
367  */
368 static void
370 {
371  PGresult *res;
372 
373  if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
374  PQflush(conn->streamConn))
375  ereport(ERROR,
376  (errmsg("could not send end-of-streaming message to primary: %s",
377  PQerrorMessage(conn->streamConn))));
378 
379  *next_tli = 0;
380 
381  /*
382  * After COPY is finished, we should receive a result set indicating the
383  * next timeline's ID, or just CommandComplete if the server was shut
384  * down.
385  *
386  * If we had not yet received CopyDone from the backend, PGRES_COPY_IN
387  * would also be possible. However, at the moment this function is only
388  * called after receiving CopyDone from the backend - the walreceiver
389  * never terminates replication on its own initiative.
390  */
391  res = PQgetResult(conn->streamConn);
392  if (PQresultStatus(res) == PGRES_TUPLES_OK)
393  {
394  /*
395  * Read the next timeline's ID. The server also sends the timeline's
396  * starting point, but it is ignored.
397  */
398  if (PQnfields(res) < 2 || PQntuples(res) != 1)
399  ereport(ERROR,
400  (errmsg("unexpected result set after end-of-streaming")));
401  *next_tli = pg_atoi(PQgetvalue(res, 0, 0), sizeof(uint32), 0);
402  PQclear(res);
403 
404  /* the result set should be followed by CommandComplete */
405  res = PQgetResult(conn->streamConn);
406  }
407  else if (PQresultStatus(res) == PGRES_COPY_OUT)
408  {
409  PQclear(res);
410 
411  /* End the copy */
412  PQendcopy(conn->streamConn);
413 
414  /* CommandComplete should follow */
415  res = PQgetResult(conn->streamConn);
416  }
417 
418  if (PQresultStatus(res) != PGRES_COMMAND_OK)
419  ereport(ERROR,
420  (errmsg("error reading result of streaming command: %s",
421  PQerrorMessage(conn->streamConn))));
422  PQclear(res);
423 
424  /* Verify that there are no more results */
425  res = PQgetResult(conn->streamConn);
426  if (res != NULL)
427  ereport(ERROR,
428  (errmsg("unexpected result after CommandComplete: %s",
429  PQerrorMessage(conn->streamConn))));
430 }
431 
432 /*
433  * Fetch the timeline history file for 'tli' from primary.
434  */
435 static void
437  TimeLineID tli, char **filename,
438  char **content, int *len)
439 {
440  PGresult *res;
441  char cmd[64];
442 
443  Assert(!conn->logical);
444 
445  /*
446  * Request the primary to send over the history file for given timeline.
447  */
448  snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
449  res = libpqrcv_PQexec(conn->streamConn, cmd);
450  if (PQresultStatus(res) != PGRES_TUPLES_OK)
451  {
452  PQclear(res);
453  ereport(ERROR,
454  (errmsg("could not receive timeline history file from "
455  "the primary server: %s",
456  PQerrorMessage(conn->streamConn))));
457  }
458  if (PQnfields(res) != 2 || PQntuples(res) != 1)
459  {
460  int ntuples = PQntuples(res);
461  int nfields = PQnfields(res);
462 
463  PQclear(res);
464  ereport(ERROR,
465  (errmsg("invalid response from primary server"),
466  errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
467  ntuples, nfields)));
468  }
469  *filename = pstrdup(PQgetvalue(res, 0, 0));
470 
471  *len = PQgetlength(res, 0, 1);
472  *content = palloc(*len);
473  memcpy(*content, PQgetvalue(res, 0, 1), *len);
474  PQclear(res);
475 }
476 
477 /*
478  * Send a query and wait for the results by using the asynchronous libpq
479  * functions and socket readiness events.
480  *
481  * We must not use the regular blocking libpq functions like PQexec()
482  * since they are uninterruptible by signals on some platforms, such as
483  * Windows.
484  *
485  * The function is modeled on PQexec() in libpq, but only implements
486  * those parts that are in use in the walreceiver.
487  *
488  * Queries are always executed on the connection in streamConn.
 *
 * Returns the last PGresult collected for the query, or NULL if the query
 * could not be sent or PQconsumeInput() reported failure.  Collection stops
 * early when a result enters a COPY state or the connection has gone bad.
489  */
490 static PGresult *
491 libpqrcv_PQexec(PGconn *streamConn, const char *query)
492 {
493  PGresult *result = NULL;
494  PGresult *lastResult = NULL;
495
496  /*
497  * PQexec() silently discards any prior query results on the connection.
498  * This is not required for walreceiver since it's expected that walsender
499  * won't generate any such junk results.
500  */
501
502  /*
503  * Submit a query. Since we don't use non-blocking mode, this also can
504  * block. But its risk is relatively small, so we ignore that for now.
505  */
506  if (!PQsendQuery(streamConn, query))
507  return NULL;
508
509  for (;;)
510  {
511  /*
512  * Receive data until PQgetResult is ready to get the result without
513  * blocking.
514  */
515  while (PQisBusy(streamConn))
516  {
517  int rc;
518
519  /*
520  * We don't need to break down the sleep into smaller increments,
521  * since we'll get interrupted by signals and can either handle
522  * interrupts here or elog(FATAL) within SIGTERM signal handler if
523  * the signal arrives in the middle of establishment of
524  * replication connection.
525  */
/*
 * NOTE(review): original lines 526-528 are absent from this listing.
 * Presumably they open the wait call whose result is stored in rc (the
 * arguments below and the rc tests suggest WaitLatchOrSocket) -- restore
 * them from upstream before building.
 */
529  WL_LATCH_SET,
530  PQsocket(streamConn),
531  0,
/*
 * NOTE(review): original line 532 is absent; presumably the final
 * argument and the closing of the call above -- confirm against upstream.
 */
533  if (rc & WL_POSTMASTER_DEATH)
534  exit(1);
535
536  /* interrupted */
537  if (rc & WL_LATCH_SET)
538  {
/*
 * NOTE(review): original line 539 is absent; presumably interrupt
 * processing happens here before retrying the wait -- confirm.
 */
540  continue;
541  }
/*
 * NOTE(review): returning NULL here does not PQclear() lastResult, which
 * may be non-NULL on a later pass of the outer loop -- possible leak,
 * verify.
 */
542  if (PQconsumeInput(streamConn) == 0)
543  return NULL; /* trouble */
544  }
545
546  /*
547  * Emulate the PQexec()'s behavior of returning the last result when
548  * there are many. Since walsender will never generate multiple
549  * results, we skip the concatenation of error messages.
550  */
551  result = PQgetResult(streamConn);
552  if (result == NULL)
553  break; /* query is complete */
554
 /* keep only the newest result; PQclear() of the initial NULL is harmless */
555  PQclear(lastResult);
556  lastResult = result;
557
 /* stop collecting once in a COPY state or when the connection is lost */
558  if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
559  PQresultStatus(lastResult) == PGRES_COPY_OUT ||
560  PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
561  PQstatus(streamConn) == CONNECTION_BAD)
562  break;
563  }
564
565  return lastResult;
566 }
567 
568 /*
569  * Disconnect connection to primary, if any.
570  */
571 static void
573 {
574  PQfinish(conn->streamConn);
575  if (conn->recvBuf != NULL)
576  PQfreemem(conn->recvBuf);
577  pfree(conn);
578 }
579 
580 /*
581  * Receive a message available from XLOG stream.
582  *
583  * Returns:
584  *
585  * If data was received, returns the length of the data. *buffer is set to
586  * point to a buffer holding the received message. The buffer is only valid
587  * until the next libpqrcv_* call.
588  *
589  * If no data was available immediately, returns 0, and *wait_fd is set to a
590  * socket descriptor which can be waited on before trying again.
591  *
592  * -1 if the server ended the COPY.
593  *
594  * ereports on error.
595  */
596 static int
598  pgsocket *wait_fd)
599 {
600  int rawlen;
601 
602  if (conn->recvBuf != NULL)
603  PQfreemem(conn->recvBuf);
604  conn->recvBuf = NULL;
605 
606  /* Try to receive a CopyData message */
607  rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
608  if (rawlen == 0)
609  {
610  /* Try consuming some data. */
611  if (PQconsumeInput(conn->streamConn) == 0)
612  ereport(ERROR,
613  (errmsg("could not receive data from WAL stream: %s",
614  PQerrorMessage(conn->streamConn))));
615 
616  /* Now that we've consumed some input, try again */
617  rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
618  if (rawlen == 0)
619  {
620  /* Tell caller to try again when our socket is ready. */
621  *wait_fd = PQsocket(conn->streamConn);
622  return 0;
623  }
624  }
625  if (rawlen == -1) /* end-of-streaming or error */
626  {
627  PGresult *res;
628 
629  res = PQgetResult(conn->streamConn);
630  if (PQresultStatus(res) == PGRES_COMMAND_OK ||
632  {
633  PQclear(res);
634  return -1;
635  }
636  else
637  {
638  PQclear(res);
639  ereport(ERROR,
640  (errmsg("could not receive data from WAL stream: %s",
641  PQerrorMessage(conn->streamConn))));
642  }
643  }
644  if (rawlen < -1)
645  ereport(ERROR,
646  (errmsg("could not receive data from WAL stream: %s",
647  PQerrorMessage(conn->streamConn))));
648 
649  /* Return received messages to caller */
650  *buffer = conn->recvBuf;
651  return rawlen;
652 }
653 
654 /*
655  * Send a message to XLOG stream.
656  *
657  * ereports on error.
658  */
659 static void
660 libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
661 {
662  if (PQputCopyData(conn->streamConn, buffer, nbytes) <= 0 ||
663  PQflush(conn->streamConn))
664  ereport(ERROR,
665  (errmsg("could not send data to WAL stream: %s",
666  PQerrorMessage(conn->streamConn))));
667 }
668 
669 /*
670  * Create new replication slot.
671  * Returns the name of the exported snapshot for logical slot or NULL for
672  * physical slot.
673  */
674 static char *
676  bool temporary, XLogRecPtr *lsn)
677 {
678  PGresult *res;
679  StringInfoData cmd;
680  char *snapshot;
681 
682  initStringInfo(&cmd);
683 
684  appendStringInfo(&cmd, "CREATE_REPLICATION_SLOT \"%s\" ", slotname);
685 
686  if (temporary)
687  appendStringInfo(&cmd, "TEMPORARY ");
688 
689  if (conn->logical)
690  appendStringInfo(&cmd, "LOGICAL pgoutput");
691 
692  res = libpqrcv_PQexec(conn->streamConn, cmd.data);
693  pfree(cmd.data);
694 
695  if (PQresultStatus(res) != PGRES_TUPLES_OK)
696  {
697  PQclear(res);
698  ereport(ERROR,
699  (errmsg("could not create replication slot \"%s\": %s",
700  slotname, PQerrorMessage(conn->streamConn))));
701  }
702 
704  CStringGetDatum(PQgetvalue(res, 0, 1))));
705  if (!PQgetisnull(res, 0, 2))
706  snapshot = pstrdup(PQgetvalue(res, 0, 2));
707  else
708  snapshot = NULL;
709 
710  PQclear(res);
711 
712  return snapshot;
713 }
714 
715 /*
716  * Run command.
717  *
718  * Returns if the command has succeeded and fills the err with palloced
719  * error message if not.
720  */
721 static bool
722 libpqrcv_command(WalReceiverConn *conn, const char *cmd, char **err)
723 {
724  PGresult *res;
725 
726  res = libpqrcv_PQexec(conn->streamConn, cmd);
727 
728  if (PQresultStatus(res) != PGRES_COMMAND_OK)
729  {
730  PQclear(res);
731  *err = pstrdup(PQerrorMessage(conn->streamConn));
732  return false;
733  }
734 
735  PQclear(res);
736 
737  return true;
738 }
739 
740 /*
741  * Given a List of strings, return it as single comma separated
742  * string, quoting identifiers as needed.
743  *
744  * This is essentially the reverse of SplitIdentifierString.
745  *
746  * The caller should free the result.
747  */
748 static char *
750 {
751  ListCell *lc;
752  StringInfoData res;
753  bool first = true;
754 
755  initStringInfo(&res);
756 
757  foreach (lc, strings)
758  {
759  char *val = strVal(lfirst(lc));
760  char *val_escaped;
761 
762  if (first)
763  first = false;
764  else
765  appendStringInfoChar(&res, ',');
766 
767  val_escaped = PQescapeIdentifier(conn, val, strlen(val));
768  if (!val_escaped)
769  {
770  free(res.data);
771  return NULL;
772  }
773  appendStringInfoString(&res, val_escaped);
774  PQfreemem(val_escaped);
775  }
776 
777  return res.data;
778 }
int PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
Definition: fe-exec.c:2221
int PQgetlength(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3078
static bool libpqrcv_startstreaming(WalReceiverConn *conn, const WalRcvStreamOptions *options)
int PQnfields(const PGresult *res)
Definition: fe-exec.c:2681
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:5959
static void libpqrcv_check_conninfo(const char *conninfo)
static int libpqrcv_receive(WalReceiverConn *conn, char **buffer, pgsocket *wait_fd)
uint32 TimeLineID
Definition: xlogdefs.h:45
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3067
PGPROC * MyProc
Definition: proc.c:67
void termPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:128
void _PG_init(void)
char * pstrdup(const char *in)
Definition: mcxt.c:1165
union WalRcvStreamOptions::@53 proto
#define strVal(v)
Definition: value.h:54
int errcode(int sqlerrcode)
Definition: elog.c:575
void PQfinish(PGconn *conn)
Definition: fe-connect.c:3516
int PQputCopyEnd(PGconn *conn, const char *errormsg)
Definition: fe-exec.c:2288
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
#define WL_SOCKET_READABLE
Definition: latch.h:125
void ResetLatch(volatile Latch *latch)
Definition: latch.c:461
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:5949
static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, TimeLineID tli, char **filename, char **content, int *len)
int PQntuples(const PGresult *res)
Definition: fe-exec.c:2673
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2596
Datum pg_lsn_in(PG_FUNCTION_ARGS)
Definition: pg_lsn.c:30
Latch procLatch
Definition: proc.h:93
int PQgetCopyData(PGconn *conn, char **buffer, int async)
Definition: fe-exec.c:2377
PGconn * PQconnectdbParams(const char *const *keywords, const char *const *values, int expand_dbname)
Definition: fe-connect.c:470
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1132
char * PQescapeIdentifier(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:3474
static bool libpqrcv_command(WalReceiverConn *conn, const char *cmd, char **err)
PQconninfoOption * PQconninfoParse(const char *conninfo, char **errmsg)
Definition: fe-connect.c:4603
void pfree(void *pointer)
Definition: mcxt.c:992
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:110
#define ERROR
Definition: elog.h:43
static void libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
PG_MODULE_MAGIC
PGconn * conn
Definition: streamutil.c:42
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
Definition: pqexpbuffer.c:262
int PQflush(PGconn *conn)
Definition: fe-exec.c:3187
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:189
static char * buf
Definition: pg_test_fsync.c:65
static char * libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, bool temporary, XLogRecPtr *lsn)
int errdetail(const char *fmt,...)
Definition: elog.c:873
void PQconninfoFree(PQconninfoOption *connOptions)
Definition: fe-connect.c:5809
#define CStringGetDatum(X)
Definition: postgres.h:586
Datum DirectFunctionCall1Coll(PGFunction func, Oid collation, Datum arg1)
Definition: fmgr.c:1016
XLogRecPtr startpoint
Definition: walreceiver.h:144
unsigned int uint32
Definition: c.h:265
struct WalRcvStreamOptions::@53::@54 physical
int pgsocket
Definition: port.h:22
PQconninfoOption * PQconninfo(PGconn *conn)
Definition: fe-connect.c:5768
#define ereport(elevel, rest)
Definition: elog.h:122
static WalReceiverFunctionsType PQWalReceiverFunctions
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:201
void initStringInfo(StringInfo str)
Definition: stringinfo.c:65
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
void * palloc0(Size size)
Definition: mcxt.c:920
WalReceiverFunctionsType * WalReceiverFunctions
Definition: walreceiver.c:79
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1631
static char * libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli, int *server_version)
static void libpqrcv_disconnect(WalReceiverConn *conn)
char * PQescapeLiteral(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:3468
static PGresult * libpqrcv_PQexec(PGconn *streamConn, const char *query)
#define InvalidOid
Definition: postgres_ext.h:36
#define DatumGetLSN(X)
Definition: pg_lsn.h:21
void PQclear(PGresult *res)
Definition: fe-exec.c:650
#define free(a)
Definition: header.h:60
#define PQExpBufferDataBroken(buf)
Definition: pqexpbuffer.h:67
int PQendcopy(PGconn *conn)
Definition: fe-exec.c:2522
#define NULL
Definition: c.h:226
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:671
#define lfirst(lc)
Definition: pg_list.h:106
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:320
const char * GetDatabaseEncodingName(void)
Definition: mbutils.c:1021
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:1681
static int server_version
Definition: pg_dumpall.c:77
static char * stringlist_to_identifierstr(PGconn *conn, List *strings)
static WalReceiverConn * libpqrcv_connect(const char *conninfo, bool logical, const char *appname, char **err)
static char * filename
Definition: pg_dumpall.c:80
void * palloc(Size size)
Definition: mcxt.c:891
int errmsg(const char *fmt,...)
Definition: elog.c:797
static char * libpqrcv_get_conninfo(WalReceiverConn *conn)
int i
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
#define elog
Definition: elog.h:219
static void libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
int PQgetisnull(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3092
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:5906
Definition: pg_list.h:45
#define WL_LATCH_SET
Definition: latch.h:124
#define _(x)
Definition: elog.c:84
void PQfreemem(void *ptr)
Definition: fe-exec.c:3200
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:5977
long val
Definition: informix.c:689
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1702
int32 pg_atoi(const char *s, int size, int c)
Definition: numutils.c:37
void initPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:89