PostgreSQL Source Code  git master
connection.c File Reference
#include "postgres.h"
#include "access/htup_details.h"
#include "access/xact.h"
#include "catalog/pg_user_mapping.h"
#include "commands/defrem.h"
#include "funcapi.h"
#include "libpq/libpq-be.h"
#include "libpq/libpq-be-fe-helpers.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postgres_fdw.h"
#include "storage/fd.h"
#include "storage/latch.h"
#include "utils/builtins.h"
#include "utils/datetime.h"
#include "utils/hsearch.h"
#include "utils/inval.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
Include dependency graph for connection.c:

Go to the source code of this file.

Data Structures

struct  ConnCacheEntry
 

Macros

#define CONNECTION_CLEANUP_TIMEOUT   30000
 
#define CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel)
 
#define POSTGRES_FDW_GET_CONNECTIONS_COLS   2
 

Typedefs

typedef Oid ConnCacheKey
 
typedef struct ConnCacheEntry ConnCacheEntry
 

Functions

 PG_FUNCTION_INFO_V1 (postgres_fdw_get_connections)
 
 PG_FUNCTION_INFO_V1 (postgres_fdw_disconnect)
 
 PG_FUNCTION_INFO_V1 (postgres_fdw_disconnect_all)
 
static void make_new_connection (ConnCacheEntry *entry, UserMapping *user)
 
static PGconn * connect_pg_server (ForeignServer *server, UserMapping *user)
 
static void disconnect_pg_server (ConnCacheEntry *entry)
 
static void check_conn_params (const char **keywords, const char **values, UserMapping *user)
 
static void configure_remote_session (PGconn *conn)
 
static void do_sql_command_begin (PGconn *conn, const char *sql)
 
static void do_sql_command_end (PGconn *conn, const char *sql, bool consume_input)
 
static void begin_remote_xact (ConnCacheEntry *entry)
 
static void pgfdw_xact_callback (XactEvent event, void *arg)
 
static void pgfdw_subxact_callback (SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg)
 
static void pgfdw_inval_callback (Datum arg, int cacheid, uint32 hashvalue)
 
static void pgfdw_reject_incomplete_xact_state_change (ConnCacheEntry *entry)
 
static void pgfdw_reset_xact_state (ConnCacheEntry *entry, bool toplevel)
 
static bool pgfdw_cancel_query (PGconn *conn)
 
static bool pgfdw_cancel_query_begin (PGconn *conn)
 
static bool pgfdw_cancel_query_end (PGconn *conn, TimestampTz endtime, bool consume_input)
 
static bool pgfdw_exec_cleanup_query (PGconn *conn, const char *query, bool ignore_errors)
 
static bool pgfdw_exec_cleanup_query_begin (PGconn *conn, const char *query)
 
static bool pgfdw_exec_cleanup_query_end (PGconn *conn, const char *query, TimestampTz endtime, bool consume_input, bool ignore_errors)
 
static bool pgfdw_get_cleanup_result (PGconn *conn, TimestampTz endtime, PGresult **result, bool *timed_out)
 
static void pgfdw_abort_cleanup (ConnCacheEntry *entry, bool toplevel)
 
static bool pgfdw_abort_cleanup_begin (ConnCacheEntry *entry, bool toplevel, List **pending_entries, List **cancel_requested)
 
static void pgfdw_finish_pre_commit_cleanup (List *pending_entries)
 
static void pgfdw_finish_pre_subcommit_cleanup (List *pending_entries, int curlevel)
 
static void pgfdw_finish_abort_cleanup (List *pending_entries, List *cancel_requested, bool toplevel)
 
static void pgfdw_security_check (const char **keywords, const char **values, UserMapping *user, PGconn *conn)
 
static bool UserMappingPasswordRequired (UserMapping *user)
 
static bool disconnect_cached_connections (Oid serverid)
 
PGconn * GetConnection (UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
 
void do_sql_command (PGconn *conn, const char *sql)
 
void ReleaseConnection (PGconn *conn)
 
unsigned int GetCursorNumber (PGconn *conn)
 
unsigned int GetPrepStmtNumber (PGconn *conn)
 
PGresult * pgfdw_exec_query (PGconn *conn, const char *query, PgFdwConnState *state)
 
PGresult * pgfdw_get_result (PGconn *conn, const char *query)
 
void pgfdw_report_error (int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
 
Datum postgres_fdw_get_connections (PG_FUNCTION_ARGS)
 
Datum postgres_fdw_disconnect (PG_FUNCTION_ARGS)
 
Datum postgres_fdw_disconnect_all (PG_FUNCTION_ARGS)
 

Variables

static HTAB * ConnectionHash = NULL
 
static unsigned int cursor_number = 0
 
static unsigned int prep_stmt_number = 0
 
static bool xact_got_connection = false
 

Macro Definition Documentation

◆ CONNECTION_CLEANUP_TIMEOUT

#define CONNECTION_CLEANUP_TIMEOUT   30000

Definition at line 91 of file connection.c.

◆ CONSTRUCT_ABORT_COMMAND

#define CONSTRUCT_ABORT_COMMAND (   sql,
  entry,
  toplevel 
)
Value:
do { \
if (toplevel) \
snprintf((sql), sizeof(sql), \
"ABORT TRANSACTION"); \
else \
snprintf((sql), sizeof(sql), \
"ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", \
(entry)->xact_depth, (entry)->xact_depth); \
} while(0)
#define snprintf
Definition: port.h:238

Definition at line 94 of file connection.c.

◆ POSTGRES_FDW_GET_CONNECTIONS_COLS

#define POSTGRES_FDW_GET_CONNECTIONS_COLS   2

Typedef Documentation

◆ ConnCacheEntry

◆ ConnCacheKey

typedef Oid ConnCacheKey

Definition at line 51 of file connection.c.

Function Documentation

◆ begin_remote_xact()

static void begin_remote_xact ( ConnCacheEntry * entry)
static

Definition at line 727 of file connection.c.

728 {
729  int curlevel = GetCurrentTransactionNestLevel();
730 
731  /* Start main transaction if we haven't yet */
732  if (entry->xact_depth <= 0)
733  {
734  const char *sql;
735 
736  elog(DEBUG3, "starting remote transaction on connection %p",
737  entry->conn);
738 
739  if (IsolationIsSerializable())
740  sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
741  else
742  sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
743  entry->changing_xact_state = true;
744  do_sql_command(entry->conn, sql);
745  entry->xact_depth = 1;
746  entry->changing_xact_state = false;
747  }
748 
749  /*
750  * If we're in a subtransaction, stack up savepoints to match our level.
751  * This ensures we can rollback just the desired effects when a
752  * subtransaction aborts.
753  */
754  while (entry->xact_depth < curlevel)
755  {
756  char sql[64];
757 
758  snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
759  entry->changing_xact_state = true;
760  do_sql_command(entry->conn, sql);
761  entry->xact_depth++;
762  entry->changing_xact_state = false;
763  }
764 }
void do_sql_command(PGconn *conn, const char *sql)
Definition: connection.c:684
#define DEBUG3
Definition: elog.h:28
PGconn * conn
Definition: connection.c:56
bool changing_xact_state
Definition: connection.c:62
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:914
#define IsolationIsSerializable()
Definition: xact.h:52

References ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, DEBUG3, do_sql_command(), elog(), GetCurrentTransactionNestLevel(), IsolationIsSerializable, snprintf, and ConnCacheEntry::xact_depth.

Referenced by GetConnection().

◆ check_conn_params()

static void check_conn_params ( const char **  keywords,
const char **  values,
UserMapping * user 
)
static

Definition at line 605 of file connection.c.

606 {
607  int i;
608 
609  /* no check required if superuser */
610  if (superuser_arg(user->userid))
611  return;
612 
613 #ifdef ENABLE_GSS
614  /* ok if the user provided their own delegated credentials */
615  if (be_gssapi_get_delegation(MyProcPort))
616  return;
617 #endif
618 
619  /* ok if params contain a non-empty password */
620  for (i = 0; keywords[i] != NULL; i++)
621  {
622  if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
623  return;
624  }
625 
626  /* ok if the superuser explicitly said so at user mapping creation time */
627  if (!UserMappingPasswordRequired(user))
628  return;
629 
630  ereport(ERROR,
631  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
632  errmsg("password or GSSAPI delegated credentials required"),
633  errdetail("Non-superusers must delegate GSSAPI credentials or provide a password in the user mapping.")));
634 }
bool be_gssapi_get_delegation(Port *port)
static Datum values[MAXATTR]
Definition: bootstrap.c:156
static bool UserMappingPasswordRequired(UserMapping *user)
Definition: connection.c:581
int errdetail(const char *fmt,...)
Definition: elog.c:1202
int errcode(int sqlerrcode)
Definition: elog.c:858
int errmsg(const char *fmt,...)
Definition: elog.c:1069
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
struct Port * MyProcPort
Definition: globals.c:47
int i
Definition: isn.c:73
static char * user
Definition: pg_regress.c:112
bool superuser_arg(Oid roleid)
Definition: superuser.c:56

References be_gssapi_get_delegation(), ereport, errcode(), errdetail(), errmsg(), ERROR, i, MyProcPort, superuser_arg(), user, UserMappingPasswordRequired(), and values.

Referenced by connect_pg_server().

◆ configure_remote_session()

static void configure_remote_session ( PGconn * conn)
static

Definition at line 648 of file connection.c.

649 {
650  int remoteversion = PQserverVersion(conn);
651 
652  /* Force the search path to contain only pg_catalog (see deparse.c) */
653  do_sql_command(conn, "SET search_path = pg_catalog");
654 
655  /*
656  * Set remote timezone; this is basically just cosmetic, since all
657  * transmitted and returned timestamptzs should specify a zone explicitly
658  * anyway. However it makes the regression test outputs more predictable.
659  *
660  * We don't risk setting remote zone equal to ours, since the remote
661  * server might use a different timezone database. Instead, use UTC
662  * (quoted, because very old servers are picky about case).
663  */
664  do_sql_command(conn, "SET timezone = 'UTC'");
665 
666  /*
667  * Set values needed to ensure unambiguous data output from remote. (This
668  * logic should match what pg_dump does. See also set_transmission_modes
669  * in postgres_fdw.c.)
670  */
671  do_sql_command(conn, "SET datestyle = ISO");
672  if (remoteversion >= 80400)
673  do_sql_command(conn, "SET intervalstyle = postgres");
674  if (remoteversion >= 90000)
675  do_sql_command(conn, "SET extra_float_digits = 3");
676  else
677  do_sql_command(conn, "SET extra_float_digits = 2");
678 }
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:7238
PGconn * conn
Definition: streamutil.c:54

References conn, do_sql_command(), and PQserverVersion().

Referenced by connect_pg_server().

◆ connect_pg_server()

static PGconn * connect_pg_server ( ForeignServer * server,
UserMapping * user 
)
static

Definition at line 435 of file connection.c.

436 {
437  PGconn *volatile conn = NULL;
438 
439  /*
440  * Use PG_TRY block to ensure closing connection on error.
441  */
442  PG_TRY();
443  {
444  const char **keywords;
445  const char **values;
446  char *appname = NULL;
447  int n;
448 
449  /*
450  * Construct connection params from generic options of ForeignServer
451  * and UserMapping. (Some of them might not be libpq options, in
452  * which case we'll just waste a few array slots.) Add 4 extra slots
453  * for application_name, fallback_application_name, client_encoding,
454  * end marker.
455  */
456  n = list_length(server->options) + list_length(user->options) + 4;
457  keywords = (const char **) palloc(n * sizeof(char *));
458  values = (const char **) palloc(n * sizeof(char *));
459 
460  n = 0;
461  n += ExtractConnectionOptions(server->options,
462  keywords + n, values + n);
463  n += ExtractConnectionOptions(user->options,
464  keywords + n, values + n);
465 
466  /*
467  * Use pgfdw_application_name as application_name if set.
468  *
469  * PQconnectdbParams() processes the parameter arrays from start to
470  * end. If any key word is repeated, the last value is used. Therefore
471  * note that pgfdw_application_name must be added to the arrays after
472  * options of ForeignServer are, so that it can override
473  * application_name set in ForeignServer.
474  */
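475  if (pgfdw_application_name && *pgfdw_application_name != '\0')
476  {
477  keywords[n] = "application_name";
478  values[n] = pgfdw_application_name;
479  n++;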
480  }
481 
482  /*
483  * Search the parameter arrays to find application_name setting, and
484  * replace escape sequences in it with status information if found.
485  * The arrays are searched backwards because the last value is used if
486  * application_name is repeatedly set.
487  */
488  for (int i = n - 1; i >= 0; i--)
489  {
490  if (strcmp(keywords[i], "application_name") == 0 &&
491  *(values[i]) != '\0')
492  {
493  /*
494  * Use this application_name setting if it's not empty string
495  * even after any escape sequences in it are replaced.
496  */
497  appname = process_pgfdw_appname(values[i]);
498  if (appname[0] != '\0')
499  {
500  values[i] = appname;
501  break;
502  }
503 
504  /*
505  * This empty application_name is not used, so we set
506  * values[i] to NULL and keep searching the array to find the
507  * next one.
508  */
509  values[i] = NULL;
510  pfree(appname);
511  appname = NULL;
512  }
513  }
514 
515  /* Use "postgres_fdw" as fallback_application_name */
516  keywords[n] = "fallback_application_name";
517  values[n] = "postgres_fdw";
518  n++;
519 
520  /* Set client_encoding so that libpq can convert encoding properly. */
521  keywords[n] = "client_encoding";
522  values[n] = GetDatabaseEncodingName();
523  n++;
524 
525  keywords[n] = values[n] = NULL;
526 
527  /* verify the set of connection parameters */
528  check_conn_params(keywords, values, user);
529 
530  /* OK to make connection */
531  conn = libpqsrv_connect_params(keywords, values,
532  false, /* expand_dbname */
533  WAIT_EVENT_EXTENSION);
534 
535  if (!conn || PQstatus(conn) != CONNECTION_OK)
536  ereport(ERROR,
537  (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
538  errmsg("could not connect to server \"%s\"",
539  server->servername),
540  errdetail_internal("%s", pchomp(PQerrorMessage(conn)))));
541 
542  /* Perform post-connection security checks */
543  pgfdw_security_check(keywords, values, user, conn);
544 
545  /* Prepare new session for use */
546  configure_remote_session(conn);
547 
548  if (appname != NULL)
549  pfree(appname);
550  pfree(keywords);
551  pfree(values);
552  }
553  PG_CATCH();
554  {
555  libpqsrv_disconnect(conn);
556  PG_RE_THROW();
557  }
558  PG_END_TRY();
559 
560  return conn;
561 }
static void configure_remote_session(PGconn *conn)
Definition: connection.c:648
static void pgfdw_security_check(const char **keywords, const char **values, UserMapping *user, PGconn *conn)
Definition: connection.c:397
static void check_conn_params(const char **keywords, const char **values, UserMapping *user)
Definition: connection.c:605
char * process_pgfdw_appname(const char *appname)
Definition: option.c:491
int ExtractConnectionOptions(List *defelems, const char **keywords, const char **values)
Definition: option.c:414
char * pgfdw_application_name
Definition: option.c:52
int errdetail_internal(const char *fmt,...)
Definition: elog.c:1229
#define PG_RE_THROW()
Definition: elog.h:411
#define PG_TRY(...)
Definition: elog.h:370
#define PG_END_TRY(...)
Definition: elog.h:395
#define PG_CATCH(...)
Definition: elog.h:380
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7248
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:7195
static void libpqsrv_disconnect(PGconn *conn)
static PGconn * libpqsrv_connect_params(const char *const *keywords, const char *const *values, int expand_dbname, uint32 wait_event_info)
@ CONNECTION_OK
Definition: libpq-fe.h:60
const char * GetDatabaseEncodingName(void)
Definition: mbutils.c:1274
char * pchomp(const char *in)
Definition: mcxt.c:1672
void pfree(void *pointer)
Definition: mcxt.c:1456
void * palloc(Size size)
Definition: mcxt.c:1226
static int list_length(const List *l)
Definition: pg_list.h:152
List * options
Definition: foreign.h:42
char * servername
Definition: foreign.h:39
@ WAIT_EVENT_EXTENSION
Definition: wait_event.h:58

References check_conn_params(), configure_remote_session(), conn, CONNECTION_OK, ereport, errcode(), errdetail_internal(), errmsg(), ERROR, ExtractConnectionOptions(), GetDatabaseEncodingName(), i, libpqsrv_connect_params(), libpqsrv_disconnect(), list_length(), ForeignServer::options, palloc(), pchomp(), pfree(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pgfdw_application_name, pgfdw_security_check(), PQerrorMessage(), PQstatus(), process_pgfdw_appname(), ForeignServer::servername, user, values, and WAIT_EVENT_EXTENSION.

Referenced by make_new_connection().

◆ disconnect_cached_connections()

static bool disconnect_cached_connections ( Oid  serverid)
static

Definition at line 2164 of file connection.c.

2165 {
2166  HASH_SEQ_STATUS scan;
2167  ConnCacheEntry *entry;
2168  bool all = !OidIsValid(serverid);
2169  bool result = false;
2170 
2171  /*
2172  * Connection cache hashtable has not been initialized yet in this
2173  * session, so return false.
2174  */
2175  if (!ConnectionHash)
2176  return false;
2177 
2178  hash_seq_init(&scan, ConnectionHash);
2179  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
2180  {
2181  /* Ignore cache entry if no open connection right now. */
2182  if (!entry->conn)
2183  continue;
2184 
2185  if (all || entry->serverid == serverid)
2186  {
2187  /*
2188  * Emit a warning because the connection to close is used in the
2189  * current transaction and cannot be disconnected right now.
2190  */
2191  if (entry->xact_depth > 0)
2192  {
2193  ForeignServer *server;
2194 
2195  server = GetForeignServerExtended(entry->serverid,
2196  FSV_MISSING_OK);
2197 
2198  if (!server)
2199  {
2200  /*
2201  * If the foreign server was dropped while its connection
2202  * was used in the current transaction, the connection
2203  * must have been marked as invalid by
2204  * pgfdw_inval_callback at the end of DROP SERVER command.
2205  */
2206  Assert(entry->invalidated);
2207 
2208  ereport(WARNING,
2209  (errmsg("cannot close dropped server connection because it is still in use")));
2210  }
2211  else
2212  ereport(WARNING,
2213  (errmsg("cannot close connection for server \"%s\" because it is still in use",
2214  server->servername)));
2215  }
2216  else
2217  {
2218  elog(DEBUG3, "discarding connection %p", entry->conn);
2219  disconnect_pg_server(entry);
2220  result = true;
2221  }
2222  }
2223  }
2224 
2225  return result;
2226 }
#define OidIsValid(objectId)
Definition: c.h:764
static void disconnect_pg_server(ConnCacheEntry *entry)
Definition: connection.c:567
static HTAB * ConnectionHash
Definition: connection.c:77
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1431
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1421
#define WARNING
Definition: elog.h:36
ForeignServer * GetForeignServerExtended(Oid serverid, bits16 flags)
Definition: foreign.c:123
#define FSV_MISSING_OK
Definition: foreign.h:61
Assert(fmt[strlen(fmt) - 1] !='\n')
bool invalidated
Definition: connection.c:65

References Assert(), ConnCacheEntry::conn, ConnectionHash, DEBUG3, disconnect_pg_server(), elog(), ereport, errmsg(), FSV_MISSING_OK, GetForeignServerExtended(), hash_seq_init(), hash_seq_search(), ConnCacheEntry::invalidated, OidIsValid, ConnCacheEntry::serverid, ForeignServer::servername, WARNING, and ConnCacheEntry::xact_depth.

Referenced by postgres_fdw_disconnect(), and postgres_fdw_disconnect_all().

◆ disconnect_pg_server()

static void disconnect_pg_server ( ConnCacheEntry * entry)
static

Definition at line 567 of file connection.c.

568 {
569  if (entry->conn != NULL)
570  {
571  libpqsrv_disconnect(entry->conn);
572  entry->conn = NULL;
573  }
574 }

References ConnCacheEntry::conn, and libpqsrv_disconnect().

Referenced by disconnect_cached_connections(), GetConnection(), pgfdw_inval_callback(), pgfdw_reject_incomplete_xact_state_change(), and pgfdw_reset_xact_state().

◆ do_sql_command()

void do_sql_command ( PGconn * conn,
const char *  sql 
)

Definition at line 684 of file connection.c.

685 {
686  do_sql_command_begin(conn, sql);
687  do_sql_command_end(conn, sql, false);
688 }
static void do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
Definition: connection.c:698
static void do_sql_command_begin(PGconn *conn, const char *sql)
Definition: connection.c:691

References conn, do_sql_command_begin(), and do_sql_command_end().

Referenced by begin_remote_xact(), configure_remote_session(), pgfdw_subxact_callback(), pgfdw_xact_callback(), and postgresExecForeignTruncate().

◆ do_sql_command_begin()

static void do_sql_command_begin ( PGconn * conn,
const char *  sql 
)
static

Definition at line 691 of file connection.c.

692 {
693  if (!PQsendQuery(conn, sql))
694  pgfdw_report_error(ERROR, NULL, conn, false, sql);
695 }
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:911
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1422

References conn, ERROR, pgfdw_report_error(), and PQsendQuery().

Referenced by do_sql_command(), pgfdw_subxact_callback(), and pgfdw_xact_callback().

◆ do_sql_command_end()

static void do_sql_command_end ( PGconn * conn,
const char *  sql,
bool  consume_input 
)
static

Definition at line 698 of file connection.c.

699 {
700  PGresult *res;
701 
702  /*
703  * If requested, consume whatever data is available from the socket. (Note
704  * that if all data is available, this allows pgfdw_get_result to call
705  * PQgetResult without forcing the overhead of WaitLatchOrSocket, which
706  * would be large compared to the overhead of PQconsumeInput.)
707  */
708  if (consume_input && !PQconsumeInput(conn))
709  pgfdw_report_error(ERROR, NULL, conn, false, sql);
710  res = pgfdw_get_result(conn, sql);
711  if (PQresultStatus(res) != PGRES_COMMAND_OK)
712  pgfdw_report_error(ERROR, res, conn, true, sql);
713  PQclear(res);
714 }
PGresult * pgfdw_get_result(PGconn *conn, const char *query)
Definition: connection.c:846
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3325
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1957
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:97

References conn, ERROR, pgfdw_get_result(), pgfdw_report_error(), PGRES_COMMAND_OK, PQclear(), PQconsumeInput(), PQresultStatus(), and res.

Referenced by do_sql_command(), pgfdw_finish_pre_commit_cleanup(), and pgfdw_finish_pre_subcommit_cleanup().

◆ GetConnection()

PGconn* GetConnection ( UserMapping * user,
bool  will_prep_stmt,
PgFdwConnState **  state 
)

Definition at line 172 of file connection.c.

173 {
174  bool found;
175  bool retry = false;
176  ConnCacheEntry *entry;
177  ConnCacheKey key;
178  MemoryContext ccxt = CurrentMemoryContext;
179 
180  /* First time through, initialize connection cache hashtable */
181  if (ConnectionHash == NULL)
182  {
183  HASHCTL ctl;
184 
185  ctl.keysize = sizeof(ConnCacheKey);
186  ctl.entrysize = sizeof(ConnCacheEntry);
187  ConnectionHash = hash_create("postgres_fdw connections", 8,
188  &ctl,
189  HASH_ELEM | HASH_BLOBS);
190 
191  /*
192  * Register some callback functions that manage connection cleanup.
193  * This should be done just once in each backend.
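194  */
195  RegisterXactCallback(pgfdw_xact_callback, NULL);
196  RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
197  CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
198  pgfdw_inval_callback, (Datum) 0);
199  CacheRegisterSyscacheCallback(USERMAPPINGOID,
200  pgfdw_inval_callback, (Datum) 0);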
201  }
202 
203  /* Set flag that we did GetConnection during the current transaction */
204  xact_got_connection = true;
205 
206  /* Create hash key for the entry. Assume no pad bytes in key struct */
207  key = user->umid;
208 
209  /*
210  * Find or create cached entry for requested connection.
211  */
212  entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
213  if (!found)
214  {
215  /*
216  * We need only clear "conn" here; remaining fields will be filled
217  * later when "conn" is set.
218  */
219  entry->conn = NULL;
220  }
221 
222  /* Reject further use of connections which failed abort cleanup. */
223  pgfdw_reject_incomplete_xact_state_change(entry);
224 
225  /*
226  * If the connection needs to be remade due to invalidation, disconnect as
227  * soon as we're out of all transactions.
228  */
229  if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
230  {
231  elog(DEBUG3, "closing connection %p for option changes to take effect",
232  entry->conn);
233  disconnect_pg_server(entry);
234  }
235 
236  /*
237  * If cache entry doesn't have a connection, we have to establish a new
238  * connection. (If connect_pg_server throws an error, the cache entry
239  * will remain in a valid empty state, ie conn == NULL.)
240  */
241  if (entry->conn == NULL)
242  make_new_connection(entry, user);
243 
244  /*
245  * We check the health of the cached connection here when using it. In
246  * cases where we're out of all transactions, if a broken connection is
247  * detected, we try to reestablish a new connection later.
248  */
249  PG_TRY();
250  {
251  /* Process a pending asynchronous request if any. */
252  if (entry->state.pendingAreq)
253  process_pending_request(entry->state.pendingAreq);
254  /* Start a new transaction or subtransaction if needed. */
255  begin_remote_xact(entry);
256  }
257  PG_CATCH();
258  {
259  MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
260  ErrorData *errdata = CopyErrorData();
261 
262  /*
263  * Determine whether to try to reestablish the connection.
264  *
265  * After a broken connection is detected in libpq, any error other
266  * than connection failure (e.g., out-of-memory) can be thrown
267  * somewhere between return from libpq and the expected ereport() call
268  * in pgfdw_report_error(). In this case, since PQstatus() indicates
269  * CONNECTION_BAD, checking only PQstatus() causes the false detection
270  * of connection failure. To avoid this, we also verify that the
271  * error's sqlstate is ERRCODE_CONNECTION_FAILURE. Note that also
272  * checking only the sqlstate can cause another false detection
273  * because pgfdw_report_error() may report ERRCODE_CONNECTION_FAILURE
274  * for any libpq-originated error condition.
275  */
276  if (errdata->sqlerrcode != ERRCODE_CONNECTION_FAILURE ||
277  PQstatus(entry->conn) != CONNECTION_BAD ||
278  entry->xact_depth > 0)
279  {
280  MemoryContextSwitchTo(ecxt);
281  PG_RE_THROW();
282  }
283 
284  /* Clean up the error state */
285  FlushErrorState();
286  FreeErrorData(errdata);
287  errdata = NULL;
288 
289  retry = true;
290  }
291  PG_END_TRY();
292 
293  /*
294  * If a broken connection is detected, disconnect it, reestablish a new
295  * connection and retry a new remote transaction. If connection failure is
296  * reported again, we give up getting a connection.
297  */
298  if (retry)
299  {
300  Assert(entry->xact_depth == 0);
301 
302  ereport(DEBUG3,
303  (errmsg_internal("could not start remote transaction on connection %p",
304  entry->conn)),
305  errdetail_internal("%s", pchomp(PQerrorMessage(entry->conn))));
306 
307  elog(DEBUG3, "closing connection %p to reestablish a new one",
308  entry->conn);
309  disconnect_pg_server(entry);
310 
311  make_new_connection(entry, user);
312 
313  begin_remote_xact(entry);
314  }
315 
316  /* Remember if caller will prepare statements */
317  entry->have_prep_stmt |= will_prep_stmt;
318 
319  /* If caller needs access to the per-connection state, return it. */
320  if (state)
321  *state = &entry->state;
322 
323  return entry->conn;
324 }
Oid ConnCacheKey
Definition: connection.c:51
static bool xact_got_connection
Definition: connection.c:84
struct ConnCacheEntry ConnCacheEntry
static void make_new_connection(ConnCacheEntry *entry, UserMapping *user)
Definition: connection.c:331
static void pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg)
Definition: connection.c:1116
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
Definition: connection.c:1279
static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
Definition: connection.c:1229
static void pgfdw_xact_callback(XactEvent event, void *arg)
Definition: connection.c:967
static void begin_remote_xact(ConnCacheEntry *entry)
Definition: connection.c:727
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:953
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:350
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1156
void FreeErrorData(ErrorData *edata)
Definition: elog.c:1776
void FlushErrorState(void)
Definition: elog.c:1825
ErrorData * CopyErrorData(void)
Definition: elog.c:1720
@ HASH_ENTER
Definition: hsearch.h:114
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1519
@ CONNECTION_BAD
Definition: libpq-fe.h:61
MemoryContext CurrentMemoryContext
Definition: mcxt.c:135
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:138
uintptr_t Datum
Definition: postgres.h:64
void process_pending_request(AsyncRequest *areq)
bool have_prep_stmt
Definition: connection.c:60
PgFdwConnState state
Definition: connection.c:71
int sqlerrcode
Definition: elog.h:438
Size keysize
Definition: hsearch.h:75
Size entrysize
Definition: hsearch.h:76
AsyncRequest * pendingAreq
Definition: postgres_fdw.h:134
Definition: regguts.h:323
@ FOREIGNSERVEROID
Definition: syscache.h:64
@ USERMAPPINGOID
Definition: syscache.h:115
void RegisterXactCallback(XactCallback callback, void *arg)
Definition: xact.c:3650
void RegisterSubXactCallback(SubXactCallback callback, void *arg)
Definition: xact.c:3710

References Assert(), begin_remote_xact(), CacheRegisterSyscacheCallback(), ConnCacheEntry::conn, CONNECTION_BAD, ConnectionHash, CopyErrorData(), CurrentMemoryContext, DEBUG3, disconnect_pg_server(), elog(), HASHCTL::entrysize, ereport, errdetail_internal(), errmsg_internal(), FlushErrorState(), FOREIGNSERVEROID, FreeErrorData(), HASH_BLOBS, hash_create(), HASH_ELEM, HASH_ENTER, hash_search(), ConnCacheEntry::have_prep_stmt, ConnCacheEntry::invalidated, sort-test::key, HASHCTL::keysize, make_new_connection(), MemoryContextSwitchTo(), pchomp(), PgFdwConnState::pendingAreq, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pgfdw_inval_callback(), pgfdw_reject_incomplete_xact_state_change(), pgfdw_subxact_callback(), pgfdw_xact_callback(), PQerrorMessage(), PQstatus(), process_pending_request(), RegisterSubXactCallback(), RegisterXactCallback(), ErrorData::sqlerrcode, ConnCacheEntry::state, user, USERMAPPINGOID, ConnCacheEntry::xact_depth, and xact_got_connection.

Referenced by create_foreign_modify(), dumpDatabase(), dumpDatabaseConfig(), dumpLOs(), dumpTableData_copy(), estimate_path_cost_size(), expand_extension_name_patterns(), expand_foreign_server_name_patterns(), expand_schema_name_patterns(), expand_table_name_patterns(), getTables(), main(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresBeginDirectModify(), postgresBeginForeignScan(), postgresExecForeignTruncate(), postgresGetAnalyzeInfoForForeignTable(), postgresImportForeignSchema(), setup_connection(), StartLogStreamer(), StreamLog(), and StreamLogicalLog().

◆ GetCursorNumber()

unsigned int GetCursorNumber ( PGconn * conn)

Definition at line 791 of file connection.c.

792 {
793  return ++cursor_number;
794 }
static unsigned int cursor_number
Definition: connection.c:80

References cursor_number.

Referenced by postgresAcquireSampleRowsFunc(), and postgresBeginForeignScan().

◆ GetPrepStmtNumber()

unsigned int GetPrepStmtNumber ( PGconn * conn)

Definition at line 805 of file connection.c.

806 {
807  return ++prep_stmt_number;
808 }
static unsigned int prep_stmt_number
Definition: connection.c:81

References prep_stmt_number.

Referenced by prepare_foreign_modify().

◆ make_new_connection()

static void make_new_connection ( ConnCacheEntry * entry,
UserMapping * user 
)
static

Definition at line 331 of file connection.c.

332 {
333  ForeignServer *server = GetForeignServer(user->serverid);
334  ListCell *lc;
335 
336  Assert(entry->conn == NULL);
337 
338  /* Reset all transient state fields, to be sure all are clean */
339  entry->xact_depth = 0;
340  entry->have_prep_stmt = false;
341  entry->have_error = false;
342  entry->changing_xact_state = false;
343  entry->invalidated = false;
344  entry->serverid = server->serverid;
345  entry->server_hashvalue =
346  GetSysCacheHashValue1(FOREIGNSERVEROID,
347  ObjectIdGetDatum(server->serverid));
348  entry->mapping_hashvalue =
349  GetSysCacheHashValue1(USERMAPPINGOID,
350  ObjectIdGetDatum(user->umid));
351  memset(&entry->state, 0, sizeof(entry->state));
352 
353  /*
354  * Determine whether to keep the connection that we're about to make here
355  * open even after the transaction using it ends, so that the subsequent
356  * transactions can re-use it.
357  *
358  * By default, all the connections to any foreign servers are kept open.
359  *
360  * Also determine whether to commit/abort (sub)transactions opened on the
361  * remote server in parallel at (sub)transaction end, which is disabled by
362  * default.
363  *
364  * Note: it's enough to determine these only when making a new connection
365  * because if these settings for it are changed, it will be closed and
366  * re-made later.
367  */
368  entry->keep_connections = true;
369  entry->parallel_commit = false;
370  entry->parallel_abort = false;
371  foreach(lc, server->options)
372  {
373  DefElem *def = (DefElem *) lfirst(lc);
374 
375  if (strcmp(def->defname, "keep_connections") == 0)
376  entry->keep_connections = defGetBoolean(def);
377  else if (strcmp(def->defname, "parallel_commit") == 0)
378  entry->parallel_commit = defGetBoolean(def);
379  else if (strcmp(def->defname, "parallel_abort") == 0)
380  entry->parallel_abort = defGetBoolean(def);
381  }
382 
383  /* Now try to make the connection */
384  entry->conn = connect_pg_server(server, user);
385 
386  elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
387  entry->conn, server->servername, user->umid, user->userid);
388 }
static PGconn * connect_pg_server(ForeignServer *server, UserMapping *user)
Definition: connection.c:435
bool defGetBoolean(DefElem *def)
Definition: define.c:108
ForeignServer * GetForeignServer(Oid serverid)
Definition: foreign.c:111
#define lfirst(lc)
Definition: pg_list.h:172
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
bool parallel_commit
Definition: connection.c:63
uint32 server_hashvalue
Definition: connection.c:69
uint32 mapping_hashvalue
Definition: connection.c:70
bool keep_connections
Definition: connection.c:66
bool parallel_abort
Definition: connection.c:64
char * defname
Definition: parsenodes.h:809
Oid serverid
Definition: foreign.h:36
#define GetSysCacheHashValue1(cacheId, key1)
Definition: syscache.h:209

References Assert(), ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, connect_pg_server(), DEBUG3, defGetBoolean(), DefElem::defname, elog(), FOREIGNSERVEROID, GetForeignServer(), GetSysCacheHashValue1, ConnCacheEntry::have_error, ConnCacheEntry::have_prep_stmt, ConnCacheEntry::invalidated, ConnCacheEntry::keep_connections, lfirst, ConnCacheEntry::mapping_hashvalue, ObjectIdGetDatum(), ForeignServer::options, ConnCacheEntry::parallel_abort, ConnCacheEntry::parallel_commit, ConnCacheEntry::server_hashvalue, ConnCacheEntry::serverid, ForeignServer::serverid, ForeignServer::servername, ConnCacheEntry::state, user, USERMAPPINGOID, and ConnCacheEntry::xact_depth.

Referenced by GetConnection().

◆ PG_FUNCTION_INFO_V1() [1/3]

PG_FUNCTION_INFO_V1 ( postgres_fdw_disconnect  )

◆ PG_FUNCTION_INFO_V1() [2/3]

PG_FUNCTION_INFO_V1 ( postgres_fdw_disconnect_all  )

◆ PG_FUNCTION_INFO_V1() [3/3]

PG_FUNCTION_INFO_V1 ( postgres_fdw_get_connections  )

◆ pgfdw_abort_cleanup()

static void pgfdw_abort_cleanup ( ConnCacheEntry *  entry,
bool  toplevel 
)
static

Definition at line 1619 of file connection.c.

1620 {
1621  char sql[100];
1622 
1623  /*
1624  * Don't try to clean up the connection if we're already in error
1625  * recursion trouble.
1626  */
1627  if (in_error_recursion_trouble())
1628  entry->changing_xact_state = true;
1629 
1630  /*
1631  * If connection is already unsalvageable, don't touch it further.
1632  */
1633  if (entry->changing_xact_state)
1634  return;
1635 
1636  /*
1637  * Mark this connection as in the process of changing transaction state.
1638  */
1639  entry->changing_xact_state = true;
1640 
1641  /* Assume we might have lost track of prepared statements */
1642  entry->have_error = true;
1643 
1644  /*
1645  * If a command has been submitted to the remote server by using an
1646  * asynchronous execution function, the command might not have yet
1647  * completed. Check to see if a command is still being processed by the
1648  * remote server, and if so, request cancellation of the command.
1649  */
1650  if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
1651  !pgfdw_cancel_query(entry->conn))
1652  return; /* Unable to cancel running query */
1653 
1654  CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1655  if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
1656  return; /* Unable to abort remote (sub)transaction */
1657 
1658  if (toplevel)
1659  {
1660  if (entry->have_prep_stmt && entry->have_error &&
1661  !pgfdw_exec_cleanup_query(entry->conn,
1662  "DEALLOCATE ALL",
1663  true))
1664  return; /* Trouble clearing prepared statements */
1665 
1666  entry->have_prep_stmt = false;
1667  entry->have_error = false;
1668  }
1669 
1670  /*
1671  * If pendingAreq of the per-connection state is not NULL, it means that
1672  * an asynchronous fetch begun by fetch_more_data_begin() was not done
1673  * successfully and thus the per-connection state was not reset in
1674  * fetch_more_data(); in that case reset the per-connection state here.
1675  */
1676  if (entry->state.pendingAreq)
1677  memset(&entry->state, 0, sizeof(entry->state));
1678 
1679  /* Disarm changing_xact_state if it all worked */
1680  entry->changing_xact_state = false;
1681 }
#define CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel)
Definition: connection.c:94
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
Definition: connection.c:1444
static bool pgfdw_cancel_query(PGconn *conn)
Definition: connection.c:1348
bool in_error_recursion_trouble(void)
Definition: elog.c:298
PGTransactionStatusType PQtransactionStatus(const PGconn *conn)
Definition: fe-connect.c:7203
@ PQTRANS_ACTIVE
Definition: libpq-fe.h:119

References ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, CONSTRUCT_ABORT_COMMAND, ConnCacheEntry::have_error, ConnCacheEntry::have_prep_stmt, in_error_recursion_trouble(), PgFdwConnState::pendingAreq, pgfdw_cancel_query(), pgfdw_exec_cleanup_query(), PQTRANS_ACTIVE, PQtransactionStatus(), and ConnCacheEntry::state.

Referenced by pgfdw_subxact_callback(), and pgfdw_xact_callback().

◆ pgfdw_abort_cleanup_begin()

static bool pgfdw_abort_cleanup_begin ( ConnCacheEntry *  entry,
bool  toplevel,
List **  pending_entries,
List **  cancel_requested 
)
static

Definition at line 1693 of file connection.c.

1695 {
1696  /*
1697  * Don't try to clean up the connection if we're already in error
1698  * recursion trouble.
1699  */
1700  if (in_error_recursion_trouble())
1701  entry->changing_xact_state = true;
1702 
1703  /*
1704  * If connection is already unsalvageable, don't touch it further.
1705  */
1706  if (entry->changing_xact_state)
1707  return false;
1708 
1709  /*
1710  * Mark this connection as in the process of changing transaction state.
1711  */
1712  entry->changing_xact_state = true;
1713 
1714  /* Assume we might have lost track of prepared statements */
1715  entry->have_error = true;
1716 
1717  /*
1718  * If a command has been submitted to the remote server by using an
1719  * asynchronous execution function, the command might not have yet
1720  * completed. Check to see if a command is still being processed by the
1721  * remote server, and if so, request cancellation of the command.
1722  */
1723  if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
1724  {
1725  if (!pgfdw_cancel_query_begin(entry->conn))
1726  return false; /* Unable to cancel running query */
1727  *cancel_requested = lappend(*cancel_requested, entry);
1728  }
1729  else
1730  {
1731  char sql[100];
1732 
1733  CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1734  if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
1735  return false; /* Unable to abort remote transaction */
1736  *pending_entries = lappend(*pending_entries, entry);
1737  }
1738 
1739  return true;
1740 }
static bool pgfdw_cancel_query_begin(PGconn *conn)
Definition: connection.c:1365
static bool pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query)
Definition: connection.c:1464
List * lappend(List *list, void *datum)
Definition: list.c:338

References ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, CONSTRUCT_ABORT_COMMAND, ConnCacheEntry::have_error, in_error_recursion_trouble(), lappend(), pgfdw_cancel_query_begin(), pgfdw_exec_cleanup_query_begin(), PQTRANS_ACTIVE, and PQtransactionStatus().

Referenced by pgfdw_subxact_callback(), and pgfdw_xact_callback().

◆ pgfdw_cancel_query()

static bool pgfdw_cancel_query ( PGconn *  conn)
static

Definition at line 1348 of file connection.c.

1349 {
1350  TimestampTz endtime;
1351 
1352  /*
1353  * If it takes too long to cancel the query and discard the result, assume
1354  * the connection is dead.
1355  */
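1356  endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1357  CONNECTION_CLEANUP_TIMEOUT);
1358 
1359  if (!pgfdw_cancel_query_begin(conn))
1360  return false;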
1361  return pgfdw_cancel_query_end(conn, endtime, false);
1362 }
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime, bool consume_input)
Definition: connection.c:1392
#define CONNECTION_CLEANUP_TIMEOUT
Definition: connection.c:91
int64 TimestampTz
Definition: timestamp.h:39
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:85

References conn, CONNECTION_CLEANUP_TIMEOUT, GetCurrentTimestamp(), pgfdw_cancel_query_begin(), pgfdw_cancel_query_end(), and TimestampTzPlusMilliseconds.

Referenced by pgfdw_abort_cleanup().

◆ pgfdw_cancel_query_begin()

static bool pgfdw_cancel_query_begin ( PGconn *  conn)
static

Definition at line 1365 of file connection.c.

1366 {
1367  PGcancel *cancel;
1368  char errbuf[256];
1369 
1370  /*
1371  * Issue cancel request. Unfortunately, there's no good way to limit the
1372  * amount of time that we might block inside PQgetCancel().
1373  */
1374  if ((cancel = PQgetCancel(conn)))
1375  {
1376  if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
1377  {
1378  ereport(WARNING,
1379  (errcode(ERRCODE_CONNECTION_FAILURE),
1380  errmsg("could not send cancel request: %s",
1381  errbuf)));
1382  PQfreeCancel(cancel);
1383  return false;
1384  }
1385  PQfreeCancel(cancel);
1386  }
1387 
1388  return true;
1389 }
PGcancel * PQgetCancel(PGconn *conn)
Definition: fe-connect.c:4707
int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize)
Definition: fe-connect.c:4821
void PQfreeCancel(PGcancel *cancel)
Definition: fe-connect.c:4775

References conn, ereport, errcode(), errmsg(), PQcancel(), PQfreeCancel(), PQgetCancel(), and WARNING.

Referenced by pgfdw_abort_cleanup_begin(), and pgfdw_cancel_query().

◆ pgfdw_cancel_query_end()

static bool pgfdw_cancel_query_end ( PGconn *  conn,
TimestampTz  endtime,
bool  consume_input 
)
static

Definition at line 1392 of file connection.c.

1393 {
1394  PGresult *result = NULL;
1395  bool timed_out;
1396 
1397  /*
1398  * If requested, consume whatever data is available from the socket. (Note
1399  * that if all data is available, this allows pgfdw_get_cleanup_result to
1400  * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
1401  * which would be large compared to the overhead of PQconsumeInput.)
1402  */
1403  if (consume_input && !PQconsumeInput(conn))
1404  {
1405  ereport(WARNING,
1406  (errcode(ERRCODE_CONNECTION_FAILURE),
1407  errmsg("could not get result of cancel request: %s",
1408  pchomp(PQerrorMessage(conn)))));
1409  return false;
1410  }
1411 
1412  /* Get and discard the result of the query. */
1413  if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
1414  {
1415  if (timed_out)
1416  ereport(WARNING,
1417  (errmsg("could not get result of cancel request due to timeout")));
1418  else
1419  ereport(WARNING,
1420  (errcode(ERRCODE_CONNECTION_FAILURE),
1421  errmsg("could not get result of cancel request: %s",
1422  pchomp(PQerrorMessage(conn)))));
1423 
1424  return false;
1425  }
1426  PQclear(result);
1427 
1428  return true;
1429 }
static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result, bool *timed_out)
Definition: connection.c:1535

References conn, ereport, errcode(), errmsg(), pchomp(), pgfdw_get_cleanup_result(), PQclear(), PQconsumeInput(), PQerrorMessage(), and WARNING.

Referenced by pgfdw_cancel_query(), and pgfdw_finish_abort_cleanup().

◆ pgfdw_exec_cleanup_query()

static bool pgfdw_exec_cleanup_query ( PGconn *  conn,
const char *  query,
bool  ignore_errors 
)
static

Definition at line 1444 of file connection.c.

1445 {
1446  TimestampTz endtime;
1447 
1448  /*
1449  * If it takes too long to execute a cleanup query, assume the connection
1450  * is dead. It's fairly likely that this is why we aborted in the first
1451  * place (e.g. statement timeout, user cancel), so the timeout shouldn't
1452  * be too long.
1453  */
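1454  endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1455  CONNECTION_CLEANUP_TIMEOUT);
1456 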
1457  if (!pgfdw_exec_cleanup_query_begin(conn, query))
1458  return false;
1459  return pgfdw_exec_cleanup_query_end(conn, query, endtime,
1460  false, ignore_errors);
1461 }
static bool pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query, TimestampTz endtime, bool consume_input, bool ignore_errors)
Definition: connection.c:1480

References conn, CONNECTION_CLEANUP_TIMEOUT, GetCurrentTimestamp(), pgfdw_exec_cleanup_query_begin(), pgfdw_exec_cleanup_query_end(), and TimestampTzPlusMilliseconds.

Referenced by pgfdw_abort_cleanup().

◆ pgfdw_exec_cleanup_query_begin()

static bool pgfdw_exec_cleanup_query_begin ( PGconn *  conn,
const char *  query 
)
static

Definition at line 1464 of file connection.c.

1465 {
1466  /*
1467  * Submit a query. Since we don't use non-blocking mode, this also can
1468  * block. But its risk is relatively small, so we ignore that for now.
1469  */
1470  if (!PQsendQuery(conn, query))
1471  {
1472  pgfdw_report_error(WARNING, NULL, conn, false, query);
1473  return false;
1474  }
1475 
1476  return true;
1477 }

References conn, pgfdw_report_error(), PQsendQuery(), and WARNING.

Referenced by pgfdw_abort_cleanup_begin(), pgfdw_exec_cleanup_query(), and pgfdw_finish_abort_cleanup().

◆ pgfdw_exec_cleanup_query_end()

static bool pgfdw_exec_cleanup_query_end ( PGconn *  conn,
const char *  query,
TimestampTz  endtime,
bool  consume_input,
bool  ignore_errors 
)
static

Definition at line 1480 of file connection.c.

1483 {
1484  PGresult *result = NULL;
1485  bool timed_out;
1486 
1487  /*
1488  * If requested, consume whatever data is available from the socket. (Note
1489  * that if all data is available, this allows pgfdw_get_cleanup_result to
1490  * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
1491  * which would be large compared to the overhead of PQconsumeInput.)
1492  */
1493  if (consume_input && !PQconsumeInput(conn))
1494  {
1495  pgfdw_report_error(WARNING, NULL, conn, false, query);
1496  return false;
1497  }
1498 
1499  /* Get the result of the query. */
1500  if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
1501  {
1502  if (timed_out)
1503  ereport(WARNING,
1504  (errmsg("could not get query result due to timeout"),
1505  query ? errcontext("remote SQL command: %s", query) : 0));
1506  else
1507  pgfdw_report_error(WARNING, NULL, conn, false, query);
1508 
1509  return false;
1510  }
1511 
1512  /* Issue a warning if not successful. */
1513  if (PQresultStatus(result) != PGRES_COMMAND_OK)
1514  {
1515  pgfdw_report_error(WARNING, result, conn, true, query);
1516  return ignore_errors;
1517  }
1518  PQclear(result);
1519 
1520  return true;
1521 }
#define errcontext
Definition: elog.h:196

References conn, ereport, errcontext, errmsg(), pgfdw_get_cleanup_result(), pgfdw_report_error(), PGRES_COMMAND_OK, PQclear(), PQconsumeInput(), PQresultStatus(), and WARNING.

Referenced by pgfdw_exec_cleanup_query(), and pgfdw_finish_abort_cleanup().

◆ pgfdw_exec_query()

PGresult* pgfdw_exec_query ( PGconn *  conn,
const char *  query,
PgFdwConnState *  state 
)

Definition at line 818 of file connection.c.

819 {
820  /* First, process a pending asynchronous request, if any. */
821  if (state && state->pendingAreq)
822  process_pending_request(state->pendingAreq);
823 
824  /*
825  * Submit a query. Since we don't use non-blocking mode, this also can
826  * block. But its risk is relatively small, so we ignore that for now.
827  */
828  if (!PQsendQuery(conn, query))
829  pgfdw_report_error(ERROR, NULL, conn, false, query);
830 
831  /* Wait for the result. */
832  return pgfdw_get_result(conn, query);
833 }

References conn, ERROR, pgfdw_get_result(), pgfdw_report_error(), PQsendQuery(), and process_pending_request().

Referenced by close_cursor(), deallocate_query(), fetch_more_data(), get_remote_estimate(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresGetAnalyzeInfoForForeignTable(), postgresImportForeignSchema(), and postgresReScanForeignScan().

◆ pgfdw_finish_abort_cleanup()

static void pgfdw_finish_abort_cleanup ( List *  pending_entries,
List *  cancel_requested,
bool  toplevel 
)
static

Definition at line 1855 of file connection.c.

1857 {
1858  List *pending_deallocs = NIL;
1859  ListCell *lc;
1860 
1861  /*
1862  * For each of the pending cancel requests (if any), get and discard the
1863  * result of the query, and submit an abort command to the remote server.
1864  */
1865  if (cancel_requested)
1866  {
1867  foreach(lc, cancel_requested)
1868  {
1869  ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
1870  TimestampTz endtime;
1871  char sql[100];
1872 
1873  Assert(entry->changing_xact_state);
1874 
1875  /*
1876  * Set end time. You might think we should do this before issuing
1877  * cancel request like in normal mode, but that is problematic,
1878  * because if, for example, it took longer than 30 seconds to
1879  * process the first few entries in the cancel_requested list, it
1880  * would cause a timeout error when processing each of the
1881  * remaining entries in the list, leading to slamming that entry's
1882  * connection shut.
1883  */
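1884  endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1885  CONNECTION_CLEANUP_TIMEOUT);
1886 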
1887  if (!pgfdw_cancel_query_end(entry->conn, endtime, true))
1888  {
1889  /* Unable to cancel running query */
1890  pgfdw_reset_xact_state(entry, toplevel);
1891  continue;
1892  }
1893 
1894  /* Send an abort command in parallel if needed */
1895  CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1896  if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
1897  {
1898  /* Unable to abort remote (sub)transaction */
1899  pgfdw_reset_xact_state(entry, toplevel);
1900  }
1901  else
1902  pending_entries = lappend(pending_entries, entry);
1903  }
1904  }
1905 
1906  /* No further work if no pending entries */
1907  if (!pending_entries)
1908  return;
1909 
1910  /*
1911  * Get the result of the abort command for each of the pending entries
1912  */
1913  foreach(lc, pending_entries)
1914  {
1915  ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
1916  TimestampTz endtime;
1917  char sql[100];
1918 
1919  Assert(entry->changing_xact_state);
1920 
1921  /*
1922  * Set end time. We do this now, not before issuing the command like
1923  * in normal mode, for the same reason as for the cancel_requested
1924  * entries.
1925  */
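1926  endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1927  CONNECTION_CLEANUP_TIMEOUT);
1928 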
1929  CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1930  if (!pgfdw_exec_cleanup_query_end(entry->conn, sql, endtime,
1931  true, false))
1932  {
1933  /* Unable to abort remote (sub)transaction */
1934  pgfdw_reset_xact_state(entry, toplevel);
1935  continue;
1936  }
1937 
1938  if (toplevel)
1939  {
1940  /* Do a DEALLOCATE ALL in parallel if needed */
1941  if (entry->have_prep_stmt && entry->have_error)
1942  {
1943  if (!pgfdw_exec_cleanup_query_begin(entry->conn,
1944  "DEALLOCATE ALL"))
1945  {
1946  /* Trouble clearing prepared statements */
1947  pgfdw_reset_xact_state(entry, toplevel);
1948  }
1949  else
1950  pending_deallocs = lappend(pending_deallocs, entry);
1951  continue;
1952  }
1953  entry->have_prep_stmt = false;
1954  entry->have_error = false;
1955  }
1956 
1957  /* Reset the per-connection state if needed */
1958  if (entry->state.pendingAreq)
1959  memset(&entry->state, 0, sizeof(entry->state));
1960 
1961  /* We're done with this entry; unset the changing_xact_state flag */
1962  entry->changing_xact_state = false;
1963  pgfdw_reset_xact_state(entry, toplevel);
1964  }
1965 
1966  /* No further work if no pending entries */
1967  if (!pending_deallocs)
1968  return;
1969  Assert(toplevel);
1970 
1971  /*
1972  * Get the result of the DEALLOCATE command for each of the pending
1973  * entries
1974  */
1975  foreach(lc, pending_deallocs)
1976  {
1977  ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
1978  TimestampTz endtime;
1979 
1980  Assert(entry->changing_xact_state);
1981  Assert(entry->have_prep_stmt);
1982  Assert(entry->have_error);
1983 
1984  /*
1985  * Set end time. We do this now, not before issuing the command like
1986  * in normal mode, for the same reason as for the cancel_requested
1987  * entries.
1988  */
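1989  endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1990  CONNECTION_CLEANUP_TIMEOUT);
1991 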
1992  if (!pgfdw_exec_cleanup_query_end(entry->conn, "DEALLOCATE ALL",
1993  endtime, true, true))
1994  {
1995  /* Trouble clearing prepared statements */
1996  pgfdw_reset_xact_state(entry, toplevel);
1997  continue;
1998  }
1999  entry->have_prep_stmt = false;
2000  entry->have_error = false;
2001 
2002  /* Reset the per-connection state if needed */
2003  if (entry->state.pendingAreq)
2004  memset(&entry->state, 0, sizeof(entry->state));
2005 
2006  /* We're done with this entry; unset the changing_xact_state flag */
2007  entry->changing_xact_state = false;
2008  pgfdw_reset_xact_state(entry, toplevel);
2009  }
2010 }
static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
Definition: connection.c:1303
#define NIL
Definition: pg_list.h:68
Definition: pg_list.h:54

References Assert(), ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, CONNECTION_CLEANUP_TIMEOUT, CONSTRUCT_ABORT_COMMAND, GetCurrentTimestamp(), ConnCacheEntry::have_error, ConnCacheEntry::have_prep_stmt, lappend(), lfirst, NIL, PgFdwConnState::pendingAreq, pgfdw_cancel_query_end(), pgfdw_exec_cleanup_query_begin(), pgfdw_exec_cleanup_query_end(), pgfdw_reset_xact_state(), ConnCacheEntry::state, and TimestampTzPlusMilliseconds.

Referenced by pgfdw_subxact_callback(), and pgfdw_xact_callback().

◆ pgfdw_finish_pre_commit_cleanup()

static void pgfdw_finish_pre_commit_cleanup ( List *  pending_entries)
static

Definition at line 1747 of file connection.c.

1748 {
1749  ConnCacheEntry *entry;
1750  List *pending_deallocs = NIL;
1751  ListCell *lc;
1752 
1753  Assert(pending_entries);
1754 
1755  /*
1756  * Get the result of the COMMIT command for each of the pending entries
1757  */
1758  foreach(lc, pending_entries)
1759  {
1760  entry = (ConnCacheEntry *) lfirst(lc);
1761 
1762  Assert(entry->changing_xact_state);
1763 
1764  /*
1765  * We might already have received the result on the socket, so pass
1766  * consume_input=true to try to consume it first
1767  */
1768  do_sql_command_end(entry->conn, "COMMIT TRANSACTION", true);
1769  entry->changing_xact_state = false;
1770 
1771  /* Do a DEALLOCATE ALL in parallel if needed */
1772  if (entry->have_prep_stmt && entry->have_error)
1773  {
1774  /* Ignore errors (see notes in pgfdw_xact_callback) */
1775  if (PQsendQuery(entry->conn, "DEALLOCATE ALL"))
1776  {
1777  pending_deallocs = lappend(pending_deallocs, entry);
1778  continue;
1779  }
1780  }
1781  entry->have_prep_stmt = false;
1782  entry->have_error = false;
1783 
1784  pgfdw_reset_xact_state(entry, true);
1785  }
1786 
1787  /* No further work if no pending entries */
1788  if (!pending_deallocs)
1789  return;
1790 
1791  /*
1792  * Get the result of the DEALLOCATE command for each of the pending
1793  * entries
1794  */
1795  foreach(lc, pending_deallocs)
1796  {
1797  PGresult *res;
1798 
1799  entry = (ConnCacheEntry *) lfirst(lc);
1800 
1801  /* Ignore errors (see notes in pgfdw_xact_callback) */
1802  while ((res = PQgetResult(entry->conn)) != NULL)
1803  {
1804  PQclear(res);
1805  /* Stop if the connection is lost (else we'll loop infinitely) */
1806  if (PQstatus(entry->conn) == CONNECTION_BAD)
1807  break;
1808  }
1809  entry->have_prep_stmt = false;
1810  entry->have_error = false;
1811 
1812  pgfdw_reset_xact_state(entry, true);
1813  }
1814 }
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:2035

References Assert(), ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, CONNECTION_BAD, do_sql_command_end(), ConnCacheEntry::have_error, ConnCacheEntry::have_prep_stmt, lappend(), lfirst, NIL, pgfdw_reset_xact_state(), PQclear(), PQgetResult(), PQsendQuery(), PQstatus(), and res.

Referenced by pgfdw_xact_callback().

◆ pgfdw_finish_pre_subcommit_cleanup()

static void pgfdw_finish_pre_subcommit_cleanup ( List *  pending_entries,
int  curlevel 
)
static

Definition at line 1821 of file connection.c.

1822 {
1823  ConnCacheEntry *entry;
1824  char sql[100];
1825  ListCell *lc;
1826 
1827  Assert(pending_entries);
1828 
1829  /*
1830  * Get the result of the RELEASE command for each of the pending entries
1831  */
1832  snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
1833  foreach(lc, pending_entries)
1834  {
1835  entry = (ConnCacheEntry *) lfirst(lc);
1836 
1837  Assert(entry->changing_xact_state);
1838 
1839  /*
1840  * We might already have received the result on the socket, so pass
1841  * consume_input=true to try to consume it first
1842  */
1843  do_sql_command_end(entry->conn, sql, true);
1844  entry->changing_xact_state = false;
1845 
1846  pgfdw_reset_xact_state(entry, false);
1847  }
1848 }

References Assert(), ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, do_sql_command_end(), lfirst, pgfdw_reset_xact_state(), and snprintf.

Referenced by pgfdw_subxact_callback().

◆ pgfdw_get_cleanup_result()

static bool pgfdw_get_cleanup_result ( PGconn *  conn,
TimestampTz  endtime,
PGresult **  result,
bool *  timed_out 
)
static

Definition at line 1535 of file connection.c.

1537 {
1538  volatile bool failed = false;
1539  PGresult *volatile last_res = NULL;
1540 
1541  *timed_out = false;
1542 
1543  /* In what follows, do not leak any PGresults on an error. */
1544  PG_TRY();
1545  {
1546  for (;;)
1547  {
1548  PGresult *res;
1549 
1550  while (PQisBusy(conn))
1551  {
1552  int wc;
1553  TimestampTz now = GetCurrentTimestamp();
1554  long cur_timeout;
1555 
1556  /* If timeout has expired, give up, else get sleep time. */
1557  cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
1558  if (cur_timeout <= 0)
1559  {
1560  *timed_out = true;
1561  failed = true;
1562  goto exit;
1563  }
1564 
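1565  /* Sleep until there's something to do */
1566  wc = WaitLatchOrSocket(MyLatch,
1567  WL_LATCH_SET | WL_SOCKET_READABLE |
1568  WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1569  PQsocket(conn),
1570  cur_timeout, WAIT_EVENT_EXTENSION);
1571  ResetLatch(MyLatch);
1572 
1573  CHECK_FOR_INTERRUPTS();
1574 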
1575  /* Data available in socket? */
1576  if (wc & WL_SOCKET_READABLE)
1577  {
1578  if (!PQconsumeInput(conn))
1579  {
1580  /* connection trouble */
1581  failed = true;
1582  goto exit;
1583  }
1584  }
1585  }
1586 
1587  res = PQgetResult(conn);
1588  if (res == NULL)
1589  break; /* query is complete */
1590 
1591  PQclear(last_res);
1592  last_res = res;
1593  }
1594 exit: ;
1595  }
1596  PG_CATCH();
1597  {
1598  PQclear(last_res);
1599  PG_RE_THROW();
1600  }
1601  PG_END_TRY();
1602 
1603  if (failed)
1604  PQclear(last_res);
1605  else
1606  *result = last_res;
1607  return failed;
1608 }
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1695
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1547
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:7274
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:2004
struct Latch * MyLatch
Definition: globals.c:58
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:538
void ResetLatch(Latch *latch)
Definition: latch.c:697
#define WL_SOCKET_READABLE
Definition: latch.h:126
#define WL_TIMEOUT
Definition: latch.h:128
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130
#define WL_LATCH_SET
Definition: latch.h:125
exit(1)
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:121

References CHECK_FOR_INTERRUPTS, conn, exit(), GetCurrentTimestamp(), MyLatch, now(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, PQclear(), PQconsumeInput(), PQgetResult(), PQisBusy(), PQsocket(), res, ResetLatch(), TimestampDifferenceMilliseconds(), WAIT_EVENT_EXTENSION, WaitLatchOrSocket(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, and WL_TIMEOUT.

Referenced by pgfdw_cancel_query_end(), and pgfdw_exec_cleanup_query_end().

◆ pgfdw_get_result()

PGresult* pgfdw_get_result ( PGconn *  conn,
const char *  query 
)

Definition at line 846 of file connection.c.

847 {
848  PGresult *volatile last_res = NULL;
849 
850  /* In what follows, do not leak any PGresults on an error. */
851  PG_TRY();
852  {
853  for (;;)
854  {
855  PGresult *res;
856 
857  while (PQisBusy(conn))
858  {
859  int wc;
860 
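861  /* Sleep until there's something to do */
862  wc = WaitLatchOrSocket(MyLatch,
863  WL_LATCH_SET | WL_SOCKET_READABLE |
864  WL_EXIT_ON_PM_DEATH,
865  PQsocket(conn),
866  -1L, WAIT_EVENT_EXTENSION);
867  ResetLatch(MyLatch);
868 
869  CHECK_FOR_INTERRUPTS();
870 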
871  /* Data available in socket? */
872  if (wc & WL_SOCKET_READABLE)
873  {
874  if (!PQconsumeInput(conn))
875  pgfdw_report_error(ERROR, NULL, conn, false, query);
876  }
877  }
878 
879  res = PQgetResult(conn);
880  if (res == NULL)
881  break; /* query is complete */
882 
883  PQclear(last_res);
884  last_res = res;
885  }
886  }
887  PG_CATCH();
888  {
889  PQclear(last_res);
890  PG_RE_THROW();
891  }
892  PG_END_TRY();
893 
894  return last_res;
895 }

References CHECK_FOR_INTERRUPTS, conn, ERROR, MyLatch, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pgfdw_report_error(), PQclear(), PQconsumeInput(), PQgetResult(), PQisBusy(), PQsocket(), res, ResetLatch(), WAIT_EVENT_EXTENSION, WaitLatchOrSocket(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_SOCKET_READABLE.

Referenced by create_cursor(), do_sql_command_end(), execute_dml_stmt(), execute_foreign_modify(), fetch_more_data(), pgfdw_exec_query(), and prepare_foreign_modify().

◆ pgfdw_inval_callback()

static void pgfdw_inval_callback ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)
static

Definition at line 1229 of file connection.c.

1230 {
1231  HASH_SEQ_STATUS scan;
1232  ConnCacheEntry *entry;
1233 
1234  Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
1235 
1236  /* ConnectionHash must exist already, if we're registered */
1237  hash_seq_init(&scan, ConnectionHash);
1238  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1239  {
1240  /* Ignore invalid entries */
1241  if (entry->conn == NULL)
1242  continue;
1243 
1244  /* hashvalue == 0 means a cache reset, must clear all state */
1245  if (hashvalue == 0 ||
1246  (cacheid == FOREIGNSERVEROID &&
1247  entry->server_hashvalue == hashvalue) ||
1248  (cacheid == USERMAPPINGOID &&
1249  entry->mapping_hashvalue == hashvalue))
1250  {
1251  /*
1252  * Close the connection immediately if it's not used yet in this
1253  * transaction. Otherwise mark it as invalid so that
1254  * pgfdw_xact_callback() can close it at the end of this
1255  * transaction.
1256  */
1257  if (entry->xact_depth == 0)
1258  {
1259  elog(DEBUG3, "discarding connection %p", entry->conn);
1260  disconnect_pg_server(entry);
1261  }
1262  else
1263  entry->invalidated = true;
1264  }
1265  }
1266 }

References Assert(), ConnCacheEntry::conn, ConnectionHash, DEBUG3, disconnect_pg_server(), elog(), FOREIGNSERVEROID, hash_seq_init(), hash_seq_search(), ConnCacheEntry::invalidated, ConnCacheEntry::mapping_hashvalue, ConnCacheEntry::server_hashvalue, USERMAPPINGOID, and ConnCacheEntry::xact_depth.

Referenced by GetConnection().

◆ pgfdw_reject_incomplete_xact_state_change()

static void pgfdw_reject_incomplete_xact_state_change ( ConnCacheEntry *  entry)
static

Definition at line 1279 of file connection.c.

1280 {
1281  ForeignServer *server;
1282 
1283  /* nothing to do for inactive entries and entries of sane state */
1284  if (entry->conn == NULL || !entry->changing_xact_state)
1285  return;
1286 
1287  /* make sure this entry is inactive */
1288  disconnect_pg_server(entry);
1289 
1290  /* find server name to be shown in the message below */
1291  server = GetForeignServer(entry->serverid);
1292 
1293  ereport(ERROR,
1294  (errcode(ERRCODE_CONNECTION_EXCEPTION),
1295  errmsg("connection to server \"%s\" was lost",
1296  server->servername)));
1297 }

References ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, disconnect_pg_server(), ereport, errcode(), errmsg(), ERROR, GetForeignServer(), ConnCacheEntry::serverid, and ForeignServer::servername.

Referenced by GetConnection(), pgfdw_subxact_callback(), and pgfdw_xact_callback().

◆ pgfdw_report_error()

void pgfdw_report_error ( int  elevel,
PGresult *  res,
PGconn *  conn,
bool  clear,
const char *  sql 
)

Definition at line 911 of file connection.c.

913 {
914  /* If requested, PGresult must be released before leaving this function. */
915  PG_TRY();
916  {
917  char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
918  char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
919  char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
920  char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
921  char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
922  int sqlstate;
923 
924  if (diag_sqlstate)
925  sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
926  diag_sqlstate[1],
927  diag_sqlstate[2],
928  diag_sqlstate[3],
929  diag_sqlstate[4]);
930  else
931  sqlstate = ERRCODE_CONNECTION_FAILURE;
932 
933  /*
934  * If we don't get a message from the PGresult, try the PGconn. This
935  * is needed because for connection-level failures, PQexec may just
936  * return NULL, not a PGresult at all.
937  */
938  if (message_primary == NULL)
939  message_primary = pchomp(PQerrorMessage(conn));
940 
941  ereport(elevel,
942  (errcode(sqlstate),
943  (message_primary != NULL && message_primary[0] != '\0') ?
944  errmsg_internal("%s", message_primary) :
945  errmsg("could not obtain message string for remote error"),
946  message_detail ? errdetail_internal("%s", message_detail) : 0,
947  message_hint ? errhint("%s", message_hint) : 0,
948  message_context ? errcontext("%s", message_context) : 0,
949  sql ? errcontext("remote SQL command: %s", sql) : 0));
950  }
951  PG_FINALLY();
952  {
953  if (clear)
954  PQclear(res);
955  }
956  PG_END_TRY();
957 }
int errhint(const char *fmt,...)
Definition: elog.c:1316
#define MAKE_SQLSTATE(ch1, ch2, ch3, ch4, ch5)
Definition: elog.h:56
#define PG_FINALLY(...)
Definition: elog.h:387
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:3380
#define PG_DIAG_MESSAGE_HINT
Definition: postgres_ext.h:59
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:56
#define PG_DIAG_MESSAGE_PRIMARY
Definition: postgres_ext.h:57
#define PG_DIAG_MESSAGE_DETAIL
Definition: postgres_ext.h:58
#define PG_DIAG_CONTEXT
Definition: postgres_ext.h:63

References conn, ereport, errcode(), errcontext, errdetail_internal(), errhint(), errmsg(), errmsg_internal(), MAKE_SQLSTATE, pchomp(), PG_DIAG_CONTEXT, PG_DIAG_MESSAGE_DETAIL, PG_DIAG_MESSAGE_HINT, PG_DIAG_MESSAGE_PRIMARY, PG_DIAG_SQLSTATE, PG_END_TRY, PG_FINALLY, PG_TRY, PQclear(), PQerrorMessage(), PQresultErrorField(), and res.

Referenced by close_cursor(), create_cursor(), deallocate_query(), do_sql_command_begin(), do_sql_command_end(), execute_dml_stmt(), execute_foreign_modify(), fetch_more_data(), fetch_more_data_begin(), get_remote_estimate(), pgfdw_exec_cleanup_query_begin(), pgfdw_exec_cleanup_query_end(), pgfdw_exec_query(), pgfdw_get_result(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresForeignAsyncNotify(), postgresGetAnalyzeInfoForForeignTable(), postgresImportForeignSchema(), postgresReScanForeignScan(), and prepare_foreign_modify().

◆ pgfdw_reset_xact_state()

static void pgfdw_reset_xact_state ( ConnCacheEntry * entry,
bool  toplevel 
)
static

Definition at line 1303 of file connection.c.

1304 {
1305  if (toplevel)
1306  {
1307  /* Reset state to show we're out of a transaction */
1308  entry->xact_depth = 0;
1309 
1310  /*
1311  * If the connection isn't in a good idle state, it is marked as
1312  * invalid or keep_connections option of its server is disabled, then
1313  * discard it to recover. Next GetConnection will open a new
1314  * connection.
1315  */
1316  if (PQstatus(entry->conn) != CONNECTION_OK ||
1317  PQtransactionStatus(entry->conn) != PQTRANS_IDLE ||
1318  entry->changing_xact_state ||
1319  entry->invalidated ||
1320  !entry->keep_connections)
1321  {
1322  elog(DEBUG3, "discarding connection %p", entry->conn);
1323  disconnect_pg_server(entry);
1324  }
1325  }
1326  else
1327  {
1328  /* Reset state to show we're out of a subtransaction */
1329  entry->xact_depth--;
1330  }
1331 }
@ PQTRANS_IDLE
Definition: libpq-fe.h:118

References ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, CONNECTION_OK, DEBUG3, disconnect_pg_server(), elog(), ConnCacheEntry::invalidated, ConnCacheEntry::keep_connections, PQstatus(), PQTRANS_IDLE, PQtransactionStatus(), and ConnCacheEntry::xact_depth.

Referenced by pgfdw_finish_abort_cleanup(), pgfdw_finish_pre_commit_cleanup(), pgfdw_finish_pre_subcommit_cleanup(), pgfdw_subxact_callback(), and pgfdw_xact_callback().

◆ pgfdw_security_check()

static void pgfdw_security_check ( const char **  keywords,
const char **  values,
UserMapping * user,
PGconn * conn 
)
static

Definition at line 397 of file connection.c.

398 {
399  /* Superusers bypass the check */
400  if (superuser_arg(user->userid))
401  return;
402 
403 #ifdef ENABLE_GSS
404  /* Connected via GSSAPI with delegated credentials- all good. */
405  if (PQconnectionUsedGSSAPI(conn) && be_gssapi_get_delegation(MyProcPort))
406  return;
407 #endif
408 
409  /* Ok if superuser set PW required false. */
410  if (!UserMappingPasswordRequired(user))
411  return;
412 
413  /* Connected via PW, with PW required true, and provided non-empty PW. */
414  if (PQconnectionUsedPassword(conn))
415  {
416  /* ok if params contain a non-empty password */
417  for (int i = 0; keywords[i] != NULL; i++)
418  {
419  if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
420  return;
421  }
422  }
423 
424  ereport(ERROR,
425  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
426  errmsg("password or GSSAPI delegated credentials required"),
427  errdetail("Non-superuser cannot connect if the server does not request a password or use GSSAPI with delegated credentials."),
428  errhint("Target server's authentication method must be changed or password_required=false set in the user mapping attributes.")));
429 }
int PQconnectionUsedPassword(const PGconn *conn)
Definition: fe-connect.c:7314
int PQconnectionUsedGSSAPI(const PGconn *conn)
Definition: fe-connect.c:7325

References be_gssapi_get_delegation(), conn, ereport, errcode(), errdetail(), errhint(), errmsg(), ERROR, i, MyProcPort, PQconnectionUsedGSSAPI(), PQconnectionUsedPassword(), superuser_arg(), user, UserMappingPasswordRequired(), and values.

Referenced by connect_pg_server().

◆ pgfdw_subxact_callback()

static void pgfdw_subxact_callback ( SubXactEvent  event,
SubTransactionId  mySubid,
SubTransactionId  parentSubid,
void *  arg 
)
static

Definition at line 1116 of file connection.c.

1118 {
1119  HASH_SEQ_STATUS scan;
1120  ConnCacheEntry *entry;
1121  int curlevel;
1122  List *pending_entries = NIL;
1123  List *cancel_requested = NIL;
1124 
1125  /* Nothing to do at subxact start, nor after commit. */
1126  if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
1127  event == SUBXACT_EVENT_ABORT_SUB))
1128  return;
1129 
1130  /* Quick exit if no connections were touched in this transaction. */
1131  if (!xact_got_connection)
1132  return;
1133 
1134  /*
1135  * Scan all connection cache entries to find open remote subtransactions
1136  * of the current level, and close them.
1137  */
1138  curlevel = GetCurrentTransactionNestLevel();
1139  hash_seq_init(&scan, ConnectionHash);
1140  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1141  {
1142  char sql[100];
1143 
1144  /*
1145  * We only care about connections with open remote subtransactions of
1146  * the current level.
1147  */
1148  if (entry->conn == NULL || entry->xact_depth < curlevel)
1149  continue;
1150 
1151  if (entry->xact_depth > curlevel)
1152  elog(ERROR, "missed cleaning up remote subtransaction at level %d",
1153  entry->xact_depth);
1154 
1155  if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
1156  {
1157  /*
1158  * If abort cleanup previously failed for this connection, we
1159  * can't issue any more commands against it.
1160  */
1161  pgfdw_reject_incomplete_xact_state_change(entry);
1162 
1163  /* Commit all remote subtransactions during pre-commit */
1164  snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
1165  entry->changing_xact_state = true;
1166  if (entry->parallel_commit)
1167  {
1168  do_sql_command_begin(entry->conn, sql);
1169  pending_entries = lappend(pending_entries, entry);
1170  continue;
1171  }
1172  do_sql_command(entry->conn, sql);
1173  entry->changing_xact_state = false;
1174  }
1175  else
1176  {
1177  /* Rollback all remote subtransactions during abort */
1178  if (entry->parallel_abort)
1179  {
1180  if (pgfdw_abort_cleanup_begin(entry, false,
1181  &pending_entries,
1182  &cancel_requested))
1183  continue;
1184  }
1185  else
1186  pgfdw_abort_cleanup(entry, false);
1187  }
1188 
1189  /* OK, we're outta that level of subtransaction */
1190  pgfdw_reset_xact_state(entry, false);
1191  }
1192 
1193  /* If there are any pending connections, finish cleaning them up */
1194  if (pending_entries || cancel_requested)
1195  {
1196  if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
1197  {
1198  Assert(cancel_requested == NIL);
1199  pgfdw_finish_pre_subcommit_cleanup(pending_entries, curlevel);
1200  }
1201  else
1202  {
1203  Assert(event == SUBXACT_EVENT_ABORT_SUB);
1204  pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
1205  false);
1206  }
1207  }
1208 }
static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel)
Definition: connection.c:1821
static void pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested, bool toplevel)
Definition: connection.c:1855
static bool pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel, List **pending_entries, List **cancel_requested)
Definition: connection.c:1693
static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
Definition: connection.c:1619
@ SUBXACT_EVENT_PRE_COMMIT_SUB
Definition: xact.h:145
@ SUBXACT_EVENT_ABORT_SUB
Definition: xact.h:144

References Assert(), ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, ConnectionHash, do_sql_command(), do_sql_command_begin(), elog(), ERROR, GetCurrentTransactionNestLevel(), hash_seq_init(), hash_seq_search(), lappend(), NIL, ConnCacheEntry::parallel_abort, ConnCacheEntry::parallel_commit, pgfdw_abort_cleanup(), pgfdw_abort_cleanup_begin(), pgfdw_finish_abort_cleanup(), pgfdw_finish_pre_subcommit_cleanup(), pgfdw_reject_incomplete_xact_state_change(), pgfdw_reset_xact_state(), snprintf, SUBXACT_EVENT_ABORT_SUB, SUBXACT_EVENT_PRE_COMMIT_SUB, ConnCacheEntry::xact_depth, and xact_got_connection.

Referenced by GetConnection().

◆ pgfdw_xact_callback()

static void pgfdw_xact_callback ( XactEvent  event,
void *  arg 
)
static

Definition at line 967 of file connection.c.

968 {
969  HASH_SEQ_STATUS scan;
970  ConnCacheEntry *entry;
971  List *pending_entries = NIL;
972  List *cancel_requested = NIL;
973 
974  /* Quick exit if no connections were touched in this transaction. */
975  if (!xact_got_connection)
976  return;
977 
978  /*
979  * Scan all connection cache entries to find open remote transactions, and
980  * close them.
981  */
982  hash_seq_init(&scan, ConnectionHash);
983  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
984  {
985  PGresult *res;
986 
987  /* Ignore cache entry if no open connection right now */
988  if (entry->conn == NULL)
989  continue;
990 
991  /* If it has an open remote transaction, try to close it */
992  if (entry->xact_depth > 0)
993  {
994  elog(DEBUG3, "closing remote transaction on connection %p",
995  entry->conn);
996 
997  switch (event)
998  {
999  case XACT_EVENT_PARALLEL_PRE_COMMIT:
1000  case XACT_EVENT_PRE_COMMIT:
1001 
1002  /*
1003  * If abort cleanup previously failed for this connection,
1004  * we can't issue any more commands against it.
1005  */
1006  pgfdw_reject_incomplete_xact_state_change(entry);
1007 
1008  /* Commit all remote transactions during pre-commit */
1009  entry->changing_xact_state = true;
1010  if (entry->parallel_commit)
1011  {
1012  do_sql_command_begin(entry->conn, "COMMIT TRANSACTION");
1013  pending_entries = lappend(pending_entries, entry);
1014  continue;
1015  }
1016  do_sql_command(entry->conn, "COMMIT TRANSACTION");
1017  entry->changing_xact_state = false;
1018 
1019  /*
1020  * If there were any errors in subtransactions, and we
1021  * made prepared statements, do a DEALLOCATE ALL to make
1022  * sure we get rid of all prepared statements. This is
1023  * annoying and not terribly bulletproof, but it's
1024  * probably not worth trying harder.
1025  *
1026  * DEALLOCATE ALL only exists in 8.3 and later, so this
1027  * constrains how old a server postgres_fdw can
1028  * communicate with. We intentionally ignore errors in
1029  * the DEALLOCATE, so that we can hobble along to some
1030  * extent with older servers (leaking prepared statements
1031  * as we go; but we don't really support update operations
1032  * pre-8.3 anyway).
1033  */
1034  if (entry->have_prep_stmt && entry->have_error)
1035  {
1036  res = PQexec(entry->conn, "DEALLOCATE ALL");
1037  PQclear(res);
1038  }
1039  entry->have_prep_stmt = false;
1040  entry->have_error = false;
1041  break;
1042  case XACT_EVENT_PRE_PREPARE:
1043 
1044  /*
1045  * We disallow any remote transactions, since it's not
1046  * very reasonable to hold them open until the prepared
1047  * transaction is committed. For the moment, throw error
1048  * unconditionally; later we might allow read-only cases.
1049  * Note that the error will cause us to come right back
1050  * here with event == XACT_EVENT_ABORT, so we'll clean up
1051  * the connection state at that point.
1052  */
1053  ereport(ERROR,
1054  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1055  errmsg("cannot PREPARE a transaction that has operated on postgres_fdw foreign tables")));
1056  break;
1057  case XACT_EVENT_PARALLEL_COMMIT:
1058  case XACT_EVENT_COMMIT:
1059  case XACT_EVENT_PREPARE:
1060  /* Pre-commit should have closed the open transaction */
1061  elog(ERROR, "missed cleaning up connection during pre-commit");
1062  break;
1063  case XACT_EVENT_PARALLEL_ABORT:
1064  case XACT_EVENT_ABORT:
1065  /* Rollback all remote transactions during abort */
1066  if (entry->parallel_abort)
1067  {
1068  if (pgfdw_abort_cleanup_begin(entry, true,
1069  &pending_entries,
1070  &cancel_requested))
1071  continue;
1072  }
1073  else
1074  pgfdw_abort_cleanup(entry, true);
1075  break;
1076  }
1077  }
1078 
1079  /* Reset state to show we're out of a transaction */
1080  pgfdw_reset_xact_state(entry, true);
1081  }
1082 
1083  /* If there are any pending connections, finish cleaning them up */
1084  if (pending_entries || cancel_requested)
1085  {
1086  if (event == XACT_EVENT_PARALLEL_PRE_COMMIT ||
1087  event == XACT_EVENT_PRE_COMMIT)
1088  {
1089  Assert(cancel_requested == NIL);
1090  pgfdw_finish_pre_commit_cleanup(pending_entries);
1091  }
1092  else
1093  {
1094  Assert(event == XACT_EVENT_PARALLEL_ABORT ||
1095  event == XACT_EVENT_ABORT);
1096  pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
1097  true);
1098  }
1099  }
1100 
1101  /*
1102  * Regardless of the event type, we can now mark ourselves as out of the
1103  * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
1104  * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
1105  */
1106  xact_got_connection = false;
1107 
1108  /* Also reset cursor numbering for next transaction */
1109  cursor_number = 0;
1110 }
static void pgfdw_finish_pre_commit_cleanup(List *pending_entries)
Definition: connection.c:1747
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2228
@ XACT_EVENT_PRE_PREPARE
Definition: xact.h:135
@ XACT_EVENT_COMMIT
Definition: xact.h:128
@ XACT_EVENT_PARALLEL_PRE_COMMIT
Definition: xact.h:134
@ XACT_EVENT_PARALLEL_COMMIT
Definition: xact.h:129
@ XACT_EVENT_ABORT
Definition: xact.h:130
@ XACT_EVENT_PRE_COMMIT
Definition: xact.h:133
@ XACT_EVENT_PARALLEL_ABORT
Definition: xact.h:131
@ XACT_EVENT_PREPARE
Definition: xact.h:132

References Assert(), ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, ConnectionHash, cursor_number, DEBUG3, do_sql_command(), do_sql_command_begin(), elog(), ereport, errcode(), errmsg(), ERROR, hash_seq_init(), hash_seq_search(), ConnCacheEntry::have_error, ConnCacheEntry::have_prep_stmt, lappend(), NIL, ConnCacheEntry::parallel_abort, ConnCacheEntry::parallel_commit, pgfdw_abort_cleanup(), pgfdw_abort_cleanup_begin(), pgfdw_finish_abort_cleanup(), pgfdw_finish_pre_commit_cleanup(), pgfdw_reject_incomplete_xact_state_change(), pgfdw_reset_xact_state(), PQclear(), PQexec(), res, ConnCacheEntry::xact_depth, XACT_EVENT_ABORT, XACT_EVENT_COMMIT, XACT_EVENT_PARALLEL_ABORT, XACT_EVENT_PARALLEL_COMMIT, XACT_EVENT_PARALLEL_PRE_COMMIT, XACT_EVENT_PRE_COMMIT, XACT_EVENT_PRE_PREPARE, XACT_EVENT_PREPARE, and xact_got_connection.

Referenced by GetConnection().

◆ postgres_fdw_disconnect()

Datum postgres_fdw_disconnect ( PG_FUNCTION_ARGS  )

Definition at line 2115 of file connection.c.

2116 {
2117  ForeignServer *server;
2118  char *servername;
2119 
2120  servername = text_to_cstring(PG_GETARG_TEXT_PP(0));
2121  server = GetForeignServerByName(servername, false);
2122 
2123  PG_RETURN_BOOL(disconnect_cached_connections(server->serverid));
2124 }
static bool disconnect_cached_connections(Oid serverid)
Definition: connection.c:2164
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:309
#define PG_RETURN_BOOL(x)
Definition: fmgr.h:359
ForeignServer * GetForeignServerByName(const char *srvname, bool missing_ok)
Definition: foreign.c:182
char * text_to_cstring(const text *t)
Definition: varlena.c:215

References disconnect_cached_connections(), GetForeignServerByName(), PG_GETARG_TEXT_PP, PG_RETURN_BOOL, ForeignServer::serverid, and text_to_cstring().

◆ postgres_fdw_disconnect_all()

Datum postgres_fdw_disconnect_all ( PG_FUNCTION_ARGS  )

Definition at line 2136 of file connection.c.

2137 {
2138  PG_RETURN_BOOL(disconnect_cached_connections(InvalidOid));
2139 }
#define InvalidOid
Definition: postgres_ext.h:36

References disconnect_cached_connections(), InvalidOid, and PG_RETURN_BOOL.

◆ postgres_fdw_get_connections()

Datum postgres_fdw_get_connections ( PG_FUNCTION_ARGS  )

Definition at line 2026 of file connection.c.

2027 {
2028 #define POSTGRES_FDW_GET_CONNECTIONS_COLS 2
2029  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
2030  HASH_SEQ_STATUS scan;
2031  ConnCacheEntry *entry;
2032 
2033  InitMaterializedSRF(fcinfo, 0);
2034 
2035  /* If cache doesn't exist, we return no records */
2036  if (!ConnectionHash)
2037  PG_RETURN_VOID();
2038 
2039  hash_seq_init(&scan, ConnectionHash);
2040  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
2041  {
2042  ForeignServer *server;
2043  Datum values[POSTGRES_FDW_GET_CONNECTIONS_COLS] = {0};
2044  bool nulls[POSTGRES_FDW_GET_CONNECTIONS_COLS] = {0};
2045 
2046  /* We only look for open remote connections */
2047  if (!entry->conn)
2048  continue;
2049 
2050  server = GetForeignServerExtended(entry->serverid, FSV_MISSING_OK);
2051 
2052  /*
2053  * The foreign server may have been dropped in current explicit
2054  * transaction. It is not possible to drop the server from another
2055  * session when the connection associated with it is in use in the
2056  * current transaction, if tried so, the drop query in another session
2057  * blocks until the current transaction finishes.
2058  *
2059  * Even though the server is dropped in the current transaction, the
2060  * cache can still have associated active connection entry, say we
2061  * call such connections dangling. Since we can not fetch the server
2062  * name from system catalogs for dangling connections, instead we show
2063  * NULL value for server name in output.
2064  *
2065  * We could have done better by storing the server name in the cache
2066  * entry instead of server oid so that it could be used in the output.
2067  * But the server name in each cache entry requires 64 bytes of
2068  * memory, which is huge, when there are many cached connections and
2069  * the use case i.e. dropping the foreign server within the explicit
2070  * current transaction seems rare. So, we chose to show NULL value for
2071  * server name in output.
2072  *
2073  * Such dangling connections get closed either in next use or at the
2074  * end of current explicit transaction in pgfdw_xact_callback.
2075  */
2076  if (!server)
2077  {
2078  /*
2079  * If the server has been dropped in the current explicit
2080  * transaction, then this entry would have been invalidated in
2081  * pgfdw_inval_callback at the end of drop server command. Note
2082  * that this connection would not have been closed in
2083  * pgfdw_inval_callback because it is still being used in the
2084  * current explicit transaction. So, assert that here.
2085  */
2086  Assert(entry->conn && entry->xact_depth > 0 && entry->invalidated);
2087 
2088  /* Show null, if no server name was found */
2089  nulls[0] = true;
2090  }
2091  else
2092  values[0] = CStringGetTextDatum(server->servername);
2093 
2094  values[1] = BoolGetDatum(!entry->invalidated);
2095 
2096  tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
2097  }
2098 
2099  PG_RETURN_VOID();
2100 }
#define CStringGetTextDatum(s)
Definition: builtins.h:94
#define POSTGRES_FDW_GET_CONNECTIONS_COLS
#define PG_RETURN_VOID()
Definition: fmgr.h:349
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
Definition: funcapi.c:76
static Datum BoolGetDatum(bool X)
Definition: postgres.h:102
TupleDesc setDesc
Definition: execnodes.h:334
Tuplestorestate * setResult
Definition: execnodes.h:333
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750

References Assert(), BoolGetDatum(), ConnectionHash, CStringGetTextDatum, FSV_MISSING_OK, GetForeignServerExtended(), hash_seq_init(), hash_seq_search(), InitMaterializedSRF(), PG_RETURN_VOID, POSTGRES_FDW_GET_CONNECTIONS_COLS, ForeignServer::servername, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, tuplestore_putvalues(), and values.

◆ ReleaseConnection()

void ReleaseConnection ( PGconn * conn )

Definition at line 770 of file connection.c.

771 {
772  /*
773  * Currently, we don't actually track connection references because all
774  * cleanup is managed on a transaction or subtransaction basis instead. So
775  * there's nothing to do here.
776  */
777 }

Referenced by estimate_path_cost_size(), finish_foreign_modify(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresEndDirectModify(), postgresEndForeignScan(), postgresGetAnalyzeInfoForForeignTable(), and postgresImportForeignSchema().

◆ UserMappingPasswordRequired()

static bool UserMappingPasswordRequired ( UserMapping * user )
static

Definition at line 581 of file connection.c.

582 {
583  ListCell *cell;
584 
585  foreach(cell, user->options)
586  {
587  DefElem *def = (DefElem *) lfirst(cell);
588 
589  if (strcmp(def->defname, "password_required") == 0)
590  return defGetBoolean(def);
591  }
592 
593  return true;
594 }

References defGetBoolean(), DefElem::defname, lfirst, and user.

Referenced by check_conn_params(), and pgfdw_security_check().

Variable Documentation

◆ ConnectionHash

◆ cursor_number

unsigned int cursor_number = 0
static

◆ prep_stmt_number

unsigned int prep_stmt_number = 0
static

Definition at line 81 of file connection.c.

Referenced by GetPrepStmtNumber().

◆ xact_got_connection

bool xact_got_connection = false
static

Definition at line 84 of file connection.c.

Referenced by GetConnection(), pgfdw_subxact_callback(), and pgfdw_xact_callback().