PostgreSQL Source Code  git master
connection.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * connection.c
4  * Connection management functions for postgres_fdw
5  *
6  * Portions Copyright (c) 2012-2022, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * contrib/postgres_fdw/connection.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "access/htup_details.h"
16 #include "access/xact.h"
18 #include "commands/defrem.h"
19 #include "funcapi.h"
20 #include "mb/pg_wchar.h"
21 #include "miscadmin.h"
22 #include "pgstat.h"
23 #include "postgres_fdw.h"
24 #include "storage/fd.h"
25 #include "storage/latch.h"
26 #include "utils/builtins.h"
27 #include "utils/datetime.h"
28 #include "utils/hsearch.h"
29 #include "utils/inval.h"
30 #include "utils/memutils.h"
31 #include "utils/syscache.h"
32 
33 /*
34  * Connection cache hash table entry
35  *
36  * The lookup key in this hash table is the user mapping OID. We use just one
37  * connection per user mapping ID, which ensures that all the scans use the
38  * same snapshot during a query. Using the user mapping OID rather than
39  * the foreign server OID + user OID avoids creating multiple connections when
40  * the public user mapping applies to all user OIDs.
41  *
42  * The "conn" pointer can be NULL if we don't currently have a live connection.
43  * When we do have a connection, xact_depth tracks the current depth of
44  * transactions and subtransactions open on the remote side. We need to issue
45  * commands at the same nesting depth on the remote as we're executing at
46  * ourselves, so that rolling back a subtransaction will kill the right
47  * queries and not the wrong ones.
48  */
49 typedef Oid ConnCacheKey;
50 
51 typedef struct ConnCacheEntry
52 {
53  ConnCacheKey key; /* hash key (must be first) */
54  PGconn *conn; /* connection to foreign server, or NULL */
55  /* Remaining fields are invalid when conn is NULL: */
56  int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 =
57  * one level of subxact open, etc */
58  bool have_prep_stmt; /* have we prepared any stmts in this xact? */
59  bool have_error; /* have any subxacts aborted in this xact? */
60  bool changing_xact_state; /* xact state change in process */
61  bool parallel_commit; /* do we commit (sub)xacts in parallel? */
62  bool invalidated; /* true if reconnect is pending */
63  bool keep_connections; /* setting value of keep_connections
64  * server option */
65  Oid serverid; /* foreign server OID used to get server name */
66  uint32 server_hashvalue; /* hash value of foreign server OID */
67  uint32 mapping_hashvalue; /* hash value of user mapping OID */
68  PgFdwConnState state; /* extra per-connection state */
70 
71 /*
72  * Connection cache (initialized on first use)
73  */
74 static HTAB *ConnectionHash = NULL;
75 
76 /* for assigning cursor numbers and prepared statement numbers */
77 static unsigned int cursor_number = 0;
78 static unsigned int prep_stmt_number = 0;
79 
80 /* tracks whether any work is needed in callback functions */
81 static bool xact_got_connection = false;
82 
83 /*
84  * SQL functions
85  */
89 
90 /* prototypes of private functions */
93 static void disconnect_pg_server(ConnCacheEntry *entry);
94 static void check_conn_params(const char **keywords, const char **values, UserMapping *user);
96 static void do_sql_command_begin(PGconn *conn, const char *sql);
97 static void do_sql_command_end(PGconn *conn, const char *sql,
98  bool consume_input);
99 static void begin_remote_xact(ConnCacheEntry *entry);
100 static void pgfdw_xact_callback(XactEvent event, void *arg);
101 static void pgfdw_subxact_callback(SubXactEvent event,
102  SubTransactionId mySubid,
103  SubTransactionId parentSubid,
104  void *arg);
105 static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
107 static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
108 static bool pgfdw_cancel_query(PGconn *conn);
109 static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
110  bool ignore_errors);
111 static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
112  PGresult **result, bool *timed_out);
113 static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel);
114 static void pgfdw_finish_pre_commit_cleanup(List *pending_entries);
115 static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries,
116  int curlevel);
118 static bool disconnect_cached_connections(Oid serverid);
119 
120 /*
121  * Get a PGconn which can be used to execute queries on the remote PostgreSQL
122  * server with the user's authorization. A new connection is established
123  * if we don't already have a suitable one, and a transaction is opened at
124  * the right subtransaction nesting depth if we didn't do that already.
125  *
126  * will_prep_stmt must be true if caller intends to create any prepared
127  * statements. Since those don't go away automatically at transaction end
128  * (not even on error), we need this flag to cue manual cleanup.
129  *
130  * If state is not NULL, *state receives the per-connection state associated
131  * with the PGconn.
132  */
133 PGconn *
135 {
136  bool found;
137  bool retry = false;
138  ConnCacheEntry *entry;
141 
142  /* First time through, initialize connection cache hashtable */
143  if (ConnectionHash == NULL)
144  {
145  HASHCTL ctl;
146 
147  ctl.keysize = sizeof(ConnCacheKey);
148  ctl.entrysize = sizeof(ConnCacheEntry);
149  ConnectionHash = hash_create("postgres_fdw connections", 8,
150  &ctl,
152 
153  /*
154  * Register some callback functions that manage connection cleanup.
155  * This should be done just once in each backend.
156  */
163  }
164 
165  /* Set flag that we did GetConnection during the current transaction */
166  xact_got_connection = true;
167 
168  /* Create hash key for the entry. Assume no pad bytes in key struct */
169  key = user->umid;
170 
171  /*
172  * Find or create cached entry for requested connection.
173  */
174  entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
175  if (!found)
176  {
177  /*
178  * We need only clear "conn" here; remaining fields will be filled
179  * later when "conn" is set.
180  */
181  entry->conn = NULL;
182  }
183 
184  /* Reject further use of connections which failed abort cleanup. */
186 
187  /*
188  * If the connection needs to be remade due to invalidation, disconnect as
189  * soon as we're out of all transactions.
190  */
191  if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
192  {
193  elog(DEBUG3, "closing connection %p for option changes to take effect",
194  entry->conn);
195  disconnect_pg_server(entry);
196  }
197 
198  /*
199  * If cache entry doesn't have a connection, we have to establish a new
200  * connection. (If connect_pg_server throws an error, the cache entry
201  * will remain in a valid empty state, ie conn == NULL.)
202  */
203  if (entry->conn == NULL)
204  make_new_connection(entry, user);
205 
206  /*
207  * We check the health of the cached connection here when using it. In
208  * cases where we're out of all transactions, if a broken connection is
209  * detected, we try to reestablish a new connection later.
210  */
211  PG_TRY();
212  {
213  /* Process a pending asynchronous request if any. */
214  if (entry->state.pendingAreq)
216  /* Start a new transaction or subtransaction if needed. */
217  begin_remote_xact(entry);
218  }
219  PG_CATCH();
220  {
222  ErrorData *errdata = CopyErrorData();
223 
224  /*
225  * Determine whether to try to reestablish the connection.
226  *
227  * After a broken connection is detected in libpq, any error other
228  * than connection failure (e.g., out-of-memory) can be thrown
229  * somewhere between return from libpq and the expected ereport() call
230  * in pgfdw_report_error(). In this case, since PQstatus() indicates
231  * CONNECTION_BAD, checking only PQstatus() causes the false detection
232  * of connection failure. To avoid this, we also verify that the
233  * error's sqlstate is ERRCODE_CONNECTION_FAILURE. Note that also
234  * checking only the sqlstate can cause another false detection
235  * because pgfdw_report_error() may report ERRCODE_CONNECTION_FAILURE
236  * for any libpq-originated error condition.
237  */
238  if (errdata->sqlerrcode != ERRCODE_CONNECTION_FAILURE ||
239  PQstatus(entry->conn) != CONNECTION_BAD ||
240  entry->xact_depth > 0)
241  {
242  MemoryContextSwitchTo(ecxt);
243  PG_RE_THROW();
244  }
245 
246  /* Clean up the error state */
247  FlushErrorState();
248  FreeErrorData(errdata);
249  errdata = NULL;
250 
251  retry = true;
252  }
253  PG_END_TRY();
254 
255  /*
256  * If a broken connection is detected, disconnect it, reestablish a new
257  * connection and retry a new remote transaction. If connection failure is
258  * reported again, we give up getting a connection.
259  */
260  if (retry)
261  {
262  Assert(entry->xact_depth == 0);
263 
264  ereport(DEBUG3,
265  (errmsg_internal("could not start remote transaction on connection %p",
266  entry->conn)),
267  errdetail_internal("%s", pchomp(PQerrorMessage(entry->conn))));
268 
269  elog(DEBUG3, "closing connection %p to reestablish a new one",
270  entry->conn);
271  disconnect_pg_server(entry);
272 
273  if (entry->conn == NULL)
274  make_new_connection(entry, user);
275 
276  begin_remote_xact(entry);
277  }
278 
279  /* Remember if caller will prepare statements */
280  entry->have_prep_stmt |= will_prep_stmt;
281 
282  /* If caller needs access to the per-connection state, return it. */
283  if (state)
284  *state = &entry->state;
285 
286  return entry->conn;
287 }
288 
289 /*
290  * Reset all transient state fields in the cached connection entry and
291  * establish new connection to the remote server.
292  */
293 static void
295 {
296  ForeignServer *server = GetForeignServer(user->serverid);
297  ListCell *lc;
298 
299  Assert(entry->conn == NULL);
300 
301  /* Reset all transient state fields, to be sure all are clean */
302  entry->xact_depth = 0;
303  entry->have_prep_stmt = false;
304  entry->have_error = false;
305  entry->changing_xact_state = false;
306  entry->invalidated = false;
307  entry->serverid = server->serverid;
308  entry->server_hashvalue =
310  ObjectIdGetDatum(server->serverid));
311  entry->mapping_hashvalue =
313  ObjectIdGetDatum(user->umid));
314  memset(&entry->state, 0, sizeof(entry->state));
315 
316  /*
317  * Determine whether to keep the connection that we're about to make here
318  * open even after the transaction using it ends, so that the subsequent
319  * transactions can re-use it.
320  *
321  * By default, all the connections to any foreign servers are kept open.
322  *
323  * Also determine whether to commit (sub)transactions opened on the remote
324  * server in parallel at (sub)transaction end, which is disabled by
325  * default.
326  *
327  * Note: it's enough to determine these only when making a new connection
328  * because if these settings for it are changed, it will be closed and
329  * re-made later.
330  */
331  entry->keep_connections = true;
332  entry->parallel_commit = false;
333  foreach(lc, server->options)
334  {
335  DefElem *def = (DefElem *) lfirst(lc);
336 
337  if (strcmp(def->defname, "keep_connections") == 0)
338  entry->keep_connections = defGetBoolean(def);
339  else if (strcmp(def->defname, "parallel_commit") == 0)
340  entry->parallel_commit = defGetBoolean(def);
341  }
342 
343  /* Now try to make the connection */
344  entry->conn = connect_pg_server(server, user);
345 
346  elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
347  entry->conn, server->servername, user->umid, user->userid);
348 }
349 
350 /*
351  * Connect to remote server using specified server and user mapping properties.
352  */
353 static PGconn *
355 {
356  PGconn *volatile conn = NULL;
357 
358  /*
359  * Use PG_TRY block to ensure closing connection on error.
360  */
361  PG_TRY();
362  {
363  const char **keywords;
364  const char **values;
365  char *appname = NULL;
366  int n;
367 
368  /*
369  * Construct connection params from generic options of ForeignServer
370  * and UserMapping. (Some of them might not be libpq options, in
371  * which case we'll just waste a few array slots.) Add 4 extra slots
372  * for application_name, fallback_application_name, client_encoding,
373  * end marker.
374  */
375  n = list_length(server->options) + list_length(user->options) + 4;
376  keywords = (const char **) palloc(n * sizeof(char *));
377  values = (const char **) palloc(n * sizeof(char *));
378 
379  n = 0;
380  n += ExtractConnectionOptions(server->options,
381  keywords + n, values + n);
382  n += ExtractConnectionOptions(user->options,
383  keywords + n, values + n);
384 
385  /*
386  * Use pgfdw_application_name as application_name if set.
387  *
388  * PQconnectdbParams() processes the parameter arrays from start to
389  * end. If any key word is repeated, the last value is used. Therefore
390  * note that pgfdw_application_name must be added to the arrays after
391  * options of ForeignServer are, so that it can override
392  * application_name set in ForeignServer.
393  */
395  {
396  keywords[n] = "application_name";
398  n++;
399  }
400 
401  /*
402  * Search the parameter arrays to find application_name setting, and
403  * replace escape sequences in it with status information if found.
404  * The arrays are searched backwards because the last value is used if
405  * application_name is repeatedly set.
406  */
407  for (int i = n - 1; i >= 0; i--)
408  {
409  if (strcmp(keywords[i], "application_name") == 0 &&
410  *(values[i]) != '\0')
411  {
412  /*
413  * Use this application_name setting if it's not empty string
414  * even after any escape sequences in it are replaced.
415  */
416  appname = process_pgfdw_appname(values[i]);
417  if (appname[0] != '\0')
418  {
419  values[i] = appname;
420  break;
421  }
422 
423  /*
424  * This empty application_name is not used, so we set
425  * values[i] to NULL and keep searching the array to find the
426  * next one.
427  */
428  values[i] = NULL;
429  pfree(appname);
430  appname = NULL;
431  }
432  }
433 
434  /* Use "postgres_fdw" as fallback_application_name */
435  keywords[n] = "fallback_application_name";
436  values[n] = "postgres_fdw";
437  n++;
438 
439  /* Set client_encoding so that libpq can convert encoding properly. */
440  keywords[n] = "client_encoding";
442  n++;
443 
444  keywords[n] = values[n] = NULL;
445 
446  /* verify the set of connection parameters */
447  check_conn_params(keywords, values, user);
448 
449  /*
450  * We must obey fd.c's limit on non-virtual file descriptors. Assume
451  * that a PGconn represents one long-lived FD. (Doing this here also
452  * ensures that VFDs are closed if needed to make room.)
453  */
454  if (!AcquireExternalFD())
455  {
456 #ifndef WIN32 /* can't write #if within ereport() macro */
457  ereport(ERROR,
458  (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
459  errmsg("could not connect to server \"%s\"",
460  server->servername),
461  errdetail("There are too many open files on the local server."),
462  errhint("Raise the server's max_files_per_process and/or \"ulimit -n\" limits.")));
463 #else
464  ereport(ERROR,
465  (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
466  errmsg("could not connect to server \"%s\"",
467  server->servername),
468  errdetail("There are too many open files on the local server."),
469  errhint("Raise the server's max_files_per_process setting.")));
470 #endif
471  }
472 
473  /* OK to make connection */
474  conn = PQconnectdbParams(keywords, values, false);
475 
476  if (!conn)
477  ReleaseExternalFD(); /* because the PG_CATCH block won't */
478 
479  if (!conn || PQstatus(conn) != CONNECTION_OK)
480  ereport(ERROR,
481  (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
482  errmsg("could not connect to server \"%s\"",
483  server->servername),
485 
486  /*
487  * Check that non-superuser has used password to establish connection;
488  * otherwise, he's piggybacking on the postgres server's user
489  * identity. See also dblink_security_check() in contrib/dblink and
490  * check_conn_params.
491  */
494  ereport(ERROR,
495  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
496  errmsg("password is required"),
497  errdetail("Non-superuser cannot connect if the server does not request a password."),
498  errhint("Target server's authentication method must be changed or password_required=false set in the user mapping attributes.")));
499 
500  /* Prepare new session for use */
502 
503  if (appname != NULL)
504  pfree(appname);
505  pfree(keywords);
506  pfree(values);
507  }
508  PG_CATCH();
509  {
510  /* Release PGconn data structure if we managed to create one */
511  if (conn)
512  {
513  PQfinish(conn);
515  }
516  PG_RE_THROW();
517  }
518  PG_END_TRY();
519 
520  return conn;
521 }
522 
523 /*
524  * Disconnect any open connection for a connection cache entry.
525  */
526 static void
528 {
529  if (entry->conn != NULL)
530  {
531  PQfinish(entry->conn);
532  entry->conn = NULL;
534  }
535 }
536 
537 /*
538  * Return the value of the password_required option if it is defined for this
539  * user mapping, otherwise true (the default). The mapping has been pre-validated.
540  */
541 static bool
543 {
544  ListCell *cell;
545 
546  foreach(cell, user->options)
547  {
548  DefElem *def = (DefElem *) lfirst(cell);
549 
550  if (strcmp(def->defname, "password_required") == 0)
551  return defGetBoolean(def);
552  }
553 
554  return true;
555 }
556 
557 /*
558  * For non-superusers, insist that the connstr specify a password. This
559  * prevents a password from being picked up from .pgpass, a service file, the
560  * environment, etc. We don't want the postgres user's passwords,
561  * certificates, etc to be accessible to non-superusers. (See also
562  * dblink_connstr_check in contrib/dblink.)
563  */
564 static void
565 check_conn_params(const char **keywords, const char **values, UserMapping *user)
566 {
567  int i;
568 
569  /* no check required if superuser */
570  if (superuser_arg(user->userid))
571  return;
572 
573  /* ok if params contain a non-empty password */
574  for (i = 0; keywords[i] != NULL; i++)
575  {
576  if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
577  return;
578  }
579 
580  /* ok if the superuser explicitly said so at user mapping creation time */
582  return;
583 
584  ereport(ERROR,
585  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
586  errmsg("password is required"),
587  errdetail("Non-superusers must provide a password in the user mapping.")));
588 }
589 
590 /*
591  * Issue SET commands to make sure remote session is configured properly.
592  *
593  * We do this just once at connection, assuming nothing will change the
594  * values later. Since we'll never send volatile function calls to the
595  * remote, there shouldn't be any way to break this assumption from our end.
596  * It's possible to think of ways to break it at the remote end, eg making
597  * a foreign table point to a view that includes a set_config call ---
598  * but once you admit the possibility of a malicious view definition,
599  * there are any number of ways to break things.
600  */
601 static void
603 {
604  int remoteversion = PQserverVersion(conn);
605 
606  /* Force the search path to contain only pg_catalog (see deparse.c) */
607  do_sql_command(conn, "SET search_path = pg_catalog");
608 
609  /*
610  * Set remote timezone; this is basically just cosmetic, since all
611  * transmitted and returned timestamptzs should specify a zone explicitly
612  * anyway. However it makes the regression test outputs more predictable.
613  *
614  * We don't risk setting remote zone equal to ours, since the remote
615  * server might use a different timezone database. Instead, use UTC
616  * (quoted, because very old servers are picky about case).
617  */
618  do_sql_command(conn, "SET timezone = 'UTC'");
619 
620  /*
621  * Set values needed to ensure unambiguous data output from remote. (This
622  * logic should match what pg_dump does. See also set_transmission_modes
623  * in postgres_fdw.c.)
624  */
625  do_sql_command(conn, "SET datestyle = ISO");
626  if (remoteversion >= 80400)
627  do_sql_command(conn, "SET intervalstyle = postgres");
628  if (remoteversion >= 90000)
629  do_sql_command(conn, "SET extra_float_digits = 3");
630  else
631  do_sql_command(conn, "SET extra_float_digits = 2");
632 }
633 
634 /*
635  * Convenience subroutine to issue a non-data-returning SQL command to remote
636  */
637 void
638 do_sql_command(PGconn *conn, const char *sql)
639 {
641  do_sql_command_end(conn, sql, false);
642 }
643 
644 static void
645 do_sql_command_begin(PGconn *conn, const char *sql)
646 {
647  if (!PQsendQuery(conn, sql))
648  pgfdw_report_error(ERROR, NULL, conn, false, sql);
649 }
650 
651 static void
652 do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
653 {
654  PGresult *res;
655 
656  /*
657  * If requested, consume whatever data is available from the socket. (Note
658  * that if all data is available, this allows pgfdw_get_result to call
659  * PQgetResult without forcing the overhead of WaitLatchOrSocket, which
660  * would be large compared to the overhead of PQconsumeInput.)
661  */
662  if (consume_input && !PQconsumeInput(conn))
663  pgfdw_report_error(ERROR, NULL, conn, false, sql);
664  res = pgfdw_get_result(conn, sql);
666  pgfdw_report_error(ERROR, res, conn, true, sql);
667  PQclear(res);
668 }
669 
670 /*
671  * Start remote transaction or subtransaction, if needed.
672  *
673  * Note that we always use at least REPEATABLE READ in the remote session.
674  * This is so that, if a query initiates multiple scans of the same or
675  * different foreign tables, we will get snapshot-consistent results from
676  * those scans. A disadvantage is that we can't provide sane emulation of
677  * READ COMMITTED behavior --- it would be nice if we had some other way to
678  * control which remote queries share a snapshot.
679  */
680 static void
682 {
683  int curlevel = GetCurrentTransactionNestLevel();
684 
685  /* Start main transaction if we haven't yet */
686  if (entry->xact_depth <= 0)
687  {
688  const char *sql;
689 
690  elog(DEBUG3, "starting remote transaction on connection %p",
691  entry->conn);
692 
694  sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
695  else
696  sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
697  entry->changing_xact_state = true;
698  do_sql_command(entry->conn, sql);
699  entry->xact_depth = 1;
700  entry->changing_xact_state = false;
701  }
702 
703  /*
704  * If we're in a subtransaction, stack up savepoints to match our level.
705  * This ensures we can rollback just the desired effects when a
706  * subtransaction aborts.
707  */
708  while (entry->xact_depth < curlevel)
709  {
710  char sql[64];
711 
712  snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
713  entry->changing_xact_state = true;
714  do_sql_command(entry->conn, sql);
715  entry->xact_depth++;
716  entry->changing_xact_state = false;
717  }
718 }
719 
720 /*
721  * Release connection reference count created by calling GetConnection.
722  */
723 void
725 {
726  /*
727  * Currently, we don't actually track connection references because all
728  * cleanup is managed on a transaction or subtransaction basis instead. So
729  * there's nothing to do here.
730  */
731 }
732 
733 /*
734  * Assign a "unique" number for a cursor.
735  *
736  * These really only need to be unique per connection within a transaction.
737  * For the moment we ignore the per-connection point and assign them across
738  * all connections in the transaction, but we ask for the connection to be
739  * supplied in case we want to refine that.
740  *
741  * Note that even if wraparound happens in a very long transaction, actual
742  * collisions are highly improbable; just be sure to use %u not %d to print.
743  */
744 unsigned int
746 {
747  return ++cursor_number;
748 }
749 
750 /*
751  * Assign a "unique" number for a prepared statement.
752  *
753  * This works much like GetCursorNumber, except that we never reset the counter
754  * within a session. That's because we can't be 100% sure we've gotten rid
755  * of all prepared statements on all connections, and it's not really worth
756  * increasing the risk of prepared-statement name collisions by resetting.
757  */
758 unsigned int
760 {
761  return ++prep_stmt_number;
762 }
763 
764 /*
765  * Submit a query and wait for the result.
766  *
767  * This function is interruptible by signals.
768  *
769  * Caller is responsible for the error handling on the result.
770  */
771 PGresult *
773 {
774  /* First, process a pending asynchronous request, if any. */
775  if (state && state->pendingAreq)
776  process_pending_request(state->pendingAreq);
777 
778  /*
779  * Submit a query. Since we don't use non-blocking mode, this also can
780  * block. But its risk is relatively small, so we ignore that for now.
781  */
782  if (!PQsendQuery(conn, query))
783  pgfdw_report_error(ERROR, NULL, conn, false, query);
784 
785  /* Wait for the result. */
786  return pgfdw_get_result(conn, query);
787 }
788 
789 /*
790  * Wait for the result from a prior asynchronous execution function call.
791  *
792  * This function offers quick responsiveness by checking for any interruptions.
793  *
794  * This function emulates PQexec()'s behavior of returning the last result
795  * when there are many.
796  *
797  * Caller is responsible for the error handling on the result.
798  */
799 PGresult *
800 pgfdw_get_result(PGconn *conn, const char *query)
801 {
802  PGresult *volatile last_res = NULL;
803 
804  /* In what follows, do not leak any PGresults on an error. */
805  PG_TRY();
806  {
807  for (;;)
808  {
809  PGresult *res;
810 
811  while (PQisBusy(conn))
812  {
813  int wc;
814 
815  /* Sleep until there's something to do */
819  PQsocket(conn),
820  -1L, PG_WAIT_EXTENSION);
822 
824 
825  /* Data available in socket? */
826  if (wc & WL_SOCKET_READABLE)
827  {
828  if (!PQconsumeInput(conn))
829  pgfdw_report_error(ERROR, NULL, conn, false, query);
830  }
831  }
832 
833  res = PQgetResult(conn);
834  if (res == NULL)
835  break; /* query is complete */
836 
837  PQclear(last_res);
838  last_res = res;
839  }
840  }
841  PG_CATCH();
842  {
843  PQclear(last_res);
844  PG_RE_THROW();
845  }
846  PG_END_TRY();
847 
848  return last_res;
849 }
850 
851 /*
852  * Report an error we got from the remote server.
853  *
854  * elevel: error level to use (typically ERROR, but might be less)
855  * res: PGresult containing the error
856  * conn: connection we did the query on
857  * clear: if true, PQclear the result (otherwise caller will handle it)
858  * sql: NULL, or text of remote command we tried to execute
859  *
860  * Note: callers that choose not to throw ERROR for a remote error are
861  * responsible for making sure that the associated ConnCacheEntry gets
862  * marked with have_error = true.
863  */
864 void
866  bool clear, const char *sql)
867 {
868  /* If requested, PGresult must be released before leaving this function. */
869  PG_TRY();
870  {
871  char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
872  char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
873  char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
874  char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
875  char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
876  int sqlstate;
877 
878  if (diag_sqlstate)
879  sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
880  diag_sqlstate[1],
881  diag_sqlstate[2],
882  diag_sqlstate[3],
883  diag_sqlstate[4]);
884  else
885  sqlstate = ERRCODE_CONNECTION_FAILURE;
886 
887  /*
888  * If we don't get a message from the PGresult, try the PGconn. This
889  * is needed because for connection-level failures, PQexec may just
890  * return NULL, not a PGresult at all.
891  */
892  if (message_primary == NULL)
893  message_primary = pchomp(PQerrorMessage(conn));
894 
895  ereport(elevel,
896  (errcode(sqlstate),
897  (message_primary != NULL && message_primary[0] != '\0') ?
898  errmsg_internal("%s", message_primary) :
899  errmsg("could not obtain message string for remote error"),
900  message_detail ? errdetail_internal("%s", message_detail) : 0,
901  message_hint ? errhint("%s", message_hint) : 0,
902  message_context ? errcontext("%s", message_context) : 0,
903  sql ? errcontext("remote SQL command: %s", sql) : 0));
904  }
905  PG_FINALLY();
906  {
907  if (clear)
908  PQclear(res);
909  }
910  PG_END_TRY();
911 }
912 
913 /*
914  * pgfdw_xact_callback --- cleanup at main-transaction end.
915  *
916  * This runs just late enough that it must not enter user-defined code
917  * locally. (Entering such code on the remote side is fine. Its remote
918  * COMMIT TRANSACTION may run deferred triggers.)
919  */
920 static void
922 {
923  HASH_SEQ_STATUS scan;
924  ConnCacheEntry *entry;
925  List *pending_entries = NIL;
926 
927  /* Quick exit if no connections were touched in this transaction. */
928  if (!xact_got_connection)
929  return;
930 
931  /*
932  * Scan all connection cache entries to find open remote transactions, and
933  * close them.
934  */
936  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
937  {
938  PGresult *res;
939 
940  /* Ignore cache entry if no open connection right now */
941  if (entry->conn == NULL)
942  continue;
943 
944  /* If it has an open remote transaction, try to close it */
945  if (entry->xact_depth > 0)
946  {
947  elog(DEBUG3, "closing remote transaction on connection %p",
948  entry->conn);
949 
950  switch (event)
951  {
954 
955  /*
956  * If abort cleanup previously failed for this connection,
957  * we can't issue any more commands against it.
958  */
960 
961  /* Commit all remote transactions during pre-commit */
962  entry->changing_xact_state = true;
963  if (entry->parallel_commit)
964  {
965  do_sql_command_begin(entry->conn, "COMMIT TRANSACTION");
966  pending_entries = lappend(pending_entries, entry);
967  continue;
968  }
969  do_sql_command(entry->conn, "COMMIT TRANSACTION");
970  entry->changing_xact_state = false;
971 
972  /*
973  * If there were any errors in subtransactions, and we
974  * made prepared statements, do a DEALLOCATE ALL to make
975  * sure we get rid of all prepared statements. This is
976  * annoying and not terribly bulletproof, but it's
977  * probably not worth trying harder.
978  *
979  * DEALLOCATE ALL only exists in 8.3 and later, so this
980  * constrains how old a server postgres_fdw can
981  * communicate with. We intentionally ignore errors in
982  * the DEALLOCATE, so that we can hobble along to some
983  * extent with older servers (leaking prepared statements
984  * as we go; but we don't really support update operations
985  * pre-8.3 anyway).
986  */
987  if (entry->have_prep_stmt && entry->have_error)
988  {
989  res = PQexec(entry->conn, "DEALLOCATE ALL");
990  PQclear(res);
991  }
992  entry->have_prep_stmt = false;
993  entry->have_error = false;
994  break;
996 
997  /*
998  * We disallow any remote transactions, since it's not
999  * very reasonable to hold them open until the prepared
1000  * transaction is committed. For the moment, throw error
1001  * unconditionally; later we might allow read-only cases.
1002  * Note that the error will cause us to come right back
1003  * here with event == XACT_EVENT_ABORT, so we'll clean up
1004  * the connection state at that point.
1005  */
1006  ereport(ERROR,
1007  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1008  errmsg("cannot PREPARE a transaction that has operated on postgres_fdw foreign tables")));
1009  break;
1011  case XACT_EVENT_COMMIT:
1012  case XACT_EVENT_PREPARE:
1013  /* Pre-commit should have closed the open transaction */
1014  elog(ERROR, "missed cleaning up connection during pre-commit");
1015  break;
1017  case XACT_EVENT_ABORT:
1018  /* Rollback all remote transactions during abort */
1019  pgfdw_abort_cleanup(entry, true);
1020  break;
1021  }
1022  }
1023 
1024  /* Reset state to show we're out of a transaction */
1025  pgfdw_reset_xact_state(entry, true);
1026  }
1027 
1028  /* If there are any pending connections, finish cleaning them up */
1029  if (pending_entries)
1030  {
1032  event == XACT_EVENT_PRE_COMMIT);
1033  pgfdw_finish_pre_commit_cleanup(pending_entries);
1034  }
1035 
1036  /*
1037  * Regardless of the event type, we can now mark ourselves as out of the
1038  * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
1039  * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
1040  */
1041  xact_got_connection = false;
1042 
1043  /* Also reset cursor numbering for next transaction */
1044  cursor_number = 0;
1045 }
1046 
1047 /*
1048  * pgfdw_subxact_callback --- cleanup at subtransaction end.
1049  */
1050 static void
1052  SubTransactionId parentSubid, void *arg)
1053 {
1054  HASH_SEQ_STATUS scan;
1055  ConnCacheEntry *entry;
1056  int curlevel;
1057  List *pending_entries = NIL;
1058 
1059  /* Nothing to do at subxact start, nor after commit. */
1060  if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
1061  event == SUBXACT_EVENT_ABORT_SUB))
1062  return;
1063 
1064  /* Quick exit if no connections were touched in this transaction. */
1065  if (!xact_got_connection)
1066  return;
1067 
1068  /*
1069  * Scan all connection cache entries to find open remote subtransactions
1070  * of the current level, and close them.
1071  */
1072  curlevel = GetCurrentTransactionNestLevel();
1073  hash_seq_init(&scan, ConnectionHash);
1074  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1075  {
1076  char sql[100];
1077 
1078  /*
1079  * We only care about connections with open remote subtransactions of
1080  * the current level.
1081  */
1082  if (entry->conn == NULL || entry->xact_depth < curlevel)
1083  continue;
1084 
1085  if (entry->xact_depth > curlevel)
1086  elog(ERROR, "missed cleaning up remote subtransaction at level %d",
1087  entry->xact_depth);
1088 
1089  if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
1090  {
1091  /*
1092  * If abort cleanup previously failed for this connection, we
1093  * can't issue any more commands against it.
1094  */
1096 
1097  /* Commit all remote subtransactions during pre-commit */
1098  snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
1099  entry->changing_xact_state = true;
1100  if (entry->parallel_commit)
1101  {
1102  do_sql_command_begin(entry->conn, sql);
1103  pending_entries = lappend(pending_entries, entry);
1104  continue;
1105  }
1106  do_sql_command(entry->conn, sql);
1107  entry->changing_xact_state = false;
1108  }
1109  else
1110  {
1111  /* Rollback all remote subtransactions during abort */
1112  pgfdw_abort_cleanup(entry, false);
1113  }
1114 
1115  /* OK, we're outta that level of subtransaction */
1116  pgfdw_reset_xact_state(entry, false);
1117  }
1118 
1119  /* If there are any pending connections, finish cleaning them up */
1120  if (pending_entries)
1121  {
1123  pgfdw_finish_pre_subcommit_cleanup(pending_entries, curlevel);
1124  }
1125 }
1126 
1127 /*
1128  * Connection invalidation callback function
1129  *
1130  * After a change to a pg_foreign_server or pg_user_mapping catalog entry,
1131  * close connections depending on that entry immediately if current transaction
1132  * has not used those connections yet. Otherwise, mark those connections as
1133  * invalid and then make pgfdw_xact_callback() close them at the end of current
1134  * transaction, since they cannot be closed in the midst of the transaction
1135  * using them. Closed connections will be remade at the next opportunity if
1136  * necessary.
1137  *
1138  * Although most cache invalidation callbacks blow away all the related stuff
1139  * regardless of the given hashvalue, connections are expensive enough that
1140  * it's worth trying to avoid that.
1141  *
1142  * NB: We could avoid unnecessary disconnection more strictly by examining
1143  * individual option values, but it seems too much effort for the gain.
1144  */
1145 static void
1146 pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
1147 {
1148  HASH_SEQ_STATUS scan;
1149  ConnCacheEntry *entry;
1150 
1151  Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
1152 
1153  /* ConnectionHash must exist already, if we're registered */
1154  hash_seq_init(&scan, ConnectionHash);
1155  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1156  {
1157  /* Ignore invalid entries */
1158  if (entry->conn == NULL)
1159  continue;
1160 
1161  /* hashvalue == 0 means a cache reset, must clear all state */
1162  if (hashvalue == 0 ||
1163  (cacheid == FOREIGNSERVEROID &&
1164  entry->server_hashvalue == hashvalue) ||
1165  (cacheid == USERMAPPINGOID &&
1166  entry->mapping_hashvalue == hashvalue))
1167  {
1168  /*
1169  * Close the connection immediately if it's not used yet in this
1170  * transaction. Otherwise mark it as invalid so that
1171  * pgfdw_xact_callback() can close it at the end of this
1172  * transaction.
1173  */
1174  if (entry->xact_depth == 0)
1175  {
1176  elog(DEBUG3, "discarding connection %p", entry->conn);
1177  disconnect_pg_server(entry);
1178  }
1179  else
1180  entry->invalidated = true;
1181  }
1182  }
1183 }
1184 
1185 /*
1186  * Raise an error if the given connection cache entry is marked as being
1187  * in the middle of an xact state change. This should be called at which no
1188  * such change is expected to be in progress; if one is found to be in
1189  * progress, it means that we aborted in the middle of a previous state change
1190  * and now don't know what the remote transaction state actually is.
1191  * Such connections can't safely be further used. Re-establishing the
1192  * connection would change the snapshot and roll back any writes already
1193  * performed, so that's not an option, either. Thus, we must abort.
1194  */
1195 static void
1197 {
1198  ForeignServer *server;
1199 
1200  /* nothing to do for inactive entries and entries of sane state */
1201  if (entry->conn == NULL || !entry->changing_xact_state)
1202  return;
1203 
1204  /* make sure this entry is inactive */
1205  disconnect_pg_server(entry);
1206 
1207  /* find server name to be shown in the message below */
1208  server = GetForeignServer(entry->serverid);
1209 
1210  ereport(ERROR,
1211  (errcode(ERRCODE_CONNECTION_EXCEPTION),
1212  errmsg("connection to server \"%s\" was lost",
1213  server->servername)));
1214 }
1215 
1216 /*
1217  * Reset state to show we're out of a (sub)transaction.
1218  */
1219 static void
1221 {
1222  if (toplevel)
1223  {
1224  /* Reset state to show we're out of a transaction */
1225  entry->xact_depth = 0;
1226 
1227  /*
1228  * If the connection isn't in a good idle state, it is marked as
1229  * invalid or keep_connections option of its server is disabled, then
1230  * discard it to recover. Next GetConnection will open a new
1231  * connection.
1232  */
1233  if (PQstatus(entry->conn) != CONNECTION_OK ||
1234  PQtransactionStatus(entry->conn) != PQTRANS_IDLE ||
1235  entry->changing_xact_state ||
1236  entry->invalidated ||
1237  !entry->keep_connections)
1238  {
1239  elog(DEBUG3, "discarding connection %p", entry->conn);
1240  disconnect_pg_server(entry);
1241  }
1242  }
1243  else
1244  {
1245  /* Reset state to show we're out of a subtransaction */
1246  entry->xact_depth--;
1247  }
1248 }
1249 
1250 /*
1251  * Cancel the currently-in-progress query (whose query text we do not have)
1252  * and ignore the result. Returns true if we successfully cancel the query
1253  * and discard any pending result, and false if not.
1254  *
1255  * It's not a huge problem if we throw an ERROR here, but if we get into error
1256  * recursion trouble, we'll end up slamming the connection shut, which will
1257  * necessitate failing the entire toplevel transaction even if subtransactions
1258  * were used. Try to use WARNING where we can.
1259  *
1260  * XXX: if the query was one sent by fetch_more_data_begin(), we could get the
1261  * query text from the pendingAreq saved in the per-connection state, then
1262  * report the query using it.
1263  */
1264 static bool
1266 {
1267  PGcancel *cancel;
1268  char errbuf[256];
1269  PGresult *result = NULL;
1270  TimestampTz endtime;
1271  bool timed_out;
1272 
1273  /*
1274  * If it takes too long to cancel the query and discard the result, assume
1275  * the connection is dead.
1276  */
1278 
1279  /*
1280  * Issue cancel request. Unfortunately, there's no good way to limit the
1281  * amount of time that we might block inside PQgetCancel().
1282  */
1283  if ((cancel = PQgetCancel(conn)))
1284  {
1285  if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
1286  {
1287  ereport(WARNING,
1288  (errcode(ERRCODE_CONNECTION_FAILURE),
1289  errmsg("could not send cancel request: %s",
1290  errbuf)));
1291  PQfreeCancel(cancel);
1292  return false;
1293  }
1294  PQfreeCancel(cancel);
1295  }
1296 
1297  /* Get and discard the result of the query. */
1298  if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
1299  {
1300  if (timed_out)
1301  ereport(WARNING,
1302  (errmsg("could not get result of cancel request due to timeout")));
1303  else
1304  ereport(WARNING,
1305  (errcode(ERRCODE_CONNECTION_FAILURE),
1306  errmsg("could not get result of cancel request: %s",
1307  pchomp(PQerrorMessage(conn)))));
1308 
1309  return false;
1310  }
1311  PQclear(result);
1312 
1313  return true;
1314 }
1315 
1316 /*
1317  * Submit a query during (sub)abort cleanup and wait up to 30 seconds for the
1318  * result. If the query is executed without error, the return value is true.
1319  * If the query is executed successfully but returns an error, the return
1320  * value is true if and only if ignore_errors is set. If the query can't be
1321  * sent or times out, the return value is false.
1322  *
1323  * It's not a huge problem if we throw an ERROR here, but if we get into error
1324  * recursion trouble, we'll end up slamming the connection shut, which will
1325  * necessitate failing the entire toplevel transaction even if subtransactions
1326  * were used. Try to use WARNING where we can.
1327  */
1328 static bool
1329 pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
1330 {
1331  PGresult *result = NULL;
1332  TimestampTz endtime;
1333  bool timed_out;
1334 
1335  /*
1336  * If it takes too long to execute a cleanup query, assume the connection
1337  * is dead. It's fairly likely that this is why we aborted in the first
1338  * place (e.g. statement timeout, user cancel), so the timeout shouldn't
1339  * be too long.
1340  */
1342 
1343  /*
1344  * Submit a query. Since we don't use non-blocking mode, this also can
1345  * block. But its risk is relatively small, so we ignore that for now.
1346  */
1347  if (!PQsendQuery(conn, query))
1348  {
1349  pgfdw_report_error(WARNING, NULL, conn, false, query);
1350  return false;
1351  }
1352 
1353  /* Get the result of the query. */
1354  if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
1355  {
1356  if (timed_out)
1357  ereport(WARNING,
1358  (errmsg("could not get query result due to timeout"),
1359  query ? errcontext("remote SQL command: %s", query) : 0));
1360  else
1361  pgfdw_report_error(WARNING, NULL, conn, false, query);
1362 
1363  return false;
1364  }
1365 
1366  /* Issue a warning if not successful. */
1367  if (PQresultStatus(result) != PGRES_COMMAND_OK)
1368  {
1369  pgfdw_report_error(WARNING, result, conn, true, query);
1370  return ignore_errors;
1371  }
1372  PQclear(result);
1373 
1374  return true;
1375 }
1376 
1377 /*
1378  * Get, during abort cleanup, the result of a query that is in progress. This
1379  * might be a query that is being interrupted by transaction abort, or it might
1380  * be a query that was initiated as part of transaction abort to get the remote
1381  * side back to the appropriate state.
1382  *
1383  * endtime is the time at which we should give up and assume the remote
1384  * side is dead. Returns true if the timeout expired or connection trouble
1385  * occurred, false otherwise. Sets *result except in case of a timeout.
1386  * Sets timed_out to true only when the timeout expired.
1387  */
1388 static bool
1390  bool *timed_out)
1391 {
1392  volatile bool failed = false;
1393  PGresult *volatile last_res = NULL;
1394 
1395  *timed_out = false;
1396 
1397  /* In what follows, do not leak any PGresults on an error. */
1398  PG_TRY();
1399  {
1400  for (;;)
1401  {
1402  PGresult *res;
1403 
1404  while (PQisBusy(conn))
1405  {
1406  int wc;
1408  long cur_timeout;
1409 
1410  /* If timeout has expired, give up, else get sleep time. */
1411  cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
1412  if (cur_timeout <= 0)
1413  {
1414  *timed_out = true;
1415  failed = true;
1416  goto exit;
1417  }
1418 
1419  /* Sleep until there's something to do */
1423  PQsocket(conn),
1424  cur_timeout, PG_WAIT_EXTENSION);
1426 
1428 
1429  /* Data available in socket? */
1430  if (wc & WL_SOCKET_READABLE)
1431  {
1432  if (!PQconsumeInput(conn))
1433  {
1434  /* connection trouble */
1435  failed = true;
1436  goto exit;
1437  }
1438  }
1439  }
1440 
1441  res = PQgetResult(conn);
1442  if (res == NULL)
1443  break; /* query is complete */
1444 
1445  PQclear(last_res);
1446  last_res = res;
1447  }
1448 exit: ;
1449  }
1450  PG_CATCH();
1451  {
1452  PQclear(last_res);
1453  PG_RE_THROW();
1454  }
1455  PG_END_TRY();
1456 
1457  if (failed)
1458  PQclear(last_res);
1459  else
1460  *result = last_res;
1461  return failed;
1462 }
1463 
1464 /*
1465  * Abort remote transaction or subtransaction.
1466  *
1467  * "toplevel" should be set to true if toplevel (main) transaction is
1468  * rollbacked, false otherwise.
1469  *
1470  * Set entry->changing_xact_state to false on success, true on failure.
1471  */
1472 static void
1473 pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
1474 {
1475  char sql[100];
1476 
1477  /*
1478  * Don't try to clean up the connection if we're already in error
1479  * recursion trouble.
1480  */
1482  entry->changing_xact_state = true;
1483 
1484  /*
1485  * If connection is already unsalvageable, don't touch it further.
1486  */
1487  if (entry->changing_xact_state)
1488  return;
1489 
1490  /*
1491  * Mark this connection as in the process of changing transaction state.
1492  */
1493  entry->changing_xact_state = true;
1494 
1495  /* Assume we might have lost track of prepared statements */
1496  entry->have_error = true;
1497 
1498  /*
1499  * If a command has been submitted to the remote server by using an
1500  * asynchronous execution function, the command might not have yet
1501  * completed. Check to see if a command is still being processed by the
1502  * remote server, and if so, request cancellation of the command.
1503  */
1504  if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
1505  !pgfdw_cancel_query(entry->conn))
1506  return; /* Unable to cancel running query */
1507 
1508  if (toplevel)
1509  snprintf(sql, sizeof(sql), "ABORT TRANSACTION");
1510  else
1511  snprintf(sql, sizeof(sql),
1512  "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
1513  entry->xact_depth, entry->xact_depth);
1514  if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
1515  return; /* Unable to abort remote (sub)transaction */
1516 
1517  if (toplevel)
1518  {
1519  if (entry->have_prep_stmt && entry->have_error &&
1521  "DEALLOCATE ALL",
1522  true))
1523  return; /* Trouble clearing prepared statements */
1524 
1525  entry->have_prep_stmt = false;
1526  entry->have_error = false;
1527  }
1528 
1529  /*
1530  * If pendingAreq of the per-connection state is not NULL, it means that
1531  * an asynchronous fetch begun by fetch_more_data_begin() was not done
1532  * successfully and thus the per-connection state was not reset in
1533  * fetch_more_data(); in that case reset the per-connection state here.
1534  */
1535  if (entry->state.pendingAreq)
1536  memset(&entry->state, 0, sizeof(entry->state));
1537 
1538  /* Disarm changing_xact_state if it all worked */
1539  entry->changing_xact_state = false;
1540 }
1541 
1542 /*
1543  * Finish pre-commit cleanup of connections on each of which we've sent a
1544  * COMMIT command to the remote server.
1545  */
1546 static void
1548 {
1549  ConnCacheEntry *entry;
1550  List *pending_deallocs = NIL;
1551  ListCell *lc;
1552 
1553  Assert(pending_entries);
1554 
1555  /*
1556  * Get the result of the COMMIT command for each of the pending entries
1557  */
1558  foreach(lc, pending_entries)
1559  {
1560  entry = (ConnCacheEntry *) lfirst(lc);
1561 
1562  Assert(entry->changing_xact_state);
1563 
1564  /*
1565  * We might already have received the result on the socket, so pass
1566  * consume_input=true to try to consume it first
1567  */
1568  do_sql_command_end(entry->conn, "COMMIT TRANSACTION", true);
1569  entry->changing_xact_state = false;
1570 
1571  /* Do a DEALLOCATE ALL in parallel if needed */
1572  if (entry->have_prep_stmt && entry->have_error)
1573  {
1574  /* Ignore errors (see notes in pgfdw_xact_callback) */
1575  if (PQsendQuery(entry->conn, "DEALLOCATE ALL"))
1576  {
1577  pending_deallocs = lappend(pending_deallocs, entry);
1578  continue;
1579  }
1580  }
1581  entry->have_prep_stmt = false;
1582  entry->have_error = false;
1583 
1584  pgfdw_reset_xact_state(entry, true);
1585  }
1586 
1587  /* No further work if no pending entries */
1588  if (!pending_deallocs)
1589  return;
1590 
1591  /*
1592  * Get the result of the DEALLOCATE command for each of the pending
1593  * entries
1594  */
1595  foreach(lc, pending_deallocs)
1596  {
1597  PGresult *res;
1598 
1599  entry = (ConnCacheEntry *) lfirst(lc);
1600 
1601  /* Ignore errors (see notes in pgfdw_xact_callback) */
1602  while ((res = PQgetResult(entry->conn)) != NULL)
1603  {
1604  PQclear(res);
1605  /* Stop if the connection is lost (else we'll loop infinitely) */
1606  if (PQstatus(entry->conn) == CONNECTION_BAD)
1607  break;
1608  }
1609  entry->have_prep_stmt = false;
1610  entry->have_error = false;
1611 
1612  pgfdw_reset_xact_state(entry, true);
1613  }
1614 }
1615 
1616 /*
1617  * Finish pre-subcommit cleanup of connections on each of which we've sent a
1618  * RELEASE command to the remote server.
1619  */
1620 static void
1621 pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel)
1622 {
1623  ConnCacheEntry *entry;
1624  char sql[100];
1625  ListCell *lc;
1626 
1627  Assert(pending_entries);
1628 
1629  /*
1630  * Get the result of the RELEASE command for each of the pending entries
1631  */
1632  snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
1633  foreach(lc, pending_entries)
1634  {
1635  entry = (ConnCacheEntry *) lfirst(lc);
1636 
1637  Assert(entry->changing_xact_state);
1638 
1639  /*
1640  * We might already have received the result on the socket, so pass
1641  * consume_input=true to try to consume it first
1642  */
1643  do_sql_command_end(entry->conn, sql, true);
1644  entry->changing_xact_state = false;
1645 
1646  pgfdw_reset_xact_state(entry, false);
1647  }
1648 }
1649 
1650 /*
1651  * List active foreign server connections.
1652  *
1653  * This function takes no input parameter and returns setof record made of
1654  * following values:
1655  * - server_name - server name of active connection. In case the foreign server
1656  * is dropped but still the connection is active, then the server name will
1657  * be NULL in output.
1658  * - valid - true/false representing whether the connection is valid or not.
1659  * Note that the connections can get invalidated in pgfdw_inval_callback.
1660  *
1661  * No records are returned when there are no cached connections at all.
1662  */
1663 Datum
1665 {
1666 #define POSTGRES_FDW_GET_CONNECTIONS_COLS 2
1667  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1668  HASH_SEQ_STATUS scan;
1669  ConnCacheEntry *entry;
1670 
1671  InitMaterializedSRF(fcinfo, 0);
1672 
1673  /* If cache doesn't exist, we return no records */
1674  if (!ConnectionHash)
1675  PG_RETURN_VOID();
1676 
1677  hash_seq_init(&scan, ConnectionHash);
1678  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1679  {
1680  ForeignServer *server;
1682  bool nulls[POSTGRES_FDW_GET_CONNECTIONS_COLS] = {0};
1683 
1684  /* We only look for open remote connections */
1685  if (!entry->conn)
1686  continue;
1687 
1688  server = GetForeignServerExtended(entry->serverid, FSV_MISSING_OK);
1689 
1690  /*
1691  * The foreign server may have been dropped in current explicit
1692  * transaction. It is not possible to drop the server from another
1693  * session when the connection associated with it is in use in the
1694  * current transaction, if tried so, the drop query in another session
1695  * blocks until the current transaction finishes.
1696  *
1697  * Even though the server is dropped in the current transaction, the
1698  * cache can still have associated active connection entry, say we
1699  * call such connections dangling. Since we can not fetch the server
1700  * name from system catalogs for dangling connections, instead we show
1701  * NULL value for server name in output.
1702  *
1703  * We could have done better by storing the server name in the cache
1704  * entry instead of server oid so that it could be used in the output.
1705  * But the server name in each cache entry requires 64 bytes of
1706  * memory, which is huge, when there are many cached connections and
1707  * the use case i.e. dropping the foreign server within the explicit
1708  * current transaction seems rare. So, we chose to show NULL value for
1709  * server name in output.
1710  *
1711  * Such dangling connections get closed either in next use or at the
1712  * end of current explicit transaction in pgfdw_xact_callback.
1713  */
1714  if (!server)
1715  {
1716  /*
1717  * If the server has been dropped in the current explicit
1718  * transaction, then this entry would have been invalidated in
1719  * pgfdw_inval_callback at the end of drop server command. Note
1720  * that this connection would not have been closed in
1721  * pgfdw_inval_callback because it is still being used in the
1722  * current explicit transaction. So, assert that here.
1723  */
1724  Assert(entry->conn && entry->xact_depth > 0 && entry->invalidated);
1725 
1726  /* Show null, if no server name was found */
1727  nulls[0] = true;
1728  }
1729  else
1730  values[0] = CStringGetTextDatum(server->servername);
1731 
1732  values[1] = BoolGetDatum(!entry->invalidated);
1733 
1734  tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
1735  }
1736 
1737  PG_RETURN_VOID();
1738 }
1739 
1740 /*
1741  * Disconnect the specified cached connections.
1742  *
1743  * This function discards the open connections that are established by
1744  * postgres_fdw from the local session to the foreign server with
1745  * the given name. Note that there can be multiple connections to
1746  * the given server using different user mappings. If the connections
1747  * are used in the current local transaction, they are not disconnected
1748  * and warning messages are reported. This function returns true
1749  * if it disconnects at least one connection, otherwise false. If no
1750  * foreign server with the given name is found, an error is reported.
1751  */
1752 Datum
1754 {
1755  ForeignServer *server;
1756  char *servername;
1757 
1758  servername = text_to_cstring(PG_GETARG_TEXT_PP(0));
1759  server = GetForeignServerByName(servername, false);
1760 
1762 }
1763 
1764 /*
1765  * Disconnect all the cached connections.
1766  *
1767  * This function discards all the open connections that are established by
1768  * postgres_fdw from the local session to the foreign servers.
1769  * If the connections are used in the current local transaction, they are
1770  * not disconnected and warning messages are reported. This function
1771  * returns true if it disconnects at least one connection, otherwise false.
1772  */
1773 Datum
1775 {
1777 }
1778 
1779 /*
1780  * Workhorse to disconnect cached connections.
1781  *
1782  * This function scans all the connection cache entries and disconnects
1783  * the open connections whose foreign server OID matches with
1784  * the specified one. If InvalidOid is specified, it disconnects all
1785  * the cached connections.
1786  *
1787  * This function emits a warning for each connection that's used in
1788  * the current transaction and doesn't close it. It returns true if
1789  * it disconnects at least one connection, otherwise false.
1790  *
1791  * Note that this function disconnects even the connections that are
1792  * established by other users in the same local session using different
1793  * user mappings. This leads even non-superuser to be able to close
1794  * the connections established by superusers in the same local session.
1795  *
1796  * XXX As of now we don't see any security risk doing this. But we should
1797  * set some restrictions on that, for example, prevent non-superuser
1798  * from closing the connections established by superusers even
1799  * in the same session?
1800  */
1801 static bool
1803 {
1804  HASH_SEQ_STATUS scan;
1805  ConnCacheEntry *entry;
1806  bool all = !OidIsValid(serverid);
1807  bool result = false;
1808 
1809  /*
1810  * Connection cache hashtable has not been initialized yet in this
1811  * session, so return false.
1812  */
1813  if (!ConnectionHash)
1814  return false;
1815 
1816  hash_seq_init(&scan, ConnectionHash);
1817  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1818  {
1819  /* Ignore cache entry if no open connection right now. */
1820  if (!entry->conn)
1821  continue;
1822 
1823  if (all || entry->serverid == serverid)
1824  {
1825  /*
1826  * Emit a warning because the connection to close is used in the
1827  * current transaction and cannot be disconnected right now.
1828  */
1829  if (entry->xact_depth > 0)
1830  {
1831  ForeignServer *server;
1832 
1833  server = GetForeignServerExtended(entry->serverid,
1834  FSV_MISSING_OK);
1835 
1836  if (!server)
1837  {
1838  /*
1839  * If the foreign server was dropped while its connection
1840  * was used in the current transaction, the connection
1841  * must have been marked as invalid by
1842  * pgfdw_inval_callback at the end of DROP SERVER command.
1843  */
1844  Assert(entry->invalidated);
1845 
1846  ereport(WARNING,
1847  (errmsg("cannot close dropped server connection because it is still in use")));
1848  }
1849  else
1850  ereport(WARNING,
1851  (errmsg("cannot close connection for server \"%s\" because it is still in use",
1852  server->servername)));
1853  }
1854  else
1855  {
1856  elog(DEBUG3, "discarding connection %p", entry->conn);
1857  disconnect_pg_server(entry);
1858  result = true;
1859  }
1860  }
1861  }
1862 
1863  return result;
1864 }
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1701
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1573
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1537
static Datum values[MAXATTR]
Definition: bootstrap.c:156
#define CStringGetTextDatum(s)
Definition: builtins.h:85
unsigned int uint32
Definition: c.h:442
uint32 SubTransactionId
Definition: c.h:592
#define OidIsValid(objectId)
Definition: c.h:711
Oid ConnCacheKey
Definition: connection.c:49
static unsigned int prep_stmt_number
Definition: connection.c:78
unsigned int GetCursorNumber(PGconn *conn)
Definition: connection.c:745
static bool UserMappingPasswordRequired(UserMapping *user)
Definition: connection.c:542
Datum postgres_fdw_get_connections(PG_FUNCTION_ARGS)
Definition: connection.c:1664
void do_sql_command(PGconn *conn, const char *sql)
Definition: connection.c:638
static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel)
Definition: connection.c:1621
static void disconnect_pg_server(ConnCacheEntry *entry)
Definition: connection.c:527
void ReleaseConnection(PGconn *conn)
Definition: connection.c:724
PG_FUNCTION_INFO_V1(postgres_fdw_get_connections)
PGresult * pgfdw_get_result(PGconn *conn, const char *query)
Definition: connection.c:800
static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
Definition: connection.c:1220
static void configure_remote_session(PGconn *conn)
Definition: connection.c:602
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:865
PGresult * pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
Definition: connection.c:772
static bool xact_got_connection
Definition: connection.c:81
#define POSTGRES_FDW_GET_CONNECTIONS_COLS
struct ConnCacheEntry ConnCacheEntry
Datum postgres_fdw_disconnect_all(PG_FUNCTION_ARGS)
Definition: connection.c:1774
static void do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
Definition: connection.c:652
static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result, bool *timed_out)
Definition: connection.c:1389
static HTAB * ConnectionHash
Definition: connection.c:74
static unsigned int cursor_number
Definition: connection.c:77
static void make_new_connection(ConnCacheEntry *entry, UserMapping *user)
Definition: connection.c:294
Datum postgres_fdw_disconnect(PG_FUNCTION_ARGS)
Definition: connection.c:1753
static void pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg)
Definition: connection.c:1051
static PGconn * connect_pg_server(ForeignServer *server, UserMapping *user)
Definition: connection.c:354
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
Definition: connection.c:1329
PGconn * GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
Definition: connection.c:134
unsigned int GetPrepStmtNumber(PGconn *conn)
Definition: connection.c:759
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
Definition: connection.c:1196
static void check_conn_params(const char **keywords, const char **values, UserMapping *user)
Definition: connection.c:565
static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
Definition: connection.c:1146
static void pgfdw_xact_callback(XactEvent event, void *arg)
Definition: connection.c:921
static void do_sql_command_begin(PGconn *conn, const char *sql)
Definition: connection.c:645
static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
Definition: connection.c:1473
static void begin_remote_xact(ConnCacheEntry *entry)
Definition: connection.c:681
static bool pgfdw_cancel_query(PGconn *conn)
Definition: connection.c:1265
static void pgfdw_finish_pre_commit_cleanup(List *pending_entries)
Definition: connection.c:1547
static bool disconnect_cached_connections(Oid serverid)
Definition: connection.c:1802
char * process_pgfdw_appname(const char *appname)
Definition: option.c:462
int ExtractConnectionOptions(List *defelems, const char **keywords, const char **values)
Definition: option.c:385
char * pgfdw_application_name
Definition: option.c:52
int64 TimestampTz
Definition: timestamp.h:39
bool defGetBoolean(DefElem *def)
Definition: define.c:108
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:953
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:350
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1431
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1421
int errmsg_internal(const char *fmt,...)
Definition: elog.c:993
void FreeErrorData(ErrorData *edata)
Definition: elog.c:1613
int errdetail_internal(const char *fmt,...)
Definition: elog.c:1066
int errdetail(const char *fmt,...)
Definition: elog.c:1039
void FlushErrorState(void)
Definition: elog.c:1651
int errhint(const char *fmt,...)
Definition: elog.c:1153
int errcode(int sqlerrcode)
Definition: elog.c:695
int errmsg(const char *fmt,...)
Definition: elog.c:906
bool in_error_recursion_trouble(void)
Definition: elog.c:290
ErrorData * CopyErrorData(void)
Definition: elog.c:1557
#define PG_RE_THROW()
Definition: elog.h:350
#define errcontext
Definition: elog.h:192
#define DEBUG3
Definition: elog.h:24
#define PG_TRY(...)
Definition: elog.h:309
#define WARNING
Definition: elog.h:32
#define PG_END_TRY(...)
Definition: elog.h:334
#define ERROR
Definition: elog.h:35
#define PG_CATCH(...)
Definition: elog.h:319
#define MAKE_SQLSTATE(ch1, ch2, ch3, ch4, ch5)
Definition: elog.h:52
#define PG_FINALLY(...)
Definition: elog.h:326
#define ereport(elevel,...)
Definition: elog.h:145
void ReleaseExternalFD(void)
Definition: fd.c:1152
bool AcquireExternalFD(void)
Definition: fd.c:1099
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:6733
PGcancel * PQgetCancel(PGconn *conn)
Definition: fe-connect.c:4235
PGconn * PQconnectdbParams(const char *const *keywords, const char *const *values, int expand_dbname)
Definition: fe-connect.c:652
PGTransactionStatusType PQtransactionStatus(const PGconn *conn)
Definition: fe-connect.c:6698
int PQconnectionUsedPassword(const PGconn *conn)
Definition: fe-connect.c:6809
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6743
int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize)
Definition: fe-connect.c:4349
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:6690
void PQfinish(PGconn *conn)
Definition: fe-connect.c:4130
void PQfreeCancel(PGcancel *cancel)
Definition: fe-connect.c:4303
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:6769
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3240
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2225
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1953
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1418
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:2000
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:3295
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:2031
#define PG_RETURN_VOID()
Definition: fmgr.h:349
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:309
#define PG_FUNCTION_ARGS
Definition: fmgr.h:193
#define PG_RETURN_BOOL(x)
Definition: fmgr.h:359
ForeignServer * GetForeignServerExtended(Oid serverid, bits16 flags)
Definition: foreign.c:123
ForeignServer * GetForeignServer(Oid serverid)
Definition: foreign.c:111
ForeignServer * GetForeignServerByName(const char *srvname, bool missing_ok)
Definition: foreign.c:182
#define FSV_MISSING_OK
Definition: foreign.h:61
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
Definition: funcapi.c:76
struct Latch * MyLatch
Definition: globals.c:58
@ HASH_ENTER
Definition: hsearch.h:114
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1519
int i
Definition: isn.c:73
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:524
void ResetLatch(Latch *latch)
Definition: latch.c:683
#define WL_SOCKET_READABLE
Definition: latch.h:126
#define WL_TIMEOUT
Definition: latch.h:128
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130
#define WL_LATCH_SET
Definition: latch.h:125
@ CONNECTION_BAD
Definition: libpq-fe.h:61
@ CONNECTION_OK
Definition: libpq-fe.h:60
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:97
@ PQTRANS_IDLE
Definition: libpq-fe.h:118
@ PQTRANS_ACTIVE
Definition: libpq-fe.h:119
Assert(fmt[strlen(fmt) - 1] !='\n')
exit(1)
List * lappend(List *list, void *datum)
Definition: list.c:338
const char * GetDatabaseEncodingName(void)
Definition: mbutils.c:1216
char * pchomp(const char *in)
Definition: mcxt.c:1511
void pfree(void *pointer)
Definition: mcxt.c:1306
MemoryContext CurrentMemoryContext
Definition: mcxt.c:124
void * palloc(Size size)
Definition: mcxt.c:1199
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:121
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:135
void * arg
#define lfirst(lc)
Definition: pg_list.h:170
static int list_length(const List *l)
Definition: pg_list.h:150
#define NIL
Definition: pg_list.h:66
static char * user
Definition: pg_regress.c:93
#define snprintf
Definition: port.h:238
uintptr_t Datum
Definition: postgres.h:412
static Datum BoolGetDatum(bool X)
Definition: postgres.h:450
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:600
#define InvalidOid
Definition: postgres_ext.h:36
unsigned int Oid
Definition: postgres_ext.h:31
#define PG_DIAG_MESSAGE_HINT
Definition: postgres_ext.h:59
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:56
#define PG_DIAG_MESSAGE_PRIMARY
Definition: postgres_ext.h:57
#define PG_DIAG_MESSAGE_DETAIL
Definition: postgres_ext.h:58
#define PG_DIAG_CONTEXT
Definition: postgres_ext.h:63
void process_pending_request(AsyncRequest *areq)
PGconn * conn
Definition: streamutil.c:54
PGconn * conn
Definition: connection.c:54
bool have_prep_stmt
Definition: connection.c:58
PgFdwConnState state
Definition: connection.c:68
bool invalidated
Definition: connection.c:62
ConnCacheKey key
Definition: connection.c:53
bool parallel_commit
Definition: connection.c:61
uint32 server_hashvalue
Definition: connection.c:66
uint32 mapping_hashvalue
Definition: connection.c:67
bool keep_connections
Definition: connection.c:63
bool changing_xact_state
Definition: connection.c:60
char * defname
Definition: parsenodes.h:777
int sqlerrcode
Definition: elog.h:377
List * options
Definition: foreign.h:42
char * servername
Definition: foreign.h:39
Oid serverid
Definition: foreign.h:36
Size keysize
Definition: hsearch.h:75
Size entrysize
Definition: hsearch.h:76
Definition: dynahash.c:220
Definition: pg_list.h:52
AsyncRequest * pendingAreq
Definition: postgres_fdw.h:134
TupleDesc setDesc
Definition: execnodes.h:332
Tuplestorestate * setResult
Definition: execnodes.h:331
Definition: regguts.h:318
bool superuser_arg(Oid roleid)
Definition: superuser.c:56
@ FOREIGNSERVEROID
Definition: syscache.h:64
@ USERMAPPINGOID
Definition: syscache.h:115
#define GetSysCacheHashValue1(cacheId, key1)
Definition: syscache.h:206
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:84
char * text_to_cstring(const text *t)
Definition: varlena.c:222
#define PG_WAIT_EXTENSION
Definition: wait_event.h:23
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:913
void RegisterXactCallback(XactCallback callback, void *arg)
Definition: xact.c:3630
void RegisterSubXactCallback(SubXactCallback callback, void *arg)
Definition: xact.c:3690
SubXactEvent
Definition: xact.h:134
@ SUBXACT_EVENT_PRE_COMMIT_SUB
Definition: xact.h:138
@ SUBXACT_EVENT_ABORT_SUB
Definition: xact.h:137
XactEvent
Definition: xact.h:120
@ XACT_EVENT_PRE_PREPARE
Definition: xact.h:128
@ XACT_EVENT_COMMIT
Definition: xact.h:121
@ XACT_EVENT_PARALLEL_PRE_COMMIT
Definition: xact.h:127
@ XACT_EVENT_PARALLEL_COMMIT
Definition: xact.h:122
@ XACT_EVENT_ABORT
Definition: xact.h:123
@ XACT_EVENT_PRE_COMMIT
Definition: xact.h:126
@ XACT_EVENT_PARALLEL_ABORT
Definition: xact.h:124
@ XACT_EVENT_PREPARE
Definition: xact.h:125
#define IsolationIsSerializable()
Definition: xact.h:52