PostgreSQL Source Code  git master
connection.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * connection.c
4  * Connection management functions for postgres_fdw
5  *
6  * Portions Copyright (c) 2012-2021, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * contrib/postgres_fdw/connection.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "access/htup_details.h"
16 #include "access/xact.h"
18 #include "commands/defrem.h"
19 #include "funcapi.h"
20 #include "mb/pg_wchar.h"
21 #include "miscadmin.h"
22 #include "pgstat.h"
23 #include "postgres_fdw.h"
24 #include "storage/fd.h"
25 #include "storage/latch.h"
26 #include "utils/builtins.h"
27 #include "utils/datetime.h"
28 #include "utils/hsearch.h"
29 #include "utils/inval.h"
30 #include "utils/memutils.h"
31 #include "utils/syscache.h"
32 
33 /*
34  * Connection cache hash table entry
35  *
36  * The lookup key in this hash table is the user mapping OID. We use just one
37  * connection per user mapping ID, which ensures that all the scans use the
38  * same snapshot during a query. Using the user mapping OID rather than
39  * the foreign server OID + user OID avoids creating multiple connections when
40  * the public user mapping applies to all user OIDs.
41  *
42  * The "conn" pointer can be NULL if we don't currently have a live connection.
43  * When we do have a connection, xact_depth tracks the current depth of
44  * transactions and subtransactions open on the remote side. We need to issue
45  * commands at the same nesting depth on the remote as we're executing at
46  * ourselves, so that rolling back a subtransaction will kill the right
47  * queries and not the wrong ones.
48  */
49 typedef Oid ConnCacheKey;
50 
51 typedef struct ConnCacheEntry
52 {
53  ConnCacheKey key; /* hash key (must be first) */
54  PGconn *conn; /* connection to foreign server, or NULL */
55  /* Remaining fields are invalid when conn is NULL: */
56  int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 =
57  * one level of subxact open, etc */
58  bool have_prep_stmt; /* have we prepared any stmts in this xact? */
59  bool have_error; /* have any subxacts aborted in this xact? */
60  bool changing_xact_state; /* xact state change in process */
61  bool invalidated; /* true if reconnect is pending */
62  Oid serverid; /* foreign server OID used to get server name */
63  uint32 server_hashvalue; /* hash value of foreign server OID */
64  uint32 mapping_hashvalue; /* hash value of user mapping OID */
66 
67 /*
68  * Connection cache (initialized on first use)
69  */
70 static HTAB *ConnectionHash = NULL;
71 
72 /* for assigning cursor numbers and prepared statement numbers */
73 static unsigned int cursor_number = 0;
74 static unsigned int prep_stmt_number = 0;
75 
76 /* tracks whether any work is needed in callback functions */
77 static bool xact_got_connection = false;
78 
79 /*
80  * SQL functions
81  */
85 
86 /* prototypes of private functions */
89 static void disconnect_pg_server(ConnCacheEntry *entry);
90 static void check_conn_params(const char **keywords, const char **values, UserMapping *user);
92 static void do_sql_command(PGconn *conn, const char *sql);
93 static void begin_remote_xact(ConnCacheEntry *entry);
94 static void pgfdw_xact_callback(XactEvent event, void *arg);
95 static void pgfdw_subxact_callback(SubXactEvent event,
96  SubTransactionId mySubid,
97  SubTransactionId parentSubid,
98  void *arg);
99 static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
101 static bool pgfdw_cancel_query(PGconn *conn);
102 static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
103  bool ignore_errors);
104 static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
105  PGresult **result);
108 
109 /*
110  * Get a PGconn which can be used to execute queries on the remote PostgreSQL
111  * server with the user's authorization. A new connection is established
112  * if we don't already have a suitable one, and a transaction is opened at
113  * the right subtransaction nesting depth if we didn't do that already.
114  *
115  * will_prep_stmt must be true if caller intends to create any prepared
116  * statements. Since those don't go away automatically at transaction end
117  * (not even on error), we need this flag to cue manual cleanup.
118  */
119 PGconn *
120 GetConnection(UserMapping *user, bool will_prep_stmt)
121 {
122  bool found;
123  bool retry = false;
124  ConnCacheEntry *entry;
127 
128  /* First time through, initialize connection cache hashtable */
129  if (ConnectionHash == NULL)
130  {
131  HASHCTL ctl;
132 
133  ctl.keysize = sizeof(ConnCacheKey);
134  ctl.entrysize = sizeof(ConnCacheEntry);
135  ConnectionHash = hash_create("postgres_fdw connections", 8,
136  &ctl,
138 
139  /*
140  * Register some callback functions that manage connection cleanup.
141  * This should be done just once in each backend.
142  */
149  }
150 
151  /* Set flag that we did GetConnection during the current transaction */
152  xact_got_connection = true;
153 
154  /* Create hash key for the entry. Assume no pad bytes in key struct */
155  key = user->umid;
156 
157  /*
158  * Find or create cached entry for requested connection.
159  */
160  entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
161  if (!found)
162  {
163  /*
164  * We need only clear "conn" here; remaining fields will be filled
165  * later when "conn" is set.
166  */
167  entry->conn = NULL;
168  }
169 
170  /* Reject further use of connections which failed abort cleanup. */
172 
173  /*
174  * If the connection needs to be remade due to invalidation, disconnect as
175  * soon as we're out of all transactions.
176  */
177  if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
178  {
179  elog(DEBUG3, "closing connection %p for option changes to take effect",
180  entry->conn);
181  disconnect_pg_server(entry);
182  }
183 
184  /*
185  * If cache entry doesn't have a connection, we have to establish a new
186  * connection. (If connect_pg_server throws an error, the cache entry
187  * will remain in a valid empty state, ie conn == NULL.)
188  */
189  if (entry->conn == NULL)
190  make_new_connection(entry, user);
191 
192  /*
193  * We check the health of the cached connection here when starting a new
194  * remote transaction. If a broken connection is detected, we try to
195  * reestablish a new connection later.
196  */
197  PG_TRY();
198  {
199  /* Start a new transaction or subtransaction if needed. */
200  begin_remote_xact(entry);
201  }
202  PG_CATCH();
203  {
205  ErrorData *errdata = CopyErrorData();
206 
207  /*
208  * If connection failure is reported when starting a new remote
209  * transaction (not subtransaction), new connection will be
210  * reestablished later.
211  *
212  * After a broken connection is detected in libpq, any error other
213  * than connection failure (e.g., out-of-memory) can be thrown
214  * somewhere between return from libpq and the expected ereport() call
215  * in pgfdw_report_error(). In this case, since PQstatus() indicates
216  * CONNECTION_BAD, checking only PQstatus() causes the false detection
217  * of connection failure. To avoid this, we also verify that the
218  * error's sqlstate is ERRCODE_CONNECTION_FAILURE. Note that also
219  * checking only the sqlstate can cause another false detection
220  * because pgfdw_report_error() may report ERRCODE_CONNECTION_FAILURE
221  * for any libpq-originated error condition.
222  */
223  if (errdata->sqlerrcode != ERRCODE_CONNECTION_FAILURE ||
224  PQstatus(entry->conn) != CONNECTION_BAD ||
225  entry->xact_depth > 0)
226  {
227  MemoryContextSwitchTo(ecxt);
228  PG_RE_THROW();
229  }
230 
231  /* Clean up the error state */
232  FlushErrorState();
233  FreeErrorData(errdata);
234  errdata = NULL;
235 
236  retry = true;
237  }
238  PG_END_TRY();
239 
240  /*
241  * If a broken connection is detected, disconnect it, reestablish a new
242  * connection and retry a new remote transaction. If connection failure is
243  * reported again, we give up getting a connection.
244  */
245  if (retry)
246  {
247  Assert(entry->xact_depth == 0);
248 
249  ereport(DEBUG3,
250  (errmsg_internal("could not start remote transaction on connection %p",
251  entry->conn)),
252  errdetail_internal("%s", pchomp(PQerrorMessage(entry->conn))));
253 
254  elog(DEBUG3, "closing connection %p to reestablish a new one",
255  entry->conn);
256  disconnect_pg_server(entry);
257 
258  if (entry->conn == NULL)
259  make_new_connection(entry, user);
260 
261  begin_remote_xact(entry);
262  }
263 
264  /* Remember if caller will prepare statements */
265  entry->have_prep_stmt |= will_prep_stmt;
266 
267  return entry->conn;
268 }
269 
270 /*
271  * Reset all transient state fields in the cached connection entry and
272  * establish new connection to the remote server.
273  */
274 static void
276 {
277  ForeignServer *server = GetForeignServer(user->serverid);
278 
279  Assert(entry->conn == NULL);
280 
281  /* Reset all transient state fields, to be sure all are clean */
282  entry->xact_depth = 0;
283  entry->have_prep_stmt = false;
284  entry->have_error = false;
285  entry->changing_xact_state = false;
286  entry->invalidated = false;
287  entry->serverid = server->serverid;
288  entry->server_hashvalue =
290  ObjectIdGetDatum(server->serverid));
291  entry->mapping_hashvalue =
293  ObjectIdGetDatum(user->umid));
294 
295  /* Now try to make the connection */
296  entry->conn = connect_pg_server(server, user);
297 
298  elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
299  entry->conn, server->servername, user->umid, user->userid);
300 }
301 
302 /*
303  * Connect to remote server using specified server and user mapping properties.
304  */
305 static PGconn *
307 {
308  PGconn *volatile conn = NULL;
309 
310  /*
311  * Use PG_TRY block to ensure closing connection on error.
312  */
313  PG_TRY();
314  {
315  const char **keywords;
316  const char **values;
317  int n;
318 
319  /*
320  * Construct connection params from generic options of ForeignServer
321  * and UserMapping. (Some of them might not be libpq options, in
322  * which case we'll just waste a few array slots.) Add 3 extra slots
323  * for fallback_application_name, client_encoding, end marker.
324  */
325  n = list_length(server->options) + list_length(user->options) + 3;
326  keywords = (const char **) palloc(n * sizeof(char *));
327  values = (const char **) palloc(n * sizeof(char *));
328 
329  n = 0;
330  n += ExtractConnectionOptions(server->options,
331  keywords + n, values + n);
333  keywords + n, values + n);
334 
335  /* Use "postgres_fdw" as fallback_application_name. */
336  keywords[n] = "fallback_application_name";
337  values[n] = "postgres_fdw";
338  n++;
339 
340  /* Set client_encoding so that libpq can convert encoding properly. */
341  keywords[n] = "client_encoding";
342  values[n] = GetDatabaseEncodingName();
343  n++;
344 
345  keywords[n] = values[n] = NULL;
346 
347  /* verify the set of connection parameters */
348  check_conn_params(keywords, values, user);
349 
350  /*
351  * We must obey fd.c's limit on non-virtual file descriptors. Assume
352  * that a PGconn represents one long-lived FD. (Doing this here also
353  * ensures that VFDs are closed if needed to make room.)
354  */
355  if (!AcquireExternalFD())
356  {
357 #ifndef WIN32 /* can't write #if within ereport() macro */
358  ereport(ERROR,
359  (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
360  errmsg("could not connect to server \"%s\"",
361  server->servername),
362  errdetail("There are too many open files on the local server."),
363  errhint("Raise the server's max_files_per_process and/or \"ulimit -n\" limits.")));
364 #else
365  ereport(ERROR,
366  (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
367  errmsg("could not connect to server \"%s\"",
368  server->servername),
369  errdetail("There are too many open files on the local server."),
370  errhint("Raise the server's max_files_per_process setting.")));
371 #endif
372  }
373 
374  /* OK to make connection */
375  conn = PQconnectdbParams(keywords, values, false);
376 
377  if (!conn)
378  ReleaseExternalFD(); /* because the PG_CATCH block won't */
379 
380  if (!conn || PQstatus(conn) != CONNECTION_OK)
381  ereport(ERROR,
382  (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
383  errmsg("could not connect to server \"%s\"",
384  server->servername),
385  errdetail_internal("%s", pchomp(PQerrorMessage(conn)))));
386 
387  /*
388  * Check that non-superuser has used password to establish connection;
389  * otherwise, he's piggybacking on the postgres server's user
390  * identity. See also dblink_security_check() in contrib/dblink and
391  * check_conn_params.
392  */
393  if (!superuser_arg(user->userid) && UserMappingPasswordRequired(user) &&
395  ereport(ERROR,
396  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
397  errmsg("password is required"),
398  errdetail("Non-superuser cannot connect if the server does not request a password."),
399  errhint("Target server's authentication method must be changed or password_required=false set in the user mapping attributes.")));
400 
401  /* Prepare new session for use */
403 
404  pfree(keywords);
405  pfree(values);
406  }
407  PG_CATCH();
408  {
409  /* Release PGconn data structure if we managed to create one */
410  if (conn)
411  {
412  PQfinish(conn);
414  }
415  PG_RE_THROW();
416  }
417  PG_END_TRY();
418 
419  return conn;
420 }
421 
422 /*
423  * Disconnect any open connection for a connection cache entry.
424  */
425 static void
427 {
428  if (entry->conn != NULL)
429  {
430  PQfinish(entry->conn);
431  entry->conn = NULL;
433  }
434 }
435 
436 /*
437  * Return the value of the password_required option if it is set on this user
438  * mapping, otherwise true (the default). The mapping has been pre-validated.
439  */
440 static bool
442 {
443  ListCell *cell;
444 
445  foreach(cell, user->options)
446  {
447  DefElem *def = (DefElem *) lfirst(cell);
448 
449  if (strcmp(def->defname, "password_required") == 0)
450  return defGetBoolean(def);
451  }
452 
453  return true;
454 }
455 
456 /*
457  * For non-superusers, insist that the connstr specify a password. This
458  * prevents a password from being picked up from .pgpass, a service file, the
459  * environment, etc. We don't want the postgres user's passwords,
460  * certificates, etc to be accessible to non-superusers. (See also
461  * dblink_connstr_check in contrib/dblink.)
462  */
463 static void
464 check_conn_params(const char **keywords, const char **values, UserMapping *user)
465 {
466  int i;
467 
468  /* no check required if superuser */
469  if (superuser_arg(user->userid))
470  return;
471 
472  /* ok if params contain a non-empty password */
473  for (i = 0; keywords[i] != NULL; i++)
474  {
475  if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
476  return;
477  }
478 
479  /* ok if the superuser explicitly said so at user mapping creation time */
480  if (!UserMappingPasswordRequired(user))
481  return;
482 
483  ereport(ERROR,
484  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
485  errmsg("password is required"),
486  errdetail("Non-superusers must provide a password in the user mapping.")));
487 }
488 
489 /*
490  * Issue SET commands to make sure remote session is configured properly.
491  *
492  * We do this just once at connection, assuming nothing will change the
493  * values later. Since we'll never send volatile function calls to the
494  * remote, there shouldn't be any way to break this assumption from our end.
495  * It's possible to think of ways to break it at the remote end, eg making
496  * a foreign table point to a view that includes a set_config call ---
497  * but once you admit the possibility of a malicious view definition,
498  * there are any number of ways to break things.
499  */
500 static void
502 {
503  int remoteversion = PQserverVersion(conn);
504 
505  /* Force the search path to contain only pg_catalog (see deparse.c) */
506  do_sql_command(conn, "SET search_path = pg_catalog");
507 
508  /*
509  * Set remote timezone; this is basically just cosmetic, since all
510  * transmitted and returned timestamptzs should specify a zone explicitly
511  * anyway. However it makes the regression test outputs more predictable.
512  *
513  * We don't risk setting remote zone equal to ours, since the remote
514  * server might use a different timezone database. Instead, use UTC
515  * (quoted, because very old servers are picky about case).
516  */
517  do_sql_command(conn, "SET timezone = 'UTC'");
518 
519  /*
520  * Set values needed to ensure unambiguous data output from remote. (This
521  * logic should match what pg_dump does. See also set_transmission_modes
522  * in postgres_fdw.c.)
523  */
524  do_sql_command(conn, "SET datestyle = ISO");
525  if (remoteversion >= 80400)
526  do_sql_command(conn, "SET intervalstyle = postgres");
527  if (remoteversion >= 90000)
528  do_sql_command(conn, "SET extra_float_digits = 3");
529  else
530  do_sql_command(conn, "SET extra_float_digits = 2");
531 }
532 
533 /*
534  * Convenience subroutine to issue a non-data-returning SQL command to remote
535  */
536 static void
537 do_sql_command(PGconn *conn, const char *sql)
538 {
539  PGresult *res;
540 
541  if (!PQsendQuery(conn, sql))
542  pgfdw_report_error(ERROR, NULL, conn, false, sql);
543  res = pgfdw_get_result(conn, sql);
544  if (PQresultStatus(res) != PGRES_COMMAND_OK)
545  pgfdw_report_error(ERROR, res, conn, true, sql);
546  PQclear(res);
547 }
548 
549 /*
550  * Start remote transaction or subtransaction, if needed.
551  *
552  * Note that we always use at least REPEATABLE READ in the remote session.
553  * This is so that, if a query initiates multiple scans of the same or
554  * different foreign tables, we will get snapshot-consistent results from
555  * those scans. A disadvantage is that we can't provide sane emulation of
556  * READ COMMITTED behavior --- it would be nice if we had some other way to
557  * control which remote queries share a snapshot.
558  */
559 static void
561 {
562  int curlevel = GetCurrentTransactionNestLevel();
563 
564  /* Start main transaction if we haven't yet */
565  if (entry->xact_depth <= 0)
566  {
567  const char *sql;
568 
569  elog(DEBUG3, "starting remote transaction on connection %p",
570  entry->conn);
571 
573  sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
574  else
575  sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
576  entry->changing_xact_state = true;
577  do_sql_command(entry->conn, sql);
578  entry->xact_depth = 1;
579  entry->changing_xact_state = false;
580  }
581 
582  /*
583  * If we're in a subtransaction, stack up savepoints to match our level.
584  * This ensures we can rollback just the desired effects when a
585  * subtransaction aborts.
586  */
587  while (entry->xact_depth < curlevel)
588  {
589  char sql[64];
590 
591  snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
592  entry->changing_xact_state = true;
593  do_sql_command(entry->conn, sql);
594  entry->xact_depth++;
595  entry->changing_xact_state = false;
596  }
597 }
598 
599 /*
600  * Release connection reference count created by calling GetConnection.
601  */
602 void
604 {
605  /*
606  * Currently, we don't actually track connection references because all
607  * cleanup is managed on a transaction or subtransaction basis instead. So
608  * there's nothing to do here.
609  */
610 }
611 
612 /*
613  * Assign a "unique" number for a cursor.
614  *
615  * These really only need to be unique per connection within a transaction.
616  * For the moment we ignore the per-connection point and assign them across
617  * all connections in the transaction, but we ask for the connection to be
618  * supplied in case we want to refine that.
619  *
620  * Note that even if wraparound happens in a very long transaction, actual
621  * collisions are highly improbable; just be sure to use %u not %d to print.
622  */
623 unsigned int
625 {
626  return ++cursor_number;
627 }
628 
629 /*
630  * Assign a "unique" number for a prepared statement.
631  *
632  * This works much like GetCursorNumber, except that we never reset the counter
633  * within a session. That's because we can't be 100% sure we've gotten rid
634  * of all prepared statements on all connections, and it's not really worth
635  * increasing the risk of prepared-statement name collisions by resetting.
636  */
637 unsigned int
639 {
640  return ++prep_stmt_number;
641 }
642 
643 /*
644  * Submit a query and wait for the result.
645  *
646  * This function is interruptible by signals.
647  *
648  * Caller is responsible for the error handling on the result.
649  */
650 PGresult *
651 pgfdw_exec_query(PGconn *conn, const char *query)
652 {
653  /*
654  * Submit a query. Since we don't use non-blocking mode, this also can
655  * block. But its risk is relatively small, so we ignore that for now.
656  */
657  if (!PQsendQuery(conn, query))
658  pgfdw_report_error(ERROR, NULL, conn, false, query);
659 
660  /* Wait for the result. */
661  return pgfdw_get_result(conn, query);
662 }
663 
664 /*
665  * Wait for the result from a prior asynchronous execution function call.
666  *
667  * This function offers quick responsiveness by checking for any interruptions.
668  *
669  * This function emulates PQexec()'s behavior of returning the last result
670  * when there are many.
671  *
672  * Caller is responsible for the error handling on the result.
673  */
674 PGresult *
675 pgfdw_get_result(PGconn *conn, const char *query)
676 {
677  PGresult *volatile last_res = NULL;
678 
679  /* In what follows, do not leak any PGresults on an error. */
680  PG_TRY();
681  {
682  for (;;)
683  {
684  PGresult *res;
685 
686  while (PQisBusy(conn))
687  {
688  int wc;
689 
690  /* Sleep until there's something to do */
694  PQsocket(conn),
695  -1L, PG_WAIT_EXTENSION);
697 
699 
700  /* Data available in socket? */
701  if (wc & WL_SOCKET_READABLE)
702  {
703  if (!PQconsumeInput(conn))
704  pgfdw_report_error(ERROR, NULL, conn, false, query);
705  }
706  }
707 
708  res = PQgetResult(conn);
709  if (res == NULL)
710  break; /* query is complete */
711 
712  PQclear(last_res);
713  last_res = res;
714  }
715  }
716  PG_CATCH();
717  {
718  PQclear(last_res);
719  PG_RE_THROW();
720  }
721  PG_END_TRY();
722 
723  return last_res;
724 }
725 
726 /*
727  * Report an error we got from the remote server.
728  *
729  * elevel: error level to use (typically ERROR, but might be less)
730  * res: PGresult containing the error
731  * conn: connection we did the query on
732  * clear: if true, PQclear the result (otherwise caller will handle it)
733  * sql: NULL, or text of remote command we tried to execute
734  *
735  * Note: callers that choose not to throw ERROR for a remote error are
736  * responsible for making sure that the associated ConnCacheEntry gets
737  * marked with have_error = true.
738  */
739 void
741  bool clear, const char *sql)
742 {
743  /* If requested, PGresult must be released before leaving this function. */
744  PG_TRY();
745  {
746  char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
747  char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
748  char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
749  char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
750  char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
751  int sqlstate;
752 
753  if (diag_sqlstate)
754  sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
755  diag_sqlstate[1],
756  diag_sqlstate[2],
757  diag_sqlstate[3],
758  diag_sqlstate[4]);
759  else
760  sqlstate = ERRCODE_CONNECTION_FAILURE;
761 
762  /*
763  * If we don't get a message from the PGresult, try the PGconn. This
764  * is needed because for connection-level failures, PQexec may just
765  * return NULL, not a PGresult at all.
766  */
767  if (message_primary == NULL)
768  message_primary = pchomp(PQerrorMessage(conn));
769 
770  ereport(elevel,
771  (errcode(sqlstate),
772  message_primary ? errmsg_internal("%s", message_primary) :
773  errmsg("could not obtain message string for remote error"),
774  message_detail ? errdetail_internal("%s", message_detail) : 0,
775  message_hint ? errhint("%s", message_hint) : 0,
776  message_context ? errcontext("%s", message_context) : 0,
777  sql ? errcontext("remote SQL command: %s", sql) : 0));
778  }
779  PG_FINALLY();
780  {
781  if (clear)
782  PQclear(res);
783  }
784  PG_END_TRY();
785 }
786 
787 /*
788  * pgfdw_xact_callback --- cleanup at main-transaction end.
789  *
790  * This runs just late enough that it must not enter user-defined code
791  * locally. (Entering such code on the remote side is fine. Its remote
792  * COMMIT TRANSACTION may run deferred triggers.)
793  */
794 static void
796 {
797  HASH_SEQ_STATUS scan;
798  ConnCacheEntry *entry;
799 
800  /* Quick exit if no connections were touched in this transaction. */
801  if (!xact_got_connection)
802  return;
803 
804  /*
805  * Scan all connection cache entries to find open remote transactions, and
806  * close them.
807  */
808  hash_seq_init(&scan, ConnectionHash);
809  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
810  {
811  PGresult *res;
812 
813  /* Ignore cache entry if no open connection right now */
814  if (entry->conn == NULL)
815  continue;
816 
817  /* If it has an open remote transaction, try to close it */
818  if (entry->xact_depth > 0)
819  {
820  bool abort_cleanup_failure = false;
821 
822  elog(DEBUG3, "closing remote transaction on connection %p",
823  entry->conn);
824 
825  switch (event)
826  {
829 
830  /*
831  * If abort cleanup previously failed for this connection,
832  * we can't issue any more commands against it.
833  */
835 
836  /* Commit all remote transactions during pre-commit */
837  entry->changing_xact_state = true;
838  do_sql_command(entry->conn, "COMMIT TRANSACTION");
839  entry->changing_xact_state = false;
840 
841  /*
842  * If there were any errors in subtransactions, and we
843  * made prepared statements, do a DEALLOCATE ALL to make
844  * sure we get rid of all prepared statements. This is
845  * annoying and not terribly bulletproof, but it's
846  * probably not worth trying harder.
847  *
848  * DEALLOCATE ALL only exists in 8.3 and later, so this
849  * constrains how old a server postgres_fdw can
850  * communicate with. We intentionally ignore errors in
851  * the DEALLOCATE, so that we can hobble along to some
852  * extent with older servers (leaking prepared statements
853  * as we go; but we don't really support update operations
854  * pre-8.3 anyway).
855  */
856  if (entry->have_prep_stmt && entry->have_error)
857  {
858  res = PQexec(entry->conn, "DEALLOCATE ALL");
859  PQclear(res);
860  }
861  entry->have_prep_stmt = false;
862  entry->have_error = false;
863  break;
865 
866  /*
867  * We disallow any remote transactions, since it's not
868  * very reasonable to hold them open until the prepared
869  * transaction is committed. For the moment, throw error
870  * unconditionally; later we might allow read-only cases.
871  * Note that the error will cause us to come right back
872  * here with event == XACT_EVENT_ABORT, so we'll clean up
873  * the connection state at that point.
874  */
875  ereport(ERROR,
876  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
877  errmsg("cannot PREPARE a transaction that has operated on postgres_fdw foreign tables")));
878  break;
880  case XACT_EVENT_COMMIT:
881  case XACT_EVENT_PREPARE:
882  /* Pre-commit should have closed the open transaction */
883  elog(ERROR, "missed cleaning up connection during pre-commit");
884  break;
886  case XACT_EVENT_ABORT:
887 
888  /*
889  * Don't try to clean up the connection if we're already
890  * in error recursion trouble.
891  */
893  entry->changing_xact_state = true;
894 
895  /*
896  * If connection is already unsalvageable, don't touch it
897  * further.
898  */
899  if (entry->changing_xact_state)
900  break;
901 
902  /*
903  * Mark this connection as in the process of changing
904  * transaction state.
905  */
906  entry->changing_xact_state = true;
907 
908  /* Assume we might have lost track of prepared statements */
909  entry->have_error = true;
910 
911  /*
912  * If a command has been submitted to the remote server by
913  * using an asynchronous execution function, the command
914  * might not have yet completed. Check to see if a
915  * command is still being processed by the remote server,
916  * and if so, request cancellation of the command.
917  */
918  if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
919  !pgfdw_cancel_query(entry->conn))
920  {
921  /* Unable to cancel running query. */
922  abort_cleanup_failure = true;
923  }
924  else if (!pgfdw_exec_cleanup_query(entry->conn,
925  "ABORT TRANSACTION",
926  false))
927  {
928  /* Unable to abort remote transaction. */
929  abort_cleanup_failure = true;
930  }
931  else if (entry->have_prep_stmt && entry->have_error &&
933  "DEALLOCATE ALL",
934  true))
935  {
936  /* Trouble clearing prepared statements. */
937  abort_cleanup_failure = true;
938  }
939  else
940  {
941  entry->have_prep_stmt = false;
942  entry->have_error = false;
943  }
944 
945  /* Disarm changing_xact_state if it all worked. */
946  entry->changing_xact_state = abort_cleanup_failure;
947  break;
948  }
949  }
950 
951  /* Reset state to show we're out of a transaction */
952  entry->xact_depth = 0;
953 
954  /*
955  * If the connection isn't in a good idle state or it is marked as
956  * invalid, then discard it to recover. Next GetConnection will open a
957  * new connection.
958  */
959  if (PQstatus(entry->conn) != CONNECTION_OK ||
961  entry->changing_xact_state ||
962  entry->invalidated)
963  {
964  elog(DEBUG3, "discarding connection %p", entry->conn);
965  disconnect_pg_server(entry);
966  }
967  }
968 
969  /*
970  * Regardless of the event type, we can now mark ourselves as out of the
971  * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
972  * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
973  */
974  xact_got_connection = false;
975 
976  /* Also reset cursor numbering for next transaction */
977  cursor_number = 0;
978 }
979 
980 /*
981  * pgfdw_subxact_callback --- cleanup at subtransaction end.
982  */
983 static void
985  SubTransactionId parentSubid, void *arg)
986 {
987  HASH_SEQ_STATUS scan;
988  ConnCacheEntry *entry;
989  int curlevel;
990 
991  /* Nothing to do at subxact start, nor after commit. */
992  if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
993  event == SUBXACT_EVENT_ABORT_SUB))
994  return;
995 
996  /* Quick exit if no connections were touched in this transaction. */
997  if (!xact_got_connection)
998  return;
999 
1000  /*
1001  * Scan all connection cache entries to find open remote subtransactions
1002  * of the current level, and close them.
1003  */
1004  curlevel = GetCurrentTransactionNestLevel();
1005  hash_seq_init(&scan, ConnectionHash);
1006  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1007  {
1008  char sql[100];
1009 
1010  /*
1011  * We only care about connections with open remote subtransactions of
1012  * the current level.
1013  */
1014  if (entry->conn == NULL || entry->xact_depth < curlevel)
1015  continue;
1016 
1017  if (entry->xact_depth > curlevel)
1018  elog(ERROR, "missed cleaning up remote subtransaction at level %d",
1019  entry->xact_depth);
1020 
1021  if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
1022  {
1023  /*
1024  * If abort cleanup previously failed for this connection, we
1025  * can't issue any more commands against it.
1026  */
1028 
1029  /* Commit all remote subtransactions during pre-commit */
1030  snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
1031  entry->changing_xact_state = true;
1032  do_sql_command(entry->conn, sql);
1033  entry->changing_xact_state = false;
1034  }
1035  else if (in_error_recursion_trouble())
1036  {
1037  /*
1038  * Don't try to clean up the connection if we're already in error
1039  * recursion trouble.
1040  */
1041  entry->changing_xact_state = true;
1042  }
1043  else if (!entry->changing_xact_state)
1044  {
1045  bool abort_cleanup_failure = false;
1046 
1047  /* Remember that abort cleanup is in progress. */
1048  entry->changing_xact_state = true;
1049 
1050  /* Assume we might have lost track of prepared statements */
1051  entry->have_error = true;
1052 
1053  /*
1054  * If a command has been submitted to the remote server by using
1055  * an asynchronous execution function, the command might not have
1056  * yet completed. Check to see if a command is still being
1057  * processed by the remote server, and if so, request cancellation
1058  * of the command.
1059  */
1060  if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
1061  !pgfdw_cancel_query(entry->conn))
1062  abort_cleanup_failure = true;
1063  else
1064  {
1065  /* Rollback all remote subtransactions during abort */
1066  snprintf(sql, sizeof(sql),
1067  "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
1068  curlevel, curlevel);
1069  if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
1070  abort_cleanup_failure = true;
1071  }
1072 
1073  /* Disarm changing_xact_state if it all worked. */
1074  entry->changing_xact_state = abort_cleanup_failure;
1075  }
1076 
1077  /* OK, we're outta that level of subtransaction */
1078  entry->xact_depth--;
1079  }
1080 }
1081 
1082 /*
1083  * Connection invalidation callback function
1084  *
1085  * After a change to a pg_foreign_server or pg_user_mapping catalog entry,
1086  * close connections depending on that entry immediately if current transaction
1087  * has not used those connections yet. Otherwise, mark those connections as
1088  * invalid and then make pgfdw_xact_callback() close them at the end of current
1089  * transaction, since they cannot be closed in the midst of the transaction
1090  * using them. Closed connections will be remade at the next opportunity if
1091  * necessary.
1092  *
1093  * Although most cache invalidation callbacks blow away all the related stuff
1094  * regardless of the given hashvalue, connections are expensive enough that
1095  * it's worth trying to avoid that.
1096  *
1097  * NB: We could avoid unnecessary disconnection more strictly by examining
1098  * individual option values, but it seems too much effort for the gain.
1099  */
1100 static void
1101 pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
1102 {
1103  HASH_SEQ_STATUS scan;
1104  ConnCacheEntry *entry;
1105 
1106  Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
1107 
1108  /* ConnectionHash must exist already, if we're registered */
1109  hash_seq_init(&scan, ConnectionHash);
1110  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1111  {
1112  /* Ignore invalid entries */
1113  if (entry->conn == NULL)
1114  continue;
1115 
1116  /* hashvalue == 0 means a cache reset, must clear all state */
1117  if (hashvalue == 0 ||
1118  (cacheid == FOREIGNSERVEROID &&
1119  entry->server_hashvalue == hashvalue) ||
1120  (cacheid == USERMAPPINGOID &&
1121  entry->mapping_hashvalue == hashvalue))
1122  {
1123  /*
1124  * Close the connection immediately if it's not used yet in this
1125  * transaction. Otherwise mark it as invalid so that
1126  * pgfdw_xact_callback() can close it at the end of this
1127  * transaction.
1128  */
1129  if (entry->xact_depth == 0)
1130  {
1131  elog(DEBUG3, "discarding connection %p", entry->conn);
1132  disconnect_pg_server(entry);
1133  }
1134  else
1135  entry->invalidated = true;
1136  }
1137  }
1138 }
1139 
1140 /*
1141  * Raise an error if the given connection cache entry is marked as being
1142  * in the middle of an xact state change. This should be called at which no
1143  * such change is expected to be in progress; if one is found to be in
1144  * progress, it means that we aborted in the middle of a previous state change
1145  * and now don't know what the remote transaction state actually is.
1146  * Such connections can't safely be further used. Re-establishing the
1147  * connection would change the snapshot and roll back any writes already
1148  * performed, so that's not an option, either. Thus, we must abort.
1149  */
1150 static void
1152 {
1153  ForeignServer *server;
1154 
1155  /* nothing to do for inactive entries and entries of sane state */
1156  if (entry->conn == NULL || !entry->changing_xact_state)
1157  return;
1158 
1159  /* make sure this entry is inactive */
1160  disconnect_pg_server(entry);
1161 
1162  /* find server name to be shown in the message below */
1163  server = GetForeignServer(entry->serverid);
1164 
1165  ereport(ERROR,
1166  (errcode(ERRCODE_CONNECTION_EXCEPTION),
1167  errmsg("connection to server \"%s\" was lost",
1168  server->servername)));
1169 }
1170 
1171 /*
1172  * Cancel the currently-in-progress query (whose query text we do not have)
1173  * and ignore the result. Returns true if we successfully cancel the query
1174  * and discard any pending result, and false if not.
1175  */
1176 static bool
1178 {
1179  PGcancel *cancel;
1180  char errbuf[256];
1181  PGresult *result = NULL;
1182  TimestampTz endtime;
1183 
1184  /*
1185  * If it takes too long to cancel the query and discard the result, assume
1186  * the connection is dead.
1187  */
1189 
1190  /*
1191  * Issue cancel request. Unfortunately, there's no good way to limit the
1192  * amount of time that we might block inside PQgetCancel().
1193  */
1194  if ((cancel = PQgetCancel(conn)))
1195  {
1196  if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
1197  {
1198  ereport(WARNING,
1199  (errcode(ERRCODE_CONNECTION_FAILURE),
1200  errmsg("could not send cancel request: %s",
1201  errbuf)));
1202  PQfreeCancel(cancel);
1203  return false;
1204  }
1205  PQfreeCancel(cancel);
1206  }
1207 
1208  /* Get and discard the result of the query. */
1209  if (pgfdw_get_cleanup_result(conn, endtime, &result))
1210  return false;
1211  PQclear(result);
1212 
1213  return true;
1214 }
1215 
1216 /*
1217  * Submit a query during (sub)abort cleanup and wait up to 30 seconds for the
1218  * result. If the query is executed without error, the return value is true.
1219  * If the query is executed successfully but returns an error, the return
1220  * value is true if and only if ignore_errors is set. If the query can't be
1221  * sent or times out, the return value is false.
1222  */
1223 static bool
1224 pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
1225 {
1226  PGresult *result = NULL;
1227  TimestampTz endtime;
1228 
1229  /*
1230  * If it takes too long to execute a cleanup query, assume the connection
1231  * is dead. It's fairly likely that this is why we aborted in the first
1232  * place (e.g. statement timeout, user cancel), so the timeout shouldn't
1233  * be too long.
1234  */
1236 
1237  /*
1238  * Submit a query. Since we don't use non-blocking mode, this also can
1239  * block. But its risk is relatively small, so we ignore that for now.
1240  */
1241  if (!PQsendQuery(conn, query))
1242  {
1243  pgfdw_report_error(WARNING, NULL, conn, false, query);
1244  return false;
1245  }
1246 
1247  /* Get the result of the query. */
1248  if (pgfdw_get_cleanup_result(conn, endtime, &result))
1249  return false;
1250 
1251  /* Issue a warning if not successful. */
1252  if (PQresultStatus(result) != PGRES_COMMAND_OK)
1253  {
1254  pgfdw_report_error(WARNING, result, conn, true, query);
1255  return ignore_errors;
1256  }
1257  PQclear(result);
1258 
1259  return true;
1260 }
1261 
1262 /*
1263  * Get, during abort cleanup, the result of a query that is in progress. This
1264  * might be a query that is being interrupted by transaction abort, or it might
1265  * be a query that was initiated as part of transaction abort to get the remote
1266  * side back to the appropriate state.
1267  *
1268  * It's not a huge problem if we throw an ERROR here, but if we get into error
1269  * recursion trouble, we'll end up slamming the connection shut, which will
1270  * necessitate failing the entire toplevel transaction even if subtransactions
1271  * were used. Try to use WARNING where we can.
1272  *
1273  * endtime is the time at which we should give up and assume the remote
1274  * side is dead. Returns true if the timeout expired, otherwise false.
1275  * Sets *result except in case of a timeout.
1276  */
1277 static bool
1279 {
1280  volatile bool timed_out = false;
1281  PGresult *volatile last_res = NULL;
1282 
1283  /* In what follows, do not leak any PGresults on an error. */
1284  PG_TRY();
1285  {
1286  for (;;)
1287  {
1288  PGresult *res;
1289 
1290  while (PQisBusy(conn))
1291  {
1292  int wc;
1294  long cur_timeout;
1295 
1296  /* If timeout has expired, give up, else get sleep time. */
1297  cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
1298  if (cur_timeout <= 0)
1299  {
1300  timed_out = true;
1301  goto exit;
1302  }
1303 
1304  /* Sleep until there's something to do */
1308  PQsocket(conn),
1309  cur_timeout, PG_WAIT_EXTENSION);
1311 
1313 
1314  /* Data available in socket? */
1315  if (wc & WL_SOCKET_READABLE)
1316  {
1317  if (!PQconsumeInput(conn))
1318  {
1319  /* connection trouble; treat the same as a timeout */
1320  timed_out = true;
1321  goto exit;
1322  }
1323  }
1324  }
1325 
1326  res = PQgetResult(conn);
1327  if (res == NULL)
1328  break; /* query is complete */
1329 
1330  PQclear(last_res);
1331  last_res = res;
1332  }
1333 exit: ;
1334  }
1335  PG_CATCH();
1336  {
1337  PQclear(last_res);
1338  PG_RE_THROW();
1339  }
1340  PG_END_TRY();
1341 
1342  if (timed_out)
1343  PQclear(last_res);
1344  else
1345  *result = last_res;
1346  return timed_out;
1347 }
1348 
1349 /*
1350  * List active foreign server connections.
1351  *
1352  * This function takes no input parameter and returns setof record made of
1353  * following values:
1354  * - server_name - server name of active connection. In case the foreign server
1355  * is dropped but still the connection is active, then the server name will
1356  * be NULL in output.
1357  * - valid - true/false representing whether the connection is valid or not.
1358  * Note that the connections can get invalidated in pgfdw_inval_callback.
1359  *
1360  * No records are returned when there are no cached connections at all.
1361  */
1362 Datum
1364 {
1365 #define POSTGRES_FDW_GET_CONNECTIONS_COLS 2
1366  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1367  TupleDesc tupdesc;
1368  Tuplestorestate *tupstore;
1369  MemoryContext per_query_ctx;
1370  MemoryContext oldcontext;
1371  HASH_SEQ_STATUS scan;
1372  ConnCacheEntry *entry;
1373 
1374  /* check to see if caller supports us returning a tuplestore */
1375  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
1376  ereport(ERROR,
1377  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1378  errmsg("set-valued function called in context that cannot accept a set")));
1379  if (!(rsinfo->allowedModes & SFRM_Materialize))
1380  ereport(ERROR,
1381  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1382  errmsg("materialize mode required, but it is not allowed in this context")));
1383 
1384  /* Build a tuple descriptor for our result type */
1385  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1386  elog(ERROR, "return type must be a row type");
1387 
1388  /* Build tuplestore to hold the result rows */
1389  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1390  oldcontext = MemoryContextSwitchTo(per_query_ctx);
1391 
1392  tupstore = tuplestore_begin_heap(true, false, work_mem);
1393  rsinfo->returnMode = SFRM_Materialize;
1394  rsinfo->setResult = tupstore;
1395  rsinfo->setDesc = tupdesc;
1396 
1397  MemoryContextSwitchTo(oldcontext);
1398 
1399  /* If cache doesn't exist, we return no records */
1400  if (!ConnectionHash)
1401  {
1402  /* clean up and return the tuplestore */
1403  tuplestore_donestoring(tupstore);
1404 
1405  PG_RETURN_VOID();
1406  }
1407 
1408  hash_seq_init(&scan, ConnectionHash);
1409  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1410  {
1411  ForeignServer *server;
1414 
1415  /* We only look for open remote connections */
1416  if (!entry->conn)
1417  continue;
1418 
1419  server = GetForeignServerExtended(entry->serverid, FSV_MISSING_OK);
1420 
1421  MemSet(values, 0, sizeof(values));
1422  MemSet(nulls, 0, sizeof(nulls));
1423 
1424  /*
1425  * The foreign server may have been dropped in current explicit
1426  * transaction. It is not possible to drop the server from another
1427  * session when the connection associated with it is in use in the
1428  * current transaction, if tried so, the drop query in another session
1429  * blocks until the current transaction finishes.
1430  *
1431  * Even though the server is dropped in the current transaction, the
1432  * cache can still have associated active connection entry, say we
1433  * call such connections dangling. Since we can not fetch the server
1434  * name from system catalogs for dangling connections, instead we show
1435  * NULL value for server name in output.
1436  *
1437  * We could have done better by storing the server name in the cache
1438  * entry instead of server oid so that it could be used in the output.
1439  * But the server name in each cache entry requires 64 bytes of
1440  * memory, which is huge, when there are many cached connections and
1441  * the use case i.e. dropping the foreign server within the explicit
1442  * current transaction seems rare. So, we chose to show NULL value for
1443  * server name in output.
1444  *
1445  * Such dangling connections get closed either in next use or at the
1446  * end of current explicit transaction in pgfdw_xact_callback.
1447  */
1448  if (!server)
1449  {
1450  /*
1451  * If the server has been dropped in the current explicit
1452  * transaction, then this entry would have been invalidated in
1453  * pgfdw_inval_callback at the end of drop server command. Note
1454  * that this connection would not have been closed in
1455  * pgfdw_inval_callback because it is still being used in the
1456  * current explicit transaction. So, assert that here.
1457  */
1458  Assert(entry->conn && entry->xact_depth > 0 && entry->invalidated);
1459 
1460  /* Show null, if no server name was found */
1461  nulls[0] = true;
1462  }
1463  else
1464  values[0] = CStringGetTextDatum(server->servername);
1465 
1466  values[1] = BoolGetDatum(!entry->invalidated);
1467 
1468  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
1469  }
1470 
1471  /* clean up and return the tuplestore */
1472  tuplestore_donestoring(tupstore);
1473 
1474  PG_RETURN_VOID();
1475 }
1476 
1477 /*
1478  * Disconnect the specified cached connections.
1479  *
1480  * This function discards the open connections that are established by
1481  * postgres_fdw from the local session to the foreign server with
1482  * the given name. Note that there can be multiple connections to
1483  * the given server using different user mappings. If the connections
1484  * are used in the current local transaction, they are not disconnected
1485  * and warning messages are reported. This function returns true
1486  * if it disconnects at least one connection, otherwise false. If no
1487  * foreign server with the given name is found, an error is reported.
1488  */
1489 Datum
1491 {
1492  ForeignServer *server;
1493  char *servername;
1494 
1495  servername = text_to_cstring(PG_GETARG_TEXT_PP(0));
1496  server = GetForeignServerByName(servername, false);
1497 
1499 }
1500 
1501 /*
1502  * Disconnect all the cached connections.
1503  *
1504  * This function discards all the open connections that are established by
1505  * postgres_fdw from the local session to the foreign servers.
1506  * If the connections are used in the current local transaction, they are
1507  * not disconnected and warning messages are reported. This function
1508  * returns true if it disconnects at least one connection, otherwise false.
1509  */
1510 Datum
1512 {
1514 }
1515 
1516 /*
1517  * Workhorse to disconnect cached connections.
1518  *
1519  * This function scans all the connection cache entries and disconnects
1520  * the open connections whose foreign server OID matches with
1521  * the specified one. If InvalidOid is specified, it disconnects all
1522  * the cached connections.
1523  *
1524  * This function emits a warning for each connection that's used in
1525  * the current transaction and doesn't close it. It returns true if
1526  * it disconnects at least one connection, otherwise false.
1527  *
1528  * Note that this function disconnects even the connections that are
1529  * established by other users in the same local session using different
1530  * user mappings. This leads even non-superuser to be able to close
1531  * the connections established by superusers in the same local session.
1532  *
1533  * XXX As of now we don't see any security risk doing this. But we should
1534  * set some restrictions on that, for example, prevent non-superuser
1535  * from closing the connections established by superusers even
1536  * in the same session?
1537  */
1538 static bool
1540 {
1541  HASH_SEQ_STATUS scan;
1542  ConnCacheEntry *entry;
1543  bool all = !OidIsValid(serverid);
1544  bool result = false;
1545 
1546  /*
1547  * Connection cache hashtable has not been initialized yet in this
1548  * session, so return false.
1549  */
1550  if (!ConnectionHash)
1551  return false;
1552 
1553  hash_seq_init(&scan, ConnectionHash);
1554  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1555  {
1556  /* Ignore cache entry if no open connection right now. */
1557  if (!entry->conn)
1558  continue;
1559 
1560  if (all || entry->serverid == serverid)
1561  {
1562  /*
1563  * Emit a warning because the connection to close is used in the
1564  * current transaction and cannot be disconnected right now.
1565  */
1566  if (entry->xact_depth > 0)
1567  {
1568  ForeignServer *server;
1569 
1570  server = GetForeignServerExtended(entry->serverid,
1571  FSV_MISSING_OK);
1572 
1573  if (!server)
1574  {
1575  /*
1576  * If the foreign server was dropped while its connection
1577  * was used in the current transaction, the connection
1578  * must have been marked as invalid by
1579  * pgfdw_inval_callback at the end of DROP SERVER command.
1580  */
1581  Assert(entry->invalidated);
1582 
1583  ereport(WARNING,
1584  (errmsg("cannot close dropped server connection because it is still in use")));
1585  }
1586  else
1587  ereport(WARNING,
1588  (errmsg("cannot close connection for server \"%s\" because it is still in use",
1589  server->servername)));
1590  }
1591  else
1592  {
1593  elog(DEBUG3, "discarding connection %p", entry->conn);
1594  disconnect_pg_server(entry);
1595  result = true;
1596  }
1597  }
1598  }
1599 
1600  return result;
1601 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
Oid umid
Definition: foreign.h:47
XactEvent
Definition: xact.h:113
Datum postgres_fdw_disconnect_all(PG_FUNCTION_ARGS)
Definition: connection.c:1511
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6632
#define IsA(nodeptr, _type_)
Definition: nodes.h:581
static void configure_remote_session(PGconn *conn)
Definition: connection.c:501
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:207
int errhint(const char *fmt,...)
Definition: elog.c:1162
#define PG_DIAG_MESSAGE_PRIMARY
Definition: postgres_ext.h:58
#define HASH_ELEM
Definition: hsearch.h:95
#define WL_TIMEOUT
Definition: latch.h:127
static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result)
Definition: connection.c:1278
int sqlerrcode
Definition: elog.h:378
#define DEBUG3
Definition: elog.h:23
#define PG_DIAG_MESSAGE_DETAIL
Definition: postgres_ext.h:59
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1578
struct ConnCacheEntry ConnCacheEntry
ErrorData * CopyErrorData(void)
Definition: elog.c:1565
int64 TimestampTz
Definition: timestamp.h:39
#define MAKE_SQLSTATE(ch1, ch2, ch3, ch4, ch5)
Definition: elog.h:62
void PQfreeCancel(PGcancel *cancel)
Definition: fe-connect.c:4271
ConnCacheKey key
Definition: connection.c:53
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Size entrysize
Definition: hsearch.h:76
static void check_conn_params(const char **keywords, const char **values, UserMapping *user)
Definition: connection.c:464
int errcode(int sqlerrcode)
Definition: elog.c:704
void PQfinish(PGconn *conn)
Definition: fe-connect.c:4127
#define MemSet(start, val, len)
Definition: c.h:1008
#define WL_SOCKET_READABLE
Definition: latch.h:125
bool have_prep_stmt
Definition: connection.c:58
#define GetSysCacheHashValue1(cacheId, key1)
Definition: syscache.h:202
uint32 server_hashvalue
Definition: connection.c:63
uint32 SubTransactionId
Definition: c.h:591
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
unsigned int Oid
Definition: postgres_ext.h:31
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:6622
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:57
#define OidIsValid(objectId)
Definition: c.h:710
Oid userid
Definition: foreign.h:48
void FlushErrorState(void)
Definition: elog.c:1659
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2651
void ResetLatch(Latch *latch)
Definition: latch.c:588
PGconn * PQconnectdbParams(const char *const *keywords, const char *const *values, int expand_dbname)
Definition: fe-connect.c:651
int errdetail_internal(const char *fmt,...)
Definition: elog.c:1075
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:309
void ReleaseConnection(PGconn *conn)
Definition: connection.c:603
char * pchomp(const char *in)
Definition: mcxt.c:1215
Datum postgres_fdw_get_connections(PG_FUNCTION_ARGS)
Definition: connection.c:1363
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1187
Definition: dynahash.c:219
bool defGetBoolean(DefElem *def)
Definition: define.c:111
void pfree(void *pointer)
Definition: mcxt.c:1057
static unsigned int prep_stmt_number
Definition: connection.c:74
int ExtractConnectionOptions(List *defelems, const char **keywords, const char **values)
Definition: option.c:347
void FreeErrorData(ErrorData *edata)
Definition: elog.c:1621
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:45
bool changing_xact_state
Definition: connection.c:60
Datum postgres_fdw_disconnect(PG_FUNCTION_ARGS)
Definition: connection.c:1490
static void do_sql_command(PGconn *conn, const char *sql)
Definition: connection.c:537
PGcancel * PQgetCancel(PGconn *conn)
Definition: fe-connect.c:4248
int errdetail(const char *fmt,...)
Definition: elog.c:1048
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:349
List * options
Definition: foreign.h:50
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:438
unsigned int uint32
Definition: c.h:441
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:740
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
ForeignServer * GetForeignServerByName(const char *srvname, bool missing_ok)
Definition: foreign.c:180
static unsigned int cursor_number
Definition: connection.c:73
bool invalidated
Definition: connection.c:61
bool superuser_arg(Oid roleid)
Definition: superuser.c:56
#define FSV_MISSING_OK
Definition: foreign.h:61
PGTransactionStatusType PQtransactionStatus(const PGconn *conn)
Definition: fe-connect.c:6587
#define WARNING
Definition: elog.h:40
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
Definition: connection.c:1224
static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
Definition: connection.c:1101
static int elevel
Definition: vacuumlazy.c:333
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
#define HASH_BLOBS
Definition: hsearch.h:97
SubXactEvent
Definition: xact.h:127
#define PG_FINALLY()
Definition: elog.h:326
static bool disconnect_cached_connections(Oid serverid)
Definition: connection.c:1539
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1434
static void pgfdw_xact_callback(XactEvent event, void *arg)
Definition: connection.c:795
static HTAB * ConnectionHash
Definition: connection.c:70
ForeignServer * GetForeignServer(Oid serverid)
Definition: foreign.c:109
#define PG_RETURN_BOOL(x)
Definition: fmgr.h:359
uintptr_t Datum
Definition: postgres.h:367
void RegisterSubXactCallback(SubXactCallback callback, void *arg)
Definition: xact.c:3582
static bool pgfdw_cancel_query(PGconn *conn)
Definition: connection.c:1177
#define PG_WAIT_EXTENSION
Definition: pgstat.h:900
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1672
Size keysize
Definition: hsearch.h:75
int work_mem
Definition: globals.c:122
static bool UserMappingPasswordRequired(UserMapping *user)
Definition: connection.c:441
bool AcquireExternalFD(void)
Definition: fd.c:1076
#define BoolGetDatum(X)
Definition: postgres.h:402
#define InvalidOid
Definition: postgres_ext.h:36
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
#define PG_DIAG_MESSAGE_HINT
Definition: postgres_ext.h:60
#define ereport(elevel,...)
Definition: elog.h:155
int allowedModes
Definition: execnodes.h:304
void PQclear(PGresult *res)
Definition: fe-exec.c:676
ForeignServer * GetForeignServerExtended(Oid serverid, bits16 flags)
Definition: foreign.c:121
uint32 mapping_hashvalue
Definition: connection.c:64
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:857
bool in_error_recursion_trouble(void)
Definition: elog.c:291
#define PG_RETURN_VOID()
Definition: fmgr.h:349
PGconn * GetConnection(UserMapping *user, bool will_prep_stmt)
Definition: connection.c:120
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1002
SetFunctionReturnMode returnMode
Definition: execnodes.h:306
#define PG_CATCH()
Definition: elog.h:319
PGconn * conn
Definition: connection.c:54
PGresult * pgfdw_get_result(PGconn *conn, const char *query)
Definition: connection.c:675
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:2713
static void disconnect_pg_server(ConnCacheEntry *entry)
Definition: connection.c:426
#define Assert(condition)
Definition: c.h:804
#define lfirst(lc)
Definition: pg_list.h:169
const char * GetDatabaseEncodingName(void)
Definition: mbutils.c:1157
void RegisterXactCallback(XactCallback callback, void *arg)
Definition: xact.c:3527
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:1722
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
Definition: connection.c:1151
static bool xact_got_connection
Definition: connection.c:77
unsigned int GetCursorNumber(PGconn *conn)
Definition: connection.c:624
static int list_length(const List *l)
Definition: pg_list.h:149
Oid serverid
Definition: foreign.h:49
#define PG_RE_THROW()
Definition: elog.h:350
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1436
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:232
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1426
Tuplestorestate * setResult
Definition: execnodes.h:309
PG_FUNCTION_INFO_V1(postgres_fdw_get_connections)
static Datum values[MAXATTR]
Definition: bootstrap.c:165
char * text_to_cstring(const text *t)
Definition: varlena.c:222
int PQconnectionUsedPassword(const PGconn *conn)
Definition: fe-connect.c:6681
ExprContext * econtext
Definition: execnodes.h:302
int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize)
Definition: fe-connect.c:4403
void ReleaseExternalFD(void)
Definition: fd.c:1129
static char * user
Definition: pg_regress.c:95
TupleDesc setDesc
Definition: execnodes.h:310
static void make_new_connection(ConnCacheEntry *entry, UserMapping *user)
Definition: connection.c:275
void * palloc(Size size)
Definition: mcxt.c:950
int errmsg(const char *fmt,...)
Definition: elog.c:915
#define IsolationIsSerializable()
Definition: xact.h:52
char * servername
Definition: foreign.h:39
#define elog(elevel,...)
Definition: elog.h:228
int i
#define errcontext
Definition: elog.h:199
#define CStringGetTextDatum(s)
Definition: builtins.h:82
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:1907
void * arg
struct Latch * MyLatch
Definition: globals.c:55
#define PG_FUNCTION_ARGS
Definition: fmgr.h:193
char * defname
Definition: parsenodes.h:733
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:100
unsigned int GetPrepStmtNumber(PGconn *conn)
Definition: connection.c:638
static void pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg)
Definition: connection.c:984
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:6579
#define PG_TRY()
Definition: elog.h:309
static void begin_remote_xact(ConnCacheEntry *entry)
Definition: connection.c:560
Oid ConnCacheKey
Definition: connection.c:49
PGresult * pgfdw_exec_query(PGconn *conn, const char *query)
Definition: connection.c:651
#define snprintf
Definition: port.h:215
List * options
Definition: foreign.h:42
#define WL_LATCH_SET
Definition: latch.h:124
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:6650
#define POSTGRES_FDW_GET_CONNECTIONS_COLS
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1542
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1746
Oid serverid
Definition: foreign.h:36
#define PG_END_TRY()
Definition: elog.h:334
#define PG_DIAG_CONTEXT
Definition: postgres_ext.h:64
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1691
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129
static PGconn * connect_pg_server(ForeignServer *server, UserMapping *user)
Definition: connection.c:306