PostgreSQL Source Code  git master
connection.c File Reference
#include "postgres.h"
#include <poll.h>
#include "access/htup_details.h"
#include "access/xact.h"
#include "catalog/pg_user_mapping.h"
#include "commands/defrem.h"
#include "funcapi.h"
#include "libpq/libpq-be.h"
#include "libpq/libpq-be-fe-helpers.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postgres_fdw.h"
#include "storage/fd.h"
#include "storage/latch.h"
#include "utils/builtins.h"
#include "utils/datetime.h"
#include "utils/hsearch.h"
#include "utils/inval.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
Include dependency graph for connection.c:

Go to the source code of this file.

Data Structures

struct  ConnCacheEntry
 

Macros

#define CONNECTION_CLEANUP_TIMEOUT   30000
 
#define CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel)
 
#define POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_1   2
 
#define POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_2   5
 
#define POSTGRES_FDW_GET_CONNECTIONS_COLS   5 /* maximum of above */
 

Typedefs

typedef Oid ConnCacheKey
 
typedef struct ConnCacheEntry ConnCacheEntry
 

Enumerations

enum  pgfdwVersion { PGFDW_V1_1 = 0 , PGFDW_V1_2 }
 

Functions

 PG_FUNCTION_INFO_V1 (postgres_fdw_get_connections)
 
 PG_FUNCTION_INFO_V1 (postgres_fdw_get_connections_1_2)
 
 PG_FUNCTION_INFO_V1 (postgres_fdw_disconnect)
 
 PG_FUNCTION_INFO_V1 (postgres_fdw_disconnect_all)
 
static void make_new_connection (ConnCacheEntry *entry, UserMapping *user)
 
static PGconn * connect_pg_server (ForeignServer *server, UserMapping *user)
 
static void disconnect_pg_server (ConnCacheEntry *entry)
 
static void check_conn_params (const char **keywords, const char **values, UserMapping *user)
 
static void configure_remote_session (PGconn *conn)
 
static void do_sql_command_begin (PGconn *conn, const char *sql)
 
static void do_sql_command_end (PGconn *conn, const char *sql, bool consume_input)
 
static void begin_remote_xact (ConnCacheEntry *entry)
 
static void pgfdw_xact_callback (XactEvent event, void *arg)
 
static void pgfdw_subxact_callback (SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg)
 
static void pgfdw_inval_callback (Datum arg, int cacheid, uint32 hashvalue)
 
static void pgfdw_reject_incomplete_xact_state_change (ConnCacheEntry *entry)
 
static void pgfdw_reset_xact_state (ConnCacheEntry *entry, bool toplevel)
 
static bool pgfdw_cancel_query (PGconn *conn)
 
static bool pgfdw_cancel_query_begin (PGconn *conn, TimestampTz endtime)
 
static bool pgfdw_cancel_query_end (PGconn *conn, TimestampTz endtime, bool consume_input)
 
static bool pgfdw_exec_cleanup_query (PGconn *conn, const char *query, bool ignore_errors)
 
static bool pgfdw_exec_cleanup_query_begin (PGconn *conn, const char *query)
 
static bool pgfdw_exec_cleanup_query_end (PGconn *conn, const char *query, TimestampTz endtime, bool consume_input, bool ignore_errors)
 
static bool pgfdw_get_cleanup_result (PGconn *conn, TimestampTz endtime, PGresult **result, bool *timed_out)
 
static void pgfdw_abort_cleanup (ConnCacheEntry *entry, bool toplevel)
 
static bool pgfdw_abort_cleanup_begin (ConnCacheEntry *entry, bool toplevel, List **pending_entries, List **cancel_requested)
 
static void pgfdw_finish_pre_commit_cleanup (List *pending_entries)
 
static void pgfdw_finish_pre_subcommit_cleanup (List *pending_entries, int curlevel)
 
static void pgfdw_finish_abort_cleanup (List *pending_entries, List *cancel_requested, bool toplevel)
 
static void pgfdw_security_check (const char **keywords, const char **values, UserMapping *user, PGconn *conn)
 
static bool UserMappingPasswordRequired (UserMapping *user)
 
static bool disconnect_cached_connections (Oid serverid)
 
static void postgres_fdw_get_connections_internal (FunctionCallInfo fcinfo, enum pgfdwVersion api_version)
 
static int pgfdw_conn_check (PGconn *conn)
 
static bool pgfdw_conn_checkable (void)
 
PGconn * GetConnection (UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
 
void do_sql_command (PGconn *conn, const char *sql)
 
void ReleaseConnection (PGconn *conn)
 
unsigned int GetCursorNumber (PGconn *conn)
 
unsigned int GetPrepStmtNumber (PGconn *conn)
 
PGresult * pgfdw_exec_query (PGconn *conn, const char *query, PgFdwConnState *state)
 
PGresult * pgfdw_get_result (PGconn *conn)
 
void pgfdw_report_error (int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
 
Datum postgres_fdw_get_connections_1_2 (PG_FUNCTION_ARGS)
 
Datum postgres_fdw_get_connections (PG_FUNCTION_ARGS)
 
Datum postgres_fdw_disconnect (PG_FUNCTION_ARGS)
 
Datum postgres_fdw_disconnect_all (PG_FUNCTION_ARGS)
 

Variables

static HTAB * ConnectionHash = NULL
 
static unsigned int cursor_number = 0
 
static unsigned int prep_stmt_number = 0
 
static bool xact_got_connection = false
 
static uint32 pgfdw_we_cleanup_result = 0
 
static uint32 pgfdw_we_connect = 0
 
static uint32 pgfdw_we_get_result = 0
 

Macro Definition Documentation

◆ CONNECTION_CLEANUP_TIMEOUT

#define CONNECTION_CLEANUP_TIMEOUT   30000

Definition at line 100 of file connection.c.

◆ CONSTRUCT_ABORT_COMMAND

#define CONSTRUCT_ABORT_COMMAND (   sql,
  entry,
  toplevel 
)
Value:
do { \
if (toplevel) \
snprintf((sql), sizeof(sql), \
"ABORT TRANSACTION"); \
else \
snprintf((sql), sizeof(sql), \
"ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", \
(entry)->xact_depth, (entry)->xact_depth); \
} while(0)
#define snprintf
Definition: port.h:238

Definition at line 103 of file connection.c.

◆ POSTGRES_FDW_GET_CONNECTIONS_COLS

#define POSTGRES_FDW_GET_CONNECTIONS_COLS   5 /* maximum of above */

Definition at line 2001 of file connection.c.

◆ POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_1

#define POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_1   2

Definition at line 1999 of file connection.c.

◆ POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_2

#define POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_2   5

Definition at line 2000 of file connection.c.

Typedef Documentation

◆ ConnCacheEntry

◆ ConnCacheKey

typedef Oid ConnCacheKey

Definition at line 55 of file connection.c.

Enumeration Type Documentation

◆ pgfdwVersion

Enumerator
PGFDW_V1_1 
PGFDW_V1_2 

Definition at line 117 of file connection.c.

118 {
119  PGFDW_V1_1 = 0,
120  PGFDW_V1_2,
121 };
@ PGFDW_V1_1
Definition: connection.c:119
@ PGFDW_V1_2
Definition: connection.c:120

Function Documentation

◆ begin_remote_xact()

static void begin_remote_xact ( ConnCacheEntry * entry)
static

Definition at line 760 of file connection.c.

761 {
762  int curlevel = GetCurrentTransactionNestLevel();
763 
764  /* Start main transaction if we haven't yet */
765  if (entry->xact_depth <= 0)
766  {
767  const char *sql;
768 
769  elog(DEBUG3, "starting remote transaction on connection %p",
770  entry->conn);
771 
772  if (IsolationIsSerializable())
773  sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
774  else
775  sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
776  entry->changing_xact_state = true;
777  do_sql_command(entry->conn, sql);
778  entry->xact_depth = 1;
779  entry->changing_xact_state = false;
780  }
781 
782  /*
783  * If we're in a subtransaction, stack up savepoints to match our level.
784  * This ensures we can rollback just the desired effects when a
785  * subtransaction aborts.
786  */
787  while (entry->xact_depth < curlevel)
788  {
789  char sql[64];
790 
791  snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
792  entry->changing_xact_state = true;
793  do_sql_command(entry->conn, sql);
794  entry->xact_depth++;
795  entry->changing_xact_state = false;
796  }
797 }
void do_sql_command(PGconn *conn, const char *sql)
Definition: connection.c:717
#define DEBUG3
Definition: elog.h:28
#define elog(elevel,...)
Definition: elog.h:225
PGconn * conn
Definition: connection.c:60
bool changing_xact_state
Definition: connection.c:66
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:928
#define IsolationIsSerializable()
Definition: xact.h:52

References ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, DEBUG3, do_sql_command(), elog, GetCurrentTransactionNestLevel(), IsolationIsSerializable, snprintf, and ConnCacheEntry::xact_depth.

Referenced by GetConnection().

◆ check_conn_params()

static void check_conn_params ( const char **  keywords,
const char **  values,
UserMapping * user 
)
static

Definition at line 636 of file connection.c.

637 {
638  int i;
639 
640  /* no check required if superuser */
641  if (superuser_arg(user->userid))
642  return;
643 
644 #ifdef ENABLE_GSS
645  /* ok if the user provided their own delegated credentials */
646  if (be_gssapi_get_delegation(MyProcPort))
647  return;
648 #endif
649 
650  /* ok if params contain a non-empty password */
651  for (i = 0; keywords[i] != NULL; i++)
652  {
653  if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
654  return;
655  }
656 
657  /* ok if the superuser explicitly said so at user mapping creation time */
658  if (!UserMappingPasswordRequired(user))
659  return;
660 
661  ereport(ERROR,
662  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
663  errmsg("password or GSSAPI delegated credentials required"),
664  errdetail("Non-superusers must delegate GSSAPI credentials or provide a password in the user mapping.")));
665 }
bool be_gssapi_get_delegation(Port *port)
static Datum values[MAXATTR]
Definition: bootstrap.c:150
static bool UserMappingPasswordRequired(UserMapping *user)
Definition: connection.c:612
int errdetail(const char *fmt,...)
Definition: elog.c:1203
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
struct Port * MyProcPort
Definition: globals.c:50
int i
Definition: isn.c:73
static char * user
Definition: pg_regress.c:120
bool superuser_arg(Oid roleid)
Definition: superuser.c:56

References be_gssapi_get_delegation(), ereport, errcode(), errdetail(), errmsg(), ERROR, i, MyProcPort, superuser_arg(), user, UserMappingPasswordRequired(), and values.

Referenced by connect_pg_server().

◆ configure_remote_session()

static void configure_remote_session ( PGconn * conn)
static

Definition at line 679 of file connection.c.

680 {
681  int remoteversion = PQserverVersion(conn);
682 
683  /* Force the search path to contain only pg_catalog (see deparse.c) */
684  do_sql_command(conn, "SET search_path = pg_catalog");
685 
686  /*
687  * Set remote timezone; this is basically just cosmetic, since all
688  * transmitted and returned timestamptzs should specify a zone explicitly
689  * anyway. However it makes the regression test outputs more predictable.
690  *
691  * We don't risk setting remote zone equal to ours, since the remote
692  * server might use a different timezone database. Instead, use GMT
693  * (quoted, because very old servers are picky about case). That's
694  * guaranteed to work regardless of the remote's timezone database,
695  * because pg_tzset() hard-wires it (at least in PG 9.2 and later).
696  */
697  do_sql_command(conn, "SET timezone = 'GMT'");
698 
699  /*
700  * Set values needed to ensure unambiguous data output from remote. (This
701  * logic should match what pg_dump does. See also set_transmission_modes
702  * in postgres_fdw.c.)
703  */
704  do_sql_command(conn, "SET datestyle = ISO");
705  if (remoteversion >= 80400)
706  do_sql_command(conn, "SET intervalstyle = postgres");
707  if (remoteversion >= 90000)
708  do_sql_command(conn, "SET extra_float_digits = 3");
709  else
710  do_sql_command(conn, "SET extra_float_digits = 2");
711 }
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:7202
PGconn * conn
Definition: streamutil.c:55

References conn, do_sql_command(), and PQserverVersion().

Referenced by connect_pg_server().

◆ connect_pg_server()

static PGconn * connect_pg_server ( ForeignServer * server,
UserMapping * user 
)
static

Definition at line 462 of file connection.c.

463 {
464  PGconn *volatile conn = NULL;
465 
466  /*
467  * Use PG_TRY block to ensure closing connection on error.
468  */
469  PG_TRY();
470  {
471  const char **keywords;
472  const char **values;
473  char *appname = NULL;
474  int n;
475 
476  /*
477  * Construct connection params from generic options of ForeignServer
478  * and UserMapping. (Some of them might not be libpq options, in
479  * which case we'll just waste a few array slots.) Add 4 extra slots
480  * for application_name, fallback_application_name, client_encoding,
481  * end marker.
482  */
483  n = list_length(server->options) + list_length(user->options) + 4;
484  keywords = (const char **) palloc(n * sizeof(char *));
485  values = (const char **) palloc(n * sizeof(char *));
486 
487  n = 0;
488  n += ExtractConnectionOptions(server->options,
489  keywords + n, values + n);
490  n += ExtractConnectionOptions(user->options,
491  keywords + n, values + n);
492 
493  /*
494  * Use pgfdw_application_name as application_name if set.
495  *
496  * PQconnectdbParams() processes the parameter arrays from start to
497  * end. If any key word is repeated, the last value is used. Therefore
498  * note that pgfdw_application_name must be added to the arrays after
499  * options of ForeignServer are, so that it can override
500  * application_name set in ForeignServer.
501  */
502  if (pgfdw_application_name && *pgfdw_application_name != '\0')
503  {
504  keywords[n] = "application_name";
505  values[n] = pgfdw_application_name;
506  n++;
507  }
508 
509  /*
510  * Search the parameter arrays to find application_name setting, and
511  * replace escape sequences in it with status information if found.
512  * The arrays are searched backwards because the last value is used if
513  * application_name is repeatedly set.
514  */
515  for (int i = n - 1; i >= 0; i--)
516  {
517  if (strcmp(keywords[i], "application_name") == 0 &&
518  *(values[i]) != '\0')
519  {
520  /*
521  * Use this application_name setting if it's not empty string
522  * even after any escape sequences in it are replaced.
523  */
524  appname = process_pgfdw_appname(values[i]);
525  if (appname[0] != '\0')
526  {
527  values[i] = appname;
528  break;
529  }
530 
531  /*
532  * This empty application_name is not used, so we set
533  * values[i] to NULL and keep searching the array to find the
534  * next one.
535  */
536  values[i] = NULL;
537  pfree(appname);
538  appname = NULL;
539  }
540  }
541 
542  /* Use "postgres_fdw" as fallback_application_name */
543  keywords[n] = "fallback_application_name";
544  values[n] = "postgres_fdw";
545  n++;
546 
547  /* Set client_encoding so that libpq can convert encoding properly. */
548  keywords[n] = "client_encoding";
549  values[n] = GetDatabaseEncodingName();
550  n++;
551 
552  keywords[n] = values[n] = NULL;
553 
554  /* verify the set of connection parameters */
555  check_conn_params(keywords, values, user);
556 
557  /* first time, allocate or get the custom wait event */
558  if (pgfdw_we_connect == 0)
559  pgfdw_we_connect = WaitEventExtensionNew("PostgresFdwConnect");
560 
561  /* OK to make connection */
562  conn = libpqsrv_connect_params(keywords, values,
563  false, /* expand_dbname */
564  pgfdw_we_connect);
565 
566  if (!conn || PQstatus(conn) != CONNECTION_OK)
567  ereport(ERROR,
568  (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
569  errmsg("could not connect to server \"%s\"",
570  server->servername),
571  errdetail_internal("%s", pchomp(PQerrorMessage(conn)))));
572 
573  /* Perform post-connection security checks */
574  pgfdw_security_check(keywords, values, user, conn);
575 
576  /* Prepare new session for use */
577  configure_remote_session(conn);
578 
579  if (appname != NULL)
580  pfree(appname);
581  pfree(keywords);
582  pfree(values);
583  }
584  PG_CATCH();
585  {
586  libpqsrv_disconnect(conn);
587  PG_RE_THROW();
588  }
589  PG_END_TRY();
590 
591  return conn;
592 }
static void configure_remote_session(PGconn *conn)
Definition: connection.c:679
static void pgfdw_security_check(const char **keywords, const char **values, UserMapping *user, PGconn *conn)
Definition: connection.c:424
static void check_conn_params(const char **keywords, const char **values, UserMapping *user)
Definition: connection.c:636
static uint32 pgfdw_we_connect
Definition: connection.c:92
char * process_pgfdw_appname(const char *appname)
Definition: option.c:491
int ExtractConnectionOptions(List *defelems, const char **keywords, const char **values)
Definition: option.c:414
char * pgfdw_application_name
Definition: option.c:52
int errdetail_internal(const char *fmt,...)
Definition: elog.c:1230
#define PG_RE_THROW()
Definition: elog.h:412
#define PG_TRY(...)
Definition: elog.h:371
#define PG_END_TRY(...)
Definition: elog.h:396
#define PG_CATCH(...)
Definition: elog.h:381
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7212
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:7149
static void libpqsrv_disconnect(PGconn *conn)
static PGconn * libpqsrv_connect_params(const char *const *keywords, const char *const *values, int expand_dbname, uint32 wait_event_info)
@ CONNECTION_OK
Definition: libpq-fe.h:81
const char * GetDatabaseEncodingName(void)
Definition: mbutils.c:1267
char * pchomp(const char *in)
Definition: mcxt.c:1724
void pfree(void *pointer)
Definition: mcxt.c:1521
void * palloc(Size size)
Definition: mcxt.c:1317
static int list_length(const List *l)
Definition: pg_list.h:152
List * options
Definition: foreign.h:42
char * servername
Definition: foreign.h:39
uint32 WaitEventExtensionNew(const char *wait_event_name)
Definition: wait_event.c:164

References check_conn_params(), configure_remote_session(), conn, CONNECTION_OK, ereport, errcode(), errdetail_internal(), errmsg(), ERROR, ExtractConnectionOptions(), GetDatabaseEncodingName(), i, libpqsrv_connect_params(), libpqsrv_disconnect(), list_length(), ForeignServer::options, palloc(), pchomp(), pfree(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pgfdw_application_name, pgfdw_security_check(), pgfdw_we_connect, PQerrorMessage(), PQstatus(), process_pgfdw_appname(), ForeignServer::servername, user, values, and WaitEventExtensionNew().

Referenced by make_new_connection().

◆ disconnect_cached_connections()

static bool disconnect_cached_connections ( Oid  serverid)
static

Definition at line 2254 of file connection.c.

2255 {
2256  HASH_SEQ_STATUS scan;
2257  ConnCacheEntry *entry;
2258  bool all = !OidIsValid(serverid);
2259  bool result = false;
2260 
2261  /*
2262  * Connection cache hashtable has not been initialized yet in this
2263  * session, so return false.
2264  */
2265  if (!ConnectionHash)
2266  return false;
2267 
2268  hash_seq_init(&scan, ConnectionHash);
2269  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
2270  {
2271  /* Ignore cache entry if no open connection right now. */
2272  if (!entry->conn)
2273  continue;
2274 
2275  if (all || entry->serverid == serverid)
2276  {
2277  /*
2278  * Emit a warning because the connection to close is used in the
2279  * current transaction and cannot be disconnected right now.
2280  */
2281  if (entry->xact_depth > 0)
2282  {
2283  ForeignServer *server;
2284 
2285  server = GetForeignServerExtended(entry->serverid,
2286  FSV_MISSING_OK);
2287 
2288  if (!server)
2289  {
2290  /*
2291  * If the foreign server was dropped while its connection
2292  * was used in the current transaction, the connection
2293  * must have been marked as invalid by
2294  * pgfdw_inval_callback at the end of DROP SERVER command.
2295  */
2296  Assert(entry->invalidated);
2297 
2298  ereport(WARNING,
2299  (errmsg("cannot close dropped server connection because it is still in use")));
2300  }
2301  else
2302  ereport(WARNING,
2303  (errmsg("cannot close connection for server \"%s\" because it is still in use",
2304  server->servername)));
2305  }
2306  else
2307  {
2308  elog(DEBUG3, "discarding connection %p", entry->conn);
2309  disconnect_pg_server(entry);
2310  result = true;
2311  }
2312  }
2313  }
2314 
2315  return result;
2316 }
#define Assert(condition)
Definition: c.h:861
#define OidIsValid(objectId)
Definition: c.h:778
static void disconnect_pg_server(ConnCacheEntry *entry)
Definition: connection.c:598
static HTAB * ConnectionHash
Definition: connection.c:81
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1420
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1385
#define WARNING
Definition: elog.h:36
ForeignServer * GetForeignServerExtended(Oid serverid, bits16 flags)
Definition: foreign.c:123
#define FSV_MISSING_OK
Definition: foreign.h:61
bool invalidated
Definition: connection.c:69

References Assert, ConnCacheEntry::conn, ConnectionHash, DEBUG3, disconnect_pg_server(), elog, ereport, errmsg(), FSV_MISSING_OK, GetForeignServerExtended(), hash_seq_init(), hash_seq_search(), ConnCacheEntry::invalidated, OidIsValid, ConnCacheEntry::serverid, ForeignServer::servername, WARNING, and ConnCacheEntry::xact_depth.

Referenced by postgres_fdw_disconnect(), and postgres_fdw_disconnect_all().

◆ disconnect_pg_server()

static void disconnect_pg_server ( ConnCacheEntry * entry)
static

Definition at line 598 of file connection.c.

599 {
600  if (entry->conn != NULL)
601  {
602  libpqsrv_disconnect(entry->conn);
603  entry->conn = NULL;
604  }
605 }

References ConnCacheEntry::conn, and libpqsrv_disconnect().

Referenced by disconnect_cached_connections(), GetConnection(), pgfdw_inval_callback(), pgfdw_reject_incomplete_xact_state_change(), and pgfdw_reset_xact_state().

◆ do_sql_command()

void do_sql_command ( PGconn * conn,
const char *  sql 
)

Definition at line 717 of file connection.c.

718 {
719  do_sql_command_begin(conn, sql);
720  do_sql_command_end(conn, sql, false);
721 }
static void do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
Definition: connection.c:731
static void do_sql_command_begin(PGconn *conn, const char *sql)
Definition: connection.c:724

References conn, do_sql_command_begin(), and do_sql_command_end().

Referenced by begin_remote_xact(), configure_remote_session(), pgfdw_subxact_callback(), pgfdw_xact_callback(), and postgresExecForeignTruncate().

◆ do_sql_command_begin()

static void do_sql_command_begin ( PGconn * conn,
const char *  sql 
)
static

Definition at line 724 of file connection.c.

725 {
726  if (!PQsendQuery(conn, sql))
727  pgfdw_report_error(ERROR, NULL, conn, false, sql);
728 }
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:889
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1416

References conn, ERROR, pgfdw_report_error(), and PQsendQuery().

Referenced by do_sql_command(), pgfdw_subxact_callback(), and pgfdw_xact_callback().

◆ do_sql_command_end()

static void do_sql_command_end ( PGconn * conn,
const char *  sql,
bool  consume_input 
)
static

Definition at line 731 of file connection.c.

732 {
733  PGresult *res;
734 
735  /*
736  * If requested, consume whatever data is available from the socket. (Note
737  * that if all data is available, this allows pgfdw_get_result to call
738  * PQgetResult without forcing the overhead of WaitLatchOrSocket, which
739  * would be large compared to the overhead of PQconsumeInput.)
740  */
741  if (consume_input && !PQconsumeInput(conn))
742  pgfdw_report_error(ERROR, NULL, conn, false, sql);
743  res = pgfdw_get_result(conn);
744  if (PQresultStatus(res) != PGRES_COMMAND_OK)
745  pgfdw_report_error(ERROR, res, conn, true, sql);
746  PQclear(res);
747 }
PGresult * pgfdw_get_result(PGconn *conn)
Definition: connection.c:870
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3411
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1984
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:120

References conn, ERROR, pgfdw_get_result(), pgfdw_report_error(), PGRES_COMMAND_OK, PQclear(), PQconsumeInput(), PQresultStatus(), and res.

Referenced by do_sql_command(), pgfdw_finish_pre_commit_cleanup(), and pgfdw_finish_pre_subcommit_cleanup().

◆ GetConnection()

PGconn* GetConnection ( UserMapping * user,
bool  will_prep_stmt,
PgFdwConnState **  state 
)

Definition at line 195 of file connection.c.

196 {
197  bool found;
198  bool retry = false;
199  ConnCacheEntry *entry;
200  ConnCacheKey key;
201  MemoryContext ccxt = CurrentMemoryContext;
202 
203  /* First time through, initialize connection cache hashtable */
204  if (ConnectionHash == NULL)
205  {
206  HASHCTL ctl;
207 
208  if (pgfdw_we_get_result == 0)
209  pgfdw_we_get_result =
210  WaitEventExtensionNew("PostgresFdwGetResult");
211 
212  ctl.keysize = sizeof(ConnCacheKey);
213  ctl.entrysize = sizeof(ConnCacheEntry);
214  ConnectionHash = hash_create("postgres_fdw connections", 8,
215  &ctl,
216  HASH_ELEM | HASH_BLOBS);
217 
218  /*
219  * Register some callback functions that manage connection cleanup.
220  * This should be done just once in each backend.
221  */
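222  RegisterXactCallback(pgfdw_xact_callback, NULL);
223  RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
224  CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
225  pgfdw_inval_callback, (Datum) 0);
226  CacheRegisterSyscacheCallback(USERMAPPINGOID,
227  pgfdw_inval_callback, (Datum) 0);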
228  }
229 
230  /* Set flag that we did GetConnection during the current transaction */
231  xact_got_connection = true;
232 
233  /* Create hash key for the entry. Assume no pad bytes in key struct */
234  key = user->umid;
235 
236  /*
237  * Find or create cached entry for requested connection.
238  */
239  entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
240  if (!found)
241  {
242  /*
243  * We need only clear "conn" here; remaining fields will be filled
244  * later when "conn" is set.
245  */
246  entry->conn = NULL;
247  }
248 
249  /* Reject further use of connections which failed abort cleanup. */
250  pgfdw_reject_incomplete_xact_state_change(entry);
251 
252  /*
253  * If the connection needs to be remade due to invalidation, disconnect as
254  * soon as we're out of all transactions.
255  */
256  if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
257  {
258  elog(DEBUG3, "closing connection %p for option changes to take effect",
259  entry->conn);
260  disconnect_pg_server(entry);
261  }
262 
263  /*
264  * If cache entry doesn't have a connection, we have to establish a new
265  * connection. (If connect_pg_server throws an error, the cache entry
266  * will remain in a valid empty state, ie conn == NULL.)
267  */
268  if (entry->conn == NULL)
269  make_new_connection(entry, user);
270 
271  /*
272  * We check the health of the cached connection here when using it. In
273  * cases where we're out of all transactions, if a broken connection is
274  * detected, we try to reestablish a new connection later.
275  */
276  PG_TRY();
277  {
278  /* Process a pending asynchronous request if any. */
279  if (entry->state.pendingAreq)
280  process_pending_request(entry->state.pendingAreq);
281  /* Start a new transaction or subtransaction if needed. */
282  begin_remote_xact(entry);
283  }
284  PG_CATCH();
285  {
286  MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
287  ErrorData *errdata = CopyErrorData();
288 
289  /*
290  * Determine whether to try to reestablish the connection.
291  *
292  * After a broken connection is detected in libpq, any error other
293  * than connection failure (e.g., out-of-memory) can be thrown
294  * somewhere between return from libpq and the expected ereport() call
295  * in pgfdw_report_error(). In this case, since PQstatus() indicates
296  * CONNECTION_BAD, checking only PQstatus() causes the false detection
297  * of connection failure. To avoid this, we also verify that the
298  * error's sqlstate is ERRCODE_CONNECTION_FAILURE. Note that also
299  * checking only the sqlstate can cause another false detection
300  * because pgfdw_report_error() may report ERRCODE_CONNECTION_FAILURE
301  * for any libpq-originated error condition.
302  */
303  if (errdata->sqlerrcode != ERRCODE_CONNECTION_FAILURE ||
304  PQstatus(entry->conn) != CONNECTION_BAD ||
305  entry->xact_depth > 0)
306  {
307  MemoryContextSwitchTo(ecxt);
308  PG_RE_THROW();
309  }
310 
311  /* Clean up the error state */
312  FlushErrorState();
313  FreeErrorData(errdata);
314  errdata = NULL;
315 
316  retry = true;
317  }
318  PG_END_TRY();
319 
320  /*
321  * If a broken connection is detected, disconnect it, reestablish a new
322  * connection and retry a new remote transaction. If connection failure is
323  * reported again, we give up getting a connection.
324  */
325  if (retry)
326  {
327  Assert(entry->xact_depth == 0);
328 
329  ereport(DEBUG3,
330  (errmsg_internal("could not start remote transaction on connection %p",
331  entry->conn)),
332  errdetail_internal("%s", pchomp(PQerrorMessage(entry->conn))));
333 
334  elog(DEBUG3, "closing connection %p to reestablish a new one",
335  entry->conn);
336  disconnect_pg_server(entry);
337 
338  make_new_connection(entry, user);
339 
340  begin_remote_xact(entry);
341  }
342 
343  /* Remember if caller will prepare statements */
344  entry->have_prep_stmt |= will_prep_stmt;
345 
346  /* If caller needs access to the per-connection state, return it. */
347  if (state)
348  *state = &entry->state;
349 
350  return entry->conn;
351 }
Oid ConnCacheKey
Definition: connection.c:55
static uint32 pgfdw_we_get_result
Definition: connection.c:93
static bool xact_got_connection
Definition: connection.c:88
struct ConnCacheEntry ConnCacheEntry
static void make_new_connection(ConnCacheEntry *entry, UserMapping *user)
Definition: connection.c:358
static void pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg)
Definition: connection.c:1095
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
Definition: connection.c:1258
static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
Definition: connection.c:1208
static void pgfdw_xact_callback(XactEvent event, void *arg)
Definition: connection.c:945
static void begin_remote_xact(ConnCacheEntry *entry)
Definition: connection.c:760
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:955
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:352
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1157
void FreeErrorData(ErrorData *edata)
Definition: elog.c:1818
void FlushErrorState(void)
Definition: elog.c:1867
ErrorData * CopyErrorData(void)
Definition: elog.c:1746
@ HASH_ENTER
Definition: hsearch.h:114
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1516
@ CONNECTION_BAD
Definition: libpq-fe.h:82
MemoryContext CurrentMemoryContext
Definition: mcxt.c:143
uintptr_t Datum
Definition: postgres.h:64
void process_pending_request(AsyncRequest *areq)
MemoryContextSwitchTo(old_ctx)
tree ctl
Definition: radixtree.h:1853
bool have_prep_stmt
Definition: connection.c:64
PgFdwConnState state
Definition: connection.c:75
int sqlerrcode
Definition: elog.h:439
AsyncRequest * pendingAreq
Definition: postgres_fdw.h:139
Definition: regguts.h:323
void RegisterXactCallback(XactCallback callback, void *arg)
Definition: xact.c:3797
void RegisterSubXactCallback(SubXactCallback callback, void *arg)
Definition: xact.c:3857

References Assert, begin_remote_xact(), CacheRegisterSyscacheCallback(), ConnCacheEntry::conn, CONNECTION_BAD, ConnectionHash, CopyErrorData(), ctl, CurrentMemoryContext, DEBUG3, disconnect_pg_server(), elog, ereport, errdetail_internal(), errmsg_internal(), FlushErrorState(), FreeErrorData(), HASH_BLOBS, hash_create(), HASH_ELEM, HASH_ENTER, hash_search(), ConnCacheEntry::have_prep_stmt, ConnCacheEntry::invalidated, sort-test::key, make_new_connection(), MemoryContextSwitchTo(), pchomp(), PgFdwConnState::pendingAreq, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pgfdw_inval_callback(), pgfdw_reject_incomplete_xact_state_change(), pgfdw_subxact_callback(), pgfdw_we_get_result, pgfdw_xact_callback(), PQerrorMessage(), PQstatus(), process_pending_request(), RegisterSubXactCallback(), RegisterXactCallback(), ErrorData::sqlerrcode, ConnCacheEntry::state, user, WaitEventExtensionNew(), ConnCacheEntry::xact_depth, and xact_got_connection.

Referenced by create_foreign_modify(), dumpDatabase(), dumpDatabaseConfig(), dumpLOs(), dumpTableData_copy(), estimate_path_cost_size(), expand_extension_name_patterns(), expand_foreign_server_name_patterns(), expand_schema_name_patterns(), expand_table_name_patterns(), getTables(), main(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresBeginDirectModify(), postgresBeginForeignScan(), postgresExecForeignTruncate(), postgresGetAnalyzeInfoForForeignTable(), postgresImportForeignSchema(), setup_connection(), StartLogStreamer(), StreamLog(), and StreamLogicalLog().

◆ GetCursorNumber()

unsigned int GetCursorNumber ( PGconn *  conn)

Definition at line 824 of file connection.c.

825 {
826  return ++cursor_number;
827 }
static unsigned int cursor_number
Definition: connection.c:84

References cursor_number.

Referenced by postgresAcquireSampleRowsFunc(), and postgresBeginForeignScan().

◆ GetPrepStmtNumber()

unsigned int GetPrepStmtNumber ( PGconn *  conn)

Definition at line 838 of file connection.c.

839 {
840  return ++prep_stmt_number;
841 }
static unsigned int prep_stmt_number
Definition: connection.c:85

References prep_stmt_number.

Referenced by prepare_foreign_modify().

◆ make_new_connection()

static void make_new_connection ( ConnCacheEntry *  entry,
UserMapping *  user 
)
static

Definition at line 358 of file connection.c.

359 {
360  ForeignServer *server = GetForeignServer(user->serverid);
361  ListCell *lc;
362 
363  Assert(entry->conn == NULL);
364 
365  /* Reset all transient state fields, to be sure all are clean */
366  entry->xact_depth = 0;
367  entry->have_prep_stmt = false;
368  entry->have_error = false;
369  entry->changing_xact_state = false;
370  entry->invalidated = false;
371  entry->serverid = server->serverid;
372  entry->server_hashvalue =
373  GetSysCacheHashValue1(FOREIGNSERVEROID,
374  ObjectIdGetDatum(server->serverid));
375  entry->mapping_hashvalue =
376  GetSysCacheHashValue1(USERMAPPINGOID,
377  ObjectIdGetDatum(user->umid));
378  memset(&entry->state, 0, sizeof(entry->state));
379 
380  /*
381  * Determine whether to keep the connection that we're about to make here
382  * open even after the transaction using it ends, so that the subsequent
383  * transactions can re-use it.
384  *
385  * By default, all the connections to any foreign servers are kept open.
386  *
387  * Also determine whether to commit/abort (sub)transactions opened on the
388  * remote server in parallel at (sub)transaction end, which is disabled by
389  * default.
390  *
391  * Note: it's enough to determine these only when making a new connection
392  * because if these settings for it are changed, it will be closed and
393  * re-made later.
394  */
395  entry->keep_connections = true;
396  entry->parallel_commit = false;
397  entry->parallel_abort = false;
398  foreach(lc, server->options)
399  {
400  DefElem *def = (DefElem *) lfirst(lc);
401 
402  if (strcmp(def->defname, "keep_connections") == 0)
403  entry->keep_connections = defGetBoolean(def);
404  else if (strcmp(def->defname, "parallel_commit") == 0)
405  entry->parallel_commit = defGetBoolean(def);
406  else if (strcmp(def->defname, "parallel_abort") == 0)
407  entry->parallel_abort = defGetBoolean(def);
408  }
409 
410  /* Now try to make the connection */
411  entry->conn = connect_pg_server(server, user);
412 
413  elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
414  entry->conn, server->servername, user->umid, user->userid);
415 }
static PGconn * connect_pg_server(ForeignServer *server, UserMapping *user)
Definition: connection.c:462
bool defGetBoolean(DefElem *def)
Definition: define.c:107
ForeignServer * GetForeignServer(Oid serverid)
Definition: foreign.c:111
#define lfirst(lc)
Definition: pg_list.h:172
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
bool parallel_commit
Definition: connection.c:67
uint32 server_hashvalue
Definition: connection.c:73
uint32 mapping_hashvalue
Definition: connection.c:74
bool keep_connections
Definition: connection.c:70
bool parallel_abort
Definition: connection.c:68
char * defname
Definition: parsenodes.h:817
Oid serverid
Definition: foreign.h:36
#define GetSysCacheHashValue1(cacheId, key1)
Definition: syscache.h:118

References Assert, ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, connect_pg_server(), DEBUG3, defGetBoolean(), DefElem::defname, elog, GetForeignServer(), GetSysCacheHashValue1, ConnCacheEntry::have_error, ConnCacheEntry::have_prep_stmt, ConnCacheEntry::invalidated, ConnCacheEntry::keep_connections, lfirst, ConnCacheEntry::mapping_hashvalue, ObjectIdGetDatum(), ForeignServer::options, ConnCacheEntry::parallel_abort, ConnCacheEntry::parallel_commit, ConnCacheEntry::server_hashvalue, ConnCacheEntry::serverid, ForeignServer::serverid, ForeignServer::servername, ConnCacheEntry::state, user, and ConnCacheEntry::xact_depth.

Referenced by GetConnection().

◆ PG_FUNCTION_INFO_V1() [1/4]

PG_FUNCTION_INFO_V1 ( postgres_fdw_disconnect  )

◆ PG_FUNCTION_INFO_V1() [2/4]

PG_FUNCTION_INFO_V1 ( postgres_fdw_disconnect_all  )

◆ PG_FUNCTION_INFO_V1() [3/4]

PG_FUNCTION_INFO_V1 ( postgres_fdw_get_connections  )

◆ PG_FUNCTION_INFO_V1() [4/4]

PG_FUNCTION_INFO_V1 ( postgres_fdw_get_connections_1_2  )

◆ pgfdw_abort_cleanup()

static void pgfdw_abort_cleanup ( ConnCacheEntry *  entry,
bool  toplevel 
)
static

Definition at line 1601 of file connection.c.

1602 {
1603  char sql[100];
1604 
1605  /*
1606  * Don't try to clean up the connection if we're already in error
1607  * recursion trouble.
1608  */
1609  if (in_error_recursion_trouble())
1610  entry->changing_xact_state = true;
1611 
1612  /*
1613  * If connection is already unsalvageable, don't touch it further.
1614  */
1615  if (entry->changing_xact_state)
1616  return;
1617 
1618  /*
1619  * Mark this connection as in the process of changing transaction state.
1620  */
1621  entry->changing_xact_state = true;
1622 
1623  /* Assume we might have lost track of prepared statements */
1624  entry->have_error = true;
1625 
1626  /*
1627  * If a command has been submitted to the remote server by using an
1628  * asynchronous execution function, the command might not have yet
1629  * completed. Check to see if a command is still being processed by the
1630  * remote server, and if so, request cancellation of the command.
1631  */
1632  if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
1633  !pgfdw_cancel_query(entry->conn))
1634  return; /* Unable to cancel running query */
1635 
1636  CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1637  if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
1638  return; /* Unable to abort remote (sub)transaction */
1639 
1640  if (toplevel)
1641  {
1642  if (entry->have_prep_stmt && entry->have_error &&
1643  !pgfdw_exec_cleanup_query(entry->conn,
1644  "DEALLOCATE ALL",
1645  true))
1646  return; /* Trouble clearing prepared statements */
1647 
1648  entry->have_prep_stmt = false;
1649  entry->have_error = false;
1650  }
1651 
1652  /*
1653  * If pendingAreq of the per-connection state is not NULL, it means that
1654  * an asynchronous fetch begun by fetch_more_data_begin() was not done
1655  * successfully and thus the per-connection state was not reset in
1656  * fetch_more_data(); in that case reset the per-connection state here.
1657  */
1658  if (entry->state.pendingAreq)
1659  memset(&entry->state, 0, sizeof(entry->state));
1660 
1661  /* Disarm changing_xact_state if it all worked */
1662  entry->changing_xact_state = false;
1663 }
#define CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel)
Definition: connection.c:103
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
Definition: connection.c:1418
static bool pgfdw_cancel_query(PGconn *conn)
Definition: connection.c:1327
bool in_error_recursion_trouble(void)
Definition: elog.c:293
PGTransactionStatusType PQtransactionStatus(const PGconn *conn)
Definition: fe-connect.c:7157
@ PQTRANS_ACTIVE
Definition: libpq-fe.h:143

References ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, CONSTRUCT_ABORT_COMMAND, ConnCacheEntry::have_error, ConnCacheEntry::have_prep_stmt, in_error_recursion_trouble(), PgFdwConnState::pendingAreq, pgfdw_cancel_query(), pgfdw_exec_cleanup_query(), PQTRANS_ACTIVE, PQtransactionStatus(), and ConnCacheEntry::state.

Referenced by pgfdw_subxact_callback(), and pgfdw_xact_callback().

◆ pgfdw_abort_cleanup_begin()

static bool pgfdw_abort_cleanup_begin ( ConnCacheEntry *  entry,
bool  toplevel,
List **  pending_entries,
List **  cancel_requested 
)
static

Definition at line 1675 of file connection.c.

1677 {
1678  /*
1679  * Don't try to clean up the connection if we're already in error
1680  * recursion trouble.
1681  */
1682  if (in_error_recursion_trouble())
1683  entry->changing_xact_state = true;
1684 
1685  /*
1686  * If connection is already unsalvageable, don't touch it further.
1687  */
1688  if (entry->changing_xact_state)
1689  return false;
1690 
1691  /*
1692  * Mark this connection as in the process of changing transaction state.
1693  */
1694  entry->changing_xact_state = true;
1695 
1696  /* Assume we might have lost track of prepared statements */
1697  entry->have_error = true;
1698 
1699  /*
1700  * If a command has been submitted to the remote server by using an
1701  * asynchronous execution function, the command might not have yet
1702  * completed. Check to see if a command is still being processed by the
1703  * remote server, and if so, request cancellation of the command.
1704  */
1705  if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
1706  {
1707  TimestampTz endtime;
1708 
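1709  endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1710  CONNECTION_CLEANUP_TIMEOUT);
1711  if (!pgfdw_cancel_query_begin(entry->conn, endtime))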
1712  return false; /* Unable to cancel running query */
1713  *cancel_requested = lappend(*cancel_requested, entry);
1714  }
1715  else
1716  {
1717  char sql[100];
1718 
1719  CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1720  if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
1721  return false; /* Unable to abort remote transaction */
1722  *pending_entries = lappend(*pending_entries, entry);
1723  }
1724 
1725  return true;
1726 }
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1644
static bool pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime)
Definition: connection.c:1353
static bool pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query)
Definition: connection.c:1438
#define CONNECTION_CLEANUP_TIMEOUT
Definition: connection.c:100
int64 TimestampTz
Definition: timestamp.h:39
List * lappend(List *list, void *datum)
Definition: list.c:339
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:85

References ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, CONNECTION_CLEANUP_TIMEOUT, CONSTRUCT_ABORT_COMMAND, GetCurrentTimestamp(), ConnCacheEntry::have_error, in_error_recursion_trouble(), lappend(), pgfdw_cancel_query_begin(), pgfdw_exec_cleanup_query_begin(), PQTRANS_ACTIVE, PQtransactionStatus(), and TimestampTzPlusMilliseconds.

Referenced by pgfdw_subxact_callback(), and pgfdw_xact_callback().

◆ pgfdw_cancel_query()

static bool pgfdw_cancel_query ( PGconn *  conn)
static

Definition at line 1327 of file connection.c.

1328 {
1329  TimestampTz endtime;
1330 
1331  /*
1332  * If it takes too long to cancel the query and discard the result, assume
1333  * the connection is dead.
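1334  */
1335  endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1336  CONNECTION_CLEANUP_TIMEOUT);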
1337 
1338  if (!pgfdw_cancel_query_begin(conn, endtime))
1339  return false;
1340  return pgfdw_cancel_query_end(conn, endtime, false);
1341 }
static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime, bool consume_input)
Definition: connection.c:1366

References conn, CONNECTION_CLEANUP_TIMEOUT, GetCurrentTimestamp(), pgfdw_cancel_query_begin(), pgfdw_cancel_query_end(), and TimestampTzPlusMilliseconds.

Referenced by pgfdw_abort_cleanup().

◆ pgfdw_cancel_query_begin()

static bool pgfdw_cancel_query_begin ( PGconn *  conn,
TimestampTz  endtime 
)
static

Definition at line 1353 of file connection.c.

1354 {
1355  const char *errormsg = libpqsrv_cancel(conn, endtime);
1356 
1357  if (errormsg != NULL)
1358  ereport(WARNING,
1359  errcode(ERRCODE_CONNECTION_FAILURE),
1360  errmsg("could not send cancel request: %s", errormsg));
1361 
1362  return errormsg == NULL;
1363 }
static const char * libpqsrv_cancel(PGconn *conn, TimestampTz endtime)

References conn, ereport, errcode(), errmsg(), libpqsrv_cancel(), and WARNING.

Referenced by pgfdw_abort_cleanup_begin(), and pgfdw_cancel_query().

◆ pgfdw_cancel_query_end()

static bool pgfdw_cancel_query_end ( PGconn *  conn,
TimestampTz  endtime,
bool  consume_input 
)
static

Definition at line 1366 of file connection.c.

1367 {
1368  PGresult *result = NULL;
1369  bool timed_out;
1370 
1371  /*
1372  * If requested, consume whatever data is available from the socket. (Note
1373  * that if all data is available, this allows pgfdw_get_cleanup_result to
1374  * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
1375  * which would be large compared to the overhead of PQconsumeInput.)
1376  */
1377  if (consume_input && !PQconsumeInput(conn))
1378  {
1379  ereport(WARNING,
1380  (errcode(ERRCODE_CONNECTION_FAILURE),
1381  errmsg("could not get result of cancel request: %s",
1382  pchomp(PQerrorMessage(conn)))));
1383  return false;
1384  }
1385 
1386  /* Get and discard the result of the query. */
1387  if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
1388  {
1389  if (timed_out)
1390  ereport(WARNING,
1391  (errmsg("could not get result of cancel request due to timeout")));
1392  else
1393  ereport(WARNING,
1394  (errcode(ERRCODE_CONNECTION_FAILURE),
1395  errmsg("could not get result of cancel request: %s",
1396  pchomp(PQerrorMessage(conn)))));
1397 
1398  return false;
1399  }
1400  PQclear(result);
1401 
1402  return true;
1403 }
static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result, bool *timed_out)
Definition: connection.c:1513

References conn, ereport, errcode(), errmsg(), pchomp(), pgfdw_get_cleanup_result(), PQclear(), PQconsumeInput(), PQerrorMessage(), and WARNING.

Referenced by pgfdw_cancel_query(), and pgfdw_finish_abort_cleanup().

◆ pgfdw_conn_check()

static int pgfdw_conn_check ( PGconn *  conn)
static

Definition at line 2326 of file connection.c.

2327 {
2328  int sock = PQsocket(conn);
2329 
2330  if (PQstatus(conn) != CONNECTION_OK || sock == -1)
2331  return -1;
2332 
2333 #if (defined(HAVE_POLL) && defined(POLLRDHUP))
2334  {
2335  struct pollfd input_fd;
2336  int result;
2337 
2338  input_fd.fd = sock;
2339  input_fd.events = POLLRDHUP;
2340  input_fd.revents = 0;
2341 
2342  do
2343  result = poll(&input_fd, 1, 0);
2344  while (result < 0 && errno == EINTR);
2345 
2346  if (result < 0)
2347  return -1;
2348 
2349  return (input_fd.revents &
2350  (POLLRDHUP | POLLHUP | POLLERR | POLLNVAL)) ? 1 : 0;
2351  }
2352 #else
2353  return 0;
2354 #endif
2355 }
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:7238
#define EINTR
Definition: win32_port.h:374

References conn, CONNECTION_OK, EINTR, PQsocket(), and PQstatus().

Referenced by postgres_fdw_get_connections_internal().

◆ pgfdw_conn_checkable()

static bool pgfdw_conn_checkable ( void  )
static

Definition at line 2363 of file connection.c.

2364 {
2365 #if (defined(HAVE_POLL) && defined(POLLRDHUP))
2366  return true;
2367 #else
2368  return false;
2369 #endif
2370 }

Referenced by postgres_fdw_get_connections_internal().

◆ pgfdw_exec_cleanup_query()

static bool pgfdw_exec_cleanup_query ( PGconn *  conn,
const char *  query,
bool  ignore_errors 
)
static

Definition at line 1418 of file connection.c.

1419 {
1420  TimestampTz endtime;
1421 
1422  /*
1423  * If it takes too long to execute a cleanup query, assume the connection
1424  * is dead. It's fairly likely that this is why we aborted in the first
1425  * place (e.g. statement timeout, user cancel), so the timeout shouldn't
1426  * be too long.
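1427  */
1428  endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1429  CONNECTION_CLEANUP_TIMEOUT);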
1430 
1431  if (!pgfdw_exec_cleanup_query_begin(conn, query))
1432  return false;
1433  return pgfdw_exec_cleanup_query_end(conn, query, endtime,
1434  false, ignore_errors);
1435 }
static bool pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query, TimestampTz endtime, bool consume_input, bool ignore_errors)
Definition: connection.c:1456

References conn, CONNECTION_CLEANUP_TIMEOUT, GetCurrentTimestamp(), pgfdw_exec_cleanup_query_begin(), pgfdw_exec_cleanup_query_end(), and TimestampTzPlusMilliseconds.

Referenced by pgfdw_abort_cleanup().

◆ pgfdw_exec_cleanup_query_begin()

static bool pgfdw_exec_cleanup_query_begin ( PGconn *  conn,
const char *  query 
)
static

Definition at line 1438 of file connection.c.

1439 {
1440  Assert(query != NULL);
1441 
1442  /*
1443  * Submit a query. Since we don't use non-blocking mode, this also can
1444  * block. But its risk is relatively small, so we ignore that for now.
1445  */
1446  if (!PQsendQuery(conn, query))
1447  {
1448  pgfdw_report_error(WARNING, NULL, conn, false, query);
1449  return false;
1450  }
1451 
1452  return true;
1453 }

References Assert, conn, pgfdw_report_error(), PQsendQuery(), and WARNING.

Referenced by pgfdw_abort_cleanup_begin(), pgfdw_exec_cleanup_query(), and pgfdw_finish_abort_cleanup().

◆ pgfdw_exec_cleanup_query_end()

static bool pgfdw_exec_cleanup_query_end ( PGconn *  conn,
const char *  query,
TimestampTz  endtime,
bool  consume_input,
bool  ignore_errors 
)
static

Definition at line 1456 of file connection.c.

1459 {
1460  PGresult *result = NULL;
1461  bool timed_out;
1462 
1463  Assert(query != NULL);
1464 
1465  /*
1466  * If requested, consume whatever data is available from the socket. (Note
1467  * that if all data is available, this allows pgfdw_get_cleanup_result to
1468  * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
1469  * which would be large compared to the overhead of PQconsumeInput.)
1470  */
1471  if (consume_input && !PQconsumeInput(conn))
1472  {
1473  pgfdw_report_error(WARNING, NULL, conn, false, query);
1474  return false;
1475  }
1476 
1477  /* Get the result of the query. */
1478  if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
1479  {
1480  if (timed_out)
1481  ereport(WARNING,
1482  (errmsg("could not get query result due to timeout"),
1483  errcontext("remote SQL command: %s", query)));
1484  else
1485  pgfdw_report_error(WARNING, NULL, conn, false, query);
1486 
1487  return false;
1488  }
1489 
1490  /* Issue a warning if not successful. */
1491  if (PQresultStatus(result) != PGRES_COMMAND_OK)
1492  {
1493  pgfdw_report_error(WARNING, result, conn, true, query);
1494  return ignore_errors;
1495  }
1496  PQclear(result);
1497 
1498  return true;
1499 }
#define errcontext
Definition: elog.h:196

References Assert, conn, ereport, errcontext, errmsg(), pgfdw_get_cleanup_result(), pgfdw_report_error(), PGRES_COMMAND_OK, PQclear(), PQconsumeInput(), PQresultStatus(), and WARNING.

Referenced by pgfdw_exec_cleanup_query(), and pgfdw_finish_abort_cleanup().

◆ pgfdw_exec_query()

PGresult* pgfdw_exec_query ( PGconn *  conn,
const char *  query,
PgFdwConnState *  state 
)

Definition at line 853 of file connection.c.

854 {
855  /* First, process a pending asynchronous request, if any. */
856  if (state && state->pendingAreq)
857  process_pending_request(state->pendingAreq);
858 
859  if (!PQsendQuery(conn, query))
860  return NULL;
861  return pgfdw_get_result(conn);
862 }

References conn, pgfdw_get_result(), PQsendQuery(), and process_pending_request().

Referenced by close_cursor(), deallocate_query(), fetch_more_data(), get_remote_estimate(), pgfdw_xact_callback(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresGetAnalyzeInfoForForeignTable(), postgresImportForeignSchema(), and postgresReScanForeignScan().

◆ pgfdw_finish_abort_cleanup()

static void pgfdw_finish_abort_cleanup ( List *  pending_entries,
List *  cancel_requested,
bool  toplevel 
)
static

Definition at line 1841 of file connection.c.

1843 {
1844  List *pending_deallocs = NIL;
1845  ListCell *lc;
1846 
1847  /*
1848  * For each of the pending cancel requests (if any), get and discard the
1849  * result of the query, and submit an abort command to the remote server.
1850  */
1851  if (cancel_requested)
1852  {
1853  foreach(lc, cancel_requested)
1854  {
1855  ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
1856  TimestampTz endtime;
1857  char sql[100];
1858 
1859  Assert(entry->changing_xact_state);
1860 
1861  /*
1862  * Set end time. You might think we should do this before issuing
1863  * cancel request like in normal mode, but that is problematic,
1864  * because if, for example, it took longer than 30 seconds to
1865  * process the first few entries in the cancel_requested list, it
1866  * would cause a timeout error when processing each of the
1867  * remaining entries in the list, leading to slamming that entry's
1868  * connection shut.
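1869  */
1870  endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1871  CONNECTION_CLEANUP_TIMEOUT);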
1872 
1873  if (!pgfdw_cancel_query_end(entry->conn, endtime, true))
1874  {
1875  /* Unable to cancel running query */
1876  pgfdw_reset_xact_state(entry, toplevel);
1877  continue;
1878  }
1879 
1880  /* Send an abort command in parallel if needed */
1881  CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1882  if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
1883  {
1884  /* Unable to abort remote (sub)transaction */
1885  pgfdw_reset_xact_state(entry, toplevel);
1886  }
1887  else
1888  pending_entries = lappend(pending_entries, entry);
1889  }
1890  }
1891 
1892  /* No further work if no pending entries */
1893  if (!pending_entries)
1894  return;
1895 
1896  /*
1897  * Get the result of the abort command for each of the pending entries
1898  */
1899  foreach(lc, pending_entries)
1900  {
1901  ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
1902  TimestampTz endtime;
1903  char sql[100];
1904 
1905  Assert(entry->changing_xact_state);
1906 
1907  /*
1908  * Set end time. We do this now, not before issuing the command like
1909  * in normal mode, for the same reason as for the cancel_requested
1910  * entries.
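1911  */
1912  endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1913  CONNECTION_CLEANUP_TIMEOUT);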
1914 
1915  CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1916  if (!pgfdw_exec_cleanup_query_end(entry->conn, sql, endtime,
1917  true, false))
1918  {
1919  /* Unable to abort remote (sub)transaction */
1920  pgfdw_reset_xact_state(entry, toplevel);
1921  continue;
1922  }
1923 
1924  if (toplevel)
1925  {
1926  /* Do a DEALLOCATE ALL in parallel if needed */
1927  if (entry->have_prep_stmt && entry->have_error)
1928  {
1929  if (!pgfdw_exec_cleanup_query_begin(entry->conn,
1930  "DEALLOCATE ALL"))
1931  {
1932  /* Trouble clearing prepared statements */
1933  pgfdw_reset_xact_state(entry, toplevel);
1934  }
1935  else
1936  pending_deallocs = lappend(pending_deallocs, entry);
1937  continue;
1938  }
1939  entry->have_prep_stmt = false;
1940  entry->have_error = false;
1941  }
1942 
1943  /* Reset the per-connection state if needed */
1944  if (entry->state.pendingAreq)
1945  memset(&entry->state, 0, sizeof(entry->state));
1946 
1947  /* We're done with this entry; unset the changing_xact_state flag */
1948  entry->changing_xact_state = false;
1949  pgfdw_reset_xact_state(entry, toplevel);
1950  }
1951 
1952  /* No further work if no pending entries */
1953  if (!pending_deallocs)
1954  return;
1955  Assert(toplevel);
1956 
1957  /*
1958  * Get the result of the DEALLOCATE command for each of the pending
1959  * entries
1960  */
1961  foreach(lc, pending_deallocs)
1962  {
1963  ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
1964  TimestampTz endtime;
1965 
1966  Assert(entry->changing_xact_state);
1967  Assert(entry->have_prep_stmt);
1968  Assert(entry->have_error);
1969 
1970  /*
1971  * Set end time. We do this now, not before issuing the command like
1972  * in normal mode, for the same reason as for the cancel_requested
1973  * entries.
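1974  */
1975  endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1976  CONNECTION_CLEANUP_TIMEOUT);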
1977 
1978  if (!pgfdw_exec_cleanup_query_end(entry->conn, "DEALLOCATE ALL",
1979  endtime, true, true))
1980  {
1981  /* Trouble clearing prepared statements */
1982  pgfdw_reset_xact_state(entry, toplevel);
1983  continue;
1984  }
1985  entry->have_prep_stmt = false;
1986  entry->have_error = false;
1987 
1988  /* Reset the per-connection state if needed */
1989  if (entry->state.pendingAreq)
1990  memset(&entry->state, 0, sizeof(entry->state));
1991 
1992  /* We're done with this entry; unset the changing_xact_state flag */
1993  entry->changing_xact_state = false;
1994  pgfdw_reset_xact_state(entry, toplevel);
1995  }
1996 }
static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
Definition: connection.c:1282
#define NIL
Definition: pg_list.h:68
Definition: pg_list.h:54

References Assert, ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, CONNECTION_CLEANUP_TIMEOUT, CONSTRUCT_ABORT_COMMAND, GetCurrentTimestamp(), ConnCacheEntry::have_error, ConnCacheEntry::have_prep_stmt, lappend(), lfirst, NIL, PgFdwConnState::pendingAreq, pgfdw_cancel_query_end(), pgfdw_exec_cleanup_query_begin(), pgfdw_exec_cleanup_query_end(), pgfdw_reset_xact_state(), ConnCacheEntry::state, and TimestampTzPlusMilliseconds.

Referenced by pgfdw_subxact_callback(), and pgfdw_xact_callback().

◆ pgfdw_finish_pre_commit_cleanup()

static void pgfdw_finish_pre_commit_cleanup ( List *  pending_entries)
static

Definition at line 1733 of file connection.c.

1734 {
1735  ConnCacheEntry *entry;
1736  List *pending_deallocs = NIL;
1737  ListCell *lc;
1738 
1739  Assert(pending_entries);
1740 
1741  /*
1742  * Get the result of the COMMIT command for each of the pending entries
1743  */
1744  foreach(lc, pending_entries)
1745  {
1746  entry = (ConnCacheEntry *) lfirst(lc);
1747 
1748  Assert(entry->changing_xact_state);
1749 
1750  /*
1751  * We might already have received the result on the socket, so pass
1752  * consume_input=true to try to consume it first
1753  */
1754  do_sql_command_end(entry->conn, "COMMIT TRANSACTION", true);
1755  entry->changing_xact_state = false;
1756 
1757  /* Do a DEALLOCATE ALL in parallel if needed */
1758  if (entry->have_prep_stmt && entry->have_error)
1759  {
1760  /* Ignore errors (see notes in pgfdw_xact_callback) */
1761  if (PQsendQuery(entry->conn, "DEALLOCATE ALL"))
1762  {
1763  pending_deallocs = lappend(pending_deallocs, entry);
1764  continue;
1765  }
1766  }
1767  entry->have_prep_stmt = false;
1768  entry->have_error = false;
1769 
1770  pgfdw_reset_xact_state(entry, true);
1771  }
1772 
1773  /* No further work if no pending entries */
1774  if (!pending_deallocs)
1775  return;
1776 
1777  /*
1778  * Get the result of the DEALLOCATE command for each of the pending
1779  * entries
1780  */
1781  foreach(lc, pending_deallocs)
1782  {
1783  PGresult *res;
1784 
1785  entry = (ConnCacheEntry *) lfirst(lc);
1786 
1787  /* Ignore errors (see notes in pgfdw_xact_callback) */
1788  while ((res = PQgetResult(entry->conn)) != NULL)
1789  {
1790  PQclear(res);
1791  /* Stop if the connection is lost (else we'll loop infinitely) */
1792  if (PQstatus(entry->conn) == CONNECTION_BAD)
1793  break;
1794  }
1795  entry->have_prep_stmt = false;
1796  entry->have_error = false;
1797 
1798  pgfdw_reset_xact_state(entry, true);
1799  }
1800 }
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:2062

References Assert, ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, CONNECTION_BAD, do_sql_command_end(), ConnCacheEntry::have_error, ConnCacheEntry::have_prep_stmt, lappend(), lfirst, NIL, pgfdw_reset_xact_state(), PQclear(), PQgetResult(), PQsendQuery(), PQstatus(), and res.

Referenced by pgfdw_xact_callback().

◆ pgfdw_finish_pre_subcommit_cleanup()

static void pgfdw_finish_pre_subcommit_cleanup ( List *  pending_entries,
int  curlevel 
)
static

Definition at line 1807 of file connection.c.

1808 {
1809  ConnCacheEntry *entry;
1810  char sql[100];
1811  ListCell *lc;
1812 
1813  Assert(pending_entries);
1814 
1815  /*
1816  * Get the result of the RELEASE command for each of the pending entries
1817  */
1818  snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
1819  foreach(lc, pending_entries)
1820  {
1821  entry = (ConnCacheEntry *) lfirst(lc);
1822 
1823  Assert(entry->changing_xact_state);
1824 
1825  /*
1826  * We might already have received the result on the socket, so pass
1827  * consume_input=true to try to consume it first
1828  */
1829  do_sql_command_end(entry->conn, sql, true);
1830  entry->changing_xact_state = false;
1831 
1832  pgfdw_reset_xact_state(entry, false);
1833  }
1834 }

References Assert, ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, do_sql_command_end(), lfirst, pgfdw_reset_xact_state(), and snprintf.

Referenced by pgfdw_subxact_callback().

◆ pgfdw_get_cleanup_result()

static bool pgfdw_get_cleanup_result ( PGconn *  conn,
TimestampTz  endtime,
PGresult **  result,
bool *  timed_out 
)
static

Definition at line 1513 of file connection.c.

1515 {
1516  volatile bool failed = false;
1517  PGresult *volatile last_res = NULL;
1518 
1519  *timed_out = false;
1520 
1521  /* In what follows, do not leak any PGresults on an error. */
1522  PG_TRY();
1523  {
1524  for (;;)
1525  {
1526  PGresult *res;
1527 
1528  while (PQisBusy(conn))
1529  {
1530  int wc;
1531  TimestampTz now = GetCurrentTimestamp();
1532  long cur_timeout;
1533 
1534  /* If timeout has expired, give up, else get sleep time. */
1535  cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
1536  if (cur_timeout <= 0)
1537  {
1538  *timed_out = true;
1539  failed = true;
1540  goto exit;
1541  }
1542 
1543  /* first time, allocate or get the custom wait event */
1544  if (pgfdw_we_cleanup_result == 0)
1545  pgfdw_we_cleanup_result = WaitEventExtensionNew("PostgresFdwCleanupResult");
1546 
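1547  /* Sleep until there's something to do */
1548  wc = WaitLatchOrSocket(MyLatch,
1549  WL_LATCH_SET | WL_SOCKET_READABLE |
1550  WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1551  PQsocket(conn),
1552  cur_timeout, pgfdw_we_cleanup_result);
1553  ResetLatch(MyLatch);
1554 
1555  CHECK_FOR_INTERRUPTS();
1556 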
1557  /* Data available in socket? */
1558  if (wc & WL_SOCKET_READABLE)
1559  {
1560  if (!PQconsumeInput(conn))
1561  {
1562  /* connection trouble */
1563  failed = true;
1564  goto exit;
1565  }
1566  }
1567  }
1568 
1569  res = PQgetResult(conn);
1570  if (res == NULL)
1571  break; /* query is complete */
1572 
1573  PQclear(last_res);
1574  last_res = res;
1575  }
1576 exit: ;
1577  }
1578  PG_CATCH();
1579  {
1580  PQclear(last_res);
1581  PG_RE_THROW();
1582  }
1583  PG_END_TRY();
1584 
1585  if (failed)
1586  PQclear(last_res);
1587  else
1588  *result = last_res;
1589  return failed;
1590 }
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1756
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1608
static uint32 pgfdw_we_cleanup_result
Definition: connection.c:91
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:2031
struct Latch * MyLatch
Definition: globals.c:62
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:565
void ResetLatch(Latch *latch)
Definition: latch.c:724
#define WL_SOCKET_READABLE
Definition: latch.h:128
#define WL_TIMEOUT
Definition: latch.h:130
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:132
#define WL_LATCH_SET
Definition: latch.h:127
exit(1)
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122

References CHECK_FOR_INTERRUPTS, conn, exit(), GetCurrentTimestamp(), MyLatch, now(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pgfdw_we_cleanup_result, PQclear(), PQconsumeInput(), PQgetResult(), PQisBusy(), PQsocket(), res, ResetLatch(), TimestampDifferenceMilliseconds(), WaitEventExtensionNew(), WaitLatchOrSocket(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, and WL_TIMEOUT.

Referenced by pgfdw_cancel_query_end(), and pgfdw_exec_cleanup_query_end().

◆ pgfdw_get_result()

PGresult* pgfdw_get_result ( PGconn *  conn)

Definition at line 870 of file connection.c.

871 {
872  return libpqsrv_get_result_last(conn, pgfdw_we_get_result);
873 }
static PGresult * libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info)

References conn, libpqsrv_get_result_last(), and pgfdw_we_get_result.

Referenced by create_cursor(), do_sql_command_end(), execute_dml_stmt(), execute_foreign_modify(), fetch_more_data(), pgfdw_exec_query(), and prepare_foreign_modify().

◆ pgfdw_inval_callback()

static void pgfdw_inval_callback ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)
static

Definition at line 1208 of file connection.c.

/*
 * Cache-invalidation callback for foreign-server and user-mapping entries.
 *
 * Walks every entry in ConnectionHash that has an open connection and
 * checks whether it is affected: either hashvalue is 0 (everything is
 * affected), or the entry's stored server/mapping hash matches hashvalue
 * for the given cacheid. An affected connection is closed at once when it
 * is not in use by the current transaction (xact_depth == 0); otherwise it
 * is only flagged as invalidated. The arg parameter is not used.
 */
1209 {
1210  HASH_SEQ_STATUS scan;
1211  ConnCacheEntry *entry;
1212 
1213  Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
1214 
1215  /* ConnectionHash must exist already, if we're registered */
1216  hash_seq_init(&scan, ConnectionHash);
1217  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1218  {
1219  /* Ignore invalid entries */
1220  if (entry->conn == NULL)
1221  continue;
1222 
1223  /* hashvalue == 0 means a cache reset, must clear all state */
1224  if (hashvalue == 0 ||
1225  (cacheid == FOREIGNSERVEROID &&
1226  entry->server_hashvalue == hashvalue) ||
1227  (cacheid == USERMAPPINGOID &&
1228  entry->mapping_hashvalue == hashvalue))
1229  {
1230  /*
1231  * Close the connection immediately if it's not used yet in this
1232  * transaction. Otherwise mark it as invalid so that
1233  * pgfdw_xact_callback() can close it at the end of this
1234  * transaction.
1235  */
1236  if (entry->xact_depth == 0)
1237  {
1238  elog(DEBUG3, "discarding connection %p", entry->conn);
1239  disconnect_pg_server(entry);
1240  }
1241  else
1242  entry->invalidated = true;
1243  }
1244  }
1245 }

References Assert, ConnCacheEntry::conn, ConnectionHash, DEBUG3, disconnect_pg_server(), elog, hash_seq_init(), hash_seq_search(), ConnCacheEntry::invalidated, ConnCacheEntry::mapping_hashvalue, ConnCacheEntry::server_hashvalue, and ConnCacheEntry::xact_depth.

Referenced by GetConnection().

◆ pgfdw_reject_incomplete_xact_state_change()

static void pgfdw_reject_incomplete_xact_state_change ( ConnCacheEntry * entry)
static

Definition at line 1258 of file connection.c.

/*
 * Refuse to keep using a connection that was left in the middle of a
 * transaction-state change.
 *
 * Does nothing when the entry has no open connection or its
 * changing_xact_state flag is clear. Otherwise the connection is closed
 * first, and then an ERROR naming the foreign server is raised.
 */
1259 {
1260  ForeignServer *server;
1261 
1262  /* nothing to do for inactive entries and entries of sane state */
1263  if (entry->conn == NULL || !entry->changing_xact_state)
1264  return;
1265 
1266  /* make sure this entry is inactive */
1267  disconnect_pg_server(entry);
1268 
1269  /* find server name to be shown in the message below */
1270  server = GetForeignServer(entry->serverid);
1271 
1272  ereport(ERROR,
1273  (errcode(ERRCODE_CONNECTION_EXCEPTION),
1274  errmsg("connection to server \"%s\" was lost",
1275  server->servername)));
1276 }

References ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, disconnect_pg_server(), ereport, errcode(), errmsg(), ERROR, GetForeignServer(), ConnCacheEntry::serverid, and ForeignServer::servername.

Referenced by GetConnection(), pgfdw_subxact_callback(), and pgfdw_xact_callback().

◆ pgfdw_report_error()

void pgfdw_report_error ( int  elevel,
PGresult *  res,
PGconn *  conn,
bool  clear,
const char *  sql 
)

Definition at line 889 of file connection.c.

/*
 * Report a remote error through ereport() at the given elevel.
 *
 * The SQLSTATE, primary message, detail, hint and context are taken from
 * the error fields of res. When res carries no SQLSTATE,
 * ERRCODE_CONNECTION_FAILURE is used; when it carries no primary message,
 * the connection's own error message is used instead. If sql is non-NULL
 * it is appended as context ("remote SQL command: ...").
 *
 * When clear is true, res is released with PQclear() in the PG_FINALLY
 * section, so that happens whether or not ereport() returns normally.
 */
891 {
892  /* If requested, PGresult must be released before leaving this function. */
893  PG_TRY();
894  {
895  char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
896  char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
897  char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
898  char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
899  char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
900  int sqlstate;
901 
902  if (diag_sqlstate)
903  sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
904  diag_sqlstate[1],
905  diag_sqlstate[2],
906  diag_sqlstate[3],
907  diag_sqlstate[4]);
908  else
909  sqlstate = ERRCODE_CONNECTION_FAILURE;
910 
911  /*
912  * If we don't get a message from the PGresult, try the PGconn. This
913  * is needed because for connection-level failures, PQgetResult may
914  * just return NULL, not a PGresult at all.
915  */
916  if (message_primary == NULL)
917  message_primary = pchomp(PQerrorMessage(conn));
918 
919  ereport(elevel,
920  (errcode(sqlstate),
921  (message_primary != NULL && message_primary[0] != '\0') ?
922  errmsg_internal("%s", message_primary) :
923  errmsg("could not obtain message string for remote error"),
924  message_detail ? errdetail_internal("%s", message_detail) : 0,
925  message_hint ? errhint("%s", message_hint) : 0,
926  message_context ? errcontext("%s", message_context) : 0,
927  sql ? errcontext("remote SQL command: %s", sql) : 0));
928  }
929  PG_FINALLY();
930  {
931  if (clear)
932  PQclear(res);
933  }
934  PG_END_TRY();
935 }
int errhint(const char *fmt,...)
Definition: elog.c:1317
#define MAKE_SQLSTATE(ch1, ch2, ch3, ch4, ch5)
Definition: elog.h:56
#define PG_FINALLY(...)
Definition: elog.h:388
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:3466
#define PG_DIAG_MESSAGE_HINT
Definition: postgres_ext.h:59
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:56
#define PG_DIAG_MESSAGE_PRIMARY
Definition: postgres_ext.h:57
#define PG_DIAG_MESSAGE_DETAIL
Definition: postgres_ext.h:58
#define PG_DIAG_CONTEXT
Definition: postgres_ext.h:63

References conn, ereport, errcode(), errcontext, errdetail_internal(), errhint(), errmsg(), errmsg_internal(), MAKE_SQLSTATE, pchomp(), PG_DIAG_CONTEXT, PG_DIAG_MESSAGE_DETAIL, PG_DIAG_MESSAGE_HINT, PG_DIAG_MESSAGE_PRIMARY, PG_DIAG_SQLSTATE, PG_END_TRY, PG_FINALLY, PG_TRY, PQclear(), PQerrorMessage(), PQresultErrorField(), and res.

Referenced by close_cursor(), create_cursor(), deallocate_query(), do_sql_command_begin(), do_sql_command_end(), execute_dml_stmt(), execute_foreign_modify(), fetch_more_data(), fetch_more_data_begin(), get_remote_estimate(), pgfdw_exec_cleanup_query_begin(), pgfdw_exec_cleanup_query_end(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresForeignAsyncNotify(), postgresGetAnalyzeInfoForForeignTable(), postgresImportForeignSchema(), postgresReScanForeignScan(), and prepare_foreign_modify().

◆ pgfdw_reset_xact_state()

static void pgfdw_reset_xact_state ( ConnCacheEntry *  entry,
bool  toplevel 
)
static

Definition at line 1282 of file connection.c.

/*
 * Update an entry's bookkeeping after a remote (sub)transaction has ended.
 *
 * For toplevel, xact_depth is reset to 0 and the connection is discarded
 * if any of the following holds: its status is not CONNECTION_OK, its
 * transaction status is not PQTRANS_IDLE, changing_xact_state is set,
 * invalidated is set, or keep_connections is false. For a subtransaction,
 * xact_depth is simply decremented.
 */
1283 {
1284  if (toplevel)
1285  {
1286  /* Reset state to show we're out of a transaction */
1287  entry->xact_depth = 0;
1288 
1289  /*
1290  * If the connection isn't in a good idle state, it is marked as
1291  * invalid or keep_connections option of its server is disabled, then
1292  * discard it to recover. Next GetConnection will open a new
1293  * connection.
1294  */
1295  if (PQstatus(entry->conn) != CONNECTION_OK ||
1296  PQtransactionStatus(entry->conn) != PQTRANS_IDLE ||
1297  entry->changing_xact_state ||
1298  entry->invalidated ||
1299  !entry->keep_connections)
1300  {
1301  elog(DEBUG3, "discarding connection %p", entry->conn);
1302  disconnect_pg_server(entry);
1303  }
1304  }
1305  else
1306  {
1307  /* Reset state to show we're out of a subtransaction */
1308  entry->xact_depth--;
1309  }
1310 }
@ PQTRANS_IDLE
Definition: libpq-fe.h:142

References ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, CONNECTION_OK, DEBUG3, disconnect_pg_server(), elog, ConnCacheEntry::invalidated, ConnCacheEntry::keep_connections, PQstatus(), PQTRANS_IDLE, PQtransactionStatus(), and ConnCacheEntry::xact_depth.

Referenced by pgfdw_finish_abort_cleanup(), pgfdw_finish_pre_commit_cleanup(), pgfdw_finish_pre_subcommit_cleanup(), pgfdw_subxact_callback(), and pgfdw_xact_callback().

◆ pgfdw_security_check()

static void pgfdw_security_check ( const char **  keywords,
const char **  values,
UserMapping *  user,
PGconn *  conn 
)
static

Definition at line 424 of file connection.c.

/*
 * Verify that a freshly opened connection is acceptable for the mapped
 * user, raising ERROR if it is not.
 *
 * Returns silently when any of these holds: the mapping's user is a
 * superuser; (only with ENABLE_GSS) the connection used GSSAPI and this
 * backend has delegated credentials; the user mapping does not require a
 * password; or the connection used a password and the connection
 * parameters contain a non-empty "password" value. keywords/values are
 * parallel arrays, with keywords terminated by a NULL entry.
 */
425 {
426  /* Superusers bypass the check */
427  if (superuser_arg(user->userid))
428  return;
429 
430 #ifdef ENABLE_GSS
431  /* Connected via GSSAPI with delegated credentials- all good. */
432  if (PQconnectionUsedGSSAPI(conn) && be_gssapi_get_delegation(MyProcPort))
433  return;
434 #endif
435 
436  /* Ok if superuser set PW required false. */
437  if (!UserMappingPasswordRequired(user))
438  return;
439 
440  /* Connected via PW, with PW required true, and provided non-empty PW. */
441  if (PQconnectionUsedPassword(conn))
442  {
443  /* ok if params contain a non-empty password */
444  for (int i = 0; keywords[i] != NULL; i++)
445  {
446  if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
447  return;
448  }
449  }
450 
451  ereport(ERROR,
452  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
453  errmsg("password or GSSAPI delegated credentials required"),
454  errdetail("Non-superuser cannot connect if the server does not request a password or use GSSAPI with delegated credentials."),
455  errhint("Target server's authentication method must be changed or password_required=false set in the user mapping attributes.")));
456 }
int PQconnectionUsedPassword(const PGconn *conn)
Definition: fe-connect.c:7278
int PQconnectionUsedGSSAPI(const PGconn *conn)
Definition: fe-connect.c:7289

References be_gssapi_get_delegation(), conn, ereport, errcode(), errdetail(), errhint(), errmsg(), ERROR, i, MyProcPort, PQconnectionUsedGSSAPI(), PQconnectionUsedPassword(), superuser_arg(), user, UserMappingPasswordRequired(), and values.

Referenced by connect_pg_server().

◆ pgfdw_subxact_callback()

static void pgfdw_subxact_callback ( SubXactEvent  event,
SubTransactionId  mySubid,
SubTransactionId  parentSubid,
void *  arg 
)
static

Definition at line 1095 of file connection.c.

/*
 * Subtransaction callback: close remote subtransactions that belong to the
 * current nesting level.
 *
 * Only SUBXACT_EVENT_PRE_COMMIT_SUB and SUBXACT_EVENT_ABORT_SUB are acted
 * on, and only if a connection was used in this transaction. On pre-commit
 * each matching connection gets "RELEASE SAVEPOINT s<level>"; on abort the
 * abort-cleanup routines are used. Entries configured for parallel commit
 * or parallel abort are only started here, collected in pending_entries /
 * cancel_requested, and finished together after the scan. The mySubid,
 * parentSubid and arg parameters are not used.
 */
1097 {
1098  HASH_SEQ_STATUS scan;
1099  ConnCacheEntry *entry;
1100  int curlevel;
1101  List *pending_entries = NIL;
1102  List *cancel_requested = NIL;
1103 
1104  /* Nothing to do at subxact start, nor after commit. */
1105  if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
1106  event == SUBXACT_EVENT_ABORT_SUB))
1107  return;
1108 
1109  /* Quick exit if no connections were touched in this transaction. */
1110  if (!xact_got_connection)
1111  return;
1112 
1113  /*
1114  * Scan all connection cache entries to find open remote subtransactions
1115  * of the current level, and close them.
1116  */
1117  curlevel = GetCurrentTransactionNestLevel();
1118  hash_seq_init(&scan, ConnectionHash);
1119  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1120  {
1121  char sql[100];
1122 
1123  /*
1124  * We only care about connections with open remote subtransactions of
1125  * the current level.
1126  */
1127  if (entry->conn == NULL || entry->xact_depth < curlevel)
1128  continue;
1129 
1130  if (entry->xact_depth > curlevel)
1131  elog(ERROR, "missed cleaning up remote subtransaction at level %d",
1132  entry->xact_depth);
1133 
1134  if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
1135  {
1136  /*
1137  * If abort cleanup previously failed for this connection, we
1138  * can't issue any more commands against it.
1139  */
1140  pgfdw_reject_incomplete_xact_state_change(entry);
1141 
1142  /* Commit all remote subtransactions during pre-commit */
1143  snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
1144  entry->changing_xact_state = true;
1145  if (entry->parallel_commit)
1146  {
1147  do_sql_command_begin(entry->conn, sql);
1148  pending_entries = lappend(pending_entries, entry);
1149  continue;
1150  }
1151  do_sql_command(entry->conn, sql);
1152  entry->changing_xact_state = false;
1153  }
1154  else
1155  {
1156  /* Rollback all remote subtransactions during abort */
1157  if (entry->parallel_abort)
1158  {
1159  if (pgfdw_abort_cleanup_begin(entry, false,
1160  &pending_entries,
1161  &cancel_requested))
1162  continue;
1163  }
1164  else
1165  pgfdw_abort_cleanup(entry, false);
1166  }
1167 
1168  /* OK, we're outta that level of subtransaction */
1169  pgfdw_reset_xact_state(entry, false);
1170  }
1171 
1172  /* If there are any pending connections, finish cleaning them up */
1173  if (pending_entries || cancel_requested)
1174  {
1175  if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
1176  {
1177  Assert(cancel_requested == NIL);
1178  pgfdw_finish_pre_subcommit_cleanup(pending_entries, curlevel);
1179  }
1180  else
1181  {
1182  Assert(event == SUBXACT_EVENT_ABORT_SUB);
1183  pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
1184  false);
1185  }
1186  }
1187 }
static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel)
Definition: connection.c:1807
static void pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested, bool toplevel)
Definition: connection.c:1841
static bool pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel, List **pending_entries, List **cancel_requested)
Definition: connection.c:1675
static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
Definition: connection.c:1601
@ SUBXACT_EVENT_PRE_COMMIT_SUB
Definition: xact.h:145
@ SUBXACT_EVENT_ABORT_SUB
Definition: xact.h:144

References Assert, ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, ConnectionHash, do_sql_command(), do_sql_command_begin(), elog, ERROR, GetCurrentTransactionNestLevel(), hash_seq_init(), hash_seq_search(), lappend(), NIL, ConnCacheEntry::parallel_abort, ConnCacheEntry::parallel_commit, pgfdw_abort_cleanup(), pgfdw_abort_cleanup_begin(), pgfdw_finish_abort_cleanup(), pgfdw_finish_pre_subcommit_cleanup(), pgfdw_reject_incomplete_xact_state_change(), pgfdw_reset_xact_state(), snprintf, SUBXACT_EVENT_ABORT_SUB, SUBXACT_EVENT_PRE_COMMIT_SUB, ConnCacheEntry::xact_depth, and xact_got_connection.

Referenced by GetConnection().

◆ pgfdw_xact_callback()

static void pgfdw_xact_callback ( XactEvent  event,
void *  arg 
)
static

Definition at line 945 of file connection.c.

/*
 * Transaction callback: close every open remote transaction when the local
 * transaction ends.
 *
 * Does nothing unless a connection was used in this transaction. For each
 * cache entry with an open connection and xact_depth > 0 the action
 * depends on the event: pre-commit events send "COMMIT TRANSACTION" (and
 * "DEALLOCATE ALL" if prepared statements were made and an error occurred),
 * pre-prepare raises ERROR, commit/prepare events raise an internal error
 * because pre-commit should already have cleaned up, and abort events run
 * the abort-cleanup routines. Entries configured for parallel commit or
 * parallel abort are only started inside the loop and are finished
 * together afterwards. Finally xact_got_connection and cursor_number are
 * reset. The arg parameter is not used.
 */
946 {
947  HASH_SEQ_STATUS scan;
948  ConnCacheEntry *entry;
949  List *pending_entries = NIL;
950  List *cancel_requested = NIL;
951 
952  /* Quick exit if no connections were touched in this transaction. */
953  if (!xact_got_connection)
954  return;
955 
956  /*
957  * Scan all connection cache entries to find open remote transactions, and
958  * close them.
959  */
960  hash_seq_init(&scan, ConnectionHash);
961  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
962  {
963  PGresult *res;
964 
965  /* Ignore cache entry if no open connection right now */
966  if (entry->conn == NULL)
967  continue;
968 
969  /* If it has an open remote transaction, try to close it */
970  if (entry->xact_depth > 0)
971  {
972  elog(DEBUG3, "closing remote transaction on connection %p",
973  entry->conn);
974 
975  switch (event)
976  {
977  case XACT_EVENT_PARALLEL_PRE_COMMIT:
978  case XACT_EVENT_PRE_COMMIT:
979 
980  /*
981  * If abort cleanup previously failed for this connection,
982  * we can't issue any more commands against it.
983  */
984  pgfdw_reject_incomplete_xact_state_change(entry);
985 
986  /* Commit all remote transactions during pre-commit */
987  entry->changing_xact_state = true;
988  if (entry->parallel_commit)
989  {
990  do_sql_command_begin(entry->conn, "COMMIT TRANSACTION");
991  pending_entries = lappend(pending_entries, entry);
992  continue;
993  }
994  do_sql_command(entry->conn, "COMMIT TRANSACTION");
995  entry->changing_xact_state = false;
996 
997  /*
998  * If there were any errors in subtransactions, and we
999  * made prepared statements, do a DEALLOCATE ALL to make
1000  * sure we get rid of all prepared statements. This is
1001  * annoying and not terribly bulletproof, but it's
1002  * probably not worth trying harder.
1003  *
1004  * DEALLOCATE ALL only exists in 8.3 and later, so this
1005  * constrains how old a server postgres_fdw can
1006  * communicate with. We intentionally ignore errors in
1007  * the DEALLOCATE, so that we can hobble along to some
1008  * extent with older servers (leaking prepared statements
1009  * as we go; but we don't really support update operations
1010  * pre-8.3 anyway).
1011  */
1012  if (entry->have_prep_stmt && entry->have_error)
1013  {
1014  res = pgfdw_exec_query(entry->conn, "DEALLOCATE ALL",
1015  NULL);
1016  PQclear(res);
1017  }
1018  entry->have_prep_stmt = false;
1019  entry->have_error = false;
1020  break;
1021  case XACT_EVENT_PRE_PREPARE:
1022 
1023  /*
1024  * We disallow any remote transactions, since it's not
1025  * very reasonable to hold them open until the prepared
1026  * transaction is committed. For the moment, throw error
1027  * unconditionally; later we might allow read-only cases.
1028  * Note that the error will cause us to come right back
1029  * here with event == XACT_EVENT_ABORT, so we'll clean up
1030  * the connection state at that point.
1031  */
1032  ereport(ERROR,
1033  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1034  errmsg("cannot PREPARE a transaction that has operated on postgres_fdw foreign tables")));
1035  break;
1036  case XACT_EVENT_PARALLEL_COMMIT:
1037  case XACT_EVENT_COMMIT:
1038  case XACT_EVENT_PREPARE:
1039  /* Pre-commit should have closed the open transaction */
1040  elog(ERROR, "missed cleaning up connection during pre-commit");
1041  break;
1042  case XACT_EVENT_PARALLEL_ABORT:
1043  case XACT_EVENT_ABORT:
1044  /* Rollback all remote transactions during abort */
1045  if (entry->parallel_abort)
1046  {
1047  if (pgfdw_abort_cleanup_begin(entry, true,
1048  &pending_entries,
1049  &cancel_requested))
1050  continue;
1051  }
1052  else
1053  pgfdw_abort_cleanup(entry, true);
1054  break;
1055  }
1056  }
1057 
1058  /* Reset state to show we're out of a transaction */
1059  pgfdw_reset_xact_state(entry, true);
1060  }
1061 
1062  /* If there are any pending connections, finish cleaning them up */
1063  if (pending_entries || cancel_requested)
1064  {
1065  if (event == XACT_EVENT_PARALLEL_PRE_COMMIT ||
1066  event == XACT_EVENT_PRE_COMMIT)
1067  {
1068  Assert(cancel_requested == NIL);
1069  pgfdw_finish_pre_commit_cleanup(pending_entries);
1070  }
1071  else
1072  {
1073  Assert(event == XACT_EVENT_PARALLEL_ABORT ||
1074  event == XACT_EVENT_ABORT);
1075  pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
1076  true);
1077  }
1078  }
1079 
1080  /*
1081  * Regardless of the event type, we can now mark ourselves as out of the
1082  * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
1083  * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
1084  */
1085  xact_got_connection = false;
1086 
1087  /* Also reset cursor numbering for next transaction */
1088  cursor_number = 0;
1089 }
PGresult * pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
Definition: connection.c:853
static void pgfdw_finish_pre_commit_cleanup(List *pending_entries)
Definition: connection.c:1733
@ XACT_EVENT_PRE_PREPARE
Definition: xact.h:135
@ XACT_EVENT_COMMIT
Definition: xact.h:128
@ XACT_EVENT_PARALLEL_PRE_COMMIT
Definition: xact.h:134
@ XACT_EVENT_PARALLEL_COMMIT
Definition: xact.h:129
@ XACT_EVENT_ABORT
Definition: xact.h:130
@ XACT_EVENT_PRE_COMMIT
Definition: xact.h:133
@ XACT_EVENT_PARALLEL_ABORT
Definition: xact.h:131
@ XACT_EVENT_PREPARE
Definition: xact.h:132

References Assert, ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, ConnectionHash, cursor_number, DEBUG3, do_sql_command(), do_sql_command_begin(), elog, ereport, errcode(), errmsg(), ERROR, hash_seq_init(), hash_seq_search(), ConnCacheEntry::have_error, ConnCacheEntry::have_prep_stmt, lappend(), NIL, ConnCacheEntry::parallel_abort, ConnCacheEntry::parallel_commit, pgfdw_abort_cleanup(), pgfdw_abort_cleanup_begin(), pgfdw_exec_query(), pgfdw_finish_abort_cleanup(), pgfdw_finish_pre_commit_cleanup(), pgfdw_reject_incomplete_xact_state_change(), pgfdw_reset_xact_state(), PQclear(), res, ConnCacheEntry::xact_depth, XACT_EVENT_ABORT, XACT_EVENT_COMMIT, XACT_EVENT_PARALLEL_ABORT, XACT_EVENT_PARALLEL_COMMIT, XACT_EVENT_PARALLEL_PRE_COMMIT, XACT_EVENT_PRE_COMMIT, XACT_EVENT_PRE_PREPARE, XACT_EVENT_PREPARE, and xact_got_connection.

Referenced by GetConnection().

◆ postgres_fdw_disconnect()

Datum postgres_fdw_disconnect ( PG_FUNCTION_ARGS  )

Definition at line 2205 of file connection.c.

/*
 * SQL-callable function: disconnect the cached connections for the foreign
 * server named by the first (text) argument.
 *
 * The name is resolved with GetForeignServerByName() with missing_ok set
 * to false. The boolean returned is whatever
 * disconnect_cached_connections() reports for that server's OID.
 */
2206 {
2207  ForeignServer *server;
2208  char *servername;
2209 
2210  servername = text_to_cstring(PG_GETARG_TEXT_PP(0));
2211  server = GetForeignServerByName(servername, false);
2212 
2213  PG_RETURN_BOOL(disconnect_cached_connections(server->serverid));
2214 }
static bool disconnect_cached_connections(Oid serverid)
Definition: connection.c:2254
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:309
#define PG_RETURN_BOOL(x)
Definition: fmgr.h:359
ForeignServer * GetForeignServerByName(const char *srvname, bool missing_ok)
Definition: foreign.c:182
char * text_to_cstring(const text *t)
Definition: varlena.c:217

References disconnect_cached_connections(), GetForeignServerByName(), PG_GETARG_TEXT_PP, PG_RETURN_BOOL, ForeignServer::serverid, and text_to_cstring().

◆ postgres_fdw_disconnect_all()

Datum postgres_fdw_disconnect_all ( PG_FUNCTION_ARGS  )

Definition at line 2226 of file connection.c.

/*
 * SQL-callable function: call disconnect_cached_connections() with
 * InvalidOid instead of a specific server OID, and return its boolean
 * result.
 */
2227 {
2228  PG_RETURN_BOOL(disconnect_cached_connections(InvalidOid));
2229 }
#define InvalidOid
Definition: postgres_ext.h:36

References disconnect_cached_connections(), InvalidOid, and PG_RETURN_BOOL.

◆ postgres_fdw_get_connections()

Datum postgres_fdw_get_connections ( PG_FUNCTION_ARGS  )

Definition at line 2185 of file connection.c.

/*
 * SQL-callable entry point for the version 1.1 API: delegates to
 * postgres_fdw_get_connections_internal() with PGFDW_V1_1. The rows are
 * delivered through the materialized result set, so the function itself
 * returns void.
 */
2186 {
2187  postgres_fdw_get_connections_internal(fcinfo, PGFDW_V1_1);
2188 
2189  PG_RETURN_VOID();
2190 }
static void postgres_fdw_get_connections_internal(FunctionCallInfo fcinfo, enum pgfdwVersion api_version)
Definition: connection.c:2028
#define PG_RETURN_VOID()
Definition: fmgr.h:349

References PG_RETURN_VOID, PGFDW_V1_1, and postgres_fdw_get_connections_internal().

◆ postgres_fdw_get_connections_1_2()

Datum postgres_fdw_get_connections_1_2 ( PG_FUNCTION_ARGS  )

Definition at line 2177 of file connection.c.

/*
 * SQL-callable entry point for the version 1.2 API: delegates to
 * postgres_fdw_get_connections_internal() with PGFDW_V1_2. The rows are
 * delivered through the materialized result set, so the function itself
 * returns void.
 */
2178 {
2179  postgres_fdw_get_connections_internal(fcinfo, PGFDW_V1_2);
2180 
2181  PG_RETURN_VOID();
2182 }

References PG_RETURN_VOID, PGFDW_V1_2, and postgres_fdw_get_connections_internal().

◆ postgres_fdw_get_connections_internal()

static void postgres_fdw_get_connections_internal ( FunctionCallInfo  fcinfo,
enum pgfdwVersion  api_version 
)
static

Definition at line 2028 of file connection.c.

/*
 * Common implementation behind the postgres_fdw_get_connections() variants.
 *
 * Emits one row per cache entry that has an open connection, into the
 * materialized result set of fcinfo. The number of output columns must
 * match api_version, otherwise an ERROR is raised. Columns, in order:
 * server name (NULL if the server can no longer be looked up); for
 * PGFDW_V1_2 the mapped user name (NULL if the user mapping can no longer
 * be looked up); whether the entry is still valid (not invalidated); and
 * for PGFDW_V1_2 whether the connection is used in the current transaction
 * plus, when the first argument requests it and checking is supported,
 * whether the connection is closed (NULL otherwise).
 */
2030 {
2031  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
2032  HASH_SEQ_STATUS scan;
2033  ConnCacheEntry *entry;
2034 
2035  InitMaterializedSRF(fcinfo, 0);
2036 
2037  /* If cache doesn't exist, we return no records */
2038  if (!ConnectionHash)
2039  return;
2040 
2041  /* Check we have the expected number of output arguments */
2042  switch (rsinfo->setDesc->natts)
2043  {
2044  case POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_1:
2045  if (api_version != PGFDW_V1_1)
2046  elog(ERROR, "incorrect number of output arguments");
2047  break;
2048  case POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_2:
2049  if (api_version != PGFDW_V1_2)
2050  elog(ERROR, "incorrect number of output arguments");
2051  break;
2052  default:
2053  elog(ERROR, "incorrect number of output arguments");
2054  }
2055 
2056  hash_seq_init(&scan, ConnectionHash);
2057  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
2058  {
2059  ForeignServer *server;
2060  Datum values[POSTGRES_FDW_GET_CONNECTIONS_COLS] = {0};
2061  bool nulls[POSTGRES_FDW_GET_CONNECTIONS_COLS] = {0};
2062  int i = 0;
2063 
2064  /* We only look for open remote connections */
2065  if (!entry->conn)
2066  continue;
2067 
2068  server = GetForeignServerExtended(entry->serverid, FSV_MISSING_OK);
2069 
2070  /*
2071  * The foreign server may have been dropped in current explicit
2072  * transaction. It is not possible to drop the server from another
2073  * session when the connection associated with it is in use in the
2074  * current transaction, if tried so, the drop query in another session
2075  * blocks until the current transaction finishes.
2076  *
2077  * Even though the server is dropped in the current transaction, the
2078  * cache can still have associated active connection entry, say we
2079  * call such connections dangling. Since we can not fetch the server
2080  * name from system catalogs for dangling connections, instead we show
2081  * NULL value for server name in output.
2082  *
2083  * We could have done better by storing the server name in the cache
2084  * entry instead of server oid so that it could be used in the output.
2085  * But the server name in each cache entry requires 64 bytes of
2086  * memory, which is huge, when there are many cached connections and
2087  * the use case i.e. dropping the foreign server within the explicit
2088  * current transaction seems rare. So, we chose to show NULL value for
2089  * server name in output.
2090  *
2091  * Such dangling connections get closed either in next use or at the
2092  * end of current explicit transaction in pgfdw_xact_callback.
2093  */
2094  if (!server)
2095  {
2096  /*
2097  * If the server has been dropped in the current explicit
2098  * transaction, then this entry would have been invalidated in
2099  * pgfdw_inval_callback at the end of drop server command. Note
2100  * that this connection would not have been closed in
2101  * pgfdw_inval_callback because it is still being used in the
2102  * current explicit transaction. So, assert that here.
2103  */
2104  Assert(entry->conn && entry->xact_depth > 0 && entry->invalidated);
2105 
2106  /* Show null, if no server name was found */
2107  nulls[i++] = true;
2108  }
2109  else
2110  values[i++] = CStringGetTextDatum(server->servername);
2111 
2112  if (api_version >= PGFDW_V1_2)
2113  {
2114  HeapTuple tp;
2115 
2116  /* Use the system cache to obtain the user mapping */
2117  tp = SearchSysCache1(USERMAPPINGOID, ObjectIdGetDatum(entry->key));
2118 
2119  /*
2120  * Just like in the foreign server case, user mappings can also be
2121  * dropped in the current explicit transaction. Therefore, the
2122  * similar check as in the server case is required.
2123  */
2124  if (!HeapTupleIsValid(tp))
2125  {
2126  /*
2127  * If we reach here, this entry must have been invalidated in
2128  * pgfdw_inval_callback, same as in the server case.
2129  */
2130  Assert(entry->conn && entry->xact_depth > 0 &&
2131  entry->invalidated);
2132 
2133  nulls[i++] = true;
2134  }
2135  else
2136  {
2137  Oid userid;
2138 
2139  userid = ((Form_pg_user_mapping) GETSTRUCT(tp))->umuser;
2140  values[i++] = CStringGetTextDatum(MappingUserName(userid));
2141  ReleaseSysCache(tp);
2142  }
2143  }
2144 
2145  values[i++] = BoolGetDatum(!entry->invalidated);
2146 
2147  if (api_version >= PGFDW_V1_2)
2148  {
2149  bool check_conn = PG_GETARG_BOOL(0);
2150 
2151  /* Is this connection used in the current transaction? */
2152  values[i++] = BoolGetDatum(entry->xact_depth > 0);
2153 
2154  /*
2155  * If a connection status check is requested and supported, return
2156  * whether the connection is closed. Otherwise, return NULL.
2157  */
2158  if (check_conn && pgfdw_conn_checkable())
2159  values[i++] = BoolGetDatum(pgfdw_conn_check(entry->conn) != 0);
2160  else
2161  nulls[i++] = true;
2162  }
2163 
2164  tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
2165  }
2166 }
#define CStringGetTextDatum(s)
Definition: builtins.h:97
#define POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_2
Definition: connection.c:2000
static int pgfdw_conn_check(PGconn *conn)
Definition: connection.c:2326
#define POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_1
Definition: connection.c:1999
#define POSTGRES_FDW_GET_CONNECTIONS_COLS
Definition: connection.c:2001
static bool pgfdw_conn_checkable(void)
Definition: connection.c:2363
#define PG_GETARG_BOOL(n)
Definition: fmgr.h:274
#define MappingUserName(userid)
Definition: foreign.h:20
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
Definition: funcapi.c:76
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define GETSTRUCT(TUP)
Definition: htup_details.h:653
FormData_pg_user_mapping * Form_pg_user_mapping
static Datum BoolGetDatum(bool X)
Definition: postgres.h:102
unsigned int Oid
Definition: postgres_ext.h:31
fmNodePtr resultinfo
Definition: fmgr.h:89
TupleDesc setDesc
Definition: execnodes.h:343
Tuplestorestate * setResult
Definition: execnodes.h:342
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:269
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:221
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
Definition: tuplestore.c:784

References Assert, BoolGetDatum(), ConnectionHash, CStringGetTextDatum, elog, ERROR, FSV_MISSING_OK, GetForeignServerExtended(), GETSTRUCT, hash_seq_init(), hash_seq_search(), HeapTupleIsValid, i, InitMaterializedSRF(), MappingUserName, TupleDescData::natts, ObjectIdGetDatum(), PG_GETARG_BOOL, pgfdw_conn_check(), pgfdw_conn_checkable(), PGFDW_V1_1, PGFDW_V1_2, POSTGRES_FDW_GET_CONNECTIONS_COLS, POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_1, POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_2, ReleaseSysCache(), FunctionCallInfoBaseData::resultinfo, SearchSysCache1(), ForeignServer::servername, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, tuplestore_putvalues(), and values.

Referenced by postgres_fdw_get_connections(), and postgres_fdw_get_connections_1_2().

◆ ReleaseConnection()

void ReleaseConnection ( PGconn * conn)

Definition at line 803 of file connection.c.

/*
 * Release a connection reference previously handed out to a caller.
 *
 * This is a deliberate no-op: the body contains no statements and the conn
 * argument is not used.
 */
804 {
805  /*
806  * Currently, we don't actually track connection references because all
807  * cleanup is managed on a transaction or subtransaction basis instead. So
808  * there's nothing to do here.
809  */
810 }

Referenced by estimate_path_cost_size(), finish_foreign_modify(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresEndDirectModify(), postgresEndForeignScan(), postgresGetAnalyzeInfoForForeignTable(), and postgresImportForeignSchema().

◆ UserMappingPasswordRequired()

static bool UserMappingPasswordRequired ( UserMapping * user)
static

Definition at line 612 of file connection.c.

/*
 * Report whether the given user mapping requires a password.
 *
 * Returns the boolean value of the first "password_required" option found
 * in user->options; when the option is absent, the answer defaults to
 * true.
 */
613 {
614  ListCell *cell;
615 
616  foreach(cell, user->options)
617  {
618  DefElem *def = (DefElem *) lfirst(cell);
619 
620  if (strcmp(def->defname, "password_required") == 0)
621  return defGetBoolean(def);
622  }
623 
624  return true;
625 }

References defGetBoolean(), DefElem::defname, lfirst, and user.

Referenced by check_conn_params(), and pgfdw_security_check().

Variable Documentation

◆ ConnectionHash

HTAB* ConnectionHash = NULL
static

◆ cursor_number

unsigned int cursor_number = 0
static

◆ pgfdw_we_cleanup_result

uint32 pgfdw_we_cleanup_result = 0
static

Definition at line 91 of file connection.c.

Referenced by pgfdw_get_cleanup_result().

◆ pgfdw_we_connect

uint32 pgfdw_we_connect = 0
static

Definition at line 92 of file connection.c.

Referenced by connect_pg_server().

◆ pgfdw_we_get_result

uint32 pgfdw_we_get_result = 0
static

Definition at line 93 of file connection.c.

Referenced by GetConnection(), and pgfdw_get_result().

◆ prep_stmt_number

unsigned int prep_stmt_number = 0
static

Definition at line 85 of file connection.c.

Referenced by GetPrepStmtNumber().

◆ xact_got_connection

bool xact_got_connection = false
static

Definition at line 88 of file connection.c.

Referenced by GetConnection(), pgfdw_subxact_callback(), and pgfdw_xact_callback().