PostgreSQL Source Code  git master
connection.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * connection.c
4  * Connection management functions for postgres_fdw
5  *
6  * Portions Copyright (c) 2012-2021, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * contrib/postgres_fdw/connection.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "access/htup_details.h"
16 #include "access/xact.h"
18 #include "commands/defrem.h"
19 #include "funcapi.h"
20 #include "mb/pg_wchar.h"
21 #include "miscadmin.h"
22 #include "pgstat.h"
23 #include "postgres_fdw.h"
24 #include "storage/fd.h"
25 #include "storage/latch.h"
26 #include "utils/builtins.h"
27 #include "utils/datetime.h"
28 #include "utils/hsearch.h"
29 #include "utils/inval.h"
30 #include "utils/memutils.h"
31 #include "utils/syscache.h"
32 
33 /*
34  * Connection cache hash table entry
35  *
36  * The lookup key in this hash table is the user mapping OID. We use just one
37  * connection per user mapping ID, which ensures that all the scans use the
38  * same snapshot during a query. Using the user mapping OID rather than
39  * the foreign server OID + user OID avoids creating multiple connections when
40  * the public user mapping applies to all user OIDs.
41  *
42  * The "conn" pointer can be NULL if we don't currently have a live connection.
43  * When we do have a connection, xact_depth tracks the current depth of
44  * transactions and subtransactions open on the remote side. We need to issue
45  * commands at the same nesting depth on the remote as we're executing at
46  * ourselves, so that rolling back a subtransaction will kill the right
47  * queries and not the wrong ones.
48  */
49 typedef Oid ConnCacheKey;
50 
51 typedef struct ConnCacheEntry
52 {
53  ConnCacheKey key; /* hash key (must be first) */
54  PGconn *conn; /* connection to foreign server, or NULL */
55  /* Remaining fields are invalid when conn is NULL: */
56  int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 =
57  * one level of subxact open, etc */
58  bool have_prep_stmt; /* have we prepared any stmts in this xact? */
59  bool have_error; /* have any subxacts aborted in this xact? */
60  bool changing_xact_state; /* xact state change in process */
61  bool invalidated; /* true if reconnect is pending */
62  bool keep_connections; /* setting value of keep_connections
63  * server option */
64  Oid serverid; /* foreign server OID used to get server name */
65  uint32 server_hashvalue; /* hash value of foreign server OID */
66  uint32 mapping_hashvalue; /* hash value of user mapping OID */
67  PgFdwConnState state; /* extra per-connection state */
69 
70 /*
71  * Connection cache (initialized on first use)
72  */
73 static HTAB *ConnectionHash = NULL;
74 
75 /* for assigning cursor numbers and prepared statement numbers */
76 static unsigned int cursor_number = 0;
77 static unsigned int prep_stmt_number = 0;
78 
79 /* tracks whether any work is needed in callback functions */
80 static bool xact_got_connection = false;
81 
82 /*
83  * SQL functions
84  */
88 
89 /* prototypes of private functions */
92 static void disconnect_pg_server(ConnCacheEntry *entry);
93 static void check_conn_params(const char **keywords, const char **values, UserMapping *user);
95 static void begin_remote_xact(ConnCacheEntry *entry);
96 static void pgfdw_xact_callback(XactEvent event, void *arg);
97 static void pgfdw_subxact_callback(SubXactEvent event,
98  SubTransactionId mySubid,
99  SubTransactionId parentSubid,
100  void *arg);
101 static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
103 static bool pgfdw_cancel_query(PGconn *conn);
104 static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
105  bool ignore_errors);
106 static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
107  PGresult **result);
108 static void pgfdw_abort_cleanup(ConnCacheEntry *entry, const char *sql,
109  bool toplevel);
112 
113 /*
114  * Get a PGconn which can be used to execute queries on the remote PostgreSQL
115  * server with the user's authorization. A new connection is established
116  * if we don't already have a suitable one, and a transaction is opened at
117  * the right subtransaction nesting depth if we didn't do that already.
118  *
119  * will_prep_stmt must be true if caller intends to create any prepared
120  * statements. Since those don't go away automatically at transaction end
121  * (not even on error), we need this flag to cue manual cleanup.
122  *
123  * If state is not NULL, *state receives the per-connection state associated
124  * with the PGconn.
125  */
126 PGconn *
128 {
129  bool found;
130  bool retry = false;
131  ConnCacheEntry *entry;
134 
135  /* First time through, initialize connection cache hashtable */
136  if (ConnectionHash == NULL)
137  {
138  HASHCTL ctl;
139 
140  ctl.keysize = sizeof(ConnCacheKey);
141  ctl.entrysize = sizeof(ConnCacheEntry);
142  ConnectionHash = hash_create("postgres_fdw connections", 8,
143  &ctl,
145 
146  /*
147  * Register some callback functions that manage connection cleanup.
148  * This should be done just once in each backend.
149  */
156  }
157 
158  /* Set flag that we did GetConnection during the current transaction */
159  xact_got_connection = true;
160 
161  /* Create hash key for the entry. Assume no pad bytes in key struct */
162  key = user->umid;
163 
164  /*
165  * Find or create cached entry for requested connection.
166  */
167  entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
168  if (!found)
169  {
170  /*
171  * We need only clear "conn" here; remaining fields will be filled
172  * later when "conn" is set.
173  */
174  entry->conn = NULL;
175  }
176 
177  /* Reject further use of connections which failed abort cleanup. */
179 
180  /*
181  * If the connection needs to be remade due to invalidation, disconnect as
182  * soon as we're out of all transactions.
183  */
184  if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
185  {
186  elog(DEBUG3, "closing connection %p for option changes to take effect",
187  entry->conn);
188  disconnect_pg_server(entry);
189  }
190 
191  /*
192  * If cache entry doesn't have a connection, we have to establish a new
193  * connection. (If connect_pg_server throws an error, the cache entry
194  * will remain in a valid empty state, ie conn == NULL.)
195  */
196  if (entry->conn == NULL)
197  make_new_connection(entry, user);
198 
199  /*
200  * We check the health of the cached connection here when using it. In
201  * cases where we're out of all transactions, if a broken connection is
202  * detected, we try to reestablish a new connection later.
203  */
204  PG_TRY();
205  {
206  /* Process a pending asynchronous request if any. */
207  if (entry->state.pendingAreq)
209  /* Start a new transaction or subtransaction if needed. */
210  begin_remote_xact(entry);
211  }
212  PG_CATCH();
213  {
215  ErrorData *errdata = CopyErrorData();
216 
217  /*
218  * Determine whether to try to reestablish the connection.
219  *
220  * After a broken connection is detected in libpq, any error other
221  * than connection failure (e.g., out-of-memory) can be thrown
222  * somewhere between return from libpq and the expected ereport() call
223  * in pgfdw_report_error(). In this case, since PQstatus() indicates
224  * CONNECTION_BAD, checking only PQstatus() causes the false detection
225  * of connection failure. To avoid this, we also verify that the
226  * error's sqlstate is ERRCODE_CONNECTION_FAILURE. Note that also
227  * checking only the sqlstate can cause another false detection
228  * because pgfdw_report_error() may report ERRCODE_CONNECTION_FAILURE
229  * for any libpq-originated error condition.
230  */
231  if (errdata->sqlerrcode != ERRCODE_CONNECTION_FAILURE ||
232  PQstatus(entry->conn) != CONNECTION_BAD ||
233  entry->xact_depth > 0)
234  {
235  MemoryContextSwitchTo(ecxt);
236  PG_RE_THROW();
237  }
238 
239  /* Clean up the error state */
240  FlushErrorState();
241  FreeErrorData(errdata);
242  errdata = NULL;
243 
244  retry = true;
245  }
246  PG_END_TRY();
247 
248  /*
249  * If a broken connection is detected, disconnect it, reestablish a new
250  * connection and retry a new remote transaction. If connection failure is
251  * reported again, we give up getting a connection.
252  */
253  if (retry)
254  {
255  Assert(entry->xact_depth == 0);
256 
257  ereport(DEBUG3,
258  (errmsg_internal("could not start remote transaction on connection %p",
259  entry->conn)),
260  errdetail_internal("%s", pchomp(PQerrorMessage(entry->conn))));
261 
262  elog(DEBUG3, "closing connection %p to reestablish a new one",
263  entry->conn);
264  disconnect_pg_server(entry);
265 
266  if (entry->conn == NULL)
267  make_new_connection(entry, user);
268 
269  begin_remote_xact(entry);
270  }
271 
272  /* Remember if caller will prepare statements */
273  entry->have_prep_stmt |= will_prep_stmt;
274 
275  /* If caller needs access to the per-connection state, return it. */
276  if (state)
277  *state = &entry->state;
278 
279  return entry->conn;
280 }
281 
282 /*
283  * Reset all transient state fields in the cached connection entry and
284  * establish new connection to the remote server.
285  */
286 static void
288 {
289  ForeignServer *server = GetForeignServer(user->serverid);
290  ListCell *lc;
291 
292  Assert(entry->conn == NULL);
293 
294  /* Reset all transient state fields, to be sure all are clean */
295  entry->xact_depth = 0;
296  entry->have_prep_stmt = false;
297  entry->have_error = false;
298  entry->changing_xact_state = false;
299  entry->invalidated = false;
300  entry->serverid = server->serverid;
301  entry->server_hashvalue =
303  ObjectIdGetDatum(server->serverid));
304  entry->mapping_hashvalue =
306  ObjectIdGetDatum(user->umid));
307  memset(&entry->state, 0, sizeof(entry->state));
308 
309  /*
310  * Determine whether to keep the connection that we're about to make here
311  * open even after the transaction using it ends, so that the subsequent
312  * transactions can re-use it.
313  *
314  * It's enough to determine this only when making new connection because
315  * all the connections to the foreign server whose keep_connections option
316  * is changed will be closed and re-made later.
317  *
318  * By default, all the connections to any foreign servers are kept open.
319  */
320  entry->keep_connections = true;
321  foreach(lc, server->options)
322  {
323  DefElem *def = (DefElem *) lfirst(lc);
324 
325  if (strcmp(def->defname, "keep_connections") == 0)
326  entry->keep_connections = defGetBoolean(def);
327  }
328 
329  /* Now try to make the connection */
330  entry->conn = connect_pg_server(server, user);
331 
332  elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
333  entry->conn, server->servername, user->umid, user->userid);
334 }
335 
336 /*
337  * Connect to remote server using specified server and user mapping properties.
338  */
339 static PGconn *
341 {
342  PGconn *volatile conn = NULL;
343 
344  /*
345  * Use PG_TRY block to ensure closing connection on error.
346  */
347  PG_TRY();
348  {
349  const char **keywords;
350  const char **values;
351  int n;
352 
353  /*
354  * Construct connection params from generic options of ForeignServer
355  * and UserMapping. (Some of them might not be libpq options, in
356  * which case we'll just waste a few array slots.) Add 4 extra slots
357  * for application_name, fallback_application_name, client_encoding,
358  * end marker.
359  */
360  n = list_length(server->options) + list_length(user->options) + 4;
361  keywords = (const char **) palloc(n * sizeof(char *));
362  values = (const char **) palloc(n * sizeof(char *));
363 
364  n = 0;
365  n += ExtractConnectionOptions(server->options,
366  keywords + n, values + n);
368  keywords + n, values + n);
369 
370  /*
371  * Use pgfdw_application_name as application_name if set.
372  *
373  * PQconnectdbParams() processes the parameter arrays from start to
374  * end. If any key word is repeated, the last value is used. Therefore
375  * note that pgfdw_application_name must be added to the arrays after
376  * options of ForeignServer are, so that it can override
377  * application_name set in ForeignServer.
378  */
380  {
381  keywords[n] = "application_name";
382  values[n] = pgfdw_application_name;
383  n++;
384  }
385 
386  /* Use "postgres_fdw" as fallback_application_name */
387  keywords[n] = "fallback_application_name";
388  values[n] = "postgres_fdw";
389  n++;
390 
391  /* Set client_encoding so that libpq can convert encoding properly. */
392  keywords[n] = "client_encoding";
393  values[n] = GetDatabaseEncodingName();
394  n++;
395 
396  keywords[n] = values[n] = NULL;
397 
398  /* verify the set of connection parameters */
399  check_conn_params(keywords, values, user);
400 
401  /*
402  * We must obey fd.c's limit on non-virtual file descriptors. Assume
403  * that a PGconn represents one long-lived FD. (Doing this here also
404  * ensures that VFDs are closed if needed to make room.)
405  */
406  if (!AcquireExternalFD())
407  {
408 #ifndef WIN32 /* can't write #if within ereport() macro */
409  ereport(ERROR,
410  (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
411  errmsg("could not connect to server \"%s\"",
412  server->servername),
413  errdetail("There are too many open files on the local server."),
414  errhint("Raise the server's max_files_per_process and/or \"ulimit -n\" limits.")));
415 #else
416  ereport(ERROR,
417  (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
418  errmsg("could not connect to server \"%s\"",
419  server->servername),
420  errdetail("There are too many open files on the local server."),
421  errhint("Raise the server's max_files_per_process setting.")));
422 #endif
423  }
424 
425  /* OK to make connection */
426  conn = PQconnectdbParams(keywords, values, false);
427 
428  if (!conn)
429  ReleaseExternalFD(); /* because the PG_CATCH block won't */
430 
431  if (!conn || PQstatus(conn) != CONNECTION_OK)
432  ereport(ERROR,
433  (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
434  errmsg("could not connect to server \"%s\"",
435  server->servername),
436  errdetail_internal("%s", pchomp(PQerrorMessage(conn)))));
437 
438  /*
439  * Check that non-superuser has used password to establish connection;
440  * otherwise, he's piggybacking on the postgres server's user
441  * identity. See also dblink_security_check() in contrib/dblink and
442  * check_conn_params.
443  */
444  if (!superuser_arg(user->userid) && UserMappingPasswordRequired(user) &&
446  ereport(ERROR,
447  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
448  errmsg("password is required"),
449  errdetail("Non-superuser cannot connect if the server does not request a password."),
450  errhint("Target server's authentication method must be changed or password_required=false set in the user mapping attributes.")));
451 
452  /* Prepare new session for use */
454 
455  pfree(keywords);
456  pfree(values);
457  }
458  PG_CATCH();
459  {
460  /* Release PGconn data structure if we managed to create one */
461  if (conn)
462  {
463  PQfinish(conn);
465  }
466  PG_RE_THROW();
467  }
468  PG_END_TRY();
469 
470  return conn;
471 }
472 
473 /*
474  * Disconnect any open connection for a connection cache entry.
475  */
476 static void
478 {
479  if (entry->conn != NULL)
480  {
481  PQfinish(entry->conn);
482  entry->conn = NULL;
484  }
485 }
486 
487 /*
488  * Return false if the password_required option is defined and set to false
489  * for this user mapping, otherwise true. The mapping has been pre-validated.
490  */
491 static bool
493 {
494  ListCell *cell;
495 
496  foreach(cell, user->options)
497  {
498  DefElem *def = (DefElem *) lfirst(cell);
499 
500  if (strcmp(def->defname, "password_required") == 0)
501  return defGetBoolean(def);
502  }
503 
504  return true;
505 }
506 
507 /*
508  * For non-superusers, insist that the connstr specify a password. This
509  * prevents a password from being picked up from .pgpass, a service file, the
510  * environment, etc. We don't want the postgres user's passwords,
511  * certificates, etc to be accessible to non-superusers. (See also
512  * dblink_connstr_check in contrib/dblink.)
513  */
514 static void
515 check_conn_params(const char **keywords, const char **values, UserMapping *user)
516 {
517  int i;
518 
519  /* no check required if superuser */
520  if (superuser_arg(user->userid))
521  return;
522 
523  /* ok if params contain a non-empty password */
524  for (i = 0; keywords[i] != NULL; i++)
525  {
526  if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
527  return;
528  }
529 
530  /* ok if the superuser explicitly said so at user mapping creation time */
531  if (!UserMappingPasswordRequired(user))
532  return;
533 
534  ereport(ERROR,
535  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
536  errmsg("password is required"),
537  errdetail("Non-superusers must provide a password in the user mapping.")));
538 }
539 
540 /*
541  * Issue SET commands to make sure remote session is configured properly.
542  *
543  * We do this just once at connection, assuming nothing will change the
544  * values later. Since we'll never send volatile function calls to the
545  * remote, there shouldn't be any way to break this assumption from our end.
546  * It's possible to think of ways to break it at the remote end, eg making
547  * a foreign table point to a view that includes a set_config call ---
548  * but once you admit the possibility of a malicious view definition,
549  * there are any number of ways to break things.
550  */
551 static void
553 {
554  int remoteversion = PQserverVersion(conn);
555 
556  /* Force the search path to contain only pg_catalog (see deparse.c) */
557  do_sql_command(conn, "SET search_path = pg_catalog");
558 
559  /*
560  * Set remote timezone; this is basically just cosmetic, since all
561  * transmitted and returned timestamptzs should specify a zone explicitly
562  * anyway. However it makes the regression test outputs more predictable.
563  *
564  * We don't risk setting remote zone equal to ours, since the remote
565  * server might use a different timezone database. Instead, use UTC
566  * (quoted, because very old servers are picky about case).
567  */
568  do_sql_command(conn, "SET timezone = 'UTC'");
569 
570  /*
571  * Set values needed to ensure unambiguous data output from remote. (This
572  * logic should match what pg_dump does. See also set_transmission_modes
573  * in postgres_fdw.c.)
574  */
575  do_sql_command(conn, "SET datestyle = ISO");
576  if (remoteversion >= 80400)
577  do_sql_command(conn, "SET intervalstyle = postgres");
578  if (remoteversion >= 90000)
579  do_sql_command(conn, "SET extra_float_digits = 3");
580  else
581  do_sql_command(conn, "SET extra_float_digits = 2");
582 }
583 
584 /*
585  * Convenience subroutine to issue a non-data-returning SQL command to remote
586  */
587 void
588 do_sql_command(PGconn *conn, const char *sql)
589 {
590  PGresult *res;
591 
592  if (!PQsendQuery(conn, sql))
593  pgfdw_report_error(ERROR, NULL, conn, false, sql);
594  res = pgfdw_get_result(conn, sql);
595  if (PQresultStatus(res) != PGRES_COMMAND_OK)
596  pgfdw_report_error(ERROR, res, conn, true, sql);
597  PQclear(res);
598 }
599 
600 /*
601  * Start remote transaction or subtransaction, if needed.
602  *
603  * Note that we always use at least REPEATABLE READ in the remote session.
604  * This is so that, if a query initiates multiple scans of the same or
605  * different foreign tables, we will get snapshot-consistent results from
606  * those scans. A disadvantage is that we can't provide sane emulation of
607  * READ COMMITTED behavior --- it would be nice if we had some other way to
608  * control which remote queries share a snapshot.
609  */
610 static void
612 {
613  int curlevel = GetCurrentTransactionNestLevel();
614 
615  /* Start main transaction if we haven't yet */
616  if (entry->xact_depth <= 0)
617  {
618  const char *sql;
619 
620  elog(DEBUG3, "starting remote transaction on connection %p",
621  entry->conn);
622 
624  sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
625  else
626  sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
627  entry->changing_xact_state = true;
628  do_sql_command(entry->conn, sql);
629  entry->xact_depth = 1;
630  entry->changing_xact_state = false;
631  }
632 
633  /*
634  * If we're in a subtransaction, stack up savepoints to match our level.
635  * This ensures we can rollback just the desired effects when a
636  * subtransaction aborts.
637  */
638  while (entry->xact_depth < curlevel)
639  {
640  char sql[64];
641 
642  snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
643  entry->changing_xact_state = true;
644  do_sql_command(entry->conn, sql);
645  entry->xact_depth++;
646  entry->changing_xact_state = false;
647  }
648 }
649 
650 /*
651  * Release connection reference count created by calling GetConnection.
652  */
653 void
655 {
656  /*
657  * Currently, we don't actually track connection references because all
658  * cleanup is managed on a transaction or subtransaction basis instead. So
659  * there's nothing to do here.
660  */
661 }
662 
663 /*
664  * Assign a "unique" number for a cursor.
665  *
666  * These really only need to be unique per connection within a transaction.
667  * For the moment we ignore the per-connection point and assign them across
668  * all connections in the transaction, but we ask for the connection to be
669  * supplied in case we want to refine that.
670  *
671  * Note that even if wraparound happens in a very long transaction, actual
672  * collisions are highly improbable; just be sure to use %u not %d to print.
673  */
674 unsigned int
676 {
677  return ++cursor_number;
678 }
679 
680 /*
681  * Assign a "unique" number for a prepared statement.
682  *
683  * This works much like GetCursorNumber, except that we never reset the counter
684  * within a session. That's because we can't be 100% sure we've gotten rid
685  * of all prepared statements on all connections, and it's not really worth
686  * increasing the risk of prepared-statement name collisions by resetting.
687  */
688 unsigned int
690 {
691  return ++prep_stmt_number;
692 }
693 
694 /*
695  * Submit a query and wait for the result.
696  *
697  * This function is interruptible by signals.
698  *
699  * Caller is responsible for the error handling on the result.
700  */
701 PGresult *
703 {
704  /* First, process a pending asynchronous request, if any. */
705  if (state && state->pendingAreq)
707 
708  /*
709  * Submit a query. Since we don't use non-blocking mode, this also can
710  * block. But its risk is relatively small, so we ignore that for now.
711  */
712  if (!PQsendQuery(conn, query))
713  pgfdw_report_error(ERROR, NULL, conn, false, query);
714 
715  /* Wait for the result. */
716  return pgfdw_get_result(conn, query);
717 }
718 
719 /*
720  * Wait for the result from a prior asynchronous execution function call.
721  *
722  * This function offers quick responsiveness by checking for any interruptions.
723  *
724  * This function emulates PQexec()'s behavior of returning the last result
725  * when there are many.
726  *
727  * Caller is responsible for the error handling on the result.
728  */
729 PGresult *
730 pgfdw_get_result(PGconn *conn, const char *query)
731 {
732  PGresult *volatile last_res = NULL;
733 
734  /* In what follows, do not leak any PGresults on an error. */
735  PG_TRY();
736  {
737  for (;;)
738  {
739  PGresult *res;
740 
741  while (PQisBusy(conn))
742  {
743  int wc;
744 
745  /* Sleep until there's something to do */
749  PQsocket(conn),
750  -1L, PG_WAIT_EXTENSION);
752 
754 
755  /* Data available in socket? */
756  if (wc & WL_SOCKET_READABLE)
757  {
758  if (!PQconsumeInput(conn))
759  pgfdw_report_error(ERROR, NULL, conn, false, query);
760  }
761  }
762 
763  res = PQgetResult(conn);
764  if (res == NULL)
765  break; /* query is complete */
766 
767  PQclear(last_res);
768  last_res = res;
769  }
770  }
771  PG_CATCH();
772  {
773  PQclear(last_res);
774  PG_RE_THROW();
775  }
776  PG_END_TRY();
777 
778  return last_res;
779 }
780 
781 /*
782  * Report an error we got from the remote server.
783  *
784  * elevel: error level to use (typically ERROR, but might be less)
785  * res: PGresult containing the error
786  * conn: connection we did the query on
787  * clear: if true, PQclear the result (otherwise caller will handle it)
788  * sql: NULL, or text of remote command we tried to execute
789  *
790  * Note: callers that choose not to throw ERROR for a remote error are
791  * responsible for making sure that the associated ConnCacheEntry gets
792  * marked with have_error = true.
793  */
794 void
796  bool clear, const char *sql)
797 {
798  /* If requested, PGresult must be released before leaving this function. */
799  PG_TRY();
800  {
801  char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
802  char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
803  char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
804  char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
805  char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
806  int sqlstate;
807 
808  if (diag_sqlstate)
809  sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
810  diag_sqlstate[1],
811  diag_sqlstate[2],
812  diag_sqlstate[3],
813  diag_sqlstate[4]);
814  else
815  sqlstate = ERRCODE_CONNECTION_FAILURE;
816 
817  /*
818  * If we don't get a message from the PGresult, try the PGconn. This
819  * is needed because for connection-level failures, PQexec may just
820  * return NULL, not a PGresult at all.
821  */
822  if (message_primary == NULL)
823  message_primary = pchomp(PQerrorMessage(conn));
824 
825  ereport(elevel,
826  (errcode(sqlstate),
827  message_primary ? errmsg_internal("%s", message_primary) :
828  errmsg("could not obtain message string for remote error"),
829  message_detail ? errdetail_internal("%s", message_detail) : 0,
830  message_hint ? errhint("%s", message_hint) : 0,
831  message_context ? errcontext("%s", message_context) : 0,
832  sql ? errcontext("remote SQL command: %s", sql) : 0));
833  }
834  PG_FINALLY();
835  {
836  if (clear)
837  PQclear(res);
838  }
839  PG_END_TRY();
840 }
841 
842 /*
843  * pgfdw_xact_callback --- cleanup at main-transaction end.
844  *
845  * This runs just late enough that it must not enter user-defined code
846  * locally. (Entering such code on the remote side is fine. Its remote
847  * COMMIT TRANSACTION may run deferred triggers.)
848  */
849 static void
851 {
852  HASH_SEQ_STATUS scan;
853  ConnCacheEntry *entry;
854 
855  /* Quick exit if no connections were touched in this transaction. */
856  if (!xact_got_connection)
857  return;
858 
859  /*
860  * Scan all connection cache entries to find open remote transactions, and
861  * close them.
862  */
863  hash_seq_init(&scan, ConnectionHash);
864  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
865  {
866  PGresult *res;
867 
868  /* Ignore cache entry if no open connection right now */
869  if (entry->conn == NULL)
870  continue;
871 
872  /* If it has an open remote transaction, try to close it */
873  if (entry->xact_depth > 0)
874  {
875  elog(DEBUG3, "closing remote transaction on connection %p",
876  entry->conn);
877 
878  switch (event)
879  {
882 
883  /*
884  * If abort cleanup previously failed for this connection,
885  * we can't issue any more commands against it.
886  */
888 
889  /* Commit all remote transactions during pre-commit */
890  entry->changing_xact_state = true;
891  do_sql_command(entry->conn, "COMMIT TRANSACTION");
892  entry->changing_xact_state = false;
893 
894  /*
895  * If there were any errors in subtransactions, and we
896  * made prepared statements, do a DEALLOCATE ALL to make
897  * sure we get rid of all prepared statements. This is
898  * annoying and not terribly bulletproof, but it's
899  * probably not worth trying harder.
900  *
901  * DEALLOCATE ALL only exists in 8.3 and later, so this
902  * constrains how old a server postgres_fdw can
903  * communicate with. We intentionally ignore errors in
904  * the DEALLOCATE, so that we can hobble along to some
905  * extent with older servers (leaking prepared statements
906  * as we go; but we don't really support update operations
907  * pre-8.3 anyway).
908  */
909  if (entry->have_prep_stmt && entry->have_error)
910  {
911  res = PQexec(entry->conn, "DEALLOCATE ALL");
912  PQclear(res);
913  }
914  entry->have_prep_stmt = false;
915  entry->have_error = false;
916  break;
918 
919  /*
920  * We disallow any remote transactions, since it's not
921  * very reasonable to hold them open until the prepared
922  * transaction is committed. For the moment, throw error
923  * unconditionally; later we might allow read-only cases.
924  * Note that the error will cause us to come right back
925  * here with event == XACT_EVENT_ABORT, so we'll clean up
926  * the connection state at that point.
927  */
928  ereport(ERROR,
929  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
930  errmsg("cannot PREPARE a transaction that has operated on postgres_fdw foreign tables")));
931  break;
933  case XACT_EVENT_COMMIT:
934  case XACT_EVENT_PREPARE:
935  /* Pre-commit should have closed the open transaction */
936  elog(ERROR, "missed cleaning up connection during pre-commit");
937  break;
939  case XACT_EVENT_ABORT:
940 
941  pgfdw_abort_cleanup(entry, "ABORT TRANSACTION", true);
942  break;
943  }
944  }
945 
946  /* Reset state to show we're out of a transaction */
947  entry->xact_depth = 0;
948 
949  /*
950  * If the connection isn't in a good idle state, it is marked as
951  * invalid or keep_connections option of its server is disabled, then
952  * discard it to recover. Next GetConnection will open a new
953  * connection.
954  */
955  if (PQstatus(entry->conn) != CONNECTION_OK ||
957  entry->changing_xact_state ||
958  entry->invalidated ||
959  !entry->keep_connections)
960  {
961  elog(DEBUG3, "discarding connection %p", entry->conn);
962  disconnect_pg_server(entry);
963  }
964  }
965 
966  /*
967  * Regardless of the event type, we can now mark ourselves as out of the
968  * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
969  * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
970  */
971  xact_got_connection = false;
972 
973  /* Also reset cursor numbering for next transaction */
974  cursor_number = 0;
975 }
976 
977 /*
978  * pgfdw_subxact_callback --- cleanup at subtransaction end.
979  */
980 static void
982  SubTransactionId parentSubid, void *arg)
983 {
984  HASH_SEQ_STATUS scan;
985  ConnCacheEntry *entry;
986  int curlevel;
987 
988  /* Nothing to do at subxact start, nor after commit. */
989  if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
990  event == SUBXACT_EVENT_ABORT_SUB))
991  return;
992 
993  /* Quick exit if no connections were touched in this transaction. */
994  if (!xact_got_connection)
995  return;
996 
997  /*
998  * Scan all connection cache entries to find open remote subtransactions
999  * of the current level, and close them.
1000  */
1001  curlevel = GetCurrentTransactionNestLevel();
1002  hash_seq_init(&scan, ConnectionHash);
1003  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1004  {
1005  char sql[100];
1006 
1007  /*
1008  * We only care about connections with open remote subtransactions of
1009  * the current level.
1010  */
1011  if (entry->conn == NULL || entry->xact_depth < curlevel)
1012  continue;
1013 
1014  if (entry->xact_depth > curlevel)
1015  elog(ERROR, "missed cleaning up remote subtransaction at level %d",
1016  entry->xact_depth);
1017 
1018  if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
1019  {
1020  /*
1021  * If abort cleanup previously failed for this connection, we
1022  * can't issue any more commands against it.
1023  */
1025 
1026  /* Commit all remote subtransactions during pre-commit */
1027  snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
1028  entry->changing_xact_state = true;
1029  do_sql_command(entry->conn, sql);
1030  entry->changing_xact_state = false;
1031  }
1032  else
1033  {
1034  /* Rollback all remote subtransactions during abort */
1035  snprintf(sql, sizeof(sql),
1036  "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
1037  curlevel, curlevel);
1038  pgfdw_abort_cleanup(entry, sql, false);
1039  }
1040 
1041  /* OK, we're outta that level of subtransaction */
1042  entry->xact_depth--;
1043  }
1044 }
1045 
1046 /*
1047  * Connection invalidation callback function
1048  *
1049  * After a change to a pg_foreign_server or pg_user_mapping catalog entry,
1050  * close connections depending on that entry immediately if current transaction
1051  * has not used those connections yet. Otherwise, mark those connections as
1052  * invalid and then make pgfdw_xact_callback() close them at the end of current
1053  * transaction, since they cannot be closed in the midst of the transaction
1054  * using them. Closed connections will be remade at the next opportunity if
1055  * necessary.
1056  *
1057  * Although most cache invalidation callbacks blow away all the related stuff
1058  * regardless of the given hashvalue, connections are expensive enough that
1059  * it's worth trying to avoid that.
1060  *
1061  * NB: We could avoid unnecessary disconnection more strictly by examining
1062  * individual option values, but it seems too much effort for the gain.
1063  */
1064 static void
1065 pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
1066 {
1067  HASH_SEQ_STATUS scan;
1068  ConnCacheEntry *entry;
1069 
1070  Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
1071 
1072  /* ConnectionHash must exist already, if we're registered */
1073  hash_seq_init(&scan, ConnectionHash);
1074  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1075  {
1076  /* Ignore invalid entries */
1077  if (entry->conn == NULL)
1078  continue;
1079 
1080  /* hashvalue == 0 means a cache reset, must clear all state */
1081  if (hashvalue == 0 ||
1082  (cacheid == FOREIGNSERVEROID &&
1083  entry->server_hashvalue == hashvalue) ||
1084  (cacheid == USERMAPPINGOID &&
1085  entry->mapping_hashvalue == hashvalue))
1086  {
1087  /*
1088  * Close the connection immediately if it's not used yet in this
1089  * transaction. Otherwise mark it as invalid so that
1090  * pgfdw_xact_callback() can close it at the end of this
1091  * transaction.
1092  */
1093  if (entry->xact_depth == 0)
1094  {
1095  elog(DEBUG3, "discarding connection %p", entry->conn);
1096  disconnect_pg_server(entry);
1097  }
1098  else
1099  entry->invalidated = true;
1100  }
1101  }
1102 }
1103 
1104 /*
1105  * Raise an error if the given connection cache entry is marked as being
1106  * in the middle of an xact state change. This should be called at which no
1107  * such change is expected to be in progress; if one is found to be in
1108  * progress, it means that we aborted in the middle of a previous state change
1109  * and now don't know what the remote transaction state actually is.
1110  * Such connections can't safely be further used. Re-establishing the
1111  * connection would change the snapshot and roll back any writes already
1112  * performed, so that's not an option, either. Thus, we must abort.
1113  */
1114 static void
1116 {
1117  ForeignServer *server;
1118 
1119  /* nothing to do for inactive entries and entries of sane state */
1120  if (entry->conn == NULL || !entry->changing_xact_state)
1121  return;
1122 
1123  /* make sure this entry is inactive */
1124  disconnect_pg_server(entry);
1125 
1126  /* find server name to be shown in the message below */
1127  server = GetForeignServer(entry->serverid);
1128 
1129  ereport(ERROR,
1130  (errcode(ERRCODE_CONNECTION_EXCEPTION),
1131  errmsg("connection to server \"%s\" was lost",
1132  server->servername)));
1133 }
1134 
1135 /*
1136  * Cancel the currently-in-progress query (whose query text we do not have)
1137  * and ignore the result. Returns true if we successfully cancel the query
1138  * and discard any pending result, and false if not.
1139  *
1140  * It's not a huge problem if we throw an ERROR here, but if we get into error
1141  * recursion trouble, we'll end up slamming the connection shut, which will
1142  * necessitate failing the entire toplevel transaction even if subtransactions
1143  * were used. Try to use WARNING where we can.
1144  *
1145  * XXX: if the query was one sent by fetch_more_data_begin(), we could get the
1146  * query text from the pendingAreq saved in the per-connection state, then
1147  * report the query using it.
1148  */
1149 static bool
1151 {
1152  PGcancel *cancel;
1153  char errbuf[256];
1154  PGresult *result = NULL;
1155  TimestampTz endtime;
1156 
1157  /*
1158  * If it takes too long to cancel the query and discard the result, assume
1159  * the connection is dead.
1160  */
1162 
1163  /*
1164  * Issue cancel request. Unfortunately, there's no good way to limit the
1165  * amount of time that we might block inside PQgetCancel().
1166  */
1167  if ((cancel = PQgetCancel(conn)))
1168  {
1169  if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
1170  {
1171  ereport(WARNING,
1172  (errcode(ERRCODE_CONNECTION_FAILURE),
1173  errmsg("could not send cancel request: %s",
1174  errbuf)));
1175  PQfreeCancel(cancel);
1176  return false;
1177  }
1178  PQfreeCancel(cancel);
1179  }
1180 
1181  /* Get and discard the result of the query. */
1182  if (pgfdw_get_cleanup_result(conn, endtime, &result))
1183  return false;
1184  PQclear(result);
1185 
1186  return true;
1187 }
1188 
1189 /*
1190  * Submit a query during (sub)abort cleanup and wait up to 30 seconds for the
1191  * result. If the query is executed without error, the return value is true.
1192  * If the query is executed successfully but returns an error, the return
1193  * value is true if and only if ignore_errors is set. If the query can't be
1194  * sent or times out, the return value is false.
1195  *
1196  * It's not a huge problem if we throw an ERROR here, but if we get into error
1197  * recursion trouble, we'll end up slamming the connection shut, which will
1198  * necessitate failing the entire toplevel transaction even if subtransactions
1199  * were used. Try to use WARNING where we can.
1200  */
1201 static bool
1202 pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
1203 {
1204  PGresult *result = NULL;
1205  TimestampTz endtime;
1206 
1207  /*
1208  * If it takes too long to execute a cleanup query, assume the connection
1209  * is dead. It's fairly likely that this is why we aborted in the first
1210  * place (e.g. statement timeout, user cancel), so the timeout shouldn't
1211  * be too long.
1212  */
1214 
1215  /*
1216  * Submit a query. Since we don't use non-blocking mode, this also can
1217  * block. But its risk is relatively small, so we ignore that for now.
1218  */
1219  if (!PQsendQuery(conn, query))
1220  {
1221  pgfdw_report_error(WARNING, NULL, conn, false, query);
1222  return false;
1223  }
1224 
1225  /* Get the result of the query. */
1226  if (pgfdw_get_cleanup_result(conn, endtime, &result))
1227  return false;
1228 
1229  /* Issue a warning if not successful. */
1230  if (PQresultStatus(result) != PGRES_COMMAND_OK)
1231  {
1232  pgfdw_report_error(WARNING, result, conn, true, query);
1233  return ignore_errors;
1234  }
1235  PQclear(result);
1236 
1237  return true;
1238 }
1239 
1240 /*
1241  * Get, during abort cleanup, the result of a query that is in progress. This
1242  * might be a query that is being interrupted by transaction abort, or it might
1243  * be a query that was initiated as part of transaction abort to get the remote
1244  * side back to the appropriate state.
1245  *
1246  * endtime is the time at which we should give up and assume the remote
1247  * side is dead. Returns true if the timeout expired, otherwise false.
1248  * Sets *result except in case of a timeout.
1249  */
1250 static bool
1252 {
1253  volatile bool timed_out = false;
1254  PGresult *volatile last_res = NULL;
1255 
1256  /* In what follows, do not leak any PGresults on an error. */
1257  PG_TRY();
1258  {
1259  for (;;)
1260  {
1261  PGresult *res;
1262 
1263  while (PQisBusy(conn))
1264  {
1265  int wc;
1267  long cur_timeout;
1268 
1269  /* If timeout has expired, give up, else get sleep time. */
1270  cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
1271  if (cur_timeout <= 0)
1272  {
1273  timed_out = true;
1274  goto exit;
1275  }
1276 
1277  /* Sleep until there's something to do */
1281  PQsocket(conn),
1282  cur_timeout, PG_WAIT_EXTENSION);
1284 
1286 
1287  /* Data available in socket? */
1288  if (wc & WL_SOCKET_READABLE)
1289  {
1290  if (!PQconsumeInput(conn))
1291  {
1292  /* connection trouble; treat the same as a timeout */
1293  timed_out = true;
1294  goto exit;
1295  }
1296  }
1297  }
1298 
1299  res = PQgetResult(conn);
1300  if (res == NULL)
1301  break; /* query is complete */
1302 
1303  PQclear(last_res);
1304  last_res = res;
1305  }
1306 exit: ;
1307  }
1308  PG_CATCH();
1309  {
1310  PQclear(last_res);
1311  PG_RE_THROW();
1312  }
1313  PG_END_TRY();
1314 
1315  if (timed_out)
1316  PQclear(last_res);
1317  else
1318  *result = last_res;
1319  return timed_out;
1320 }
1321 
1322 /*
1323  * Abort remote transaction.
1324  *
1325  * The statement specified in "sql" is sent to the remote server,
1326  * in order to rollback the remote transaction.
1327  *
1328  * "toplevel" should be set to true if toplevel (main) transaction is
1329  * rollbacked, false otherwise.
1330  *
1331  * Set entry->changing_xact_state to false on success, true on failure.
1332  */
1333 static void
1334 pgfdw_abort_cleanup(ConnCacheEntry *entry, const char *sql, bool toplevel)
1335 {
1336  /*
1337  * Don't try to clean up the connection if we're already in error
1338  * recursion trouble.
1339  */
1341  entry->changing_xact_state = true;
1342 
1343  /*
1344  * If connection is already unsalvageable, don't touch it further.
1345  */
1346  if (entry->changing_xact_state)
1347  return;
1348 
1349  /*
1350  * Mark this connection as in the process of changing transaction state.
1351  */
1352  entry->changing_xact_state = true;
1353 
1354  /* Assume we might have lost track of prepared statements */
1355  entry->have_error = true;
1356 
1357  /*
1358  * If a command has been submitted to the remote server by using an
1359  * asynchronous execution function, the command might not have yet
1360  * completed. Check to see if a command is still being processed by the
1361  * remote server, and if so, request cancellation of the command.
1362  */
1363  if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
1364  !pgfdw_cancel_query(entry->conn))
1365  return; /* Unable to cancel running query */
1366 
1367  if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
1368  return; /* Unable to abort remote transaction */
1369 
1370  if (toplevel)
1371  {
1372  if (entry->have_prep_stmt && entry->have_error &&
1374  "DEALLOCATE ALL",
1375  true))
1376  return; /* Trouble clearing prepared statements */
1377 
1378  entry->have_prep_stmt = false;
1379  entry->have_error = false;
1380  /* Also reset per-connection state */
1381  memset(&entry->state, 0, sizeof(entry->state));
1382  }
1383 
1384  /* Disarm changing_xact_state if it all worked */
1385  entry->changing_xact_state = false;
1386 }
1387 
1388 /*
1389  * List active foreign server connections.
1390  *
1391  * This function takes no input parameter and returns setof record made of
1392  * following values:
1393  * - server_name - server name of active connection. In case the foreign server
1394  * is dropped but still the connection is active, then the server name will
1395  * be NULL in output.
1396  * - valid - true/false representing whether the connection is valid or not.
1397  * Note that the connections can get invalidated in pgfdw_inval_callback.
1398  *
1399  * No records are returned when there are no cached connections at all.
1400  */
1401 Datum
1403 {
1404 #define POSTGRES_FDW_GET_CONNECTIONS_COLS 2
1405  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1406  TupleDesc tupdesc;
1407  Tuplestorestate *tupstore;
1408  MemoryContext per_query_ctx;
1409  MemoryContext oldcontext;
1410  HASH_SEQ_STATUS scan;
1411  ConnCacheEntry *entry;
1412 
1413  /* check to see if caller supports us returning a tuplestore */
1414  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
1415  ereport(ERROR,
1416  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1417  errmsg("set-valued function called in context that cannot accept a set")));
1418  if (!(rsinfo->allowedModes & SFRM_Materialize))
1419  ereport(ERROR,
1420  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1421  errmsg("materialize mode required, but it is not allowed in this context")));
1422 
1423  /* Build a tuple descriptor for our result type */
1424  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1425  elog(ERROR, "return type must be a row type");
1426 
1427  /* Build tuplestore to hold the result rows */
1428  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1429  oldcontext = MemoryContextSwitchTo(per_query_ctx);
1430 
1431  tupstore = tuplestore_begin_heap(true, false, work_mem);
1432  rsinfo->returnMode = SFRM_Materialize;
1433  rsinfo->setResult = tupstore;
1434  rsinfo->setDesc = tupdesc;
1435 
1436  MemoryContextSwitchTo(oldcontext);
1437 
1438  /* If cache doesn't exist, we return no records */
1439  if (!ConnectionHash)
1440  {
1441  /* clean up and return the tuplestore */
1442  tuplestore_donestoring(tupstore);
1443 
1444  PG_RETURN_VOID();
1445  }
1446 
1447  hash_seq_init(&scan, ConnectionHash);
1448  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1449  {
1450  ForeignServer *server;
1453 
1454  /* We only look for open remote connections */
1455  if (!entry->conn)
1456  continue;
1457 
1458  server = GetForeignServerExtended(entry->serverid, FSV_MISSING_OK);
1459 
1460  MemSet(values, 0, sizeof(values));
1461  MemSet(nulls, 0, sizeof(nulls));
1462 
1463  /*
1464  * The foreign server may have been dropped in current explicit
1465  * transaction. It is not possible to drop the server from another
1466  * session when the connection associated with it is in use in the
1467  * current transaction, if tried so, the drop query in another session
1468  * blocks until the current transaction finishes.
1469  *
1470  * Even though the server is dropped in the current transaction, the
1471  * cache can still have associated active connection entry, say we
1472  * call such connections dangling. Since we can not fetch the server
1473  * name from system catalogs for dangling connections, instead we show
1474  * NULL value for server name in output.
1475  *
1476  * We could have done better by storing the server name in the cache
1477  * entry instead of server oid so that it could be used in the output.
1478  * But the server name in each cache entry requires 64 bytes of
1479  * memory, which is huge, when there are many cached connections and
1480  * the use case i.e. dropping the foreign server within the explicit
1481  * current transaction seems rare. So, we chose to show NULL value for
1482  * server name in output.
1483  *
1484  * Such dangling connections get closed either in next use or at the
1485  * end of current explicit transaction in pgfdw_xact_callback.
1486  */
1487  if (!server)
1488  {
1489  /*
1490  * If the server has been dropped in the current explicit
1491  * transaction, then this entry would have been invalidated in
1492  * pgfdw_inval_callback at the end of drop server command. Note
1493  * that this connection would not have been closed in
1494  * pgfdw_inval_callback because it is still being used in the
1495  * current explicit transaction. So, assert that here.
1496  */
1497  Assert(entry->conn && entry->xact_depth > 0 && entry->invalidated);
1498 
1499  /* Show null, if no server name was found */
1500  nulls[0] = true;
1501  }
1502  else
1503  values[0] = CStringGetTextDatum(server->servername);
1504 
1505  values[1] = BoolGetDatum(!entry->invalidated);
1506 
1507  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
1508  }
1509 
1510  /* clean up and return the tuplestore */
1511  tuplestore_donestoring(tupstore);
1512 
1513  PG_RETURN_VOID();
1514 }
1515 
1516 /*
1517  * Disconnect the specified cached connections.
1518  *
1519  * This function discards the open connections that are established by
1520  * postgres_fdw from the local session to the foreign server with
1521  * the given name. Note that there can be multiple connections to
1522  * the given server using different user mappings. If the connections
1523  * are used in the current local transaction, they are not disconnected
1524  * and warning messages are reported. This function returns true
1525  * if it disconnects at least one connection, otherwise false. If no
1526  * foreign server with the given name is found, an error is reported.
1527  */
1528 Datum
1530 {
1531  ForeignServer *server;
1532  char *servername;
1533 
1534  servername = text_to_cstring(PG_GETARG_TEXT_PP(0));
1535  server = GetForeignServerByName(servername, false);
1536 
1538 }
1539 
1540 /*
1541  * Disconnect all the cached connections.
1542  *
1543  * This function discards all the open connections that are established by
1544  * postgres_fdw from the local session to the foreign servers.
1545  * If the connections are used in the current local transaction, they are
1546  * not disconnected and warning messages are reported. This function
1547  * returns true if it disconnects at least one connection, otherwise false.
1548  */
1549 Datum
1551 {
1553 }
1554 
1555 /*
1556  * Workhorse to disconnect cached connections.
1557  *
1558  * This function scans all the connection cache entries and disconnects
1559  * the open connections whose foreign server OID matches with
1560  * the specified one. If InvalidOid is specified, it disconnects all
1561  * the cached connections.
1562  *
1563  * This function emits a warning for each connection that's used in
1564  * the current transaction and doesn't close it. It returns true if
1565  * it disconnects at least one connection, otherwise false.
1566  *
1567  * Note that this function disconnects even the connections that are
1568  * established by other users in the same local session using different
1569  * user mappings. This leads even non-superuser to be able to close
1570  * the connections established by superusers in the same local session.
1571  *
1572  * XXX As of now we don't see any security risk doing this. But we should
1573  * set some restrictions on that, for example, prevent non-superuser
1574  * from closing the connections established by superusers even
1575  * in the same session?
1576  */
1577 static bool
1579 {
1580  HASH_SEQ_STATUS scan;
1581  ConnCacheEntry *entry;
1582  bool all = !OidIsValid(serverid);
1583  bool result = false;
1584 
1585  /*
1586  * Connection cache hashtable has not been initialized yet in this
1587  * session, so return false.
1588  */
1589  if (!ConnectionHash)
1590  return false;
1591 
1592  hash_seq_init(&scan, ConnectionHash);
1593  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1594  {
1595  /* Ignore cache entry if no open connection right now. */
1596  if (!entry->conn)
1597  continue;
1598 
1599  if (all || entry->serverid == serverid)
1600  {
1601  /*
1602  * Emit a warning because the connection to close is used in the
1603  * current transaction and cannot be disconnected right now.
1604  */
1605  if (entry->xact_depth > 0)
1606  {
1607  ForeignServer *server;
1608 
1609  server = GetForeignServerExtended(entry->serverid,
1610  FSV_MISSING_OK);
1611 
1612  if (!server)
1613  {
1614  /*
1615  * If the foreign server was dropped while its connection
1616  * was used in the current transaction, the connection
1617  * must have been marked as invalid by
1618  * pgfdw_inval_callback at the end of DROP SERVER command.
1619  */
1620  Assert(entry->invalidated);
1621 
1622  ereport(WARNING,
1623  (errmsg("cannot close dropped server connection because it is still in use")));
1624  }
1625  else
1626  ereport(WARNING,
1627  (errmsg("cannot close connection for server \"%s\" because it is still in use",
1628  server->servername)));
1629  }
1630  else
1631  {
1632  elog(DEBUG3, "discarding connection %p", entry->conn);
1633  disconnect_pg_server(entry);
1634  result = true;
1635  }
1636  }
1637  }
1638 
1639  return result;
1640 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
Oid umid
Definition: foreign.h:47
XactEvent
Definition: xact.h:113
Datum postgres_fdw_disconnect_all(PG_FUNCTION_ARGS)
Definition: connection.c:1550
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6744
#define IsA(nodeptr, _type_)
Definition: nodes.h:587
static void configure_remote_session(PGconn *conn)
Definition: connection.c:552
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:207
int errhint(const char *fmt,...)
Definition: elog.c:1156
#define PG_DIAG_MESSAGE_PRIMARY
Definition: postgres_ext.h:58
#define HASH_ELEM
Definition: hsearch.h:95
#define WL_TIMEOUT
Definition: latch.h:128
static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result)
Definition: connection.c:1251
int sqlerrcode
Definition: elog.h:381
#define DEBUG3
Definition: elog.h:23
#define PG_DIAG_MESSAGE_DETAIL
Definition: postgres_ext.h:59
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
struct ConnCacheEntry ConnCacheEntry
#define PG_WAIT_EXTENSION
Definition: wait_event.h:23
ErrorData * CopyErrorData(void)
Definition: elog.c:1560
int64 TimestampTz
Definition: timestamp.h:39
#define MAKE_SQLSTATE(ch1, ch2, ch3, ch4, ch5)
Definition: elog.h:64
void PQfreeCancel(PGcancel *cancel)
Definition: fe-connect.c:4375
ConnCacheKey key
Definition: connection.c:53
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Size entrysize
Definition: hsearch.h:76
static void check_conn_params(const char **keywords, const char **values, UserMapping *user)
Definition: connection.c:515
int errcode(int sqlerrcode)
Definition: elog.c:698
void PQfinish(PGconn *conn)
Definition: fe-connect.c:4231
#define MemSet(start, val, len)
Definition: c.h:1008
PGresult * pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
Definition: connection.c:702
#define WL_SOCKET_READABLE
Definition: latch.h:126
bool have_prep_stmt
Definition: connection.c:58
#define GetSysCacheHashValue1(cacheId, key1)
Definition: syscache.h:202
uint32 server_hashvalue
Definition: connection.c:65
uint32 SubTransactionId
Definition: c.h:591
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
unsigned int Oid
Definition: postgres_ext.h:31
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:6734
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:57
#define OidIsValid(objectId)
Definition: c.h:710
Oid userid
Definition: foreign.h:48
void FlushErrorState(void)
Definition: elog.c:1654
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3178
void ResetLatch(Latch *latch)
Definition: latch.c:660
PGconn * PQconnectdbParams(const char *const *keywords, const char *const *values, int expand_dbname)
Definition: fe-connect.c:657
int errdetail_internal(const char *fmt,...)
Definition: elog.c:1069
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:309
void ReleaseConnection(PGconn *conn)
Definition: connection.c:654
char * pchomp(const char *in)
Definition: mcxt.c:1327
Datum postgres_fdw_get_connections(PG_FUNCTION_ARGS)
Definition: connection.c:1402
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1326
Definition: dynahash.c:219
bool defGetBoolean(DefElem *def)
Definition: define.c:106
void pfree(void *pointer)
Definition: mcxt.c:1169
static unsigned int prep_stmt_number
Definition: connection.c:77
int ExtractConnectionOptions(List *defelems, const char **keywords, const char **values)
Definition: option.c:377
PGconn * GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
Definition: connection.c:127
void FreeErrorData(ErrorData *edata)
Definition: elog.c:1616
#define ObjectIdGetDatum(X)
Definition: postgres.h:551
#define ERROR
Definition: elog.h:46
bool changing_xact_state
Definition: connection.c:60
Datum postgres_fdw_disconnect(PG_FUNCTION_ARGS)
Definition: connection.c:1529
void process_pending_request(AsyncRequest *areq)
bool keep_connections
Definition: connection.c:62
PGcancel * PQgetCancel(PGconn *conn)
Definition: fe-connect.c:4352
int errdetail(const char *fmt,...)
Definition: elog.c:1042
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:349
List * options
Definition: foreign.h:50
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:500
unsigned int uint32
Definition: c.h:441
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:795
PgFdwConnState state
Definition: connection.c:67
MemoryContext CurrentMemoryContext
Definition: mcxt.c:42
ForeignServer * GetForeignServerByName(const char *srvname, bool missing_ok)
Definition: foreign.c:180
static unsigned int cursor_number
Definition: connection.c:76
bool invalidated
Definition: connection.c:61
bool superuser_arg(Oid roleid)
Definition: superuser.c:56
static void pgfdw_abort_cleanup(ConnCacheEntry *entry, const char *sql, bool toplevel)
Definition: connection.c:1334
#define FSV_MISSING_OK
Definition: foreign.h:61
PGTransactionStatusType PQtransactionStatus(const PGconn *conn)
Definition: fe-connect.c:6699
#define WARNING
Definition: elog.h:40
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
Definition: connection.c:1202
static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
Definition: connection.c:1065
static int elevel
Definition: vacuumlazy.c:401
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
char * pgfdw_application_name
Definition: option.c:51
#define HASH_BLOBS
Definition: hsearch.h:97
SubXactEvent
Definition: xact.h:127
#define PG_FINALLY()
Definition: elog.h:330
static bool disconnect_cached_connections(Oid serverid)
Definition: connection.c:1578
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1498
static void pgfdw_xact_callback(XactEvent event, void *arg)
Definition: connection.c:850
static HTAB * ConnectionHash
Definition: connection.c:73
ForeignServer * GetForeignServer(Oid serverid)
Definition: foreign.c:109
#define PG_RETURN_BOOL(x)
Definition: fmgr.h:359
uintptr_t Datum
Definition: postgres.h:411
void RegisterSubXactCallback(SubXactCallback callback, void *arg)
Definition: xact.c:3596
static bool pgfdw_cancel_query(PGconn *conn)
Definition: connection.c:1150
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1904
Size keysize
Definition: hsearch.h:75
int work_mem
Definition: globals.c:124
static bool UserMappingPasswordRequired(UserMapping *user)
Definition: connection.c:492
bool AcquireExternalFD(void)
Definition: fd.c:1175
#define BoolGetDatum(X)
Definition: postgres.h:446
#define InvalidOid
Definition: postgres_ext.h:36
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
#define PG_DIAG_MESSAGE_HINT
Definition: postgres_ext.h:60
#define ereport(elevel,...)
Definition: elog.h:157
int allowedModes
Definition: execnodes.h:306
void PQclear(PGresult *res)
Definition: fe-exec.c:694
ForeignServer * GetForeignServerExtended(Oid serverid, bits16 flags)
Definition: foreign.c:121
AsyncRequest * pendingAreq
Definition: postgres_fdw.h:134
uint32 mapping_hashvalue
Definition: connection.c:66
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:858
bool in_error_recursion_trouble(void)
Definition: elog.c:291
#define PG_RETURN_VOID()
Definition: fmgr.h:349
int errmsg_internal(const char *fmt,...)
Definition: elog.c:996
SetFunctionReturnMode returnMode
Definition: execnodes.h:308
#define PG_CATCH()
Definition: elog.h:323
PGconn * conn
Definition: connection.c:54
PGresult * pgfdw_get_result(PGconn *conn, const char *query)
Definition: connection.c:730
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:3233
static void disconnect_pg_server(ConnCacheEntry *entry)
Definition: connection.c:477
#define Assert(condition)
Definition: c.h:804
#define lfirst(lc)
Definition: pg_list.h:169
const char * GetDatabaseEncodingName(void)
Definition: mbutils.c:1216
Definition: regguts.h:317
void RegisterXactCallback(XactCallback callback, void *arg)
Definition: xact.c:3541
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:1951
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
Definition: connection.c:1115
static bool xact_got_connection
Definition: connection.c:80
unsigned int GetCursorNumber(PGconn *conn)
Definition: connection.c:675
static int list_length(const List *l)
Definition: pg_list.h:149
Oid serverid
Definition: foreign.h:49
#define PG_RE_THROW()
Definition: elog.h:354
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1436
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:234
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1426
Tuplestorestate * setResult
Definition: execnodes.h:311
PG_FUNCTION_INFO_V1(postgres_fdw_get_connections)
static Datum values[MAXATTR]
Definition: bootstrap.c:156
char * text_to_cstring(const text *t)
Definition: varlena.c:222
int PQconnectionUsedPassword(const PGconn *conn)
Definition: fe-connect.c:6810
ExprContext * econtext
Definition: execnodes.h:304
int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize)
Definition: fe-connect.c:4507
void ReleaseExternalFD(void)
Definition: fd.c:1228
static char * user
Definition: pg_regress.c:95
TupleDesc setDesc
Definition: execnodes.h:312
static void make_new_connection(ConnCacheEntry *entry, UserMapping *user)
Definition: connection.c:287
void * palloc(Size size)
Definition: mcxt.c:1062
int errmsg(const char *fmt,...)
Definition: elog.c:909
#define IsolationIsSerializable()
Definition: xact.h:52
void do_sql_command(PGconn *conn, const char *sql)
Definition: connection.c:588
char * servername
Definition: foreign.h:39
#define elog(elevel,...)
Definition: elog.h:232
int i
#define errcontext
Definition: elog.h:204
#define CStringGetTextDatum(s)
Definition: builtins.h:86
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2193
void * arg
struct Latch * MyLatch
Definition: globals.c:57
#define PG_FUNCTION_ARGS
Definition: fmgr.h:193
char * defname
Definition: parsenodes.h:758
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:120
unsigned int GetPrepStmtNumber(PGconn *conn)
Definition: connection.c:689
static void pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg)
Definition: connection.c:981
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:6691
#define PG_TRY()
Definition: elog.h:313
static void begin_remote_xact(ConnCacheEntry *entry)
Definition: connection.c:611
Oid ConnCacheKey
Definition: connection.c:49
#define snprintf
Definition: port.h:217
List * options
Definition: foreign.h:42
#define WL_LATCH_SET
Definition: latch.h:125
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:6770
#define POSTGRES_FDW_GET_CONNECTIONS_COLS
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1544
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1978
Oid serverid
Definition: foreign.h:36
#define PG_END_TRY()
Definition: elog.h:338
#define PG_DIAG_CONTEXT
Definition: postgres_ext.h:64
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1693
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130
static PGconn * connect_pg_server(ForeignServer *server, UserMapping *user)
Definition: connection.c:340