PostgreSQL Source Code  git master
connection.c File Reference
#include "postgres.h"
#include "access/htup_details.h"
#include "access/xact.h"
#include "catalog/pg_user_mapping.h"
#include "commands/defrem.h"
#include "funcapi.h"
#include "libpq/libpq-be.h"
#include "libpq/libpq-be-fe-helpers.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postgres_fdw.h"
#include "storage/fd.h"
#include "storage/latch.h"
#include "utils/builtins.h"
#include "utils/datetime.h"
#include "utils/hsearch.h"
#include "utils/inval.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
Include dependency graph for connection.c:

Go to the source code of this file.

Data Structures

struct  ConnCacheEntry
 

Macros

#define CONNECTION_CLEANUP_TIMEOUT   30000
 
#define CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel)
 
#define POSTGRES_FDW_GET_CONNECTIONS_COLS   2
 

Typedefs

typedef Oid ConnCacheKey
 
typedef struct ConnCacheEntry ConnCacheEntry
 

Functions

 PG_FUNCTION_INFO_V1 (postgres_fdw_get_connections)
 
 PG_FUNCTION_INFO_V1 (postgres_fdw_disconnect)
 
 PG_FUNCTION_INFO_V1 (postgres_fdw_disconnect_all)
 
static void make_new_connection (ConnCacheEntry *entry, UserMapping *user)
 
static PGconn * connect_pg_server (ForeignServer *server, UserMapping *user)
 
static void disconnect_pg_server (ConnCacheEntry *entry)
 
static void check_conn_params (const char **keywords, const char **values, UserMapping *user)
 
static void configure_remote_session (PGconn *conn)
 
static void do_sql_command_begin (PGconn *conn, const char *sql)
 
static void do_sql_command_end (PGconn *conn, const char *sql, bool consume_input)
 
static void begin_remote_xact (ConnCacheEntry *entry)
 
static void pgfdw_xact_callback (XactEvent event, void *arg)
 
static void pgfdw_subxact_callback (SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg)
 
static void pgfdw_inval_callback (Datum arg, int cacheid, uint32 hashvalue)
 
static void pgfdw_reject_incomplete_xact_state_change (ConnCacheEntry *entry)
 
static void pgfdw_reset_xact_state (ConnCacheEntry *entry, bool toplevel)
 
static bool pgfdw_cancel_query (PGconn *conn)
 
static bool pgfdw_cancel_query_begin (PGconn *conn, TimestampTz endtime)
 
static bool pgfdw_cancel_query_end (PGconn *conn, TimestampTz endtime, bool consume_input)
 
static bool pgfdw_exec_cleanup_query (PGconn *conn, const char *query, bool ignore_errors)
 
static bool pgfdw_exec_cleanup_query_begin (PGconn *conn, const char *query)
 
static bool pgfdw_exec_cleanup_query_end (PGconn *conn, const char *query, TimestampTz endtime, bool consume_input, bool ignore_errors)
 
static bool pgfdw_get_cleanup_result (PGconn *conn, TimestampTz endtime, PGresult **result, bool *timed_out)
 
static void pgfdw_abort_cleanup (ConnCacheEntry *entry, bool toplevel)
 
static bool pgfdw_abort_cleanup_begin (ConnCacheEntry *entry, bool toplevel, List **pending_entries, List **cancel_requested)
 
static void pgfdw_finish_pre_commit_cleanup (List *pending_entries)
 
static void pgfdw_finish_pre_subcommit_cleanup (List *pending_entries, int curlevel)
 
static void pgfdw_finish_abort_cleanup (List *pending_entries, List *cancel_requested, bool toplevel)
 
static void pgfdw_security_check (const char **keywords, const char **values, UserMapping *user, PGconn *conn)
 
static bool UserMappingPasswordRequired (UserMapping *user)
 
static bool disconnect_cached_connections (Oid serverid)
 
PGconn * GetConnection (UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
 
void do_sql_command (PGconn *conn, const char *sql)
 
void ReleaseConnection (PGconn *conn)
 
unsigned int GetCursorNumber (PGconn *conn)
 
unsigned int GetPrepStmtNumber (PGconn *conn)
 
PGresult * pgfdw_exec_query (PGconn *conn, const char *query, PgFdwConnState *state)
 
PGresult * pgfdw_get_result (PGconn *conn)
 
void pgfdw_report_error (int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
 
Datum postgres_fdw_get_connections (PG_FUNCTION_ARGS)
 
Datum postgres_fdw_disconnect (PG_FUNCTION_ARGS)
 
Datum postgres_fdw_disconnect_all (PG_FUNCTION_ARGS)
 

Variables

static HTAB * ConnectionHash = NULL
 
static unsigned int cursor_number = 0
 
static unsigned int prep_stmt_number = 0
 
static bool xact_got_connection = false
 
static uint32 pgfdw_we_cleanup_result = 0
 
static uint32 pgfdw_we_connect = 0
 
static uint32 pgfdw_we_get_result = 0
 

Macro Definition Documentation

◆ CONNECTION_CLEANUP_TIMEOUT

#define CONNECTION_CLEANUP_TIMEOUT   30000

Definition at line 96 of file connection.c.

◆ CONSTRUCT_ABORT_COMMAND

#define CONSTRUCT_ABORT_COMMAND (   sql,
  entry,
  toplevel 
)
Value:
do { \
if (toplevel) \
snprintf((sql), sizeof(sql), \
"ABORT TRANSACTION"); \
else \
snprintf((sql), sizeof(sql), \
"ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", \
(entry)->xact_depth, (entry)->xact_depth); \
} while(0)
#define snprintf
Definition: port.h:238

Definition at line 99 of file connection.c.

◆ POSTGRES_FDW_GET_CONNECTIONS_COLS

#define POSTGRES_FDW_GET_CONNECTIONS_COLS   2

Typedef Documentation

◆ ConnCacheEntry

◆ ConnCacheKey

typedef Oid ConnCacheKey

Definition at line 51 of file connection.c.

Function Documentation

◆ begin_remote_xact()

static void begin_remote_xact ( ConnCacheEntry * entry)
static

Definition at line 742 of file connection.c.

743 {
744  int curlevel = GetCurrentTransactionNestLevel();
745 
746  /* Start main transaction if we haven't yet */
747  if (entry->xact_depth <= 0)
748  {
749  const char *sql;
750 
751  elog(DEBUG3, "starting remote transaction on connection %p",
752  entry->conn);
753 
754  if (IsolationIsSerializable())
755  sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
756  else
757  sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
758  entry->changing_xact_state = true;
759  do_sql_command(entry->conn, sql);
760  entry->xact_depth = 1;
761  entry->changing_xact_state = false;
762  }
763 
764  /*
765  * If we're in a subtransaction, stack up savepoints to match our level.
766  * This ensures we can rollback just the desired effects when a
767  * subtransaction aborts.
768  */
769  while (entry->xact_depth < curlevel)
770  {
771  char sql[64];
772 
773  snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
774  entry->changing_xact_state = true;
775  do_sql_command(entry->conn, sql);
776  entry->xact_depth++;
777  entry->changing_xact_state = false;
778  }
779 }
void do_sql_command(PGconn *conn, const char *sql)
Definition: connection.c:699
#define DEBUG3
Definition: elog.h:28
#define elog(elevel,...)
Definition: elog.h:224
PGconn * conn
Definition: connection.c:56
bool changing_xact_state
Definition: connection.c:62
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:927
#define IsolationIsSerializable()
Definition: xact.h:52

References ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, DEBUG3, do_sql_command(), elog, GetCurrentTransactionNestLevel(), IsolationIsSerializable, snprintf, and ConnCacheEntry::xact_depth.

Referenced by GetConnection().

◆ check_conn_params()

static void check_conn_params ( const char **  keywords,
const char **  values,
UserMapping * user 
)
static

Definition at line 618 of file connection.c.

619 {
620  int i;
621 
622  /* no check required if superuser */
623  if (superuser_arg(user->userid))
624  return;
625 
626 #ifdef ENABLE_GSS
627  /* ok if the user provided their own delegated credentials */
628  if (be_gssapi_get_delegation(MyProcPort))
629  return;
630 #endif
631 
632  /* ok if params contain a non-empty password */
633  for (i = 0; keywords[i] != NULL; i++)
634  {
635  if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
636  return;
637  }
638 
639  /* ok if the superuser explicitly said so at user mapping creation time */
640  if (!UserMappingPasswordRequired(user))
641  return;
642 
643  ereport(ERROR,
644  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
645  errmsg("password or GSSAPI delegated credentials required"),
646  errdetail("Non-superusers must delegate GSSAPI credentials or provide a password in the user mapping.")));
647 }
bool be_gssapi_get_delegation(Port *port)
static Datum values[MAXATTR]
Definition: bootstrap.c:150
static bool UserMappingPasswordRequired(UserMapping *user)
Definition: connection.c:594
int errdetail(const char *fmt,...)
Definition: elog.c:1203
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
struct Port * MyProcPort
Definition: globals.c:50
int i
Definition: isn.c:73
static char * user
Definition: pg_regress.c:120
bool superuser_arg(Oid roleid)
Definition: superuser.c:56

References be_gssapi_get_delegation(), ereport, errcode(), errdetail(), errmsg(), ERROR, i, MyProcPort, superuser_arg(), user, UserMappingPasswordRequired(), and values.

Referenced by connect_pg_server().

◆ configure_remote_session()

static void configure_remote_session ( PGconn * conn)
static

Definition at line 661 of file connection.c.

662 {
663  int remoteversion = PQserverVersion(conn);
664 
665  /* Force the search path to contain only pg_catalog (see deparse.c) */
666  do_sql_command(conn, "SET search_path = pg_catalog");
667 
668  /*
669  * Set remote timezone; this is basically just cosmetic, since all
670  * transmitted and returned timestamptzs should specify a zone explicitly
671  * anyway. However it makes the regression test outputs more predictable.
672  *
673  * We don't risk setting remote zone equal to ours, since the remote
674  * server might use a different timezone database. Instead, use GMT
675  * (quoted, because very old servers are picky about case). That's
676  * guaranteed to work regardless of the remote's timezone database,
677  * because pg_tzset() hard-wires it (at least in PG 9.2 and later).
678  */
679  do_sql_command(conn, "SET timezone = 'GMT'");
680 
681  /*
682  * Set values needed to ensure unambiguous data output from remote. (This
683  * logic should match what pg_dump does. See also set_transmission_modes
684  * in postgres_fdw.c.)
685  */
686  do_sql_command(conn, "SET datestyle = ISO");
687  if (remoteversion >= 80400)
688  do_sql_command(conn, "SET intervalstyle = postgres");
689  if (remoteversion >= 90000)
690  do_sql_command(conn, "SET extra_float_digits = 3");
691  else
692  do_sql_command(conn, "SET extra_float_digits = 2");
693 }
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:7144
PGconn * conn
Definition: streamutil.c:55

References conn, do_sql_command(), and PQserverVersion().

Referenced by connect_pg_server().

◆ connect_pg_server()

static PGconn * connect_pg_server ( ForeignServer * server,
UserMapping * user 
)
static

Definition at line 444 of file connection.c.

445 {
446  PGconn *volatile conn = NULL;
447 
448  /*
449  * Use PG_TRY block to ensure closing connection on error.
450  */
451  PG_TRY();
452  {
453  const char **keywords;
454  const char **values;
455  char *appname = NULL;
456  int n;
457 
458  /*
459  * Construct connection params from generic options of ForeignServer
460  * and UserMapping. (Some of them might not be libpq options, in
461  * which case we'll just waste a few array slots.) Add 4 extra slots
462  * for application_name, fallback_application_name, client_encoding,
463  * end marker.
464  */
465  n = list_length(server->options) + list_length(user->options) + 4;
466  keywords = (const char **) palloc(n * sizeof(char *));
467  values = (const char **) palloc(n * sizeof(char *));
468 
469  n = 0;
470  n += ExtractConnectionOptions(server->options,
471  keywords + n, values + n);
472  n += ExtractConnectionOptions(user->options,
473  keywords + n, values + n);
474 
475  /*
476  * Use pgfdw_application_name as application_name if set.
477  *
478  * PQconnectdbParams() processes the parameter arrays from start to
479  * end. If any key word is repeated, the last value is used. Therefore
480  * note that pgfdw_application_name must be added to the arrays after
481  * options of ForeignServer are, so that it can override
482  * application_name set in ForeignServer.
483  */
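484  if (pgfdw_application_name && *pgfdw_application_name != '\0')
485  {
486  keywords[n] = "application_name";
487  values[n] = pgfdw_application_name;
488  n++;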
489  }
490 
491  /*
492  * Search the parameter arrays to find application_name setting, and
493  * replace escape sequences in it with status information if found.
494  * The arrays are searched backwards because the last value is used if
495  * application_name is repeatedly set.
496  */
497  for (int i = n - 1; i >= 0; i--)
498  {
499  if (strcmp(keywords[i], "application_name") == 0 &&
500  *(values[i]) != '\0')
501  {
502  /*
503  * Use this application_name setting if it's not empty string
504  * even after any escape sequences in it are replaced.
505  */
506  appname = process_pgfdw_appname(values[i]);
507  if (appname[0] != '\0')
508  {
509  values[i] = appname;
510  break;
511  }
512 
513  /*
514  * This empty application_name is not used, so we set
515  * values[i] to NULL and keep searching the array to find the
516  * next one.
517  */
518  values[i] = NULL;
519  pfree(appname);
520  appname = NULL;
521  }
522  }
523 
524  /* Use "postgres_fdw" as fallback_application_name */
525  keywords[n] = "fallback_application_name";
526  values[n] = "postgres_fdw";
527  n++;
528 
529  /* Set client_encoding so that libpq can convert encoding properly. */
530  keywords[n] = "client_encoding";
531  values[n] = GetDatabaseEncodingName();
532  n++;
533 
534  keywords[n] = values[n] = NULL;
535 
536  /* verify the set of connection parameters */
537  check_conn_params(keywords, values, user);
538 
539  /* first time, allocate or get the custom wait event */
540  if (pgfdw_we_connect == 0)
541  pgfdw_we_connect = WaitEventExtensionNew("PostgresFdwConnect");
542 
543  /* OK to make connection */
544  conn = libpqsrv_connect_params(keywords, values,
545  false, /* expand_dbname */
546  pgfdw_we_connect);
547 
548  if (!conn || PQstatus(conn) != CONNECTION_OK)
549  ereport(ERROR,
550  (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
551  errmsg("could not connect to server \"%s\"",
552  server->servername),
553  errdetail_internal("%s", pchomp(PQerrorMessage(conn)))));
554 
555  /* Perform post-connection security checks */
556  pgfdw_security_check(keywords, values, user, conn);
557 
558  /* Prepare new session for use */
559  configure_remote_session(conn);
560 
561  if (appname != NULL)
562  pfree(appname);
563  pfree(keywords);
564  pfree(values);
565  }
566  PG_CATCH();
567  {
568  libpqsrv_disconnect(conn);
569  PG_RE_THROW();
570  }
571  PG_END_TRY();
572 
573  return conn;
574 }
static void configure_remote_session(PGconn *conn)
Definition: connection.c:661
static void pgfdw_security_check(const char **keywords, const char **values, UserMapping *user, PGconn *conn)
Definition: connection.c:406
static void check_conn_params(const char **keywords, const char **values, UserMapping *user)
Definition: connection.c:618
static uint32 pgfdw_we_connect
Definition: connection.c:88
char * process_pgfdw_appname(const char *appname)
Definition: option.c:491
int ExtractConnectionOptions(List *defelems, const char **keywords, const char **values)
Definition: option.c:414
char * pgfdw_application_name
Definition: option.c:52
int errdetail_internal(const char *fmt,...)
Definition: elog.c:1230
#define PG_RE_THROW()
Definition: elog.h:411
#define PG_TRY(...)
Definition: elog.h:370
#define PG_END_TRY(...)
Definition: elog.h:395
#define PG_CATCH(...)
Definition: elog.h:380
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7154
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:7101
static void libpqsrv_disconnect(PGconn *conn)
static PGconn * libpqsrv_connect_params(const char *const *keywords, const char *const *values, int expand_dbname, uint32 wait_event_info)
@ CONNECTION_OK
Definition: libpq-fe.h:60
const char * GetDatabaseEncodingName(void)
Definition: mbutils.c:1267
char * pchomp(const char *in)
Definition: mcxt.c:1724
void pfree(void *pointer)
Definition: mcxt.c:1521
void * palloc(Size size)
Definition: mcxt.c:1317
static int list_length(const List *l)
Definition: pg_list.h:152
List * options
Definition: foreign.h:42
char * servername
Definition: foreign.h:39
uint32 WaitEventExtensionNew(const char *wait_event_name)
Definition: wait_event.c:164

References check_conn_params(), configure_remote_session(), conn, CONNECTION_OK, ereport, errcode(), errdetail_internal(), errmsg(), ERROR, ExtractConnectionOptions(), GetDatabaseEncodingName(), i, libpqsrv_connect_params(), libpqsrv_disconnect(), list_length(), ForeignServer::options, palloc(), pchomp(), pfree(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pgfdw_application_name, pgfdw_security_check(), pgfdw_we_connect, PQerrorMessage(), PQstatus(), process_pgfdw_appname(), ForeignServer::servername, user, values, and WaitEventExtensionNew().

Referenced by make_new_connection().

◆ disconnect_cached_connections()

static bool disconnect_cached_connections ( Oid  serverid)
static

Definition at line 2132 of file connection.c.

2133 {
2134  HASH_SEQ_STATUS scan;
2135  ConnCacheEntry *entry;
2136  bool all = !OidIsValid(serverid);
2137  bool result = false;
2138 
2139  /*
2140  * Connection cache hashtable has not been initialized yet in this
2141  * session, so return false.
2142  */
2143  if (!ConnectionHash)
2144  return false;
2145 
2146  hash_seq_init(&scan, ConnectionHash);
2147  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
2148  {
2149  /* Ignore cache entry if no open connection right now. */
2150  if (!entry->conn)
2151  continue;
2152 
2153  if (all || entry->serverid == serverid)
2154  {
2155  /*
2156  * Emit a warning because the connection to close is used in the
2157  * current transaction and cannot be disconnected right now.
2158  */
2159  if (entry->xact_depth > 0)
2160  {
2161  ForeignServer *server;
2162 
2163  server = GetForeignServerExtended(entry->serverid,
2164  FSV_MISSING_OK);
2165 
2166  if (!server)
2167  {
2168  /*
2169  * If the foreign server was dropped while its connection
2170  * was used in the current transaction, the connection
2171  * must have been marked as invalid by
2172  * pgfdw_inval_callback at the end of DROP SERVER command.
2173  */
2174  Assert(entry->invalidated);
2175 
2176  ereport(WARNING,
2177  (errmsg("cannot close dropped server connection because it is still in use")));
2178  }
2179  else
2180  ereport(WARNING,
2181  (errmsg("cannot close connection for server \"%s\" because it is still in use",
2182  server->servername)));
2183  }
2184  else
2185  {
2186  elog(DEBUG3, "discarding connection %p", entry->conn);
2187  disconnect_pg_server(entry);
2188  result = true;
2189  }
2190  }
2191  }
2192 
2193  return result;
2194 }
#define Assert(condition)
Definition: c.h:858
#define OidIsValid(objectId)
Definition: c.h:775
static void disconnect_pg_server(ConnCacheEntry *entry)
Definition: connection.c:580
static HTAB * ConnectionHash
Definition: connection.c:77
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1395
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1385
#define WARNING
Definition: elog.h:36
ForeignServer * GetForeignServerExtended(Oid serverid, bits16 flags)
Definition: foreign.c:122
#define FSV_MISSING_OK
Definition: foreign.h:61
bool invalidated
Definition: connection.c:65

References Assert, ConnCacheEntry::conn, ConnectionHash, DEBUG3, disconnect_pg_server(), elog, ereport, errmsg(), FSV_MISSING_OK, GetForeignServerExtended(), hash_seq_init(), hash_seq_search(), ConnCacheEntry::invalidated, OidIsValid, ConnCacheEntry::serverid, ForeignServer::servername, WARNING, and ConnCacheEntry::xact_depth.

Referenced by postgres_fdw_disconnect(), and postgres_fdw_disconnect_all().

◆ disconnect_pg_server()

static void disconnect_pg_server ( ConnCacheEntry * entry)
static

Definition at line 580 of file connection.c.

581 {
582  if (entry->conn != NULL)
583  {
584  libpqsrv_disconnect(entry->conn);
585  entry->conn = NULL;
586  }
587 }

References ConnCacheEntry::conn, and libpqsrv_disconnect().

Referenced by disconnect_cached_connections(), GetConnection(), pgfdw_inval_callback(), pgfdw_reject_incomplete_xact_state_change(), and pgfdw_reset_xact_state().

◆ do_sql_command()

void do_sql_command ( PGconn * conn,
const char *  sql 
)

Definition at line 699 of file connection.c.

700 {
701  do_sql_command_begin(conn, sql);
702  do_sql_command_end(conn, sql, false);
703 }
static void do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
Definition: connection.c:713
static void do_sql_command_begin(PGconn *conn, const char *sql)
Definition: connection.c:706

References conn, do_sql_command_begin(), and do_sql_command_end().

Referenced by begin_remote_xact(), configure_remote_session(), pgfdw_subxact_callback(), pgfdw_xact_callback(), and postgresExecForeignTruncate().

◆ do_sql_command_begin()

static void do_sql_command_begin ( PGconn * conn,
const char *  sql 
)
static

Definition at line 706 of file connection.c.

707 {
708  if (!PQsendQuery(conn, sql))
709  pgfdw_report_error(ERROR, NULL, conn, false, sql);
710 }
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:871
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1416

References conn, ERROR, pgfdw_report_error(), and PQsendQuery().

Referenced by do_sql_command(), pgfdw_subxact_callback(), and pgfdw_xact_callback().

◆ do_sql_command_end()

static void do_sql_command_end ( PGconn * conn,
const char *  sql,
bool  consume_input 
)
static

Definition at line 713 of file connection.c.

714 {
715  PGresult *res;
716 
717  /*
718  * If requested, consume whatever data is available from the socket. (Note
719  * that if all data is available, this allows pgfdw_get_result to call
720  * PQgetResult without forcing the overhead of WaitLatchOrSocket, which
721  * would be large compared to the overhead of PQconsumeInput.)
722  */
723  if (consume_input && !PQconsumeInput(conn))
724  pgfdw_report_error(ERROR, NULL, conn, false, sql);
725  res = pgfdw_get_result(conn);
726  if (PQresultStatus(res) != PGRES_COMMAND_OK)
727  pgfdw_report_error(ERROR, res, conn, true, sql);
728  PQclear(res);
729 }
PGresult * pgfdw_get_result(PGconn *conn)
Definition: connection.c:852
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3411
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1984
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:99

References conn, ERROR, pgfdw_get_result(), pgfdw_report_error(), PGRES_COMMAND_OK, PQclear(), PQconsumeInput(), PQresultStatus(), and res.

Referenced by do_sql_command(), pgfdw_finish_pre_commit_cleanup(), and pgfdw_finish_pre_subcommit_cleanup().

◆ GetConnection()

PGconn* GetConnection ( UserMapping * user,
bool  will_prep_stmt,
PgFdwConnState **  state 
)

Definition at line 177 of file connection.c.

178 {
179  bool found;
180  bool retry = false;
181  ConnCacheEntry *entry;
182  ConnCacheKey key;
183  MemoryContext ccxt = CurrentMemoryContext;
184 
185  /* First time through, initialize connection cache hashtable */
186  if (ConnectionHash == NULL)
187  {
188  HASHCTL ctl;
189 
190  if (pgfdw_we_get_result == 0)
191  pgfdw_we_get_result =
192  WaitEventExtensionNew("PostgresFdwGetResult");
193 
194  ctl.keysize = sizeof(ConnCacheKey);
195  ctl.entrysize = sizeof(ConnCacheEntry);
196  ConnectionHash = hash_create("postgres_fdw connections", 8,
197  &ctl,
198  HASH_ELEM | HASH_BLOBS);
199 
200  /*
201  * Register some callback functions that manage connection cleanup.
202  * This should be done just once in each backend.
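203  */
204  RegisterXactCallback(pgfdw_xact_callback, NULL);
205  RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
206  CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
207  pgfdw_inval_callback, (Datum) 0);
208  CacheRegisterSyscacheCallback(USERMAPPINGOID,
209  pgfdw_inval_callback, (Datum) 0);
210  }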
211 
212  /* Set flag that we did GetConnection during the current transaction */
213  xact_got_connection = true;
214 
215  /* Create hash key for the entry. Assume no pad bytes in key struct */
216  key = user->umid;
217 
218  /*
219  * Find or create cached entry for requested connection.
220  */
221  entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
222  if (!found)
223  {
224  /*
225  * We need only clear "conn" here; remaining fields will be filled
226  * later when "conn" is set.
227  */
228  entry->conn = NULL;
229  }
230 
231  /* Reject further use of connections which failed abort cleanup. */
232  pgfdw_reject_incomplete_xact_state_change(entry);
233 
234  /*
235  * If the connection needs to be remade due to invalidation, disconnect as
236  * soon as we're out of all transactions.
237  */
238  if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
239  {
240  elog(DEBUG3, "closing connection %p for option changes to take effect",
241  entry->conn);
242  disconnect_pg_server(entry);
243  }
244 
245  /*
246  * If cache entry doesn't have a connection, we have to establish a new
247  * connection. (If connect_pg_server throws an error, the cache entry
248  * will remain in a valid empty state, ie conn == NULL.)
249  */
250  if (entry->conn == NULL)
251  make_new_connection(entry, user);
252 
253  /*
254  * We check the health of the cached connection here when using it. In
255  * cases where we're out of all transactions, if a broken connection is
256  * detected, we try to reestablish a new connection later.
257  */
258  PG_TRY();
259  {
260  /* Process a pending asynchronous request if any. */
261  if (entry->state.pendingAreq)
262  process_pending_request(entry->state.pendingAreq);
263  /* Start a new transaction or subtransaction if needed. */
264  begin_remote_xact(entry);
265  }
266  PG_CATCH();
267  {
268  MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
269  ErrorData *errdata = CopyErrorData();
270 
271  /*
272  * Determine whether to try to reestablish the connection.
273  *
274  * After a broken connection is detected in libpq, any error other
275  * than connection failure (e.g., out-of-memory) can be thrown
276  * somewhere between return from libpq and the expected ereport() call
277  * in pgfdw_report_error(). In this case, since PQstatus() indicates
278  * CONNECTION_BAD, checking only PQstatus() causes the false detection
279  * of connection failure. To avoid this, we also verify that the
280  * error's sqlstate is ERRCODE_CONNECTION_FAILURE. Note that also
281  * checking only the sqlstate can cause another false detection
282  * because pgfdw_report_error() may report ERRCODE_CONNECTION_FAILURE
283  * for any libpq-originated error condition.
284  */
285  if (errdata->sqlerrcode != ERRCODE_CONNECTION_FAILURE ||
286  PQstatus(entry->conn) != CONNECTION_BAD ||
287  entry->xact_depth > 0)
288  {
289  MemoryContextSwitchTo(ecxt);
290  PG_RE_THROW();
291  }
292 
293  /* Clean up the error state */
294  FlushErrorState();
295  FreeErrorData(errdata);
296  errdata = NULL;
297 
298  retry = true;
299  }
300  PG_END_TRY();
301 
302  /*
303  * If a broken connection is detected, disconnect it, reestablish a new
304  * connection and retry a new remote transaction. If connection failure is
305  * reported again, we give up getting a connection.
306  */
307  if (retry)
308  {
309  Assert(entry->xact_depth == 0);
310 
311  ereport(DEBUG3,
312  (errmsg_internal("could not start remote transaction on connection %p",
313  entry->conn)),
314  errdetail_internal("%s", pchomp(PQerrorMessage(entry->conn))));
315 
316  elog(DEBUG3, "closing connection %p to reestablish a new one",
317  entry->conn);
318  disconnect_pg_server(entry);
319 
320  make_new_connection(entry, user);
321 
322  begin_remote_xact(entry);
323  }
324 
325  /* Remember if caller will prepare statements */
326  entry->have_prep_stmt |= will_prep_stmt;
327 
328  /* If caller needs access to the per-connection state, return it. */
329  if (state)
330  *state = &entry->state;
331 
332  return entry->conn;
333 }
Oid ConnCacheKey
Definition: connection.c:51
static uint32 pgfdw_we_get_result
Definition: connection.c:89
static bool xact_got_connection
Definition: connection.c:84
struct ConnCacheEntry ConnCacheEntry
static void make_new_connection(ConnCacheEntry *entry, UserMapping *user)
Definition: connection.c:340
static void pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg)
Definition: connection.c:1077
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
Definition: connection.c:1240
static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
Definition: connection.c:1190
static void pgfdw_xact_callback(XactEvent event, void *arg)
Definition: connection.c:927
static void begin_remote_xact(ConnCacheEntry *entry)
Definition: connection.c:742
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:955
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:352
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1157
void FreeErrorData(ErrorData *edata)
Definition: elog.c:1801
void FlushErrorState(void)
Definition: elog.c:1850
ErrorData * CopyErrorData(void)
Definition: elog.c:1729
@ HASH_ENTER
Definition: hsearch.h:114
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1516
@ CONNECTION_BAD
Definition: libpq-fe.h:61
MemoryContext CurrentMemoryContext
Definition: mcxt.c:143
uintptr_t Datum
Definition: postgres.h:64
void process_pending_request(AsyncRequest *areq)
MemoryContextSwitchTo(old_ctx)
tree ctl
Definition: radixtree.h:1853
bool have_prep_stmt
Definition: connection.c:60
PgFdwConnState state
Definition: connection.c:71
int sqlerrcode
Definition: elog.h:438
AsyncRequest * pendingAreq
Definition: postgres_fdw.h:138
Definition: regguts.h:323
void RegisterXactCallback(XactCallback callback, void *arg)
Definition: xact.c:3791
void RegisterSubXactCallback(SubXactCallback callback, void *arg)
Definition: xact.c:3851

References Assert, begin_remote_xact(), CacheRegisterSyscacheCallback(), ConnCacheEntry::conn, CONNECTION_BAD, ConnectionHash, CopyErrorData(), ctl, CurrentMemoryContext, DEBUG3, disconnect_pg_server(), elog, ereport, errdetail_internal(), errmsg_internal(), FlushErrorState(), FreeErrorData(), HASH_BLOBS, hash_create(), HASH_ELEM, HASH_ENTER, hash_search(), ConnCacheEntry::have_prep_stmt, ConnCacheEntry::invalidated, sort-test::key, make_new_connection(), MemoryContextSwitchTo(), pchomp(), PgFdwConnState::pendingAreq, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pgfdw_inval_callback(), pgfdw_reject_incomplete_xact_state_change(), pgfdw_subxact_callback(), pgfdw_we_get_result, pgfdw_xact_callback(), PQerrorMessage(), PQstatus(), process_pending_request(), RegisterSubXactCallback(), RegisterXactCallback(), ErrorData::sqlerrcode, ConnCacheEntry::state, user, WaitEventExtensionNew(), ConnCacheEntry::xact_depth, and xact_got_connection.

Referenced by create_foreign_modify(), dumpDatabase(), dumpDatabaseConfig(), dumpLOs(), dumpTableData_copy(), estimate_path_cost_size(), expand_extension_name_patterns(), expand_foreign_server_name_patterns(), expand_schema_name_patterns(), expand_table_name_patterns(), getTables(), main(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresBeginDirectModify(), postgresBeginForeignScan(), postgresExecForeignTruncate(), postgresGetAnalyzeInfoForForeignTable(), postgresImportForeignSchema(), setup_connection(), StartLogStreamer(), StreamLog(), and StreamLogicalLog().

◆ GetCursorNumber()

unsigned int GetCursorNumber ( PGconn * conn)

Definition at line 806 of file connection.c.

807 {
808  return ++cursor_number;
809 }
static unsigned int cursor_number
Definition: connection.c:80

References cursor_number.

Referenced by postgresAcquireSampleRowsFunc(), and postgresBeginForeignScan().

◆ GetPrepStmtNumber()

unsigned int GetPrepStmtNumber ( PGconn * conn)

Definition at line 820 of file connection.c.

821 {
822  return ++prep_stmt_number;
823 }
static unsigned int prep_stmt_number
Definition: connection.c:81

References prep_stmt_number.

Referenced by prepare_foreign_modify().

◆ make_new_connection()

static void make_new_connection ( ConnCacheEntry *  entry,
UserMapping *  user 
)
static

Definition at line 340 of file connection.c.

341 {
342  ForeignServer *server = GetForeignServer(user->serverid);
343  ListCell *lc;
344 
345  Assert(entry->conn == NULL);
346 
347  /* Reset all transient state fields, to be sure all are clean */
348  entry->xact_depth = 0;
349  entry->have_prep_stmt = false;
350  entry->have_error = false;
351  entry->changing_xact_state = false;
352  entry->invalidated = false;
353  entry->serverid = server->serverid;
354  entry->server_hashvalue =
355  GetSysCacheHashValue1(FOREIGNSERVEROID,
356  ObjectIdGetDatum(server->serverid));
357  entry->mapping_hashvalue =
358  GetSysCacheHashValue1(USERMAPPINGOID,
359  ObjectIdGetDatum(user->umid));
360  memset(&entry->state, 0, sizeof(entry->state));
361 
362  /*
363  * Determine whether to keep the connection that we're about to make here
364  * open even after the transaction using it ends, so that the subsequent
365  * transactions can re-use it.
366  *
367  * By default, all the connections to any foreign servers are kept open.
368  *
369  * Also determine whether to commit/abort (sub)transactions opened on the
370  * remote server in parallel at (sub)transaction end, which is disabled by
371  * default.
372  *
373  * Note: it's enough to determine these only when making a new connection
374  * because if these settings for it are changed, it will be closed and
375  * re-made later.
376  */
377  entry->keep_connections = true;
378  entry->parallel_commit = false;
379  entry->parallel_abort = false;
380  foreach(lc, server->options)
381  {
382  DefElem *def = (DefElem *) lfirst(lc);
383 
384  if (strcmp(def->defname, "keep_connections") == 0)
385  entry->keep_connections = defGetBoolean(def);
386  else if (strcmp(def->defname, "parallel_commit") == 0)
387  entry->parallel_commit = defGetBoolean(def);
388  else if (strcmp(def->defname, "parallel_abort") == 0)
389  entry->parallel_abort = defGetBoolean(def);
390  }
391 
392  /* Now try to make the connection */
393  entry->conn = connect_pg_server(server, user);
394 
395  elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
396  entry->conn, server->servername, user->umid, user->userid);
397 }
static PGconn * connect_pg_server(ForeignServer *server, UserMapping *user)
Definition: connection.c:444
bool defGetBoolean(DefElem *def)
Definition: define.c:107
ForeignServer * GetForeignServer(Oid serverid)
Definition: foreign.c:110
#define lfirst(lc)
Definition: pg_list.h:172
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
bool parallel_commit
Definition: connection.c:63
uint32 server_hashvalue
Definition: connection.c:69
uint32 mapping_hashvalue
Definition: connection.c:70
bool keep_connections
Definition: connection.c:66
bool parallel_abort
Definition: connection.c:64
char * defname
Definition: parsenodes.h:815
Oid serverid
Definition: foreign.h:36
#define GetSysCacheHashValue1(cacheId, key1)
Definition: syscache.h:113

References Assert, ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, connect_pg_server(), DEBUG3, defGetBoolean(), DefElem::defname, elog, GetForeignServer(), GetSysCacheHashValue1, ConnCacheEntry::have_error, ConnCacheEntry::have_prep_stmt, ConnCacheEntry::invalidated, ConnCacheEntry::keep_connections, lfirst, ConnCacheEntry::mapping_hashvalue, ObjectIdGetDatum(), ForeignServer::options, ConnCacheEntry::parallel_abort, ConnCacheEntry::parallel_commit, ConnCacheEntry::server_hashvalue, ConnCacheEntry::serverid, ForeignServer::serverid, ForeignServer::servername, ConnCacheEntry::state, user, and ConnCacheEntry::xact_depth.

Referenced by GetConnection().

◆ PG_FUNCTION_INFO_V1() [1/3]

PG_FUNCTION_INFO_V1 ( postgres_fdw_disconnect  )

◆ PG_FUNCTION_INFO_V1() [2/3]

PG_FUNCTION_INFO_V1 ( postgres_fdw_disconnect_all  )

◆ PG_FUNCTION_INFO_V1() [3/3]

PG_FUNCTION_INFO_V1 ( postgres_fdw_get_connections  )

◆ pgfdw_abort_cleanup()

static void pgfdw_abort_cleanup ( ConnCacheEntry *  entry,
bool  toplevel 
)
static

Definition at line 1583 of file connection.c.

1584 {
1585  char sql[100];
1586 
1587  /*
1588  * Don't try to clean up the connection if we're already in error
1589  * recursion trouble.
1590  */
1591  if (in_error_recursion_trouble())
1592  entry->changing_xact_state = true;
1593 
1594  /*
1595  * If connection is already unsalvageable, don't touch it further.
1596  */
1597  if (entry->changing_xact_state)
1598  return;
1599 
1600  /*
1601  * Mark this connection as in the process of changing transaction state.
1602  */
1603  entry->changing_xact_state = true;
1604 
1605  /* Assume we might have lost track of prepared statements */
1606  entry->have_error = true;
1607 
1608  /*
1609  * If a command has been submitted to the remote server by using an
1610  * asynchronous execution function, the command might not have yet
1611  * completed. Check to see if a command is still being processed by the
1612  * remote server, and if so, request cancellation of the command.
1613  */
1614  if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
1615  !pgfdw_cancel_query(entry->conn))
1616  return; /* Unable to cancel running query */
1617 
1618  CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1619  if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
1620  return; /* Unable to abort remote (sub)transaction */
1621 
1622  if (toplevel)
1623  {
1624  if (entry->have_prep_stmt && entry->have_error &&
1625  !pgfdw_exec_cleanup_query(entry->conn,
1626  "DEALLOCATE ALL",
1627  true))
1628  return; /* Trouble clearing prepared statements */
1629 
1630  entry->have_prep_stmt = false;
1631  entry->have_error = false;
1632  }
1633 
1634  /*
1635  * If pendingAreq of the per-connection state is not NULL, it means that
1636  * an asynchronous fetch begun by fetch_more_data_begin() was not done
1637  * successfully and thus the per-connection state was not reset in
1638  * fetch_more_data(); in that case reset the per-connection state here.
1639  */
1640  if (entry->state.pendingAreq)
1641  memset(&entry->state, 0, sizeof(entry->state));
1642 
1643  /* Disarm changing_xact_state if it all worked */
1644  entry->changing_xact_state = false;
1645 }
#define CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel)
Definition: connection.c:99
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
Definition: connection.c:1400
static bool pgfdw_cancel_query(PGconn *conn)
Definition: connection.c:1309
bool in_error_recursion_trouble(void)
Definition: elog.c:293
PGTransactionStatusType PQtransactionStatus(const PGconn *conn)
Definition: fe-connect.c:7109
@ PQTRANS_ACTIVE
Definition: libpq-fe.h:122

References ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, CONSTRUCT_ABORT_COMMAND, ConnCacheEntry::have_error, ConnCacheEntry::have_prep_stmt, in_error_recursion_trouble(), PgFdwConnState::pendingAreq, pgfdw_cancel_query(), pgfdw_exec_cleanup_query(), PQTRANS_ACTIVE, PQtransactionStatus(), and ConnCacheEntry::state.

Referenced by pgfdw_subxact_callback(), and pgfdw_xact_callback().

◆ pgfdw_abort_cleanup_begin()

static bool pgfdw_abort_cleanup_begin ( ConnCacheEntry *  entry,
bool  toplevel,
List **  pending_entries,
List **  cancel_requested 
)
static

Definition at line 1657 of file connection.c.

1659 {
1660  /*
1661  * Don't try to clean up the connection if we're already in error
1662  * recursion trouble.
1663  */
1664  if (in_error_recursion_trouble())
1665  entry->changing_xact_state = true;
1666 
1667  /*
1668  * If connection is already unsalvageable, don't touch it further.
1669  */
1670  if (entry->changing_xact_state)
1671  return false;
1672 
1673  /*
1674  * Mark this connection as in the process of changing transaction state.
1675  */
1676  entry->changing_xact_state = true;
1677 
1678  /* Assume we might have lost track of prepared statements */
1679  entry->have_error = true;
1680 
1681  /*
1682  * If a command has been submitted to the remote server by using an
1683  * asynchronous execution function, the command might not have yet
1684  * completed. Check to see if a command is still being processed by the
1685  * remote server, and if so, request cancellation of the command.
1686  */
1687  if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
1688  {
1689  TimestampTz endtime;
1690 
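1691  endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1692  CONNECTION_CLEANUP_TIMEOUT);
1693  if (!pgfdw_cancel_query_begin(entry->conn, endtime))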
1694  return false; /* Unable to cancel running query */
1695  *cancel_requested = lappend(*cancel_requested, entry);
1696  }
1697  else
1698  {
1699  char sql[100];
1700 
1701  CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1702  if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
1703  return false; /* Unable to abort remote transaction */
1704  *pending_entries = lappend(*pending_entries, entry);
1705  }
1706 
1707  return true;
1708 }
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1655
static bool pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime)
Definition: connection.c:1335
static bool pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query)
Definition: connection.c:1420
#define CONNECTION_CLEANUP_TIMEOUT
Definition: connection.c:96
int64 TimestampTz
Definition: timestamp.h:39
List * lappend(List *list, void *datum)
Definition: list.c:339
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:85

References ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, CONNECTION_CLEANUP_TIMEOUT, CONSTRUCT_ABORT_COMMAND, GetCurrentTimestamp(), ConnCacheEntry::have_error, in_error_recursion_trouble(), lappend(), pgfdw_cancel_query_begin(), pgfdw_exec_cleanup_query_begin(), PQTRANS_ACTIVE, PQtransactionStatus(), and TimestampTzPlusMilliseconds.

Referenced by pgfdw_subxact_callback(), and pgfdw_xact_callback().

◆ pgfdw_cancel_query()

static bool pgfdw_cancel_query ( PGconn *  conn)
static

Definition at line 1309 of file connection.c.

1310 {
1311  TimestampTz endtime;
1312 
1313  /*
1314  * If it takes too long to cancel the query and discard the result, assume
1315  * the connection is dead.
1316  */
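1317  endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1318  CONNECTION_CLEANUP_TIMEOUT);
1319 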
1320  if (!pgfdw_cancel_query_begin(conn, endtime))
1321  return false;
1322  return pgfdw_cancel_query_end(conn, endtime, false);
1323 }
static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime, bool consume_input)
Definition: connection.c:1348

References conn, CONNECTION_CLEANUP_TIMEOUT, GetCurrentTimestamp(), pgfdw_cancel_query_begin(), pgfdw_cancel_query_end(), and TimestampTzPlusMilliseconds.

Referenced by pgfdw_abort_cleanup().

◆ pgfdw_cancel_query_begin()

static bool pgfdw_cancel_query_begin ( PGconn *  conn,
TimestampTz  endtime 
)
static

Definition at line 1335 of file connection.c.

1336 {
1337  const char *errormsg = libpqsrv_cancel(conn, endtime);
1338 
1339  if (errormsg != NULL)
1340  ereport(WARNING,
1341  errcode(ERRCODE_CONNECTION_FAILURE),
1342  errmsg("could not send cancel request: %s", errormsg));
1343 
1344  return errormsg == NULL;
1345 }
static const char * libpqsrv_cancel(PGconn *conn, TimestampTz endtime)

References conn, ereport, errcode(), errmsg(), libpqsrv_cancel(), and WARNING.

Referenced by pgfdw_abort_cleanup_begin(), and pgfdw_cancel_query().

◆ pgfdw_cancel_query_end()

static bool pgfdw_cancel_query_end ( PGconn *  conn,
TimestampTz  endtime,
bool  consume_input 
)
static

Definition at line 1348 of file connection.c.

1349 {
1350  PGresult *result = NULL;
1351  bool timed_out;
1352 
1353  /*
1354  * If requested, consume whatever data is available from the socket. (Note
1355  * that if all data is available, this allows pgfdw_get_cleanup_result to
1356  * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
1357  * which would be large compared to the overhead of PQconsumeInput.)
1358  */
1359  if (consume_input && !PQconsumeInput(conn))
1360  {
1361  ereport(WARNING,
1362  (errcode(ERRCODE_CONNECTION_FAILURE),
1363  errmsg("could not get result of cancel request: %s",
1364  pchomp(PQerrorMessage(conn)))));
1365  return false;
1366  }
1367 
1368  /* Get and discard the result of the query. */
1369  if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
1370  {
1371  if (timed_out)
1372  ereport(WARNING,
1373  (errmsg("could not get result of cancel request due to timeout")));
1374  else
1375  ereport(WARNING,
1376  (errcode(ERRCODE_CONNECTION_FAILURE),
1377  errmsg("could not get result of cancel request: %s",
1378  pchomp(PQerrorMessage(conn)))));
1379 
1380  return false;
1381  }
1382  PQclear(result);
1383 
1384  return true;
1385 }
static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result, bool *timed_out)
Definition: connection.c:1495

References conn, ereport, errcode(), errmsg(), pchomp(), pgfdw_get_cleanup_result(), PQclear(), PQconsumeInput(), PQerrorMessage(), and WARNING.

Referenced by pgfdw_cancel_query(), and pgfdw_finish_abort_cleanup().

◆ pgfdw_exec_cleanup_query()

static bool pgfdw_exec_cleanup_query ( PGconn *  conn,
const char *  query,
bool  ignore_errors 
)
static

Definition at line 1400 of file connection.c.

1401 {
1402  TimestampTz endtime;
1403 
1404  /*
1405  * If it takes too long to execute a cleanup query, assume the connection
1406  * is dead. It's fairly likely that this is why we aborted in the first
1407  * place (e.g. statement timeout, user cancel), so the timeout shouldn't
1408  * be too long.
1409  */
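1410  endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1411  CONNECTION_CLEANUP_TIMEOUT);
1412 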
1413  if (!pgfdw_exec_cleanup_query_begin(conn, query))
1414  return false;
1415  return pgfdw_exec_cleanup_query_end(conn, query, endtime,
1416  false, ignore_errors);
1417 }
static bool pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query, TimestampTz endtime, bool consume_input, bool ignore_errors)
Definition: connection.c:1438

References conn, CONNECTION_CLEANUP_TIMEOUT, GetCurrentTimestamp(), pgfdw_exec_cleanup_query_begin(), pgfdw_exec_cleanup_query_end(), and TimestampTzPlusMilliseconds.

Referenced by pgfdw_abort_cleanup().

◆ pgfdw_exec_cleanup_query_begin()

static bool pgfdw_exec_cleanup_query_begin ( PGconn *  conn,
const char *  query 
)
static

Definition at line 1420 of file connection.c.

1421 {
1422  Assert(query != NULL);
1423 
1424  /*
1425  * Submit a query. Since we don't use non-blocking mode, this also can
1426  * block. But its risk is relatively small, so we ignore that for now.
1427  */
1428  if (!PQsendQuery(conn, query))
1429  {
1430  pgfdw_report_error(WARNING, NULL, conn, false, query);
1431  return false;
1432  }
1433 
1434  return true;
1435 }

References Assert, conn, pgfdw_report_error(), PQsendQuery(), and WARNING.

Referenced by pgfdw_abort_cleanup_begin(), pgfdw_exec_cleanup_query(), and pgfdw_finish_abort_cleanup().

◆ pgfdw_exec_cleanup_query_end()

static bool pgfdw_exec_cleanup_query_end ( PGconn *  conn,
const char *  query,
TimestampTz  endtime,
bool  consume_input,
bool  ignore_errors 
)
static

Definition at line 1438 of file connection.c.

1441 {
1442  PGresult *result = NULL;
1443  bool timed_out;
1444 
1445  Assert(query != NULL);
1446 
1447  /*
1448  * If requested, consume whatever data is available from the socket. (Note
1449  * that if all data is available, this allows pgfdw_get_cleanup_result to
1450  * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
1451  * which would be large compared to the overhead of PQconsumeInput.)
1452  */
1453  if (consume_input && !PQconsumeInput(conn))
1454  {
1455  pgfdw_report_error(WARNING, NULL, conn, false, query);
1456  return false;
1457  }
1458 
1459  /* Get the result of the query. */
1460  if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
1461  {
1462  if (timed_out)
1463  ereport(WARNING,
1464  (errmsg("could not get query result due to timeout"),
1465  errcontext("remote SQL command: %s", query)));
1466  else
1467  pgfdw_report_error(WARNING, NULL, conn, false, query);
1468 
1469  return false;
1470  }
1471 
1472  /* Issue a warning if not successful. */
1473  if (PQresultStatus(result) != PGRES_COMMAND_OK)
1474  {
1475  pgfdw_report_error(WARNING, result, conn, true, query);
1476  return ignore_errors;
1477  }
1478  PQclear(result);
1479 
1480  return true;
1481 }
#define errcontext
Definition: elog.h:196

References Assert, conn, ereport, errcontext, errmsg(), pgfdw_get_cleanup_result(), pgfdw_report_error(), PGRES_COMMAND_OK, PQclear(), PQconsumeInput(), PQresultStatus(), and WARNING.

Referenced by pgfdw_exec_cleanup_query(), and pgfdw_finish_abort_cleanup().

◆ pgfdw_exec_query()

PGresult* pgfdw_exec_query ( PGconn *  conn,
const char *  query,
PgFdwConnState *  state 
)

Definition at line 835 of file connection.c.

836 {
837  /* First, process a pending asynchronous request, if any. */
838  if (state && state->pendingAreq)
839  process_pending_request(state->pendingAreq);
840 
841  if (!PQsendQuery(conn, query))
842  return NULL;
843  return pgfdw_get_result(conn);
844 }

References conn, pgfdw_get_result(), PQsendQuery(), and process_pending_request().

Referenced by close_cursor(), deallocate_query(), fetch_more_data(), get_remote_estimate(), pgfdw_xact_callback(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresGetAnalyzeInfoForForeignTable(), postgresImportForeignSchema(), and postgresReScanForeignScan().

◆ pgfdw_finish_abort_cleanup()

static void pgfdw_finish_abort_cleanup ( List *  pending_entries,
List *  cancel_requested,
bool  toplevel 
)
static

Definition at line 1823 of file connection.c.

1825 {
1826  List *pending_deallocs = NIL;
1827  ListCell *lc;
1828 
1829  /*
1830  * For each of the pending cancel requests (if any), get and discard the
1831  * result of the query, and submit an abort command to the remote server.
1832  */
1833  if (cancel_requested)
1834  {
1835  foreach(lc, cancel_requested)
1836  {
1837  ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
1838  TimestampTz endtime;
1839  char sql[100];
1840 
1841  Assert(entry->changing_xact_state);
1842 
1843  /*
1844  * Set end time. You might think we should do this before issuing
1845  * cancel request like in normal mode, but that is problematic,
1846  * because if, for example, it took longer than 30 seconds to
1847  * process the first few entries in the cancel_requested list, it
1848  * would cause a timeout error when processing each of the
1849  * remaining entries in the list, leading to slamming that entry's
1850  * connection shut.
1851  */
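1852  endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1853  CONNECTION_CLEANUP_TIMEOUT);
1854 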
1855  if (!pgfdw_cancel_query_end(entry->conn, endtime, true))
1856  {
1857  /* Unable to cancel running query */
1858  pgfdw_reset_xact_state(entry, toplevel);
1859  continue;
1860  }
1861 
1862  /* Send an abort command in parallel if needed */
1863  CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1864  if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
1865  {
1866  /* Unable to abort remote (sub)transaction */
1867  pgfdw_reset_xact_state(entry, toplevel);
1868  }
1869  else
1870  pending_entries = lappend(pending_entries, entry);
1871  }
1872  }
1873 
1874  /* No further work if no pending entries */
1875  if (!pending_entries)
1876  return;
1877 
1878  /*
1879  * Get the result of the abort command for each of the pending entries
1880  */
1881  foreach(lc, pending_entries)
1882  {
1883  ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
1884  TimestampTz endtime;
1885  char sql[100];
1886 
1887  Assert(entry->changing_xact_state);
1888 
1889  /*
1890  * Set end time. We do this now, not before issuing the command like
1891  * in normal mode, for the same reason as for the cancel_requested
1892  * entries.
1893  */
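1894  endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1895  CONNECTION_CLEANUP_TIMEOUT);
1896 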
1897  CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1898  if (!pgfdw_exec_cleanup_query_end(entry->conn, sql, endtime,
1899  true, false))
1900  {
1901  /* Unable to abort remote (sub)transaction */
1902  pgfdw_reset_xact_state(entry, toplevel);
1903  continue;
1904  }
1905 
1906  if (toplevel)
1907  {
1908  /* Do a DEALLOCATE ALL in parallel if needed */
1909  if (entry->have_prep_stmt && entry->have_error)
1910  {
1911  if (!pgfdw_exec_cleanup_query_begin(entry->conn,
1912  "DEALLOCATE ALL"))
1913  {
1914  /* Trouble clearing prepared statements */
1915  pgfdw_reset_xact_state(entry, toplevel);
1916  }
1917  else
1918  pending_deallocs = lappend(pending_deallocs, entry);
1919  continue;
1920  }
1921  entry->have_prep_stmt = false;
1922  entry->have_error = false;
1923  }
1924 
1925  /* Reset the per-connection state if needed */
1926  if (entry->state.pendingAreq)
1927  memset(&entry->state, 0, sizeof(entry->state));
1928 
1929  /* We're done with this entry; unset the changing_xact_state flag */
1930  entry->changing_xact_state = false;
1931  pgfdw_reset_xact_state(entry, toplevel);
1932  }
1933 
1934  /* No further work if no pending entries */
1935  if (!pending_deallocs)
1936  return;
1937  Assert(toplevel);
1938 
1939  /*
1940  * Get the result of the DEALLOCATE command for each of the pending
1941  * entries
1942  */
1943  foreach(lc, pending_deallocs)
1944  {
1945  ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
1946  TimestampTz endtime;
1947 
1948  Assert(entry->changing_xact_state);
1949  Assert(entry->have_prep_stmt);
1950  Assert(entry->have_error);
1951 
1952  /*
1953  * Set end time. We do this now, not before issuing the command like
1954  * in normal mode, for the same reason as for the cancel_requested
1955  * entries.
1956  */
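1957  endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1958  CONNECTION_CLEANUP_TIMEOUT);
1959 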
1960  if (!pgfdw_exec_cleanup_query_end(entry->conn, "DEALLOCATE ALL",
1961  endtime, true, true))
1962  {
1963  /* Trouble clearing prepared statements */
1964  pgfdw_reset_xact_state(entry, toplevel);
1965  continue;
1966  }
1967  entry->have_prep_stmt = false;
1968  entry->have_error = false;
1969 
1970  /* Reset the per-connection state if needed */
1971  if (entry->state.pendingAreq)
1972  memset(&entry->state, 0, sizeof(entry->state));
1973 
1974  /* We're done with this entry; unset the changing_xact_state flag */
1975  entry->changing_xact_state = false;
1976  pgfdw_reset_xact_state(entry, toplevel);
1977  }
1978 }
static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
Definition: connection.c:1264
#define NIL
Definition: pg_list.h:68
Definition: pg_list.h:54

References Assert, ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, CONNECTION_CLEANUP_TIMEOUT, CONSTRUCT_ABORT_COMMAND, GetCurrentTimestamp(), ConnCacheEntry::have_error, ConnCacheEntry::have_prep_stmt, lappend(), lfirst, NIL, PgFdwConnState::pendingAreq, pgfdw_cancel_query_end(), pgfdw_exec_cleanup_query_begin(), pgfdw_exec_cleanup_query_end(), pgfdw_reset_xact_state(), ConnCacheEntry::state, and TimestampTzPlusMilliseconds.

Referenced by pgfdw_subxact_callback(), and pgfdw_xact_callback().

◆ pgfdw_finish_pre_commit_cleanup()

static void pgfdw_finish_pre_commit_cleanup ( List *  pending_entries)
static

Definition at line 1715 of file connection.c.

1716 {
1717  ConnCacheEntry *entry;
1718  List *pending_deallocs = NIL;
1719  ListCell *lc;
1720 
1721  Assert(pending_entries);
1722 
1723  /*
1724  * Get the result of the COMMIT command for each of the pending entries
1725  */
1726  foreach(lc, pending_entries)
1727  {
1728  entry = (ConnCacheEntry *) lfirst(lc);
1729 
1730  Assert(entry->changing_xact_state);
1731 
1732  /*
1733  * We might already have received the result on the socket, so pass
1734  * consume_input=true to try to consume it first
1735  */
1736  do_sql_command_end(entry->conn, "COMMIT TRANSACTION", true);
1737  entry->changing_xact_state = false;
1738 
1739  /* Do a DEALLOCATE ALL in parallel if needed */
1740  if (entry->have_prep_stmt && entry->have_error)
1741  {
1742  /* Ignore errors (see notes in pgfdw_xact_callback) */
1743  if (PQsendQuery(entry->conn, "DEALLOCATE ALL"))
1744  {
1745  pending_deallocs = lappend(pending_deallocs, entry);
1746  continue;
1747  }
1748  }
1749  entry->have_prep_stmt = false;
1750  entry->have_error = false;
1751 
1752  pgfdw_reset_xact_state(entry, true);
1753  }
1754 
1755  /* No further work if no pending entries */
1756  if (!pending_deallocs)
1757  return;
1758 
1759  /*
1760  * Get the result of the DEALLOCATE command for each of the pending
1761  * entries
1762  */
1763  foreach(lc, pending_deallocs)
1764  {
1765  PGresult *res;
1766 
1767  entry = (ConnCacheEntry *) lfirst(lc);
1768 
1769  /* Ignore errors (see notes in pgfdw_xact_callback) */
1770  while ((res = PQgetResult(entry->conn)) != NULL)
1771  {
1772  PQclear(res);
1773  /* Stop if the connection is lost (else we'll loop infinitely) */
1774  if (PQstatus(entry->conn) == CONNECTION_BAD)
1775  break;
1776  }
1777  entry->have_prep_stmt = false;
1778  entry->have_error = false;
1779 
1780  pgfdw_reset_xact_state(entry, true);
1781  }
1782 }
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:2062

References Assert, ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, CONNECTION_BAD, do_sql_command_end(), ConnCacheEntry::have_error, ConnCacheEntry::have_prep_stmt, lappend(), lfirst, NIL, pgfdw_reset_xact_state(), PQclear(), PQgetResult(), PQsendQuery(), PQstatus(), and res.

Referenced by pgfdw_xact_callback().

◆ pgfdw_finish_pre_subcommit_cleanup()

static void pgfdw_finish_pre_subcommit_cleanup ( List *  pending_entries,
int  curlevel 
)
static

Definition at line 1789 of file connection.c.

1790 {
1791  ConnCacheEntry *entry;
1792  char sql[100];
1793  ListCell *lc;
1794 
1795  Assert(pending_entries);
1796 
1797  /*
1798  * Get the result of the RELEASE command for each of the pending entries
1799  */
1800  snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
1801  foreach(lc, pending_entries)
1802  {
1803  entry = (ConnCacheEntry *) lfirst(lc);
1804 
1805  Assert(entry->changing_xact_state);
1806 
1807  /*
1808  * We might already have received the result on the socket, so pass
1809  * consume_input=true to try to consume it first
1810  */
1811  do_sql_command_end(entry->conn, sql, true);
1812  entry->changing_xact_state = false;
1813 
1814  pgfdw_reset_xact_state(entry, false);
1815  }
1816 }

References Assert, ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, do_sql_command_end(), lfirst, pgfdw_reset_xact_state(), and snprintf.

Referenced by pgfdw_subxact_callback().

◆ pgfdw_get_cleanup_result()

static bool pgfdw_get_cleanup_result ( PGconn *  conn,
TimestampTz  endtime,
PGresult **  result,
bool *  timed_out 
)
static

Definition at line 1495 of file connection.c.

1497 {
1498  volatile bool failed = false;
1499  PGresult *volatile last_res = NULL;
1500 
1501  *timed_out = false;
1502 
1503  /* In what follows, do not leak any PGresults on an error. */
1504  PG_TRY();
1505  {
1506  for (;;)
1507  {
1508  PGresult *res;
1509 
1510  while (PQisBusy(conn))
1511  {
1512  int wc;
1513  TimestampTz now = GetCurrentTimestamp();
1514  long cur_timeout;
1515 
1516  /* If timeout has expired, give up, else get sleep time. */
1517  cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
1518  if (cur_timeout <= 0)
1519  {
1520  *timed_out = true;
1521  failed = true;
1522  goto exit;
1523  }
1524 
1525  /* first time, allocate or get the custom wait event */
1526  if (pgfdw_we_cleanup_result == 0)
1527  pgfdw_we_cleanup_result = WaitEventExtensionNew("PostgresFdwCleanupResult");
1528 
1529  /* Sleep until there's something to do */
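1530  wc = WaitLatchOrSocket(MyLatch,
1531  WL_LATCH_SET | WL_SOCKET_READABLE |
1532  WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1533  PQsocket(conn),
1534  cur_timeout, pgfdw_we_cleanup_result);
1535  ResetLatch(MyLatch);
1536 
1537  CHECK_FOR_INTERRUPTS();
1538 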
1539  /* Data available in socket? */
1540  if (wc & WL_SOCKET_READABLE)
1541  {
1542  if (!PQconsumeInput(conn))
1543  {
1544  /* connection trouble */
1545  failed = true;
1546  goto exit;
1547  }
1548  }
1549  }
1550 
1551  res = PQgetResult(conn);
1552  if (res == NULL)
1553  break; /* query is complete */
1554 
1555  PQclear(last_res);
1556  last_res = res;
1557  }
1558 exit: ;
1559  }
1560  PG_CATCH();
1561  {
1562  PQclear(last_res);
1563  PG_RE_THROW();
1564  }
1565  PG_END_TRY();
1566 
1567  if (failed)
1568  PQclear(last_res);
1569  else
1570  *result = last_res;
1571  return failed;
1572 }
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1767
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1619
static uint32 pgfdw_we_cleanup_result
Definition: connection.c:87
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:7180
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:2031
struct Latch * MyLatch
Definition: globals.c:61
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:565
void ResetLatch(Latch *latch)
Definition: latch.c:724
#define WL_SOCKET_READABLE
Definition: latch.h:128
#define WL_TIMEOUT
Definition: latch.h:130
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:132
#define WL_LATCH_SET
Definition: latch.h:127
exit(1)
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122

References CHECK_FOR_INTERRUPTS, conn, exit(), GetCurrentTimestamp(), MyLatch, now(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pgfdw_we_cleanup_result, PQclear(), PQconsumeInput(), PQgetResult(), PQisBusy(), PQsocket(), res, ResetLatch(), TimestampDifferenceMilliseconds(), WaitEventExtensionNew(), WaitLatchOrSocket(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, and WL_TIMEOUT.

Referenced by pgfdw_cancel_query_end(), and pgfdw_exec_cleanup_query_end().

◆ pgfdw_get_result()

PGresult* pgfdw_get_result ( PGconn *  conn)

Definition at line 852 of file connection.c.

853 {
854  return libpqsrv_get_result_last(conn, pgfdw_we_get_result);
855 }
static PGresult * libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info)

References conn, libpqsrv_get_result_last(), and pgfdw_we_get_result.

Referenced by create_cursor(), do_sql_command_end(), execute_dml_stmt(), execute_foreign_modify(), fetch_more_data(), pgfdw_exec_query(), and prepare_foreign_modify().

◆ pgfdw_inval_callback()

static void pgfdw_inval_callback ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)
static

Definition at line 1190 of file connection.c.

1191 {
1192  HASH_SEQ_STATUS scan;
1193  ConnCacheEntry *entry;
1194 
1195  Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
1196 
1197  /* ConnectionHash must exist already, if we're registered */
1198  hash_seq_init(&scan, ConnectionHash);
1199  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1200  {
1201  /* Ignore invalid entries */
1202  if (entry->conn == NULL)
1203  continue;
1204 
1205  /* hashvalue == 0 means a cache reset, must clear all state */
1206  if (hashvalue == 0 ||
1207  (cacheid == FOREIGNSERVEROID &&
1208  entry->server_hashvalue == hashvalue) ||
1209  (cacheid == USERMAPPINGOID &&
1210  entry->mapping_hashvalue == hashvalue))
1211  {
1212  /*
1213  * Close the connection immediately if it's not used yet in this
1214  * transaction. Otherwise mark it as invalid so that
1215  * pgfdw_xact_callback() can close it at the end of this
1216  * transaction.
1217  */
1218  if (entry->xact_depth == 0)
1219  {
1220  elog(DEBUG3, "discarding connection %p", entry->conn);
1221  disconnect_pg_server(entry);
1222  }
1223  else
1224  entry->invalidated = true;
1225  }
1226  }
1227 }

References Assert, ConnCacheEntry::conn, ConnectionHash, DEBUG3, disconnect_pg_server(), elog, hash_seq_init(), hash_seq_search(), ConnCacheEntry::invalidated, ConnCacheEntry::mapping_hashvalue, ConnCacheEntry::server_hashvalue, and ConnCacheEntry::xact_depth.

Referenced by GetConnection().

◆ pgfdw_reject_incomplete_xact_state_change()

static void pgfdw_reject_incomplete_xact_state_change ( ConnCacheEntry *  entry)
static

Definition at line 1240 of file connection.c.

1241 {
1242  ForeignServer *server;
1243 
1244  /* nothing to do for inactive entries and entries of sane state */
1245  if (entry->conn == NULL || !entry->changing_xact_state)
1246  return;
1247 
1248  /* make sure this entry is inactive */
1249  disconnect_pg_server(entry);
1250 
1251  /* find server name to be shown in the message below */
1252  server = GetForeignServer(entry->serverid);
1253 
1254  ereport(ERROR,
1255  (errcode(ERRCODE_CONNECTION_EXCEPTION),
1256  errmsg("connection to server \"%s\" was lost",
1257  server->servername)));
1258 }

References ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, disconnect_pg_server(), ereport, errcode(), errmsg(), ERROR, GetForeignServer(), ConnCacheEntry::serverid, and ForeignServer::servername.

Referenced by GetConnection(), pgfdw_subxact_callback(), and pgfdw_xact_callback().

◆ pgfdw_report_error()

void pgfdw_report_error ( int  elevel,
PGresult *  res,
PGconn *  conn,
bool  clear,
const char *  sql 
)

Definition at line 871 of file connection.c.

/*
 * Report an error obtained from a remote server through ereport().
 *
 * elevel: error level handed to ereport().
 * res:    result whose SQLSTATE, primary message, detail, hint and context
 *         fields are read with PQresultErrorField().
 * conn:   connection whose PQerrorMessage() text is used when res supplies
 *         no primary message.
 * clear:  when true, res is released with PQclear() in the PG_FINALLY block.
 * sql:    when not NULL, added as the context line "remote SQL command: ...".
 *
 * If res carries no SQLSTATE field, ERRCODE_CONNECTION_FAILURE is reported.
 */
873 {
874  /* If requested, PGresult must be released before leaving this function. */
875  PG_TRY();
876  {
877  char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
878  char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
879  char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
880  char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
881  char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
882  int sqlstate;
883 
  /* Pack the five SQLSTATE characters into the integer form used by errcode() */
884  if (diag_sqlstate)
885  sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
886  diag_sqlstate[1],
887  diag_sqlstate[2],
888  diag_sqlstate[3],
889  diag_sqlstate[4]);
890  else
891  sqlstate = ERRCODE_CONNECTION_FAILURE;
892 
893  /*
894  * If we don't get a message from the PGresult, try the PGconn. This
895  * is needed because for connection-level failures, PQgetResult may
896  * just return NULL, not a PGresult at all.
897  */
898  if (message_primary == NULL)
899  message_primary = pchomp(PQerrorMessage(conn));
900 
  /* Optional fields are attached only when present; an empty primary message falls back to a fixed string */
901  ereport(elevel,
902  (errcode(sqlstate),
903  (message_primary != NULL && message_primary[0] != '\0') ?
904  errmsg_internal("%s", message_primary) :
905  errmsg("could not obtain message string for remote error"),
906  message_detail ? errdetail_internal("%s", message_detail) : 0,
907  message_hint ? errhint("%s", message_hint) : 0,
908  message_context ? errcontext("%s", message_context) : 0,
909  sql ? errcontext("remote SQL command: %s", sql) : 0));
910  }
911  PG_FINALLY();
912  {
913  if (clear)
914  PQclear(res);
915  }
916  PG_END_TRY();
917 }
int errhint(const char *fmt,...)
Definition: elog.c:1317
#define MAKE_SQLSTATE(ch1, ch2, ch3, ch4, ch5)
Definition: elog.h:56
#define PG_FINALLY(...)
Definition: elog.h:387
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:3466
#define PG_DIAG_MESSAGE_HINT
Definition: postgres_ext.h:59
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:56
#define PG_DIAG_MESSAGE_PRIMARY
Definition: postgres_ext.h:57
#define PG_DIAG_MESSAGE_DETAIL
Definition: postgres_ext.h:58
#define PG_DIAG_CONTEXT
Definition: postgres_ext.h:63

References conn, ereport, errcode(), errcontext, errdetail_internal(), errhint(), errmsg(), errmsg_internal(), MAKE_SQLSTATE, pchomp(), PG_DIAG_CONTEXT, PG_DIAG_MESSAGE_DETAIL, PG_DIAG_MESSAGE_HINT, PG_DIAG_MESSAGE_PRIMARY, PG_DIAG_SQLSTATE, PG_END_TRY, PG_FINALLY, PG_TRY, PQclear(), PQerrorMessage(), PQresultErrorField(), and res.

Referenced by close_cursor(), create_cursor(), deallocate_query(), do_sql_command_begin(), do_sql_command_end(), execute_dml_stmt(), execute_foreign_modify(), fetch_more_data(), fetch_more_data_begin(), get_remote_estimate(), pgfdw_exec_cleanup_query_begin(), pgfdw_exec_cleanup_query_end(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresForeignAsyncNotify(), postgresGetAnalyzeInfoForForeignTable(), postgresImportForeignSchema(), postgresReScanForeignScan(), and prepare_foreign_modify().

◆ pgfdw_reset_xact_state()

static void pgfdw_reset_xact_state ( ConnCacheEntry *  entry,
bool  toplevel 
)
static

Definition at line 1264 of file connection.c.

/*
 * Update an entry's transaction-depth bookkeeping once a remote transaction
 * or subtransaction level has been dealt with.
 *
 * entry:    the connection cache entry to update.
 * toplevel: true when leaving the top-level transaction, false when leaving
 *           one subtransaction level.
 *
 * With toplevel true, xact_depth is set to 0 and the connection is closed
 * via disconnect_pg_server() if any of the conditions tested below holds.
 * With toplevel false, xact_depth is simply decremented by one.
 */
1265 {
1266  if (toplevel)
1267  {
1268  /* Reset state to show we're out of a transaction */
1269  entry->xact_depth = 0;
1270 
1271  /*
1272  * If the connection isn't in a good idle state, it is marked as
1273  * invalid or keep_connections option of its server is disabled, then
1274  * discard it to recover. Next GetConnection will open a new
1275  * connection.
1276  */
1277  if (PQstatus(entry->conn) != CONNECTION_OK ||
1278  PQtransactionStatus(entry->conn) != PQTRANS_IDLE ||
1279  entry->changing_xact_state ||
1280  entry->invalidated ||
1281  !entry->keep_connections)
1282  {
1283  elog(DEBUG3, "discarding connection %p", entry->conn);
1284  disconnect_pg_server(entry);
1285  }
1286  }
1287  else
1288  {
1289  /* Reset state to show we're out of a subtransaction */
1290  entry->xact_depth--;
1291  }
1292 }
@ PQTRANS_IDLE
Definition: libpq-fe.h:121

References ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, CONNECTION_OK, DEBUG3, disconnect_pg_server(), elog, ConnCacheEntry::invalidated, ConnCacheEntry::keep_connections, PQstatus(), PQTRANS_IDLE, PQtransactionStatus(), and ConnCacheEntry::xact_depth.

Referenced by pgfdw_finish_abort_cleanup(), pgfdw_finish_pre_commit_cleanup(), pgfdw_finish_pre_subcommit_cleanup(), pgfdw_subxact_callback(), and pgfdw_xact_callback().

◆ pgfdw_security_check()

static void pgfdw_security_check ( const char **  keywords,
const char **  values,
UserMapping *  user,
PGconn *  conn 
)
static

Definition at line 406 of file connection.c.

/*
 * Check that a connection made on behalf of a non-superuser was
 * authenticated in an acceptable way.
 *
 * keywords/values: parallel connection-parameter arrays; keywords is scanned
 *                  up to its NULL terminator for a "password" entry whose
 *                  value is non-empty.
 * user:            user mapping; user->userid is tested with superuser_arg().
 * conn:            the connection being checked.
 *
 * Returns quietly in the cases described by the comments below; otherwise
 * raises an ERROR stating that a password or GSSAPI delegated credentials
 * are required.
 */
407 {
408  /* Superusers bypass the check */
409  if (superuser_arg(user->userid))
410  return;
411 
412 #ifdef ENABLE_GSS
413  /* Connected via GSSAPI with delegated credentials- all good. */
  /*
   * NOTE(review): listing line 414, the condition guarding the return below,
   * is missing from this rendering.  The References list suggests it tests
   * PQconnectionUsedGSSAPI() and be_gssapi_get_delegation() -- confirm
   * against connection.c.
   */
415  return;
416 #endif
417 
418  /* Ok if superuser set PW required false. */
  /*
   * NOTE(review): listing line 419 is missing from this rendering;
   * presumably a test on UserMappingPasswordRequired() -- confirm.
   */
420  return;
421 
422  /* Connected via PW, with PW required true, and provided non-empty PW. */
  /*
   * NOTE(review): listing line 423 is missing from this rendering;
   * presumably a test on PQconnectionUsedPassword() -- confirm.
   */
424  {
425  /* ok if params contain a non-empty password */
426  for (int i = 0; keywords[i] != NULL; i++)
427  {
428  if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
429  return;
430  }
431  }
432 
  /* None of the accepted cases matched: reject the connection */
433  ereport(ERROR,
434  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
435  errmsg("password or GSSAPI delegated credentials required"),
436  errdetail("Non-superuser cannot connect if the server does not request a password or use GSSAPI with delegated credentials."),
437  errhint("Target server's authentication method must be changed or password_required=false set in the user mapping attributes.")));
438 }
int PQconnectionUsedPassword(const PGconn *conn)
Definition: fe-connect.c:7220
int PQconnectionUsedGSSAPI(const PGconn *conn)
Definition: fe-connect.c:7231

References be_gssapi_get_delegation(), conn, ereport, errcode(), errdetail(), errhint(), errmsg(), ERROR, i, MyProcPort, PQconnectionUsedGSSAPI(), PQconnectionUsedPassword(), superuser_arg(), user, UserMappingPasswordRequired(), and values.

Referenced by connect_pg_server().

◆ pgfdw_subxact_callback()

static void pgfdw_subxact_callback ( SubXactEvent  event,
SubTransactionId  mySubid,
SubTransactionId  parentSubid,
void *  arg 
)
static

Definition at line 1077 of file connection.c.

/*
 * Subtransaction callback: finish the remote subtransactions that belong to
 * the current local nesting level.
 *
 * event: only SUBXACT_EVENT_PRE_COMMIT_SUB and SUBXACT_EVENT_ABORT_SUB are
 *        acted upon; any other event returns immediately.
 * mySubid, parentSubid, arg: not used in the lines shown here.
 *
 * Nothing is done when xact_got_connection is false.  On pre-commit each
 * matching connection is sent "RELEASE SAVEPOINT s<level>"; on abort the
 * pgfdw_abort_cleanup() / pgfdw_abort_cleanup_begin() routines are used.
 * Entries with parallel_commit set are queued in pending_entries, and
 * parallel_abort entries are skipped in the loop when
 * pgfdw_abort_cleanup_begin() returns true; both are completed after the
 * scan by the pgfdw_finish_* routines.
 */
1079 {
1080  HASH_SEQ_STATUS scan;
1081  ConnCacheEntry *entry;
1082  int curlevel;
1083  List *pending_entries = NIL;
1084  List *cancel_requested = NIL;
1085 
1086  /* Nothing to do at subxact start, nor after commit. */
1087  if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
1088  event == SUBXACT_EVENT_ABORT_SUB))
1089  return;
1090 
1091  /* Quick exit if no connections were touched in this transaction. */
1092  if (!xact_got_connection)
1093  return;
1094 
1095  /*
1096  * Scan all connection cache entries to find open remote subtransactions
1097  * of the current level, and close them.
1098  */
1099  curlevel = GetCurrentTransactionNestLevel();
1100  hash_seq_init(&scan, ConnectionHash);
1101  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1102  {
1103  char sql[100];
1104 
1105  /*
1106  * We only care about connections with open remote subtransactions of
1107  * the current level.
1108  */
1109  if (entry->conn == NULL || entry->xact_depth < curlevel)
1110  continue;
1111 
1112  if (entry->xact_depth > curlevel)
1113  elog(ERROR, "missed cleaning up remote subtransaction at level %d",
1114  entry->xact_depth);
1115 
1116  if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
1117  {
1118  /*
1119  * If abort cleanup previously failed for this connection, we
1120  * can't issue any more commands against it.
1121  */
  /*
   * NOTE(review): listing line 1122 is missing from this rendering;
   * presumably the pgfdw_reject_incomplete_xact_state_change() call named in
   * the References list -- confirm against connection.c.
   */
1123 
1124  /* Commit all remote subtransactions during pre-commit */
1125  snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
1126  entry->changing_xact_state = true;
1127  if (entry->parallel_commit)
1128  {
  /* Send the command now; the entry is finished after the scan, so the per-entry reset below is skipped */
1129  do_sql_command_begin(entry->conn, sql);
1130  pending_entries = lappend(pending_entries, entry);
1131  continue;
1132  }
1133  do_sql_command(entry->conn, sql);
1134  entry->changing_xact_state = false;
1135  }
1136  else
1137  {
1138  /* Rollback all remote subtransactions during abort */
1139  if (entry->parallel_abort)
1140  {
1141  if (pgfdw_abort_cleanup_begin(entry, false,
1142  &pending_entries,
1143  &cancel_requested))
1144  continue;
1145  }
1146  else
1147  pgfdw_abort_cleanup(entry, false);
1148  }
1149 
1150  /* OK, we're outta that level of subtransaction */
1151  pgfdw_reset_xact_state(entry, false);
1152  }
1153 
1154  /* If there are any pending connections, finish cleaning them up */
1155  if (pending_entries || cancel_requested)
1156  {
1157  if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
1158  {
1159  Assert(cancel_requested == NIL);
1160  pgfdw_finish_pre_subcommit_cleanup(pending_entries, curlevel);
1161  }
1162  else
1163  {
1164  Assert(event == SUBXACT_EVENT_ABORT_SUB);
1165  pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
1166  false);
1167  }
1168  }
1169 }
static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel)
Definition: connection.c:1789
static void pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested, bool toplevel)
Definition: connection.c:1823
static bool pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel, List **pending_entries, List **cancel_requested)
Definition: connection.c:1657
static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
Definition: connection.c:1583
@ SUBXACT_EVENT_PRE_COMMIT_SUB
Definition: xact.h:145
@ SUBXACT_EVENT_ABORT_SUB
Definition: xact.h:144

References Assert, ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, ConnectionHash, do_sql_command(), do_sql_command_begin(), elog, ERROR, GetCurrentTransactionNestLevel(), hash_seq_init(), hash_seq_search(), lappend(), NIL, ConnCacheEntry::parallel_abort, ConnCacheEntry::parallel_commit, pgfdw_abort_cleanup(), pgfdw_abort_cleanup_begin(), pgfdw_finish_abort_cleanup(), pgfdw_finish_pre_subcommit_cleanup(), pgfdw_reject_incomplete_xact_state_change(), pgfdw_reset_xact_state(), snprintf, SUBXACT_EVENT_ABORT_SUB, SUBXACT_EVENT_PRE_COMMIT_SUB, ConnCacheEntry::xact_depth, and xact_got_connection.

Referenced by GetConnection().

◆ pgfdw_xact_callback()

static void pgfdw_xact_callback ( XactEvent  event,
void *  arg 
)
static

Definition at line 927 of file connection.c.

/*
 * Transaction callback: close the remote transactions opened on cached
 * connections when the local transaction reaches a commit, prepare or abort
 * event.
 *
 * event: the transaction event being reported; it selects the branch of the
 *        switch below.
 * arg:   not used in the lines shown here.
 *
 * Nothing is done when xact_got_connection is false.  Every entry with an
 * open connection is passed to pgfdw_reset_xact_state(entry, true), except
 * those deferred with "continue" for parallel commit/abort, which are
 * completed after the scan.  On exit xact_got_connection is cleared and
 * cursor_number is reset to 0.
 *
 * NOTE(review): several listing lines (942, 959-960, 966, 1003, 1018, 1024)
 * are missing from this rendering; see the notes placed where they belong.
 */
928 {
929  HASH_SEQ_STATUS scan;
930  ConnCacheEntry *entry;
931  List *pending_entries = NIL;
932  List *cancel_requested = NIL;
933 
934  /* Quick exit if no connections were touched in this transaction. */
935  if (!xact_got_connection)
936  return;
937 
938  /*
939  * Scan all connection cache entries to find open remote transactions, and
940  * close them.
941  */
  /*
   * NOTE(review): listing line 942 is missing from this rendering;
   * presumably hash_seq_init(&scan, ConnectionHash), as in the sibling
   * callbacks -- confirm.
   */
943  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
944  {
945  PGresult *res;
946 
947  /* Ignore cache entry if no open connection right now */
948  if (entry->conn == NULL)
949  continue;
950 
951  /* If it has an open remote transaction, try to close it */
952  if (entry->xact_depth > 0)
953  {
954  elog(DEBUG3, "closing remote transaction on connection %p",
955  entry->conn);
956 
957  switch (event)
958  {
  /*
   * NOTE(review): listing lines 959-960 are missing from this rendering;
   * presumably the case labels XACT_EVENT_PARALLEL_PRE_COMMIT and
   * XACT_EVENT_PRE_COMMIT -- confirm.
   */
961 
962  /*
963  * If abort cleanup previously failed for this connection,
964  * we can't issue any more commands against it.
965  */
  /*
   * NOTE(review): listing line 966 is missing from this rendering;
   * presumably the pgfdw_reject_incomplete_xact_state_change() call named in
   * the References list -- confirm.
   */
967 
968  /* Commit all remote transactions during pre-commit */
969  entry->changing_xact_state = true;
970  if (entry->parallel_commit)
971  {
972  do_sql_command_begin(entry->conn, "COMMIT TRANSACTION");
973  pending_entries = lappend(pending_entries, entry);
974  continue;
975  }
976  do_sql_command(entry->conn, "COMMIT TRANSACTION");
977  entry->changing_xact_state = false;
978 
979  /*
980  * If there were any errors in subtransactions, and we
981  * made prepared statements, do a DEALLOCATE ALL to make
982  * sure we get rid of all prepared statements. This is
983  * annoying and not terribly bulletproof, but it's
984  * probably not worth trying harder.
985  *
986  * DEALLOCATE ALL only exists in 8.3 and later, so this
987  * constrains how old a server postgres_fdw can
988  * communicate with. We intentionally ignore errors in
989  * the DEALLOCATE, so that we can hobble along to some
990  * extent with older servers (leaking prepared statements
991  * as we go; but we don't really support update operations
992  * pre-8.3 anyway).
993  */
994  if (entry->have_prep_stmt && entry->have_error)
995  {
996  res = pgfdw_exec_query(entry->conn, "DEALLOCATE ALL",
997  NULL);
998  PQclear(res);
999  }
1000  entry->have_prep_stmt = false;
1001  entry->have_error = false;
1002  break;
  /*
   * NOTE(review): listing line 1003 is missing from this rendering;
   * presumably the case label XACT_EVENT_PRE_PREPARE -- confirm.
   */
1004 
1005  /*
1006  * We disallow any remote transactions, since it's not
1007  * very reasonable to hold them open until the prepared
1008  * transaction is committed. For the moment, throw error
1009  * unconditionally; later we might allow read-only cases.
1010  * Note that the error will cause us to come right back
1011  * here with event == XACT_EVENT_ABORT, so we'll clean up
1012  * the connection state at that point.
1013  */
1014  ereport(ERROR,
1015  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1016  errmsg("cannot PREPARE a transaction that has operated on postgres_fdw foreign tables")));
1017  break;
  /*
   * NOTE(review): listing line 1018 is missing from this rendering;
   * presumably the case label XACT_EVENT_PARALLEL_COMMIT -- confirm.
   */
1019  case XACT_EVENT_COMMIT:
1020  case XACT_EVENT_PREPARE:
1021  /* Pre-commit should have closed the open transaction */
1022  elog(ERROR, "missed cleaning up connection during pre-commit");
1023  break;
  /*
   * NOTE(review): listing line 1024 is missing from this rendering;
   * presumably the case label XACT_EVENT_PARALLEL_ABORT -- confirm.
   */
1025  case XACT_EVENT_ABORT:
1026  /* Rollback all remote transactions during abort */
1027  if (entry->parallel_abort)
1028  {
1029  if (pgfdw_abort_cleanup_begin(entry, true,
1030  &pending_entries,
1031  &cancel_requested))
1032  continue;
1033  }
1034  else
1035  pgfdw_abort_cleanup(entry, true);
1036  break;
1037  }
1038  }
1039 
1040  /* Reset state to show we're out of a transaction */
1041  pgfdw_reset_xact_state(entry, true);
1042  }
1043 
1044  /* If there are any pending connections, finish cleaning them up */
1045  if (pending_entries || cancel_requested)
1046  {
1047  if (event == XACT_EVENT_PARALLEL_PRE_COMMIT ||
1048  event == XACT_EVENT_PRE_COMMIT)
1049  {
1050  Assert(cancel_requested == NIL);
1051  pgfdw_finish_pre_commit_cleanup(pending_entries);
1052  }
1053  else
1054  {
1055  Assert(event == XACT_EVENT_PARALLEL_ABORT ||
1056  event == XACT_EVENT_ABORT);
1057  pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
1058  true);
1059  }
1060  }
1061 
1062  /*
1063  * Regardless of the event type, we can now mark ourselves as out of the
1064  * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
1065  * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
1066  */
1067  xact_got_connection = false;
1068 
1069  /* Also reset cursor numbering for next transaction */
1070  cursor_number = 0;
1071 }
PGresult * pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
Definition: connection.c:835
static void pgfdw_finish_pre_commit_cleanup(List *pending_entries)
Definition: connection.c:1715
@ XACT_EVENT_PRE_PREPARE
Definition: xact.h:135
@ XACT_EVENT_COMMIT
Definition: xact.h:128
@ XACT_EVENT_PARALLEL_PRE_COMMIT
Definition: xact.h:134
@ XACT_EVENT_PARALLEL_COMMIT
Definition: xact.h:129
@ XACT_EVENT_ABORT
Definition: xact.h:130
@ XACT_EVENT_PRE_COMMIT
Definition: xact.h:133
@ XACT_EVENT_PARALLEL_ABORT
Definition: xact.h:131
@ XACT_EVENT_PREPARE
Definition: xact.h:132

References Assert, ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, ConnectionHash, cursor_number, DEBUG3, do_sql_command(), do_sql_command_begin(), elog, ereport, errcode(), errmsg(), ERROR, hash_seq_init(), hash_seq_search(), ConnCacheEntry::have_error, ConnCacheEntry::have_prep_stmt, lappend(), NIL, ConnCacheEntry::parallel_abort, ConnCacheEntry::parallel_commit, pgfdw_abort_cleanup(), pgfdw_abort_cleanup_begin(), pgfdw_exec_query(), pgfdw_finish_abort_cleanup(), pgfdw_finish_pre_commit_cleanup(), pgfdw_reject_incomplete_xact_state_change(), pgfdw_reset_xact_state(), PQclear(), res, ConnCacheEntry::xact_depth, XACT_EVENT_ABORT, XACT_EVENT_COMMIT, XACT_EVENT_PARALLEL_ABORT, XACT_EVENT_PARALLEL_COMMIT, XACT_EVENT_PARALLEL_PRE_COMMIT, XACT_EVENT_PRE_COMMIT, XACT_EVENT_PRE_PREPARE, XACT_EVENT_PREPARE, and xact_got_connection.

Referenced by GetConnection().

◆ postgres_fdw_disconnect()

Datum postgres_fdw_disconnect ( PG_FUNCTION_ARGS  )

Definition at line 2083 of file connection.c.

/*
 * SQL-callable function: look up the foreign server whose name is given as
 * text argument 0.  GetForeignServerByName() is called with missing_ok =
 * false.
 */
2084 {
2085  ForeignServer *server;
2086  char *servername;
2087 
2088  servername = text_to_cstring(PG_GETARG_TEXT_PP(0));
2089  server = GetForeignServerByName(servername, false);
2090 
  /*
   * NOTE(review): listing line 2091 (the return statement) is missing from
   * this rendering.  The References list suggests
   * PG_RETURN_BOOL(disconnect_cached_connections(server->serverid)) --
   * confirm against connection.c.
   */
2092 }
static bool disconnect_cached_connections(Oid serverid)
Definition: connection.c:2132
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:309
#define PG_RETURN_BOOL(x)
Definition: fmgr.h:359
ForeignServer * GetForeignServerByName(const char *srvname, bool missing_ok)
Definition: foreign.c:181
char * text_to_cstring(const text *t)
Definition: varlena.c:217

References disconnect_cached_connections(), GetForeignServerByName(), PG_GETARG_TEXT_PP, PG_RETURN_BOOL, ForeignServer::serverid, and text_to_cstring().

◆ postgres_fdw_disconnect_all()

Datum postgres_fdw_disconnect_all ( PG_FUNCTION_ARGS  )

Definition at line 2104 of file connection.c.

/*
 * SQL-callable function taking no arguments that are read in the lines shown.
 *
 * NOTE(review): listing line 2106, the only statement of the body, is
 * missing from this rendering.  The References list suggests
 * PG_RETURN_BOOL(disconnect_cached_connections(InvalidOid)) -- confirm
 * against connection.c.
 */
2105 {
2107 }
#define InvalidOid
Definition: postgres_ext.h:36

References disconnect_cached_connections(), InvalidOid, and PG_RETURN_BOOL.

◆ postgres_fdw_get_connections()

Datum postgres_fdw_get_connections ( PG_FUNCTION_ARGS  )

Definition at line 1994 of file connection.c.

/*
 * SQL-callable function that reports the cached connections as a
 * materialized result set (InitMaterializedSRF / tuplestore_putvalues).
 *
 * One row is emitted per cache entry that has an open connection:
 *   column 0: the foreign server's name, or NULL when the server lookup
 *             (done with FSV_MISSING_OK) finds nothing;
 *   column 1: true when the entry is not marked invalidated.
 * No rows are produced when ConnectionHash has not been created.
 */
1995 {
1996 #define POSTGRES_FDW_GET_CONNECTIONS_COLS 2
1997  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1998  HASH_SEQ_STATUS scan;
1999  ConnCacheEntry *entry;
2000 
2001  InitMaterializedSRF(fcinfo, 0);
2002 
2003  /* If cache doesn't exist, we return no records */
2004  if (!ConnectionHash)
2005  PG_RETURN_VOID();
2006 
2007  hash_seq_init(&scan, ConnectionHash);
2008  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
2009  {
2010  ForeignServer *server;
  /*
   * NOTE(review): listing line 2011 is missing from this rendering;
   * presumably the declaration of the "values" Datum array used below,
   * sized by POSTGRES_FDW_GET_CONNECTIONS_COLS -- confirm.
   */
2012  bool nulls[POSTGRES_FDW_GET_CONNECTIONS_COLS] = {0};
2013 
2014  /* We only look for open remote connections */
2015  if (!entry->conn)
2016  continue;
2017 
2018  server = GetForeignServerExtended(entry->serverid, FSV_MISSING_OK);
2019 
2020  /*
2021  * The foreign server may have been dropped in current explicit
2022  * transaction. It is not possible to drop the server from another
2023  * session when the connection associated with it is in use in the
2024  * current transaction, if tried so, the drop query in another session
2025  * blocks until the current transaction finishes.
2026  *
2027  * Even though the server is dropped in the current transaction, the
2028  * cache can still have associated active connection entry, say we
2029  * call such connections dangling. Since we can not fetch the server
2030  * name from system catalogs for dangling connections, instead we show
2031  * NULL value for server name in output.
2032  *
2033  * We could have done better by storing the server name in the cache
2034  * entry instead of server oid so that it could be used in the output.
2035  * But the server name in each cache entry requires 64 bytes of
2036  * memory, which is huge, when there are many cached connections and
2037  * the use case i.e. dropping the foreign server within the explicit
2038  * current transaction seems rare. So, we chose to show NULL value for
2039  * server name in output.
2040  *
2041  * Such dangling connections get closed either in next use or at the
2042  * end of current explicit transaction in pgfdw_xact_callback.
2043  */
2044  if (!server)
2045  {
2046  /*
2047  * If the server has been dropped in the current explicit
2048  * transaction, then this entry would have been invalidated in
2049  * pgfdw_inval_callback at the end of drop server command. Note
2050  * that this connection would not have been closed in
2051  * pgfdw_inval_callback because it is still being used in the
2052  * current explicit transaction. So, assert that here.
2053  */
2054  Assert(entry->conn && entry->xact_depth > 0 && entry->invalidated);
2055 
2056  /* Show null, if no server name was found */
2057  nulls[0] = true;
2058  }
2059  else
2060  values[0] = CStringGetTextDatum(server->servername);
2061 
  /* Second column reports validity, i.e. the inverse of the invalidated flag */
2062  values[1] = BoolGetDatum(!entry->invalidated);
2063 
2064  tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
2065  }
2066 
2067  PG_RETURN_VOID();
2068 }
#define CStringGetTextDatum(s)
Definition: builtins.h:97
#define POSTGRES_FDW_GET_CONNECTIONS_COLS
#define PG_RETURN_VOID()
Definition: fmgr.h:349
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
Definition: funcapi.c:76
static Datum BoolGetDatum(bool X)
Definition: postgres.h:102
TupleDesc setDesc
Definition: execnodes.h:340
Tuplestorestate * setResult
Definition: execnodes.h:339
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
Definition: tuplestore.c:782

References Assert, BoolGetDatum(), ConnectionHash, CStringGetTextDatum, FSV_MISSING_OK, GetForeignServerExtended(), hash_seq_init(), hash_seq_search(), InitMaterializedSRF(), PG_RETURN_VOID, POSTGRES_FDW_GET_CONNECTIONS_COLS, ForeignServer::servername, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, tuplestore_putvalues(), and values.

◆ ReleaseConnection()

void ReleaseConnection ( PGconn *  conn)

Definition at line 785 of file connection.c.

/*
 * Called by code that has finished using conn.
 *
 * conn: the connection being released; it is not examined.
 *
 * The body is intentionally empty, for the reason given in the comment below.
 */
786 {
787  /*
788  * Currently, we don't actually track connection references because all
789  * cleanup is managed on a transaction or subtransaction basis instead. So
790  * there's nothing to do here.
791  */
792 }

Referenced by estimate_path_cost_size(), finish_foreign_modify(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresEndDirectModify(), postgresEndForeignScan(), postgresGetAnalyzeInfoForForeignTable(), and postgresImportForeignSchema().

◆ UserMappingPasswordRequired()

static bool UserMappingPasswordRequired ( UserMapping *  user)
static

Definition at line 594 of file connection.c.

/*
 * Report whether the given user mapping requires a password.
 *
 * user: user mapping whose options list is searched.
 *
 * Returns the boolean value of the first "password_required" option found in
 * user->options, or true when no such option is present.
 */
595 {
596  ListCell *cell;
597 
598  foreach(cell, user->options)
599  {
600  DefElem *def = (DefElem *) lfirst(cell);
601 
602  if (strcmp(def->defname, "password_required") == 0)
603  return defGetBoolean(def);
604  }
605 
  /* Option not set: a password is required by default */
606  return true;
607 }

References defGetBoolean(), DefElem::defname, lfirst, and user.

Referenced by check_conn_params(), and pgfdw_security_check().

Variable Documentation

◆ ConnectionHash

◆ cursor_number

unsigned int cursor_number = 0
static

◆ pgfdw_we_cleanup_result

uint32 pgfdw_we_cleanup_result = 0
static

Definition at line 87 of file connection.c.

Referenced by pgfdw_get_cleanup_result().

◆ pgfdw_we_connect

uint32 pgfdw_we_connect = 0
static

Definition at line 88 of file connection.c.

Referenced by connect_pg_server().

◆ pgfdw_we_get_result

uint32 pgfdw_we_get_result = 0
static

Definition at line 89 of file connection.c.

Referenced by GetConnection(), and pgfdw_get_result().

◆ prep_stmt_number

unsigned int prep_stmt_number = 0
static

Definition at line 81 of file connection.c.

Referenced by GetPrepStmtNumber().

◆ xact_got_connection

bool xact_got_connection = false
static

Definition at line 84 of file connection.c.

Referenced by GetConnection(), pgfdw_subxact_callback(), and pgfdw_xact_callback().