PostgreSQL Source Code  git master
connection.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * connection.c
4  * Connection management functions for postgres_fdw
5  *
6  * Portions Copyright (c) 2012-2024, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * contrib/postgres_fdw/connection.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "access/htup_details.h"
16 #include "access/xact.h"
18 #include "commands/defrem.h"
19 #include "funcapi.h"
20 #include "libpq/libpq-be.h"
22 #include "mb/pg_wchar.h"
23 #include "miscadmin.h"
24 #include "pgstat.h"
25 #include "postgres_fdw.h"
26 #include "storage/fd.h"
27 #include "storage/latch.h"
28 #include "utils/builtins.h"
29 #include "utils/datetime.h"
30 #include "utils/hsearch.h"
31 #include "utils/inval.h"
32 #include "utils/memutils.h"
33 #include "utils/syscache.h"
34 
35 /*
36  * Connection cache hash table entry
37  *
38  * The lookup key in this hash table is the user mapping OID. We use just one
39  * connection per user mapping ID, which ensures that all the scans use the
40  * same snapshot during a query. Using the user mapping OID rather than
41  * the foreign server OID + user OID avoids creating multiple connections when
42  * the public user mapping applies to all user OIDs.
43  *
44  * The "conn" pointer can be NULL if we don't currently have a live connection.
45  * When we do have a connection, xact_depth tracks the current depth of
46  * transactions and subtransactions open on the remote side. We need to issue
47  * commands at the same nesting depth on the remote as we're executing at
48  * ourselves, so that rolling back a subtransaction will kill the right
49  * queries and not the wrong ones.
50  */
51 typedef Oid ConnCacheKey;
52 
53 typedef struct ConnCacheEntry
54 {
55  ConnCacheKey key; /* hash key (must be first) */
56  PGconn *conn; /* connection to foreign server, or NULL */
57  /* Remaining fields are invalid when conn is NULL: */
58  int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 =
59  * one level of subxact open, etc */
60  bool have_prep_stmt; /* have we prepared any stmts in this xact? */
61  bool have_error; /* have any subxacts aborted in this xact? */
62  bool changing_xact_state; /* xact state change in process */
63  bool parallel_commit; /* do we commit (sub)xacts in parallel? */
64  bool parallel_abort; /* do we abort (sub)xacts in parallel? */
65  bool invalidated; /* true if reconnect is pending */
66  bool keep_connections; /* setting value of keep_connections
67  * server option */
68  Oid serverid; /* foreign server OID used to get server name */
69  uint32 server_hashvalue; /* hash value of foreign server OID */
70  uint32 mapping_hashvalue; /* hash value of user mapping OID */
71  PgFdwConnState state; /* extra per-connection state */
73 
74 /*
75  * Connection cache (initialized on first use)
76  */
77 static HTAB *ConnectionHash = NULL;
78 
79 /* for assigning cursor numbers and prepared statement numbers */
80 static unsigned int cursor_number = 0;
81 static unsigned int prep_stmt_number = 0;
82 
83 /* tracks whether any work is needed in callback functions */
84 static bool xact_got_connection = false;
85 
86 /* custom wait event values, retrieved from shared memory */
90 
91 /*
92  * Milliseconds to wait to cancel an in-progress query or execute a cleanup
93  * query; if it takes longer than 30 seconds to do these, we assume the
94  * connection is dead.
95  */
96 #define CONNECTION_CLEANUP_TIMEOUT 30000
97 
/*
 * Build the abort command to send to the remote server.
 *
 * "sql" must be a char array (its capacity is taken with sizeof).  When
 * "toplevel" is false the command rolls back to, and then releases, the
 * savepoint named after the entry's current xact_depth; otherwise it aborts
 * the whole remote transaction.
 */
#define CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel) \
	do { \
		if (!(toplevel)) \
			snprintf((sql), sizeof(sql), \
					 "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", \
					 (entry)->xact_depth, (entry)->xact_depth); \
		else \
			snprintf((sql), sizeof(sql), \
					 "ABORT TRANSACTION"); \
	} while(0)
109 
110 /*
111  * SQL functions
112  */
116 
117 /* prototypes of private functions */
118 static void make_new_connection(ConnCacheEntry *entry, UserMapping *user);
120 static void disconnect_pg_server(ConnCacheEntry *entry);
121 static void check_conn_params(const char **keywords, const char **values, UserMapping *user);
122 static void configure_remote_session(PGconn *conn);
123 static void do_sql_command_begin(PGconn *conn, const char *sql);
124 static void do_sql_command_end(PGconn *conn, const char *sql,
125  bool consume_input);
126 static void begin_remote_xact(ConnCacheEntry *entry);
127 static void pgfdw_xact_callback(XactEvent event, void *arg);
128 static void pgfdw_subxact_callback(SubXactEvent event,
129  SubTransactionId mySubid,
130  SubTransactionId parentSubid,
131  void *arg);
132 static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
134 static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
135 static bool pgfdw_cancel_query(PGconn *conn);
136 static bool pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime);
137 static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime,
138  bool consume_input);
139 static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
140  bool ignore_errors);
141 static bool pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query);
142 static bool pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
143  TimestampTz endtime,
144  bool consume_input,
145  bool ignore_errors);
146 static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
147  PGresult **result, bool *timed_out);
148 static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel);
149 static bool pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
150  List **pending_entries,
151  List **cancel_requested);
152 static void pgfdw_finish_pre_commit_cleanup(List *pending_entries);
153 static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries,
154  int curlevel);
155 static void pgfdw_finish_abort_cleanup(List *pending_entries,
156  List *cancel_requested,
157  bool toplevel);
158 static void pgfdw_security_check(const char **keywords, const char **values,
161 static bool disconnect_cached_connections(Oid serverid);
162 
163 /*
164  * Get a PGconn which can be used to execute queries on the remote PostgreSQL
165  * server with the user's authorization. A new connection is established
166  * if we don't already have a suitable one, and a transaction is opened at
167  * the right subtransaction nesting depth if we didn't do that already.
168  *
169  * will_prep_stmt must be true if caller intends to create any prepared
170  * statements. Since those don't go away automatically at transaction end
171  * (not even on error), we need this flag to cue manual cleanup.
172  *
173  * If state is not NULL, *state receives the per-connection state associated
174  * with the PGconn.
175  */
176 PGconn *
178 {
179  bool found;
180  bool retry = false;
181  ConnCacheEntry *entry;
184 
185  /* First time through, initialize connection cache hashtable */
186  if (ConnectionHash == NULL)
187  {
188  HASHCTL ctl;
189 
190  if (pgfdw_we_get_result == 0)
192  WaitEventExtensionNew("PostgresFdwGetResult");
193 
194  ctl.keysize = sizeof(ConnCacheKey);
195  ctl.entrysize = sizeof(ConnCacheEntry);
196  ConnectionHash = hash_create("postgres_fdw connections", 8,
197  &ctl,
199 
200  /*
201  * Register some callback functions that manage connection cleanup.
202  * This should be done just once in each backend.
203  */
206  CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
208  CacheRegisterSyscacheCallback(USERMAPPINGOID,
210  }
211 
212  /* Set flag that we did GetConnection during the current transaction */
213  xact_got_connection = true;
214 
215  /* Create hash key for the entry. Assume no pad bytes in key struct */
216  key = user->umid;
217 
218  /*
219  * Find or create cached entry for requested connection.
220  */
221  entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
222  if (!found)
223  {
224  /*
225  * We need only clear "conn" here; remaining fields will be filled
226  * later when "conn" is set.
227  */
228  entry->conn = NULL;
229  }
230 
231  /* Reject further use of connections which failed abort cleanup. */
233 
234  /*
235  * If the connection needs to be remade due to invalidation, disconnect as
236  * soon as we're out of all transactions.
237  */
238  if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
239  {
240  elog(DEBUG3, "closing connection %p for option changes to take effect",
241  entry->conn);
242  disconnect_pg_server(entry);
243  }
244 
245  /*
246  * If cache entry doesn't have a connection, we have to establish a new
247  * connection. (If connect_pg_server throws an error, the cache entry
248  * will remain in a valid empty state, ie conn == NULL.)
249  */
250  if (entry->conn == NULL)
251  make_new_connection(entry, user);
252 
253  /*
254  * We check the health of the cached connection here when using it. In
255  * cases where we're out of all transactions, if a broken connection is
256  * detected, we try to reestablish a new connection later.
257  */
258  PG_TRY();
259  {
260  /* Process a pending asynchronous request if any. */
261  if (entry->state.pendingAreq)
263  /* Start a new transaction or subtransaction if needed. */
264  begin_remote_xact(entry);
265  }
266  PG_CATCH();
267  {
269  ErrorData *errdata = CopyErrorData();
270 
271  /*
272  * Determine whether to try to reestablish the connection.
273  *
274  * After a broken connection is detected in libpq, any error other
275  * than connection failure (e.g., out-of-memory) can be thrown
276  * somewhere between return from libpq and the expected ereport() call
277  * in pgfdw_report_error(). In this case, since PQstatus() indicates
278  * CONNECTION_BAD, checking only PQstatus() causes the false detection
279  * of connection failure. To avoid this, we also verify that the
280  * error's sqlstate is ERRCODE_CONNECTION_FAILURE. Note that also
281  * checking only the sqlstate can cause another false detection
282  * because pgfdw_report_error() may report ERRCODE_CONNECTION_FAILURE
283  * for any libpq-originated error condition.
284  */
285  if (errdata->sqlerrcode != ERRCODE_CONNECTION_FAILURE ||
286  PQstatus(entry->conn) != CONNECTION_BAD ||
287  entry->xact_depth > 0)
288  {
289  MemoryContextSwitchTo(ecxt);
290  PG_RE_THROW();
291  }
292 
293  /* Clean up the error state */
294  FlushErrorState();
295  FreeErrorData(errdata);
296  errdata = NULL;
297 
298  retry = true;
299  }
300  PG_END_TRY();
301 
302  /*
303  * If a broken connection is detected, disconnect it, reestablish a new
304  * connection and retry a new remote transaction. If connection failure is
305  * reported again, we give up getting a connection.
306  */
307  if (retry)
308  {
309  Assert(entry->xact_depth == 0);
310 
311  ereport(DEBUG3,
312  (errmsg_internal("could not start remote transaction on connection %p",
313  entry->conn)),
314  errdetail_internal("%s", pchomp(PQerrorMessage(entry->conn))));
315 
316  elog(DEBUG3, "closing connection %p to reestablish a new one",
317  entry->conn);
318  disconnect_pg_server(entry);
319 
320  make_new_connection(entry, user);
321 
322  begin_remote_xact(entry);
323  }
324 
325  /* Remember if caller will prepare statements */
326  entry->have_prep_stmt |= will_prep_stmt;
327 
328  /* If caller needs access to the per-connection state, return it. */
329  if (state)
330  *state = &entry->state;
331 
332  return entry->conn;
333 }
334 
335 /*
336  * Reset all transient state fields in the cached connection entry and
337  * establish new connection to the remote server.
338  */
339 static void
341 {
342  ForeignServer *server = GetForeignServer(user->serverid);
343  ListCell *lc;
344 
345  Assert(entry->conn == NULL);
346 
347  /* Reset all transient state fields, to be sure all are clean */
348  entry->xact_depth = 0;
349  entry->have_prep_stmt = false;
350  entry->have_error = false;
351  entry->changing_xact_state = false;
352  entry->invalidated = false;
353  entry->serverid = server->serverid;
354  entry->server_hashvalue =
355  GetSysCacheHashValue1(FOREIGNSERVEROID,
356  ObjectIdGetDatum(server->serverid));
357  entry->mapping_hashvalue =
358  GetSysCacheHashValue1(USERMAPPINGOID,
359  ObjectIdGetDatum(user->umid));
360  memset(&entry->state, 0, sizeof(entry->state));
361 
362  /*
363  * Determine whether to keep the connection that we're about to make here
364  * open even after the transaction using it ends, so that the subsequent
365  * transactions can re-use it.
366  *
367  * By default, all the connections to any foreign servers are kept open.
368  *
369  * Also determine whether to commit/abort (sub)transactions opened on the
370  * remote server in parallel at (sub)transaction end, which is disabled by
371  * default.
372  *
373  * Note: it's enough to determine these only when making a new connection
374  * because if these settings for it are changed, it will be closed and
375  * re-made later.
376  */
377  entry->keep_connections = true;
378  entry->parallel_commit = false;
379  entry->parallel_abort = false;
380  foreach(lc, server->options)
381  {
382  DefElem *def = (DefElem *) lfirst(lc);
383 
384  if (strcmp(def->defname, "keep_connections") == 0)
385  entry->keep_connections = defGetBoolean(def);
386  else if (strcmp(def->defname, "parallel_commit") == 0)
387  entry->parallel_commit = defGetBoolean(def);
388  else if (strcmp(def->defname, "parallel_abort") == 0)
389  entry->parallel_abort = defGetBoolean(def);
390  }
391 
392  /* Now try to make the connection */
393  entry->conn = connect_pg_server(server, user);
394 
395  elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
396  entry->conn, server->servername, user->umid, user->userid);
397 }
398 
399 /*
400  * Check that non-superuser has used password or delegated credentials
401  * to establish connection; otherwise, he's piggybacking on the
402  * postgres server's user identity. See also dblink_security_check()
403  * in contrib/dblink and check_conn_params.
404  */
405 static void
406 pgfdw_security_check(const char **keywords, const char **values, UserMapping *user, PGconn *conn)
407 {
408  /* Superusers bypass the check */
409  if (superuser_arg(user->userid))
410  return;
411 
412 #ifdef ENABLE_GSS
413  /* Connected via GSSAPI with delegated credentials- all good. */
415  return;
416 #endif
417 
418  /* Ok if superuser set PW required false. */
420  return;
421 
422  /* Connected via PW, with PW required true, and provided non-empty PW. */
424  {
425  /* ok if params contain a non-empty password */
426  for (int i = 0; keywords[i] != NULL; i++)
427  {
428  if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
429  return;
430  }
431  }
432 
433  ereport(ERROR,
434  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
435  errmsg("password or GSSAPI delegated credentials required"),
436  errdetail("Non-superuser cannot connect if the server does not request a password or use GSSAPI with delegated credentials."),
437  errhint("Target server's authentication method must be changed or password_required=false set in the user mapping attributes.")));
438 }
439 
440 /*
441  * Connect to remote server using specified server and user mapping properties.
442  */
443 static PGconn *
445 {
446  PGconn *volatile conn = NULL;
447 
448  /*
449  * Use PG_TRY block to ensure closing connection on error.
450  */
451  PG_TRY();
452  {
453  const char **keywords;
454  const char **values;
455  char *appname = NULL;
456  int n;
457 
458  /*
459  * Construct connection params from generic options of ForeignServer
460  * and UserMapping. (Some of them might not be libpq options, in
461  * which case we'll just waste a few array slots.) Add 4 extra slots
462  * for application_name, fallback_application_name, client_encoding,
463  * end marker.
464  */
465  n = list_length(server->options) + list_length(user->options) + 4;
466  keywords = (const char **) palloc(n * sizeof(char *));
467  values = (const char **) palloc(n * sizeof(char *));
468 
469  n = 0;
470  n += ExtractConnectionOptions(server->options,
471  keywords + n, values + n);
472  n += ExtractConnectionOptions(user->options,
473  keywords + n, values + n);
474 
475  /*
476  * Use pgfdw_application_name as application_name if set.
477  *
478  * PQconnectdbParams() processes the parameter arrays from start to
479  * end. If any key word is repeated, the last value is used. Therefore
480  * note that pgfdw_application_name must be added to the arrays after
481  * options of ForeignServer are, so that it can override
482  * application_name set in ForeignServer.
483  */
485  {
486  keywords[n] = "application_name";
488  n++;
489  }
490 
491  /*
492  * Search the parameter arrays to find application_name setting, and
493  * replace escape sequences in it with status information if found.
494  * The arrays are searched backwards because the last value is used if
495  * application_name is repeatedly set.
496  */
497  for (int i = n - 1; i >= 0; i--)
498  {
499  if (strcmp(keywords[i], "application_name") == 0 &&
500  *(values[i]) != '\0')
501  {
502  /*
503  * Use this application_name setting if it's not empty string
504  * even after any escape sequences in it are replaced.
505  */
506  appname = process_pgfdw_appname(values[i]);
507  if (appname[0] != '\0')
508  {
509  values[i] = appname;
510  break;
511  }
512 
513  /*
514  * This empty application_name is not used, so we set
515  * values[i] to NULL and keep searching the array to find the
516  * next one.
517  */
518  values[i] = NULL;
519  pfree(appname);
520  appname = NULL;
521  }
522  }
523 
524  /* Use "postgres_fdw" as fallback_application_name */
525  keywords[n] = "fallback_application_name";
526  values[n] = "postgres_fdw";
527  n++;
528 
529  /* Set client_encoding so that libpq can convert encoding properly. */
530  keywords[n] = "client_encoding";
532  n++;
533 
534  keywords[n] = values[n] = NULL;
535 
536  /* verify the set of connection parameters */
537  check_conn_params(keywords, values, user);
538 
539  /* first time, allocate or get the custom wait event */
540  if (pgfdw_we_connect == 0)
541  pgfdw_we_connect = WaitEventExtensionNew("PostgresFdwConnect");
542 
543  /* OK to make connection */
544  conn = libpqsrv_connect_params(keywords, values,
545  false, /* expand_dbname */
547 
548  if (!conn || PQstatus(conn) != CONNECTION_OK)
549  ereport(ERROR,
550  (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
551  errmsg("could not connect to server \"%s\"",
552  server->servername),
554 
555  /* Perform post-connection security checks */
556  pgfdw_security_check(keywords, values, user, conn);
557 
558  /* Prepare new session for use */
560 
561  if (appname != NULL)
562  pfree(appname);
563  pfree(keywords);
564  pfree(values);
565  }
566  PG_CATCH();
567  {
569  PG_RE_THROW();
570  }
571  PG_END_TRY();
572 
573  return conn;
574 }
575 
576 /*
577  * Disconnect any open connection for a connection cache entry.
578  */
579 static void
581 {
582  if (entry->conn != NULL)
583  {
584  libpqsrv_disconnect(entry->conn);
585  entry->conn = NULL;
586  }
587 }
588 
589 /*
590  * Return the value of the password_required option for this user mapping,
591  * or true if the option is not set. The mapping has been pre-validated.
592  */
593 static bool
595 {
596  ListCell *cell;
597 
598  foreach(cell, user->options)
599  {
600  DefElem *def = (DefElem *) lfirst(cell);
601 
602  if (strcmp(def->defname, "password_required") == 0)
603  return defGetBoolean(def);
604  }
605 
606  return true;
607 }
608 
609 /*
610  * For non-superusers, insist that the connstr specify a password or that the
611  * user provided their own GSSAPI delegated credentials. This
612  * prevents a password from being picked up from .pgpass, a service file, the
613  * environment, etc. We don't want the postgres user's passwords,
614  * certificates, etc to be accessible to non-superusers. (See also
615  * dblink_connstr_check in contrib/dblink.)
616  */
617 static void
618 check_conn_params(const char **keywords, const char **values, UserMapping *user)
619 {
620  int i;
621 
622  /* no check required if superuser */
623  if (superuser_arg(user->userid))
624  return;
625 
626 #ifdef ENABLE_GSS
627  /* ok if the user provided their own delegated credentials */
629  return;
630 #endif
631 
632  /* ok if params contain a non-empty password */
633  for (i = 0; keywords[i] != NULL; i++)
634  {
635  if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
636  return;
637  }
638 
639  /* ok if the superuser explicitly said so at user mapping creation time */
641  return;
642 
643  ereport(ERROR,
644  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
645  errmsg("password or GSSAPI delegated credentials required"),
646  errdetail("Non-superusers must delegate GSSAPI credentials or provide a password in the user mapping.")));
647 }
648 
649 /*
650  * Issue SET commands to make sure remote session is configured properly.
651  *
652  * We do this just once at connection, assuming nothing will change the
653  * values later. Since we'll never send volatile function calls to the
654  * remote, there shouldn't be any way to break this assumption from our end.
655  * It's possible to think of ways to break it at the remote end, eg making
656  * a foreign table point to a view that includes a set_config call ---
657  * but once you admit the possibility of a malicious view definition,
658  * there are any number of ways to break things.
659  */
660 static void
662 {
663  int remoteversion = PQserverVersion(conn);
664 
665  /* Force the search path to contain only pg_catalog (see deparse.c) */
666  do_sql_command(conn, "SET search_path = pg_catalog");
667 
668  /*
669  * Set remote timezone; this is basically just cosmetic, since all
670  * transmitted and returned timestamptzs should specify a zone explicitly
671  * anyway. However it makes the regression test outputs more predictable.
672  *
673  * We don't risk setting remote zone equal to ours, since the remote
674  * server might use a different timezone database. Instead, use UTC
675  * (quoted, because very old servers are picky about case).
676  */
677  do_sql_command(conn, "SET timezone = 'UTC'");
678 
679  /*
680  * Set values needed to ensure unambiguous data output from remote. (This
681  * logic should match what pg_dump does. See also set_transmission_modes
682  * in postgres_fdw.c.)
683  */
684  do_sql_command(conn, "SET datestyle = ISO");
685  if (remoteversion >= 80400)
686  do_sql_command(conn, "SET intervalstyle = postgres");
687  if (remoteversion >= 90000)
688  do_sql_command(conn, "SET extra_float_digits = 3");
689  else
690  do_sql_command(conn, "SET extra_float_digits = 2");
691 }
692 
693 /*
694  * Convenience subroutine to issue a non-data-returning SQL command to remote
695  */
696 void
697 do_sql_command(PGconn *conn, const char *sql)
698 {
700  do_sql_command_end(conn, sql, false);
701 }
702 
703 static void
704 do_sql_command_begin(PGconn *conn, const char *sql)
705 {
706  if (!PQsendQuery(conn, sql))
707  pgfdw_report_error(ERROR, NULL, conn, false, sql);
708 }
709 
710 static void
711 do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
712 {
713  PGresult *res;
714 
715  /*
716  * If requested, consume whatever data is available from the socket. (Note
717  * that if all data is available, this allows pgfdw_get_result to call
718  * PQgetResult without forcing the overhead of WaitLatchOrSocket, which
719  * would be large compared to the overhead of PQconsumeInput.)
720  */
721  if (consume_input && !PQconsumeInput(conn))
722  pgfdw_report_error(ERROR, NULL, conn, false, sql);
725  pgfdw_report_error(ERROR, res, conn, true, sql);
726  PQclear(res);
727 }
728 
729 /*
730  * Start remote transaction or subtransaction, if needed.
731  *
732  * Note that we always use at least REPEATABLE READ in the remote session.
733  * This is so that, if a query initiates multiple scans of the same or
734  * different foreign tables, we will get snapshot-consistent results from
735  * those scans. A disadvantage is that we can't provide sane emulation of
736  * READ COMMITTED behavior --- it would be nice if we had some other way to
737  * control which remote queries share a snapshot.
738  */
739 static void
741 {
742  int curlevel = GetCurrentTransactionNestLevel();
743 
744  /* Start main transaction if we haven't yet */
745  if (entry->xact_depth <= 0)
746  {
747  const char *sql;
748 
749  elog(DEBUG3, "starting remote transaction on connection %p",
750  entry->conn);
751 
753  sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
754  else
755  sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
756  entry->changing_xact_state = true;
757  do_sql_command(entry->conn, sql);
758  entry->xact_depth = 1;
759  entry->changing_xact_state = false;
760  }
761 
762  /*
763  * If we're in a subtransaction, stack up savepoints to match our level.
764  * This ensures we can rollback just the desired effects when a
765  * subtransaction aborts.
766  */
767  while (entry->xact_depth < curlevel)
768  {
769  char sql[64];
770 
771  snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
772  entry->changing_xact_state = true;
773  do_sql_command(entry->conn, sql);
774  entry->xact_depth++;
775  entry->changing_xact_state = false;
776  }
777 }
778 
779 /*
780  * Release connection reference count created by calling GetConnection.
781  */
782 void
784 {
785  /*
786  * Currently, we don't actually track connection references because all
787  * cleanup is managed on a transaction or subtransaction basis instead. So
788  * there's nothing to do here.
789  */
790 }
791 
792 /*
793  * Assign a "unique" number for a cursor.
794  *
795  * These really only need to be unique per connection within a transaction.
796  * For the moment we ignore the per-connection point and assign them across
797  * all connections in the transaction, but we ask for the connection to be
798  * supplied in case we want to refine that.
799  *
800  * Note that even if wraparound happens in a very long transaction, actual
801  * collisions are highly improbable; just be sure to use %u not %d to print.
802  */
803 unsigned int
805 {
806  return ++cursor_number;
807 }
808 
809 /*
810  * Assign a "unique" number for a prepared statement.
811  *
812  * This works much like GetCursorNumber, except that we never reset the counter
813  * within a session. That's because we can't be 100% sure we've gotten rid
814  * of all prepared statements on all connections, and it's not really worth
815  * increasing the risk of prepared-statement name collisions by resetting.
816  */
817 unsigned int
819 {
820  return ++prep_stmt_number;
821 }
822 
823 /*
824  * Submit a query and wait for the result.
825  *
826  * Since we don't use non-blocking mode, this can't process interrupts while
827  * pushing the query text to the server. That risk is relatively small, so we
828  * ignore that for now.
829  *
830  * Caller is responsible for the error handling on the result.
831  */
832 PGresult *
834 {
835  /* First, process a pending asynchronous request, if any. */
836  if (state && state->pendingAreq)
837  process_pending_request(state->pendingAreq);
838 
839  if (!PQsendQuery(conn, query))
840  return NULL;
841  return pgfdw_get_result(conn);
842 }
843 
844 /*
845  * Wrap libpqsrv_get_result_last(), adding wait event.
846  *
847  * Caller is responsible for the error handling on the result.
848  */
849 PGresult *
851 {
853 }
854 
855 /*
856  * Report an error we got from the remote server.
857  *
858  * elevel: error level to use (typically ERROR, but might be less)
859  * res: PGresult containing the error
860  * conn: connection we did the query on
861  * clear: if true, PQclear the result (otherwise caller will handle it)
862  * sql: NULL, or text of remote command we tried to execute
863  *
864  * Note: callers that choose not to throw ERROR for a remote error are
865  * responsible for making sure that the associated ConnCacheEntry gets
866  * marked with have_error = true.
867  */
868 void
870  bool clear, const char *sql)
871 {
872  /* If requested, PGresult must be released before leaving this function. */
873  PG_TRY();
874  {
875  char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
876  char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
877  char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
878  char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
879  char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
880  int sqlstate;
881 
882  if (diag_sqlstate)
883  sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
884  diag_sqlstate[1],
885  diag_sqlstate[2],
886  diag_sqlstate[3],
887  diag_sqlstate[4]);
888  else
889  sqlstate = ERRCODE_CONNECTION_FAILURE;
890 
891  /*
892  * If we don't get a message from the PGresult, try the PGconn. This
893  * is needed because for connection-level failures, PQgetResult may
894  * just return NULL, not a PGresult at all.
895  */
896  if (message_primary == NULL)
897  message_primary = pchomp(PQerrorMessage(conn));
898 
899  ereport(elevel,
900  (errcode(sqlstate),
901  (message_primary != NULL && message_primary[0] != '\0') ?
902  errmsg_internal("%s", message_primary) :
903  errmsg("could not obtain message string for remote error"),
904  message_detail ? errdetail_internal("%s", message_detail) : 0,
905  message_hint ? errhint("%s", message_hint) : 0,
906  message_context ? errcontext("%s", message_context) : 0,
907  sql ? errcontext("remote SQL command: %s", sql) : 0));
908  }
909  PG_FINALLY();
910  {
911  if (clear)
912  PQclear(res);
913  }
914  PG_END_TRY();
915 }
916 
917 /*
918  * pgfdw_xact_callback --- cleanup at main-transaction end.
919  *
920  * This runs just late enough that it must not enter user-defined code
921  * locally. (Entering such code on the remote side is fine. Its remote
922  * COMMIT TRANSACTION may run deferred triggers.)
923  */
924 static void
926 {
927  HASH_SEQ_STATUS scan;
928  ConnCacheEntry *entry;
929  List *pending_entries = NIL;
930  List *cancel_requested = NIL;
931 
932  /* Quick exit if no connections were touched in this transaction. */
933  if (!xact_got_connection)
934  return;
935 
936  /*
937  * Scan all connection cache entries to find open remote transactions, and
938  * close them.
939  */
941  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
942  {
943  PGresult *res;
944 
945  /* Ignore cache entry if no open connection right now */
946  if (entry->conn == NULL)
947  continue;
948 
949  /* If it has an open remote transaction, try to close it */
950  if (entry->xact_depth > 0)
951  {
952  elog(DEBUG3, "closing remote transaction on connection %p",
953  entry->conn);
954 
955  switch (event)
956  {
959 
960  /*
961  * If abort cleanup previously failed for this connection,
962  * we can't issue any more commands against it.
963  */
965 
966  /* Commit all remote transactions during pre-commit */
967  entry->changing_xact_state = true;
968  if (entry->parallel_commit)
969  {
970  do_sql_command_begin(entry->conn, "COMMIT TRANSACTION");
971  pending_entries = lappend(pending_entries, entry);
972  continue;
973  }
974  do_sql_command(entry->conn, "COMMIT TRANSACTION");
975  entry->changing_xact_state = false;
976 
977  /*
978  * If there were any errors in subtransactions, and we
979  * made prepared statements, do a DEALLOCATE ALL to make
980  * sure we get rid of all prepared statements. This is
981  * annoying and not terribly bulletproof, but it's
982  * probably not worth trying harder.
983  *
984  * DEALLOCATE ALL only exists in 8.3 and later, so this
985  * constrains how old a server postgres_fdw can
986  * communicate with. We intentionally ignore errors in
987  * the DEALLOCATE, so that we can hobble along to some
988  * extent with older servers (leaking prepared statements
989  * as we go; but we don't really support update operations
990  * pre-8.3 anyway).
991  */
992  if (entry->have_prep_stmt && entry->have_error)
993  {
994  res = pgfdw_exec_query(entry->conn, "DEALLOCATE ALL",
995  NULL);
996  PQclear(res);
997  }
998  entry->have_prep_stmt = false;
999  entry->have_error = false;
1000  break;
1002 
1003  /*
1004  * We disallow any remote transactions, since it's not
1005  * very reasonable to hold them open until the prepared
1006  * transaction is committed. For the moment, throw error
1007  * unconditionally; later we might allow read-only cases.
1008  * Note that the error will cause us to come right back
1009  * here with event == XACT_EVENT_ABORT, so we'll clean up
1010  * the connection state at that point.
1011  */
1012  ereport(ERROR,
1013  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1014  errmsg("cannot PREPARE a transaction that has operated on postgres_fdw foreign tables")));
1015  break;
1017  case XACT_EVENT_COMMIT:
1018  case XACT_EVENT_PREPARE:
1019  /* Pre-commit should have closed the open transaction */
1020  elog(ERROR, "missed cleaning up connection during pre-commit");
1021  break;
1023  case XACT_EVENT_ABORT:
1024  /* Rollback all remote transactions during abort */
1025  if (entry->parallel_abort)
1026  {
1027  if (pgfdw_abort_cleanup_begin(entry, true,
1028  &pending_entries,
1029  &cancel_requested))
1030  continue;
1031  }
1032  else
1033  pgfdw_abort_cleanup(entry, true);
1034  break;
1035  }
1036  }
1037 
1038  /* Reset state to show we're out of a transaction */
1039  pgfdw_reset_xact_state(entry, true);
1040  }
1041 
1042  /* If there are any pending connections, finish cleaning them up */
1043  if (pending_entries || cancel_requested)
1044  {
1045  if (event == XACT_EVENT_PARALLEL_PRE_COMMIT ||
1046  event == XACT_EVENT_PRE_COMMIT)
1047  {
1048  Assert(cancel_requested == NIL);
1049  pgfdw_finish_pre_commit_cleanup(pending_entries);
1050  }
1051  else
1052  {
1053  Assert(event == XACT_EVENT_PARALLEL_ABORT ||
1054  event == XACT_EVENT_ABORT);
1055  pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
1056  true);
1057  }
1058  }
1059 
1060  /*
1061  * Regardless of the event type, we can now mark ourselves as out of the
1062  * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
1063  * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
1064  */
1065  xact_got_connection = false;
1066 
1067  /* Also reset cursor numbering for next transaction */
1068  cursor_number = 0;
1069 }
1070 
1071 /*
1072  * pgfdw_subxact_callback --- cleanup at subtransaction end.
1073  */
1074 static void
1076  SubTransactionId parentSubid, void *arg)
1077 {
1078  HASH_SEQ_STATUS scan;
1079  ConnCacheEntry *entry;
1080  int curlevel;
1081  List *pending_entries = NIL;
1082  List *cancel_requested = NIL;
1083 
1084  /* Nothing to do at subxact start, nor after commit. */
1085  if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
1086  event == SUBXACT_EVENT_ABORT_SUB))
1087  return;
1088 
1089  /* Quick exit if no connections were touched in this transaction. */
1090  if (!xact_got_connection)
1091  return;
1092 
1093  /*
1094  * Scan all connection cache entries to find open remote subtransactions
1095  * of the current level, and close them.
1096  */
1097  curlevel = GetCurrentTransactionNestLevel();
1098  hash_seq_init(&scan, ConnectionHash);
1099  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1100  {
1101  char sql[100];
1102 
1103  /*
1104  * We only care about connections with open remote subtransactions of
1105  * the current level.
1106  */
1107  if (entry->conn == NULL || entry->xact_depth < curlevel)
1108  continue;
1109 
1110  if (entry->xact_depth > curlevel)
1111  elog(ERROR, "missed cleaning up remote subtransaction at level %d",
1112  entry->xact_depth);
1113 
1114  if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
1115  {
1116  /*
1117  * If abort cleanup previously failed for this connection, we
1118  * can't issue any more commands against it.
1119  */
1121 
1122  /* Commit all remote subtransactions during pre-commit */
1123  snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
1124  entry->changing_xact_state = true;
1125  if (entry->parallel_commit)
1126  {
1127  do_sql_command_begin(entry->conn, sql);
1128  pending_entries = lappend(pending_entries, entry);
1129  continue;
1130  }
1131  do_sql_command(entry->conn, sql);
1132  entry->changing_xact_state = false;
1133  }
1134  else
1135  {
1136  /* Rollback all remote subtransactions during abort */
1137  if (entry->parallel_abort)
1138  {
1139  if (pgfdw_abort_cleanup_begin(entry, false,
1140  &pending_entries,
1141  &cancel_requested))
1142  continue;
1143  }
1144  else
1145  pgfdw_abort_cleanup(entry, false);
1146  }
1147 
1148  /* OK, we're outta that level of subtransaction */
1149  pgfdw_reset_xact_state(entry, false);
1150  }
1151 
1152  /* If there are any pending connections, finish cleaning them up */
1153  if (pending_entries || cancel_requested)
1154  {
1155  if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
1156  {
1157  Assert(cancel_requested == NIL);
1158  pgfdw_finish_pre_subcommit_cleanup(pending_entries, curlevel);
1159  }
1160  else
1161  {
1162  Assert(event == SUBXACT_EVENT_ABORT_SUB);
1163  pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
1164  false);
1165  }
1166  }
1167 }
1168 
1169 /*
1170  * Connection invalidation callback function
1171  *
1172  * After a change to a pg_foreign_server or pg_user_mapping catalog entry,
1173  * close connections depending on that entry immediately if current transaction
1174  * has not used those connections yet. Otherwise, mark those connections as
1175  * invalid and then make pgfdw_xact_callback() close them at the end of current
1176  * transaction, since they cannot be closed in the midst of the transaction
1177  * using them. Closed connections will be remade at the next opportunity if
1178  * necessary.
1179  *
1180  * Although most cache invalidation callbacks blow away all the related stuff
1181  * regardless of the given hashvalue, connections are expensive enough that
1182  * it's worth trying to avoid that.
1183  *
1184  * NB: We could avoid unnecessary disconnection more strictly by examining
1185  * individual option values, but it seems too much effort for the gain.
1186  */
1187 static void
1188 pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
1189 {
1190  HASH_SEQ_STATUS scan;
1191  ConnCacheEntry *entry;
1192 
1193  Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
1194 
1195  /* ConnectionHash must exist already, if we're registered */
1196  hash_seq_init(&scan, ConnectionHash);
1197  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1198  {
1199  /* Ignore invalid entries */
1200  if (entry->conn == NULL)
1201  continue;
1202 
1203  /* hashvalue == 0 means a cache reset, must clear all state */
1204  if (hashvalue == 0 ||
1205  (cacheid == FOREIGNSERVEROID &&
1206  entry->server_hashvalue == hashvalue) ||
1207  (cacheid == USERMAPPINGOID &&
1208  entry->mapping_hashvalue == hashvalue))
1209  {
1210  /*
1211  * Close the connection immediately if it's not used yet in this
1212  * transaction. Otherwise mark it as invalid so that
1213  * pgfdw_xact_callback() can close it at the end of this
1214  * transaction.
1215  */
1216  if (entry->xact_depth == 0)
1217  {
1218  elog(DEBUG3, "discarding connection %p", entry->conn);
1219  disconnect_pg_server(entry);
1220  }
1221  else
1222  entry->invalidated = true;
1223  }
1224  }
1225 }
1226 
1227 /*
1228  * Raise an error if the given connection cache entry is marked as being
1229  * in the middle of an xact state change. This should be called at which no
1230  * such change is expected to be in progress; if one is found to be in
1231  * progress, it means that we aborted in the middle of a previous state change
1232  * and now don't know what the remote transaction state actually is.
1233  * Such connections can't safely be further used. Re-establishing the
1234  * connection would change the snapshot and roll back any writes already
1235  * performed, so that's not an option, either. Thus, we must abort.
1236  */
1237 static void
1239 {
1240  ForeignServer *server;
1241 
1242  /* nothing to do for inactive entries and entries of sane state */
1243  if (entry->conn == NULL || !entry->changing_xact_state)
1244  return;
1245 
1246  /* make sure this entry is inactive */
1247  disconnect_pg_server(entry);
1248 
1249  /* find server name to be shown in the message below */
1250  server = GetForeignServer(entry->serverid);
1251 
1252  ereport(ERROR,
1253  (errcode(ERRCODE_CONNECTION_EXCEPTION),
1254  errmsg("connection to server \"%s\" was lost",
1255  server->servername)));
1256 }
1257 
1258 /*
1259  * Reset state to show we're out of a (sub)transaction.
1260  */
1261 static void
1263 {
1264  if (toplevel)
1265  {
1266  /* Reset state to show we're out of a transaction */
1267  entry->xact_depth = 0;
1268 
1269  /*
1270  * If the connection isn't in a good idle state, it is marked as
1271  * invalid or keep_connections option of its server is disabled, then
1272  * discard it to recover. Next GetConnection will open a new
1273  * connection.
1274  */
1275  if (PQstatus(entry->conn) != CONNECTION_OK ||
1276  PQtransactionStatus(entry->conn) != PQTRANS_IDLE ||
1277  entry->changing_xact_state ||
1278  entry->invalidated ||
1279  !entry->keep_connections)
1280  {
1281  elog(DEBUG3, "discarding connection %p", entry->conn);
1282  disconnect_pg_server(entry);
1283  }
1284  }
1285  else
1286  {
1287  /* Reset state to show we're out of a subtransaction */
1288  entry->xact_depth--;
1289  }
1290 }
1291 
1292 /*
1293  * Cancel the currently-in-progress query (whose query text we do not have)
1294  * and ignore the result. Returns true if we successfully cancel the query
1295  * and discard any pending result, and false if not.
1296  *
1297  * It's not a huge problem if we throw an ERROR here, but if we get into error
1298  * recursion trouble, we'll end up slamming the connection shut, which will
1299  * necessitate failing the entire toplevel transaction even if subtransactions
1300  * were used. Try to use WARNING where we can.
1301  *
1302  * XXX: if the query was one sent by fetch_more_data_begin(), we could get the
1303  * query text from the pendingAreq saved in the per-connection state, then
1304  * report the query using it.
1305  */
1306 static bool
1308 {
1309  TimestampTz endtime;
1310 
1311  /*
1312  * If it takes too long to cancel the query and discard the result, assume
1313  * the connection is dead.
1314  */
1317 
1318  if (!pgfdw_cancel_query_begin(conn, endtime))
1319  return false;
1320  return pgfdw_cancel_query_end(conn, endtime, false);
1321 }
1322 
1323 /*
1324  * Submit a cancel request to the given connection, waiting only until
1325  * the given time.
1326  *
1327  * We sleep interruptibly until we receive confirmation that the cancel
1328  * request has been accepted, and if it is, return true; if the timeout
1329  * lapses without that, or the request fails for whatever reason, return
1330  * false.
1331  */
1332 static bool
1334 {
1335  const char *errormsg = libpqsrv_cancel(conn, endtime);
1336 
1337  if (errormsg != NULL)
1338  ereport(WARNING,
1339  errcode(ERRCODE_CONNECTION_FAILURE),
1340  errmsg("could not send cancel request: %s", errormsg));
1341 
1342  return errormsg == NULL;
1343 }
1344 
1345 static bool
1346 pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime, bool consume_input)
1347 {
1348  PGresult *result = NULL;
1349  bool timed_out;
1350 
1351  /*
1352  * If requested, consume whatever data is available from the socket. (Note
1353  * that if all data is available, this allows pgfdw_get_cleanup_result to
1354  * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
1355  * which would be large compared to the overhead of PQconsumeInput.)
1356  */
1357  if (consume_input && !PQconsumeInput(conn))
1358  {
1359  ereport(WARNING,
1360  (errcode(ERRCODE_CONNECTION_FAILURE),
1361  errmsg("could not get result of cancel request: %s",
1362  pchomp(PQerrorMessage(conn)))));
1363  return false;
1364  }
1365 
1366  /* Get and discard the result of the query. */
1367  if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
1368  {
1369  if (timed_out)
1370  ereport(WARNING,
1371  (errmsg("could not get result of cancel request due to timeout")));
1372  else
1373  ereport(WARNING,
1374  (errcode(ERRCODE_CONNECTION_FAILURE),
1375  errmsg("could not get result of cancel request: %s",
1376  pchomp(PQerrorMessage(conn)))));
1377 
1378  return false;
1379  }
1380  PQclear(result);
1381 
1382  return true;
1383 }
1384 
1385 /*
1386  * Submit a query during (sub)abort cleanup and wait up to 30 seconds for the
1387  * result. If the query is executed without error, the return value is true.
1388  * If the query is executed successfully but returns an error, the return
1389  * value is true if and only if ignore_errors is set. If the query can't be
1390  * sent or times out, the return value is false.
1391  *
1392  * It's not a huge problem if we throw an ERROR here, but if we get into error
1393  * recursion trouble, we'll end up slamming the connection shut, which will
1394  * necessitate failing the entire toplevel transaction even if subtransactions
1395  * were used. Try to use WARNING where we can.
1396  */
1397 static bool
1398 pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
1399 {
1400  TimestampTz endtime;
1401 
1402  /*
1403  * If it takes too long to execute a cleanup query, assume the connection
1404  * is dead. It's fairly likely that this is why we aborted in the first
1405  * place (e.g. statement timeout, user cancel), so the timeout shouldn't
1406  * be too long.
1407  */
1410 
1411  if (!pgfdw_exec_cleanup_query_begin(conn, query))
1412  return false;
1413  return pgfdw_exec_cleanup_query_end(conn, query, endtime,
1414  false, ignore_errors);
1415 }
1416 
1417 static bool
1419 {
1420  Assert(query != NULL);
1421 
1422  /*
1423  * Submit a query. Since we don't use non-blocking mode, this also can
1424  * block. But its risk is relatively small, so we ignore that for now.
1425  */
1426  if (!PQsendQuery(conn, query))
1427  {
1428  pgfdw_report_error(WARNING, NULL, conn, false, query);
1429  return false;
1430  }
1431 
1432  return true;
1433 }
1434 
1435 static bool
1437  TimestampTz endtime, bool consume_input,
1438  bool ignore_errors)
1439 {
1440  PGresult *result = NULL;
1441  bool timed_out;
1442 
1443  Assert(query != NULL);
1444 
1445  /*
1446  * If requested, consume whatever data is available from the socket. (Note
1447  * that if all data is available, this allows pgfdw_get_cleanup_result to
1448  * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
1449  * which would be large compared to the overhead of PQconsumeInput.)
1450  */
1451  if (consume_input && !PQconsumeInput(conn))
1452  {
1453  pgfdw_report_error(WARNING, NULL, conn, false, query);
1454  return false;
1455  }
1456 
1457  /* Get the result of the query. */
1458  if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
1459  {
1460  if (timed_out)
1461  ereport(WARNING,
1462  (errmsg("could not get query result due to timeout"),
1463  errcontext("remote SQL command: %s", query)));
1464  else
1465  pgfdw_report_error(WARNING, NULL, conn, false, query);
1466 
1467  return false;
1468  }
1469 
1470  /* Issue a warning if not successful. */
1471  if (PQresultStatus(result) != PGRES_COMMAND_OK)
1472  {
1473  pgfdw_report_error(WARNING, result, conn, true, query);
1474  return ignore_errors;
1475  }
1476  PQclear(result);
1477 
1478  return true;
1479 }
1480 
1481 /*
1482  * Get, during abort cleanup, the result of a query that is in progress. This
1483  * might be a query that is being interrupted by transaction abort, or it might
1484  * be a query that was initiated as part of transaction abort to get the remote
1485  * side back to the appropriate state.
1486  *
1487  * endtime is the time at which we should give up and assume the remote
1488  * side is dead. Returns true if the timeout expired or connection trouble
1489  * occurred, false otherwise. Sets *result except in case of a timeout.
1490  * Sets timed_out to true only when the timeout expired.
1491  */
1492 static bool
1494  bool *timed_out)
1495 {
1496  volatile bool failed = false;
1497  PGresult *volatile last_res = NULL;
1498 
1499  *timed_out = false;
1500 
1501  /* In what follows, do not leak any PGresults on an error. */
1502  PG_TRY();
1503  {
1504  for (;;)
1505  {
1506  PGresult *res;
1507 
1508  while (PQisBusy(conn))
1509  {
1510  int wc;
1512  long cur_timeout;
1513 
1514  /* If timeout has expired, give up, else get sleep time. */
1515  cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
1516  if (cur_timeout <= 0)
1517  {
1518  *timed_out = true;
1519  failed = true;
1520  goto exit;
1521  }
1522 
1523  /* first time, allocate or get the custom wait event */
1524  if (pgfdw_we_cleanup_result == 0)
1525  pgfdw_we_cleanup_result = WaitEventExtensionNew("PostgresFdwCleanupResult");
1526 
1527  /* Sleep until there's something to do */
1531  PQsocket(conn),
1532  cur_timeout, pgfdw_we_cleanup_result);
1534 
1536 
1537  /* Data available in socket? */
1538  if (wc & WL_SOCKET_READABLE)
1539  {
1540  if (!PQconsumeInput(conn))
1541  {
1542  /* connection trouble */
1543  failed = true;
1544  goto exit;
1545  }
1546  }
1547  }
1548 
1549  res = PQgetResult(conn);
1550  if (res == NULL)
1551  break; /* query is complete */
1552 
1553  PQclear(last_res);
1554  last_res = res;
1555  }
1556 exit: ;
1557  }
1558  PG_CATCH();
1559  {
1560  PQclear(last_res);
1561  PG_RE_THROW();
1562  }
1563  PG_END_TRY();
1564 
1565  if (failed)
1566  PQclear(last_res);
1567  else
1568  *result = last_res;
1569  return failed;
1570 }
1571 
1572 /*
1573  * Abort remote transaction or subtransaction.
1574  *
1575  * "toplevel" should be set to true if toplevel (main) transaction is
1576  * rollbacked, false otherwise.
1577  *
1578  * Set entry->changing_xact_state to false on success, true on failure.
1579  */
1580 static void
1581 pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
1582 {
1583  char sql[100];
1584 
1585  /*
1586  * Don't try to clean up the connection if we're already in error
1587  * recursion trouble.
1588  */
1590  entry->changing_xact_state = true;
1591 
1592  /*
1593  * If connection is already unsalvageable, don't touch it further.
1594  */
1595  if (entry->changing_xact_state)
1596  return;
1597 
1598  /*
1599  * Mark this connection as in the process of changing transaction state.
1600  */
1601  entry->changing_xact_state = true;
1602 
1603  /* Assume we might have lost track of prepared statements */
1604  entry->have_error = true;
1605 
1606  /*
1607  * If a command has been submitted to the remote server by using an
1608  * asynchronous execution function, the command might not have yet
1609  * completed. Check to see if a command is still being processed by the
1610  * remote server, and if so, request cancellation of the command.
1611  */
1612  if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
1613  !pgfdw_cancel_query(entry->conn))
1614  return; /* Unable to cancel running query */
1615 
1616  CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1617  if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
1618  return; /* Unable to abort remote (sub)transaction */
1619 
1620  if (toplevel)
1621  {
1622  if (entry->have_prep_stmt && entry->have_error &&
1624  "DEALLOCATE ALL",
1625  true))
1626  return; /* Trouble clearing prepared statements */
1627 
1628  entry->have_prep_stmt = false;
1629  entry->have_error = false;
1630  }
1631 
1632  /*
1633  * If pendingAreq of the per-connection state is not NULL, it means that
1634  * an asynchronous fetch begun by fetch_more_data_begin() was not done
1635  * successfully and thus the per-connection state was not reset in
1636  * fetch_more_data(); in that case reset the per-connection state here.
1637  */
1638  if (entry->state.pendingAreq)
1639  memset(&entry->state, 0, sizeof(entry->state));
1640 
1641  /* Disarm changing_xact_state if it all worked */
1642  entry->changing_xact_state = false;
1643 }
1644 
1645 /*
1646  * Like pgfdw_abort_cleanup, submit an abort command or cancel request, but
1647  * don't wait for the result.
1648  *
1649  * Returns true if the abort command or cancel request is successfully issued,
1650  * false otherwise. If the abort command is successfully issued, the given
1651  * connection cache entry is appended to *pending_entries. Otherwise, if the
1652  * cancel request is successfully issued, it is appended to *cancel_requested.
1653  */
1654 static bool
1656  List **pending_entries, List **cancel_requested)
1657 {
1658  /*
1659  * Don't try to clean up the connection if we're already in error
1660  * recursion trouble.
1661  */
1663  entry->changing_xact_state = true;
1664 
1665  /*
1666  * If connection is already unsalvageable, don't touch it further.
1667  */
1668  if (entry->changing_xact_state)
1669  return false;
1670 
1671  /*
1672  * Mark this connection as in the process of changing transaction state.
1673  */
1674  entry->changing_xact_state = true;
1675 
1676  /* Assume we might have lost track of prepared statements */
1677  entry->have_error = true;
1678 
1679  /*
1680  * If a command has been submitted to the remote server by using an
1681  * asynchronous execution function, the command might not have yet
1682  * completed. Check to see if a command is still being processed by the
1683  * remote server, and if so, request cancellation of the command.
1684  */
1685  if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
1686  {
1687  TimestampTz endtime;
1688 
1691  if (!pgfdw_cancel_query_begin(entry->conn, endtime))
1692  return false; /* Unable to cancel running query */
1693  *cancel_requested = lappend(*cancel_requested, entry);
1694  }
1695  else
1696  {
1697  char sql[100];
1698 
1699  CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1700  if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
1701  return false; /* Unable to abort remote transaction */
1702  *pending_entries = lappend(*pending_entries, entry);
1703  }
1704 
1705  return true;
1706 }
1707 
1708 /*
1709  * Finish pre-commit cleanup of connections on each of which we've sent a
1710  * COMMIT command to the remote server.
1711  */
1712 static void
1714 {
1715  ConnCacheEntry *entry;
1716  List *pending_deallocs = NIL;
1717  ListCell *lc;
1718 
1719  Assert(pending_entries);
1720 
1721  /*
1722  * Get the result of the COMMIT command for each of the pending entries
1723  */
1724  foreach(lc, pending_entries)
1725  {
1726  entry = (ConnCacheEntry *) lfirst(lc);
1727 
1728  Assert(entry->changing_xact_state);
1729 
1730  /*
1731  * We might already have received the result on the socket, so pass
1732  * consume_input=true to try to consume it first
1733  */
1734  do_sql_command_end(entry->conn, "COMMIT TRANSACTION", true);
1735  entry->changing_xact_state = false;
1736 
1737  /* Do a DEALLOCATE ALL in parallel if needed */
1738  if (entry->have_prep_stmt && entry->have_error)
1739  {
1740  /* Ignore errors (see notes in pgfdw_xact_callback) */
1741  if (PQsendQuery(entry->conn, "DEALLOCATE ALL"))
1742  {
1743  pending_deallocs = lappend(pending_deallocs, entry);
1744  continue;
1745  }
1746  }
1747  entry->have_prep_stmt = false;
1748  entry->have_error = false;
1749 
1750  pgfdw_reset_xact_state(entry, true);
1751  }
1752 
1753  /* No further work if no pending entries */
1754  if (!pending_deallocs)
1755  return;
1756 
1757  /*
1758  * Get the result of the DEALLOCATE command for each of the pending
1759  * entries
1760  */
1761  foreach(lc, pending_deallocs)
1762  {
1763  PGresult *res;
1764 
1765  entry = (ConnCacheEntry *) lfirst(lc);
1766 
1767  /* Ignore errors (see notes in pgfdw_xact_callback) */
1768  while ((res = PQgetResult(entry->conn)) != NULL)
1769  {
1770  PQclear(res);
1771  /* Stop if the connection is lost (else we'll loop infinitely) */
1772  if (PQstatus(entry->conn) == CONNECTION_BAD)
1773  break;
1774  }
1775  entry->have_prep_stmt = false;
1776  entry->have_error = false;
1777 
1778  pgfdw_reset_xact_state(entry, true);
1779  }
1780 }
1781 
1782 /*
1783  * Finish pre-subcommit cleanup of connections on each of which we've sent a
1784  * RELEASE command to the remote server.
1785  */
1786 static void
1787 pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel)
1788 {
1789  ConnCacheEntry *entry;
1790  char sql[100];
1791  ListCell *lc;
1792 
1793  Assert(pending_entries);
1794 
1795  /*
1796  * Get the result of the RELEASE command for each of the pending entries
1797  */
1798  snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
1799  foreach(lc, pending_entries)
1800  {
1801  entry = (ConnCacheEntry *) lfirst(lc);
1802 
1803  Assert(entry->changing_xact_state);
1804 
1805  /*
1806  * We might already have received the result on the socket, so pass
1807  * consume_input=true to try to consume it first
1808  */
1809  do_sql_command_end(entry->conn, sql, true);
1810  entry->changing_xact_state = false;
1811 
1812  pgfdw_reset_xact_state(entry, false);
1813  }
1814 }
1815 
1816 /*
1817  * Finish abort cleanup of connections on each of which we've sent an abort
1818  * command or cancel request to the remote server.
1819  */
1820 static void
1821 pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested,
1822  bool toplevel)
1823 {
1824  List *pending_deallocs = NIL;
1825  ListCell *lc;
1826 
1827  /*
1828  * For each of the pending cancel requests (if any), get and discard the
1829  * result of the query, and submit an abort command to the remote server.
1830  */
1831  if (cancel_requested)
1832  {
1833  foreach(lc, cancel_requested)
1834  {
1835  ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
1836  TimestampTz endtime;
1837  char sql[100];
1838 
1839  Assert(entry->changing_xact_state);
1840 
1841  /*
1842  * Set end time. You might think we should do this before issuing
1843  * cancel request like in normal mode, but that is problematic,
1844  * because if, for example, it took longer than 30 seconds to
1845  * process the first few entries in the cancel_requested list, it
1846  * would cause a timeout error when processing each of the
1847  * remaining entries in the list, leading to slamming that entry's
1848  * connection shut.
1849  */
1852 
1853  if (!pgfdw_cancel_query_end(entry->conn, endtime, true))
1854  {
1855  /* Unable to cancel running query */
1856  pgfdw_reset_xact_state(entry, toplevel);
1857  continue;
1858  }
1859 
1860  /* Send an abort command in parallel if needed */
1861  CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1862  if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
1863  {
1864  /* Unable to abort remote (sub)transaction */
1865  pgfdw_reset_xact_state(entry, toplevel);
1866  }
1867  else
1868  pending_entries = lappend(pending_entries, entry);
1869  }
1870  }
1871 
1872  /* No further work if no pending entries */
1873  if (!pending_entries)
1874  return;
1875 
1876  /*
1877  * Get the result of the abort command for each of the pending entries
1878  */
1879  foreach(lc, pending_entries)
1880  {
1881  ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
1882  TimestampTz endtime;
1883  char sql[100];
1884 
1885  Assert(entry->changing_xact_state);
1886 
1887  /*
1888  * Set end time. We do this now, not before issuing the command like
1889  * in normal mode, for the same reason as for the cancel_requested
1890  * entries.
1891  */
1894 
1895  CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1896  if (!pgfdw_exec_cleanup_query_end(entry->conn, sql, endtime,
1897  true, false))
1898  {
1899  /* Unable to abort remote (sub)transaction */
1900  pgfdw_reset_xact_state(entry, toplevel);
1901  continue;
1902  }
1903 
1904  if (toplevel)
1905  {
1906  /* Do a DEALLOCATE ALL in parallel if needed */
1907  if (entry->have_prep_stmt && entry->have_error)
1908  {
1910  "DEALLOCATE ALL"))
1911  {
1912  /* Trouble clearing prepared statements */
1913  pgfdw_reset_xact_state(entry, toplevel);
1914  }
1915  else
1916  pending_deallocs = lappend(pending_deallocs, entry);
1917  continue;
1918  }
1919  entry->have_prep_stmt = false;
1920  entry->have_error = false;
1921  }
1922 
1923  /* Reset the per-connection state if needed */
1924  if (entry->state.pendingAreq)
1925  memset(&entry->state, 0, sizeof(entry->state));
1926 
1927  /* We're done with this entry; unset the changing_xact_state flag */
1928  entry->changing_xact_state = false;
1929  pgfdw_reset_xact_state(entry, toplevel);
1930  }
1931 
1932  /* No further work if no pending entries */
1933  if (!pending_deallocs)
1934  return;
1935  Assert(toplevel);
1936 
1937  /*
1938  * Get the result of the DEALLOCATE command for each of the pending
1939  * entries
1940  */
1941  foreach(lc, pending_deallocs)
1942  {
1943  ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
1944  TimestampTz endtime;
1945 
1946  Assert(entry->changing_xact_state);
1947  Assert(entry->have_prep_stmt);
1948  Assert(entry->have_error);
1949 
1950  /*
1951  * Set end time. We do this now, not before issuing the command like
1952  * in normal mode, for the same reason as for the cancel_requested
1953  * entries.
1954  */
1957 
1958  if (!pgfdw_exec_cleanup_query_end(entry->conn, "DEALLOCATE ALL",
1959  endtime, true, true))
1960  {
1961  /* Trouble clearing prepared statements */
1962  pgfdw_reset_xact_state(entry, toplevel);
1963  continue;
1964  }
1965  entry->have_prep_stmt = false;
1966  entry->have_error = false;
1967 
1968  /* Reset the per-connection state if needed */
1969  if (entry->state.pendingAreq)
1970  memset(&entry->state, 0, sizeof(entry->state));
1971 
1972  /* We're done with this entry; unset the changing_xact_state flag */
1973  entry->changing_xact_state = false;
1974  pgfdw_reset_xact_state(entry, toplevel);
1975  }
1976 }
1977 
1978 /*
1979  * List active foreign server connections.
1980  *
1981  * This function takes no input parameter and returns setof record made of
1982  * following values:
1983  * - server_name - server name of active connection. In case the foreign server
1984  * is dropped but still the connection is active, then the server name will
1985  * be NULL in output.
1986  * - valid - true/false representing whether the connection is valid or not.
1987  * Note that the connections can get invalidated in pgfdw_inval_callback.
1988  *
1989  * No records are returned when there are no cached connections at all.
1990  */
1991 Datum
1993 {
1994 #define POSTGRES_FDW_GET_CONNECTIONS_COLS 2
1995  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1996  HASH_SEQ_STATUS scan;
1997  ConnCacheEntry *entry;
1998 
1999  InitMaterializedSRF(fcinfo, 0);
2000 
2001  /* If cache doesn't exist, we return no records */
2002  if (!ConnectionHash)
2003  PG_RETURN_VOID();
2004 
2005  hash_seq_init(&scan, ConnectionHash);
2006  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
2007  {
2008  ForeignServer *server;
2010  bool nulls[POSTGRES_FDW_GET_CONNECTIONS_COLS] = {0};
2011 
2012  /* We only look for open remote connections */
2013  if (!entry->conn)
2014  continue;
2015 
2016  server = GetForeignServerExtended(entry->serverid, FSV_MISSING_OK);
2017 
2018  /*
2019  * The foreign server may have been dropped in current explicit
2020  * transaction. It is not possible to drop the server from another
2021  * session when the connection associated with it is in use in the
2022  * current transaction, if tried so, the drop query in another session
2023  * blocks until the current transaction finishes.
2024  *
2025  * Even though the server is dropped in the current transaction, the
2026  * cache can still have associated active connection entry, say we
2027  * call such connections dangling. Since we can not fetch the server
2028  * name from system catalogs for dangling connections, instead we show
2029  * NULL value for server name in output.
2030  *
2031  * We could have done better by storing the server name in the cache
2032  * entry instead of server oid so that it could be used in the output.
2033  * But the server name in each cache entry requires 64 bytes of
2034  * memory, which is huge, when there are many cached connections and
2035  * the use case i.e. dropping the foreign server within the explicit
2036  * current transaction seems rare. So, we chose to show NULL value for
2037  * server name in output.
2038  *
2039  * Such dangling connections get closed either in next use or at the
2040  * end of current explicit transaction in pgfdw_xact_callback.
2041  */
2042  if (!server)
2043  {
2044  /*
2045  * If the server has been dropped in the current explicit
2046  * transaction, then this entry would have been invalidated in
2047  * pgfdw_inval_callback at the end of drop server command. Note
2048  * that this connection would not have been closed in
2049  * pgfdw_inval_callback because it is still being used in the
2050  * current explicit transaction. So, assert that here.
2051  */
2052  Assert(entry->conn && entry->xact_depth > 0 && entry->invalidated);
2053 
2054  /* Show null, if no server name was found */
2055  nulls[0] = true;
2056  }
2057  else
2058  values[0] = CStringGetTextDatum(server->servername);
2059 
2060  values[1] = BoolGetDatum(!entry->invalidated);
2061 
2062  tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
2063  }
2064 
2065  PG_RETURN_VOID();
2066 }
2067 
2068 /*
2069  * Disconnect the specified cached connections.
2070  *
2071  * This function discards the open connections that are established by
2072  * postgres_fdw from the local session to the foreign server with
2073  * the given name. Note that there can be multiple connections to
2074  * the given server using different user mappings. If the connections
2075  * are used in the current local transaction, they are not disconnected
2076  * and warning messages are reported. This function returns true
2077  * if it disconnects at least one connection, otherwise false. If no
2078  * foreign server with the given name is found, an error is reported.
2079  */
2080 Datum
2082 {
2083  ForeignServer *server;
2084  char *servername;
2085 
2086  servername = text_to_cstring(PG_GETARG_TEXT_PP(0));
2087  server = GetForeignServerByName(servername, false);
2088 
2090 }
2091 
2092 /*
2093  * Disconnect all the cached connections.
2094  *
2095  * This function discards all the open connections that are established by
2096  * postgres_fdw from the local session to the foreign servers.
2097  * If the connections are used in the current local transaction, they are
2098  * not disconnected and warning messages are reported. This function
2099  * returns true if it disconnects at least one connection, otherwise false.
2100  */
2101 Datum
2103 {
2105 }
2106 
2107 /*
2108  * Workhorse to disconnect cached connections.
2109  *
2110  * This function scans all the connection cache entries and disconnects
2111  * the open connections whose foreign server OID matches with
2112  * the specified one. If InvalidOid is specified, it disconnects all
2113  * the cached connections.
2114  *
2115  * This function emits a warning for each connection that's used in
2116  * the current transaction and doesn't close it. It returns true if
2117  * it disconnects at least one connection, otherwise false.
2118  *
2119  * Note that this function disconnects even the connections that are
2120  * established by other users in the same local session using different
2121  * user mappings. This leads even non-superuser to be able to close
2122  * the connections established by superusers in the same local session.
2123  *
2124  * XXX As of now we don't see any security risk doing this. But we should
2125  * set some restrictions on that, for example, prevent non-superuser
2126  * from closing the connections established by superusers even
2127  * in the same session?
2128  */
2129 static bool
2131 {
2132  HASH_SEQ_STATUS scan;
2133  ConnCacheEntry *entry;
2134  bool all = !OidIsValid(serverid);
2135  bool result = false;
2136 
2137  /*
2138  * Connection cache hashtable has not been initialized yet in this
2139  * session, so return false.
2140  */
2141  if (!ConnectionHash)
2142  return false;
2143 
2144  hash_seq_init(&scan, ConnectionHash);
2145  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
2146  {
2147  /* Ignore cache entry if no open connection right now. */
2148  if (!entry->conn)
2149  continue;
2150 
2151  if (all || entry->serverid == serverid)
2152  {
2153  /*
2154  * Emit a warning because the connection to close is used in the
2155  * current transaction and cannot be disconnected right now.
2156  */
2157  if (entry->xact_depth > 0)
2158  {
2159  ForeignServer *server;
2160 
2161  server = GetForeignServerExtended(entry->serverid,
2162  FSV_MISSING_OK);
2163 
2164  if (!server)
2165  {
2166  /*
2167  * If the foreign server was dropped while its connection
2168  * was used in the current transaction, the connection
2169  * must have been marked as invalid by
2170  * pgfdw_inval_callback at the end of DROP SERVER command.
2171  */
2172  Assert(entry->invalidated);
2173 
2174  ereport(WARNING,
2175  (errmsg("cannot close dropped server connection because it is still in use")));
2176  }
2177  else
2178  ereport(WARNING,
2179  (errmsg("cannot close connection for server \"%s\" because it is still in use",
2180  server->servername)));
2181  }
2182  else
2183  {
2184  elog(DEBUG3, "discarding connection %p", entry->conn);
2185  disconnect_pg_server(entry);
2186  result = true;
2187  }
2188  }
2189  }
2190 
2191  return result;
2192 }
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1766
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1654
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1618
bool be_gssapi_get_delegation(Port *port)
static Datum values[MAXATTR]
Definition: bootstrap.c:152
#define CStringGetTextDatum(s)
Definition: builtins.h:97
unsigned int uint32
Definition: c.h:506
uint32 SubTransactionId
Definition: c.h:656
#define Assert(condition)
Definition: c.h:858
#define OidIsValid(objectId)
Definition: c.h:775
Oid ConnCacheKey
Definition: connection.c:51
static unsigned int prep_stmt_number
Definition: connection.c:81
unsigned int GetCursorNumber(PGconn *conn)
Definition: connection.c:804
static bool UserMappingPasswordRequired(UserMapping *user)
Definition: connection.c:594
Datum postgres_fdw_get_connections(PG_FUNCTION_ARGS)
Definition: connection.c:1992
void do_sql_command(PGconn *conn, const char *sql)
Definition: connection.c:697
static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel)
Definition: connection.c:1787
static void disconnect_pg_server(ConnCacheEntry *entry)
Definition: connection.c:580
void ReleaseConnection(PGconn *conn)
Definition: connection.c:783
static uint32 pgfdw_we_get_result
Definition: connection.c:89
PG_FUNCTION_INFO_V1(postgres_fdw_get_connections)
static bool pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime)
Definition: connection.c:1333
static void pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested, bool toplevel)
Definition: connection.c:1821
static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
Definition: connection.c:1262
static void configure_remote_session(PGconn *conn)
Definition: connection.c:661
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:869
PGresult * pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
Definition: connection.c:833
static bool xact_got_connection
Definition: connection.c:84
#define POSTGRES_FDW_GET_CONNECTIONS_COLS
struct ConnCacheEntry ConnCacheEntry
Datum postgres_fdw_disconnect_all(PG_FUNCTION_ARGS)
Definition: connection.c:2102
static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime, bool consume_input)
Definition: connection.c:1346
static void do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
Definition: connection.c:711
static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result, bool *timed_out)
Definition: connection.c:1493
#define CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel)
Definition: connection.c:99
static uint32 pgfdw_we_cleanup_result
Definition: connection.c:87
static bool pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel, List **pending_entries, List **cancel_requested)
Definition: connection.c:1655
static HTAB * ConnectionHash
Definition: connection.c:77
static bool pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query, TimestampTz endtime, bool consume_input, bool ignore_errors)
Definition: connection.c:1436
static unsigned int cursor_number
Definition: connection.c:80
static void make_new_connection(ConnCacheEntry *entry, UserMapping *user)
Definition: connection.c:340
static void pgfdw_security_check(const char **keywords, const char **values, UserMapping *user, PGconn *conn)
Definition: connection.c:406
Datum postgres_fdw_disconnect(PG_FUNCTION_ARGS)
Definition: connection.c:2081
static void pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg)
Definition: connection.c:1075
static PGconn * connect_pg_server(ForeignServer *server, UserMapping *user)
Definition: connection.c:444
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
Definition: connection.c:1398
PGconn * GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
Definition: connection.c:177
unsigned int GetPrepStmtNumber(PGconn *conn)
Definition: connection.c:818
static bool pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query)
Definition: connection.c:1418
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
Definition: connection.c:1238
static void check_conn_params(const char **keywords, const char **values, UserMapping *user)
Definition: connection.c:618
static uint32 pgfdw_we_connect
Definition: connection.c:88
static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
Definition: connection.c:1188
static void pgfdw_xact_callback(XactEvent event, void *arg)
Definition: connection.c:925
#define CONNECTION_CLEANUP_TIMEOUT
Definition: connection.c:96
static void do_sql_command_begin(PGconn *conn, const char *sql)
Definition: connection.c:704
static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
Definition: connection.c:1581
PGresult * pgfdw_get_result(PGconn *conn)
Definition: connection.c:850
static void begin_remote_xact(ConnCacheEntry *entry)
Definition: connection.c:740
static bool pgfdw_cancel_query(PGconn *conn)
Definition: connection.c:1307
static void pgfdw_finish_pre_commit_cleanup(List *pending_entries)
Definition: connection.c:1713
static bool disconnect_cached_connections(Oid serverid)
Definition: connection.c:2130
char * process_pgfdw_appname(const char *appname)
Definition: option.c:491
int ExtractConnectionOptions(List *defelems, const char **keywords, const char **values)
Definition: option.c:414
char * pgfdw_application_name
Definition: option.c:52
int64 TimestampTz
Definition: timestamp.h:39
bool defGetBoolean(DefElem *def)
Definition: define.c:107
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:955
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:352
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1395
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1385
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1159
void FreeErrorData(ErrorData *edata)
Definition: elog.c:1787
int errdetail_internal(const char *fmt,...)
Definition: elog.c:1232
int errdetail(const char *fmt,...)
Definition: elog.c:1205
void FlushErrorState(void)
Definition: elog.c:1836
int errhint(const char *fmt,...)
Definition: elog.c:1319
int errcode(int sqlerrcode)
Definition: elog.c:859
int errmsg(const char *fmt,...)
Definition: elog.c:1072
bool in_error_recursion_trouble(void)
Definition: elog.c:297
ErrorData * CopyErrorData(void)
Definition: elog.c:1731
#define PG_RE_THROW()
Definition: elog.h:411
#define errcontext
Definition: elog.h:196
#define DEBUG3
Definition: elog.h:28
#define PG_TRY(...)
Definition: elog.h:370
#define WARNING
Definition: elog.h:36
#define PG_END_TRY(...)
Definition: elog.h:395
#define ERROR
Definition: elog.h:39
#define PG_CATCH(...)
Definition: elog.h:380
#define MAKE_SQLSTATE(ch1, ch2, ch3, ch4, ch5)
Definition: elog.h:56
#define elog(elevel,...)
Definition: elog.h:224
#define PG_FINALLY(...)
Definition: elog.h:387
#define ereport(elevel,...)
Definition: elog.h:149
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:7131
PGTransactionStatusType PQtransactionStatus(const PGconn *conn)
Definition: fe-connect.c:7096
int PQconnectionUsedPassword(const PGconn *conn)
Definition: fe-connect.c:7207
int PQconnectionUsedGSSAPI(const PGconn *conn)
Definition: fe-connect.c:7218
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7141
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:7088
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:7167
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3411
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1984
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1416
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:2031
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:3466
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:2062
#define PG_RETURN_VOID()
Definition: fmgr.h:349
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:309
#define PG_FUNCTION_ARGS
Definition: fmgr.h:193
#define PG_RETURN_BOOL(x)
Definition: fmgr.h:359
ForeignServer * GetForeignServerExtended(Oid serverid, bits16 flags)
Definition: foreign.c:122
ForeignServer * GetForeignServer(Oid serverid)
Definition: foreign.c:110
ForeignServer * GetForeignServerByName(const char *srvname, bool missing_ok)
Definition: foreign.c:181
#define FSV_MISSING_OK
Definition: foreign.h:61
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
Definition: funcapi.c:76
struct Port * MyProcPort
Definition: globals.c:49
struct Latch * MyLatch
Definition: globals.c:60
@ HASH_ENTER
Definition: hsearch.h:114
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1516
int i
Definition: isn.c:73
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:565
void ResetLatch(Latch *latch)
Definition: latch.c:724
#define WL_SOCKET_READABLE
Definition: latch.h:128
#define WL_TIMEOUT
Definition: latch.h:130
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:132
#define WL_LATCH_SET
Definition: latch.h:127
static const char * libpqsrv_cancel(PGconn *conn, TimestampTz endtime)
static void libpqsrv_disconnect(PGconn *conn)
static PGconn * libpqsrv_connect_params(const char *const *keywords, const char *const *values, int expand_dbname, uint32 wait_event_info)
static PGresult * libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info)
@ CONNECTION_BAD
Definition: libpq-fe.h:62
@ CONNECTION_OK
Definition: libpq-fe.h:61
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:100
@ PQTRANS_IDLE
Definition: libpq-fe.h:122
@ PQTRANS_ACTIVE
Definition: libpq-fe.h:123
exit(1)
List * lappend(List *list, void *datum)
Definition: list.c:339
const char * GetDatabaseEncodingName(void)
Definition: mbutils.c:1267
char * pchomp(const char *in)
Definition: mcxt.c:1723
void pfree(void *pointer)
Definition: mcxt.c:1520
MemoryContext CurrentMemoryContext
Definition: mcxt.c:143
void * palloc(Size size)
Definition: mcxt.c:1316
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
void * arg
#define lfirst(lc)
Definition: pg_list.h:172
static int list_length(const List *l)
Definition: pg_list.h:152
#define NIL
Definition: pg_list.h:68
static char * user
Definition: pg_regress.c:120
#define snprintf
Definition: port.h:238
uintptr_t Datum
Definition: postgres.h:64
static Datum BoolGetDatum(bool X)
Definition: postgres.h:102
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
#define InvalidOid
Definition: postgres_ext.h:36
unsigned int Oid
Definition: postgres_ext.h:31
#define PG_DIAG_MESSAGE_HINT
Definition: postgres_ext.h:59
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:56
#define PG_DIAG_MESSAGE_PRIMARY
Definition: postgres_ext.h:57
#define PG_DIAG_MESSAGE_DETAIL
Definition: postgres_ext.h:58
#define PG_DIAG_CONTEXT
Definition: postgres_ext.h:63
void process_pending_request(AsyncRequest *areq)
MemoryContextSwitchTo(old_ctx)
tree ctl
Definition: radixtree.h:1847
PGconn * conn
Definition: streamutil.c:55
PGconn * conn
Definition: connection.c:56
bool have_prep_stmt
Definition: connection.c:60
PgFdwConnState state
Definition: connection.c:71
bool invalidated
Definition: connection.c:65
ConnCacheKey key
Definition: connection.c:55
bool parallel_commit
Definition: connection.c:63
uint32 server_hashvalue
Definition: connection.c:69
uint32 mapping_hashvalue
Definition: connection.c:70
bool keep_connections
Definition: connection.c:66
bool parallel_abort
Definition: connection.c:64
bool changing_xact_state
Definition: connection.c:62
char * defname
Definition: parsenodes.h:815
int sqlerrcode
Definition: elog.h:438
List * options
Definition: foreign.h:42
char * servername
Definition: foreign.h:39
Oid serverid
Definition: foreign.h:36
Definition: dynahash.c:220
Definition: pg_list.h:54
AsyncRequest * pendingAreq
Definition: postgres_fdw.h:138
TupleDesc setDesc
Definition: execnodes.h:340
Tuplestorestate * setResult
Definition: execnodes.h:339
Definition: regguts.h:323
bool superuser_arg(Oid roleid)
Definition: superuser.c:56
#define GetSysCacheHashValue1(cacheId, key1)
Definition: syscache.h:113
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
Definition: tuplestore.c:750
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:85
char * text_to_cstring(const text *t)
Definition: varlena.c:217
uint32 WaitEventExtensionNew(const char *wait_event_name)
Definition: wait_event.c:162
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:926
void RegisterXactCallback(XactCallback callback, void *arg)
Definition: xact.c:3766
void RegisterSubXactCallback(SubXactCallback callback, void *arg)
Definition: xact.c:3826
SubXactEvent
Definition: xact.h:141
@ SUBXACT_EVENT_PRE_COMMIT_SUB
Definition: xact.h:145
@ SUBXACT_EVENT_ABORT_SUB
Definition: xact.h:144
XactEvent
Definition: xact.h:127
@ XACT_EVENT_PRE_PREPARE
Definition: xact.h:135
@ XACT_EVENT_COMMIT
Definition: xact.h:128
@ XACT_EVENT_PARALLEL_PRE_COMMIT
Definition: xact.h:134
@ XACT_EVENT_PARALLEL_COMMIT
Definition: xact.h:129
@ XACT_EVENT_ABORT
Definition: xact.h:130
@ XACT_EVENT_PRE_COMMIT
Definition: xact.h:133
@ XACT_EVENT_PARALLEL_ABORT
Definition: xact.h:131
@ XACT_EVENT_PREPARE
Definition: xact.h:132
#define IsolationIsSerializable()
Definition: xact.h:52