PostgreSQL Source Code  git master
connection.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * connection.c
4  * Connection management functions for postgres_fdw
5  *
6  * Portions Copyright (c) 2012-2023, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * contrib/postgres_fdw/connection.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "access/htup_details.h"
16 #include "access/xact.h"
18 #include "commands/defrem.h"
19 #include "funcapi.h"
21 #include "mb/pg_wchar.h"
22 #include "miscadmin.h"
23 #include "pgstat.h"
24 #include "postgres_fdw.h"
25 #include "storage/fd.h"
26 #include "storage/latch.h"
27 #include "utils/builtins.h"
28 #include "utils/datetime.h"
29 #include "utils/hsearch.h"
30 #include "utils/inval.h"
31 #include "utils/memutils.h"
32 #include "utils/syscache.h"
33 
34 /*
35  * Connection cache hash table entry
36  *
37  * The lookup key in this hash table is the user mapping OID. We use just one
38  * connection per user mapping ID, which ensures that all the scans use the
39  * same snapshot during a query. Using the user mapping OID rather than
40  * the foreign server OID + user OID avoids creating multiple connections when
41  * the public user mapping applies to all user OIDs.
42  *
43  * The "conn" pointer can be NULL if we don't currently have a live connection.
44  * When we do have a connection, xact_depth tracks the current depth of
45  * transactions and subtransactions open on the remote side. We need to issue
46  * commands at the same nesting depth on the remote as we're executing at
47  * ourselves, so that rolling back a subtransaction will kill the right
48  * queries and not the wrong ones.
 *
 * All fields after "conn" are (re)initialized by make_new_connection() each
 * time a new connection is established. An entry whose "invalidated" flag
 * is set is disconnected by GetConnection() once xact_depth is back to 0,
 * so that changed server/user-mapping options take effect on reconnect.
 *
 * changing_xact_state is set to true just before a transaction-control
 * command (START TRANSACTION, SAVEPOINT, COMMIT, ...) is sent and cleared
 * only after it completes, so it stays true if that command throws.
49  */
50 typedef Oid ConnCacheKey;
51 
52 typedef struct ConnCacheEntry
53 {
54  ConnCacheKey key; /* hash key (must be first) */
55  PGconn *conn; /* connection to foreign server, or NULL */
56  /* Remaining fields are invalid when conn is NULL: */
57  int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 =
58  * one level of subxact open, etc */
59  bool have_prep_stmt; /* have we prepared any stmts in this xact? */
60  bool have_error; /* have any subxacts aborted in this xact? */
61  bool changing_xact_state; /* xact state change in process */
62  bool parallel_commit; /* do we commit (sub)xacts in parallel? */
63  bool invalidated; /* true if reconnect is pending */
64  bool keep_connections; /* setting value of keep_connections
65  * server option */
66  Oid serverid; /* foreign server OID used to get server name */
67  uint32 server_hashvalue; /* hash value of foreign server OID */
68  uint32 mapping_hashvalue; /* hash value of user mapping OID */
69  PgFdwConnState state; /* extra per-connection state */
71 
72 /*
73  * Connection cache (initialized on first use)
 *
 * Created by GetConnection() the first time it runs in this backend and
 * keyed by user mapping OID (see ConnCacheEntry).
74  */
75 static HTAB *ConnectionHash = NULL;
76 
77 /* for assigning cursor numbers and prepared statement numbers */
/*
 * cursor_number is reset to 0 at the end of every transaction by
 * pgfdw_xact_callback(); prep_stmt_number is never reset within a session
 * (see the comment on the prepared-statement number allocator).
 */
78 static unsigned int cursor_number = 0;
79 static unsigned int prep_stmt_number = 0;
80 
81 /* tracks whether any work is needed in callback functions */
/*
 * Set to true by GetConnection() and cleared by pgfdw_xact_callback(), so the
 * transaction callbacks can return immediately in transactions that never
 * touched a remote connection.
 */
82 static bool xact_got_connection = false;
83 
84 /*
85  * SQL functions
86  */
90 
91 /* prototypes of private functions */
94 static void disconnect_pg_server(ConnCacheEntry *entry);
95 static void check_conn_params(const char **keywords, const char **values, UserMapping *user);
97 static void do_sql_command_begin(PGconn *conn, const char *sql);
98 static void do_sql_command_end(PGconn *conn, const char *sql,
99  bool consume_input);
100 static void begin_remote_xact(ConnCacheEntry *entry);
101 static void pgfdw_xact_callback(XactEvent event, void *arg);
102 static void pgfdw_subxact_callback(SubXactEvent event,
103  SubTransactionId mySubid,
104  SubTransactionId parentSubid,
105  void *arg);
106 static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
108 static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
109 static bool pgfdw_cancel_query(PGconn *conn);
110 static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
111  bool ignore_errors);
112 static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
113  PGresult **result, bool *timed_out);
114 static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel);
115 static void pgfdw_finish_pre_commit_cleanup(List *pending_entries);
116 static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries,
117  int curlevel);
119 static bool disconnect_cached_connections(Oid serverid);
120 
121 /*
122  * Get a PGconn which can be used to execute queries on the remote PostgreSQL
123  * server with the user's authorization. A new connection is established
124  * if we don't already have a suitable one, and a transaction is opened at
125  * the right subtransaction nesting depth if we didn't do that already.
126  *
127  * will_prep_stmt must be true if caller intends to create any prepared
128  * statements. Since those don't go away automatically at transaction end
129  * (not even on error), we need this flag to cue manual cleanup.
130  *
131  * If state is not NULL, *state receives the per-connection state associated
132  * with the PGconn.
 *
 * The connection is looked up in the cache by user->umid (the user mapping
 * OID); the cache itself is created on the first call in this backend.
 * Calling this also marks the current transaction as having used a
 * connection, so the transaction callbacks know there is cleanup to do.
 *
 * If the cached connection proves to be broken when we first use it (an
 * ERRCODE_CONNECTION_FAILURE error while PQstatus() is CONNECTION_BAD) and
 * no remote transaction is open on it, it is closed and exactly one fresh
 * connection attempt is made; any error from that second attempt is
 * propagated to the caller.
133  */
134 PGconn *
136 {
137  bool found;
138  bool retry = false;
139  ConnCacheEntry *entry;
142 
143  /* First time through, initialize connection cache hashtable */
144  if (ConnectionHash == NULL)
145  {
146  HASHCTL ctl;
147 
148  ctl.keysize = sizeof(ConnCacheKey);
149  ctl.entrysize = sizeof(ConnCacheEntry);
150  ConnectionHash = hash_create("postgres_fdw connections", 8,
151  &ctl,
153 
154  /*
155  * Register some callback functions that manage connection cleanup.
156  * This should be done just once in each backend.
157  */
164  }
165 
166  /* Set flag that we did GetConnection during the current transaction */
167  xact_got_connection = true;
168 
169  /* Create hash key for the entry; the key is just the user mapping OID */
170  key = user->umid;
171 
172  /*
173  * Find or create cached entry for requested connection.
174  */
175  entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
176  if (!found)
177  {
178  /*
179  * We need only clear "conn" here; remaining fields will be filled
180  * later when "conn" is set.
181  */
182  entry->conn = NULL;
183  }
184 
185  /* Reject further use of connections which failed abort cleanup. */
187 
188  /*
189  * If the connection needs to be remade due to invalidation, disconnect as
190  * soon as we're out of all transactions.
191  */
192  if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
193  {
194  elog(DEBUG3, "closing connection %p for option changes to take effect",
195  entry->conn);
196  disconnect_pg_server(entry);
197  }
198 
199  /*
200  * If cache entry doesn't have a connection, we have to establish a new
201  * connection. (If connect_pg_server throws an error, the cache entry
202  * will remain in a valid empty state, ie conn == NULL.)
203  */
204  if (entry->conn == NULL)
205  make_new_connection(entry, user);
206 
207  /*
208  * We check the health of the cached connection here when using it. In
209  * cases where we're out of all transactions, if a broken connection is
210  * detected, we try to reestablish a new connection later.
211  */
212  PG_TRY();
213  {
214  /* Process a pending asynchronous request if any. */
215  if (entry->state.pendingAreq)
217  /* Start a new transaction or subtransaction if needed. */
218  begin_remote_xact(entry);
219  }
220  PG_CATCH();
221  {
223  ErrorData *errdata = CopyErrorData();
224 
225  /*
226  * Determine whether to try to reestablish the connection.
227  *
228  * After a broken connection is detected in libpq, any error other
229  * than connection failure (e.g., out-of-memory) can be thrown
230  * somewhere between return from libpq and the expected ereport() call
231  * in pgfdw_report_error(). In this case, since PQstatus() indicates
232  * CONNECTION_BAD, checking only PQstatus() causes the false detection
233  * of connection failure. To avoid this, we also verify that the
234  * error's sqlstate is ERRCODE_CONNECTION_FAILURE. Note that also
235  * checking only the sqlstate can cause another false detection
236  * because pgfdw_report_error() may report ERRCODE_CONNECTION_FAILURE
237  * for any libpq-originated error condition.
 *
 * A connection with an open remote transaction (xact_depth > 0) is
 * never silently replaced, since that would lose the remote
 * transaction's state; the error is re-thrown instead.
238  */
239  if (errdata->sqlerrcode != ERRCODE_CONNECTION_FAILURE ||
240  PQstatus(entry->conn) != CONNECTION_BAD ||
241  entry->xact_depth > 0)
242  {
243  MemoryContextSwitchTo(ecxt);
244  PG_RE_THROW();
245  }
246 
247  /* Clean up the error state */
248  FlushErrorState();
249  FreeErrorData(errdata);
250  errdata = NULL;
251 
252  retry = true;
253  }
254  PG_END_TRY();
255 
256  /*
257  * If a broken connection is detected, disconnect it, reestablish a new
258  * connection and retry a new remote transaction. If connection failure is
259  * reported again, we give up getting a connection.
260  */
261  if (retry)
262  {
263  Assert(entry->xact_depth == 0);
264 
265  ereport(DEBUG3,
266  (errmsg_internal("could not start remote transaction on connection %p",
267  entry->conn)),
268  errdetail_internal("%s", pchomp(PQerrorMessage(entry->conn))));
269 
270  elog(DEBUG3, "closing connection %p to reestablish a new one",
271  entry->conn);
272  disconnect_pg_server(entry);
273 
274  make_new_connection(entry, user);
275 
276  begin_remote_xact(entry);
277  }
278 
279  /* Remember if caller will prepare statements */
280  entry->have_prep_stmt |= will_prep_stmt;
281 
282  /* If caller needs access to the per-connection state, return it. */
283  if (state)
284  *state = &entry->state;
285 
286  return entry->conn;
287 }
288 
289 /*
290  * Reset all transient state fields in the cached connection entry and
291  * establish new connection to the remote server.
 *
 * entry->conn must be NULL on entry (asserted). keep_connections and
 * parallel_commit are read from the foreign server's options, defaulting to
 * true and false respectively. entry->conn is assigned only after
 * connect_pg_server() returns, so if that throws the entry is left with
 * conn == NULL.
292  */
293 static void
295 {
296  ForeignServer *server = GetForeignServer(user->serverid);
297  ListCell *lc;
298 
299  Assert(entry->conn == NULL);
300 
301  /* Reset all transient state fields, to be sure all are clean */
302  entry->xact_depth = 0;
303  entry->have_prep_stmt = false;
304  entry->have_error = false;
305  entry->changing_xact_state = false;
306  entry->invalidated = false;
307  entry->serverid = server->serverid;
308  entry->server_hashvalue =
310  ObjectIdGetDatum(server->serverid));
311  entry->mapping_hashvalue =
313  ObjectIdGetDatum(user->umid));
314  memset(&entry->state, 0, sizeof(entry->state));
315 
316  /*
317  * Determine whether to keep the connection that we're about to make here
318  * open even after the transaction using it ends, so that the subsequent
319  * transactions can re-use it.
320  *
321  * By default, all the connections to any foreign servers are kept open.
322  *
323  * Also determine whether to commit (sub)transactions opened on the remote
324  * server in parallel at (sub)transaction end, which is disabled by
325  * default.
326  *
327  * Note: it's enough to determine these only when making a new connection
328  * because if these settings for it are changed, it will be closed and
329  * re-made later.
330  */
331  entry->keep_connections = true;
332  entry->parallel_commit = false;
333  foreach(lc, server->options)
334  {
335  DefElem *def = (DefElem *) lfirst(lc);
336 
337  if (strcmp(def->defname, "keep_connections") == 0)
338  entry->keep_connections = defGetBoolean(def);
339  else if (strcmp(def->defname, "parallel_commit") == 0)
340  entry->parallel_commit = defGetBoolean(def);
341  }
342 
343  /* Now try to make the connection */
344  entry->conn = connect_pg_server(server, user);
345 
346  elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
347  entry->conn, server->servername, user->umid, user->userid);
348 }
349 
350 /*
351  * Connect to remote server using specified server and user mapping properties.
 *
 * The keyword/value arrays are filled in this order:
 *   - the foreign server's options;
 *   - the user mapping's options (later entries win when a keyword repeats);
 *   - application_name from pgfdw_application_name, if set;
 *   - fallback_application_name = "postgres_fdw";
 *   - client_encoding.
 *
 * The last application_name whose value is still non-empty after
 * process_pgfdw_appname() expands its escape sequences is used. Empty ones
 * found on the way are blanked to NULL. check_conn_params() then enforces
 * the password rules for non-superusers before any connection is attempted.
 *
 * Returns a PGconn whose status is CONNECTION_OK. On any error the
 * connection (if one was opened) is closed and the error is re-thrown.
352  */
353 static PGconn *
355 {
356  PGconn *volatile conn = NULL;
357 
358  /*
359  * Use PG_TRY block to ensure closing connection on error.
360  */
361  PG_TRY();
362  {
363  const char **keywords;
364  const char **values;
365  char *appname = NULL;
366  int n;
367 
368  /*
369  * Construct connection params from generic options of ForeignServer
370  * and UserMapping. (Some of them might not be libpq options, in
371  * which case we'll just waste a few array slots.) Add 4 extra slots
372  * for application_name, fallback_application_name, client_encoding,
373  * end marker.
374  */
375  n = list_length(server->options) + list_length(user->options) + 4;
376  keywords = (const char **) palloc(n * sizeof(char *));
377  values = (const char **) palloc(n * sizeof(char *));
378 
379  n = 0;
380  n += ExtractConnectionOptions(server->options,
381  keywords + n, values + n);
382  n += ExtractConnectionOptions(user->options,
383  keywords + n, values + n);
384 
385  /*
386  * Use pgfdw_application_name as application_name if set.
387  *
388  * PQconnectdbParams() processes the parameter arrays from start to
389  * end. If any key word is repeated, the last value is used. Therefore
390  * note that pgfdw_application_name must be added to the arrays after
391  * options of ForeignServer are, so that it can override
392  * application_name set in ForeignServer.
393  */
395  {
396  keywords[n] = "application_name";
398  n++;
399  }
400 
401  /*
402  * Search the parameter arrays to find application_name setting, and
403  * replace escape sequences in it with status information if found.
404  * The arrays are searched backwards because the last value is used if
405  * application_name is repeatedly set.
406  */
407  for (int i = n - 1; i >= 0; i--)
408  {
409  if (strcmp(keywords[i], "application_name") == 0 &&
410  *(values[i]) != '\0')
411  {
412  /*
413  * Use this application_name setting if it's not empty string
414  * even after any escape sequences in it are replaced.
415  */
416  appname = process_pgfdw_appname(values[i]);
417  if (appname[0] != '\0')
418  {
419  values[i] = appname;
420  break;
421  }
422 
423  /*
424  * This empty application_name is not used, so we set
425  * values[i] to NULL and keep searching the array to find the
426  * next one.
427  */
428  values[i] = NULL;
429  pfree(appname);
430  appname = NULL;
431  }
432  }
433 
434  /* Use "postgres_fdw" as fallback_application_name */
435  keywords[n] = "fallback_application_name";
436  values[n] = "postgres_fdw";
437  n++;
438 
439  /* Set client_encoding so that libpq can convert encoding properly. */
440  keywords[n] = "client_encoding";
442  n++;
443 
444  keywords[n] = values[n] = NULL;
445 
446  /* verify the set of connection parameters */
447  check_conn_params(keywords, values, user);
448 
449  /* OK to make connection */
450  conn = libpqsrv_connect_params(keywords, values,
451  false, /* expand_dbname */
453 
454  if (!conn || PQstatus(conn) != CONNECTION_OK)
455  ereport(ERROR,
456  (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
457  errmsg("could not connect to server \"%s\"",
458  server->servername),
460 
461  /*
462  * Check that non-superuser has used password to establish connection;
463  * otherwise, the user is piggybacking on the postgres server's user
464  * identity. See also dblink_security_check() in contrib/dblink and
465  * check_conn_params.
466  */
469  ereport(ERROR,
470  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
471  errmsg("password is required"),
472  errdetail("Non-superuser cannot connect if the server does not request a password."),
473  errhint("Target server's authentication method must be changed or password_required=false set in the user mapping attributes.")));
474 
475  /* Prepare new session for use */
477 
478  if (appname != NULL)
479  pfree(appname);
480  pfree(keywords);
481  pfree(values);
482  }
483  PG_CATCH();
484  {
486  PG_RE_THROW();
487  }
488  PG_END_TRY();
489 
490  return conn;
491 }
492 
493 /*
494  * Disconnect any open connection for a connection cache entry.
 *
 * A no-op when entry->conn is already NULL. Afterwards entry->conn is NULL,
 * which marks the entry's remaining fields as invalid (see ConnCacheEntry);
 * the entry itself stays in the hash table.
495  */
496 static void
498 {
499  if (entry->conn != NULL)
500  {
501  libpqsrv_disconnect(entry->conn);
502  entry->conn = NULL;
503  }
504 }
505 
506 /*
507  * Return the value of this user mapping's password_required option, or
508  * true if the option is not set. The mapping has been pre-validated.
 *
 * In other words this returns false only when password_required = false was
 * given explicitly for the mapping. If the option appears more than once,
 * the first occurrence in user->options is used.
509  */
510 static bool
512 {
513  ListCell *cell;
514 
515  foreach(cell, user->options)
516  {
517  DefElem *def = (DefElem *) lfirst(cell);
518 
519  if (strcmp(def->defname, "password_required") == 0)
520  return defGetBoolean(def);
521  }
522 
523  return true;
524 }
525 
526 /*
527  * For non-superusers, insist that the connstr specify a password. This
528  * prevents a password from being picked up from .pgpass, a service file, the
529  * environment, etc. We don't want the postgres user's passwords,
530  * certificates, etc to be accessible to non-superusers. (See also
531  * dblink_connstr_check in contrib/dblink.)
 *
 * keywords and values are parallel arrays terminated by a NULL keyword.
 * The check passes if any of these holds:
 *   - user->userid is a superuser;
 *   - some "password" keyword has a non-empty value;
 *   - the user mapping was created without a password requirement
 *     (password_required = false).
 * Otherwise ERROR is raised.
532  */
533 static void
534 check_conn_params(const char **keywords, const char **values, UserMapping *user)
535 {
536  int i;
537 
538  /* no check required if superuser */
539  if (superuser_arg(user->userid))
540  return;
541 
542  /* ok if params contain a non-empty password */
543  for (i = 0; keywords[i] != NULL; i++)
544  {
 /*
  * The keyword test must come first: values[i] can be NULL for
  * non-password entries (connect_pg_server() blanks unused
  * application_name values), so it is only dereferenced for "password".
  */
545  if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
546  return;
547  }
548 
549  /* ok if the superuser explicitly said so at user mapping creation time */
551  return;
552 
553  ereport(ERROR,
554  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
555  errmsg("password is required"),
556  errdetail("Non-superusers must provide a password in the user mapping.")));
557 }
558 
559 /*
560  * Issue SET commands to make sure remote session is configured properly.
561  *
562  * We do this just once at connection, assuming nothing will change the
563  * values later. Since we'll never send volatile function calls to the
564  * remote, there shouldn't be any way to break this assumption from our end.
565  * It's possible to think of ways to break it at the remote end, eg making
566  * a foreign table point to a view that includes a set_config call ---
567  * but once you admit the possibility of a malicious view definition,
568  * there are any number of ways to break things.
 *
 * Each SET is run synchronously via do_sql_command(), so a failing SET is
 * reported at ERROR level. Settings that depend on the remote version are
 * gated on PQserverVersion() (80400 = 8.4, 90000 = 9.0):
 *   - intervalstyle is set only on 8.4+ servers;
 *   - extra_float_digits is 3 on 9.0+ servers and 2 otherwise.
569  */
570 static void
572 {
573  int remoteversion = PQserverVersion(conn);
574 
575  /* Force the search path to contain only pg_catalog (see deparse.c) */
576  do_sql_command(conn, "SET search_path = pg_catalog");
577 
578  /*
579  * Set remote timezone; this is basically just cosmetic, since all
580  * transmitted and returned timestamptzs should specify a zone explicitly
581  * anyway. However it makes the regression test outputs more predictable.
582  *
583  * We don't risk setting remote zone equal to ours, since the remote
584  * server might use a different timezone database. Instead, use UTC
585  * (quoted, because very old servers are picky about case).
586  */
587  do_sql_command(conn, "SET timezone = 'UTC'");
588 
589  /*
590  * Set values needed to ensure unambiguous data output from remote. (This
591  * logic should match what pg_dump does. See also set_transmission_modes
592  * in postgres_fdw.c.)
593  */
594  do_sql_command(conn, "SET datestyle = ISO");
595  if (remoteversion >= 80400)
596  do_sql_command(conn, "SET intervalstyle = postgres");
597  if (remoteversion >= 90000)
598  do_sql_command(conn, "SET extra_float_digits = 3");
599  else
600  do_sql_command(conn, "SET extra_float_digits = 2");
601 }
602 
603 /*
604  * Convenience subroutine to issue a non-data-returning SQL command to remote
 *
 * Waits for the command to finish, using do_sql_command_end() with
 * consume_input = false. A failure is reported at ERROR level, so callers
 * need no error handling of their own.
605  */
606 void
607 do_sql_command(PGconn *conn, const char *sql)
608 {
610  do_sql_command_end(conn, sql, false);
611 }
612 
613 static void
614 do_sql_command_begin(PGconn *conn, const char *sql)
615 {
616  if (!PQsendQuery(conn, sql))
617  pgfdw_report_error(ERROR, NULL, conn, false, sql);
618 }
619 
/*
 * Wait for the result of a command previously sent with
 * do_sql_command_begin(), and report a failure at ERROR level via
 * pgfdw_report_error().
 *
 * consume_input: if true, read whatever input is already available on the
 * socket before waiting (see the comment in the body).
 */
620 static void
621 do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
622 {
623  PGresult *res;
624 
625  /*
626  * If requested, consume whatever data is available from the socket. (Note
627  * that if all data is available, this allows pgfdw_get_result to call
628  * PQgetResult without forcing the overhead of WaitLatchOrSocket, which
629  * would be large compared to the overhead of PQconsumeInput.)
630  */
631  if (consume_input && !PQconsumeInput(conn))
632  pgfdw_report_error(ERROR, NULL, conn, false, sql);
633  res = pgfdw_get_result(conn, sql);
 /* on failure, clear = true makes pgfdw_report_error() free res */
635  pgfdw_report_error(ERROR, res, conn, true, sql);
636  PQclear(res);
637 }
638 
639 /*
640  * Start remote transaction or subtransaction, if needed.
641  *
642  * Note that we always use at least REPEATABLE READ in the remote session.
643  * This is so that, if a query initiates multiple scans of the same or
644  * different foreign tables, we will get snapshot-consistent results from
645  * those scans. A disadvantage is that we can't provide sane emulation of
646  * READ COMMITTED behavior --- it would be nice if we had some other way to
647  * control which remote queries share a snapshot.
 *
 * On return entry->xact_depth has been raised to at least the current local
 * transaction nesting level. Each missing level N gets a remote
 * "SAVEPOINT sN". changing_xact_state is set before each command and cleared
 * only after it succeeds, so it remains set if the command throws.
 * NOTE(review): SERIALIZABLE is presumably chosen when the local
 * transaction is serializable -- confirm.
648  */
649 static void
651 {
652  int curlevel = GetCurrentTransactionNestLevel();
653 
654  /* Start main transaction if we haven't yet */
655  if (entry->xact_depth <= 0)
656  {
657  const char *sql;
658 
659  elog(DEBUG3, "starting remote transaction on connection %p",
660  entry->conn);
661 
663  sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
664  else
665  sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
666  entry->changing_xact_state = true;
667  do_sql_command(entry->conn, sql);
668  entry->xact_depth = 1;
669  entry->changing_xact_state = false;
670  }
671 
672  /*
673  * If we're in a subtransaction, stack up savepoints to match our level.
674  * This ensures we can rollback just the desired effects when a
675  * subtransaction aborts.
676  */
677  while (entry->xact_depth < curlevel)
678  {
679  char sql[64];
680 
681  snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
682  entry->changing_xact_state = true;
683  do_sql_command(entry->conn, sql);
684  entry->xact_depth++;
685  entry->changing_xact_state = false;
686  }
687 }
688 
689 /*
690  * Release connection reference count created by calling GetConnection.
 *
 * Callers pair this with GetConnection(), but it currently does nothing;
 * the connection stays cached and open (see below).
691  */
692 void
694 {
695  /*
696  * Currently, we don't actually track connection references because all
697  * cleanup is managed on a transaction or subtransaction basis instead. So
698  * there's nothing to do here.
699  */
700 }
701 
702 /*
703  * Assign a "unique" number for a cursor.
704  *
705  * These really only need to be unique per connection within a transaction.
706  * For the moment we ignore the per-connection point and assign them across
707  * all connections in the transaction, but we ask for the connection to be
708  * supplied in case we want to refine that.
709  *
710  * Note that even if wraparound happens in a very long transaction, actual
711  * collisions are highly improbable; just be sure to use %u not %d to print.
 *
 * pgfdw_xact_callback() resets the counter to 0 at the end of every
 * transaction, so the first cursor of a transaction is numbered 1.
712  */
713 unsigned int
715 {
716  return ++cursor_number;
717 }
718 
719 /*
720  * Assign a "unique" number for a prepared statement.
721  *
722  * This works much like GetCursorNumber, except that we never reset the counter
723  * within a session. That's because we can't be 100% sure we've gotten rid
724  * of all prepared statements on all connections, and it's not really worth
725  * increasing the risk of prepared-statement name collisions by resetting.
 *
 * The first number handed out in a session is 1. As with cursor numbers,
 * print the result with %u.
726  */
727 unsigned int
729 {
730  return ++prep_stmt_number;
731 }
732 
733 /*
734  * Submit a query and wait for the result.
735  *
736  * This function is interruptible by signals.
737  *
738  * Caller is responsible for the error handling on the result.
 *
 * state may be NULL. If it is not NULL and has a pending asynchronous
 * request, that request is processed before our query is sent. A failure
 * to send the query raises ERROR here. An error carried in the returned
 * PGresult does not. The result (NULL if the server produced none) is
 * whatever pgfdw_get_result() returns, and the caller must PQclear() it.
739  */
740 PGresult *
742 {
743  /* First, process a pending asynchronous request, if any. */
744  if (state && state->pendingAreq)
745  process_pending_request(state->pendingAreq);
746 
747  /*
748  * Submit a query. Since we don't use non-blocking mode, this also can
749  * block. But its risk is relatively small, so we ignore that for now.
750  */
751  if (!PQsendQuery(conn, query))
752  pgfdw_report_error(ERROR, NULL, conn, false, query);
753 
754  /* Wait for the result. */
755  return pgfdw_get_result(conn, query);
756 }
757 
758 /*
759  * Wait for the result from a prior asynchronous execution function call.
760  *
761  * This function offers quick responsiveness by checking for any interruptions.
762  *
763  * This function emulates PQexec()'s behavior of returning the last result
764  * when there are many.
765  *
766  * Caller is responsible for the error handling on the result.
 *
 * query is used only as error context if reading from the socket fails.
 * Returns the last PGresult, or NULL if none was produced. Earlier results
 * are freed as newer ones arrive, and the one being held is freed if an
 * error is thrown while waiting.
767  */
768 PGresult *
769 pgfdw_get_result(PGconn *conn, const char *query)
770 {
 /* volatile: assigned inside PG_TRY() and read again in PG_CATCH() */
771  PGresult *volatile last_res = NULL;
772 
773  /* In what follows, do not leak any PGresults on an error. */
774  PG_TRY();
775  {
776  for (;;)
777  {
778  PGresult *res;
779 
780  while (PQisBusy(conn))
781  {
782  int wc;
783 
784  /* Sleep until there's something to do */
788  PQsocket(conn),
789  -1L, PG_WAIT_EXTENSION);
791 
793 
794  /* Data available in socket? */
795  if (wc & WL_SOCKET_READABLE)
796  {
797  if (!PQconsumeInput(conn))
798  pgfdw_report_error(ERROR, NULL, conn, false, query);
799  }
800  }
801 
802  res = PQgetResult(conn);
803  if (res == NULL)
804  break; /* query is complete */
805 
 /* keep only the newest result, as PQexec() does */
806  PQclear(last_res);
807  last_res = res;
808  }
809  }
810  PG_CATCH();
811  {
812  PQclear(last_res);
813  PG_RE_THROW();
814  }
815  PG_END_TRY();
816 
817  return last_res;
818 }
819 
820 /*
821  * Report an error we got from the remote server.
822  *
823  * elevel: error level to use (typically ERROR, but might be less)
824  * res: PGresult containing the error, or NULL (e.g. when sending failed)
825  * conn: connection we did the query on
826  * clear: if true, PQclear the result (otherwise caller will handle it)
827  * sql: NULL, or text of remote command we tried to execute
828  *
829  * Note: callers that choose not to throw ERROR for a remote error are
830  * responsible for making sure that the associated ConnCacheEntry gets
831  * marked with have_error = true.
 *
 * If res carries no SQLSTATE (which includes res == NULL), the error is
 * reported as ERRCODE_CONNECTION_FAILURE and the primary message falls back
 * to PQerrorMessage(conn). GetConnection() relies on that error code when
 * deciding whether a cached connection is broken. When clear is true, res
 * is freed even if ereport() throws.
832  */
833 void
835  bool clear, const char *sql)
836 {
837  /* If requested, PGresult must be released before leaving this function. */
838  PG_TRY();
839  {
840  char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
841  char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
842  char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
843  char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
844  char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
845  int sqlstate;
846 
847  if (diag_sqlstate)
848  sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
849  diag_sqlstate[1],
850  diag_sqlstate[2],
851  diag_sqlstate[3],
852  diag_sqlstate[4]);
853  else
854  sqlstate = ERRCODE_CONNECTION_FAILURE;
855 
856  /*
857  * If we don't get a message from the PGresult, try the PGconn. This
858  * is needed because for connection-level failures, PQexec may just
859  * return NULL, not a PGresult at all.
860  */
861  if (message_primary == NULL)
862  message_primary = pchomp(PQerrorMessage(conn));
863 
864  ereport(elevel,
865  (errcode(sqlstate),
866  (message_primary != NULL && message_primary[0] != '\0') ?
867  errmsg_internal("%s", message_primary) :
868  errmsg("could not obtain message string for remote error"),
869  message_detail ? errdetail_internal("%s", message_detail) : 0,
870  message_hint ? errhint("%s", message_hint) : 0,
871  message_context ? errcontext("%s", message_context) : 0,
872  sql ? errcontext("remote SQL command: %s", sql) : 0));
873  }
874  PG_FINALLY();
875  {
876  if (clear)
877  PQclear(res);
878  }
879  PG_END_TRY();
880 }
881 
882 /*
883  * pgfdw_xact_callback --- cleanup at main-transaction end.
884  *
885  * This runs just late enough that it must not enter user-defined code
886  * locally. (Entering such code on the remote side is fine. Its remote
887  * COMMIT TRANSACTION may run deferred triggers.)
 *
 * For each cached connection with an open remote transaction:
 *   - pre-commit: send COMMIT TRANSACTION. With parallel_commit the COMMIT
 *     is only sent here, and the results are collected after the scan by
 *     pgfdw_finish_pre_commit_cleanup().
 *   - pre-prepare: raise ERROR, since remote transactions are not kept open
 *     across PREPARE TRANSACTION.
 *   - abort: roll back via pgfdw_abort_cleanup().
 *   - COMMIT / PREPARE: there should be nothing left to close; finding an
 *     open remote transaction raises ERROR.
 * Finally xact_got_connection and cursor_number are reset.
888  */
889 static void
891 {
892  HASH_SEQ_STATUS scan;
893  ConnCacheEntry *entry;
894  List *pending_entries = NIL;
895 
896  /* Quick exit if no connections were touched in this transaction. */
897  if (!xact_got_connection)
898  return;
899 
900  /*
901  * Scan all connection cache entries to find open remote transactions, and
902  * close them.
903  */
905  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
906  {
907  PGresult *res;
908 
909  /* Ignore cache entry if no open connection right now */
910  if (entry->conn == NULL)
911  continue;
912 
913  /* If it has an open remote transaction, try to close it */
914  if (entry->xact_depth > 0)
915  {
916  elog(DEBUG3, "closing remote transaction on connection %p",
917  entry->conn);
918 
919  switch (event)
920  {
923 
924  /*
925  * If abort cleanup previously failed for this connection,
926  * we can't issue any more commands against it.
927  */
929 
930  /* Commit all remote transactions during pre-commit */
931  entry->changing_xact_state = true;
932  if (entry->parallel_commit)
933  {
934  do_sql_command_begin(entry->conn, "COMMIT TRANSACTION");
935  pending_entries = lappend(pending_entries, entry);
 /*
  * This continues the hash scan, skipping
  * pgfdw_reset_xact_state() below for this entry.
  * NOTE(review): presumably pgfdw_finish_pre_commit_cleanup()
  * resets queued entries -- confirm.
  */
936  continue;
937  }
938  do_sql_command(entry->conn, "COMMIT TRANSACTION");
939  entry->changing_xact_state = false;
940 
941  /*
942  * If there were any errors in subtransactions, and we
943  * made prepared statements, do a DEALLOCATE ALL to make
944  * sure we get rid of all prepared statements. This is
945  * annoying and not terribly bulletproof, but it's
946  * probably not worth trying harder.
947  *
948  * DEALLOCATE ALL only exists in 8.3 and later, so this
949  * constrains how old a server postgres_fdw can
950  * communicate with. We intentionally ignore errors in
951  * the DEALLOCATE, so that we can hobble along to some
952  * extent with older servers (leaking prepared statements
953  * as we go; but we don't really support update operations
954  * pre-8.3 anyway).
955  */
956  if (entry->have_prep_stmt && entry->have_error)
957  {
958  res = PQexec(entry->conn, "DEALLOCATE ALL");
959  PQclear(res);
960  }
961  entry->have_prep_stmt = false;
962  entry->have_error = false;
963  break;
965 
966  /*
967  * We disallow any remote transactions, since it's not
968  * very reasonable to hold them open until the prepared
969  * transaction is committed. For the moment, throw error
970  * unconditionally; later we might allow read-only cases.
971  * Note that the error will cause us to come right back
972  * here with event == XACT_EVENT_ABORT, so we'll clean up
973  * the connection state at that point.
974  */
975  ereport(ERROR,
976  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
977  errmsg("cannot PREPARE a transaction that has operated on postgres_fdw foreign tables")));
978  break;
980  case XACT_EVENT_COMMIT:
981  case XACT_EVENT_PREPARE:
982  /* Pre-commit should have closed the open transaction */
983  elog(ERROR, "missed cleaning up connection during pre-commit");
984  break;
986  case XACT_EVENT_ABORT:
987  /* Rollback all remote transactions during abort */
988  pgfdw_abort_cleanup(entry, true);
989  break;
990  }
991  }
992 
993  /* Reset state to show we're out of a transaction */
994  pgfdw_reset_xact_state(entry, true);
995  }
996 
997  /* If there are any pending connections, finish cleaning them up */
998  if (pending_entries)
999  {
1001  event == XACT_EVENT_PRE_COMMIT);
1002  pgfdw_finish_pre_commit_cleanup(pending_entries);
1003  }
1004 
1005  /*
1006  * Regardless of the event type, we can now mark ourselves as out of the
1007  * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
1008  * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
1009  */
1010  xact_got_connection = false;
1011 
1012  /* Also reset cursor numbering for next transaction */
1013  cursor_number = 0;
1014 }
1015 
1016 /*
1017  * pgfdw_subxact_callback --- cleanup at subtransaction end.
1018  */
1019 static void
1021  SubTransactionId parentSubid, void *arg)
1022 {
1023  HASH_SEQ_STATUS scan;
1024  ConnCacheEntry *entry;
1025  int curlevel;
1026  List *pending_entries = NIL;
1027 
1028  /* Nothing to do at subxact start, nor after commit. */
1029  if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
1030  event == SUBXACT_EVENT_ABORT_SUB))
1031  return;
1032 
1033  /* Quick exit if no connections were touched in this transaction. */
1034  if (!xact_got_connection)
1035  return;
1036 
1037  /*
1038  * Scan all connection cache entries to find open remote subtransactions
1039  * of the current level, and close them.
1040  */
1041  curlevel = GetCurrentTransactionNestLevel();
1042  hash_seq_init(&scan, ConnectionHash);
1043  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1044  {
1045  char sql[100];
1046 
1047  /*
1048  * We only care about connections with open remote subtransactions of
1049  * the current level.
1050  */
1051  if (entry->conn == NULL || entry->xact_depth < curlevel)
1052  continue;
1053 
1054  if (entry->xact_depth > curlevel)
1055  elog(ERROR, "missed cleaning up remote subtransaction at level %d",
1056  entry->xact_depth);
1057 
1058  if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
1059  {
1060  /*
1061  * If abort cleanup previously failed for this connection, we
1062  * can't issue any more commands against it.
1063  */
1065 
1066  /* Commit all remote subtransactions during pre-commit */
1067  snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
1068  entry->changing_xact_state = true;
1069  if (entry->parallel_commit)
1070  {
1071  do_sql_command_begin(entry->conn, sql);
1072  pending_entries = lappend(pending_entries, entry);
1073  continue;
1074  }
1075  do_sql_command(entry->conn, sql);
1076  entry->changing_xact_state = false;
1077  }
1078  else
1079  {
1080  /* Rollback all remote subtransactions during abort */
1081  pgfdw_abort_cleanup(entry, false);
1082  }
1083 
1084  /* OK, we're outta that level of subtransaction */
1085  pgfdw_reset_xact_state(entry, false);
1086  }
1087 
1088  /* If there are any pending connections, finish cleaning them up */
1089  if (pending_entries)
1090  {
1092  pgfdw_finish_pre_subcommit_cleanup(pending_entries, curlevel);
1093  }
1094 }
1095 
1096 /*
1097  * Connection invalidation callback function
1098  *
1099  * After a change to a pg_foreign_server or pg_user_mapping catalog entry,
1100  * close connections depending on that entry immediately if current transaction
1101  * has not used those connections yet. Otherwise, mark those connections as
1102  * invalid and then make pgfdw_xact_callback() close them at the end of current
1103  * transaction, since they cannot be closed in the midst of the transaction
1104  * using them. Closed connections will be remade at the next opportunity if
1105  * necessary.
1106  *
1107  * Although most cache invalidation callbacks blow away all the related stuff
1108  * regardless of the given hashvalue, connections are expensive enough that
1109  * it's worth trying to avoid that.
1110  *
1111  * NB: We could avoid unnecessary disconnection more strictly by examining
1112  * individual option values, but it seems too much effort for the gain.
1113  */
1114 static void
1115 pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
1116 {
1117  HASH_SEQ_STATUS scan;
1118  ConnCacheEntry *entry;
1119 
1120  Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
1121 
1122  /* ConnectionHash must exist already, if we're registered */
1123  hash_seq_init(&scan, ConnectionHash);
1124  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1125  {
1126  /* Ignore invalid entries */
1127  if (entry->conn == NULL)
1128  continue;
1129 
1130  /* hashvalue == 0 means a cache reset, must clear all state */
1131  if (hashvalue == 0 ||
1132  (cacheid == FOREIGNSERVEROID &&
1133  entry->server_hashvalue == hashvalue) ||
1134  (cacheid == USERMAPPINGOID &&
1135  entry->mapping_hashvalue == hashvalue))
1136  {
1137  /*
1138  * Close the connection immediately if it's not used yet in this
1139  * transaction. Otherwise mark it as invalid so that
1140  * pgfdw_xact_callback() can close it at the end of this
1141  * transaction.
1142  */
1143  if (entry->xact_depth == 0)
1144  {
1145  elog(DEBUG3, "discarding connection %p", entry->conn);
1146  disconnect_pg_server(entry);
1147  }
1148  else
1149  entry->invalidated = true;
1150  }
1151  }
1152 }
1153 
1154 /*
1155  * Raise an error if the given connection cache entry is marked as being
1156  * in the middle of an xact state change. This should be called at which no
1157  * such change is expected to be in progress; if one is found to be in
1158  * progress, it means that we aborted in the middle of a previous state change
1159  * and now don't know what the remote transaction state actually is.
1160  * Such connections can't safely be further used. Re-establishing the
1161  * connection would change the snapshot and roll back any writes already
1162  * performed, so that's not an option, either. Thus, we must abort.
1163  */
1164 static void
1166 {
1167  ForeignServer *server;
1168 
1169  /* nothing to do for inactive entries and entries of sane state */
1170  if (entry->conn == NULL || !entry->changing_xact_state)
1171  return;
1172 
1173  /* make sure this entry is inactive */
1174  disconnect_pg_server(entry);
1175 
1176  /* find server name to be shown in the message below */
1177  server = GetForeignServer(entry->serverid);
1178 
1179  ereport(ERROR,
1180  (errcode(ERRCODE_CONNECTION_EXCEPTION),
1181  errmsg("connection to server \"%s\" was lost",
1182  server->servername)));
1183 }
1184 
1185 /*
1186  * Reset state to show we're out of a (sub)transaction.
1187  */
1188 static void
1190 {
1191  if (toplevel)
1192  {
1193  /* Reset state to show we're out of a transaction */
1194  entry->xact_depth = 0;
1195 
1196  /*
1197  * If the connection isn't in a good idle state, it is marked as
1198  * invalid or keep_connections option of its server is disabled, then
1199  * discard it to recover. Next GetConnection will open a new
1200  * connection.
1201  */
1202  if (PQstatus(entry->conn) != CONNECTION_OK ||
1203  PQtransactionStatus(entry->conn) != PQTRANS_IDLE ||
1204  entry->changing_xact_state ||
1205  entry->invalidated ||
1206  !entry->keep_connections)
1207  {
1208  elog(DEBUG3, "discarding connection %p", entry->conn);
1209  disconnect_pg_server(entry);
1210  }
1211  }
1212  else
1213  {
1214  /* Reset state to show we're out of a subtransaction */
1215  entry->xact_depth--;
1216  }
1217 }
1218 
1219 /*
1220  * Cancel the currently-in-progress query (whose query text we do not have)
1221  * and ignore the result. Returns true if we successfully cancel the query
1222  * and discard any pending result, and false if not.
1223  *
1224  * It's not a huge problem if we throw an ERROR here, but if we get into error
1225  * recursion trouble, we'll end up slamming the connection shut, which will
1226  * necessitate failing the entire toplevel transaction even if subtransactions
1227  * were used. Try to use WARNING where we can.
1228  *
1229  * XXX: if the query was one sent by fetch_more_data_begin(), we could get the
1230  * query text from the pendingAreq saved in the per-connection state, then
1231  * report the query using it.
1232  */
1233 static bool
1235 {
1236  PGcancel *cancel;
1237  char errbuf[256];
1238  PGresult *result = NULL;
1239  TimestampTz endtime;
1240  bool timed_out;
1241 
1242  /*
1243  * If it takes too long to cancel the query and discard the result, assume
1244  * the connection is dead.
1245  */
1247 
1248  /*
1249  * Issue cancel request. Unfortunately, there's no good way to limit the
1250  * amount of time that we might block inside PQgetCancel().
1251  */
1252  if ((cancel = PQgetCancel(conn)))
1253  {
1254  if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
1255  {
1256  ereport(WARNING,
1257  (errcode(ERRCODE_CONNECTION_FAILURE),
1258  errmsg("could not send cancel request: %s",
1259  errbuf)));
1260  PQfreeCancel(cancel);
1261  return false;
1262  }
1263  PQfreeCancel(cancel);
1264  }
1265 
1266  /* Get and discard the result of the query. */
1267  if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
1268  {
1269  if (timed_out)
1270  ereport(WARNING,
1271  (errmsg("could not get result of cancel request due to timeout")));
1272  else
1273  ereport(WARNING,
1274  (errcode(ERRCODE_CONNECTION_FAILURE),
1275  errmsg("could not get result of cancel request: %s",
1276  pchomp(PQerrorMessage(conn)))));
1277 
1278  return false;
1279  }
1280  PQclear(result);
1281 
1282  return true;
1283 }
1284 
1285 /*
1286  * Submit a query during (sub)abort cleanup and wait up to 30 seconds for the
1287  * result. If the query is executed without error, the return value is true.
1288  * If the query is executed successfully but returns an error, the return
1289  * value is true if and only if ignore_errors is set. If the query can't be
1290  * sent or times out, the return value is false.
1291  *
1292  * It's not a huge problem if we throw an ERROR here, but if we get into error
1293  * recursion trouble, we'll end up slamming the connection shut, which will
1294  * necessitate failing the entire toplevel transaction even if subtransactions
1295  * were used. Try to use WARNING where we can.
1296  */
1297 static bool
1298 pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
1299 {
1300  PGresult *result = NULL;
1301  TimestampTz endtime;
1302  bool timed_out;
1303 
1304  /*
1305  * If it takes too long to execute a cleanup query, assume the connection
1306  * is dead. It's fairly likely that this is why we aborted in the first
1307  * place (e.g. statement timeout, user cancel), so the timeout shouldn't
1308  * be too long.
1309  */
1311 
1312  /*
1313  * Submit a query. Since we don't use non-blocking mode, this also can
1314  * block. But its risk is relatively small, so we ignore that for now.
1315  */
1316  if (!PQsendQuery(conn, query))
1317  {
1318  pgfdw_report_error(WARNING, NULL, conn, false, query);
1319  return false;
1320  }
1321 
1322  /* Get the result of the query. */
1323  if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
1324  {
1325  if (timed_out)
1326  ereport(WARNING,
1327  (errmsg("could not get query result due to timeout"),
1328  query ? errcontext("remote SQL command: %s", query) : 0));
1329  else
1330  pgfdw_report_error(WARNING, NULL, conn, false, query);
1331 
1332  return false;
1333  }
1334 
1335  /* Issue a warning if not successful. */
1336  if (PQresultStatus(result) != PGRES_COMMAND_OK)
1337  {
1338  pgfdw_report_error(WARNING, result, conn, true, query);
1339  return ignore_errors;
1340  }
1341  PQclear(result);
1342 
1343  return true;
1344 }
1345 
1346 /*
1347  * Get, during abort cleanup, the result of a query that is in progress. This
1348  * might be a query that is being interrupted by transaction abort, or it might
1349  * be a query that was initiated as part of transaction abort to get the remote
1350  * side back to the appropriate state.
1351  *
1352  * endtime is the time at which we should give up and assume the remote
1353  * side is dead. Returns true if the timeout expired or connection trouble
1354  * occurred, false otherwise. Sets *result except in case of a timeout.
1355  * Sets timed_out to true only when the timeout expired.
1356  */
1357 static bool
1359  bool *timed_out)
1360 {
1361  volatile bool failed = false;
1362  PGresult *volatile last_res = NULL;
1363 
1364  *timed_out = false;
1365 
1366  /* In what follows, do not leak any PGresults on an error. */
1367  PG_TRY();
1368  {
1369  for (;;)
1370  {
1371  PGresult *res;
1372 
1373  while (PQisBusy(conn))
1374  {
1375  int wc;
1377  long cur_timeout;
1378 
1379  /* If timeout has expired, give up, else get sleep time. */
1380  cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
1381  if (cur_timeout <= 0)
1382  {
1383  *timed_out = true;
1384  failed = true;
1385  goto exit;
1386  }
1387 
1388  /* Sleep until there's something to do */
1392  PQsocket(conn),
1393  cur_timeout, PG_WAIT_EXTENSION);
1395 
1397 
1398  /* Data available in socket? */
1399  if (wc & WL_SOCKET_READABLE)
1400  {
1401  if (!PQconsumeInput(conn))
1402  {
1403  /* connection trouble */
1404  failed = true;
1405  goto exit;
1406  }
1407  }
1408  }
1409 
1410  res = PQgetResult(conn);
1411  if (res == NULL)
1412  break; /* query is complete */
1413 
1414  PQclear(last_res);
1415  last_res = res;
1416  }
1417 exit: ;
1418  }
1419  PG_CATCH();
1420  {
1421  PQclear(last_res);
1422  PG_RE_THROW();
1423  }
1424  PG_END_TRY();
1425 
1426  if (failed)
1427  PQclear(last_res);
1428  else
1429  *result = last_res;
1430  return failed;
1431 }
1432 
1433 /*
1434  * Abort remote transaction or subtransaction.
1435  *
1436  * "toplevel" should be set to true if toplevel (main) transaction is
1437  * rollbacked, false otherwise.
1438  *
1439  * Set entry->changing_xact_state to false on success, true on failure.
1440  */
1441 static void
1442 pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
1443 {
1444  char sql[100];
1445 
1446  /*
1447  * Don't try to clean up the connection if we're already in error
1448  * recursion trouble.
1449  */
1451  entry->changing_xact_state = true;
1452 
1453  /*
1454  * If connection is already unsalvageable, don't touch it further.
1455  */
1456  if (entry->changing_xact_state)
1457  return;
1458 
1459  /*
1460  * Mark this connection as in the process of changing transaction state.
1461  */
1462  entry->changing_xact_state = true;
1463 
1464  /* Assume we might have lost track of prepared statements */
1465  entry->have_error = true;
1466 
1467  /*
1468  * If a command has been submitted to the remote server by using an
1469  * asynchronous execution function, the command might not have yet
1470  * completed. Check to see if a command is still being processed by the
1471  * remote server, and if so, request cancellation of the command.
1472  */
1473  if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
1474  !pgfdw_cancel_query(entry->conn))
1475  return; /* Unable to cancel running query */
1476 
1477  if (toplevel)
1478  snprintf(sql, sizeof(sql), "ABORT TRANSACTION");
1479  else
1480  snprintf(sql, sizeof(sql),
1481  "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
1482  entry->xact_depth, entry->xact_depth);
1483  if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
1484  return; /* Unable to abort remote (sub)transaction */
1485 
1486  if (toplevel)
1487  {
1488  if (entry->have_prep_stmt && entry->have_error &&
1490  "DEALLOCATE ALL",
1491  true))
1492  return; /* Trouble clearing prepared statements */
1493 
1494  entry->have_prep_stmt = false;
1495  entry->have_error = false;
1496  }
1497 
1498  /*
1499  * If pendingAreq of the per-connection state is not NULL, it means that
1500  * an asynchronous fetch begun by fetch_more_data_begin() was not done
1501  * successfully and thus the per-connection state was not reset in
1502  * fetch_more_data(); in that case reset the per-connection state here.
1503  */
1504  if (entry->state.pendingAreq)
1505  memset(&entry->state, 0, sizeof(entry->state));
1506 
1507  /* Disarm changing_xact_state if it all worked */
1508  entry->changing_xact_state = false;
1509 }
1510 
1511 /*
1512  * Finish pre-commit cleanup of connections on each of which we've sent a
1513  * COMMIT command to the remote server.
1514  */
1515 static void
1517 {
1518  ConnCacheEntry *entry;
1519  List *pending_deallocs = NIL;
1520  ListCell *lc;
1521 
1522  Assert(pending_entries);
1523 
1524  /*
1525  * Get the result of the COMMIT command for each of the pending entries
1526  */
1527  foreach(lc, pending_entries)
1528  {
1529  entry = (ConnCacheEntry *) lfirst(lc);
1530 
1531  Assert(entry->changing_xact_state);
1532 
1533  /*
1534  * We might already have received the result on the socket, so pass
1535  * consume_input=true to try to consume it first
1536  */
1537  do_sql_command_end(entry->conn, "COMMIT TRANSACTION", true);
1538  entry->changing_xact_state = false;
1539 
1540  /* Do a DEALLOCATE ALL in parallel if needed */
1541  if (entry->have_prep_stmt && entry->have_error)
1542  {
1543  /* Ignore errors (see notes in pgfdw_xact_callback) */
1544  if (PQsendQuery(entry->conn, "DEALLOCATE ALL"))
1545  {
1546  pending_deallocs = lappend(pending_deallocs, entry);
1547  continue;
1548  }
1549  }
1550  entry->have_prep_stmt = false;
1551  entry->have_error = false;
1552 
1553  pgfdw_reset_xact_state(entry, true);
1554  }
1555 
1556  /* No further work if no pending entries */
1557  if (!pending_deallocs)
1558  return;
1559 
1560  /*
1561  * Get the result of the DEALLOCATE command for each of the pending
1562  * entries
1563  */
1564  foreach(lc, pending_deallocs)
1565  {
1566  PGresult *res;
1567 
1568  entry = (ConnCacheEntry *) lfirst(lc);
1569 
1570  /* Ignore errors (see notes in pgfdw_xact_callback) */
1571  while ((res = PQgetResult(entry->conn)) != NULL)
1572  {
1573  PQclear(res);
1574  /* Stop if the connection is lost (else we'll loop infinitely) */
1575  if (PQstatus(entry->conn) == CONNECTION_BAD)
1576  break;
1577  }
1578  entry->have_prep_stmt = false;
1579  entry->have_error = false;
1580 
1581  pgfdw_reset_xact_state(entry, true);
1582  }
1583 }
1584 
1585 /*
1586  * Finish pre-subcommit cleanup of connections on each of which we've sent a
1587  * RELEASE command to the remote server.
1588  */
1589 static void
1590 pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel)
1591 {
1592  ConnCacheEntry *entry;
1593  char sql[100];
1594  ListCell *lc;
1595 
1596  Assert(pending_entries);
1597 
1598  /*
1599  * Get the result of the RELEASE command for each of the pending entries
1600  */
1601  snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
1602  foreach(lc, pending_entries)
1603  {
1604  entry = (ConnCacheEntry *) lfirst(lc);
1605 
1606  Assert(entry->changing_xact_state);
1607 
1608  /*
1609  * We might already have received the result on the socket, so pass
1610  * consume_input=true to try to consume it first
1611  */
1612  do_sql_command_end(entry->conn, sql, true);
1613  entry->changing_xact_state = false;
1614 
1615  pgfdw_reset_xact_state(entry, false);
1616  }
1617 }
1618 
1619 /*
1620  * List active foreign server connections.
1621  *
1622  * This function takes no input parameter and returns setof record made of
1623  * following values:
1624  * - server_name - server name of active connection. In case the foreign server
1625  * is dropped but still the connection is active, then the server name will
1626  * be NULL in output.
1627  * - valid - true/false representing whether the connection is valid or not.
1628  * Note that the connections can get invalidated in pgfdw_inval_callback.
1629  *
1630  * No records are returned when there are no cached connections at all.
1631  */
1632 Datum
1634 {
1635 #define POSTGRES_FDW_GET_CONNECTIONS_COLS 2
1636  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1637  HASH_SEQ_STATUS scan;
1638  ConnCacheEntry *entry;
1639 
1640  InitMaterializedSRF(fcinfo, 0);
1641 
1642  /* If cache doesn't exist, we return no records */
1643  if (!ConnectionHash)
1644  PG_RETURN_VOID();
1645 
1646  hash_seq_init(&scan, ConnectionHash);
1647  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1648  {
1649  ForeignServer *server;
1651  bool nulls[POSTGRES_FDW_GET_CONNECTIONS_COLS] = {0};
1652 
1653  /* We only look for open remote connections */
1654  if (!entry->conn)
1655  continue;
1656 
1657  server = GetForeignServerExtended(entry->serverid, FSV_MISSING_OK);
1658 
1659  /*
1660  * The foreign server may have been dropped in current explicit
1661  * transaction. It is not possible to drop the server from another
1662  * session when the connection associated with it is in use in the
1663  * current transaction, if tried so, the drop query in another session
1664  * blocks until the current transaction finishes.
1665  *
1666  * Even though the server is dropped in the current transaction, the
1667  * cache can still have associated active connection entry, say we
1668  * call such connections dangling. Since we can not fetch the server
1669  * name from system catalogs for dangling connections, instead we show
1670  * NULL value for server name in output.
1671  *
1672  * We could have done better by storing the server name in the cache
1673  * entry instead of server oid so that it could be used in the output.
1674  * But the server name in each cache entry requires 64 bytes of
1675  * memory, which is huge, when there are many cached connections and
1676  * the use case i.e. dropping the foreign server within the explicit
1677  * current transaction seems rare. So, we chose to show NULL value for
1678  * server name in output.
1679  *
1680  * Such dangling connections get closed either in next use or at the
1681  * end of current explicit transaction in pgfdw_xact_callback.
1682  */
1683  if (!server)
1684  {
1685  /*
1686  * If the server has been dropped in the current explicit
1687  * transaction, then this entry would have been invalidated in
1688  * pgfdw_inval_callback at the end of drop server command. Note
1689  * that this connection would not have been closed in
1690  * pgfdw_inval_callback because it is still being used in the
1691  * current explicit transaction. So, assert that here.
1692  */
1693  Assert(entry->conn && entry->xact_depth > 0 && entry->invalidated);
1694 
1695  /* Show null, if no server name was found */
1696  nulls[0] = true;
1697  }
1698  else
1699  values[0] = CStringGetTextDatum(server->servername);
1700 
1701  values[1] = BoolGetDatum(!entry->invalidated);
1702 
1703  tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
1704  }
1705 
1706  PG_RETURN_VOID();
1707 }
1708 
1709 /*
1710  * Disconnect the specified cached connections.
1711  *
1712  * This function discards the open connections that are established by
1713  * postgres_fdw from the local session to the foreign server with
1714  * the given name. Note that there can be multiple connections to
1715  * the given server using different user mappings. If the connections
1716  * are used in the current local transaction, they are not disconnected
1717  * and warning messages are reported. This function returns true
1718  * if it disconnects at least one connection, otherwise false. If no
1719  * foreign server with the given name is found, an error is reported.
1720  */
1721 Datum
1723 {
1724  ForeignServer *server;
1725  char *servername;
1726 
1727  servername = text_to_cstring(PG_GETARG_TEXT_PP(0));
1728  server = GetForeignServerByName(servername, false);
1729 
1731 }
1732 
1733 /*
1734  * Disconnect all the cached connections.
1735  *
1736  * This function discards all the open connections that are established by
1737  * postgres_fdw from the local session to the foreign servers.
1738  * If the connections are used in the current local transaction, they are
1739  * not disconnected and warning messages are reported. This function
1740  * returns true if it disconnects at least one connection, otherwise false.
1741  */
1742 Datum
1744 {
1746 }
1747 
1748 /*
1749  * Workhorse to disconnect cached connections.
1750  *
1751  * This function scans all the connection cache entries and disconnects
1752  * the open connections whose foreign server OID matches with
1753  * the specified one. If InvalidOid is specified, it disconnects all
1754  * the cached connections.
1755  *
1756  * This function emits a warning for each connection that's used in
1757  * the current transaction and doesn't close it. It returns true if
1758  * it disconnects at least one connection, otherwise false.
1759  *
1760  * Note that this function disconnects even the connections that are
1761  * established by other users in the same local session using different
1762  * user mappings. This leads even non-superuser to be able to close
1763  * the connections established by superusers in the same local session.
1764  *
1765  * XXX As of now we don't see any security risk doing this. But we should
1766  * set some restrictions on that, for example, prevent non-superuser
1767  * from closing the connections established by superusers even
1768  * in the same session?
1769  */
1770 static bool
1772 {
1773  HASH_SEQ_STATUS scan;
1774  ConnCacheEntry *entry;
1775  bool all = !OidIsValid(serverid);
1776  bool result = false;
1777 
1778  /*
1779  * Connection cache hashtable has not been initialized yet in this
1780  * session, so return false.
1781  */
1782  if (!ConnectionHash)
1783  return false;
1784 
1785  hash_seq_init(&scan, ConnectionHash);
1786  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1787  {
1788  /* Ignore cache entry if no open connection right now. */
1789  if (!entry->conn)
1790  continue;
1791 
1792  if (all || entry->serverid == serverid)
1793  {
1794  /*
1795  * Emit a warning because the connection to close is used in the
1796  * current transaction and cannot be disconnected right now.
1797  */
1798  if (entry->xact_depth > 0)
1799  {
1800  ForeignServer *server;
1801 
1802  server = GetForeignServerExtended(entry->serverid,
1803  FSV_MISSING_OK);
1804 
1805  if (!server)
1806  {
1807  /*
1808  * If the foreign server was dropped while its connection
1809  * was used in the current transaction, the connection
1810  * must have been marked as invalid by
1811  * pgfdw_inval_callback at the end of DROP SERVER command.
1812  */
1813  Assert(entry->invalidated);
1814 
1815  ereport(WARNING,
1816  (errmsg("cannot close dropped server connection because it is still in use")));
1817  }
1818  else
1819  ereport(WARNING,
1820  (errmsg("cannot close connection for server \"%s\" because it is still in use",
1821  server->servername)));
1822  }
1823  else
1824  {
1825  elog(DEBUG3, "discarding connection %p", entry->conn);
1826  disconnect_pg_server(entry);
1827  result = true;
1828  }
1829  }
1830  }
1831 
1832  return result;
1833 }
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1703
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1582
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1546
static Datum values[MAXATTR]
Definition: bootstrap.c:156
#define CStringGetTextDatum(s)
Definition: builtins.h:94
unsigned int uint32
Definition: c.h:490
uint32 SubTransactionId
Definition: c.h:640
#define OidIsValid(objectId)
Definition: c.h:759
Oid ConnCacheKey
Definition: connection.c:50
static unsigned int prep_stmt_number
Definition: connection.c:79
unsigned int GetCursorNumber(PGconn *conn)
Definition: connection.c:714
static bool UserMappingPasswordRequired(UserMapping *user)
Definition: connection.c:511
Datum postgres_fdw_get_connections(PG_FUNCTION_ARGS)
Definition: connection.c:1633
void do_sql_command(PGconn *conn, const char *sql)
Definition: connection.c:607
static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel)
Definition: connection.c:1590
static void disconnect_pg_server(ConnCacheEntry *entry)
Definition: connection.c:497
void ReleaseConnection(PGconn *conn)
Definition: connection.c:693
PG_FUNCTION_INFO_V1(postgres_fdw_get_connections)
PGresult * pgfdw_get_result(PGconn *conn, const char *query)
Definition: connection.c:769
static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
Definition: connection.c:1189
static void configure_remote_session(PGconn *conn)
Definition: connection.c:571
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:834
PGresult * pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
Definition: connection.c:741
static bool xact_got_connection
Definition: connection.c:82
#define POSTGRES_FDW_GET_CONNECTIONS_COLS
struct ConnCacheEntry ConnCacheEntry
Datum postgres_fdw_disconnect_all(PG_FUNCTION_ARGS)
Definition: connection.c:1743
static void do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
Definition: connection.c:621
static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result, bool *timed_out)
Definition: connection.c:1358
static HTAB * ConnectionHash
Definition: connection.c:75
static unsigned int cursor_number
Definition: connection.c:78
static void make_new_connection(ConnCacheEntry *entry, UserMapping *user)
Definition: connection.c:294
Datum postgres_fdw_disconnect(PG_FUNCTION_ARGS)
Definition: connection.c:1722
static void pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg)
Definition: connection.c:1020
static PGconn * connect_pg_server(ForeignServer *server, UserMapping *user)
Definition: connection.c:354
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
Definition: connection.c:1298
PGconn * GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
Definition: connection.c:135
unsigned int GetPrepStmtNumber(PGconn *conn)
Definition: connection.c:728
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
Definition: connection.c:1165
static void check_conn_params(const char **keywords, const char **values, UserMapping *user)
Definition: connection.c:534
static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
Definition: connection.c:1115
static void pgfdw_xact_callback(XactEvent event, void *arg)
Definition: connection.c:890
static void do_sql_command_begin(PGconn *conn, const char *sql)
Definition: connection.c:614
static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
Definition: connection.c:1442
static void begin_remote_xact(ConnCacheEntry *entry)
Definition: connection.c:650
static bool pgfdw_cancel_query(PGconn *conn)
Definition: connection.c:1234
static void pgfdw_finish_pre_commit_cleanup(List *pending_entries)
Definition: connection.c:1516
static bool disconnect_cached_connections(Oid serverid)
Definition: connection.c:1771
char * process_pgfdw_appname(const char *appname)
Definition: option.c:483
int ExtractConnectionOptions(List *defelems, const char **keywords, const char **values)
Definition: option.c:406
char * pgfdw_application_name
Definition: option.c:52
int64 TimestampTz
Definition: timestamp.h:39
bool defGetBoolean(DefElem *def)
Definition: define.c:108
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:953
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:350
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1431
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1421
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1156
void FreeErrorData(ErrorData *edata)
Definition: elog.c:1776
int errdetail_internal(const char *fmt,...)
Definition: elog.c:1229
int errdetail(const char *fmt,...)
Definition: elog.c:1202
void FlushErrorState(void)
Definition: elog.c:1825
int errhint(const char *fmt,...)
Definition: elog.c:1316
int errcode(int sqlerrcode)
Definition: elog.c:858
int errmsg(const char *fmt,...)
Definition: elog.c:1069
bool in_error_recursion_trouble(void)
Definition: elog.c:298
ErrorData * CopyErrorData(void)
Definition: elog.c:1720
#define PG_RE_THROW()
Definition: elog.h:411
#define errcontext
Definition: elog.h:196
#define DEBUG3
Definition: elog.h:28
#define PG_TRY(...)
Definition: elog.h:370
#define WARNING
Definition: elog.h:36
#define PG_END_TRY(...)
Definition: elog.h:395
#define ERROR
Definition: elog.h:39
#define PG_CATCH(...)
Definition: elog.h:380
#define MAKE_SQLSTATE(ch1, ch2, ch3, ch4, ch5)
Definition: elog.h:56
#define PG_FINALLY(...)
Definition: elog.h:387
#define ereport(elevel,...)
Definition: elog.h:149
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:7008
PGcancel * PQgetCancel(PGconn *conn)
Definition: fe-connect.c:4510
PGTransactionStatusType PQtransactionStatus(const PGconn *conn)
Definition: fe-connect.c:6973
int PQconnectionUsedPassword(const PGconn *conn)
Definition: fe-connect.c:7084
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7018
int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize)
Definition: fe-connect.c:4624
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:6965
void PQfreeCancel(PGcancel *cancel)
Definition: fe-connect.c:4578
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:7044
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3240
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2225
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1953
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1418
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:2000
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:3295
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:2031
#define PG_RETURN_VOID()
Definition: fmgr.h:349
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:309
#define PG_FUNCTION_ARGS
Definition: fmgr.h:193
#define PG_RETURN_BOOL(x)
Definition: fmgr.h:359
ForeignServer * GetForeignServerExtended(Oid serverid, bits16 flags)
Definition: foreign.c:123
ForeignServer * GetForeignServer(Oid serverid)
Definition: foreign.c:111
ForeignServer * GetForeignServerByName(const char *srvname, bool missing_ok)
Definition: foreign.c:182
#define FSV_MISSING_OK
Definition: foreign.h:61
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
Definition: funcapi.c:76
struct Latch * MyLatch
Definition: globals.c:58
@ HASH_ENTER
Definition: hsearch.h:114
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1519
int i
Definition: isn.c:73
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:540
void ResetLatch(Latch *latch)
Definition: latch.c:699
#define WL_SOCKET_READABLE
Definition: latch.h:126
#define WL_TIMEOUT
Definition: latch.h:128
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130
#define WL_LATCH_SET
Definition: latch.h:125
static void libpqsrv_disconnect(PGconn *conn)
static PGconn * libpqsrv_connect_params(const char *const *keywords, const char *const *values, int expand_dbname, uint32 wait_event_info)
@ CONNECTION_BAD
Definition: libpq-fe.h:61
@ CONNECTION_OK
Definition: libpq-fe.h:60
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:97
@ PQTRANS_IDLE
Definition: libpq-fe.h:118
@ PQTRANS_ACTIVE
Definition: libpq-fe.h:119
Assert(fmt[strlen(fmt) - 1] !='\n')
exit(1)
List * lappend(List *list, void *datum)
Definition: list.c:338
const char * GetDatabaseEncodingName(void)
Definition: mbutils.c:1274
char * pchomp(const char *in)
Definition: mcxt.c:1652
void pfree(void *pointer)
Definition: mcxt.c:1436
MemoryContext CurrentMemoryContext
Definition: mcxt.c:135
void * palloc(Size size)
Definition: mcxt.c:1210
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:121
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:138
void * arg
#define lfirst(lc)
Definition: pg_list.h:172
static int list_length(const List *l)
Definition: pg_list.h:152
#define NIL
Definition: pg_list.h:68
static char * user
Definition: pg_regress.c:93
#define snprintf
Definition: port.h:238
uintptr_t Datum
Definition: postgres.h:64
static Datum BoolGetDatum(bool X)
Definition: postgres.h:102
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
#define InvalidOid
Definition: postgres_ext.h:36
unsigned int Oid
Definition: postgres_ext.h:31
#define PG_DIAG_MESSAGE_HINT
Definition: postgres_ext.h:59
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:56
#define PG_DIAG_MESSAGE_PRIMARY
Definition: postgres_ext.h:57
#define PG_DIAG_MESSAGE_DETAIL
Definition: postgres_ext.h:58
#define PG_DIAG_CONTEXT
Definition: postgres_ext.h:63
void process_pending_request(AsyncRequest *areq)
PGconn * conn
Definition: streamutil.c:54
PGconn * conn
Definition: connection.c:55
bool have_prep_stmt
Definition: connection.c:59
PgFdwConnState state
Definition: connection.c:69
bool invalidated
Definition: connection.c:63
ConnCacheKey key
Definition: connection.c:54
bool parallel_commit
Definition: connection.c:62
uint32 server_hashvalue
Definition: connection.c:67
uint32 mapping_hashvalue
Definition: connection.c:68
bool keep_connections
Definition: connection.c:64
bool changing_xact_state
Definition: connection.c:61
char * defname
Definition: parsenodes.h:810
int sqlerrcode
Definition: elog.h:438
List * options
Definition: foreign.h:42
char * servername
Definition: foreign.h:39
Oid serverid
Definition: foreign.h:36
Size keysize
Definition: hsearch.h:75
Size entrysize
Definition: hsearch.h:76
Definition: dynahash.c:220
Definition: pg_list.h:54
AsyncRequest * pendingAreq
Definition: postgres_fdw.h:134
TupleDesc setDesc
Definition: execnodes.h:334
Tuplestorestate * setResult
Definition: execnodes.h:333
Definition: regguts.h:318
bool superuser_arg(Oid roleid)
Definition: superuser.c:56
@ FOREIGNSERVEROID
Definition: syscache.h:64
@ USERMAPPINGOID
Definition: syscache.h:115
#define GetSysCacheHashValue1(cacheId, key1)
Definition: syscache.h:206
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:85
char * text_to_cstring(const text *t)
Definition: varlena.c:215
#define PG_WAIT_EXTENSION
Definition: wait_event.h:23
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:914
void RegisterXactCallback(XactCallback callback, void *arg)
Definition: xact.c:3657
void RegisterSubXactCallback(SubXactCallback callback, void *arg)
Definition: xact.c:3717
SubXactEvent
Definition: xact.h:141
@ SUBXACT_EVENT_PRE_COMMIT_SUB
Definition: xact.h:145
@ SUBXACT_EVENT_ABORT_SUB
Definition: xact.h:144
XactEvent
Definition: xact.h:127
@ XACT_EVENT_PRE_PREPARE
Definition: xact.h:135
@ XACT_EVENT_COMMIT
Definition: xact.h:128
@ XACT_EVENT_PARALLEL_PRE_COMMIT
Definition: xact.h:134
@ XACT_EVENT_PARALLEL_COMMIT
Definition: xact.h:129
@ XACT_EVENT_ABORT
Definition: xact.h:130
@ XACT_EVENT_PRE_COMMIT
Definition: xact.h:133
@ XACT_EVENT_PARALLEL_ABORT
Definition: xact.h:131
@ XACT_EVENT_PREPARE
Definition: xact.h:132
#define IsolationIsSerializable()
Definition: xact.h:52