PostgreSQL Source Code  git master
connection.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * connection.c
4  * Connection management functions for postgres_fdw
5  *
6  * Portions Copyright (c) 2012-2024, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * contrib/postgres_fdw/connection.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "access/htup_details.h"
16 #include "access/xact.h"
18 #include "commands/defrem.h"
19 #include "funcapi.h"
20 #include "libpq/libpq-be.h"
22 #include "mb/pg_wchar.h"
23 #include "miscadmin.h"
24 #include "pgstat.h"
25 #include "postgres_fdw.h"
26 #include "storage/fd.h"
27 #include "storage/latch.h"
28 #include "utils/builtins.h"
29 #include "utils/datetime.h"
30 #include "utils/hsearch.h"
31 #include "utils/inval.h"
32 #include "utils/memutils.h"
33 #include "utils/syscache.h"
34 
35 /*
36  * Connection cache hash table entry
37  *
38  * The lookup key in this hash table is the user mapping OID. We use just one
39  * connection per user mapping ID, which ensures that all the scans use the
40  * same snapshot during a query. Using the user mapping OID rather than
41  * the foreign server OID + user OID avoids creating multiple connections when
42  * the public user mapping applies to all user OIDs.
43  *
44  * The "conn" pointer can be NULL if we don't currently have a live connection.
45  * When we do have a connection, xact_depth tracks the current depth of
46  * transactions and subtransactions open on the remote side. We need to issue
47  * commands at the same nesting depth on the remote as we're executing at
48  * ourselves, so that rolling back a subtransaction will kill the right
49  * queries and not the wrong ones.
50  */
51 typedef Oid ConnCacheKey;
52 
53 typedef struct ConnCacheEntry
54 {
55  ConnCacheKey key; /* hash key (must be first) */
56  PGconn *conn; /* connection to foreign server, or NULL */
57  /* Remaining fields are invalid when conn is NULL: */
58  int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 =
59  * one level of subxact open, etc */
60  bool have_prep_stmt; /* have we prepared any stmts in this xact? */
61  bool have_error; /* have any subxacts aborted in this xact? */
62  bool changing_xact_state; /* xact state change in process */
63  bool parallel_commit; /* do we commit (sub)xacts in parallel? */
64  bool parallel_abort; /* do we abort (sub)xacts in parallel? */
65  bool invalidated; /* true if reconnect is pending */
66  bool keep_connections; /* setting value of keep_connections
67  * server option */
68  Oid serverid; /* foreign server OID used to get server name */
69  uint32 server_hashvalue; /* hash value of foreign server OID */
70  uint32 mapping_hashvalue; /* hash value of user mapping OID */
71  PgFdwConnState state; /* extra per-connection state */
73 
74 /*
75  * Connection cache (initialized on first use)
76  */
77 static HTAB *ConnectionHash = NULL;
78 
79 /* for assigning cursor numbers and prepared statement numbers */
80 static unsigned int cursor_number = 0;
81 static unsigned int prep_stmt_number = 0;
82 
83 /* tracks whether any work is needed in callback functions */
84 static bool xact_got_connection = false;
85 
86 /* custom wait event values, retrieved from shared memory */
90 
91 /*
92  * Milliseconds to wait to cancel an in-progress query or execute a cleanup
93  * query; if it takes longer than 30 seconds to do these, we assume the
94  * connection is dead.
95  */
96 #define CONNECTION_CLEANUP_TIMEOUT 30000
97 
98 /*
 * Macro for constructing abort command to be sent.
 *
 * Formats the command text into "sql", bounded by sizeof(sql); "sql" must
 * therefore be a character array at the point of expansion, not a pointer
 * (sizeof would then yield the pointer size instead of the buffer size).
 *
 * If "toplevel" is true the command is "ABORT TRANSACTION".  Otherwise it
 * rolls back to, and then releases, the savepoint named after the entry's
 * current nesting depth: "s<xact_depth>".
 *
 * "entry" is evaluated twice in the subtransaction branch, so the argument
 * must not have side effects.  The do { ... } while(0) wrapper lets the
 * expansion be used as a single statement.
 */
99 #define CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel) \
100  do { \
101  if (toplevel) \
102  snprintf((sql), sizeof(sql), \
103  "ABORT TRANSACTION"); \
104  else \
105  snprintf((sql), sizeof(sql), \
106  "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", \
107  (entry)->xact_depth, (entry)->xact_depth); \
108  } while(0)
109 
110 /*
111  * SQL functions
112  */
116 
117 /* prototypes of private functions */
118 static void make_new_connection(ConnCacheEntry *entry, UserMapping *user);
120 static void disconnect_pg_server(ConnCacheEntry *entry);
121 static void check_conn_params(const char **keywords, const char **values, UserMapping *user);
122 static void configure_remote_session(PGconn *conn);
123 static void do_sql_command_begin(PGconn *conn, const char *sql);
124 static void do_sql_command_end(PGconn *conn, const char *sql,
125  bool consume_input);
126 static void begin_remote_xact(ConnCacheEntry *entry);
127 static void pgfdw_xact_callback(XactEvent event, void *arg);
128 static void pgfdw_subxact_callback(SubXactEvent event,
129  SubTransactionId mySubid,
130  SubTransactionId parentSubid,
131  void *arg);
132 static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
134 static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
135 static bool pgfdw_cancel_query(PGconn *conn);
136 static bool pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime);
137 static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime,
138  bool consume_input);
139 static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
140  bool ignore_errors);
141 static bool pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query);
142 static bool pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
143  TimestampTz endtime,
144  bool consume_input,
145  bool ignore_errors);
146 static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
147  PGresult **result, bool *timed_out);
148 static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel);
149 static bool pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
150  List **pending_entries,
151  List **cancel_requested);
152 static void pgfdw_finish_pre_commit_cleanup(List *pending_entries);
153 static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries,
154  int curlevel);
155 static void pgfdw_finish_abort_cleanup(List *pending_entries,
156  List *cancel_requested,
157  bool toplevel);
158 static void pgfdw_security_check(const char **keywords, const char **values,
161 static bool disconnect_cached_connections(Oid serverid);
162 
163 /*
164  * Get a PGconn which can be used to execute queries on the remote PostgreSQL
165  * server with the user's authorization. A new connection is established
166  * if we don't already have a suitable one, and a transaction is opened at
167  * the right subtransaction nesting depth if we didn't do that already.
168  *
169  * will_prep_stmt must be true if caller intends to create any prepared
170  * statements. Since those don't go away automatically at transaction end
171  * (not even on error), we need this flag to cue manual cleanup.
172  *
173  * If state is not NULL, *state receives the per-connection state associated
174  * with the PGconn.
175  */
176 PGconn *
178 {
179  bool found;
180  bool retry = false;
181  ConnCacheEntry *entry;
184 
185  /* First time through, initialize connection cache hashtable */
186  if (ConnectionHash == NULL)
187  {
188  HASHCTL ctl;
189 
190  if (pgfdw_we_get_result == 0)
192  WaitEventExtensionNew("PostgresFdwGetResult");
193 
194  ctl.keysize = sizeof(ConnCacheKey);
195  ctl.entrysize = sizeof(ConnCacheEntry);
196  ConnectionHash = hash_create("postgres_fdw connections", 8,
197  &ctl,
199 
200  /*
201  * Register some callback functions that manage connection cleanup.
202  * This should be done just once in each backend.
203  */
206  CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
208  CacheRegisterSyscacheCallback(USERMAPPINGOID,
210  }
211 
212  /* Set flag that we did GetConnection during the current transaction */
213  xact_got_connection = true;
214 
215  /* Create hash key for the entry: the key is just the user mapping OID */
216  key = user->umid;
217 
218  /*
219  * Find or create cached entry for requested connection.
220  */
221  entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
222  if (!found)
223  {
224  /*
225  * We need only clear "conn" here; remaining fields will be filled
226  * later when "conn" is set.
227  */
228  entry->conn = NULL;
229  }
230 
231  /* Reject further use of connections which failed abort cleanup. */
233 
234  /*
235  * If the connection needs to be remade due to invalidation, disconnect as
236  * soon as we're out of all transactions.
237  */
238  if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
239  {
240  elog(DEBUG3, "closing connection %p for option changes to take effect",
241  entry->conn);
242  disconnect_pg_server(entry);
243  }
244 
245  /*
246  * If cache entry doesn't have a connection, we have to establish a new
247  * connection. (If connect_pg_server throws an error, the cache entry
248  * will remain in a valid empty state, ie conn == NULL.)
249  */
250  if (entry->conn == NULL)
251  make_new_connection(entry, user);
252 
253  /*
254  * We check the health of the cached connection here when using it. In
255  * cases where we're out of all transactions, if a broken connection is
256  * detected, we try to reestablish a new connection later.
257  */
258  PG_TRY();
259  {
260  /* Process a pending asynchronous request if any. */
261  if (entry->state.pendingAreq)
263  /* Start a new transaction or subtransaction if needed. */
264  begin_remote_xact(entry);
265  }
266  PG_CATCH();
267  {
269  ErrorData *errdata = CopyErrorData();
270 
271  /*
272  * Determine whether to try to reestablish the connection.
273  *
274  * After a broken connection is detected in libpq, any error other
275  * than connection failure (e.g., out-of-memory) can be thrown
276  * somewhere between return from libpq and the expected ereport() call
277  * in pgfdw_report_error(). In this case, since PQstatus() indicates
278  * CONNECTION_BAD, checking only PQstatus() causes the false detection
279  * of connection failure. To avoid this, we also verify that the
280  * error's sqlstate is ERRCODE_CONNECTION_FAILURE. Note that also
281  * checking only the sqlstate can cause another false detection
282  * because pgfdw_report_error() may report ERRCODE_CONNECTION_FAILURE
283  * for any libpq-originated error condition.
284  */
285  if (errdata->sqlerrcode != ERRCODE_CONNECTION_FAILURE ||
286  PQstatus(entry->conn) != CONNECTION_BAD ||
287  entry->xact_depth > 0)
288  {
289  MemoryContextSwitchTo(ecxt);
290  PG_RE_THROW();
291  }
292 
293  /* Clean up the error state */
294  FlushErrorState();
295  FreeErrorData(errdata);
296  errdata = NULL;
297 
298  retry = true;
299  }
300  PG_END_TRY();
301 
302  /*
303  * If a broken connection is detected, disconnect it, reestablish a new
304  * connection and retry a new remote transaction. If connection failure is
305  * reported again, we give up getting a connection.
306  */
307  if (retry)
308  {
309  Assert(entry->xact_depth == 0);
310 
311  ereport(DEBUG3,
312  (errmsg_internal("could not start remote transaction on connection %p",
313  entry->conn)),
314  errdetail_internal("%s", pchomp(PQerrorMessage(entry->conn))));
315 
316  elog(DEBUG3, "closing connection %p to reestablish a new one",
317  entry->conn);
318  disconnect_pg_server(entry);
319 
320  make_new_connection(entry, user);
321 
322  begin_remote_xact(entry);
323  }
324 
325  /* Remember if caller will prepare statements */
326  entry->have_prep_stmt |= will_prep_stmt;
327 
328  /* If caller needs access to the per-connection state, return it. */
329  if (state)
330  *state = &entry->state;
331 
332  return entry->conn;
333 }
334 
335 /*
336  * Reset all transient state fields in the cached connection entry and
337  * establish new connection to the remote server.
338  */
339 static void
341 {
342  ForeignServer *server = GetForeignServer(user->serverid);
343  ListCell *lc;
344 
345  Assert(entry->conn == NULL);
346 
347  /* Reset all transient state fields, to be sure all are clean */
348  entry->xact_depth = 0;
349  entry->have_prep_stmt = false;
350  entry->have_error = false;
351  entry->changing_xact_state = false;
352  entry->invalidated = false;
353  entry->serverid = server->serverid;
354  entry->server_hashvalue =
355  GetSysCacheHashValue1(FOREIGNSERVEROID,
356  ObjectIdGetDatum(server->serverid));
357  entry->mapping_hashvalue =
358  GetSysCacheHashValue1(USERMAPPINGOID,
359  ObjectIdGetDatum(user->umid));
360  memset(&entry->state, 0, sizeof(entry->state));
361 
362  /*
363  * Determine whether to keep the connection that we're about to make here
364  * open even after the transaction using it ends, so that the subsequent
365  * transactions can re-use it.
366  *
367  * By default, all the connections to any foreign servers are kept open.
368  *
369  * Also determine whether to commit/abort (sub)transactions opened on the
370  * remote server in parallel at (sub)transaction end, which is disabled by
371  * default.
372  *
373  * Note: it's enough to determine these only when making a new connection
374  * because if these settings for it are changed, it will be closed and
375  * re-made later.
376  */
377  entry->keep_connections = true;
378  entry->parallel_commit = false;
379  entry->parallel_abort = false;
380  foreach(lc, server->options)
381  {
382  DefElem *def = (DefElem *) lfirst(lc);
383 
384  if (strcmp(def->defname, "keep_connections") == 0)
385  entry->keep_connections = defGetBoolean(def);
386  else if (strcmp(def->defname, "parallel_commit") == 0)
387  entry->parallel_commit = defGetBoolean(def);
388  else if (strcmp(def->defname, "parallel_abort") == 0)
389  entry->parallel_abort = defGetBoolean(def);
390  }
391 
392  /* Now try to make the connection */
393  entry->conn = connect_pg_server(server, user);
394 
395  elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
396  entry->conn, server->servername, user->umid, user->userid);
397 }
398 
399 /*
400  * Check that a non-superuser has used a password or delegated credentials
401  * to establish the connection; otherwise, the user is piggybacking on the
402  * postgres server's user identity. See also dblink_security_check()
403  * in contrib/dblink and check_conn_params().
404  */
405 static void
406 pgfdw_security_check(const char **keywords, const char **values, UserMapping *user, PGconn *conn)
407 {
408  /* Superusers bypass the check */
409  if (superuser_arg(user->userid))
410  return;
411 
412 #ifdef ENABLE_GSS
413  /* Connected via GSSAPI with delegated credentials- all good. */
415  return;
416 #endif
417 
418  /* Ok if superuser set PW required false. */
420  return;
421 
422  /* Connected via PW, with PW required true, and provided non-empty PW. */
424  {
425  /* ok if params contain a non-empty password */
426  for (int i = 0; keywords[i] != NULL; i++)
427  {
428  if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
429  return;
430  }
431  }
432 
433  ereport(ERROR,
434  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
435  errmsg("password or GSSAPI delegated credentials required"),
436  errdetail("Non-superuser cannot connect if the server does not request a password or use GSSAPI with delegated credentials."),
437  errhint("Target server's authentication method must be changed or password_required=false set in the user mapping attributes.")));
438 }
439 
440 /*
441  * Connect to remote server using specified server and user mapping properties.
442  */
443 static PGconn *
445 {
446  PGconn *volatile conn = NULL;
447 
448  /*
449  * Use PG_TRY block to ensure closing connection on error.
450  */
451  PG_TRY();
452  {
453  const char **keywords;
454  const char **values;
455  char *appname = NULL;
456  int n;
457 
458  /*
459  * Construct connection params from generic options of ForeignServer
460  * and UserMapping. (Some of them might not be libpq options, in
461  * which case we'll just waste a few array slots.) Add 4 extra slots
462  * for application_name, fallback_application_name, client_encoding,
463  * end marker.
464  */
465  n = list_length(server->options) + list_length(user->options) + 4;
466  keywords = (const char **) palloc(n * sizeof(char *));
467  values = (const char **) palloc(n * sizeof(char *));
468 
469  n = 0;
470  n += ExtractConnectionOptions(server->options,
471  keywords + n, values + n);
472  n += ExtractConnectionOptions(user->options,
473  keywords + n, values + n);
474 
475  /*
476  * Use pgfdw_application_name as application_name if set.
477  *
478  * PQconnectdbParams() processes the parameter arrays from start to
479  * end. If any key word is repeated, the last value is used. Therefore
480  * note that pgfdw_application_name must be added to the arrays after
481  * options of ForeignServer are, so that it can override
482  * application_name set in ForeignServer.
483  */
485  {
486  keywords[n] = "application_name";
488  n++;
489  }
490 
491  /*
492  * Search the parameter arrays to find application_name setting, and
493  * replace escape sequences in it with status information if found.
494  * The arrays are searched backwards because the last value is used if
495  * application_name is repeatedly set.
496  */
497  for (int i = n - 1; i >= 0; i--)
498  {
499  if (strcmp(keywords[i], "application_name") == 0 &&
500  *(values[i]) != '\0')
501  {
502  /*
503  * Use this application_name setting if it's not empty string
504  * even after any escape sequences in it are replaced.
505  */
506  appname = process_pgfdw_appname(values[i]);
507  if (appname[0] != '\0')
508  {
509  values[i] = appname;
510  break;
511  }
512 
513  /*
514  * This empty application_name is not used, so we set
515  * values[i] to NULL and keep searching the array to find the
516  * next one.
517  */
518  values[i] = NULL;
519  pfree(appname);
520  appname = NULL;
521  }
522  }
523 
524  /* Use "postgres_fdw" as fallback_application_name */
525  keywords[n] = "fallback_application_name";
526  values[n] = "postgres_fdw";
527  n++;
528 
529  /* Set client_encoding so that libpq can convert encoding properly. */
530  keywords[n] = "client_encoding";
532  n++;
533 
534  keywords[n] = values[n] = NULL;
535 
536  /* verify the set of connection parameters */
537  check_conn_params(keywords, values, user);
538 
539  /* first time, allocate or get the custom wait event */
540  if (pgfdw_we_connect == 0)
541  pgfdw_we_connect = WaitEventExtensionNew("PostgresFdwConnect");
542 
543  /* OK to make connection */
544  conn = libpqsrv_connect_params(keywords, values,
545  false, /* expand_dbname */
547 
548  if (!conn || PQstatus(conn) != CONNECTION_OK)
549  ereport(ERROR,
550  (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
551  errmsg("could not connect to server \"%s\"",
552  server->servername),
554 
555  /* Perform post-connection security checks */
556  pgfdw_security_check(keywords, values, user, conn);
557 
558  /* Prepare new session for use */
560 
561  if (appname != NULL)
562  pfree(appname);
563  pfree(keywords);
564  pfree(values);
565  }
566  PG_CATCH();
567  {
569  PG_RE_THROW();
570  }
571  PG_END_TRY();
572 
573  return conn;
574 }
575 
576 /*
577  * Disconnect any open connection for a connection cache entry.
578  */
579 static void
581 {
582  if (entry->conn != NULL)
583  {
584  libpqsrv_disconnect(entry->conn);
585  entry->conn = NULL;
586  }
587 }
588 
589 /*
590  * Return the value of the password_required option of this user mapping
591  * if it is set; otherwise return true. The mapping has been pre-validated.
592  */
593 static bool
595 {
596  ListCell *cell;
597 
598  foreach(cell, user->options)
599  {
600  DefElem *def = (DefElem *) lfirst(cell);
601 
602  if (strcmp(def->defname, "password_required") == 0)
603  return defGetBoolean(def);
604  }
605 
606  return true;
607 }
608 
609 /*
610  * For non-superusers, insist that the connstr specify a password or that the
611  * user provided their own GSSAPI delegated credentials. This
612  * prevents a password from being picked up from .pgpass, a service file, the
613  * environment, etc. We don't want the postgres user's passwords,
614  * certificates, etc to be accessible to non-superusers. (See also
615  * dblink_connstr_check in contrib/dblink.)
616  */
617 static void
618 check_conn_params(const char **keywords, const char **values, UserMapping *user)
619 {
620  int i;
621 
622  /* no check required if superuser */
623  if (superuser_arg(user->userid))
624  return;
625 
626 #ifdef ENABLE_GSS
627  /* ok if the user provided their own delegated credentials */
629  return;
630 #endif
631 
632  /* ok if params contain a non-empty password */
633  for (i = 0; keywords[i] != NULL; i++)
634  {
635  if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
636  return;
637  }
638 
639  /* ok if the superuser explicitly said so at user mapping creation time */
641  return;
642 
643  ereport(ERROR,
644  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
645  errmsg("password or GSSAPI delegated credentials required"),
646  errdetail("Non-superusers must delegate GSSAPI credentials or provide a password in the user mapping.")));
647 }
648 
649 /*
650  * Issue SET commands to make sure remote session is configured properly.
651  *
652  * We do this just once at connection, assuming nothing will change the
653  * values later. Since we'll never send volatile function calls to the
654  * remote, there shouldn't be any way to break this assumption from our end.
655  * It's possible to think of ways to break it at the remote end, eg making
656  * a foreign table point to a view that includes a set_config call ---
657  * but once you admit the possibility of a malicious view definition,
658  * there are any number of ways to break things.
659  */
660 static void
662 {
663  int remoteversion = PQserverVersion(conn);
664 
665  /* Force the search path to contain only pg_catalog (see deparse.c) */
666  do_sql_command(conn, "SET search_path = pg_catalog");
667 
668  /*
669  * Set remote timezone; this is basically just cosmetic, since all
670  * transmitted and returned timestamptzs should specify a zone explicitly
671  * anyway. However it makes the regression test outputs more predictable.
672  *
673  * We don't risk setting remote zone equal to ours, since the remote
674  * server might use a different timezone database. Instead, use UTC
675  * (quoted, because very old servers are picky about case).
676  */
677  do_sql_command(conn, "SET timezone = 'UTC'");
678 
679  /*
680  * Set values needed to ensure unambiguous data output from remote. (This
681  * logic should match what pg_dump does. See also set_transmission_modes
682  * in postgres_fdw.c.)
683  */
684  do_sql_command(conn, "SET datestyle = ISO");
685  if (remoteversion >= 80400)
686  do_sql_command(conn, "SET intervalstyle = postgres");
687  if (remoteversion >= 90000)
688  do_sql_command(conn, "SET extra_float_digits = 3");
689  else
690  do_sql_command(conn, "SET extra_float_digits = 2");
691 }
692 
693 /*
694  * Convenience subroutine to issue a non-data-returning SQL command to remote
695  */
696 void
697 do_sql_command(PGconn *conn, const char *sql)
698 {
700  do_sql_command_end(conn, sql, false);
701 }
702 
/*
 * Send a SQL command to the remote server without waiting for its result.
 *
 * The command text "sql" is dispatched on "conn" with PQsendQuery().  If the
 * send itself fails, the failure is reported through pgfdw_report_error() at
 * ERROR level; no PGresult exists at that point, so NULL is passed for it
 * (and "clear" is false, there being nothing to PQclear).
 *
 * Collecting and checking the command's result is done separately by
 * do_sql_command_end().
 */
703 static void
704 do_sql_command_begin(PGconn *conn, const char *sql)
705 {
706  if (!PQsendQuery(conn, sql))
707  pgfdw_report_error(ERROR, NULL, conn, false, sql);
708 }
709 
710 static void
711 do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
712 {
713  PGresult *res;
714 
715  /*
716  * If requested, consume whatever data is available from the socket. (Note
717  * that if all data is available, this allows pgfdw_get_result to call
718  * PQgetResult without forcing the overhead of WaitLatchOrSocket, which
719  * would be large compared to the overhead of PQconsumeInput.)
720  */
721  if (consume_input && !PQconsumeInput(conn))
722  pgfdw_report_error(ERROR, NULL, conn, false, sql);
725  pgfdw_report_error(ERROR, res, conn, true, sql);
726  PQclear(res);
727 }
728 
729 /*
730  * Start remote transaction or subtransaction, if needed.
731  *
732  * Note that we always use at least REPEATABLE READ in the remote session.
733  * This is so that, if a query initiates multiple scans of the same or
734  * different foreign tables, we will get snapshot-consistent results from
735  * those scans. A disadvantage is that we can't provide sane emulation of
736  * READ COMMITTED behavior --- it would be nice if we had some other way to
737  * control which remote queries share a snapshot.
738  */
739 static void
741 {
742  int curlevel = GetCurrentTransactionNestLevel();
743 
744  /* Start main transaction if we haven't yet */
745  if (entry->xact_depth <= 0)
746  {
747  const char *sql;
748 
749  elog(DEBUG3, "starting remote transaction on connection %p",
750  entry->conn);
751 
753  sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
754  else
755  sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
756  entry->changing_xact_state = true;
757  do_sql_command(entry->conn, sql);
758  entry->xact_depth = 1;
759  entry->changing_xact_state = false;
760  }
761 
762  /*
763  * If we're in a subtransaction, stack up savepoints to match our level.
764  * This ensures we can rollback just the desired effects when a
765  * subtransaction aborts.
766  */
767  while (entry->xact_depth < curlevel)
768  {
769  char sql[64];
770 
771  snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
772  entry->changing_xact_state = true;
773  do_sql_command(entry->conn, sql);
774  entry->xact_depth++;
775  entry->changing_xact_state = false;
776  }
777 }
778 
779 /*
780  * Release connection reference count created by calling GetConnection.
781  */
782 void
784 {
785  /*
786  * Currently, we don't actually track connection references because all
787  * cleanup is managed on a transaction or subtransaction basis instead. So
788  * there's nothing to do here.
789  */
790 }
791 
792 /*
793  * Assign a "unique" number for a cursor.
794  *
795  * These really only need to be unique per connection within a transaction.
796  * For the moment we ignore the per-connection point and assign them across
797  * all connections in the transaction, but we ask for the connection to be
798  * supplied in case we want to refine that.
799  *
800  * Note that even if wraparound happens in a very long transaction, actual
801  * collisions are highly improbable; just be sure to use %u not %d to print.
802  */
803 unsigned int
805 {
806  return ++cursor_number;
807 }
808 
809 /*
810  * Assign a "unique" number for a prepared statement.
811  *
812  * This works much like GetCursorNumber, except that we never reset the counter
813  * within a session. That's because we can't be 100% sure we've gotten rid
814  * of all prepared statements on all connections, and it's not really worth
815  * increasing the risk of prepared-statement name collisions by resetting.
816  */
817 unsigned int
819 {
820  return ++prep_stmt_number;
821 }
822 
823 /*
824  * Submit a query and wait for the result.
825  *
826  * Since we don't use non-blocking mode, this can't process interrupts while
827  * pushing the query text to the server. That risk is relatively small, so we
828  * ignore that for now.
829  *
830  * Caller is responsible for the error handling on the result.
831  */
832 PGresult *
834 {
835  /* First, process a pending asynchronous request, if any. */
836  if (state && state->pendingAreq)
837  process_pending_request(state->pendingAreq);
838 
839  if (!PQsendQuery(conn, query))
840  return NULL;
841  return pgfdw_get_result(conn);
842 }
843 
844 /*
845  * Wrap libpqsrv_get_result_last(), adding wait event.
846  *
847  * Caller is responsible for the error handling on the result.
848  */
849 PGresult *
851 {
853 }
854 
855 /*
856  * Report an error we got from the remote server.
857  *
858  * elevel: error level to use (typically ERROR, but might be less)
859  * res: PGresult containing the error
860  * conn: connection we did the query on
861  * clear: if true, PQclear the result (otherwise caller will handle it)
862  * sql: NULL, or text of remote command we tried to execute
863  *
864  * Note: callers that choose not to throw ERROR for a remote error are
865  * responsible for making sure that the associated ConnCacheEntry gets
866  * marked with have_error = true.
867  */
868 void
870  bool clear, const char *sql)
871 {
872  /* If requested, PGresult must be released before leaving this function. */
873  PG_TRY();
874  {
875  char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
876  char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
877  char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
878  char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
879  char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
880  int sqlstate;
881 
882  if (diag_sqlstate)
883  sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
884  diag_sqlstate[1],
885  diag_sqlstate[2],
886  diag_sqlstate[3],
887  diag_sqlstate[4]);
888  else
889  sqlstate = ERRCODE_CONNECTION_FAILURE;
890 
891  /*
892  * If we don't get a message from the PGresult, try the PGconn. This
893  * is needed because for connection-level failures, PQgetResult may
894  * just return NULL, not a PGresult at all.
895  */
896  if (message_primary == NULL)
897  message_primary = pchomp(PQerrorMessage(conn));
898 
899  ereport(elevel,
900  (errcode(sqlstate),
901  (message_primary != NULL && message_primary[0] != '\0') ?
902  errmsg_internal("%s", message_primary) :
903  errmsg("could not obtain message string for remote error"),
904  message_detail ? errdetail_internal("%s", message_detail) : 0,
905  message_hint ? errhint("%s", message_hint) : 0,
906  message_context ? errcontext("%s", message_context) : 0,
907  sql ? errcontext("remote SQL command: %s", sql) : 0));
908  }
909  PG_FINALLY();
910  {
911  if (clear)
912  PQclear(res);
913  }
914  PG_END_TRY();
915 }
916 
917 /*
918  * pgfdw_xact_callback --- cleanup at main-transaction end.
919  *
920  * This runs just late enough that it must not enter user-defined code
921  * locally. (Entering such code on the remote side is fine. Its remote
922  * COMMIT TRANSACTION may run deferred triggers.)
923  */
924 static void
926 {
927  HASH_SEQ_STATUS scan;
928  ConnCacheEntry *entry;
929  List *pending_entries = NIL;
930  List *cancel_requested = NIL;
931 
932  /* Quick exit if no connections were touched in this transaction. */
933  if (!xact_got_connection)
934  return;
935 
936  /*
937  * Scan all connection cache entries to find open remote transactions, and
938  * close them.
939  */
941  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
942  {
943  PGresult *res;
944 
945  /* Ignore cache entry if no open connection right now */
946  if (entry->conn == NULL)
947  continue;
948 
949  /* If it has an open remote transaction, try to close it */
950  if (entry->xact_depth > 0)
951  {
952  elog(DEBUG3, "closing remote transaction on connection %p",
953  entry->conn);
954 
955  switch (event)
956  {
959 
960  /*
961  * If abort cleanup previously failed for this connection,
962  * we can't issue any more commands against it.
963  */
965 
966  /* Commit all remote transactions during pre-commit */
967  entry->changing_xact_state = true;
968  if (entry->parallel_commit)
969  {
970  do_sql_command_begin(entry->conn, "COMMIT TRANSACTION");
971  pending_entries = lappend(pending_entries, entry);
972  continue;
973  }
974  do_sql_command(entry->conn, "COMMIT TRANSACTION");
975  entry->changing_xact_state = false;
976 
977  /*
978  * If there were any errors in subtransactions, and we
979  * made prepared statements, do a DEALLOCATE ALL to make
980  * sure we get rid of all prepared statements. This is
981  * annoying and not terribly bulletproof, but it's
982  * probably not worth trying harder.
983  *
984  * DEALLOCATE ALL only exists in 8.3 and later, so this
985  * constrains how old a server postgres_fdw can
986  * communicate with. We intentionally ignore errors in
987  * the DEALLOCATE, so that we can hobble along to some
988  * extent with older servers (leaking prepared statements
989  * as we go; but we don't really support update operations
990  * pre-8.3 anyway).
991  */
992  if (entry->have_prep_stmt && entry->have_error)
993  {
994  res = pgfdw_exec_query(entry->conn, "DEALLOCATE ALL",
995  NULL);
996  PQclear(res);
997  }
998  entry->have_prep_stmt = false;
999  entry->have_error = false;
1000  break;
1002 
1003  /*
1004  * We disallow any remote transactions, since it's not
1005  * very reasonable to hold them open until the prepared
1006  * transaction is committed. For the moment, throw error
1007  * unconditionally; later we might allow read-only cases.
1008  * Note that the error will cause us to come right back
1009  * here with event == XACT_EVENT_ABORT, so we'll clean up
1010  * the connection state at that point.
1011  */
1012  ereport(ERROR,
1013  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1014  errmsg("cannot PREPARE a transaction that has operated on postgres_fdw foreign tables")));
1015  break;
1017  case XACT_EVENT_COMMIT:
1018  case XACT_EVENT_PREPARE:
1019  /* Pre-commit should have closed the open transaction */
1020  elog(ERROR, "missed cleaning up connection during pre-commit");
1021  break;
1023  case XACT_EVENT_ABORT:
1024  /* Rollback all remote transactions during abort */
1025  if (entry->parallel_abort)
1026  {
1027  if (pgfdw_abort_cleanup_begin(entry, true,
1028  &pending_entries,
1029  &cancel_requested))
1030  continue;
1031  }
1032  else
1033  pgfdw_abort_cleanup(entry, true);
1034  break;
1035  }
1036  }
1037 
1038  /* Reset state to show we're out of a transaction */
1039  pgfdw_reset_xact_state(entry, true);
1040  }
1041 
1042  /* If there are any pending connections, finish cleaning them up */
1043  if (pending_entries || cancel_requested)
1044  {
1045  if (event == XACT_EVENT_PARALLEL_PRE_COMMIT ||
1046  event == XACT_EVENT_PRE_COMMIT)
1047  {
1048  Assert(cancel_requested == NIL);
1049  pgfdw_finish_pre_commit_cleanup(pending_entries);
1050  }
1051  else
1052  {
1053  Assert(event == XACT_EVENT_PARALLEL_ABORT ||
1054  event == XACT_EVENT_ABORT);
1055  pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
1056  true);
1057  }
1058  }
1059 
1060  /*
1061  * Regardless of the event type, we can now mark ourselves as out of the
1062  * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
1063  * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
1064  */
1065  xact_got_connection = false;
1066 
1067  /* Also reset cursor numbering for next transaction */
1068  cursor_number = 0;
1069 }
1070 
1071 /*
1072  * pgfdw_subxact_callback --- cleanup at subtransaction end.
1073  */
1074 static void
1076  SubTransactionId parentSubid, void *arg)
1077 {
1078  HASH_SEQ_STATUS scan;
1079  ConnCacheEntry *entry;
1080  int curlevel;
1081  List *pending_entries = NIL;
1082  List *cancel_requested = NIL;
1083 
1084  /* Nothing to do at subxact start, nor after commit. */
1085  if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
1086  event == SUBXACT_EVENT_ABORT_SUB))
1087  return;
1088 
1089  /* Quick exit if no connections were touched in this transaction. */
1090  if (!xact_got_connection)
1091  return;
1092 
1093  /*
1094  * Scan all connection cache entries to find open remote subtransactions
1095  * of the current level, and close them.
1096  */
1097  curlevel = GetCurrentTransactionNestLevel();
1098  hash_seq_init(&scan, ConnectionHash);
1099  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1100  {
1101  char sql[100];
1102 
1103  /*
1104  * We only care about connections with open remote subtransactions of
1105  * the current level.
1106  */
1107  if (entry->conn == NULL || entry->xact_depth < curlevel)
1108  continue;
1109 
1110  if (entry->xact_depth > curlevel)
1111  elog(ERROR, "missed cleaning up remote subtransaction at level %d",
1112  entry->xact_depth);
1113 
1114  if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
1115  {
1116  /*
1117  * If abort cleanup previously failed for this connection, we
1118  * can't issue any more commands against it.
1119  */
1121 
1122  /* Commit all remote subtransactions during pre-commit */
1123  snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
1124  entry->changing_xact_state = true;
1125  if (entry->parallel_commit)
1126  {
1127  do_sql_command_begin(entry->conn, sql);
1128  pending_entries = lappend(pending_entries, entry);
1129  continue;
1130  }
1131  do_sql_command(entry->conn, sql);
1132  entry->changing_xact_state = false;
1133  }
1134  else
1135  {
1136  /* Rollback all remote subtransactions during abort */
1137  if (entry->parallel_abort)
1138  {
1139  if (pgfdw_abort_cleanup_begin(entry, false,
1140  &pending_entries,
1141  &cancel_requested))
1142  continue;
1143  }
1144  else
1145  pgfdw_abort_cleanup(entry, false);
1146  }
1147 
1148  /* OK, we're outta that level of subtransaction */
1149  pgfdw_reset_xact_state(entry, false);
1150  }
1151 
1152  /* If there are any pending connections, finish cleaning them up */
1153  if (pending_entries || cancel_requested)
1154  {
1155  if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
1156  {
1157  Assert(cancel_requested == NIL);
1158  pgfdw_finish_pre_subcommit_cleanup(pending_entries, curlevel);
1159  }
1160  else
1161  {
1162  Assert(event == SUBXACT_EVENT_ABORT_SUB);
1163  pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
1164  false);
1165  }
1166  }
1167 }
1168 
1169 /*
1170  * Connection invalidation callback function
1171  *
1172  * After a change to a pg_foreign_server or pg_user_mapping catalog entry,
1173  * close connections depending on that entry immediately if current transaction
1174  * has not used those connections yet. Otherwise, mark those connections as
1175  * invalid and then make pgfdw_xact_callback() close them at the end of current
1176  * transaction, since they cannot be closed in the midst of the transaction
1177  * using them. Closed connections will be remade at the next opportunity if
1178  * necessary.
1179  *
1180  * Although most cache invalidation callbacks blow away all the related stuff
1181  * regardless of the given hashvalue, connections are expensive enough that
1182  * it's worth trying to avoid that.
1183  *
1184  * NB: We could avoid unnecessary disconnection more strictly by examining
1185  * individual option values, but it seems too much effort for the gain.
1186  */
1187 static void
1188 pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
1189 {
1190  HASH_SEQ_STATUS scan;
1191  ConnCacheEntry *entry;
1192 
1193  Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
1194 
1195  /* ConnectionHash must exist already, if we're registered */
1196  hash_seq_init(&scan, ConnectionHash);
1197  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1198  {
1199  /* Ignore invalid entries */
1200  if (entry->conn == NULL)
1201  continue;
1202 
1203  /* hashvalue == 0 means a cache reset, must clear all state */
1204  if (hashvalue == 0 ||
1205  (cacheid == FOREIGNSERVEROID &&
1206  entry->server_hashvalue == hashvalue) ||
1207  (cacheid == USERMAPPINGOID &&
1208  entry->mapping_hashvalue == hashvalue))
1209  {
1210  /*
1211  * Close the connection immediately if it's not used yet in this
1212  * transaction. Otherwise mark it as invalid so that
1213  * pgfdw_xact_callback() can close it at the end of this
1214  * transaction.
1215  */
1216  if (entry->xact_depth == 0)
1217  {
1218  elog(DEBUG3, "discarding connection %p", entry->conn);
1219  disconnect_pg_server(entry);
1220  }
1221  else
1222  entry->invalidated = true;
1223  }
1224  }
1225 }
1226 
1227 /*
1228  * Raise an error if the given connection cache entry is marked as being
1229  * in the middle of an xact state change. This should be called at which no
1230  * such change is expected to be in progress; if one is found to be in
1231  * progress, it means that we aborted in the middle of a previous state change
1232  * and now don't know what the remote transaction state actually is.
1233  * Such connections can't safely be further used. Re-establishing the
1234  * connection would change the snapshot and roll back any writes already
1235  * performed, so that's not an option, either. Thus, we must abort.
1236  */
1237 static void
1239 {
1240  ForeignServer *server;
1241 
1242  /* nothing to do for inactive entries and entries of sane state */
1243  if (entry->conn == NULL || !entry->changing_xact_state)
1244  return;
1245 
1246  /* make sure this entry is inactive */
1247  disconnect_pg_server(entry);
1248 
1249  /* find server name to be shown in the message below */
1250  server = GetForeignServer(entry->serverid);
1251 
1252  ereport(ERROR,
1253  (errcode(ERRCODE_CONNECTION_EXCEPTION),
1254  errmsg("connection to server \"%s\" was lost",
1255  server->servername)));
1256 }
1257 
1258 /*
1259  * Reset state to show we're out of a (sub)transaction.
1260  */
1261 static void
1263 {
1264  if (toplevel)
1265  {
1266  /* Reset state to show we're out of a transaction */
1267  entry->xact_depth = 0;
1268 
1269  /*
1270  * If the connection isn't in a good idle state, it is marked as
1271  * invalid or keep_connections option of its server is disabled, then
1272  * discard it to recover. Next GetConnection will open a new
1273  * connection.
1274  */
1275  if (PQstatus(entry->conn) != CONNECTION_OK ||
1276  PQtransactionStatus(entry->conn) != PQTRANS_IDLE ||
1277  entry->changing_xact_state ||
1278  entry->invalidated ||
1279  !entry->keep_connections)
1280  {
1281  elog(DEBUG3, "discarding connection %p", entry->conn);
1282  disconnect_pg_server(entry);
1283  }
1284  }
1285  else
1286  {
1287  /* Reset state to show we're out of a subtransaction */
1288  entry->xact_depth--;
1289  }
1290 }
1291 
1292 /*
1293  * Cancel the currently-in-progress query (whose query text we do not have)
1294  * and ignore the result. Returns true if we successfully cancel the query
1295  * and discard any pending result, and false if not.
1296  *
1297  * It's not a huge problem if we throw an ERROR here, but if we get into error
1298  * recursion trouble, we'll end up slamming the connection shut, which will
1299  * necessitate failing the entire toplevel transaction even if subtransactions
1300  * were used. Try to use WARNING where we can.
1301  *
1302  * XXX: if the query was one sent by fetch_more_data_begin(), we could get the
1303  * query text from the pendingAreq saved in the per-connection state, then
1304  * report the query using it.
1305  */
1306 static bool
1308 {
1309  TimestampTz endtime;
1310 
1311  /*
1312  * If it takes too long to cancel the query and discard the result, assume
1313  * the connection is dead.
1314  */
1317 
1318  if (!pgfdw_cancel_query_begin(conn, endtime))
1319  return false;
1320  return pgfdw_cancel_query_end(conn, endtime, false);
1321 }
1322 
1323 /*
1324  * Submit a cancel request to the given connection, waiting only until
1325  * the given time.
1326  *
1327  * We sleep interruptibly until we receive confirmation that the cancel
1328  * request has been accepted, and if it is, return true; if the timeout
1329  * lapses without that, or the request fails for whatever reason, return
1330  * false.
1331  */
1332 static bool
1334 {
1335  char *errormsg = libpqsrv_cancel(conn, endtime);
1336 
1337  if (errormsg != NULL)
1338  ereport(WARNING,
1339  errcode(ERRCODE_CONNECTION_FAILURE),
1340  errmsg("could not send cancel request: %s", errormsg));
1341 
1342  return errormsg == NULL;
1343 }
1344 
1345 static bool
1346 pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime, bool consume_input)
1347 {
1348  PGresult *result = NULL;
1349  bool timed_out;
1350 
1351  /*
1352  * If requested, consume whatever data is available from the socket. (Note
1353  * that if all data is available, this allows pgfdw_get_cleanup_result to
1354  * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
1355  * which would be large compared to the overhead of PQconsumeInput.)
1356  */
1357  if (consume_input && !PQconsumeInput(conn))
1358  {
1359  ereport(WARNING,
1360  (errcode(ERRCODE_CONNECTION_FAILURE),
1361  errmsg("could not get result of cancel request: %s",
1362  pchomp(PQerrorMessage(conn)))));
1363  return false;
1364  }
1365 
1366  /* Get and discard the result of the query. */
1367  if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
1368  {
1369  if (timed_out)
1370  ereport(WARNING,
1371  (errmsg("could not get result of cancel request due to timeout")));
1372  else
1373  ereport(WARNING,
1374  (errcode(ERRCODE_CONNECTION_FAILURE),
1375  errmsg("could not get result of cancel request: %s",
1376  pchomp(PQerrorMessage(conn)))));
1377 
1378  return false;
1379  }
1380  PQclear(result);
1381 
1382  return true;
1383 }
1384 
1385 /*
1386  * Submit a query during (sub)abort cleanup and wait up to 30 seconds for the
1387  * result. If the query is executed without error, the return value is true.
1388  * If the query is executed successfully but returns an error, the return
1389  * value is true if and only if ignore_errors is set. If the query can't be
1390  * sent or times out, the return value is false.
1391  *
1392  * It's not a huge problem if we throw an ERROR here, but if we get into error
1393  * recursion trouble, we'll end up slamming the connection shut, which will
1394  * necessitate failing the entire toplevel transaction even if subtransactions
1395  * were used. Try to use WARNING where we can.
1396  */
1397 static bool
1398 pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
1399 {
1400  TimestampTz endtime;
1401 
1402  /*
1403  * If it takes too long to execute a cleanup query, assume the connection
1404  * is dead. It's fairly likely that this is why we aborted in the first
1405  * place (e.g. statement timeout, user cancel), so the timeout shouldn't
1406  * be too long.
1407  */
1410 
1411  if (!pgfdw_exec_cleanup_query_begin(conn, query))
1412  return false;
1413  return pgfdw_exec_cleanup_query_end(conn, query, endtime,
1414  false, ignore_errors);
1415 }
1416 
1417 static bool
1419 {
1420  /*
1421  * Submit a query. Since we don't use non-blocking mode, this also can
1422  * block. But its risk is relatively small, so we ignore that for now.
1423  */
1424  if (!PQsendQuery(conn, query))
1425  {
1426  pgfdw_report_error(WARNING, NULL, conn, false, query);
1427  return false;
1428  }
1429 
1430  return true;
1431 }
1432 
1433 static bool
1435  TimestampTz endtime, bool consume_input,
1436  bool ignore_errors)
1437 {
1438  PGresult *result = NULL;
1439  bool timed_out;
1440 
1441  /*
1442  * If requested, consume whatever data is available from the socket. (Note
1443  * that if all data is available, this allows pgfdw_get_cleanup_result to
1444  * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
1445  * which would be large compared to the overhead of PQconsumeInput.)
1446  */
1447  if (consume_input && !PQconsumeInput(conn))
1448  {
1449  pgfdw_report_error(WARNING, NULL, conn, false, query);
1450  return false;
1451  }
1452 
1453  /* Get the result of the query. */
1454  if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
1455  {
1456  if (timed_out)
1457  ereport(WARNING,
1458  (errmsg("could not get query result due to timeout"),
1459  query ? errcontext("remote SQL command: %s", query) : 0));
1460  else
1461  pgfdw_report_error(WARNING, NULL, conn, false, query);
1462 
1463  return false;
1464  }
1465 
1466  /* Issue a warning if not successful. */
1467  if (PQresultStatus(result) != PGRES_COMMAND_OK)
1468  {
1469  pgfdw_report_error(WARNING, result, conn, true, query);
1470  return ignore_errors;
1471  }
1472  PQclear(result);
1473 
1474  return true;
1475 }
1476 
1477 /*
1478  * Get, during abort cleanup, the result of a query that is in progress. This
1479  * might be a query that is being interrupted by transaction abort, or it might
1480  * be a query that was initiated as part of transaction abort to get the remote
1481  * side back to the appropriate state.
1482  *
1483  * endtime is the time at which we should give up and assume the remote
1484  * side is dead. Returns true if the timeout expired or connection trouble
1485  * occurred, false otherwise. Sets *result except in case of a timeout.
1486  * Sets timed_out to true only when the timeout expired.
1487  */
1488 static bool
1490  bool *timed_out)
1491 {
1492  volatile bool failed = false;
1493  PGresult *volatile last_res = NULL;
1494 
1495  *timed_out = false;
1496 
1497  /* In what follows, do not leak any PGresults on an error. */
1498  PG_TRY();
1499  {
1500  for (;;)
1501  {
1502  PGresult *res;
1503 
1504  while (PQisBusy(conn))
1505  {
1506  int wc;
1508  long cur_timeout;
1509 
1510  /* If timeout has expired, give up, else get sleep time. */
1511  cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
1512  if (cur_timeout <= 0)
1513  {
1514  *timed_out = true;
1515  failed = true;
1516  goto exit;
1517  }
1518 
1519  /* first time, allocate or get the custom wait event */
1520  if (pgfdw_we_cleanup_result == 0)
1521  pgfdw_we_cleanup_result = WaitEventExtensionNew("PostgresFdwCleanupResult");
1522 
1523  /* Sleep until there's something to do */
1527  PQsocket(conn),
1528  cur_timeout, pgfdw_we_cleanup_result);
1530 
1532 
1533  /* Data available in socket? */
1534  if (wc & WL_SOCKET_READABLE)
1535  {
1536  if (!PQconsumeInput(conn))
1537  {
1538  /* connection trouble */
1539  failed = true;
1540  goto exit;
1541  }
1542  }
1543  }
1544 
1545  res = PQgetResult(conn);
1546  if (res == NULL)
1547  break; /* query is complete */
1548 
1549  PQclear(last_res);
1550  last_res = res;
1551  }
1552 exit: ;
1553  }
1554  PG_CATCH();
1555  {
1556  PQclear(last_res);
1557  PG_RE_THROW();
1558  }
1559  PG_END_TRY();
1560 
1561  if (failed)
1562  PQclear(last_res);
1563  else
1564  *result = last_res;
1565  return failed;
1566 }
1567 
1568 /*
1569  * Abort remote transaction or subtransaction.
1570  *
1571  * "toplevel" should be set to true if toplevel (main) transaction is
1572  * rollbacked, false otherwise.
1573  *
1574  * Set entry->changing_xact_state to false on success, true on failure.
1575  */
1576 static void
1577 pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
1578 {
1579  char sql[100];
1580 
1581  /*
1582  * Don't try to clean up the connection if we're already in error
1583  * recursion trouble.
1584  */
1586  entry->changing_xact_state = true;
1587 
1588  /*
1589  * If connection is already unsalvageable, don't touch it further.
1590  */
1591  if (entry->changing_xact_state)
1592  return;
1593 
1594  /*
1595  * Mark this connection as in the process of changing transaction state.
1596  */
1597  entry->changing_xact_state = true;
1598 
1599  /* Assume we might have lost track of prepared statements */
1600  entry->have_error = true;
1601 
1602  /*
1603  * If a command has been submitted to the remote server by using an
1604  * asynchronous execution function, the command might not have yet
1605  * completed. Check to see if a command is still being processed by the
1606  * remote server, and if so, request cancellation of the command.
1607  */
1608  if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
1609  !pgfdw_cancel_query(entry->conn))
1610  return; /* Unable to cancel running query */
1611 
1612  CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1613  if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
1614  return; /* Unable to abort remote (sub)transaction */
1615 
1616  if (toplevel)
1617  {
1618  if (entry->have_prep_stmt && entry->have_error &&
1620  "DEALLOCATE ALL",
1621  true))
1622  return; /* Trouble clearing prepared statements */
1623 
1624  entry->have_prep_stmt = false;
1625  entry->have_error = false;
1626  }
1627 
1628  /*
1629  * If pendingAreq of the per-connection state is not NULL, it means that
1630  * an asynchronous fetch begun by fetch_more_data_begin() was not done
1631  * successfully and thus the per-connection state was not reset in
1632  * fetch_more_data(); in that case reset the per-connection state here.
1633  */
1634  if (entry->state.pendingAreq)
1635  memset(&entry->state, 0, sizeof(entry->state));
1636 
1637  /* Disarm changing_xact_state if it all worked */
1638  entry->changing_xact_state = false;
1639 }
1640 
1641 /*
1642  * Like pgfdw_abort_cleanup, submit an abort command or cancel request, but
1643  * don't wait for the result.
1644  *
1645  * Returns true if the abort command or cancel request is successfully issued,
1646  * false otherwise. If the abort command is successfully issued, the given
1647  * connection cache entry is appended to *pending_entries. Otherwise, if the
1648  * cancel request is successfully issued, it is appended to *cancel_requested.
1649  */
1650 static bool
1652  List **pending_entries, List **cancel_requested)
1653 {
1654  /*
1655  * Don't try to clean up the connection if we're already in error
1656  * recursion trouble.
1657  */
1659  entry->changing_xact_state = true;
1660 
1661  /*
1662  * If connection is already unsalvageable, don't touch it further.
1663  */
1664  if (entry->changing_xact_state)
1665  return false;
1666 
1667  /*
1668  * Mark this connection as in the process of changing transaction state.
1669  */
1670  entry->changing_xact_state = true;
1671 
1672  /* Assume we might have lost track of prepared statements */
1673  entry->have_error = true;
1674 
1675  /*
1676  * If a command has been submitted to the remote server by using an
1677  * asynchronous execution function, the command might not have yet
1678  * completed. Check to see if a command is still being processed by the
1679  * remote server, and if so, request cancellation of the command.
1680  */
1681  if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
1682  {
1683  TimestampTz endtime;
1684 
1687  if (!pgfdw_cancel_query_begin(entry->conn, endtime))
1688  return false; /* Unable to cancel running query */
1689  *cancel_requested = lappend(*cancel_requested, entry);
1690  }
1691  else
1692  {
1693  char sql[100];
1694 
1695  CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1696  if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
1697  return false; /* Unable to abort remote transaction */
1698  *pending_entries = lappend(*pending_entries, entry);
1699  }
1700 
1701  return true;
1702 }
1703 
1704 /*
1705  * Finish pre-commit cleanup of connections on each of which we've sent a
1706  * COMMIT command to the remote server.
1707  */
1708 static void
1710 {
1711  ConnCacheEntry *entry;
1712  List *pending_deallocs = NIL;
1713  ListCell *lc;
1714 
1715  Assert(pending_entries);
1716 
1717  /*
1718  * Get the result of the COMMIT command for each of the pending entries
1719  */
1720  foreach(lc, pending_entries)
1721  {
1722  entry = (ConnCacheEntry *) lfirst(lc);
1723 
1724  Assert(entry->changing_xact_state);
1725 
1726  /*
1727  * We might already have received the result on the socket, so pass
1728  * consume_input=true to try to consume it first
1729  */
1730  do_sql_command_end(entry->conn, "COMMIT TRANSACTION", true);
1731  entry->changing_xact_state = false;
1732 
1733  /* Do a DEALLOCATE ALL in parallel if needed */
1734  if (entry->have_prep_stmt && entry->have_error)
1735  {
1736  /* Ignore errors (see notes in pgfdw_xact_callback) */
1737  if (PQsendQuery(entry->conn, "DEALLOCATE ALL"))
1738  {
1739  pending_deallocs = lappend(pending_deallocs, entry);
1740  continue;
1741  }
1742  }
1743  entry->have_prep_stmt = false;
1744  entry->have_error = false;
1745 
1746  pgfdw_reset_xact_state(entry, true);
1747  }
1748 
1749  /* No further work if no pending entries */
1750  if (!pending_deallocs)
1751  return;
1752 
1753  /*
1754  * Get the result of the DEALLOCATE command for each of the pending
1755  * entries
1756  */
1757  foreach(lc, pending_deallocs)
1758  {
1759  PGresult *res;
1760 
1761  entry = (ConnCacheEntry *) lfirst(lc);
1762 
1763  /* Ignore errors (see notes in pgfdw_xact_callback) */
1764  while ((res = PQgetResult(entry->conn)) != NULL)
1765  {
1766  PQclear(res);
1767  /* Stop if the connection is lost (else we'll loop infinitely) */
1768  if (PQstatus(entry->conn) == CONNECTION_BAD)
1769  break;
1770  }
1771  entry->have_prep_stmt = false;
1772  entry->have_error = false;
1773 
1774  pgfdw_reset_xact_state(entry, true);
1775  }
1776 }
1777 
1778 /*
1779  * Finish pre-subcommit cleanup of connections on each of which we've sent a
1780  * RELEASE command to the remote server.
1781  */
1782 static void
1783 pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel)
1784 {
1785  ConnCacheEntry *entry;
1786  char sql[100];
1787  ListCell *lc;
1788 
1789  Assert(pending_entries);
1790 
1791  /*
1792  * Get the result of the RELEASE command for each of the pending entries
1793  */
1794  snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
1795  foreach(lc, pending_entries)
1796  {
1797  entry = (ConnCacheEntry *) lfirst(lc);
1798 
1799  Assert(entry->changing_xact_state);
1800 
1801  /*
1802  * We might already have received the result on the socket, so pass
1803  * consume_input=true to try to consume it first
1804  */
1805  do_sql_command_end(entry->conn, sql, true);
1806  entry->changing_xact_state = false;
1807 
1808  pgfdw_reset_xact_state(entry, false);
1809  }
1810 }
1811 
1812 /*
1813  * Finish abort cleanup of connections on each of which we've sent an abort
1814  * command or cancel request to the remote server.
1815  */
1816 static void
1817 pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested,
1818  bool toplevel)
1819 {
1820  List *pending_deallocs = NIL;
1821  ListCell *lc;
1822 
1823  /*
1824  * For each of the pending cancel requests (if any), get and discard the
1825  * result of the query, and submit an abort command to the remote server.
1826  */
1827  if (cancel_requested)
1828  {
1829  foreach(lc, cancel_requested)
1830  {
1831  ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
1832  TimestampTz endtime;
1833  char sql[100];
1834 
1835  Assert(entry->changing_xact_state);
1836 
1837  /*
1838  * Set end time. You might think we should do this before issuing
1839  * cancel request like in normal mode, but that is problematic,
1840  * because if, for example, it took longer than 30 seconds to
1841  * process the first few entries in the cancel_requested list, it
1842  * would cause a timeout error when processing each of the
1843  * remaining entries in the list, leading to slamming that entry's
1844  * connection shut.
1845  */
1848 
1849  if (!pgfdw_cancel_query_end(entry->conn, endtime, true))
1850  {
1851  /* Unable to cancel running query */
1852  pgfdw_reset_xact_state(entry, toplevel);
1853  continue;
1854  }
1855 
1856  /* Send an abort command in parallel if needed */
1857  CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1858  if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
1859  {
1860  /* Unable to abort remote (sub)transaction */
1861  pgfdw_reset_xact_state(entry, toplevel);
1862  }
1863  else
1864  pending_entries = lappend(pending_entries, entry);
1865  }
1866  }
1867 
1868  /* No further work if no pending entries */
1869  if (!pending_entries)
1870  return;
1871 
1872  /*
1873  * Get the result of the abort command for each of the pending entries
1874  */
1875  foreach(lc, pending_entries)
1876  {
1877  ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
1878  TimestampTz endtime;
1879  char sql[100];
1880 
1881  Assert(entry->changing_xact_state);
1882 
1883  /*
1884  * Set end time. We do this now, not before issuing the command like
1885  * in normal mode, for the same reason as for the cancel_requested
1886  * entries.
1887  */
1890 
1891  CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1892  if (!pgfdw_exec_cleanup_query_end(entry->conn, sql, endtime,
1893  true, false))
1894  {
1895  /* Unable to abort remote (sub)transaction */
1896  pgfdw_reset_xact_state(entry, toplevel);
1897  continue;
1898  }
1899 
1900  if (toplevel)
1901  {
1902  /* Do a DEALLOCATE ALL in parallel if needed */
1903  if (entry->have_prep_stmt && entry->have_error)
1904  {
1906  "DEALLOCATE ALL"))
1907  {
1908  /* Trouble clearing prepared statements */
1909  pgfdw_reset_xact_state(entry, toplevel);
1910  }
1911  else
1912  pending_deallocs = lappend(pending_deallocs, entry);
1913  continue;
1914  }
1915  entry->have_prep_stmt = false;
1916  entry->have_error = false;
1917  }
1918 
1919  /* Reset the per-connection state if needed */
1920  if (entry->state.pendingAreq)
1921  memset(&entry->state, 0, sizeof(entry->state));
1922 
1923  /* We're done with this entry; unset the changing_xact_state flag */
1924  entry->changing_xact_state = false;
1925  pgfdw_reset_xact_state(entry, toplevel);
1926  }
1927 
1928  /* No further work if no pending entries */
1929  if (!pending_deallocs)
1930  return;
1931  Assert(toplevel);
1932 
1933  /*
1934  * Get the result of the DEALLOCATE command for each of the pending
1935  * entries
1936  */
1937  foreach(lc, pending_deallocs)
1938  {
1939  ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
1940  TimestampTz endtime;
1941 
1942  Assert(entry->changing_xact_state);
1943  Assert(entry->have_prep_stmt);
1944  Assert(entry->have_error);
1945 
1946  /*
1947  * Set end time. We do this now, not before issuing the command like
1948  * in normal mode, for the same reason as for the cancel_requested
1949  * entries.
1950  */
1953 
1954  if (!pgfdw_exec_cleanup_query_end(entry->conn, "DEALLOCATE ALL",
1955  endtime, true, true))
1956  {
1957  /* Trouble clearing prepared statements */
1958  pgfdw_reset_xact_state(entry, toplevel);
1959  continue;
1960  }
1961  entry->have_prep_stmt = false;
1962  entry->have_error = false;
1963 
1964  /* Reset the per-connection state if needed */
1965  if (entry->state.pendingAreq)
1966  memset(&entry->state, 0, sizeof(entry->state));
1967 
1968  /* We're done with this entry; unset the changing_xact_state flag */
1969  entry->changing_xact_state = false;
1970  pgfdw_reset_xact_state(entry, toplevel);
1971  }
1972 }
1973 
1974 /*
1975  * List active foreign server connections.
1976  *
1977  * This function takes no input parameter and returns setof record made of
1978  * following values:
1979  * - server_name - server name of active connection. In case the foreign server
1980  * is dropped but still the connection is active, then the server name will
1981  * be NULL in output.
1982  * - valid - true/false representing whether the connection is valid or not.
1983  * Note that the connections can get invalidated in pgfdw_inval_callback.
1984  *
1985  * No records are returned when there are no cached connections at all.
1986  */
1987 Datum
1989 {
1990 #define POSTGRES_FDW_GET_CONNECTIONS_COLS 2
1991  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1992  HASH_SEQ_STATUS scan;
1993  ConnCacheEntry *entry;
1994 
1995  InitMaterializedSRF(fcinfo, 0);
1996 
1997  /* If cache doesn't exist, we return no records */
1998  if (!ConnectionHash)
1999  PG_RETURN_VOID();
2000 
2001  hash_seq_init(&scan, ConnectionHash);
2002  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
2003  {
2004  ForeignServer *server;
2006  bool nulls[POSTGRES_FDW_GET_CONNECTIONS_COLS] = {0};
2007 
2008  /* We only look for open remote connections */
2009  if (!entry->conn)
2010  continue;
2011 
2012  server = GetForeignServerExtended(entry->serverid, FSV_MISSING_OK);
2013 
2014  /*
2015  * The foreign server may have been dropped in current explicit
2016  * transaction. It is not possible to drop the server from another
2017  * session when the connection associated with it is in use in the
2018  * current transaction, if tried so, the drop query in another session
2019  * blocks until the current transaction finishes.
2020  *
2021  * Even though the server is dropped in the current transaction, the
2022  * cache can still have associated active connection entry, say we
2023  * call such connections dangling. Since we can not fetch the server
2024  * name from system catalogs for dangling connections, instead we show
2025  * NULL value for server name in output.
2026  *
2027  * We could have done better by storing the server name in the cache
2028  * entry instead of server oid so that it could be used in the output.
2029  * But the server name in each cache entry requires 64 bytes of
2030  * memory, which is huge, when there are many cached connections and
2031  * the use case i.e. dropping the foreign server within the explicit
2032  * current transaction seems rare. So, we chose to show NULL value for
2033  * server name in output.
2034  *
2035  * Such dangling connections get closed either in next use or at the
2036  * end of current explicit transaction in pgfdw_xact_callback.
2037  */
2038  if (!server)
2039  {
2040  /*
2041  * If the server has been dropped in the current explicit
2042  * transaction, then this entry would have been invalidated in
2043  * pgfdw_inval_callback at the end of drop server command. Note
2044  * that this connection would not have been closed in
2045  * pgfdw_inval_callback because it is still being used in the
2046  * current explicit transaction. So, assert that here.
2047  */
2048  Assert(entry->conn && entry->xact_depth > 0 && entry->invalidated);
2049 
2050  /* Show null, if no server name was found */
2051  nulls[0] = true;
2052  }
2053  else
2054  values[0] = CStringGetTextDatum(server->servername);
2055 
2056  values[1] = BoolGetDatum(!entry->invalidated);
2057 
2058  tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
2059  }
2060 
2061  PG_RETURN_VOID();
2062 }
2063 
2064 /*
2065  * Disconnect the specified cached connections.
2066  *
2067  * This function discards the open connections that are established by
2068  * postgres_fdw from the local session to the foreign server with
2069  * the given name. Note that there can be multiple connections to
2070  * the given server using different user mappings. If the connections
2071  * are used in the current local transaction, they are not disconnected
2072  * and warning messages are reported. This function returns true
2073  * if it disconnects at least one connection, otherwise false. If no
2074  * foreign server with the given name is found, an error is reported.
2075  */
2076 Datum
2078 {
2079  ForeignServer *server;
2080  char *servername;
2081 
2082  servername = text_to_cstring(PG_GETARG_TEXT_PP(0));
2083  server = GetForeignServerByName(servername, false);
2084 
2086 }
2087 
2088 /*
2089  * Disconnect all the cached connections.
2090  *
2091  * This function discards all the open connections that are established by
2092  * postgres_fdw from the local session to the foreign servers.
2093  * If the connections are used in the current local transaction, they are
2094  * not disconnected and warning messages are reported. This function
2095  * returns true if it disconnects at least one connection, otherwise false.
2096  */
2097 Datum
2099 {
2101 }
2102 
2103 /*
2104  * Workhorse to disconnect cached connections.
2105  *
2106  * This function scans all the connection cache entries and disconnects
2107  * the open connections whose foreign server OID matches with
2108  * the specified one. If InvalidOid is specified, it disconnects all
2109  * the cached connections.
2110  *
2111  * This function emits a warning for each connection that's used in
2112  * the current transaction and doesn't close it. It returns true if
2113  * it disconnects at least one connection, otherwise false.
2114  *
2115  * Note that this function disconnects even the connections that are
2116  * established by other users in the same local session using different
2117  * user mappings. This leads even non-superuser to be able to close
2118  * the connections established by superusers in the same local session.
2119  *
2120  * XXX As of now we don't see any security risk doing this. But we should
2121  * set some restrictions on that, for example, prevent non-superuser
2122  * from closing the connections established by superusers even
2123  * in the same session?
2124  */
2125 static bool
2127 {
2128  HASH_SEQ_STATUS scan;
2129  ConnCacheEntry *entry;
2130  bool all = !OidIsValid(serverid);
2131  bool result = false;
2132 
2133  /*
2134  * Connection cache hashtable has not been initialized yet in this
2135  * session, so return false.
2136  */
2137  if (!ConnectionHash)
2138  return false;
2139 
2140  hash_seq_init(&scan, ConnectionHash);
2141  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
2142  {
2143  /* Ignore cache entry if no open connection right now. */
2144  if (!entry->conn)
2145  continue;
2146 
2147  if (all || entry->serverid == serverid)
2148  {
2149  /*
2150  * Emit a warning because the connection to close is used in the
2151  * current transaction and cannot be disconnected right now.
2152  */
2153  if (entry->xact_depth > 0)
2154  {
2155  ForeignServer *server;
2156 
2157  server = GetForeignServerExtended(entry->serverid,
2158  FSV_MISSING_OK);
2159 
2160  if (!server)
2161  {
2162  /*
2163  * If the foreign server was dropped while its connection
2164  * was used in the current transaction, the connection
2165  * must have been marked as invalid by
2166  * pgfdw_inval_callback at the end of DROP SERVER command.
2167  */
2168  Assert(entry->invalidated);
2169 
2170  ereport(WARNING,
2171  (errmsg("cannot close dropped server connection because it is still in use")));
2172  }
2173  else
2174  ereport(WARNING,
2175  (errmsg("cannot close connection for server \"%s\" because it is still in use",
2176  server->servername)));
2177  }
2178  else
2179  {
2180  elog(DEBUG3, "discarding connection %p", entry->conn);
2181  disconnect_pg_server(entry);
2182  result = true;
2183  }
2184  }
2185  }
2186 
2187  return result;
2188 }
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1766
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1654
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1618
bool be_gssapi_get_delegation(Port *port)
static Datum values[MAXATTR]
Definition: bootstrap.c:152
#define CStringGetTextDatum(s)
Definition: builtins.h:97
unsigned int uint32
Definition: c.h:493
uint32 SubTransactionId
Definition: c.h:643
#define OidIsValid(objectId)
Definition: c.h:762
Oid ConnCacheKey
Definition: connection.c:51
static unsigned int prep_stmt_number
Definition: connection.c:81
unsigned int GetCursorNumber(PGconn *conn)
Definition: connection.c:804
static bool UserMappingPasswordRequired(UserMapping *user)
Definition: connection.c:594
Datum postgres_fdw_get_connections(PG_FUNCTION_ARGS)
Definition: connection.c:1988
void do_sql_command(PGconn *conn, const char *sql)
Definition: connection.c:697
static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel)
Definition: connection.c:1783
static void disconnect_pg_server(ConnCacheEntry *entry)
Definition: connection.c:580
void ReleaseConnection(PGconn *conn)
Definition: connection.c:783
static uint32 pgfdw_we_get_result
Definition: connection.c:89
PG_FUNCTION_INFO_V1(postgres_fdw_get_connections)
static bool pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime)
Definition: connection.c:1333
static void pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested, bool toplevel)
Definition: connection.c:1817
static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
Definition: connection.c:1262
static void configure_remote_session(PGconn *conn)
Definition: connection.c:661
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:869
PGresult * pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
Definition: connection.c:833
static bool xact_got_connection
Definition: connection.c:84
#define POSTGRES_FDW_GET_CONNECTIONS_COLS
struct ConnCacheEntry ConnCacheEntry
Datum postgres_fdw_disconnect_all(PG_FUNCTION_ARGS)
Definition: connection.c:2098
static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime, bool consume_input)
Definition: connection.c:1346
static void do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
Definition: connection.c:711
static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result, bool *timed_out)
Definition: connection.c:1489
#define CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel)
Definition: connection.c:99
static uint32 pgfdw_we_cleanup_result
Definition: connection.c:87
static bool pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel, List **pending_entries, List **cancel_requested)
Definition: connection.c:1651
static HTAB * ConnectionHash
Definition: connection.c:77
static bool pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query, TimestampTz endtime, bool consume_input, bool ignore_errors)
Definition: connection.c:1434
static unsigned int cursor_number
Definition: connection.c:80
static void make_new_connection(ConnCacheEntry *entry, UserMapping *user)
Definition: connection.c:340
static void pgfdw_security_check(const char **keywords, const char **values, UserMapping *user, PGconn *conn)
Definition: connection.c:406
Datum postgres_fdw_disconnect(PG_FUNCTION_ARGS)
Definition: connection.c:2077
static void pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg)
Definition: connection.c:1075
static PGconn * connect_pg_server(ForeignServer *server, UserMapping *user)
Definition: connection.c:444
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
Definition: connection.c:1398
PGconn * GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
Definition: connection.c:177
unsigned int GetPrepStmtNumber(PGconn *conn)
Definition: connection.c:818
static bool pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query)
Definition: connection.c:1418
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
Definition: connection.c:1238
static void check_conn_params(const char **keywords, const char **values, UserMapping *user)
Definition: connection.c:618
static uint32 pgfdw_we_connect
Definition: connection.c:88
static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
Definition: connection.c:1188
static void pgfdw_xact_callback(XactEvent event, void *arg)
Definition: connection.c:925
#define CONNECTION_CLEANUP_TIMEOUT
Definition: connection.c:96
static void do_sql_command_begin(PGconn *conn, const char *sql)
Definition: connection.c:704
static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
Definition: connection.c:1577
PGresult * pgfdw_get_result(PGconn *conn)
Definition: connection.c:850
static void begin_remote_xact(ConnCacheEntry *entry)
Definition: connection.c:740
static bool pgfdw_cancel_query(PGconn *conn)
Definition: connection.c:1307
static void pgfdw_finish_pre_commit_cleanup(List *pending_entries)
Definition: connection.c:1709
static bool disconnect_cached_connections(Oid serverid)
Definition: connection.c:2126
char * process_pgfdw_appname(const char *appname)
Definition: option.c:491
int ExtractConnectionOptions(List *defelems, const char **keywords, const char **values)
Definition: option.c:414
char * pgfdw_application_name
Definition: option.c:52
int64 TimestampTz
Definition: timestamp.h:39
bool defGetBoolean(DefElem *def)
Definition: define.c:107
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:955
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:352
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1395
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1385
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1159
void FreeErrorData(ErrorData *edata)
Definition: elog.c:1779
int errdetail_internal(const char *fmt,...)
Definition: elog.c:1232
int errdetail(const char *fmt,...)
Definition: elog.c:1205
void FlushErrorState(void)
Definition: elog.c:1828
int errhint(const char *fmt,...)
Definition: elog.c:1319
int errcode(int sqlerrcode)
Definition: elog.c:859
int errmsg(const char *fmt,...)
Definition: elog.c:1072
bool in_error_recursion_trouble(void)
Definition: elog.c:297
ErrorData * CopyErrorData(void)
Definition: elog.c:1723
#define PG_RE_THROW()
Definition: elog.h:411
#define errcontext
Definition: elog.h:196
#define DEBUG3
Definition: elog.h:28
#define PG_TRY(...)
Definition: elog.h:370
#define WARNING
Definition: elog.h:36
#define PG_END_TRY(...)
Definition: elog.h:395
#define ERROR
Definition: elog.h:39
#define PG_CATCH(...)
Definition: elog.h:380
#define MAKE_SQLSTATE(ch1, ch2, ch3, ch4, ch5)
Definition: elog.h:56
#define elog(elevel,...)
Definition: elog.h:224
#define PG_FINALLY(...)
Definition: elog.h:387
#define ereport(elevel,...)
Definition: elog.h:149
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:6938
PGTransactionStatusType PQtransactionStatus(const PGconn *conn)
Definition: fe-connect.c:6903
int PQconnectionUsedPassword(const PGconn *conn)
Definition: fe-connect.c:7014
int PQconnectionUsedGSSAPI(const PGconn *conn)
Definition: fe-connect.c:7025
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6948
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:6895
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:6974
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3371
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1960
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1425
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:2007
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:3426
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:2038
#define PG_RETURN_VOID()
Definition: fmgr.h:349
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:309
#define PG_FUNCTION_ARGS
Definition: fmgr.h:193
#define PG_RETURN_BOOL(x)
Definition: fmgr.h:359
ForeignServer * GetForeignServerExtended(Oid serverid, bits16 flags)
Definition: foreign.c:122
ForeignServer * GetForeignServer(Oid serverid)
Definition: foreign.c:110
ForeignServer * GetForeignServerByName(const char *srvname, bool missing_ok)
Definition: foreign.c:181
#define FSV_MISSING_OK
Definition: foreign.h:61
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
Definition: funcapi.c:76
struct Port * MyProcPort
Definition: globals.c:49
struct Latch * MyLatch
Definition: globals.c:60
@ HASH_ENTER
Definition: hsearch.h:114
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1516
int i
Definition: isn.c:73
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:565
void ResetLatch(Latch *latch)
Definition: latch.c:724
#define WL_SOCKET_READABLE
Definition: latch.h:128
#define WL_TIMEOUT
Definition: latch.h:130
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:132
#define WL_LATCH_SET
Definition: latch.h:127
static void libpqsrv_disconnect(PGconn *conn)
static PGconn * libpqsrv_connect_params(const char *const *keywords, const char *const *values, int expand_dbname, uint32 wait_event_info)
static PGresult * libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info)
static char * libpqsrv_cancel(PGconn *conn, TimestampTz endtime)
@ CONNECTION_BAD
Definition: libpq-fe.h:61
@ CONNECTION_OK
Definition: libpq-fe.h:60
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:100
@ PQTRANS_IDLE
Definition: libpq-fe.h:121
@ PQTRANS_ACTIVE
Definition: libpq-fe.h:122
Assert(fmt[strlen(fmt) - 1] !='\n')
exit(1)
List * lappend(List *list, void *datum)
Definition: list.c:339
const char * GetDatabaseEncodingName(void)
Definition: mbutils.c:1267
char * pchomp(const char *in)
Definition: mcxt.c:1711
void pfree(void *pointer)
Definition: mcxt.c:1508
MemoryContext CurrentMemoryContext
Definition: mcxt.c:131
void * palloc(Size size)
Definition: mcxt.c:1304
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
void * arg
#define lfirst(lc)
Definition: pg_list.h:172
static int list_length(const List *l)
Definition: pg_list.h:152
#define NIL
Definition: pg_list.h:68
static char * user
Definition: pg_regress.c:120
#define snprintf
Definition: port.h:238
uintptr_t Datum
Definition: postgres.h:64
static Datum BoolGetDatum(bool X)
Definition: postgres.h:102
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
#define InvalidOid
Definition: postgres_ext.h:36
unsigned int Oid
Definition: postgres_ext.h:31
#define PG_DIAG_MESSAGE_HINT
Definition: postgres_ext.h:59
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:56
#define PG_DIAG_MESSAGE_PRIMARY
Definition: postgres_ext.h:57
#define PG_DIAG_MESSAGE_DETAIL
Definition: postgres_ext.h:58
#define PG_DIAG_CONTEXT
Definition: postgres_ext.h:63
void process_pending_request(AsyncRequest *areq)
MemoryContextSwitchTo(old_ctx)
tree ctl
Definition: radixtree.h:1807
PGconn * conn
Definition: streamutil.c:55
PGconn * conn
Definition: connection.c:56
bool have_prep_stmt
Definition: connection.c:60
PgFdwConnState state
Definition: connection.c:71
bool invalidated
Definition: connection.c:65
ConnCacheKey key
Definition: connection.c:55
bool parallel_commit
Definition: connection.c:63
uint32 server_hashvalue
Definition: connection.c:69
uint32 mapping_hashvalue
Definition: connection.c:70
bool keep_connections
Definition: connection.c:66
bool parallel_abort
Definition: connection.c:64
bool changing_xact_state
Definition: connection.c:62
char * defname
Definition: parsenodes.h:811
int sqlerrcode
Definition: elog.h:438
List * options
Definition: foreign.h:42
char * servername
Definition: foreign.h:39
Oid serverid
Definition: foreign.h:36
Definition: dynahash.c:220
Definition: pg_list.h:54
AsyncRequest * pendingAreq
Definition: postgres_fdw.h:138
TupleDesc setDesc
Definition: execnodes.h:340
Tuplestorestate * setResult
Definition: execnodes.h:339
Definition: regguts.h:323
bool superuser_arg(Oid roleid)
Definition: superuser.c:56
#define GetSysCacheHashValue1(cacheId, key1)
Definition: syscache.h:113
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
Definition: tuplestore.c:750
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:85
char * text_to_cstring(const text *t)
Definition: varlena.c:217
uint32 WaitEventExtensionNew(const char *wait_event_name)
Definition: wait_event.c:162
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:915
void RegisterXactCallback(XactCallback callback, void *arg)
Definition: xact.c:3726
void RegisterSubXactCallback(SubXactCallback callback, void *arg)
Definition: xact.c:3786
SubXactEvent
Definition: xact.h:141
@ SUBXACT_EVENT_PRE_COMMIT_SUB
Definition: xact.h:145
@ SUBXACT_EVENT_ABORT_SUB
Definition: xact.h:144
XactEvent
Definition: xact.h:127
@ XACT_EVENT_PRE_PREPARE
Definition: xact.h:135
@ XACT_EVENT_COMMIT
Definition: xact.h:128
@ XACT_EVENT_PARALLEL_PRE_COMMIT
Definition: xact.h:134
@ XACT_EVENT_PARALLEL_COMMIT
Definition: xact.h:129
@ XACT_EVENT_ABORT
Definition: xact.h:130
@ XACT_EVENT_PRE_COMMIT
Definition: xact.h:133
@ XACT_EVENT_PARALLEL_ABORT
Definition: xact.h:131
@ XACT_EVENT_PREPARE
Definition: xact.h:132
#define IsolationIsSerializable()
Definition: xact.h:52