PostgreSQL Source Code  git master
connection.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * connection.c
4  * Connection management functions for postgres_fdw
5  *
6  * Portions Copyright (c) 2012-2020, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * contrib/postgres_fdw/connection.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "access/htup_details.h"
16 #include "access/xact.h"
18 #include "commands/defrem.h"
19 #include "mb/pg_wchar.h"
20 #include "miscadmin.h"
21 #include "pgstat.h"
22 #include "postgres_fdw.h"
23 #include "storage/latch.h"
24 #include "utils/hsearch.h"
25 #include "utils/inval.h"
26 #include "utils/memutils.h"
27 #include "utils/syscache.h"
28 
29 /*
30  * Connection cache hash table entry
31  *
32  * The lookup key in this hash table is the user mapping OID. We use just one
33  * connection per user mapping ID, which ensures that all the scans use the
34  * same snapshot during a query. Using the user mapping OID rather than
35  * the foreign server OID + user OID avoids creating multiple connections when
36  * the public user mapping applies to all user OIDs.
37  *
38  * The "conn" pointer can be NULL if we don't currently have a live connection.
39  * When we do have a connection, xact_depth tracks the current depth of
40  * transactions and subtransactions open on the remote side. We need to issue
41  * commands at the same nesting depth on the remote as we're executing at
42  * ourselves, so that rolling back a subtransaction will kill the right
43  * queries and not the wrong ones.
44  */
45 typedef Oid ConnCacheKey;
46 
47 typedef struct ConnCacheEntry
48 {
49  ConnCacheKey key; /* hash key (must be first) */
50  PGconn *conn; /* connection to foreign server, or NULL */
51  /* Remaining fields are invalid when conn is NULL: */
52  int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 =
53  * one level of subxact open, etc */
54  bool have_prep_stmt; /* have we prepared any stmts in this xact? */
55  bool have_error; /* have any subxacts aborted in this xact? */
56  bool changing_xact_state; /* xact state change in process */
57  bool invalidated; /* true if reconnect is pending */
58  uint32 server_hashvalue; /* hash value of foreign server OID */
59  uint32 mapping_hashvalue; /* hash value of user mapping OID */
61 
62 /*
63  * Connection cache (initialized on first use)
64  */
65 static HTAB *ConnectionHash = NULL;
66 
67 /* for assigning cursor numbers and prepared statement numbers */
68 static unsigned int cursor_number = 0;
69 static unsigned int prep_stmt_number = 0;
70 
71 /* tracks whether any work is needed in callback functions */
72 static bool xact_got_connection = false;
73 
74 /* prototypes of private functions */
76 static void disconnect_pg_server(ConnCacheEntry *entry);
77 static void check_conn_params(const char **keywords, const char **values, UserMapping *user);
79 static void do_sql_command(PGconn *conn, const char *sql);
80 static void begin_remote_xact(ConnCacheEntry *entry);
81 static void pgfdw_xact_callback(XactEvent event, void *arg);
82 static void pgfdw_subxact_callback(SubXactEvent event,
83  SubTransactionId mySubid,
84  SubTransactionId parentSubid,
85  void *arg);
86 static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
88 static bool pgfdw_cancel_query(PGconn *conn);
89 static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
90  bool ignore_errors);
91 static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
92  PGresult **result);
94 
95 /*
96  * Get a PGconn which can be used to execute queries on the remote PostgreSQL
97  * server with the user's authorization. A new connection is established
98  * if we don't already have a suitable one, and a transaction is opened at
99  * the right subtransaction nesting depth if we didn't do that already.
100  *
101  * will_prep_stmt must be true if caller intends to create any prepared
102  * statements. Since those don't go away automatically at transaction end
103  * (not even on error), we need this flag to cue manual cleanup.
104  */
105 PGconn *
106 GetConnection(UserMapping *user, bool will_prep_stmt)
107 {
108  bool found;
109  ConnCacheEntry *entry;
111 
112  /* First time through, initialize connection cache hashtable */
113  if (ConnectionHash == NULL)
114  {
115  HASHCTL ctl;
116 
117  MemSet(&ctl, 0, sizeof(ctl));
118  ctl.keysize = sizeof(ConnCacheKey);
119  ctl.entrysize = sizeof(ConnCacheEntry);
120  /* allocate ConnectionHash in the cache context */
121  ctl.hcxt = CacheMemoryContext;
122  ConnectionHash = hash_create("postgres_fdw connections", 8,
123  &ctl,
125 
126  /*
127  * Register some callback functions that manage connection cleanup.
128  * This should be done just once in each backend.
129  */
136  }
137 
138  /* Set flag that we did GetConnection during the current transaction */
139  xact_got_connection = true;
140 
141  /* Create hash key for the entry. Assume no pad bytes in key struct */
142  key = user->umid;
143 
144  /*
145  * Find or create cached entry for requested connection.
146  */
147  entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
148  if (!found)
149  {
150  /*
151  * We need only clear "conn" here; remaining fields will be filled
152  * later when "conn" is set.
153  */
154  entry->conn = NULL;
155  }
156 
157  /* Reject further use of connections which failed abort cleanup. */
159 
160  /*
161  * If the connection needs to be remade due to invalidation, disconnect as
162  * soon as we're out of all transactions.
163  */
164  if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
165  {
166  elog(DEBUG3, "closing connection %p for option changes to take effect",
167  entry->conn);
168  disconnect_pg_server(entry);
169  }
170 
171  /*
172  * We don't check the health of cached connection here, because it would
173  * require some overhead. Broken connection will be detected when the
174  * connection is actually used.
175  */
176 
177  /*
178  * If cache entry doesn't have a connection, we have to establish a new
179  * connection. (If connect_pg_server throws an error, the cache entry
180  * will remain in a valid empty state, ie conn == NULL.)
181  */
182  if (entry->conn == NULL)
183  {
184  ForeignServer *server = GetForeignServer(user->serverid);
185 
186  /* Reset all transient state fields, to be sure all are clean */
187  entry->xact_depth = 0;
188  entry->have_prep_stmt = false;
189  entry->have_error = false;
190  entry->changing_xact_state = false;
191  entry->invalidated = false;
192  entry->server_hashvalue =
194  ObjectIdGetDatum(server->serverid));
195  entry->mapping_hashvalue =
197  ObjectIdGetDatum(user->umid));
198 
199  /* Now try to make the connection */
200  entry->conn = connect_pg_server(server, user);
201 
202  elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
203  entry->conn, server->servername, user->umid, user->userid);
204  }
205 
206  /*
207  * Start a new transaction or subtransaction if needed.
208  */
209  begin_remote_xact(entry);
210 
211  /* Remember if caller will prepare statements */
212  entry->have_prep_stmt |= will_prep_stmt;
213 
214  return entry->conn;
215 }
216 
217 /*
218  * Connect to remote server using specified server and user mapping properties.
219  */
220 static PGconn *
222 {
223  PGconn *volatile conn = NULL;
224 
225  /*
226  * Use PG_TRY block to ensure closing connection on error.
227  */
228  PG_TRY();
229  {
230  const char **keywords;
231  const char **values;
232  int n;
233 
234  /*
235  * Construct connection params from generic options of ForeignServer
236  * and UserMapping. (Some of them might not be libpq options, in
237  * which case we'll just waste a few array slots.) Add 3 extra slots
238  * for fallback_application_name, client_encoding, end marker.
239  */
240  n = list_length(server->options) + list_length(user->options) + 3;
241  keywords = (const char **) palloc(n * sizeof(char *));
242  values = (const char **) palloc(n * sizeof(char *));
243 
244  n = 0;
245  n += ExtractConnectionOptions(server->options,
246  keywords + n, values + n);
248  keywords + n, values + n);
249 
250  /* Use "postgres_fdw" as fallback_application_name. */
251  keywords[n] = "fallback_application_name";
252  values[n] = "postgres_fdw";
253  n++;
254 
255  /* Set client_encoding so that libpq can convert encoding properly. */
256  keywords[n] = "client_encoding";
257  values[n] = GetDatabaseEncodingName();
258  n++;
259 
260  keywords[n] = values[n] = NULL;
261 
262  /* verify connection parameters and make connection */
263  check_conn_params(keywords, values, user);
264 
265  conn = PQconnectdbParams(keywords, values, false);
266  if (!conn || PQstatus(conn) != CONNECTION_OK)
267  ereport(ERROR,
268  (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
269  errmsg("could not connect to server \"%s\"",
270  server->servername),
271  errdetail_internal("%s", pchomp(PQerrorMessage(conn)))));
272 
273  /*
274  * Check that non-superuser has used password to establish connection;
275  * otherwise, he's piggybacking on the postgres server's user
276  * identity. See also dblink_security_check() in contrib/dblink
277  * and check_conn_params.
278  */
279  if (!superuser_arg(user->userid) && UserMappingPasswordRequired(user) &&
281  ereport(ERROR,
282  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
283  errmsg("password is required"),
284  errdetail("Non-superuser cannot connect if the server does not request a password."),
285  errhint("Target server's authentication method must be changed or password_required=false set in the user mapping attributes.")));
286 
287  /* Prepare new session for use */
289 
290  pfree(keywords);
291  pfree(values);
292  }
293  PG_CATCH();
294  {
295  /* Release PGconn data structure if we managed to create one */
296  if (conn)
297  PQfinish(conn);
298  PG_RE_THROW();
299  }
300  PG_END_TRY();
301 
302  return conn;
303 }
304 
305 /*
306  * Disconnect any open connection for a connection cache entry.
307  */
308 static void
310 {
311  if (entry->conn != NULL)
312  {
313  PQfinish(entry->conn);
314  entry->conn = NULL;
315  }
316 }
317 
318 /*
319  * Return the value of the password_required option if it is defined for this
320  * user mapping, otherwise true. The mapping has been pre-validated.
321  */
322 static bool
    /*
     * NOTE(review): the line carrying the function name and parameter list is
     * not present in this listing (the embedded numbering jumps from 322 to
     * 324). The body reads user->options, and callers elsewhere in this file
     * invoke UserMappingPasswordRequired(user) -- confirm against upstream.
     */
324 {
325  ListCell *cell;
326 
    /* Scan the mapping's options for an explicit password_required entry. */
327  foreach(cell, user->options)
328  {
329  DefElem *def = (DefElem *) lfirst(cell);
    /* First matching entry wins; its boolean value is the answer. */
330  if (strcmp(def->defname, "password_required") == 0)
331  return defGetBoolean(def);
332  }
333 
    /* Option not present: the result defaults to true. */
334  return true;
335 }
336 
337 /*
338  * For non-superusers, insist that the connstr specify a password. This
339  * prevents a password from being picked up from .pgpass, a service file, the
340  * environment, etc. We don't want the postgres user's passwords,
341  * certificates, etc to be accessible to non-superusers. (See also
342  * dblink_connstr_check in contrib/dblink.)
343  */
344 static void
345 check_conn_params(const char **keywords, const char **values, UserMapping *user)
346 {
347  int i;
348 
349  /* no check required if superuser */
350  if (superuser_arg(user->userid))
351  return;
352 
353  /* ok if params contain a non-empty password */
354  for (i = 0; keywords[i] != NULL; i++)
355  {
356  if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
357  return;
358  }
359 
360  /* ok if the superuser explicitly said so at user mapping creation time */
361  if (!UserMappingPasswordRequired(user))
362  return;
363 
364  ereport(ERROR,
365  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
366  errmsg("password is required"),
367  errdetail("Non-superusers must provide a password in the user mapping.")));
368 }
369 
370 /*
371  * Issue SET commands to make sure remote session is configured properly.
372  *
373  * We do this just once at connection, assuming nothing will change the
374  * values later. Since we'll never send volatile function calls to the
375  * remote, there shouldn't be any way to break this assumption from our end.
376  * It's possible to think of ways to break it at the remote end, eg making
377  * a foreign table point to a view that includes a set_config call ---
378  * but once you admit the possibility of a malicious view definition,
379  * there are any number of ways to break things.
380  */
381 static void
383 {
384  int remoteversion = PQserverVersion(conn);
385 
386  /* Force the search path to contain only pg_catalog (see deparse.c) */
387  do_sql_command(conn, "SET search_path = pg_catalog");
388 
389  /*
390  * Set remote timezone; this is basically just cosmetic, since all
391  * transmitted and returned timestamptzs should specify a zone explicitly
392  * anyway. However it makes the regression test outputs more predictable.
393  *
394  * We don't risk setting remote zone equal to ours, since the remote
395  * server might use a different timezone database. Instead, use UTC
396  * (quoted, because very old servers are picky about case).
397  */
398  do_sql_command(conn, "SET timezone = 'UTC'");
399 
400  /*
401  * Set values needed to ensure unambiguous data output from remote. (This
402  * logic should match what pg_dump does. See also set_transmission_modes
403  * in postgres_fdw.c.)
404  */
405  do_sql_command(conn, "SET datestyle = ISO");
406  if (remoteversion >= 80400)
407  do_sql_command(conn, "SET intervalstyle = postgres");
408  if (remoteversion >= 90000)
409  do_sql_command(conn, "SET extra_float_digits = 3");
410  else
411  do_sql_command(conn, "SET extra_float_digits = 2");
412 }
413 
414 /*
415  * Convenience subroutine to issue a non-data-returning SQL command to remote
416  */
417 static void
418 do_sql_command(PGconn *conn, const char *sql)
419 {
420  PGresult *res;
421 
422  if (!PQsendQuery(conn, sql))
423  pgfdw_report_error(ERROR, NULL, conn, false, sql);
424  res = pgfdw_get_result(conn, sql);
425  if (PQresultStatus(res) != PGRES_COMMAND_OK)
426  pgfdw_report_error(ERROR, res, conn, true, sql);
427  PQclear(res);
428 }
429 
430 /*
431  * Start remote transaction or subtransaction, if needed.
432  *
433  * Note that we always use at least REPEATABLE READ in the remote session.
434  * This is so that, if a query initiates multiple scans of the same or
435  * different foreign tables, we will get snapshot-consistent results from
436  * those scans. A disadvantage is that we can't provide sane emulation of
437  * READ COMMITTED behavior --- it would be nice if we had some other way to
438  * control which remote queries share a snapshot.
439  */
440 static void
    /*
     * NOTE(review): the line carrying the function name and parameter list is
     * not present in this listing (embedded numbering skips 441). The
     * prototype near the top of the file declares
     * begin_remote_xact(ConnCacheEntry *entry).
     */
442 {
    /* Local nesting level that the remote side has to be brought up to. */
443  int curlevel = GetCurrentTransactionNestLevel();
444 
445  /* Start main transaction if we haven't yet */
446  if (entry->xact_depth <= 0)
447  {
448  const char *sql;
449 
450  elog(DEBUG3, "starting remote transaction on connection %p",
451  entry->conn);
452 
    /*
     * NOTE(review): the "else" two lines below has no visible matching "if"
     * (embedded numbering skips 453), so the condition that selects
     * SERIALIZABLE over REPEATABLE READ cannot be seen here -- restore it
     * from upstream.
     */
454  sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
455  else
456  sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
    /*
     * The flag is raised before the command is sent and lowered only after
     * it returns, so an error thrown from do_sql_command leaves it set.
     */
457  entry->changing_xact_state = true;
458  do_sql_command(entry->conn, sql);
459  entry->xact_depth = 1;
460  entry->changing_xact_state = false;
461  }
462 
463  /*
464  * If we're in a subtransaction, stack up savepoints to match our level.
465  * This ensures we can rollback just the desired effects when a
466  * subtransaction aborts.
467  */
468  while (entry->xact_depth < curlevel)
469  {
470  char sql[64];
471 
    /* Savepoints are named after the depth they open: s2, s3, ... */
472  snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
    /* Same raise/lower bracket around the remote command as above. */
473  entry->changing_xact_state = true;
474  do_sql_command(entry->conn, sql);
475  entry->xact_depth++;
476  entry->changing_xact_state = false;
477  }
478 }
479 
480 /*
481  * Release connection reference count created by calling GetConnection.
482  */
483 void
485 {
486  /*
487  * Currently, we don't actually track connection references because all
488  * cleanup is managed on a transaction or subtransaction basis instead. So
489  * there's nothing to do here.
490  */
491 }
492 
493 /*
494  * Assign a "unique" number for a cursor.
495  *
496  * These really only need to be unique per connection within a transaction.
497  * For the moment we ignore the per-connection point and assign them across
498  * all connections in the transaction, but we ask for the connection to be
499  * supplied in case we want to refine that.
500  *
501  * Note that even if wraparound happens in a very long transaction, actual
502  * collisions are highly improbable; just be sure to use %u not %d to print.
503  */
504 unsigned int
506 {
507  return ++cursor_number;
508 }
509 
510 /*
511  * Assign a "unique" number for a prepared statement.
512  *
513  * This works much like GetCursorNumber, except that we never reset the counter
514  * within a session. That's because we can't be 100% sure we've gotten rid
515  * of all prepared statements on all connections, and it's not really worth
516  * increasing the risk of prepared-statement name collisions by resetting.
517  */
518 unsigned int
520 {
521  return ++prep_stmt_number;
522 }
523 
524 /*
525  * Submit a query and wait for the result.
526  *
527  * This function is interruptible by signals.
528  *
529  * Caller is responsible for the error handling on the result.
530  */
531 PGresult *
532 pgfdw_exec_query(PGconn *conn, const char *query)
533 {
534  /*
535  * Submit a query. Since we don't use non-blocking mode, this also can
536  * block. But its risk is relatively small, so we ignore that for now.
537  */
538  if (!PQsendQuery(conn, query))
539  pgfdw_report_error(ERROR, NULL, conn, false, query);
540 
541  /* Wait for the result. */
542  return pgfdw_get_result(conn, query);
543 }
544 
545 /*
546  * Wait for the result from a prior asynchronous execution function call.
547  *
548  * This function offers quick responsiveness by checking for any interruptions.
549  *
550  * This function emulates PQexec()'s behavior of returning the last result
551  * when there are many.
552  *
553  * Caller is responsible for the error handling on the result.
554  */
555 PGresult *
556 pgfdw_get_result(PGconn *conn, const char *query)
557 {
    /* Most recent result fetched so far; this is what gets returned. */
558  PGresult *volatile last_res = NULL;
559 
560  /* In what follows, do not leak any PGresults on an error. */
561  PG_TRY();
562  {
    /* One result per pass; the loop ends when PQgetResult returns NULL. */
563  for (;;)
564  {
565  PGresult *res;
566 
    /* Keep waiting and consuming input for as long as libpq reports busy. */
567  while (PQisBusy(conn))
568  {
569  int wc;
570 
571  /* Sleep until there's something to do */
    /*
     * NOTE(review): the start of the wait call is not present in this
     * listing (embedded numbering skips 572-574); only its trailing
     * arguments remain below, so nothing visible here assigns wc before it
     * is tested. Numbers 577 and 579 are skipped as well, so statements
     * that followed the wait are also missing -- restore from upstream.
     */
575  PQsocket(conn),
576  -1L, PG_WAIT_EXTENSION);
578 
580 
581  /* Data available in socket? */
582  if (wc & WL_SOCKET_READABLE)
583  {
    /* A failure to read from the connection is reported at ERROR level. */
584  if (!PQconsumeInput(conn))
585  pgfdw_report_error(ERROR, NULL, conn, false, query);
586  }
587  }
588 
589  res = PQgetResult(conn);
590  if (res == NULL)
591  break; /* query is complete */
592 
    /* Drop the previously held result and keep only the newest one. */
593  PQclear(last_res);
594  last_res = res;
595  }
596  }
597  PG_CATCH();
598  {
    /* Release whatever result was held when the error was raised. */
599  PQclear(last_res);
600  PG_RE_THROW();
601  }
602  PG_END_TRY();
603 
604  return last_res;
605 }
606 
607 /*
608  * Report an error we got from the remote server.
609  *
610  * elevel: error level to use (typically ERROR, but might be less)
611  * res: PGresult containing the error
612  * conn: connection we did the query on
613  * clear: if true, PQclear the result (otherwise caller will handle it)
614  * sql: NULL, or text of remote command we tried to execute
615  *
616  * Note: callers that choose not to throw ERROR for a remote error are
617  * responsible for making sure that the associated ConnCacheEntry gets
618  * marked with have_error = true.
619  */
620 void
622  bool clear, const char *sql)
623 {
624  /* If requested, PGresult must be released before leaving this function. */
625  PG_TRY();
626  {
627  char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
628  char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
629  char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
630  char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
631  char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
632  int sqlstate;
633 
634  if (diag_sqlstate)
635  sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
636  diag_sqlstate[1],
637  diag_sqlstate[2],
638  diag_sqlstate[3],
639  diag_sqlstate[4]);
640  else
641  sqlstate = ERRCODE_CONNECTION_FAILURE;
642 
643  /*
644  * If we don't get a message from the PGresult, try the PGconn. This
645  * is needed because for connection-level failures, PQexec may just
646  * return NULL, not a PGresult at all.
647  */
648  if (message_primary == NULL)
649  message_primary = pchomp(PQerrorMessage(conn));
650 
651  ereport(elevel,
652  (errcode(sqlstate),
653  message_primary ? errmsg_internal("%s", message_primary) :
654  errmsg("could not obtain message string for remote error"),
655  message_detail ? errdetail_internal("%s", message_detail) : 0,
656  message_hint ? errhint("%s", message_hint) : 0,
657  message_context ? errcontext("%s", message_context) : 0,
658  sql ? errcontext("remote SQL command: %s", sql) : 0));
659  }
660  PG_FINALLY();
661  {
662  if (clear)
663  PQclear(res);
664  }
665  PG_END_TRY();
666 }
667 
668 /*
669  * pgfdw_xact_callback --- cleanup at main-transaction end.
670  */
671 static void
    /*
     * NOTE(review): the line carrying the function name and parameter list is
     * not present in this listing (embedded numbering skips 672). The
     * prototype near the top of the file declares
     * pgfdw_xact_callback(XactEvent event, void *arg).
     */
673 {
674  HASH_SEQ_STATUS scan;
675  ConnCacheEntry *entry;
676 
677  /* Quick exit if no connections were touched in this transaction. */
678  if (!xact_got_connection)
679  return;
680 
681  /*
682  * Scan all connection cache entries to find open remote transactions, and
683  * close them.
684  */
685  hash_seq_init(&scan, ConnectionHash);
686  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
687  {
688  PGresult *res;
689 
690  /* Ignore cache entry if no open connection right now */
691  if (entry->conn == NULL)
692  continue;
693 
694  /* If it has an open remote transaction, try to close it */
695  if (entry->xact_depth > 0)
696  {
697  bool abort_cleanup_failure = false;
698 
699  elog(DEBUG3, "closing remote transaction on connection %p",
700  entry->conn);
701 
702  switch (event)
703  {
    /*
     * NOTE(review): no case label is visible ahead of the commit arm that
     * follows (embedded numbering skips 704-705), nor ahead of the arm that
     * rejects PREPARE (741). Numbers 756 and 762 are skipped too, so further
     * labels may be missing in front of the case lines that are shown.
     * Restore all of them from upstream.
     */
706 
707  /*
708  * If abort cleanup previously failed for this connection,
709  * we can't issue any more commands against it.
710  */
    /*
     * NOTE(review): the statement the comment above introduces is not
     * visible (embedded numbering skips 711).
     */
712 
713  /* Commit all remote transactions during pre-commit */
    /*
     * The flag is raised before the command is sent and lowered only after
     * it returns, so an error thrown from do_sql_command leaves it set.
     */
714  entry->changing_xact_state = true;
715  do_sql_command(entry->conn, "COMMIT TRANSACTION");
716  entry->changing_xact_state = false;
717 
718  /*
719  * If there were any errors in subtransactions, and we
720  * made prepared statements, do a DEALLOCATE ALL to make
721  * sure we get rid of all prepared statements. This is
722  * annoying and not terribly bulletproof, but it's
723  * probably not worth trying harder.
724  *
725  * DEALLOCATE ALL only exists in 8.3 and later, so this
726  * constrains how old a server postgres_fdw can
727  * communicate with. We intentionally ignore errors in
728  * the DEALLOCATE, so that we can hobble along to some
729  * extent with older servers (leaking prepared statements
730  * as we go; but we don't really support update operations
731  * pre-8.3 anyway).
732  */
733  if (entry->have_prep_stmt && entry->have_error)
734  {
    /* The result is discarded without being inspected. */
735  res = PQexec(entry->conn, "DEALLOCATE ALL");
736  PQclear(res);
737  }
738  entry->have_prep_stmt = false;
739  entry->have_error = false;
740  break;
742 
743  /*
744  * We disallow any remote transactions, since it's not
745  * very reasonable to hold them open until the prepared
746  * transaction is committed. For the moment, throw error
747  * unconditionally; later we might allow read-only cases.
748  * Note that the error will cause us to come right back
749  * here with event == XACT_EVENT_ABORT, so we'll clean up
750  * the connection state at that point.
751  */
752  ereport(ERROR,
753  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
754  errmsg("cannot PREPARE a transaction that has operated on postgres_fdw foreign tables")));
755  break;
757  case XACT_EVENT_COMMIT:
758  case XACT_EVENT_PREPARE:
759  /* Pre-commit should have closed the open transaction */
760  elog(ERROR, "missed cleaning up connection during pre-commit");
761  break;
763  case XACT_EVENT_ABORT:
764 
765  /*
766  * Don't try to clean up the connection if we're already
767  * in error recursion trouble.
768  */
    /*
     * NOTE(review): as listed, the assignment below is unconditional
     * (embedded numbering skips 769, where the comment above implies a
     * guarding condition). Read literally, the flag is always set here, the
     * test that follows always breaks out, none of the cancel / ABORT /
     * DEALLOCATE cleanup below is reached, and the connection is discarded
     * by the check after the switch. Confirm the guard against upstream
     * before relying on this listing.
     */
770  entry->changing_xact_state = true;
771 
772  /*
773  * If connection is already unsalvageable, don't touch it
774  * further.
775  */
776  if (entry->changing_xact_state)
777  break;
778 
779  /*
780  * Mark this connection as in the process of changing
781  * transaction state.
782  */
783  entry->changing_xact_state = true;
784 
785  /* Assume we might have lost track of prepared statements */
786  entry->have_error = true;
787 
788  /*
789  * If a command has been submitted to the remote server by
790  * using an asynchronous execution function, the command
791  * might not have yet completed. Check to see if a
792  * command is still being processed by the remote server,
793  * and if so, request cancellation of the command.
794  */
795  if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
796  !pgfdw_cancel_query(entry->conn))
797  {
798  /* Unable to cancel running query. */
799  abort_cleanup_failure = true;
800  }
801  else if (!pgfdw_exec_cleanup_query(entry->conn,
802  "ABORT TRANSACTION",
803  false))
804  {
805  /* Unable to abort remote transaction. */
806  abort_cleanup_failure = true;
807  }
    /*
     * NOTE(review): the condition below is cut short -- the line that begins
     * the call taking "DEALLOCATE ALL" and true as its trailing arguments is
     * not visible (embedded numbering skips 809).
     */
808  else if (entry->have_prep_stmt && entry->have_error &&
810  "DEALLOCATE ALL",
811  true))
812  {
813  /* Trouble clearing prepared statements. */
814  abort_cleanup_failure = true;
815  }
816  else
817  {
818  entry->have_prep_stmt = false;
819  entry->have_error = false;
820  }
821 
822  /* Disarm changing_xact_state if it all worked. */
823  entry->changing_xact_state = abort_cleanup_failure;
824  break;
825  }
826  }
827 
828  /* Reset state to show we're out of a transaction */
829  entry->xact_depth = 0;
830 
831  /*
832  * If the connection isn't in a good idle state, discard it to
833  * recover. Next GetConnection will open a new connection.
834  */
    /*
     * NOTE(review): one line of this condition is not visible (embedded
     * numbering skips 836, between the two operands shown).
     */
835  if (PQstatus(entry->conn) != CONNECTION_OK ||
837  entry->changing_xact_state)
838  {
839  elog(DEBUG3, "discarding connection %p", entry->conn);
840  disconnect_pg_server(entry);
841  }
842  }
843 
844  /*
845  * Regardless of the event type, we can now mark ourselves as out of the
846  * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
847  * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
848  */
849  xact_got_connection = false;
850 
851  /* Also reset cursor numbering for next transaction */
852  cursor_number = 0;
853 }
854 
855 /*
856  * pgfdw_subxact_callback --- cleanup at subtransaction end.
857  */
858 static void
860  SubTransactionId parentSubid, void *arg)
861 {
862  HASH_SEQ_STATUS scan;
863  ConnCacheEntry *entry;
864  int curlevel;
865 
866  /* Nothing to do at subxact start, nor after commit. */
867  if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
868  event == SUBXACT_EVENT_ABORT_SUB))
869  return;
870 
871  /* Quick exit if no connections were touched in this transaction. */
872  if (!xact_got_connection)
873  return;
874 
875  /*
876  * Scan all connection cache entries to find open remote subtransactions
877  * of the current level, and close them.
878  */
879  curlevel = GetCurrentTransactionNestLevel();
880  hash_seq_init(&scan, ConnectionHash);
881  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
882  {
883  char sql[100];
884 
885  /*
886  * We only care about connections with open remote subtransactions of
887  * the current level.
888  */
889  if (entry->conn == NULL || entry->xact_depth < curlevel)
890  continue;
891 
892  if (entry->xact_depth > curlevel)
893  elog(ERROR, "missed cleaning up remote subtransaction at level %d",
894  entry->xact_depth);
895 
896  if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
897  {
898  /*
899  * If abort cleanup previously failed for this connection, we
900  * can't issue any more commands against it.
901  */
903 
904  /* Commit all remote subtransactions during pre-commit */
905  snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
906  entry->changing_xact_state = true;
907  do_sql_command(entry->conn, sql);
908  entry->changing_xact_state = false;
909  }
910  else if (in_error_recursion_trouble())
911  {
912  /*
913  * Don't try to clean up the connection if we're already in error
914  * recursion trouble.
915  */
916  entry->changing_xact_state = true;
917  }
918  else if (!entry->changing_xact_state)
919  {
920  bool abort_cleanup_failure = false;
921 
922  /* Remember that abort cleanup is in progress. */
923  entry->changing_xact_state = true;
924 
925  /* Assume we might have lost track of prepared statements */
926  entry->have_error = true;
927 
928  /*
929  * If a command has been submitted to the remote server by using
930  * an asynchronous execution function, the command might not have
931  * yet completed. Check to see if a command is still being
932  * processed by the remote server, and if so, request cancellation
933  * of the command.
934  */
935  if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
936  !pgfdw_cancel_query(entry->conn))
937  abort_cleanup_failure = true;
938  else
939  {
940  /* Rollback all remote subtransactions during abort */
941  snprintf(sql, sizeof(sql),
942  "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
943  curlevel, curlevel);
944  if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
945  abort_cleanup_failure = true;
946  }
947 
948  /* Disarm changing_xact_state if it all worked. */
949  entry->changing_xact_state = abort_cleanup_failure;
950  }
951 
952  /* OK, we're outta that level of subtransaction */
953  entry->xact_depth--;
954  }
955 }
956 
957 /*
958  * Connection invalidation callback function
959  *
960  * After a change to a pg_foreign_server or pg_user_mapping catalog entry,
961  * mark connections depending on that entry as needing to be remade.
962  * We can't immediately destroy them, since they might be in the midst of
963  * a transaction, but we'll remake them at the next opportunity.
964  *
965  * Although most cache invalidation callbacks blow away all the related stuff
966  * regardless of the given hashvalue, connections are expensive enough that
967  * it's worth trying to avoid that.
968  *
969  * NB: We could avoid unnecessary disconnection more strictly by examining
970  * individual option values, but it seems too much effort for the gain.
971  */
972 static void
973 pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
974 {
975  HASH_SEQ_STATUS scan;
976  ConnCacheEntry *entry;
977 
978  Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
979 
980  /* ConnectionHash must exist already, if we're registered */
981  hash_seq_init(&scan, ConnectionHash);
982  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
983  {
984  /* Ignore invalid entries */
985  if (entry->conn == NULL)
986  continue;
987 
988  /* hashvalue == 0 means a cache reset, must clear all state */
989  if (hashvalue == 0 ||
990  (cacheid == FOREIGNSERVEROID &&
991  entry->server_hashvalue == hashvalue) ||
992  (cacheid == USERMAPPINGOID &&
993  entry->mapping_hashvalue == hashvalue))
994  entry->invalidated = true;
995  }
996 }
997 
998 /*
999  * Raise an error if the given connection cache entry is marked as being
1000  * in the middle of an xact state change. This should be called at points where no
1001  * such change is expected to be in progress; if one is found to be in
1002  * progress, it means that we aborted in the middle of a previous state change
1003  * and now don't know what the remote transaction state actually is.
1004  * Such connections can't safely be further used. Re-establishing the
1005  * connection would change the snapshot and roll back any writes already
1006  * performed, so that's not an option, either. Thus, we must abort.
1007  */
1008 static void
1010 {
1011  HeapTuple tup;
1012  Form_pg_user_mapping umform;
1013  ForeignServer *server;
1014 
1015  /* nothing to do for inactive entries and entries of sane state */
1016  if (entry->conn == NULL || !entry->changing_xact_state)
1017  return;
1018 
1019  /* make sure this entry is inactive */
1020  disconnect_pg_server(entry);
1021 
1022  /* find server name to be shown in the message below */
1024  ObjectIdGetDatum(entry->key));
1025  if (!HeapTupleIsValid(tup))
1026  elog(ERROR, "cache lookup failed for user mapping %u", entry->key);
1027  umform = (Form_pg_user_mapping) GETSTRUCT(tup);
1028  server = GetForeignServer(umform->umserver);
1029  ReleaseSysCache(tup);
1030 
1031  ereport(ERROR,
1032  (errcode(ERRCODE_CONNECTION_EXCEPTION),
1033  errmsg("connection to server \"%s\" was lost",
1034  server->servername)));
1035 }
1036 
1037 /*
1038  * Cancel the currently-in-progress query (whose query text we do not have)
1039  * and ignore the result. Returns true if we successfully cancel the query
1040  * and discard any pending result, and false if not.
1041  */
1042 static bool
1044 {
1045  PGcancel *cancel;
1046  char errbuf[256];
1047  PGresult *result = NULL;
1048  TimestampTz endtime;
1049 
1050  /*
1051  * If it takes too long to cancel the query and discard the result, assume
1052  * the connection is dead.
1053  */
1055 
1056  /*
1057  * Issue cancel request. Unfortunately, there's no good way to limit the
1058  * amount of time that we might block inside PQgetCancel().
1059  */
1060  if ((cancel = PQgetCancel(conn)))
1061  {
1062  if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
1063  {
1064  ereport(WARNING,
1065  (errcode(ERRCODE_CONNECTION_FAILURE),
1066  errmsg("could not send cancel request: %s",
1067  errbuf)));
1068  PQfreeCancel(cancel);
1069  return false;
1070  }
1071  PQfreeCancel(cancel);
1072  }
1073 
1074  /* Get and discard the result of the query. */
1075  if (pgfdw_get_cleanup_result(conn, endtime, &result))
1076  return false;
1077  PQclear(result);
1078 
1079  return true;
1080 }
1081 
1082 /*
1083  * Submit a query during (sub)abort cleanup and wait up to 30 seconds for the
1084  * result. If the query is executed without error, the return value is true.
1085  * If the query is executed successfully but returns an error, the return
1086  * value is true if and only if ignore_errors is set. If the query can't be
1087  * sent or times out, the return value is false.
1088  */
1089 static bool
1090 pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
1091 {
1092  PGresult *result = NULL;
1093  TimestampTz endtime;
1094 
1095  /*
1096  * If it takes too long to execute a cleanup query, assume the connection
1097  * is dead. It's fairly likely that this is why we aborted in the first
1098  * place (e.g. statement timeout, user cancel), so the timeout shouldn't
1099  * be too long.
1100  */
1102 
1103  /*
1104  * Submit a query. Since we don't use non-blocking mode, this also can
1105  * block. But its risk is relatively small, so we ignore that for now.
1106  */
1107  if (!PQsendQuery(conn, query))
1108  {
1109  pgfdw_report_error(WARNING, NULL, conn, false, query);
1110  return false;
1111  }
1112 
1113  /* Get the result of the query. */
1114  if (pgfdw_get_cleanup_result(conn, endtime, &result))
1115  return false;
1116 
1117  /* Issue a warning if not successful. */
1118  if (PQresultStatus(result) != PGRES_COMMAND_OK)
1119  {
1120  pgfdw_report_error(WARNING, result, conn, true, query);
1121  return ignore_errors;
1122  }
1123  PQclear(result);
1124 
1125  return true;
1126 }
1127 
1128 /*
1129  * Get, during abort cleanup, the result of a query that is in progress. This
1130  * might be a query that is being interrupted by transaction abort, or it might
1131  * be a query that was initiated as part of transaction abort to get the remote
1132  * side back to the appropriate state.
1133  *
1134  * It's not a huge problem if we throw an ERROR here, but if we get into error
1135  * recursion trouble, we'll end up slamming the connection shut, which will
1136  * necessitate failing the entire toplevel transaction even if subtransactions
1137  * were used. Try to use WARNING where we can.
1138  *
1139  * endtime is the time at which we should give up and assume the remote
1140  * side is dead. Returns true if the timeout expired, otherwise false.
1141  * Sets *result except in case of a timeout.
1142  */
1143 static bool
1145 {
1146  volatile bool timed_out = false;
1147  PGresult *volatile last_res = NULL;
1148 
1149  /* In what follows, do not leak any PGresults on an error. */
1150  PG_TRY();
1151  {
1152  for (;;)
1153  {
1154  PGresult *res;
1155 
1156  while (PQisBusy(conn))
1157  {
1158  int wc;
1160  long secs;
1161  int microsecs;
1162  long cur_timeout;
1163 
1164  /* If timeout has expired, give up, else get sleep time. */
1165  if (now >= endtime)
1166  {
1167  timed_out = true;
1168  goto exit;
1169  }
1170  TimestampDifference(now, endtime, &secs, &microsecs);
1171 
1172  /* To protect against clock skew, limit sleep to one minute. */
1173  cur_timeout = Min(60000, secs * USECS_PER_SEC + microsecs);
1174 
1175  /* Sleep until there's something to do */
1179  PQsocket(conn),
1180  cur_timeout, PG_WAIT_EXTENSION);
1182 
1184 
1185  /* Data available in socket? */
1186  if (wc & WL_SOCKET_READABLE)
1187  {
1188  if (!PQconsumeInput(conn))
1189  {
1190  /* connection trouble; treat the same as a timeout */
1191  timed_out = true;
1192  goto exit;
1193  }
1194  }
1195  }
1196 
1197  res = PQgetResult(conn);
1198  if (res == NULL)
1199  break; /* query is complete */
1200 
1201  PQclear(last_res);
1202  last_res = res;
1203  }
1204 exit: ;
1205  }
1206  PG_CATCH();
1207  {
1208  PQclear(last_res);
1209  PG_RE_THROW();
1210  }
1211  PG_END_TRY();
1212 
1213  if (timed_out)
1214  PQclear(last_res);
1215  else
1216  *result = last_res;
1217  return timed_out;
1218 }
Oid umid
Definition: foreign.h:47
XactEvent
Definition: xact.h:108
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6631
static void configure_remote_session(PGconn *conn)
Definition: connection.c:382
int errhint(const char *fmt,...)
Definition: elog.c:1069
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
#define PG_DIAG_MESSAGE_PRIMARY
Definition: postgres_ext.h:58
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
#define WL_TIMEOUT
Definition: latch.h:127
MemoryContext hcxt
Definition: hsearch.h:78
static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result)
Definition: connection.c:1144
#define DEBUG3
Definition: elog.h:23
#define PG_DIAG_MESSAGE_DETAIL
Definition: postgres_ext.h:59
#define USECS_PER_SEC
Definition: timestamp.h:94
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
struct ConnCacheEntry ConnCacheEntry
int64 TimestampTz
Definition: timestamp.h:39
#define MAKE_SQLSTATE(ch1, ch2, ch3, ch4, ch5)
Definition: elog.h:62
void PQfreeCancel(PGcancel *cancel)
Definition: fe-connect.c:4277
ConnCacheKey key
Definition: connection.c:49
#define Min(x, y)
Definition: c.h:911
Size entrysize
Definition: hsearch.h:73
static void check_conn_params(const char **keywords, const char **values, UserMapping *user)
Definition: connection.c:345
int errcode(int sqlerrcode)
Definition: elog.c:608
void PQfinish(PGconn *conn)
Definition: fe-connect.c:4133
#define MemSet(start, val, len)
Definition: c.h:962
#define WL_SOCKET_READABLE
Definition: latch.h:125
bool have_prep_stmt
Definition: connection.c:54
#define GetSysCacheHashValue1(cacheId, key1)
Definition: syscache.h:201
uint32 server_hashvalue
Definition: connection.c:58
uint32 SubTransactionId
Definition: c.h:518
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:906
unsigned int Oid
Definition: postgres_ext.h:31
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:6621
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:57
Oid userid
Definition: foreign.h:48
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2692
void ResetLatch(Latch *latch)
Definition: latch.c:519
PGconn * PQconnectdbParams(const char *const *keywords, const char *const *values, int expand_dbname)
Definition: fe-connect.c:630
int errdetail_internal(const char *fmt,...)
Definition: elog.c:982
void ReleaseConnection(PGconn *conn)
Definition: connection.c:484
char * pchomp(const char *in)
Definition: mcxt.c:1214
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1234
Definition: dynahash.c:208
bool defGetBoolean(DefElem *def)
Definition: define.c:111
void pfree(void *pointer)
Definition: mcxt.c:1056
static unsigned int prep_stmt_number
Definition: connection.c:69
int ExtractConnectionOptions(List *defelems, const char **keywords, const char **values)
Definition: option.c:333
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
bool changing_xact_state
Definition: connection.c:56
static void do_sql_command(PGconn *conn, const char *sql)
Definition: connection.c:418
FormData_pg_user_mapping * Form_pg_user_mapping
PGcancel * PQgetCancel(PGconn *conn)
Definition: fe-connect.c:4254
int errdetail(const char *fmt,...)
Definition: elog.c:955
List * options
Definition: foreign.h:50
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:369
unsigned int uint32
Definition: c.h:359
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:621
static unsigned int cursor_number
Definition: connection.c:68
#define ereport(elevel, rest)
Definition: elog.h:141
bool invalidated
Definition: connection.c:57
bool superuser_arg(Oid roleid)
Definition: superuser.c:56
PGTransactionStatusType PQtransactionStatus(const PGconn *conn)
Definition: fe-connect.c:6586
#define WARNING
Definition: elog.h:40
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
Definition: connection.c:1090
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1116
static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
Definition: connection.c:973
static int elevel
Definition: vacuumlazy.c:294
#define HASH_BLOBS
Definition: hsearch.h:88
SubXactEvent
Definition: xact.h:122
#define PG_FINALLY()
Definition: elog.h:339
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1426
static void pgfdw_xact_callback(XactEvent event, void *arg)
Definition: connection.c:672
static HTAB * ConnectionHash
Definition: connection.c:65
ForeignServer * GetForeignServer(Oid serverid)
Definition: foreign.c:109
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:316
uintptr_t Datum
Definition: postgres.h:367
void RegisterSubXactCallback(SubXactCallback callback, void *arg)
Definition: xact.c:3534
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1164
static bool pgfdw_cancel_query(PGconn *conn)
Definition: connection.c:1043
#define PG_WAIT_EXTENSION
Definition: pgstat.h:759
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1704
Size keysize
Definition: hsearch.h:72
static bool UserMappingPasswordRequired(UserMapping *user)
Definition: connection.c:323
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
#define PG_DIAG_MESSAGE_HINT
Definition: postgres_ext.h:60
void PQclear(PGresult *res)
Definition: fe-exec.c:694
uint32 mapping_hashvalue
Definition: connection.c:59
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:841
bool in_error_recursion_trouble(void)
Definition: elog.c:197
PGconn * GetConnection(UserMapping *user, bool will_prep_stmt)
Definition: connection.c:106
int errmsg_internal(const char *fmt,...)
Definition: elog.c:909
#define PG_CATCH()
Definition: elog.h:332
PGconn * conn
Definition: connection.c:50
PGresult * pgfdw_get_result(PGconn *conn, const char *query)
Definition: connection.c:556
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:2754
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
static void disconnect_pg_server(ConnCacheEntry *entry)
Definition: connection.c:309
#define Assert(condition)
Definition: c.h:739
#define lfirst(lc)
Definition: pg_list.h:190
const char * GetDatabaseEncodingName(void)
Definition: mbutils.c:1052
void RegisterXactCallback(XactCallback callback, void *arg)
Definition: xact.c:3479
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:1754
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
Definition: connection.c:1009
static bool xact_got_connection
Definition: connection.c:72
unsigned int GetCursorNumber(PGconn *conn)
Definition: connection.c:505
static int list_length(const List *l)
Definition: pg_list.h:169
Oid serverid
Definition: foreign.h:49
#define PG_RE_THROW()
Definition: elog.h:363
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1389
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1379
static Datum values[MAXATTR]
Definition: bootstrap.c:167
int PQconnectionUsedPassword(const PGconn *conn)
Definition: fe-connect.c:6680
int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize)
Definition: fe-connect.c:4409
static char * user
Definition: pg_regress.c:94
void * palloc(Size size)
Definition: mcxt.c:949
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define IsolationIsSerializable()
Definition: xact.h:52
char * servername
Definition: foreign.h:39
#define elog(elevel,...)
Definition: elog.h:228
int i
#define errcontext
Definition: elog.h:183
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:1939
void * arg
struct Latch * MyLatch
Definition: globals.c:54
char * defname
Definition: parsenodes.h:730
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
unsigned int GetPrepStmtNumber(PGconn *conn)
Definition: connection.c:519
static void pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg)
Definition: connection.c:859
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:6578
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: timestamp.c:1657
#define PG_TRY()
Definition: elog.h:322
static void begin_remote_xact(ConnCacheEntry *entry)
Definition: connection.c:441
Oid ConnCacheKey
Definition: connection.c:45
PGresult * pgfdw_exec_query(PGconn *conn, const char *query)
Definition: connection.c:532
#define snprintf
Definition: port.h:192
List * options
Definition: foreign.h:42
#define WL_LATCH_SET
Definition: latch.h:124
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:6649
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1547
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1778
Oid serverid
Definition: foreign.h:36
#define PG_END_TRY()
Definition: elog.h:347
#define PG_DIAG_CONTEXT
Definition: postgres_ext.h:64
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129
MemoryContext CacheMemoryContext
Definition: mcxt.c:47
static PGconn * connect_pg_server(ForeignServer *server, UserMapping *user)
Definition: connection.c:221