PostgreSQL Source Code  git master
connection.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * connection.c
4  * Connection management functions for postgres_fdw
5  *
6  * Portions Copyright (c) 2012-2024, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * contrib/postgres_fdw/connection.c
10  *
11  *-------------------------------------------------------------------------
12  */
#include "postgres.h"

#include "access/htup_details.h"
#include "access/xact.h"
#include "commands/defrem.h"
#include "funcapi.h"
#include "libpq/libpq-be.h"
#include "libpq/libpq-be-fe-helpers.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postgres_fdw.h"
#include "storage/fd.h"
#include "storage/latch.h"
#include "utils/builtins.h"
#include "utils/datetime.h"
#include "utils/hsearch.h"
#include "utils/inval.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
34 
35 /*
36  * Connection cache hash table entry
37  *
38  * The lookup key in this hash table is the user mapping OID. We use just one
39  * connection per user mapping ID, which ensures that all the scans use the
40  * same snapshot during a query. Using the user mapping OID rather than
41  * the foreign server OID + user OID avoids creating multiple connections when
42  * the public user mapping applies to all user OIDs.
43  *
44  * The "conn" pointer can be NULL if we don't currently have a live connection.
45  * When we do have a connection, xact_depth tracks the current depth of
46  * transactions and subtransactions open on the remote side. We need to issue
47  * commands at the same nesting depth on the remote as we're executing at
48  * ourselves, so that rolling back a subtransaction will kill the right
49  * queries and not the wrong ones.
50  */
51 typedef Oid ConnCacheKey;
52 
53 typedef struct ConnCacheEntry
54 {
55  ConnCacheKey key; /* hash key (must be first) */
56  PGconn *conn; /* connection to foreign server, or NULL */
57  /* Remaining fields are invalid when conn is NULL: */
58  int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 =
59  * one level of subxact open, etc */
60  bool have_prep_stmt; /* have we prepared any stmts in this xact? */
61  bool have_error; /* have any subxacts aborted in this xact? */
62  bool changing_xact_state; /* xact state change in process */
63  bool parallel_commit; /* do we commit (sub)xacts in parallel? */
64  bool parallel_abort; /* do we abort (sub)xacts in parallel? */
65  bool invalidated; /* true if reconnect is pending */
66  bool keep_connections; /* setting value of keep_connections
67  * server option */
68  Oid serverid; /* foreign server OID used to get server name */
69  uint32 server_hashvalue; /* hash value of foreign server OID */
70  uint32 mapping_hashvalue; /* hash value of user mapping OID */
71  PgFdwConnState state; /* extra per-connection state */
73 
74 /*
75  * Connection cache (initialized on first use)
76  */
77 static HTAB *ConnectionHash = NULL;
78 
79 /* for assigning cursor numbers and prepared statement numbers */
80 static unsigned int cursor_number = 0;
81 static unsigned int prep_stmt_number = 0;
82 
83 /* tracks whether any work is needed in callback functions */
84 static bool xact_got_connection = false;
85 
86 /* custom wait event values, retrieved from shared memory */
90 
/*
 * Upper bound, in milliseconds, on how long we wait when cancelling an
 * in-progress query or running a cleanup query.  A connection that needs
 * more than 30 seconds for either is treated as dead.
 */
#define CONNECTION_CLEANUP_TIMEOUT 30000

/*
 * Build the abort command to send for "entry" into the char array "sql".
 *
 * "sql" must be an actual array (sizeof is applied to it).  At top level the
 * whole remote transaction is aborted; otherwise we roll back to and release
 * the savepoint matching the entry's current nesting depth.
 */
#define CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel) \
    do { \
        if (!(toplevel)) \
        { \
            snprintf((sql), sizeof(sql), \
                     "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", \
                     (entry)->xact_depth, (entry)->xact_depth); \
        } \
        else \
        { \
            snprintf((sql), sizeof(sql), \
                     "ABORT TRANSACTION"); \
        } \
    } while(0)
109 
110 /*
111  * SQL functions
112  */
116 
117 /* prototypes of private functions */
118 static void make_new_connection(ConnCacheEntry *entry, UserMapping *user);
120 static void disconnect_pg_server(ConnCacheEntry *entry);
121 static void check_conn_params(const char **keywords, const char **values, UserMapping *user);
122 static void configure_remote_session(PGconn *conn);
123 static void do_sql_command_begin(PGconn *conn, const char *sql);
124 static void do_sql_command_end(PGconn *conn, const char *sql,
125  bool consume_input);
126 static void begin_remote_xact(ConnCacheEntry *entry);
127 static void pgfdw_xact_callback(XactEvent event, void *arg);
128 static void pgfdw_subxact_callback(SubXactEvent event,
129  SubTransactionId mySubid,
130  SubTransactionId parentSubid,
131  void *arg);
132 static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
134 static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
135 static bool pgfdw_cancel_query(PGconn *conn);
136 static bool pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime);
137 static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime,
138  bool consume_input);
139 static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
140  bool ignore_errors);
141 static bool pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query);
142 static bool pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
143  TimestampTz endtime,
144  bool consume_input,
145  bool ignore_errors);
146 static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
147  PGresult **result, bool *timed_out);
148 static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel);
149 static bool pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
150  List **pending_entries,
151  List **cancel_requested);
152 static void pgfdw_finish_pre_commit_cleanup(List *pending_entries);
153 static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries,
154  int curlevel);
155 static void pgfdw_finish_abort_cleanup(List *pending_entries,
156  List *cancel_requested,
157  bool toplevel);
158 static void pgfdw_security_check(const char **keywords, const char **values,
161 static bool disconnect_cached_connections(Oid serverid);
162 
163 /*
164  * Get a PGconn which can be used to execute queries on the remote PostgreSQL
165  * server with the user's authorization. A new connection is established
166  * if we don't already have a suitable one, and a transaction is opened at
167  * the right subtransaction nesting depth if we didn't do that already.
168  *
169  * will_prep_stmt must be true if caller intends to create any prepared
170  * statements. Since those don't go away automatically at transaction end
171  * (not even on error), we need this flag to cue manual cleanup.
172  *
173  * If state is not NULL, *state receives the per-connection state associated
174  * with the PGconn.
175  */
176 PGconn *
178 {
179  bool found;
180  bool retry = false;
181  ConnCacheEntry *entry;
184 
185  /* First time through, initialize connection cache hashtable */
186  if (ConnectionHash == NULL)
187  {
188  HASHCTL ctl;
189 
190  if (pgfdw_we_get_result == 0)
192  WaitEventExtensionNew("PostgresFdwGetResult");
193 
194  ctl.keysize = sizeof(ConnCacheKey);
195  ctl.entrysize = sizeof(ConnCacheEntry);
196  ConnectionHash = hash_create("postgres_fdw connections", 8,
197  &ctl,
199 
200  /*
201  * Register some callback functions that manage connection cleanup.
202  * This should be done just once in each backend.
203  */
206  CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
208  CacheRegisterSyscacheCallback(USERMAPPINGOID,
210  }
211 
212  /* Set flag that we did GetConnection during the current transaction */
213  xact_got_connection = true;
214 
215  /* Create hash key for the entry. Assume no pad bytes in key struct */
216  key = user->umid;
217 
218  /*
219  * Find or create cached entry for requested connection.
220  */
221  entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
222  if (!found)
223  {
224  /*
225  * We need only clear "conn" here; remaining fields will be filled
226  * later when "conn" is set.
227  */
228  entry->conn = NULL;
229  }
230 
231  /* Reject further use of connections which failed abort cleanup. */
233 
234  /*
235  * If the connection needs to be remade due to invalidation, disconnect as
236  * soon as we're out of all transactions.
237  */
238  if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
239  {
240  elog(DEBUG3, "closing connection %p for option changes to take effect",
241  entry->conn);
242  disconnect_pg_server(entry);
243  }
244 
245  /*
246  * If cache entry doesn't have a connection, we have to establish a new
247  * connection. (If connect_pg_server throws an error, the cache entry
248  * will remain in a valid empty state, ie conn == NULL.)
249  */
250  if (entry->conn == NULL)
251  make_new_connection(entry, user);
252 
253  /*
254  * We check the health of the cached connection here when using it. In
255  * cases where we're out of all transactions, if a broken connection is
256  * detected, we try to reestablish a new connection later.
257  */
258  PG_TRY();
259  {
260  /* Process a pending asynchronous request if any. */
261  if (entry->state.pendingAreq)
263  /* Start a new transaction or subtransaction if needed. */
264  begin_remote_xact(entry);
265  }
266  PG_CATCH();
267  {
269  ErrorData *errdata = CopyErrorData();
270 
271  /*
272  * Determine whether to try to reestablish the connection.
273  *
274  * After a broken connection is detected in libpq, any error other
275  * than connection failure (e.g., out-of-memory) can be thrown
276  * somewhere between return from libpq and the expected ereport() call
277  * in pgfdw_report_error(). In this case, since PQstatus() indicates
278  * CONNECTION_BAD, checking only PQstatus() causes the false detection
279  * of connection failure. To avoid this, we also verify that the
280  * error's sqlstate is ERRCODE_CONNECTION_FAILURE. Note that also
281  * checking only the sqlstate can cause another false detection
282  * because pgfdw_report_error() may report ERRCODE_CONNECTION_FAILURE
283  * for any libpq-originated error condition.
284  */
285  if (errdata->sqlerrcode != ERRCODE_CONNECTION_FAILURE ||
286  PQstatus(entry->conn) != CONNECTION_BAD ||
287  entry->xact_depth > 0)
288  {
289  MemoryContextSwitchTo(ecxt);
290  PG_RE_THROW();
291  }
292 
293  /* Clean up the error state */
294  FlushErrorState();
295  FreeErrorData(errdata);
296  errdata = NULL;
297 
298  retry = true;
299  }
300  PG_END_TRY();
301 
302  /*
303  * If a broken connection is detected, disconnect it, reestablish a new
304  * connection and retry a new remote transaction. If connection failure is
305  * reported again, we give up getting a connection.
306  */
307  if (retry)
308  {
309  Assert(entry->xact_depth == 0);
310 
311  ereport(DEBUG3,
312  (errmsg_internal("could not start remote transaction on connection %p",
313  entry->conn)),
314  errdetail_internal("%s", pchomp(PQerrorMessage(entry->conn))));
315 
316  elog(DEBUG3, "closing connection %p to reestablish a new one",
317  entry->conn);
318  disconnect_pg_server(entry);
319 
320  make_new_connection(entry, user);
321 
322  begin_remote_xact(entry);
323  }
324 
325  /* Remember if caller will prepare statements */
326  entry->have_prep_stmt |= will_prep_stmt;
327 
328  /* If caller needs access to the per-connection state, return it. */
329  if (state)
330  *state = &entry->state;
331 
332  return entry->conn;
333 }
334 
335 /*
336  * Reset all transient state fields in the cached connection entry and
337  * establish new connection to the remote server.
338  */
339 static void
341 {
342  ForeignServer *server = GetForeignServer(user->serverid);
343  ListCell *lc;
344 
345  Assert(entry->conn == NULL);
346 
347  /* Reset all transient state fields, to be sure all are clean */
348  entry->xact_depth = 0;
349  entry->have_prep_stmt = false;
350  entry->have_error = false;
351  entry->changing_xact_state = false;
352  entry->invalidated = false;
353  entry->serverid = server->serverid;
354  entry->server_hashvalue =
355  GetSysCacheHashValue1(FOREIGNSERVEROID,
356  ObjectIdGetDatum(server->serverid));
357  entry->mapping_hashvalue =
358  GetSysCacheHashValue1(USERMAPPINGOID,
359  ObjectIdGetDatum(user->umid));
360  memset(&entry->state, 0, sizeof(entry->state));
361 
362  /*
363  * Determine whether to keep the connection that we're about to make here
364  * open even after the transaction using it ends, so that the subsequent
365  * transactions can re-use it.
366  *
367  * By default, all the connections to any foreign servers are kept open.
368  *
369  * Also determine whether to commit/abort (sub)transactions opened on the
370  * remote server in parallel at (sub)transaction end, which is disabled by
371  * default.
372  *
373  * Note: it's enough to determine these only when making a new connection
374  * because if these settings for it are changed, it will be closed and
375  * re-made later.
376  */
377  entry->keep_connections = true;
378  entry->parallel_commit = false;
379  entry->parallel_abort = false;
380  foreach(lc, server->options)
381  {
382  DefElem *def = (DefElem *) lfirst(lc);
383 
384  if (strcmp(def->defname, "keep_connections") == 0)
385  entry->keep_connections = defGetBoolean(def);
386  else if (strcmp(def->defname, "parallel_commit") == 0)
387  entry->parallel_commit = defGetBoolean(def);
388  else if (strcmp(def->defname, "parallel_abort") == 0)
389  entry->parallel_abort = defGetBoolean(def);
390  }
391 
392  /* Now try to make the connection */
393  entry->conn = connect_pg_server(server, user);
394 
395  elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
396  entry->conn, server->servername, user->umid, user->userid);
397 }
398 
399 /*
400  * Check that non-superuser has used password or delegated credentials
401  * to establish connection; otherwise, he's piggybacking on the
402  * postgres server's user identity. See also dblink_security_check()
403  * in contrib/dblink and check_conn_params.
404  */
405 static void
406 pgfdw_security_check(const char **keywords, const char **values, UserMapping *user, PGconn *conn)
407 {
408  /* Superusers bypass the check */
409  if (superuser_arg(user->userid))
410  return;
411 
412 #ifdef ENABLE_GSS
413  /* Connected via GSSAPI with delegated credentials- all good. */
415  return;
416 #endif
417 
418  /* Ok if superuser set PW required false. */
420  return;
421 
422  /* Connected via PW, with PW required true, and provided non-empty PW. */
424  {
425  /* ok if params contain a non-empty password */
426  for (int i = 0; keywords[i] != NULL; i++)
427  {
428  if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
429  return;
430  }
431  }
432 
433  ereport(ERROR,
434  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
435  errmsg("password or GSSAPI delegated credentials required"),
436  errdetail("Non-superuser cannot connect if the server does not request a password or use GSSAPI with delegated credentials."),
437  errhint("Target server's authentication method must be changed or password_required=false set in the user mapping attributes.")));
438 }
439 
440 /*
441  * Connect to remote server using specified server and user mapping properties.
442  */
443 static PGconn *
445 {
446  PGconn *volatile conn = NULL;
447 
448  /*
449  * Use PG_TRY block to ensure closing connection on error.
450  */
451  PG_TRY();
452  {
453  const char **keywords;
454  const char **values;
455  char *appname = NULL;
456  int n;
457 
458  /*
459  * Construct connection params from generic options of ForeignServer
460  * and UserMapping. (Some of them might not be libpq options, in
461  * which case we'll just waste a few array slots.) Add 4 extra slots
462  * for application_name, fallback_application_name, client_encoding,
463  * end marker.
464  */
465  n = list_length(server->options) + list_length(user->options) + 4;
466  keywords = (const char **) palloc(n * sizeof(char *));
467  values = (const char **) palloc(n * sizeof(char *));
468 
469  n = 0;
470  n += ExtractConnectionOptions(server->options,
471  keywords + n, values + n);
472  n += ExtractConnectionOptions(user->options,
473  keywords + n, values + n);
474 
475  /*
476  * Use pgfdw_application_name as application_name if set.
477  *
478  * PQconnectdbParams() processes the parameter arrays from start to
479  * end. If any key word is repeated, the last value is used. Therefore
480  * note that pgfdw_application_name must be added to the arrays after
481  * options of ForeignServer are, so that it can override
482  * application_name set in ForeignServer.
483  */
485  {
486  keywords[n] = "application_name";
488  n++;
489  }
490 
491  /*
492  * Search the parameter arrays to find application_name setting, and
493  * replace escape sequences in it with status information if found.
494  * The arrays are searched backwards because the last value is used if
495  * application_name is repeatedly set.
496  */
497  for (int i = n - 1; i >= 0; i--)
498  {
499  if (strcmp(keywords[i], "application_name") == 0 &&
500  *(values[i]) != '\0')
501  {
502  /*
503  * Use this application_name setting if it's not empty string
504  * even after any escape sequences in it are replaced.
505  */
506  appname = process_pgfdw_appname(values[i]);
507  if (appname[0] != '\0')
508  {
509  values[i] = appname;
510  break;
511  }
512 
513  /*
514  * This empty application_name is not used, so we set
515  * values[i] to NULL and keep searching the array to find the
516  * next one.
517  */
518  values[i] = NULL;
519  pfree(appname);
520  appname = NULL;
521  }
522  }
523 
524  /* Use "postgres_fdw" as fallback_application_name */
525  keywords[n] = "fallback_application_name";
526  values[n] = "postgres_fdw";
527  n++;
528 
529  /* Set client_encoding so that libpq can convert encoding properly. */
530  keywords[n] = "client_encoding";
532  n++;
533 
534  keywords[n] = values[n] = NULL;
535 
536  /* verify the set of connection parameters */
537  check_conn_params(keywords, values, user);
538 
539  /* first time, allocate or get the custom wait event */
540  if (pgfdw_we_connect == 0)
541  pgfdw_we_connect = WaitEventExtensionNew("PostgresFdwConnect");
542 
543  /* OK to make connection */
544  conn = libpqsrv_connect_params(keywords, values,
545  false, /* expand_dbname */
547 
548  if (!conn || PQstatus(conn) != CONNECTION_OK)
549  ereport(ERROR,
550  (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
551  errmsg("could not connect to server \"%s\"",
552  server->servername),
554 
555  /* Perform post-connection security checks */
556  pgfdw_security_check(keywords, values, user, conn);
557 
558  /* Prepare new session for use */
560 
561  if (appname != NULL)
562  pfree(appname);
563  pfree(keywords);
564  pfree(values);
565  }
566  PG_CATCH();
567  {
569  PG_RE_THROW();
570  }
571  PG_END_TRY();
572 
573  return conn;
574 }
575 
576 /*
577  * Disconnect any open connection for a connection cache entry.
578  */
579 static void
581 {
582  if (entry->conn != NULL)
583  {
584  libpqsrv_disconnect(entry->conn);
585  entry->conn = NULL;
586  }
587 }
588 
589 /*
590  * Return true if the password_required is defined and false for this user
591  * mapping, otherwise false. The mapping has been pre-validated.
592  */
593 static bool
595 {
596  ListCell *cell;
597 
598  foreach(cell, user->options)
599  {
600  DefElem *def = (DefElem *) lfirst(cell);
601 
602  if (strcmp(def->defname, "password_required") == 0)
603  return defGetBoolean(def);
604  }
605 
606  return true;
607 }
608 
609 /*
610  * For non-superusers, insist that the connstr specify a password or that the
611  * user provided their own GSSAPI delegated credentials. This
612  * prevents a password from being picked up from .pgpass, a service file, the
613  * environment, etc. We don't want the postgres user's passwords,
614  * certificates, etc to be accessible to non-superusers. (See also
615  * dblink_connstr_check in contrib/dblink.)
616  */
617 static void
618 check_conn_params(const char **keywords, const char **values, UserMapping *user)
619 {
620  int i;
621 
622  /* no check required if superuser */
623  if (superuser_arg(user->userid))
624  return;
625 
626 #ifdef ENABLE_GSS
627  /* ok if the user provided their own delegated credentials */
629  return;
630 #endif
631 
632  /* ok if params contain a non-empty password */
633  for (i = 0; keywords[i] != NULL; i++)
634  {
635  if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
636  return;
637  }
638 
639  /* ok if the superuser explicitly said so at user mapping creation time */
641  return;
642 
643  ereport(ERROR,
644  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
645  errmsg("password or GSSAPI delegated credentials required"),
646  errdetail("Non-superusers must delegate GSSAPI credentials or provide a password in the user mapping.")));
647 }
648 
649 /*
650  * Issue SET commands to make sure remote session is configured properly.
651  *
652  * We do this just once at connection, assuming nothing will change the
653  * values later. Since we'll never send volatile function calls to the
654  * remote, there shouldn't be any way to break this assumption from our end.
655  * It's possible to think of ways to break it at the remote end, eg making
656  * a foreign table point to a view that includes a set_config call ---
657  * but once you admit the possibility of a malicious view definition,
658  * there are any number of ways to break things.
659  */
660 static void
662 {
663  int remoteversion = PQserverVersion(conn);
664 
665  /* Force the search path to contain only pg_catalog (see deparse.c) */
666  do_sql_command(conn, "SET search_path = pg_catalog");
667 
668  /*
669  * Set remote timezone; this is basically just cosmetic, since all
670  * transmitted and returned timestamptzs should specify a zone explicitly
671  * anyway. However it makes the regression test outputs more predictable.
672  *
673  * We don't risk setting remote zone equal to ours, since the remote
674  * server might use a different timezone database. Instead, use GMT
675  * (quoted, because very old servers are picky about case). That's
676  * guaranteed to work regardless of the remote's timezone database,
677  * because pg_tzset() hard-wires it (at least in PG 9.2 and later).
678  */
679  do_sql_command(conn, "SET timezone = 'GMT'");
680 
681  /*
682  * Set values needed to ensure unambiguous data output from remote. (This
683  * logic should match what pg_dump does. See also set_transmission_modes
684  * in postgres_fdw.c.)
685  */
686  do_sql_command(conn, "SET datestyle = ISO");
687  if (remoteversion >= 80400)
688  do_sql_command(conn, "SET intervalstyle = postgres");
689  if (remoteversion >= 90000)
690  do_sql_command(conn, "SET extra_float_digits = 3");
691  else
692  do_sql_command(conn, "SET extra_float_digits = 2");
693 }
694 
695 /*
696  * Convenience subroutine to issue a non-data-returning SQL command to remote
697  */
698 void
699 do_sql_command(PGconn *conn, const char *sql)
700 {
702  do_sql_command_end(conn, sql, false);
703 }
704 
705 static void
706 do_sql_command_begin(PGconn *conn, const char *sql)
707 {
708  if (!PQsendQuery(conn, sql))
709  pgfdw_report_error(ERROR, NULL, conn, false, sql);
710 }
711 
712 static void
713 do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
714 {
715  PGresult *res;
716 
717  /*
718  * If requested, consume whatever data is available from the socket. (Note
719  * that if all data is available, this allows pgfdw_get_result to call
720  * PQgetResult without forcing the overhead of WaitLatchOrSocket, which
721  * would be large compared to the overhead of PQconsumeInput.)
722  */
723  if (consume_input && !PQconsumeInput(conn))
724  pgfdw_report_error(ERROR, NULL, conn, false, sql);
727  pgfdw_report_error(ERROR, res, conn, true, sql);
728  PQclear(res);
729 }
730 
731 /*
732  * Start remote transaction or subtransaction, if needed.
733  *
734  * Note that we always use at least REPEATABLE READ in the remote session.
735  * This is so that, if a query initiates multiple scans of the same or
736  * different foreign tables, we will get snapshot-consistent results from
737  * those scans. A disadvantage is that we can't provide sane emulation of
738  * READ COMMITTED behavior --- it would be nice if we had some other way to
739  * control which remote queries share a snapshot.
740  */
741 static void
743 {
744  int curlevel = GetCurrentTransactionNestLevel();
745 
746  /* Start main transaction if we haven't yet */
747  if (entry->xact_depth <= 0)
748  {
749  const char *sql;
750 
751  elog(DEBUG3, "starting remote transaction on connection %p",
752  entry->conn);
753 
755  sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
756  else
757  sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
758  entry->changing_xact_state = true;
759  do_sql_command(entry->conn, sql);
760  entry->xact_depth = 1;
761  entry->changing_xact_state = false;
762  }
763 
764  /*
765  * If we're in a subtransaction, stack up savepoints to match our level.
766  * This ensures we can rollback just the desired effects when a
767  * subtransaction aborts.
768  */
769  while (entry->xact_depth < curlevel)
770  {
771  char sql[64];
772 
773  snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
774  entry->changing_xact_state = true;
775  do_sql_command(entry->conn, sql);
776  entry->xact_depth++;
777  entry->changing_xact_state = false;
778  }
779 }
780 
781 /*
782  * Release connection reference count created by calling GetConnection.
783  */
784 void
786 {
787  /*
788  * Currently, we don't actually track connection references because all
789  * cleanup is managed on a transaction or subtransaction basis instead. So
790  * there's nothing to do here.
791  */
792 }
793 
794 /*
795  * Assign a "unique" number for a cursor.
796  *
797  * These really only need to be unique per connection within a transaction.
798  * For the moment we ignore the per-connection point and assign them across
799  * all connections in the transaction, but we ask for the connection to be
800  * supplied in case we want to refine that.
801  *
802  * Note that even if wraparound happens in a very long transaction, actual
803  * collisions are highly improbable; just be sure to use %u not %d to print.
804  */
805 unsigned int
807 {
808  return ++cursor_number;
809 }
810 
811 /*
812  * Assign a "unique" number for a prepared statement.
813  *
814  * This works much like GetCursorNumber, except that we never reset the counter
815  * within a session. That's because we can't be 100% sure we've gotten rid
816  * of all prepared statements on all connections, and it's not really worth
817  * increasing the risk of prepared-statement name collisions by resetting.
818  */
819 unsigned int
821 {
822  return ++prep_stmt_number;
823 }
824 
825 /*
826  * Submit a query and wait for the result.
827  *
828  * Since we don't use non-blocking mode, this can't process interrupts while
829  * pushing the query text to the server. That risk is relatively small, so we
830  * ignore that for now.
831  *
832  * Caller is responsible for the error handling on the result.
833  */
834 PGresult *
836 {
837  /* First, process a pending asynchronous request, if any. */
838  if (state && state->pendingAreq)
839  process_pending_request(state->pendingAreq);
840 
841  if (!PQsendQuery(conn, query))
842  return NULL;
843  return pgfdw_get_result(conn);
844 }
845 
846 /*
847  * Wrap libpqsrv_get_result_last(), adding wait event.
848  *
849  * Caller is responsible for the error handling on the result.
850  */
851 PGresult *
853 {
855 }
856 
857 /*
858  * Report an error we got from the remote server.
859  *
860  * elevel: error level to use (typically ERROR, but might be less)
861  * res: PGresult containing the error
862  * conn: connection we did the query on
863  * clear: if true, PQclear the result (otherwise caller will handle it)
864  * sql: NULL, or text of remote command we tried to execute
865  *
866  * Note: callers that choose not to throw ERROR for a remote error are
867  * responsible for making sure that the associated ConnCacheEntry gets
868  * marked with have_error = true.
869  */
870 void
872  bool clear, const char *sql)
873 {
874  /* If requested, PGresult must be released before leaving this function. */
875  PG_TRY();
876  {
877  char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
878  char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
879  char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
880  char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
881  char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
882  int sqlstate;
883 
884  if (diag_sqlstate)
885  sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
886  diag_sqlstate[1],
887  diag_sqlstate[2],
888  diag_sqlstate[3],
889  diag_sqlstate[4]);
890  else
891  sqlstate = ERRCODE_CONNECTION_FAILURE;
892 
893  /*
894  * If we don't get a message from the PGresult, try the PGconn. This
895  * is needed because for connection-level failures, PQgetResult may
896  * just return NULL, not a PGresult at all.
897  */
898  if (message_primary == NULL)
899  message_primary = pchomp(PQerrorMessage(conn));
900 
901  ereport(elevel,
902  (errcode(sqlstate),
903  (message_primary != NULL && message_primary[0] != '\0') ?
904  errmsg_internal("%s", message_primary) :
905  errmsg("could not obtain message string for remote error"),
906  message_detail ? errdetail_internal("%s", message_detail) : 0,
907  message_hint ? errhint("%s", message_hint) : 0,
908  message_context ? errcontext("%s", message_context) : 0,
909  sql ? errcontext("remote SQL command: %s", sql) : 0));
910  }
911  PG_FINALLY();
912  {
913  if (clear)
914  PQclear(res);
915  }
916  PG_END_TRY();
917 }
918 
919 /*
920  * pgfdw_xact_callback --- cleanup at main-transaction end.
921  *
922  * This runs just late enough that it must not enter user-defined code
923  * locally. (Entering such code on the remote side is fine. Its remote
924  * COMMIT TRANSACTION may run deferred triggers.)
925  */
926 static void
928 {
929  HASH_SEQ_STATUS scan;
930  ConnCacheEntry *entry;
931  List *pending_entries = NIL;
932  List *cancel_requested = NIL;
933 
934  /* Quick exit if no connections were touched in this transaction. */
935  if (!xact_got_connection)
936  return;
937 
938  /*
939  * Scan all connection cache entries to find open remote transactions, and
940  * close them.
941  */
943  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
944  {
945  PGresult *res;
946 
947  /* Ignore cache entry if no open connection right now */
948  if (entry->conn == NULL)
949  continue;
950 
951  /* If it has an open remote transaction, try to close it */
952  if (entry->xact_depth > 0)
953  {
954  elog(DEBUG3, "closing remote transaction on connection %p",
955  entry->conn);
956 
957  switch (event)
958  {
961 
962  /*
963  * If abort cleanup previously failed for this connection,
964  * we can't issue any more commands against it.
965  */
967 
968  /* Commit all remote transactions during pre-commit */
969  entry->changing_xact_state = true;
970  if (entry->parallel_commit)
971  {
972  do_sql_command_begin(entry->conn, "COMMIT TRANSACTION");
973  pending_entries = lappend(pending_entries, entry);
974  continue;
975  }
976  do_sql_command(entry->conn, "COMMIT TRANSACTION");
977  entry->changing_xact_state = false;
978 
979  /*
980  * If there were any errors in subtransactions, and we
981  * made prepared statements, do a DEALLOCATE ALL to make
982  * sure we get rid of all prepared statements. This is
983  * annoying and not terribly bulletproof, but it's
984  * probably not worth trying harder.
985  *
986  * DEALLOCATE ALL only exists in 8.3 and later, so this
987  * constrains how old a server postgres_fdw can
988  * communicate with. We intentionally ignore errors in
989  * the DEALLOCATE, so that we can hobble along to some
990  * extent with older servers (leaking prepared statements
991  * as we go; but we don't really support update operations
992  * pre-8.3 anyway).
993  */
994  if (entry->have_prep_stmt && entry->have_error)
995  {
996  res = pgfdw_exec_query(entry->conn, "DEALLOCATE ALL",
997  NULL);
998  PQclear(res);
999  }
1000  entry->have_prep_stmt = false;
1001  entry->have_error = false;
1002  break;
1004 
1005  /*
1006  * We disallow any remote transactions, since it's not
1007  * very reasonable to hold them open until the prepared
1008  * transaction is committed. For the moment, throw error
1009  * unconditionally; later we might allow read-only cases.
1010  * Note that the error will cause us to come right back
1011  * here with event == XACT_EVENT_ABORT, so we'll clean up
1012  * the connection state at that point.
1013  */
1014  ereport(ERROR,
1015  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1016  errmsg("cannot PREPARE a transaction that has operated on postgres_fdw foreign tables")));
1017  break;
1019  case XACT_EVENT_COMMIT:
1020  case XACT_EVENT_PREPARE:
1021  /* Pre-commit should have closed the open transaction */
1022  elog(ERROR, "missed cleaning up connection during pre-commit");
1023  break;
1025  case XACT_EVENT_ABORT:
1026  /* Rollback all remote transactions during abort */
1027  if (entry->parallel_abort)
1028  {
1029  if (pgfdw_abort_cleanup_begin(entry, true,
1030  &pending_entries,
1031  &cancel_requested))
1032  continue;
1033  }
1034  else
1035  pgfdw_abort_cleanup(entry, true);
1036  break;
1037  }
1038  }
1039 
1040  /* Reset state to show we're out of a transaction */
1041  pgfdw_reset_xact_state(entry, true);
1042  }
1043 
1044  /* If there are any pending connections, finish cleaning them up */
1045  if (pending_entries || cancel_requested)
1046  {
1047  if (event == XACT_EVENT_PARALLEL_PRE_COMMIT ||
1048  event == XACT_EVENT_PRE_COMMIT)
1049  {
1050  Assert(cancel_requested == NIL);
1051  pgfdw_finish_pre_commit_cleanup(pending_entries);
1052  }
1053  else
1054  {
1055  Assert(event == XACT_EVENT_PARALLEL_ABORT ||
1056  event == XACT_EVENT_ABORT);
1057  pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
1058  true);
1059  }
1060  }
1061 
1062  /*
1063  * Regardless of the event type, we can now mark ourselves as out of the
1064  * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
1065  * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
1066  */
1067  xact_got_connection = false;
1068 
1069  /* Also reset cursor numbering for next transaction */
1070  cursor_number = 0;
1071 }
1072 
1073 /*
1074  * pgfdw_subxact_callback --- cleanup at subtransaction end.
1075  */
1076 static void
1078  SubTransactionId parentSubid, void *arg)
1079 {
1080  HASH_SEQ_STATUS scan;
1081  ConnCacheEntry *entry;
1082  int curlevel;
1083  List *pending_entries = NIL;
1084  List *cancel_requested = NIL;
1085 
1086  /* Nothing to do at subxact start, nor after commit. */
1087  if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
1088  event == SUBXACT_EVENT_ABORT_SUB))
1089  return;
1090 
1091  /* Quick exit if no connections were touched in this transaction. */
1092  if (!xact_got_connection)
1093  return;
1094 
1095  /*
1096  * Scan all connection cache entries to find open remote subtransactions
1097  * of the current level, and close them.
1098  */
1099  curlevel = GetCurrentTransactionNestLevel();
1100  hash_seq_init(&scan, ConnectionHash);
1101  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1102  {
1103  char sql[100];
1104 
1105  /*
1106  * We only care about connections with open remote subtransactions of
1107  * the current level.
1108  */
1109  if (entry->conn == NULL || entry->xact_depth < curlevel)
1110  continue;
1111 
1112  if (entry->xact_depth > curlevel)
1113  elog(ERROR, "missed cleaning up remote subtransaction at level %d",
1114  entry->xact_depth);
1115 
1116  if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
1117  {
1118  /*
1119  * If abort cleanup previously failed for this connection, we
1120  * can't issue any more commands against it.
1121  */
1123 
1124  /* Commit all remote subtransactions during pre-commit */
1125  snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
1126  entry->changing_xact_state = true;
1127  if (entry->parallel_commit)
1128  {
1129  do_sql_command_begin(entry->conn, sql);
1130  pending_entries = lappend(pending_entries, entry);
1131  continue;
1132  }
1133  do_sql_command(entry->conn, sql);
1134  entry->changing_xact_state = false;
1135  }
1136  else
1137  {
1138  /* Rollback all remote subtransactions during abort */
1139  if (entry->parallel_abort)
1140  {
1141  if (pgfdw_abort_cleanup_begin(entry, false,
1142  &pending_entries,
1143  &cancel_requested))
1144  continue;
1145  }
1146  else
1147  pgfdw_abort_cleanup(entry, false);
1148  }
1149 
1150  /* OK, we're outta that level of subtransaction */
1151  pgfdw_reset_xact_state(entry, false);
1152  }
1153 
1154  /* If there are any pending connections, finish cleaning them up */
1155  if (pending_entries || cancel_requested)
1156  {
1157  if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
1158  {
1159  Assert(cancel_requested == NIL);
1160  pgfdw_finish_pre_subcommit_cleanup(pending_entries, curlevel);
1161  }
1162  else
1163  {
1164  Assert(event == SUBXACT_EVENT_ABORT_SUB);
1165  pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
1166  false);
1167  }
1168  }
1169 }
1170 
1171 /*
1172  * Connection invalidation callback function
1173  *
1174  * After a change to a pg_foreign_server or pg_user_mapping catalog entry,
1175  * close connections depending on that entry immediately if current transaction
1176  * has not used those connections yet. Otherwise, mark those connections as
1177  * invalid and then make pgfdw_xact_callback() close them at the end of current
1178  * transaction, since they cannot be closed in the midst of the transaction
1179  * using them. Closed connections will be remade at the next opportunity if
1180  * necessary.
1181  *
1182  * Although most cache invalidation callbacks blow away all the related stuff
1183  * regardless of the given hashvalue, connections are expensive enough that
1184  * it's worth trying to avoid that.
1185  *
1186  * NB: We could avoid unnecessary disconnection more strictly by examining
1187  * individual option values, but it seems too much effort for the gain.
1188  */
1189 static void
1190 pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
1191 {
1192  HASH_SEQ_STATUS scan;
1193  ConnCacheEntry *entry;
1194 
1195  Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
1196 
1197  /* ConnectionHash must exist already, if we're registered */
1198  hash_seq_init(&scan, ConnectionHash);
1199  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1200  {
1201  /* Ignore invalid entries */
1202  if (entry->conn == NULL)
1203  continue;
1204 
1205  /* hashvalue == 0 means a cache reset, must clear all state */
1206  if (hashvalue == 0 ||
1207  (cacheid == FOREIGNSERVEROID &&
1208  entry->server_hashvalue == hashvalue) ||
1209  (cacheid == USERMAPPINGOID &&
1210  entry->mapping_hashvalue == hashvalue))
1211  {
1212  /*
1213  * Close the connection immediately if it's not used yet in this
1214  * transaction. Otherwise mark it as invalid so that
1215  * pgfdw_xact_callback() can close it at the end of this
1216  * transaction.
1217  */
1218  if (entry->xact_depth == 0)
1219  {
1220  elog(DEBUG3, "discarding connection %p", entry->conn);
1221  disconnect_pg_server(entry);
1222  }
1223  else
1224  entry->invalidated = true;
1225  }
1226  }
1227 }
1228 
1229 /*
1230  * Raise an error if the given connection cache entry is marked as being
1231  * in the middle of an xact state change. This should be called at points where
1232  * no such change is expected to be in progress; if one is found to be in
1233  * progress, it means that we aborted in the middle of a previous state change
1234  * and now don't know what the remote transaction state actually is.
1235  * Such connections can't safely be further used. Re-establishing the
1236  * connection would change the snapshot and roll back any writes already
1237  * performed, so that's not an option, either. Thus, we must abort.
1238  */
1239 static void
1241 {
1242  ForeignServer *server;
1243 
1244  /* nothing to do for inactive entries and entries of sane state */
1245  if (entry->conn == NULL || !entry->changing_xact_state)
1246  return;
1247 
1248  /* make sure this entry is inactive */
1249  disconnect_pg_server(entry);
1250 
1251  /* find server name to be shown in the message below */
1252  server = GetForeignServer(entry->serverid);
1253 
1254  ereport(ERROR,
1255  (errcode(ERRCODE_CONNECTION_EXCEPTION),
1256  errmsg("connection to server \"%s\" was lost",
1257  server->servername)));
1258 }
1259 
1260 /*
1261  * Reset state to show we're out of a (sub)transaction.
1262  */
1263 static void
1265 {
1266  if (toplevel)
1267  {
1268  /* Reset state to show we're out of a transaction */
1269  entry->xact_depth = 0;
1270 
1271  /*
1272  * If the connection isn't in a good idle state, it is marked as
1273  * invalid or keep_connections option of its server is disabled, then
1274  * discard it to recover. Next GetConnection will open a new
1275  * connection.
1276  */
1277  if (PQstatus(entry->conn) != CONNECTION_OK ||
1278  PQtransactionStatus(entry->conn) != PQTRANS_IDLE ||
1279  entry->changing_xact_state ||
1280  entry->invalidated ||
1281  !entry->keep_connections)
1282  {
1283  elog(DEBUG3, "discarding connection %p", entry->conn);
1284  disconnect_pg_server(entry);
1285  }
1286  }
1287  else
1288  {
1289  /* Reset state to show we're out of a subtransaction */
1290  entry->xact_depth--;
1291  }
1292 }
1293 
1294 /*
1295  * Cancel the currently-in-progress query (whose query text we do not have)
1296  * and ignore the result. Returns true if we successfully cancel the query
1297  * and discard any pending result, and false if not.
1298  *
1299  * It's not a huge problem if we throw an ERROR here, but if we get into error
1300  * recursion trouble, we'll end up slamming the connection shut, which will
1301  * necessitate failing the entire toplevel transaction even if subtransactions
1302  * were used. Try to use WARNING where we can.
1303  *
1304  * XXX: if the query was one sent by fetch_more_data_begin(), we could get the
1305  * query text from the pendingAreq saved in the per-connection state, then
1306  * report the query using it.
1307  */
1308 static bool
1310 {
1311  TimestampTz endtime;
1312 
1313  /*
1314  * If it takes too long to cancel the query and discard the result, assume
1315  * the connection is dead.
1316  */
1319 
1320  if (!pgfdw_cancel_query_begin(conn, endtime))
1321  return false;
1322  return pgfdw_cancel_query_end(conn, endtime, false);
1323 }
1324 
1325 /*
1326  * Submit a cancel request to the given connection, waiting only until
1327  * the given time.
1328  *
1329  * We sleep interruptibly until we receive confirmation that the cancel
1330  * request has been accepted, and if it is, return true; if the timeout
1331  * lapses without that, or the request fails for whatever reason, return
1332  * false.
1333  */
1334 static bool
1336 {
1337  const char *errormsg = libpqsrv_cancel(conn, endtime);
1338 
1339  if (errormsg != NULL)
1340  ereport(WARNING,
1341  errcode(ERRCODE_CONNECTION_FAILURE),
1342  errmsg("could not send cancel request: %s", errormsg));
1343 
1344  return errormsg == NULL;
1345 }
1346 
1347 static bool
1348 pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime, bool consume_input)
1349 {
1350  PGresult *result = NULL;
1351  bool timed_out;
1352 
1353  /*
1354  * If requested, consume whatever data is available from the socket. (Note
1355  * that if all data is available, this allows pgfdw_get_cleanup_result to
1356  * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
1357  * which would be large compared to the overhead of PQconsumeInput.)
1358  */
1359  if (consume_input && !PQconsumeInput(conn))
1360  {
1361  ereport(WARNING,
1362  (errcode(ERRCODE_CONNECTION_FAILURE),
1363  errmsg("could not get result of cancel request: %s",
1364  pchomp(PQerrorMessage(conn)))));
1365  return false;
1366  }
1367 
1368  /* Get and discard the result of the query. */
1369  if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
1370  {
1371  if (timed_out)
1372  ereport(WARNING,
1373  (errmsg("could not get result of cancel request due to timeout")));
1374  else
1375  ereport(WARNING,
1376  (errcode(ERRCODE_CONNECTION_FAILURE),
1377  errmsg("could not get result of cancel request: %s",
1378  pchomp(PQerrorMessage(conn)))));
1379 
1380  return false;
1381  }
1382  PQclear(result);
1383 
1384  return true;
1385 }
1386 
1387 /*
1388  * Submit a query during (sub)abort cleanup and wait up to 30 seconds for the
1389  * result. If the query is executed without error, the return value is true.
1390  * If the query is executed successfully but returns an error, the return
1391  * value is true if and only if ignore_errors is set. If the query can't be
1392  * sent or times out, the return value is false.
1393  *
1394  * It's not a huge problem if we throw an ERROR here, but if we get into error
1395  * recursion trouble, we'll end up slamming the connection shut, which will
1396  * necessitate failing the entire toplevel transaction even if subtransactions
1397  * were used. Try to use WARNING where we can.
1398  */
1399 static bool
1400 pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
1401 {
1402  TimestampTz endtime;
1403 
1404  /*
1405  * If it takes too long to execute a cleanup query, assume the connection
1406  * is dead. It's fairly likely that this is why we aborted in the first
1407  * place (e.g. statement timeout, user cancel), so the timeout shouldn't
1408  * be too long.
      *
      * NOTE(review): no assignment to endtime is visible in this listing (the
      * embedded numbering jumps from 1409 to 1412), yet endtime is passed to
      * pgfdw_exec_cleanup_query_end() below. Confirm against the complete
      * source that the deadline is computed at this point.
1409  */
1412 
      /* Step 1: submit the query; a false return means it could not be sent. */
1413  if (!pgfdw_exec_cleanup_query_begin(conn, query))
1414  return false;
      /*
      * Step 2: wait for and check the result. consume_input is passed as
      * false here, and ignore_errors is handed through unchanged.
      */
1415  return pgfdw_exec_cleanup_query_end(conn, query, endtime,
1416  false, ignore_errors);
1417 }
1418 
1419 static bool
1421 {
1422  Assert(query != NULL);
1423 
1424  /*
1425  * Submit a query. Since we don't use non-blocking mode, this also can
1426  * block. But its risk is relatively small, so we ignore that for now.
1427  */
1428  if (!PQsendQuery(conn, query))
1429  {
1430  pgfdw_report_error(WARNING, NULL, conn, false, query);
1431  return false;
1432  }
1433 
1434  return true;
1435 }
1436 
1437 static bool
1439  TimestampTz endtime, bool consume_input,
1440  bool ignore_errors)
1441 {
1442  PGresult *result = NULL;
1443  bool timed_out;
1444 
1445  Assert(query != NULL);
1446 
1447  /*
1448  * If requested, consume whatever data is available from the socket. (Note
1449  * that if all data is available, this allows pgfdw_get_cleanup_result to
1450  * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
1451  * which would be large compared to the overhead of PQconsumeInput.)
1452  */
1453  if (consume_input && !PQconsumeInput(conn))
1454  {
1455  pgfdw_report_error(WARNING, NULL, conn, false, query);
1456  return false;
1457  }
1458 
1459  /* Get the result of the query. */
1460  if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
1461  {
1462  if (timed_out)
1463  ereport(WARNING,
1464  (errmsg("could not get query result due to timeout"),
1465  errcontext("remote SQL command: %s", query)));
1466  else
1467  pgfdw_report_error(WARNING, NULL, conn, false, query);
1468 
1469  return false;
1470  }
1471 
1472  /* Issue a warning if not successful. */
1473  if (PQresultStatus(result) != PGRES_COMMAND_OK)
1474  {
1475  pgfdw_report_error(WARNING, result, conn, true, query);
1476  return ignore_errors;
1477  }
1478  PQclear(result);
1479 
1480  return true;
1481 }
1482 
1483 /*
1484  * Get, during abort cleanup, the result of a query that is in progress. This
1485  * might be a query that is being interrupted by transaction abort, or it might
1486  * be a query that was initiated as part of transaction abort to get the remote
1487  * side back to the appropriate state.
1488  *
1489  * endtime is the time at which we should give up and assume the remote
1490  * side is dead. Returns true if the timeout expired or connection trouble
1491  * occurred, false otherwise. Sets *result except in case of a timeout.
1492  * Sets timed_out to true only when the timeout expired.
1493  */
1494 static bool
1496  bool *timed_out)
1497 {
1498  volatile bool failed = false;
1499  PGresult *volatile last_res = NULL;
1500 
1501  *timed_out = false;
1502 
1503  /* In what follows, do not leak any PGresults on an error. */
1504  PG_TRY();
1505  {
1506  for (;;)
1507  {
1508  PGresult *res;
1509 
1510  while (PQisBusy(conn))
1511  {
1512  int wc;
1514  long cur_timeout;
1515 
1516  /* If timeout has expired, give up, else get sleep time. */
1517  cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
1518  if (cur_timeout <= 0)
1519  {
1520  *timed_out = true;
1521  failed = true;
1522  goto exit;
1523  }
1524 
1525  /* first time, allocate or get the custom wait event */
1526  if (pgfdw_we_cleanup_result == 0)
1527  pgfdw_we_cleanup_result = WaitEventExtensionNew("PostgresFdwCleanupResult");
1528 
1529  /* Sleep until there's something to do */
1533  PQsocket(conn),
1534  cur_timeout, pgfdw_we_cleanup_result);
1536 
1538 
1539  /* Data available in socket? */
1540  if (wc & WL_SOCKET_READABLE)
1541  {
1542  if (!PQconsumeInput(conn))
1543  {
1544  /* connection trouble */
1545  failed = true;
1546  goto exit;
1547  }
1548  }
1549  }
1550 
1551  res = PQgetResult(conn);
1552  if (res == NULL)
1553  break; /* query is complete */
1554 
1555  PQclear(last_res);
1556  last_res = res;
1557  }
1558 exit: ;
1559  }
1560  PG_CATCH();
1561  {
1562  PQclear(last_res);
1563  PG_RE_THROW();
1564  }
1565  PG_END_TRY();
1566 
1567  if (failed)
1568  PQclear(last_res);
1569  else
1570  *result = last_res;
1571  return failed;
1572 }
1573 
1574 /*
1575  * Abort remote transaction or subtransaction.
1576  *
1577  * "toplevel" should be set to true if the toplevel (main) transaction is
1578  * being rolled back, false otherwise.
1579  *
1580  * Set entry->changing_xact_state to false on success, true on failure.
1581  */
1582 static void
1583 pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
1584 {
1585  char sql[100];
1586 
1587  /*
1588  * Don't try to clean up the connection if we're already in error
1589  * recursion trouble.
      *
      * NOTE(review): the condition guarding the assignment below is not
      * visible in this listing (embedded line 1591 is absent). As shown, the
      * flag would be set unconditionally and the check that follows would
      * always return. Confirm against the complete source.
1590  */
1592  entry->changing_xact_state = true;
1593 
1594  /*
1595  * If connection is already unsalvageable, don't touch it further.
1596  */
1597  if (entry->changing_xact_state)
1598  return;
1599 
1600  /*
1601  * Mark this connection as in the process of changing transaction state.
      * Every early return below leaves the flag set; it is cleared only at
      * the very end of this function.
1602  */
1603  entry->changing_xact_state = true;
1604 
1605  /* Assume we might have lost track of prepared statements */
1606  entry->have_error = true;
1607 
1608  /*
1609  * If a command has been submitted to the remote server by using an
1610  * asynchronous execution function, the command might not have yet
1611  * completed. Check to see if a command is still being processed by the
1612  * remote server, and if so, request cancellation of the command.
1613  */
1614  if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
1615  !pgfdw_cancel_query(entry->conn))
1616  return; /* Unable to cancel running query */
1617 
      /* Build the abort command for this level and run it, errors not ignored */
1618  CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1619  if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
1620  return; /* Unable to abort remote (sub)transaction */
1621 
1622  if (toplevel)
1623  {
      /*
      * NOTE(review): the call that the two arguments below belong to is not
      * visible in this listing (embedded line 1625 is absent); presumably it
      * runs "DEALLOCATE ALL" as a cleanup query -- confirm against the
      * complete source.
      */
1624  if (entry->have_prep_stmt && entry->have_error &&
1626  "DEALLOCATE ALL",
1627  true))
1628  return; /* Trouble clearing prepared statements */
1629 
1630  entry->have_prep_stmt = false;
1631  entry->have_error = false;
1632  }
1633 
1634  /*
1635  * If pendingAreq of the per-connection state is not NULL, it means that
1636  * an asynchronous fetch begun by fetch_more_data_begin() was not done
1637  * successfully and thus the per-connection state was not reset in
1638  * fetch_more_data(); in that case reset the per-connection state here.
1639  */
1640  if (entry->state.pendingAreq)
1641  memset(&entry->state, 0, sizeof(entry->state));
1642 
1643  /* Disarm changing_xact_state if it all worked */
1644  entry->changing_xact_state = false;
1645 }
1646 
1647 /*
1648  * Like pgfdw_abort_cleanup, submit an abort command or cancel request, but
1649  * don't wait for the result.
1650  *
1651  * Returns true if the abort command or cancel request is successfully issued,
1652  * false otherwise. If the abort command is successfully issued, the given
1653  * connection cache entry is appended to *pending_entries. Otherwise, if the
1654  * cancel request is successfully issued, it is appended to *cancel_requested.
1655  */
1656 static bool
1658  List **pending_entries, List **cancel_requested)
1659 {
1660  /*
1661  * Don't try to clean up the connection if we're already in error
1662  * recursion trouble.
1663  */
1665  entry->changing_xact_state = true;
1666 
1667  /*
1668  * If connection is already unsalvageable, don't touch it further.
1669  */
1670  if (entry->changing_xact_state)
1671  return false;
1672 
1673  /*
1674  * Mark this connection as in the process of changing transaction state.
1675  */
1676  entry->changing_xact_state = true;
1677 
1678  /* Assume we might have lost track of prepared statements */
1679  entry->have_error = true;
1680 
1681  /*
1682  * If a command has been submitted to the remote server by using an
1683  * asynchronous execution function, the command might not have yet
1684  * completed. Check to see if a command is still being processed by the
1685  * remote server, and if so, request cancellation of the command.
1686  */
1687  if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
1688  {
1689  TimestampTz endtime;
1690 
1693  if (!pgfdw_cancel_query_begin(entry->conn, endtime))
1694  return false; /* Unable to cancel running query */
1695  *cancel_requested = lappend(*cancel_requested, entry);
1696  }
1697  else
1698  {
1699  char sql[100];
1700 
1701  CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1702  if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
1703  return false; /* Unable to abort remote transaction */
1704  *pending_entries = lappend(*pending_entries, entry);
1705  }
1706 
1707  return true;
1708 }
1709 
1710 /*
1711  * Finish pre-commit cleanup of connections on each of which we've sent a
1712  * COMMIT command to the remote server.
1713  */
1714 static void
1716 {
1717  ConnCacheEntry *entry;
1718  List *pending_deallocs = NIL;
1719  ListCell *lc;
1720 
1721  Assert(pending_entries);
1722 
1723  /*
1724  * Get the result of the COMMIT command for each of the pending entries
1725  */
1726  foreach(lc, pending_entries)
1727  {
1728  entry = (ConnCacheEntry *) lfirst(lc);
1729 
1730  Assert(entry->changing_xact_state);
1731 
1732  /*
1733  * We might already have received the result on the socket, so pass
1734  * consume_input=true to try to consume it first
1735  */
1736  do_sql_command_end(entry->conn, "COMMIT TRANSACTION", true);
1737  entry->changing_xact_state = false;
1738 
1739  /* Do a DEALLOCATE ALL in parallel if needed */
1740  if (entry->have_prep_stmt && entry->have_error)
1741  {
1742  /* Ignore errors (see notes in pgfdw_xact_callback) */
1743  if (PQsendQuery(entry->conn, "DEALLOCATE ALL"))
1744  {
1745  pending_deallocs = lappend(pending_deallocs, entry);
1746  continue;
1747  }
1748  }
1749  entry->have_prep_stmt = false;
1750  entry->have_error = false;
1751 
1752  pgfdw_reset_xact_state(entry, true);
1753  }
1754 
1755  /* No further work if no pending entries */
1756  if (!pending_deallocs)
1757  return;
1758 
1759  /*
1760  * Get the result of the DEALLOCATE command for each of the pending
1761  * entries
1762  */
1763  foreach(lc, pending_deallocs)
1764  {
1765  PGresult *res;
1766 
1767  entry = (ConnCacheEntry *) lfirst(lc);
1768 
1769  /* Ignore errors (see notes in pgfdw_xact_callback) */
1770  while ((res = PQgetResult(entry->conn)) != NULL)
1771  {
1772  PQclear(res);
1773  /* Stop if the connection is lost (else we'll loop infinitely) */
1774  if (PQstatus(entry->conn) == CONNECTION_BAD)
1775  break;
1776  }
1777  entry->have_prep_stmt = false;
1778  entry->have_error = false;
1779 
1780  pgfdw_reset_xact_state(entry, true);
1781  }
1782 }
1783 
1784 /*
1785  * Finish pre-subcommit cleanup of connections on each of which we've sent a
1786  * RELEASE command to the remote server.
1787  */
1788 static void
1789 pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel)
1790 {
1791  ConnCacheEntry *entry;
1792  char sql[100];
1793  ListCell *lc;
1794 
1795  Assert(pending_entries);
1796 
1797  /*
1798  * Get the result of the RELEASE command for each of the pending entries
1799  */
1800  snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
1801  foreach(lc, pending_entries)
1802  {
1803  entry = (ConnCacheEntry *) lfirst(lc);
1804 
1805  Assert(entry->changing_xact_state);
1806 
1807  /*
1808  * We might already have received the result on the socket, so pass
1809  * consume_input=true to try to consume it first
1810  */
1811  do_sql_command_end(entry->conn, sql, true);
1812  entry->changing_xact_state = false;
1813 
1814  pgfdw_reset_xact_state(entry, false);
1815  }
1816 }
1817 
1818 /*
1819  * Finish abort cleanup of connections on each of which we've sent an abort
1820  * command or cancel request to the remote server.
1821  */
1822 static void
1823 pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested,
1824  bool toplevel)
1825 {
      /*
      * Works in up to three passes:
      *   1. for each entry in cancel_requested, collect the cancel result and
      *      send the abort command, moving the entry onto pending_entries;
      *   2. for each entry in pending_entries, collect the abort result and,
      *      at top level, send DEALLOCATE ALL where needed;
      *   3. for each entry that got a DEALLOCATE ALL, collect its result.
      * An entry that fails at any step is handed to pgfdw_reset_xact_state()
      * with changing_xact_state still set.
      *
      * toplevel is passed on to CONSTRUCT_ABORT_COMMAND and
      * pgfdw_reset_xact_state().
      */
1826  List *pending_deallocs = NIL;
1827  ListCell *lc;
1828 
1829  /*
1830  * For each of the pending cancel requests (if any), get and discard the
1831  * result of the query, and submit an abort command to the remote server.
1832  */
1833  if (cancel_requested)
1834  {
1835  foreach(lc, cancel_requested)
1836  {
1837  ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
1838  TimestampTz endtime;
1839  char sql[100];
1840 
1841  Assert(entry->changing_xact_state);
1842 
1843  /*
1844  * Set end time. You might think we should do this before issuing
1845  * cancel request like in normal mode, but that is problematic,
1846  * because if, for example, it took longer than 30 seconds to
1847  * process the first few entries in the cancel_requested list, it
1848  * would cause a timeout error when processing each of the
1849  * remaining entries in the list, leading to slamming that entry's
1850  * connection shut.
      *
      * NOTE(review): the assignment to endtime itself is not visible
      * in this listing (embedded lines 1852-1853 are absent); confirm
      * against the complete source.
1851  */
1854 
1855  if (!pgfdw_cancel_query_end(entry->conn, endtime, true))
1856  {
1857  /* Unable to cancel running query */
1858  pgfdw_reset_xact_state(entry, toplevel);
1859  continue;
1860  }
1861 
1862  /* Send an abort command in parallel if needed */
1863  CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1864  if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
1865  {
1866  /* Unable to abort remote (sub)transaction */
1867  pgfdw_reset_xact_state(entry, toplevel);
1868  }
1869  else
1870  pending_entries = lappend(pending_entries, entry);
1871  }
1872  }
1873 
1874  /* No further work if no pending entries */
1875  if (!pending_entries)
1876  return;
1877 
1878  /*
1879  * Get the result of the abort command for each of the pending entries
1880  */
1881  foreach(lc, pending_entries)
1882  {
1883  ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
1884  TimestampTz endtime;
1885  char sql[100];
1886 
1887  Assert(entry->changing_xact_state);
1888 
1889  /*
1890  * Set end time. We do this now, not before issuing the command like
1891  * in normal mode, for the same reason as for the cancel_requested
1892  * entries.
      *
      * NOTE(review): the assignment to endtime is not visible in this
      * listing (embedded lines 1894-1895 are absent); confirm against the
      * complete source.
1893  */
1896 
      /* Rebuild the command text; it is passed along with the result check */
1897  CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1898  if (!pgfdw_exec_cleanup_query_end(entry->conn, sql, endtime,
1899  true, false))
1900  {
1901  /* Unable to abort remote (sub)transaction */
1902  pgfdw_reset_xact_state(entry, toplevel);
1903  continue;
1904  }
1905 
1906  if (toplevel)
1907  {
1908  /* Do a DEALLOCATE ALL in parallel if needed */
1909  if (entry->have_prep_stmt && entry->have_error)
1910  {
      /*
      * NOTE(review): the condition whose argument list ends on the
      * next line is not visible in this listing (embedded line 1911
      * is absent); presumably it submits the command without waiting
      * for the result -- confirm against the complete source.
      */
1912  "DEALLOCATE ALL"))
1913  {
1914  /* Trouble clearing prepared statements */
1915  pgfdw_reset_xact_state(entry, toplevel);
1916  }
1917  else
1918  pending_deallocs = lappend(pending_deallocs, entry);
      /* Either way this entry is finished with the current pass */
1919  continue;
1920  }
1921  entry->have_prep_stmt = false;
1922  entry->have_error = false;
1923  }
1924 
1925  /* Reset the per-connection state if needed */
1926  if (entry->state.pendingAreq)
1927  memset(&entry->state, 0, sizeof(entry->state));
1928 
1929  /* We're done with this entry; unset the changing_xact_state flag */
1930  entry->changing_xact_state = false;
1931  pgfdw_reset_xact_state(entry, toplevel);
1932  }
1933 
1934  /* No further work if no pending entries */
1935  if (!pending_deallocs)
1936  return;
      /* Entries are added to pending_deallocs only in the toplevel branch above */
1937  Assert(toplevel);
1938 
1939  /*
1940  * Get the result of the DEALLOCATE command for each of the pending
1941  * entries
1942  */
1943  foreach(lc, pending_deallocs)
1944  {
1945  ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
1946  TimestampTz endtime;
1947 
1948  Assert(entry->changing_xact_state);
1949  Assert(entry->have_prep_stmt);
1950  Assert(entry->have_error);
1951 
1952  /*
1953  * Set end time. We do this now, not before issuing the command like
1954  * in normal mode, for the same reason as for the cancel_requested
1955  * entries.
      *
      * NOTE(review): the assignment to endtime is not visible in this
      * listing (embedded lines 1957-1958 are absent); confirm against the
      * complete source.
1956  */
1959 
      /* ignore_errors is true here: an error result still counts as success */
1960  if (!pgfdw_exec_cleanup_query_end(entry->conn, "DEALLOCATE ALL",
1961  endtime, true, true))
1962  {
1963  /* Trouble clearing prepared statements */
1964  pgfdw_reset_xact_state(entry, toplevel);
1965  continue;
1966  }
1967  entry->have_prep_stmt = false;
1968  entry->have_error = false;
1969 
1970  /* Reset the per-connection state if needed */
1971  if (entry->state.pendingAreq)
1972  memset(&entry->state, 0, sizeof(entry->state));
1973 
1974  /* We're done with this entry; unset the changing_xact_state flag */
1975  entry->changing_xact_state = false;
1976  pgfdw_reset_xact_state(entry, toplevel);
1977  }
1978 }
1979 
1980 /*
1981  * List active foreign server connections.
1982  *
1983  * This function takes no input parameter and returns setof record made of
1984  * following values:
1985  * - server_name - server name of active connection. In case the foreign server
1986  * is dropped but still the connection is active, then the server name will
1987  * be NULL in output.
1988  * - valid - true/false representing whether the connection is valid or not.
1989  * Note that the connections can get invalidated in pgfdw_inval_callback.
1990  *
1991  * No records are returned when there are no cached connections at all.
1992  */
1993 Datum
1995 {
1996 #define POSTGRES_FDW_GET_CONNECTIONS_COLS 2
1997  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1998  HASH_SEQ_STATUS scan;
1999  ConnCacheEntry *entry;
2000 
2001  InitMaterializedSRF(fcinfo, 0);
2002 
2003  /* If cache doesn't exist, we return no records */
2004  if (!ConnectionHash)
2005  PG_RETURN_VOID();
2006 
2007  hash_seq_init(&scan, ConnectionHash);
2008  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
2009  {
2010  ForeignServer *server;
2012  bool nulls[POSTGRES_FDW_GET_CONNECTIONS_COLS] = {0};
2013 
2014  /* We only look for open remote connections */
2015  if (!entry->conn)
2016  continue;
2017 
2018  server = GetForeignServerExtended(entry->serverid, FSV_MISSING_OK);
2019 
2020  /*
2021  * The foreign server may have been dropped in current explicit
2022  * transaction. It is not possible to drop the server from another
2023  * session when the connection associated with it is in use in the
2024  * current transaction, if tried so, the drop query in another session
2025  * blocks until the current transaction finishes.
2026  *
2027  * Even though the server is dropped in the current transaction, the
2028  * cache can still have associated active connection entry, say we
2029  * call such connections dangling. Since we can not fetch the server
2030  * name from system catalogs for dangling connections, instead we show
2031  * NULL value for server name in output.
2032  *
2033  * We could have done better by storing the server name in the cache
2034  * entry instead of server oid so that it could be used in the output.
2035  * But the server name in each cache entry requires 64 bytes of
2036  * memory, which is huge, when there are many cached connections and
2037  * the use case i.e. dropping the foreign server within the explicit
2038  * current transaction seems rare. So, we chose to show NULL value for
2039  * server name in output.
2040  *
2041  * Such dangling connections get closed either in next use or at the
2042  * end of current explicit transaction in pgfdw_xact_callback.
2043  */
2044  if (!server)
2045  {
2046  /*
2047  * If the server has been dropped in the current explicit
2048  * transaction, then this entry would have been invalidated in
2049  * pgfdw_inval_callback at the end of drop server command. Note
2050  * that this connection would not have been closed in
2051  * pgfdw_inval_callback because it is still being used in the
2052  * current explicit transaction. So, assert that here.
2053  */
2054  Assert(entry->conn && entry->xact_depth > 0 && entry->invalidated);
2055 
2056  /* Show null, if no server name was found */
2057  nulls[0] = true;
2058  }
2059  else
2060  values[0] = CStringGetTextDatum(server->servername);
2061 
2062  values[1] = BoolGetDatum(!entry->invalidated);
2063 
2064  tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
2065  }
2066 
2067  PG_RETURN_VOID();
2068 }
2069 
2070 /*
2071  * Disconnect the specified cached connections.
2072  *
2073  * This function discards the open connections that are established by
2074  * postgres_fdw from the local session to the foreign server with
2075  * the given name. Note that there can be multiple connections to
2076  * the given server using different user mappings. If the connections
2077  * are used in the current local transaction, they are not disconnected
2078  * and warning messages are reported. This function returns true
2079  * if it disconnects at least one connection, otherwise false. If no
2080  * foreign server with the given name is found, an error is reported.
2081  */
2082 Datum
2084 {
2085  ForeignServer *server;
2086  char *servername;
2087 
2088  servername = text_to_cstring(PG_GETARG_TEXT_PP(0));
2089  server = GetForeignServerByName(servername, false);
2090 
2092 }
2093 
2094 /*
2095  * Disconnect all the cached connections.
2096  *
2097  * This function discards all the open connections that are established by
2098  * postgres_fdw from the local session to the foreign servers.
2099  * If the connections are used in the current local transaction, they are
2100  * not disconnected and warning messages are reported. This function
2101  * returns true if it disconnects at least one connection, otherwise false.
2102  */
2103 Datum
2105 {
2107 }
2108 
2109 /*
2110  * Workhorse to disconnect cached connections.
2111  *
2112  * This function scans all the connection cache entries and disconnects
2113  * the open connections whose foreign server OID matches with
2114  * the specified one. If InvalidOid is specified, it disconnects all
2115  * the cached connections.
2116  *
2117  * This function emits a warning for each connection that's used in
2118  * the current transaction and doesn't close it. It returns true if
2119  * it disconnects at least one connection, otherwise false.
2120  *
2121  * Note that this function disconnects even the connections that are
2122  * established by other users in the same local session using different
2123  * user mappings. This leads even non-superuser to be able to close
2124  * the connections established by superusers in the same local session.
2125  *
2126  * XXX As of now we don't see any security risk doing this. But we should
2127  * set some restrictions on that, for example, prevent non-superuser
2128  * from closing the connections established by superusers even
2129  * in the same session?
2130  */
2131 static bool
2133 {
2134  HASH_SEQ_STATUS scan;
2135  ConnCacheEntry *entry;
2136  bool all = !OidIsValid(serverid);
2137  bool result = false;
2138 
2139  /*
2140  * Connection cache hashtable has not been initialized yet in this
2141  * session, so return false.
2142  */
2143  if (!ConnectionHash)
2144  return false;
2145 
2146  hash_seq_init(&scan, ConnectionHash);
2147  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
2148  {
2149  /* Ignore cache entry if no open connection right now. */
2150  if (!entry->conn)
2151  continue;
2152 
2153  if (all || entry->serverid == serverid)
2154  {
2155  /*
2156  * Emit a warning because the connection to close is used in the
2157  * current transaction and cannot be disconnected right now.
2158  */
2159  if (entry->xact_depth > 0)
2160  {
2161  ForeignServer *server;
2162 
2163  server = GetForeignServerExtended(entry->serverid,
2164  FSV_MISSING_OK);
2165 
2166  if (!server)
2167  {
2168  /*
2169  * If the foreign server was dropped while its connection
2170  * was used in the current transaction, the connection
2171  * must have been marked as invalid by
2172  * pgfdw_inval_callback at the end of DROP SERVER command.
2173  */
2174  Assert(entry->invalidated);
2175 
2176  ereport(WARNING,
2177  (errmsg("cannot close dropped server connection because it is still in use")));
2178  }
2179  else
2180  ereport(WARNING,
2181  (errmsg("cannot close connection for server \"%s\" because it is still in use",
2182  server->servername)));
2183  }
2184  else
2185  {
2186  elog(DEBUG3, "discarding connection %p", entry->conn);
2187  disconnect_pg_server(entry);
2188  result = true;
2189  }
2190  }
2191  }
2192 
2193  return result;
2194 }
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1767
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1655
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1619
bool be_gssapi_get_delegation(Port *port)
static Datum values[MAXATTR]
Definition: bootstrap.c:152
#define CStringGetTextDatum(s)
Definition: builtins.h:97
unsigned int uint32
Definition: c.h:506
uint32 SubTransactionId
Definition: c.h:656
#define Assert(condition)
Definition: c.h:858
#define OidIsValid(objectId)
Definition: c.h:775
Oid ConnCacheKey
Definition: connection.c:51
static unsigned int prep_stmt_number
Definition: connection.c:81
unsigned int GetCursorNumber(PGconn *conn)
Definition: connection.c:806
static bool UserMappingPasswordRequired(UserMapping *user)
Definition: connection.c:594
Datum postgres_fdw_get_connections(PG_FUNCTION_ARGS)
Definition: connection.c:1994
void do_sql_command(PGconn *conn, const char *sql)
Definition: connection.c:699
static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel)
Definition: connection.c:1789
static void disconnect_pg_server(ConnCacheEntry *entry)
Definition: connection.c:580
void ReleaseConnection(PGconn *conn)
Definition: connection.c:785
static uint32 pgfdw_we_get_result
Definition: connection.c:89
PG_FUNCTION_INFO_V1(postgres_fdw_get_connections)
static bool pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime)
Definition: connection.c:1335
static void pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested, bool toplevel)
Definition: connection.c:1823
static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
Definition: connection.c:1264
static void configure_remote_session(PGconn *conn)
Definition: connection.c:661
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:871
PGresult * pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
Definition: connection.c:835
static bool xact_got_connection
Definition: connection.c:84
#define POSTGRES_FDW_GET_CONNECTIONS_COLS
struct ConnCacheEntry ConnCacheEntry
Datum postgres_fdw_disconnect_all(PG_FUNCTION_ARGS)
Definition: connection.c:2104
static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime, bool consume_input)
Definition: connection.c:1348
static void do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
Definition: connection.c:713
static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result, bool *timed_out)
Definition: connection.c:1495
#define CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel)
Definition: connection.c:99
static uint32 pgfdw_we_cleanup_result
Definition: connection.c:87
static bool pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel, List **pending_entries, List **cancel_requested)
Definition: connection.c:1657
static HTAB * ConnectionHash
Definition: connection.c:77
static bool pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query, TimestampTz endtime, bool consume_input, bool ignore_errors)
Definition: connection.c:1438
static unsigned int cursor_number
Definition: connection.c:80
static void make_new_connection(ConnCacheEntry *entry, UserMapping *user)
Definition: connection.c:340
static void pgfdw_security_check(const char **keywords, const char **values, UserMapping *user, PGconn *conn)
Definition: connection.c:406
Datum postgres_fdw_disconnect(PG_FUNCTION_ARGS)
Definition: connection.c:2083
static void pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg)
Definition: connection.c:1077
static PGconn * connect_pg_server(ForeignServer *server, UserMapping *user)
Definition: connection.c:444
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
Definition: connection.c:1400
PGconn * GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
Definition: connection.c:177
unsigned int GetPrepStmtNumber(PGconn *conn)
Definition: connection.c:820
static bool pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query)
Definition: connection.c:1420
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
Definition: connection.c:1240
static void check_conn_params(const char **keywords, const char **values, UserMapping *user)
Definition: connection.c:618
static uint32 pgfdw_we_connect
Definition: connection.c:88
static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
Definition: connection.c:1190
static void pgfdw_xact_callback(XactEvent event, void *arg)
Definition: connection.c:927
#define CONNECTION_CLEANUP_TIMEOUT
Definition: connection.c:96
static void do_sql_command_begin(PGconn *conn, const char *sql)
Definition: connection.c:706
static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
Definition: connection.c:1583
PGresult * pgfdw_get_result(PGconn *conn)
Definition: connection.c:852
static void begin_remote_xact(ConnCacheEntry *entry)
Definition: connection.c:742
static bool pgfdw_cancel_query(PGconn *conn)
Definition: connection.c:1309
static void pgfdw_finish_pre_commit_cleanup(List *pending_entries)
Definition: connection.c:1715
static bool disconnect_cached_connections(Oid serverid)
Definition: connection.c:2132
char * process_pgfdw_appname(const char *appname)
Definition: option.c:491
int ExtractConnectionOptions(List *defelems, const char **keywords, const char **values)
Definition: option.c:414
char * pgfdw_application_name
Definition: option.c:52
int64 TimestampTz
Definition: timestamp.h:39
bool defGetBoolean(DefElem *def)
Definition: define.c:107
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:955
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:352
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1395
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1385
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1155
void FreeErrorData(ErrorData *edata)
Definition: elog.c:1799
int errdetail_internal(const char *fmt,...)
Definition: elog.c:1228
int errdetail(const char *fmt,...)
Definition: elog.c:1201
void FlushErrorState(void)
Definition: elog.c:1848
int errhint(const char *fmt,...)
Definition: elog.c:1315
int errcode(int sqlerrcode)
Definition: elog.c:855
int errmsg(const char *fmt,...)
Definition: elog.c:1068
bool in_error_recursion_trouble(void)
Definition: elog.c:295
ErrorData * CopyErrorData(void)
Definition: elog.c:1727
#define PG_RE_THROW()
Definition: elog.h:411
#define errcontext
Definition: elog.h:196
#define DEBUG3
Definition: elog.h:28
#define PG_TRY(...)
Definition: elog.h:370
#define WARNING
Definition: elog.h:36
#define PG_END_TRY(...)
Definition: elog.h:395
#define ERROR
Definition: elog.h:39
#define PG_CATCH(...)
Definition: elog.h:380
#define MAKE_SQLSTATE(ch1, ch2, ch3, ch4, ch5)
Definition: elog.h:56
#define elog(elevel,...)
Definition: elog.h:224
#define PG_FINALLY(...)
Definition: elog.h:387
#define ereport(elevel,...)
Definition: elog.h:149
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:7144
PGTransactionStatusType PQtransactionStatus(const PGconn *conn)
Definition: fe-connect.c:7109
int PQconnectionUsedPassword(const PGconn *conn)
Definition: fe-connect.c:7220
int PQconnectionUsedGSSAPI(const PGconn *conn)
Definition: fe-connect.c:7231
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7154
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:7101
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:7180
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3411
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1984
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1416
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:2031
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:3466
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:2062
#define PG_RETURN_VOID()
Definition: fmgr.h:349
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:309
#define PG_FUNCTION_ARGS
Definition: fmgr.h:193
#define PG_RETURN_BOOL(x)
Definition: fmgr.h:359
ForeignServer * GetForeignServerExtended(Oid serverid, bits16 flags)
Definition: foreign.c:122
ForeignServer * GetForeignServer(Oid serverid)
Definition: foreign.c:110
ForeignServer * GetForeignServerByName(const char *srvname, bool missing_ok)
Definition: foreign.c:181
#define FSV_MISSING_OK
Definition: foreign.h:61
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
Definition: funcapi.c:76
struct Port * MyProcPort
Definition: globals.c:50
struct Latch * MyLatch
Definition: globals.c:61
@ HASH_ENTER
Definition: hsearch.h:114
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1516
int i
Definition: isn.c:73
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:565
void ResetLatch(Latch *latch)
Definition: latch.c:724
#define WL_SOCKET_READABLE
Definition: latch.h:128
#define WL_TIMEOUT
Definition: latch.h:130
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:132
#define WL_LATCH_SET
Definition: latch.h:127
static const char * libpqsrv_cancel(PGconn *conn, TimestampTz endtime)
static void libpqsrv_disconnect(PGconn *conn)
static PGconn * libpqsrv_connect_params(const char *const *keywords, const char *const *values, int expand_dbname, uint32 wait_event_info)
static PGresult * libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info)
@ CONNECTION_BAD
Definition: libpq-fe.h:61
@ CONNECTION_OK
Definition: libpq-fe.h:60
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:99
@ PQTRANS_IDLE
Definition: libpq-fe.h:121
@ PQTRANS_ACTIVE
Definition: libpq-fe.h:122
exit(1)
List * lappend(List *list, void *datum)
Definition: list.c:339
const char * GetDatabaseEncodingName(void)
Definition: mbutils.c:1267
char * pchomp(const char *in)
Definition: mcxt.c:1724
void pfree(void *pointer)
Definition: mcxt.c:1521
MemoryContext CurrentMemoryContext
Definition: mcxt.c:143
void * palloc(Size size)
Definition: mcxt.c:1317
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
void * arg
#define lfirst(lc)
Definition: pg_list.h:172
static int list_length(const List *l)
Definition: pg_list.h:152
#define NIL
Definition: pg_list.h:68
static char * user
Definition: pg_regress.c:120
#define snprintf
Definition: port.h:238
uintptr_t Datum
Definition: postgres.h:64
static Datum BoolGetDatum(bool X)
Definition: postgres.h:102
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
#define InvalidOid
Definition: postgres_ext.h:36
unsigned int Oid
Definition: postgres_ext.h:31
#define PG_DIAG_MESSAGE_HINT
Definition: postgres_ext.h:59
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:56
#define PG_DIAG_MESSAGE_PRIMARY
Definition: postgres_ext.h:57
#define PG_DIAG_MESSAGE_DETAIL
Definition: postgres_ext.h:58
#define PG_DIAG_CONTEXT
Definition: postgres_ext.h:63
void process_pending_request(AsyncRequest *areq)
MemoryContextSwitchTo(old_ctx)
tree ctl
Definition: radixtree.h:1853
PGconn * conn
Definition: streamutil.c:55
PGconn * conn
Definition: connection.c:56
bool have_prep_stmt
Definition: connection.c:60
PgFdwConnState state
Definition: connection.c:71
bool invalidated
Definition: connection.c:65
ConnCacheKey key
Definition: connection.c:55
bool parallel_commit
Definition: connection.c:63
uint32 server_hashvalue
Definition: connection.c:69
uint32 mapping_hashvalue
Definition: connection.c:70
bool keep_connections
Definition: connection.c:66
bool parallel_abort
Definition: connection.c:64
bool changing_xact_state
Definition: connection.c:62
char * defname
Definition: parsenodes.h:815
int sqlerrcode
Definition: elog.h:438
List * options
Definition: foreign.h:42
char * servername
Definition: foreign.h:39
Oid serverid
Definition: foreign.h:36
Definition: dynahash.c:220
Definition: pg_list.h:54
AsyncRequest * pendingAreq
Definition: postgres_fdw.h:138
TupleDesc setDesc
Definition: execnodes.h:340
Tuplestorestate * setResult
Definition: execnodes.h:339
Definition: regguts.h:323
bool superuser_arg(Oid roleid)
Definition: superuser.c:56
#define GetSysCacheHashValue1(cacheId, key1)
Definition: syscache.h:113
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
Definition: tuplestore.c:782
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:85
char * text_to_cstring(const text *t)
Definition: varlena.c:217
uint32 WaitEventExtensionNew(const char *wait_event_name)
Definition: wait_event.c:164
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:927
void RegisterXactCallback(XactCallback callback, void *arg)
Definition: xact.c:3791
void RegisterSubXactCallback(SubXactCallback callback, void *arg)
Definition: xact.c:3851
SubXactEvent
Definition: xact.h:141
@ SUBXACT_EVENT_PRE_COMMIT_SUB
Definition: xact.h:145
@ SUBXACT_EVENT_ABORT_SUB
Definition: xact.h:144
XactEvent
Definition: xact.h:127
@ XACT_EVENT_PRE_PREPARE
Definition: xact.h:135
@ XACT_EVENT_COMMIT
Definition: xact.h:128
@ XACT_EVENT_PARALLEL_PRE_COMMIT
Definition: xact.h:134
@ XACT_EVENT_PARALLEL_COMMIT
Definition: xact.h:129
@ XACT_EVENT_ABORT
Definition: xact.h:130
@ XACT_EVENT_PRE_COMMIT
Definition: xact.h:133
@ XACT_EVENT_PARALLEL_ABORT
Definition: xact.h:131
@ XACT_EVENT_PREPARE
Definition: xact.h:132
#define IsolationIsSerializable()
Definition: xact.h:52