PostgreSQL Source Code  git master
connection.c File Reference
#include "postgres.h"
#include "access/htup_details.h"
#include "access/xact.h"
#include "catalog/pg_user_mapping.h"
#include "commands/defrem.h"
#include "funcapi.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postgres_fdw.h"
#include "storage/fd.h"
#include "storage/latch.h"
#include "utils/builtins.h"
#include "utils/datetime.h"
#include "utils/hsearch.h"
#include "utils/inval.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
Include dependency graph for connection.c:

Go to the source code of this file.

Data Structures

struct  ConnCacheEntry
 

Macros

#define POSTGRES_FDW_GET_CONNECTIONS_COLS   2
 

Typedefs

typedef Oid ConnCacheKey
 
typedef struct ConnCacheEntry ConnCacheEntry
 

Functions

 PG_FUNCTION_INFO_V1 (postgres_fdw_get_connections)
 
 PG_FUNCTION_INFO_V1 (postgres_fdw_disconnect)
 
 PG_FUNCTION_INFO_V1 (postgres_fdw_disconnect_all)
 
static void make_new_connection (ConnCacheEntry *entry, UserMapping *user)
 
static PGconn * connect_pg_server (ForeignServer *server, UserMapping *user)
 
static void disconnect_pg_server (ConnCacheEntry *entry)
 
static void check_conn_params (const char **keywords, const char **values, UserMapping *user)
 
static void configure_remote_session (PGconn *conn)
 
static void begin_remote_xact (ConnCacheEntry *entry)
 
static void pgfdw_xact_callback (XactEvent event, void *arg)
 
static void pgfdw_subxact_callback (SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg)
 
static void pgfdw_inval_callback (Datum arg, int cacheid, uint32 hashvalue)
 
static void pgfdw_reject_incomplete_xact_state_change (ConnCacheEntry *entry)
 
static bool pgfdw_cancel_query (PGconn *conn)
 
static bool pgfdw_exec_cleanup_query (PGconn *conn, const char *query, bool ignore_errors)
 
static bool pgfdw_get_cleanup_result (PGconn *conn, TimestampTz endtime, PGresult **result)
 
static bool UserMappingPasswordRequired (UserMapping *user)
 
static bool disconnect_cached_connections (Oid serverid)
 
PGconn * GetConnection (UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
 
void do_sql_command (PGconn *conn, const char *sql)
 
void ReleaseConnection (PGconn *conn)
 
unsigned int GetCursorNumber (PGconn *conn)
 
unsigned int GetPrepStmtNumber (PGconn *conn)
 
PGresult * pgfdw_exec_query (PGconn *conn, const char *query, PgFdwConnState *state)
 
PGresult * pgfdw_get_result (PGconn *conn, const char *query)
 
void pgfdw_report_error (int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
 
Datum postgres_fdw_get_connections (PG_FUNCTION_ARGS)
 
Datum postgres_fdw_disconnect (PG_FUNCTION_ARGS)
 
Datum postgres_fdw_disconnect_all (PG_FUNCTION_ARGS)
 

Variables

static HTAB * ConnectionHash = NULL
 
static unsigned int cursor_number = 0
 
static unsigned int prep_stmt_number = 0
 
static bool xact_got_connection = false
 

Macro Definition Documentation

◆ POSTGRES_FDW_GET_CONNECTIONS_COLS

#define POSTGRES_FDW_GET_CONNECTIONS_COLS   2

Typedef Documentation

◆ ConnCacheEntry

◆ ConnCacheKey

typedef Oid ConnCacheKey

Definition at line 49 of file connection.c.

Function Documentation

◆ begin_remote_xact()

static void begin_remote_xact ( ConnCacheEntry * entry)
static

Definition at line 594 of file connection.c.

References ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, DEBUG3, do_sql_command(), elog, GetCurrentTransactionNestLevel(), IsolationIsSerializable, snprintf, and ConnCacheEntry::xact_depth.

Referenced by GetConnection().

595 {
596  int curlevel = GetCurrentTransactionNestLevel();
597 
598  /* Start main transaction if we haven't yet */
599  if (entry->xact_depth <= 0)
600  {
601  const char *sql;
602 
603  elog(DEBUG3, "starting remote transaction on connection %p",
604  entry->conn);
605 
606  if (IsolationIsSerializable())
607  sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
608  else
609  sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
610  entry->changing_xact_state = true;
611  do_sql_command(entry->conn, sql);
612  entry->xact_depth = 1;
613  entry->changing_xact_state = false;
614  }
615 
616  /*
617  * If we're in a subtransaction, stack up savepoints to match our level.
618  * This ensures we can rollback just the desired effects when a
619  * subtransaction aborts.
620  */
621  while (entry->xact_depth < curlevel)
622  {
623  char sql[64];
624 
625  snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
626  entry->changing_xact_state = true;
627  do_sql_command(entry->conn, sql);
628  entry->xact_depth++;
629  entry->changing_xact_state = false;
630  }
631 }
#define DEBUG3
Definition: elog.h:23
bool changing_xact_state
Definition: connection.c:60
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:857
PGconn * conn
Definition: connection.c:54
#define IsolationIsSerializable()
Definition: xact.h:52
void do_sql_command(PGconn *conn, const char *sql)
Definition: connection.c:571
#define elog(elevel,...)
Definition: elog.h:232
#define snprintf
Definition: port.h:216

◆ check_conn_params()

static void check_conn_params ( const char **  keywords,
const char **  values,
UserMapping *  user 
)
static

Definition at line 498 of file connection.c.

References ereport, errcode(), errdetail(), errmsg(), ERROR, i, superuser_arg(), UserMapping::userid, and UserMappingPasswordRequired().

Referenced by connect_pg_server().

499 {
500  int i;
501 
502  /* no check required if superuser */
503  if (superuser_arg(user->userid))
504  return;
505 
506  /* ok if params contain a non-empty password */
507  for (i = 0; keywords[i] != NULL; i++)
508  {
509  if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
510  return;
511  }
512 
513  /* ok if the superuser explicitly said so at user mapping creation time */
514  if (!UserMappingPasswordRequired(user))
515  return;
516 
517  ereport(ERROR,
518  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
519  errmsg("password is required"),
520  errdetail("Non-superusers must provide a password in the user mapping.")));
521 }
int errcode(int sqlerrcode)
Definition: elog.c:698
Oid userid
Definition: foreign.h:48
#define ERROR
Definition: elog.h:46
int errdetail(const char *fmt,...)
Definition: elog.c:1042
bool superuser_arg(Oid roleid)
Definition: superuser.c:56
static bool UserMappingPasswordRequired(UserMapping *user)
Definition: connection.c:475
#define ereport(elevel,...)
Definition: elog.h:157
static Datum values[MAXATTR]
Definition: bootstrap.c:166
int errmsg(const char *fmt,...)
Definition: elog.c:909
int i

◆ configure_remote_session()

static void configure_remote_session ( PGconn * conn)
static

Definition at line 535 of file connection.c.

References do_sql_command(), and PQserverVersion().

Referenced by connect_pg_server().

536 {
537  int remoteversion = PQserverVersion(conn);
538 
539  /* Force the search path to contain only pg_catalog (see deparse.c) */
540  do_sql_command(conn, "SET search_path = pg_catalog");
541 
542  /*
543  * Set remote timezone; this is basically just cosmetic, since all
544  * transmitted and returned timestamptzs should specify a zone explicitly
545  * anyway. However it makes the regression test outputs more predictable.
546  *
547  * We don't risk setting remote zone equal to ours, since the remote
548  * server might use a different timezone database. Instead, use UTC
549  * (quoted, because very old servers are picky about case).
550  */
551  do_sql_command(conn, "SET timezone = 'UTC'");
552 
553  /*
554  * Set values needed to ensure unambiguous data output from remote. (This
555  * logic should match what pg_dump does. See also set_transmission_modes
556  * in postgres_fdw.c.)
557  */
558  do_sql_command(conn, "SET datestyle = ISO");
559  if (remoteversion >= 80400)
560  do_sql_command(conn, "SET intervalstyle = postgres");
561  if (remoteversion >= 90000)
562  do_sql_command(conn, "SET extra_float_digits = 3");
563  else
564  do_sql_command(conn, "SET extra_float_digits = 2");
565 }
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:6717
void do_sql_command(PGconn *conn, const char *sql)
Definition: connection.c:571

◆ connect_pg_server()

static PGconn * connect_pg_server ( ForeignServer *  server,
UserMapping *  user 
)
static

Definition at line 340 of file connection.c.

References AcquireExternalFD(), check_conn_params(), configure_remote_session(), ConnCacheEntry::conn, CONNECTION_OK, ereport, errcode(), errdetail(), errdetail_internal(), errhint(), errmsg(), ERROR, ExtractConnectionOptions(), GetDatabaseEncodingName(), list_length(), ForeignServer::options, UserMapping::options, palloc(), pchomp(), pfree(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, PQconnectdbParams(), PQconnectionUsedPassword(), PQerrorMessage(), PQfinish(), PQstatus(), ReleaseExternalFD(), ForeignServer::servername, superuser_arg(), UserMapping::userid, UserMappingPasswordRequired(), and values.

Referenced by make_new_connection().

341 {
342  PGconn *volatile conn = NULL;
343 
344  /*
345  * Use PG_TRY block to ensure closing connection on error.
346  */
347  PG_TRY();
348  {
349  const char **keywords;
350  const char **values;
351  int n;
352 
353  /*
354  * Construct connection params from generic options of ForeignServer
355  * and UserMapping. (Some of them might not be libpq options, in
356  * which case we'll just waste a few array slots.) Add 3 extra slots
357  * for fallback_application_name, client_encoding, end marker.
358  */
359  n = list_length(server->options) + list_length(user->options) + 3;
360  keywords = (const char **) palloc(n * sizeof(char *));
361  values = (const char **) palloc(n * sizeof(char *));
362 
363  n = 0;
364  n += ExtractConnectionOptions(server->options,
365  keywords + n, values + n);
366  n += ExtractConnectionOptions(user->options,
367  keywords + n, values + n);
368 
369  /* Use "postgres_fdw" as fallback_application_name. */
370  keywords[n] = "fallback_application_name";
371  values[n] = "postgres_fdw";
372  n++;
373 
374  /* Set client_encoding so that libpq can convert encoding properly. */
375  keywords[n] = "client_encoding";
376  values[n] = GetDatabaseEncodingName();
377  n++;
378 
379  keywords[n] = values[n] = NULL;
380 
381  /* verify the set of connection parameters */
382  check_conn_params(keywords, values, user);
383 
384  /*
385  * We must obey fd.c's limit on non-virtual file descriptors. Assume
386  * that a PGconn represents one long-lived FD. (Doing this here also
387  * ensures that VFDs are closed if needed to make room.)
388  */
389  if (!AcquireExternalFD())
390  {
391 #ifndef WIN32 /* can't write #if within ereport() macro */
392  ereport(ERROR,
393  (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
394  errmsg("could not connect to server \"%s\"",
395  server->servername),
396  errdetail("There are too many open files on the local server."),
397  errhint("Raise the server's max_files_per_process and/or \"ulimit -n\" limits.")));
398 #else
399  ereport(ERROR,
400  (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
401  errmsg("could not connect to server \"%s\"",
402  server->servername),
403  errdetail("There are too many open files on the local server."),
404  errhint("Raise the server's max_files_per_process setting.")));
405 #endif
406  }
407 
408  /* OK to make connection */
409  conn = PQconnectdbParams(keywords, values, false);
410 
411  if (!conn)
412  ReleaseExternalFD(); /* because the PG_CATCH block won't */
413 
414  if (!conn || PQstatus(conn) != CONNECTION_OK)
415  ereport(ERROR,
416  (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
417  errmsg("could not connect to server \"%s\"",
418  server->servername),
419  errdetail_internal("%s", pchomp(PQerrorMessage(conn)))));
420 
421  /*
422  * Check that non-superuser has used password to establish connection;
423  * otherwise, he's piggybacking on the postgres server's user
424  * identity. See also dblink_security_check() in contrib/dblink and
425  * check_conn_params.
426  */
427  if (!superuser_arg(user->userid) && UserMappingPasswordRequired(user) &&
428  !PQconnectionUsedPassword(conn))
429  ereport(ERROR,
430  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
431  errmsg("password is required"),
432  errdetail("Non-superuser cannot connect if the server does not request a password."),
433  errhint("Target server's authentication method must be changed or password_required=false set in the user mapping attributes.")));
434 
435  /* Prepare new session for use */
436  configure_remote_session(conn);
437 
438  pfree(keywords);
439  pfree(values);
440  }
441  PG_CATCH();
442  {
443  /* Release PGconn data structure if we managed to create one */
444  if (conn)
445  {
446  PQfinish(conn);
447  ReleaseExternalFD();
448  }
449  PG_RE_THROW();
450  }
451  PG_END_TRY();
452 
453  return conn;
454 }
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6727
static void configure_remote_session(PGconn *conn)
Definition: connection.c:535
int errhint(const char *fmt,...)
Definition: elog.c:1156
static void check_conn_params(const char **keywords, const char **values, UserMapping *user)
Definition: connection.c:498
int errcode(int sqlerrcode)
Definition: elog.c:698
void PQfinish(PGconn *conn)
Definition: fe-connect.c:4221
Oid userid
Definition: foreign.h:48
PGconn * PQconnectdbParams(const char *const *keywords, const char *const *values, int expand_dbname)
Definition: fe-connect.c:662
int errdetail_internal(const char *fmt,...)
Definition: elog.c:1069
char * pchomp(const char *in)
Definition: mcxt.c:1327
void pfree(void *pointer)
Definition: mcxt.c:1169
int ExtractConnectionOptions(List *defelems, const char **keywords, const char **values)
Definition: option.c:357
#define ERROR
Definition: elog.h:46
PGconn * conn
Definition: streamutil.c:54
int errdetail(const char *fmt,...)
Definition: elog.c:1042
List * options
Definition: foreign.h:50
bool superuser_arg(Oid roleid)
Definition: superuser.c:56
static bool UserMappingPasswordRequired(UserMapping *user)
Definition: connection.c:475
bool AcquireExternalFD(void)
Definition: fd.c:1095
#define ereport(elevel,...)
Definition: elog.h:157
#define PG_CATCH()
Definition: elog.h:323
const char * GetDatabaseEncodingName(void)
Definition: mbutils.c:1216
static int list_length(const List *l)
Definition: pg_list.h:149
#define PG_RE_THROW()
Definition: elog.h:354
static Datum values[MAXATTR]
Definition: bootstrap.c:166
int PQconnectionUsedPassword(const PGconn *conn)
Definition: fe-connect.c:6785
void ReleaseExternalFD(void)
Definition: fd.c:1148
void * palloc(Size size)
Definition: mcxt.c:1062
int errmsg(const char *fmt,...)
Definition: elog.c:909
char * servername
Definition: foreign.h:39
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:6674
#define PG_TRY()
Definition: elog.h:313
List * options
Definition: foreign.h:42
#define PG_END_TRY()
Definition: elog.h:338

◆ disconnect_cached_connections()

static bool disconnect_cached_connections ( Oid  serverid)
static

Definition at line 1585 of file connection.c.

References Assert, ConnCacheEntry::conn, DEBUG3, disconnect_pg_server(), elog, ereport, errmsg(), FSV_MISSING_OK, GetForeignServerExtended(), hash_seq_init(), hash_seq_search(), ConnCacheEntry::invalidated, OidIsValid, ConnCacheEntry::serverid, ForeignServer::servername, WARNING, and ConnCacheEntry::xact_depth.

Referenced by postgres_fdw_disconnect(), and postgres_fdw_disconnect_all().

1586 {
1587  HASH_SEQ_STATUS scan;
1588  ConnCacheEntry *entry;
1589  bool all = !OidIsValid(serverid);
1590  bool result = false;
1591 
1592  /*
1593  * Connection cache hashtable has not been initialized yet in this
1594  * session, so return false.
1595  */
1596  if (!ConnectionHash)
1597  return false;
1598 
1599  hash_seq_init(&scan, ConnectionHash);
1600  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1601  {
1602  /* Ignore cache entry if no open connection right now. */
1603  if (!entry->conn)
1604  continue;
1605 
1606  if (all || entry->serverid == serverid)
1607  {
1608  /*
1609  * Emit a warning because the connection to close is used in the
1610  * current transaction and cannot be disconnected right now.
1611  */
1612  if (entry->xact_depth > 0)
1613  {
1614  ForeignServer *server;
1615 
1616  server = GetForeignServerExtended(entry->serverid,
1617  FSV_MISSING_OK);
1618 
1619  if (!server)
1620  {
1621  /*
1622  * If the foreign server was dropped while its connection
1623  * was used in the current transaction, the connection
1624  * must have been marked as invalid by
1625  * pgfdw_inval_callback at the end of DROP SERVER command.
1626  */
1627  Assert(entry->invalidated);
1628 
1629  ereport(WARNING,
1630  (errmsg("cannot close dropped server connection because it is still in use")));
1631  }
1632  else
1633  ereport(WARNING,
1634  (errmsg("cannot close connection for server \"%s\" because it is still in use",
1635  server->servername)));
1636  }
1637  else
1638  {
1639  elog(DEBUG3, "discarding connection %p", entry->conn);
1640  disconnect_pg_server(entry);
1641  result = true;
1642  }
1643  }
1644  }
1645 
1646  return result;
1647 }
#define DEBUG3
Definition: elog.h:23
#define OidIsValid(objectId)
Definition: c.h:710
bool invalidated
Definition: connection.c:61
#define FSV_MISSING_OK
Definition: foreign.h:61
#define WARNING
Definition: elog.h:40
static HTAB * ConnectionHash
Definition: connection.c:73
#define ereport(elevel,...)
Definition: elog.h:157
ForeignServer * GetForeignServerExtended(Oid serverid, bits16 flags)
Definition: foreign.c:121
PGconn * conn
Definition: connection.c:54
static void disconnect_pg_server(ConnCacheEntry *entry)
Definition: connection.c:460
#define Assert(condition)
Definition: c.h:804
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1436
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1426
int errmsg(const char *fmt,...)
Definition: elog.c:909
char * servername
Definition: foreign.h:39
#define elog(elevel,...)
Definition: elog.h:232

◆ disconnect_pg_server()

static void disconnect_pg_server ( ConnCacheEntry * entry)
static

Definition at line 460 of file connection.c.

References ConnCacheEntry::conn, PQfinish(), and ReleaseExternalFD().

Referenced by disconnect_cached_connections(), GetConnection(), pgfdw_inval_callback(), pgfdw_reject_incomplete_xact_state_change(), and pgfdw_xact_callback().

461 {
462  if (entry->conn != NULL)
463  {
464  PQfinish(entry->conn);
465  entry->conn = NULL;
466  ReleaseExternalFD();
467  }
468 }
void PQfinish(PGconn *conn)
Definition: fe-connect.c:4221
PGconn * conn
Definition: connection.c:54
void ReleaseExternalFD(void)
Definition: fd.c:1148

◆ do_sql_command()

void do_sql_command ( PGconn *  conn,
const char *  sql 
)

Definition at line 571 of file connection.c.

References ERROR, pgfdw_get_result(), pgfdw_report_error(), PGRES_COMMAND_OK, PQclear(), PQresultStatus(), and PQsendQuery().

Referenced by begin_remote_xact(), configure_remote_session(), pgfdw_subxact_callback(), pgfdw_xact_callback(), and postgresExecForeignTruncate().

572 {
573  PGresult *res;
574 
575  if (!PQsendQuery(conn, sql))
576  pgfdw_report_error(ERROR, NULL, conn, false, sql);
577  res = pgfdw_get_result(conn, sql);
578  if (PQresultStatus(res) != PGRES_COMMAND_OK)
579  pgfdw_report_error(ERROR, res, conn, true, sql);
580  PQclear(res);
581 }
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3097
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1279
#define ERROR
Definition: elog.h:46
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:778
void PQclear(PGresult *res)
Definition: fe-exec.c:680
PGresult * pgfdw_get_result(PGconn *conn, const char *query)
Definition: connection.c:713

◆ GetConnection()

PGconn* GetConnection ( UserMapping *  user,
bool  will_prep_stmt,
PgFdwConnState **  state 
)

Definition at line 125 of file connection.c.

References Assert, begin_remote_xact(), CacheRegisterSyscacheCallback(), ConnCacheEntry::conn, CONNECTION_BAD, CopyErrorData(), CurrentMemoryContext, DEBUG3, disconnect_pg_server(), elog, HASHCTL::entrysize, ereport, errdetail_internal(), errmsg_internal(), FlushErrorState(), FOREIGNSERVEROID, FreeErrorData(), HASH_BLOBS, hash_create(), HASH_ELEM, HASH_ENTER, hash_search(), ConnCacheEntry::have_prep_stmt, ConnCacheEntry::invalidated, ConnCacheEntry::key, HASHCTL::keysize, make_new_connection(), MemoryContextSwitchTo(), pchomp(), PgFdwConnState::pendingAreq, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pgfdw_inval_callback(), pgfdw_reject_incomplete_xact_state_change(), pgfdw_subxact_callback(), pgfdw_xact_callback(), PQerrorMessage(), PQstatus(), process_pending_request(), RegisterSubXactCallback(), RegisterXactCallback(), ErrorData::sqlerrcode, ConnCacheEntry::state, UserMapping::umid, USERMAPPINGOID, ConnCacheEntry::xact_depth, and xact_got_connection.

Referenced by create_foreign_modify(), dumpBlobs(), dumpDatabase(), dumpDatabaseConfig(), dumpTableData_copy(), estimate_path_cost_size(), expand_extension_name_patterns(), expand_foreign_server_name_patterns(), expand_schema_name_patterns(), expand_table_name_patterns(), getTables(), main(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresBeginDirectModify(), postgresBeginForeignScan(), postgresExecForeignTruncate(), postgresImportForeignSchema(), setup_connection(), StartLogStreamer(), StreamLog(), and StreamLogicalLog().

126 {
127  bool found;
128  bool retry = false;
129  ConnCacheEntry *entry;
130  ConnCacheKey key;
131  MemoryContext ccxt = CurrentMemoryContext;
132 
133  /* First time through, initialize connection cache hashtable */
134  if (ConnectionHash == NULL)
135  {
136  HASHCTL ctl;
137 
138  ctl.keysize = sizeof(ConnCacheKey);
139  ctl.entrysize = sizeof(ConnCacheEntry);
140  ConnectionHash = hash_create("postgres_fdw connections", 8,
141  &ctl,
142  HASH_ELEM | HASH_BLOBS);
143 
144  /*
145  * Register some callback functions that manage connection cleanup.
146  * This should be done just once in each backend.
147  */
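148  RegisterXactCallback(pgfdw_xact_callback, NULL);
149  RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
150  CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
151  pgfdw_inval_callback, (Datum) 0);
152  CacheRegisterSyscacheCallback(USERMAPPINGOID,
153  pgfdw_inval_callback, (Datum) 0);
154  }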
155 
156  /* Set flag that we did GetConnection during the current transaction */
157  xact_got_connection = true;
158 
159  /* Create hash key for the entry. Assume no pad bytes in key struct */
160  key = user->umid;
161 
162  /*
163  * Find or create cached entry for requested connection.
164  */
165  entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
166  if (!found)
167  {
168  /*
169  * We need only clear "conn" here; remaining fields will be filled
170  * later when "conn" is set.
171  */
172  entry->conn = NULL;
173  }
174 
175  /* Reject further use of connections which failed abort cleanup. */
176  pgfdw_reject_incomplete_xact_state_change(entry);
177 
178  /*
179  * If the connection needs to be remade due to invalidation, disconnect as
180  * soon as we're out of all transactions.
181  */
182  if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
183  {
184  elog(DEBUG3, "closing connection %p for option changes to take effect",
185  entry->conn);
186  disconnect_pg_server(entry);
187  }
188 
189  /*
190  * If cache entry doesn't have a connection, we have to establish a new
191  * connection. (If connect_pg_server throws an error, the cache entry
192  * will remain in a valid empty state, ie conn == NULL.)
193  */
194  if (entry->conn == NULL)
195  make_new_connection(entry, user);
196 
197  /*
198  * We check the health of the cached connection here when starting a new
199  * remote transaction. If a broken connection is detected, we try to
200  * reestablish a new connection later.
201  */
202  PG_TRY();
203  {
204  /* Process a pending asynchronous request if any. */
205  if (entry->state.pendingAreq)
206  process_pending_request(entry->state.pendingAreq);
207  /* Start a new transaction or subtransaction if needed. */
208  begin_remote_xact(entry);
209  }
210  PG_CATCH();
211  {
212  MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
213  ErrorData *errdata = CopyErrorData();
214 
215  /*
216  * If connection failure is reported when starting a new remote
217  * transaction (not subtransaction), new connection will be
218  * reestablished later.
219  *
220  * After a broken connection is detected in libpq, any error other
221  * than connection failure (e.g., out-of-memory) can be thrown
222  * somewhere between return from libpq and the expected ereport() call
223  * in pgfdw_report_error(). In this case, since PQstatus() indicates
224  * CONNECTION_BAD, checking only PQstatus() causes the false detection
225  * of connection failure. To avoid this, we also verify that the
226  * error's sqlstate is ERRCODE_CONNECTION_FAILURE. Note that also
227  * checking only the sqlstate can cause another false detection
228  * because pgfdw_report_error() may report ERRCODE_CONNECTION_FAILURE
229  * for any libpq-originated error condition.
230  */
231  if (errdata->sqlerrcode != ERRCODE_CONNECTION_FAILURE ||
232  PQstatus(entry->conn) != CONNECTION_BAD ||
233  entry->xact_depth > 0)
234  {
235  MemoryContextSwitchTo(ecxt);
236  PG_RE_THROW();
237  }
238 
239  /* Clean up the error state */
240  FlushErrorState();
241  FreeErrorData(errdata);
242  errdata = NULL;
243 
244  retry = true;
245  }
246  PG_END_TRY();
247 
248  /*
249  * If a broken connection is detected, disconnect it, reestablish a new
250  * connection and retry a new remote transaction. If connection failure is
251  * reported again, we give up getting a connection.
252  */
253  if (retry)
254  {
255  Assert(entry->xact_depth == 0);
256 
257  ereport(DEBUG3,
258  (errmsg_internal("could not start remote transaction on connection %p",
259  entry->conn)),
260  errdetail_internal("%s", pchomp(PQerrorMessage(entry->conn))));
261 
262  elog(DEBUG3, "closing connection %p to reestablish a new one",
263  entry->conn);
264  disconnect_pg_server(entry);
265 
266  if (entry->conn == NULL)
267  make_new_connection(entry, user);
268 
269  begin_remote_xact(entry);
270  }
271 
272  /* Remember if caller will prepare statements */
273  entry->have_prep_stmt |= will_prep_stmt;
274 
275  /* If caller needs access to the per-connection state, return it. */
276  if (state)
277  *state = &entry->state;
278 
279  return entry->conn;
280 }
Oid umid
Definition: foreign.h:47
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6727
#define HASH_ELEM
Definition: hsearch.h:95
int sqlerrcode
Definition: elog.h:381
#define DEBUG3
Definition: elog.h:23
struct ConnCacheEntry ConnCacheEntry
ErrorData * CopyErrorData(void)
Definition: elog.c:1560
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Size entrysize
Definition: hsearch.h:76
bool have_prep_stmt
Definition: connection.c:58
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
void FlushErrorState(void)
Definition: elog.c:1654
int errdetail_internal(const char *fmt,...)
Definition: elog.c:1069
char * pchomp(const char *in)
Definition: mcxt.c:1327
void FreeErrorData(ErrorData *edata)
Definition: elog.c:1616
void process_pending_request(AsyncRequest *areq)
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:349
PgFdwConnState state
Definition: connection.c:67
MemoryContext CurrentMemoryContext
Definition: mcxt.c:42
bool invalidated
Definition: connection.c:61
static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
Definition: connection.c:1143
#define HASH_BLOBS
Definition: hsearch.h:97
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1434
static void pgfdw_xact_callback(XactEvent event, void *arg)
Definition: connection.c:833
static HTAB * ConnectionHash
Definition: connection.c:73
uintptr_t Datum
Definition: postgres.h:411
void RegisterSubXactCallback(SubXactCallback callback, void *arg)
Definition: xact.c:3582
Size keysize
Definition: hsearch.h:75
#define ereport(elevel,...)
Definition: elog.h:157
AsyncRequest * pendingAreq
Definition: postgres_fdw.h:134
int errmsg_internal(const char *fmt,...)
Definition: elog.c:996
#define PG_CATCH()
Definition: elog.h:323
PGconn * conn
Definition: connection.c:54
static void disconnect_pg_server(ConnCacheEntry *entry)
Definition: connection.c:460
#define Assert(condition)
Definition: c.h:804
void RegisterXactCallback(XactCallback callback, void *arg)
Definition: xact.c:3527
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
Definition: connection.c:1193
static bool xact_got_connection
Definition: connection.c:80
#define PG_RE_THROW()
Definition: elog.h:354
static void make_new_connection(ConnCacheEntry *entry, UserMapping *user)
Definition: connection.c:287
#define elog(elevel,...)
Definition: elog.h:232
static void pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg)
Definition: connection.c:1026
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:6674
#define PG_TRY()
Definition: elog.h:313
static void begin_remote_xact(ConnCacheEntry *entry)
Definition: connection.c:594
Oid ConnCacheKey
Definition: connection.c:49
#define PG_END_TRY()
Definition: elog.h:338

◆ GetCursorNumber()

unsigned int GetCursorNumber ( PGconn * conn)

Definition at line 658 of file connection.c.

References cursor_number.

Referenced by postgresAcquireSampleRowsFunc(), and postgresBeginForeignScan().

659 {
660  return ++cursor_number;
661 }
static unsigned int cursor_number
Definition: connection.c:76

◆ GetPrepStmtNumber()

unsigned int GetPrepStmtNumber ( PGconn * conn)

Definition at line 672 of file connection.c.

References prep_stmt_number.

Referenced by prepare_foreign_modify().

673 {
674  return ++prep_stmt_number;
675 }
static unsigned int prep_stmt_number
Definition: connection.c:77

◆ make_new_connection()

static void make_new_connection ( ConnCacheEntry *  entry,
UserMapping *  user 
)
static

Definition at line 287 of file connection.c.

References Assert, ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, connect_pg_server(), DEBUG3, defGetBoolean(), DefElem::defname, elog, FOREIGNSERVEROID, GetForeignServer(), GetSysCacheHashValue1, ConnCacheEntry::have_error, ConnCacheEntry::have_prep_stmt, ConnCacheEntry::invalidated, ConnCacheEntry::keep_connections, lfirst, ConnCacheEntry::mapping_hashvalue, ObjectIdGetDatum, ForeignServer::options, ConnCacheEntry::server_hashvalue, ForeignServer::serverid, UserMapping::serverid, ConnCacheEntry::serverid, ForeignServer::servername, ConnCacheEntry::state, UserMapping::umid, UserMapping::userid, USERMAPPINGOID, and ConnCacheEntry::xact_depth.

Referenced by GetConnection().

288 {
289  ForeignServer *server = GetForeignServer(user->serverid);
290  ListCell *lc;
291 
292  Assert(entry->conn == NULL);
293 
294  /* Reset all transient state fields, to be sure all are clean */
295  entry->xact_depth = 0;
296  entry->have_prep_stmt = false;
297  entry->have_error = false;
298  entry->changing_xact_state = false;
299  entry->invalidated = false;
300  entry->serverid = server->serverid;
301  entry->server_hashvalue =
302  GetSysCacheHashValue1(FOREIGNSERVEROID,
303  ObjectIdGetDatum(server->serverid));
304  entry->mapping_hashvalue =
305  GetSysCacheHashValue1(USERMAPPINGOID,
306  ObjectIdGetDatum(user->umid));
307  memset(&entry->state, 0, sizeof(entry->state));
308 
309  /*
310  * Determine whether to keep the connection that we're about to make here
311  * open even after the transaction using it ends, so that the subsequent
312  * transactions can re-use it.
313  *
314  * It's enough to determine this only when making new connection because
315  * all the connections to the foreign server whose keep_connections option
316  * is changed will be closed and re-made later.
317  *
318  * By default, all the connections to any foreign servers are kept open.
319  */
320  entry->keep_connections = true;
321  foreach(lc, server->options)
322  {
323  DefElem *def = (DefElem *) lfirst(lc);
324 
325  if (strcmp(def->defname, "keep_connections") == 0)
326  entry->keep_connections = defGetBoolean(def);
327  }
328 
329  /* Now try to make the connection */
330  entry->conn = connect_pg_server(server, user);
331 
332  elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
333  entry->conn, server->servername, user->umid, user->userid);
334 }
Oid umid
Definition: foreign.h:47
#define DEBUG3
Definition: elog.h:23
bool have_prep_stmt
Definition: connection.c:58
#define GetSysCacheHashValue1(cacheId, key1)
Definition: syscache.h:202
uint32 server_hashvalue
Definition: connection.c:65
Oid userid
Definition: foreign.h:48
bool defGetBoolean(DefElem *def)
Definition: define.c:111
#define ObjectIdGetDatum(X)
Definition: postgres.h:551
bool changing_xact_state
Definition: connection.c:60
bool keep_connections
Definition: connection.c:62
PgFdwConnState state
Definition: connection.c:67
bool invalidated
Definition: connection.c:61
ForeignServer * GetForeignServer(Oid serverid)
Definition: foreign.c:109
uint32 mapping_hashvalue
Definition: connection.c:66
PGconn * conn
Definition: connection.c:54
#define Assert(condition)
Definition: c.h:804
#define lfirst(lc)
Definition: pg_list.h:169
Oid serverid
Definition: foreign.h:49
char * servername
Definition: foreign.h:39
#define elog(elevel,...)
Definition: elog.h:232
char * defname
Definition: parsenodes.h:746
List * options
Definition: foreign.h:42
Oid serverid
Definition: foreign.h:36
static PGconn * connect_pg_server(ForeignServer *server, UserMapping *user)
Definition: connection.c:340

◆ PG_FUNCTION_INFO_V1() [1/3]

PG_FUNCTION_INFO_V1 ( postgres_fdw_get_connections  )

◆ PG_FUNCTION_INFO_V1() [2/3]

PG_FUNCTION_INFO_V1 ( postgres_fdw_disconnect  )

◆ PG_FUNCTION_INFO_V1() [3/3]

PG_FUNCTION_INFO_V1 ( postgres_fdw_disconnect_all  )

◆ pgfdw_cancel_query()

static bool pgfdw_cancel_query ( PGconn *  conn)
static

Definition at line 1223 of file connection.c.

References ereport, errcode(), errmsg(), GetCurrentTimestamp(), pgfdw_get_cleanup_result(), PQcancel(), PQclear(), PQfreeCancel(), PQgetCancel(), TimestampTzPlusMilliseconds, and WARNING.

Referenced by pgfdw_subxact_callback(), and pgfdw_xact_callback().

1224 {
1225  PGcancel *cancel;
1226  char errbuf[256];
1227  PGresult *result = NULL;
1228  TimestampTz endtime;
1229 
1230  /*
1231  * If it takes too long to cancel the query and discard the result, assume
1232  * the connection is dead.
1233  */
1234  endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000);
1235 
1236  /*
1237  * Issue cancel request. Unfortunately, there's no good way to limit the
1238  * amount of time that we might block inside PQgetCancel().
1239  */
1240  if ((cancel = PQgetCancel(conn)))
1241  {
1242  if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
1243  {
1244  ereport(WARNING,
1245  (errcode(ERRCODE_CONNECTION_FAILURE),
1246  errmsg("could not send cancel request: %s",
1247  errbuf)));
1248  PQfreeCancel(cancel);
1249  return false;
1250  }
1251  PQfreeCancel(cancel);
1252  }
1253 
1254  /* Get and discard the result of the query. */
1255  if (pgfdw_get_cleanup_result(conn, endtime, &result))
1256  return false;
1257  PQclear(result);
1258 
1259  return true;
1260 }
static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result)
Definition: connection.c:1324
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
int64 TimestampTz
Definition: timestamp.h:39
void PQfreeCancel(PGcancel *cancel)
Definition: fe-connect.c:4365
int errcode(int sqlerrcode)
Definition: elog.c:698
PGcancel * PQgetCancel(PGconn *conn)
Definition: fe-connect.c:4342
#define WARNING
Definition: elog.h:40
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
#define ereport(elevel,...)
Definition: elog.h:157
void PQclear(PGresult *res)
Definition: fe-exec.c:680
int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize)
Definition: fe-connect.c:4497
int errmsg(const char *fmt,...)
Definition: elog.c:909

◆ pgfdw_exec_cleanup_query()

static bool pgfdw_exec_cleanup_query ( PGconn *  conn,
const char *  query,
bool  ignore_errors 
)
static

Definition at line 1270 of file connection.c.

References GetCurrentTimestamp(), pgfdw_get_cleanup_result(), pgfdw_report_error(), PGRES_COMMAND_OK, PQclear(), PQresultStatus(), PQsendQuery(), TimestampTzPlusMilliseconds, and WARNING.

Referenced by pgfdw_subxact_callback(), and pgfdw_xact_callback().

1271 {
1272  PGresult *result = NULL;
1273  TimestampTz endtime;
1274 
1275  /*
1276  * If it takes too long to execute a cleanup query, assume the connection
1277  * is dead. It's fairly likely that this is why we aborted in the first
1278  * place (e.g. statement timeout, user cancel), so the timeout shouldn't
1279  * be too long.
1280  */
1281  endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000);
1282 
1283  /*
1284  * Submit a query. Since we don't use non-blocking mode, this also can
1285  * block. But its risk is relatively small, so we ignore that for now.
1286  */
1287  if (!PQsendQuery(conn, query))
1288  {
1289  pgfdw_report_error(WARNING, NULL, conn, false, query);
1290  return false;
1291  }
1292 
1293  /* Get the result of the query. */
1294  if (pgfdw_get_cleanup_result(conn, endtime, &result))
1295  return false;
1296 
1297  /* Issue a warning if not successful. */
1298  if (PQresultStatus(result) != PGRES_COMMAND_OK)
1299  {
1300  pgfdw_report_error(WARNING, result, conn, true, query);
1301  return ignore_errors;
1302  }
1303  PQclear(result);
1304 
1305  return true;
1306 }
static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result)
Definition: connection.c:1324
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
int64 TimestampTz
Definition: timestamp.h:39
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3097
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1279
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:778
#define WARNING
Definition: elog.h:40
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
void PQclear(PGresult *res)
Definition: fe-exec.c:680

◆ pgfdw_exec_query()

PGresult* pgfdw_exec_query ( PGconn *  conn,
const char *  query,
PgFdwConnState *  state 
)

Definition at line 685 of file connection.c.

References ERROR, PgFdwConnState::pendingAreq, pgfdw_get_result(), pgfdw_report_error(), PQsendQuery(), and process_pending_request().

Referenced by close_cursor(), deallocate_query(), fetch_more_data(), get_remote_estimate(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresImportForeignSchema(), and postgresReScanForeignScan().

686 {
687  /* First, process a pending asynchronous request, if any. */
688  if (state && state->pendingAreq)
689  process_pending_request(state->pendingAreq);
690 
691  /*
692  * Submit a query. Since we don't use non-blocking mode, this also can
693  * block. But its risk is relatively small, so we ignore that for now.
694  */
695  if (!PQsendQuery(conn, query))
696  pgfdw_report_error(ERROR, NULL, conn, false, query);
697 
698  /* Wait for the result. */
699  return pgfdw_get_result(conn, query);
700 }
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1279
#define ERROR
Definition: elog.h:46
void process_pending_request(AsyncRequest *areq)
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:778
AsyncRequest * pendingAreq
Definition: postgres_fdw.h:134
PGresult * pgfdw_get_result(PGconn *conn, const char *query)
Definition: connection.c:713

◆ pgfdw_get_cleanup_result()

static bool pgfdw_get_cleanup_result ( PGconn *  conn,
TimestampTz  endtime,
PGresult **  result 
)
static

Definition at line 1324 of file connection.c.

References CHECK_FOR_INTERRUPTS, GetCurrentTimestamp(), MyLatch, now(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, PG_WAIT_EXTENSION, PQclear(), PQconsumeInput(), PQgetResult(), PQisBusy(), PQsocket(), ResetLatch(), TimestampDifferenceMilliseconds(), WaitLatchOrSocket(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, and WL_TIMEOUT.

Referenced by pgfdw_cancel_query(), and pgfdw_exec_cleanup_query().

1325 {
1326  volatile bool timed_out = false;
1327  PGresult *volatile last_res = NULL;
1328 
1329  /* In what follows, do not leak any PGresults on an error. */
1330  PG_TRY();
1331  {
1332  for (;;)
1333  {
1334  PGresult *res;
1335 
1336  while (PQisBusy(conn))
1337  {
1338  int wc;
1339  TimestampTz now = GetCurrentTimestamp();
1340  long cur_timeout;
1341 
1342  /* If timeout has expired, give up, else get sleep time. */
1343  cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
1344  if (cur_timeout <= 0)
1345  {
1346  timed_out = true;
1347  goto exit;
1348  }
1349 
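1350  /* Sleep until there's something to do */
1351  wc = WaitLatchOrSocket(MyLatch,
1352  WL_LATCH_SET | WL_SOCKET_READABLE |
1353  WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1354  PQsocket(conn),
1355  cur_timeout, PG_WAIT_EXTENSION);
1356  ResetLatch(MyLatch);
1357 
1358  CHECK_FOR_INTERRUPTS();
1359 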
1360  /* Data available in socket? */
1361  if (wc & WL_SOCKET_READABLE)
1362  {
1363  if (!PQconsumeInput(conn))
1364  {
1365  /* connection trouble; treat the same as a timeout */
1366  timed_out = true;
1367  goto exit;
1368  }
1369  }
1370  }
1371 
1372  res = PQgetResult(conn);
1373  if (res == NULL)
1374  break; /* query is complete */
1375 
1376  PQclear(last_res);
1377  last_res = res;
1378  }
1379 exit: ;
1380  }
1381  PG_CATCH();
1382  {
1383  PQclear(last_res);
1384  PG_RE_THROW();
1385  }
1386  PG_END_TRY();
1387 
1388  if (timed_out)
1389  PQclear(last_res);
1390  else
1391  *result = last_res;
1392  return timed_out;
1393 }
#define WL_TIMEOUT
Definition: latch.h:128
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
#define PG_WAIT_EXTENSION
Definition: wait_event.h:23
int64 TimestampTz
Definition: timestamp.h:39
#define WL_SOCKET_READABLE
Definition: latch.h:126
void ResetLatch(Latch *latch)
Definition: latch.c:660
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:500
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1853
void PQclear(PGresult *res)
Definition: fe-exec.c:680
#define PG_CATCH()
Definition: elog.h:323
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:1900
#define PG_RE_THROW()
Definition: elog.h:354
struct Latch * MyLatch
Definition: globals.c:57
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:102
#define PG_TRY()
Definition: elog.h:313
#define WL_LATCH_SET
Definition: latch.h:125
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:6745
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1544
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1927
#define PG_END_TRY()
Definition: elog.h:338
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1693

◆ pgfdw_get_result()

PGresult* pgfdw_get_result ( PGconn *  conn,
const char *  query 
)

Definition at line 713 of file connection.c.

References CHECK_FOR_INTERRUPTS, ERROR, MyLatch, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, PG_WAIT_EXTENSION, pgfdw_report_error(), PQclear(), PQconsumeInput(), PQgetResult(), PQisBusy(), PQsocket(), ResetLatch(), WaitLatchOrSocket(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_SOCKET_READABLE.

Referenced by create_cursor(), do_sql_command(), execute_dml_stmt(), execute_foreign_modify(), fetch_more_data(), pgfdw_exec_query(), and prepare_foreign_modify().

714 {
715  PGresult *volatile last_res = NULL;
716 
717  /* In what follows, do not leak any PGresults on an error. */
718  PG_TRY();
719  {
720  for (;;)
721  {
722  PGresult *res;
723 
724  while (PQisBusy(conn))
725  {
726  int wc;
727 
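728  /* Sleep until there's something to do */
729  wc = WaitLatchOrSocket(MyLatch,
730  WL_LATCH_SET | WL_SOCKET_READABLE |
731  WL_EXIT_ON_PM_DEATH,
732  PQsocket(conn),
733  -1L, PG_WAIT_EXTENSION);
734  ResetLatch(MyLatch);
735 
736  CHECK_FOR_INTERRUPTS();
737 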
738  /* Data available in socket? */
739  if (wc & WL_SOCKET_READABLE)
740  {
741  if (!PQconsumeInput(conn))
742  pgfdw_report_error(ERROR, NULL, conn, false, query);
743  }
744  }
745 
746  res = PQgetResult(conn);
747  if (res == NULL)
748  break; /* query is complete */
749 
750  PQclear(last_res);
751  last_res = res;
752  }
753  }
754  PG_CATCH();
755  {
756  PQclear(last_res);
757  PG_RE_THROW();
758  }
759  PG_END_TRY();
760 
761  return last_res;
762 }
#define PG_WAIT_EXTENSION
Definition: wait_event.h:23
#define WL_SOCKET_READABLE
Definition: latch.h:126
void ResetLatch(Latch *latch)
Definition: latch.c:660
#define ERROR
Definition: elog.h:46
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:500
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:778
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1853
void PQclear(PGresult *res)
Definition: fe-exec.c:680
#define PG_CATCH()
Definition: elog.h:323
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:1900
#define PG_RE_THROW()
Definition: elog.h:354
struct Latch * MyLatch
Definition: globals.c:57
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:102
#define PG_TRY()
Definition: elog.h:313
#define WL_LATCH_SET
Definition: latch.h:125
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:6745
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1927
#define PG_END_TRY()
Definition: elog.h:338
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130

◆ pgfdw_inval_callback()

static void pgfdw_inval_callback ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)
static

Definition at line 1143 of file connection.c.

References Assert, ConnCacheEntry::conn, DEBUG3, disconnect_pg_server(), elog, FOREIGNSERVEROID, hash_seq_init(), hash_seq_search(), ConnCacheEntry::invalidated, ConnCacheEntry::mapping_hashvalue, ConnCacheEntry::server_hashvalue, USERMAPPINGOID, and ConnCacheEntry::xact_depth.

Referenced by GetConnection().

1144 {
1145  HASH_SEQ_STATUS scan;
1146  ConnCacheEntry *entry;
1147 
1148  Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
1149 
1150  /* ConnectionHash must exist already, if we're registered */
1151  hash_seq_init(&scan, ConnectionHash);
1152  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1153  {
1154  /* Ignore invalid entries */
1155  if (entry->conn == NULL)
1156  continue;
1157 
1158  /* hashvalue == 0 means a cache reset, must clear all state */
1159  if (hashvalue == 0 ||
1160  (cacheid == FOREIGNSERVEROID &&
1161  entry->server_hashvalue == hashvalue) ||
1162  (cacheid == USERMAPPINGOID &&
1163  entry->mapping_hashvalue == hashvalue))
1164  {
1165  /*
1166  * Close the connection immediately if it's not used yet in this
1167  * transaction. Otherwise mark it as invalid so that
1168  * pgfdw_xact_callback() can close it at the end of this
1169  * transaction.
1170  */
1171  if (entry->xact_depth == 0)
1172  {
1173  elog(DEBUG3, "discarding connection %p", entry->conn);
1174  disconnect_pg_server(entry);
1175  }
1176  else
1177  entry->invalidated = true;
1178  }
1179  }
1180 }
#define DEBUG3
Definition: elog.h:23
uint32 server_hashvalue
Definition: connection.c:65
bool invalidated
Definition: connection.c:61
static HTAB * ConnectionHash
Definition: connection.c:73
uint32 mapping_hashvalue
Definition: connection.c:66
PGconn * conn
Definition: connection.c:54
static void disconnect_pg_server(ConnCacheEntry *entry)
Definition: connection.c:460
#define Assert(condition)
Definition: c.h:804
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1436
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1426
#define elog(elevel,...)
Definition: elog.h:232

◆ pgfdw_reject_incomplete_xact_state_change()

static void pgfdw_reject_incomplete_xact_state_change ( ConnCacheEntry *  entry)
static

Definition at line 1193 of file connection.c.

References ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, disconnect_pg_server(), ereport, errcode(), errmsg(), ERROR, GetForeignServer(), ConnCacheEntry::serverid, and ForeignServer::servername.

Referenced by GetConnection(), pgfdw_subxact_callback(), and pgfdw_xact_callback().

1194 {
1195  ForeignServer *server;
1196 
1197  /* nothing to do for inactive entries and entries of sane state */
1198  if (entry->conn == NULL || !entry->changing_xact_state)
1199  return;
1200 
1201  /* make sure this entry is inactive */
1202  disconnect_pg_server(entry);
1203 
1204  /* find server name to be shown in the message below */
1205  server = GetForeignServer(entry->serverid);
1206 
1207  ereport(ERROR,
1208  (errcode(ERRCODE_CONNECTION_EXCEPTION),
1209  errmsg("connection to server \"%s\" was lost",
1210  server->servername)));
1211 }
int errcode(int sqlerrcode)
Definition: elog.c:698
#define ERROR
Definition: elog.h:46
bool changing_xact_state
Definition: connection.c:60
ForeignServer * GetForeignServer(Oid serverid)
Definition: foreign.c:109
#define ereport(elevel,...)
Definition: elog.h:157
PGconn * conn
Definition: connection.c:54
static void disconnect_pg_server(ConnCacheEntry *entry)
Definition: connection.c:460
int errmsg(const char *fmt,...)
Definition: elog.c:909
char * servername
Definition: foreign.h:39

◆ pgfdw_report_error()

void pgfdw_report_error ( int  elevel,
PGresult *  res,
PGconn *  conn,
bool  clear,
const char *  sql 
)

Definition at line 778 of file connection.c.

References ereport, errcode(), errcontext, errdetail_internal(), errhint(), errmsg(), errmsg_internal(), MAKE_SQLSTATE, pchomp(), PG_DIAG_CONTEXT, PG_DIAG_MESSAGE_DETAIL, PG_DIAG_MESSAGE_HINT, PG_DIAG_MESSAGE_PRIMARY, PG_DIAG_SQLSTATE, PG_END_TRY, PG_FINALLY, PG_TRY, PQclear(), PQerrorMessage(), and PQresultErrorField().

Referenced by close_cursor(), create_cursor(), deallocate_query(), do_sql_command(), execute_dml_stmt(), execute_foreign_modify(), fetch_more_data(), fetch_more_data_begin(), get_remote_estimate(), pgfdw_exec_cleanup_query(), pgfdw_exec_query(), pgfdw_get_result(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresForeignAsyncNotify(), postgresImportForeignSchema(), postgresReScanForeignScan(), and prepare_foreign_modify().

780 {
781  /* If requested, PGresult must be released before leaving this function. */
782  PG_TRY();
783  {
784  char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
785  char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
786  char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
787  char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
788  char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
789  int sqlstate;
790 
791  if (diag_sqlstate)
792  sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
793  diag_sqlstate[1],
794  diag_sqlstate[2],
795  diag_sqlstate[3],
796  diag_sqlstate[4]);
797  else
798  sqlstate = ERRCODE_CONNECTION_FAILURE;
799 
800  /*
801  * If we don't get a message from the PGresult, try the PGconn. This
802  * is needed because for connection-level failures, PQexec may just
803  * return NULL, not a PGresult at all.
804  */
805  if (message_primary == NULL)
806  message_primary = pchomp(PQerrorMessage(conn));
807 
808  ereport(elevel,
809  (errcode(sqlstate),
810  message_primary ? errmsg_internal("%s", message_primary) :
811  errmsg("could not obtain message string for remote error"),
812  message_detail ? errdetail_internal("%s", message_detail) : 0,
813  message_hint ? errhint("%s", message_hint) : 0,
814  message_context ? errcontext("%s", message_context) : 0,
815  sql ? errcontext("remote SQL command: %s", sql) : 0));
816  }
817  PG_FINALLY();
818  {
819  if (clear)
820  PQclear(res);
821  }
822  PG_END_TRY();
823 }
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6727
int errhint(const char *fmt,...)
Definition: elog.c:1156
#define PG_DIAG_MESSAGE_PRIMARY
Definition: postgres_ext.h:58
#define PG_DIAG_MESSAGE_DETAIL
Definition: postgres_ext.h:59
#define MAKE_SQLSTATE(ch1, ch2, ch3, ch4, ch5)
Definition: elog.h:64
int errcode(int sqlerrcode)
Definition: elog.c:698
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:57
int errdetail_internal(const char *fmt,...)
Definition: elog.c:1069
char * pchomp(const char *in)
Definition: mcxt.c:1327
static int elevel
Definition: vacuumlazy.c:400
#define PG_FINALLY()
Definition: elog.h:330
#define PG_DIAG_MESSAGE_HINT
Definition: postgres_ext.h:60
#define ereport(elevel,...)
Definition: elog.h:157
void PQclear(PGresult *res)
Definition: fe-exec.c:680
int errmsg_internal(const char *fmt,...)
Definition: elog.c:996
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:3152
int errmsg(const char *fmt,...)
Definition: elog.c:909
#define errcontext
Definition: elog.h:204
#define PG_TRY()
Definition: elog.h:313
#define PG_END_TRY()
Definition: elog.h:338
#define PG_DIAG_CONTEXT
Definition: postgres_ext.h:64

◆ pgfdw_subxact_callback()

static void pgfdw_subxact_callback ( SubXactEvent  event,
SubTransactionId  mySubid,
SubTransactionId  parentSubid,
void *  arg 
)
static

Definition at line 1026 of file connection.c.

References ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, do_sql_command(), elog, ERROR, GetCurrentTransactionNestLevel(), hash_seq_init(), hash_seq_search(), ConnCacheEntry::have_error, in_error_recursion_trouble(), pgfdw_cancel_query(), pgfdw_exec_cleanup_query(), pgfdw_reject_incomplete_xact_state_change(), PQTRANS_ACTIVE, PQtransactionStatus(), snprintf, SUBXACT_EVENT_ABORT_SUB, SUBXACT_EVENT_PRE_COMMIT_SUB, ConnCacheEntry::xact_depth, and xact_got_connection.

Referenced by GetConnection().

1028 {
1029  HASH_SEQ_STATUS scan;
1030  ConnCacheEntry *entry;
1031  int curlevel;
1032 
1033  /* Nothing to do at subxact start, nor after commit. */
1034  if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
1035  event == SUBXACT_EVENT_ABORT_SUB))
1036  return;
1037 
1038  /* Quick exit if no connections were touched in this transaction. */
1039  if (!xact_got_connection)
1040  return;
1041 
1042  /*
1043  * Scan all connection cache entries to find open remote subtransactions
1044  * of the current level, and close them.
1045  */
1046  curlevel = GetCurrentTransactionNestLevel();
1047  hash_seq_init(&scan, ConnectionHash);
1048  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1049  {
1050  char sql[100];
1051 
1052  /*
1053  * We only care about connections with open remote subtransactions of
1054  * the current level.
1055  */
1056  if (entry->conn == NULL || entry->xact_depth < curlevel)
1057  continue;
1058 
1059  if (entry->xact_depth > curlevel)
1060  elog(ERROR, "missed cleaning up remote subtransaction at level %d",
1061  entry->xact_depth);
1062 
1063  if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
1064  {
1065  /*
1066  * If abort cleanup previously failed for this connection, we
1067  * can't issue any more commands against it.
1068  */
1069  pgfdw_reject_incomplete_xact_state_change(entry);
1070 
1071  /* Commit all remote subtransactions during pre-commit */
1072  snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
1073  entry->changing_xact_state = true;
1074  do_sql_command(entry->conn, sql);
1075  entry->changing_xact_state = false;
1076  }
1077  else if (in_error_recursion_trouble())
1078  {
1079  /*
1080  * Don't try to clean up the connection if we're already in error
1081  * recursion trouble.
1082  */
1083  entry->changing_xact_state = true;
1084  }
1085  else if (!entry->changing_xact_state)
1086  {
1087  bool abort_cleanup_failure = false;
1088 
1089  /* Remember that abort cleanup is in progress. */
1090  entry->changing_xact_state = true;
1091 
1092  /* Assume we might have lost track of prepared statements */
1093  entry->have_error = true;
1094 
1095  /*
1096  * If a command has been submitted to the remote server by using
1097  * an asynchronous execution function, the command might not have
1098  * yet completed. Check to see if a command is still being
1099  * processed by the remote server, and if so, request cancellation
1100  * of the command.
1101  */
1102  if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
1103  !pgfdw_cancel_query(entry->conn))
1104  abort_cleanup_failure = true;
1105  else
1106  {
1107  /* Rollback all remote subtransactions during abort */
1108  snprintf(sql, sizeof(sql),
1109  "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
1110  curlevel, curlevel);
1111  if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
1112  abort_cleanup_failure = true;
1113  }
1114 
1115  /* Disarm changing_xact_state if it all worked. */
1116  entry->changing_xact_state = abort_cleanup_failure;
1117  }
1118 
1119  /* OK, we're outta that level of subtransaction */
1120  entry->xact_depth--;
1121  }
1122 }
#define ERROR
Definition: elog.h:46
bool changing_xact_state
Definition: connection.c:60
PGTransactionStatusType PQtransactionStatus(const PGconn *conn)
Definition: fe-connect.c:6682
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
Definition: connection.c:1270
static HTAB * ConnectionHash
Definition: connection.c:73
static bool pgfdw_cancel_query(PGconn *conn)
Definition: connection.c:1223
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:857
bool in_error_recursion_trouble(void)
Definition: elog.c:291
PGconn * conn
Definition: connection.c:54
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
Definition: connection.c:1193
static bool xact_got_connection
Definition: connection.c:80
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1436
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1426
void do_sql_command(PGconn *conn, const char *sql)
Definition: connection.c:571
#define elog(elevel,...)
Definition: elog.h:232
#define snprintf
Definition: port.h:216

◆ pgfdw_xact_callback()

static void pgfdw_xact_callback ( XactEvent  event,
void *  arg 
)
static

Definition at line 833 of file connection.c.

References ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, CONNECTION_OK, cursor_number, DEBUG3, disconnect_pg_server(), do_sql_command(), elog, ereport, errcode(), errmsg(), ERROR, hash_seq_init(), hash_seq_search(), ConnCacheEntry::have_error, ConnCacheEntry::have_prep_stmt, in_error_recursion_trouble(), ConnCacheEntry::invalidated, ConnCacheEntry::keep_connections, pgfdw_cancel_query(), pgfdw_exec_cleanup_query(), pgfdw_reject_incomplete_xact_state_change(), PQclear(), PQexec(), PQstatus(), PQTRANS_ACTIVE, PQTRANS_IDLE, PQtransactionStatus(), ConnCacheEntry::state, ConnCacheEntry::xact_depth, XACT_EVENT_ABORT, XACT_EVENT_COMMIT, XACT_EVENT_PARALLEL_ABORT, XACT_EVENT_PARALLEL_COMMIT, XACT_EVENT_PARALLEL_PRE_COMMIT, XACT_EVENT_PRE_COMMIT, XACT_EVENT_PRE_PREPARE, XACT_EVENT_PREPARE, and xact_got_connection.

Referenced by GetConnection().

834 {
835  HASH_SEQ_STATUS scan;
836  ConnCacheEntry *entry;
837 
838  /* Quick exit if no connections were touched in this transaction. */
839  if (!xact_got_connection)
840  return;
841 
842  /*
843  * Scan all connection cache entries to find open remote transactions, and
844  * close them.
845  */
846  hash_seq_init(&scan, ConnectionHash);
847  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
848  {
849  PGresult *res;
850 
851  /* Ignore cache entry if no open connection right now */
852  if (entry->conn == NULL)
853  continue;
854 
855  /* If it has an open remote transaction, try to close it */
856  if (entry->xact_depth > 0)
857  {
858  bool abort_cleanup_failure = false;
859 
860  elog(DEBUG3, "closing remote transaction on connection %p",
861  entry->conn);
862 
863  switch (event)
864  {
865  case XACT_EVENT_PARALLEL_PRE_COMMIT:
866  case XACT_EVENT_PRE_COMMIT:
867 
868  /*
869  * If abort cleanup previously failed for this connection,
870  * we can't issue any more commands against it.
871  */
872  pgfdw_reject_incomplete_xact_state_change(entry);
873 
874  /* Commit all remote transactions during pre-commit */
875  entry->changing_xact_state = true;
876  do_sql_command(entry->conn, "COMMIT TRANSACTION");
877  entry->changing_xact_state = false;
878 
879  /*
880  * If there were any errors in subtransactions, and we
881  * made prepared statements, do a DEALLOCATE ALL to make
882  * sure we get rid of all prepared statements. This is
883  * annoying and not terribly bulletproof, but it's
884  * probably not worth trying harder.
885  *
886  * DEALLOCATE ALL only exists in 8.3 and later, so this
887  * constrains how old a server postgres_fdw can
888  * communicate with. We intentionally ignore errors in
889  * the DEALLOCATE, so that we can hobble along to some
890  * extent with older servers (leaking prepared statements
891  * as we go; but we don't really support update operations
892  * pre-8.3 anyway).
893  */
894  if (entry->have_prep_stmt && entry->have_error)
895  {
896  res = PQexec(entry->conn, "DEALLOCATE ALL");
897  PQclear(res);
898  }
899  entry->have_prep_stmt = false;
900  entry->have_error = false;
901  break;
902  case XACT_EVENT_PRE_PREPARE:
903 
904  /*
905  * We disallow any remote transactions, since it's not
906  * very reasonable to hold them open until the prepared
907  * transaction is committed. For the moment, throw error
908  * unconditionally; later we might allow read-only cases.
909  * Note that the error will cause us to come right back
910  * here with event == XACT_EVENT_ABORT, so we'll clean up
911  * the connection state at that point.
912  */
913  ereport(ERROR,
914  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
915  errmsg("cannot PREPARE a transaction that has operated on postgres_fdw foreign tables")));
916  break;
917  case XACT_EVENT_PARALLEL_COMMIT:
918  case XACT_EVENT_COMMIT:
919  case XACT_EVENT_PREPARE:
920  /* Pre-commit should have closed the open transaction */
921  elog(ERROR, "missed cleaning up connection during pre-commit");
922  break;
923  case XACT_EVENT_PARALLEL_ABORT:
924  case XACT_EVENT_ABORT:
925 
926  /*
927  * Don't try to clean up the connection if we're already
928  * in error recursion trouble.
929  */
930  if (in_error_recursion_trouble())
931  entry->changing_xact_state = true;
932 
933  /*
934  * If connection is already unsalvageable, don't touch it
935  * further.
936  */
937  if (entry->changing_xact_state)
938  break;
939 
940  /*
941  * Mark this connection as in the process of changing
942  * transaction state.
943  */
944  entry->changing_xact_state = true;
945 
946  /* Assume we might have lost track of prepared statements */
947  entry->have_error = true;
948 
949  /*
950  * If a command has been submitted to the remote server by
951  * using an asynchronous execution function, the command
952  * might not have yet completed. Check to see if a
953  * command is still being processed by the remote server,
954  * and if so, request cancellation of the command.
955  */
956  if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
957  !pgfdw_cancel_query(entry->conn))
958  {
959  /* Unable to cancel running query. */
960  abort_cleanup_failure = true;
961  }
962  else if (!pgfdw_exec_cleanup_query(entry->conn,
963  "ABORT TRANSACTION",
964  false))
965  {
966  /* Unable to abort remote transaction. */
967  abort_cleanup_failure = true;
968  }
969  else if (entry->have_prep_stmt && entry->have_error &&
970  !pgfdw_exec_cleanup_query(entry->conn,
971  "DEALLOCATE ALL",
972  true))
973  {
974  /* Trouble clearing prepared statements. */
975  abort_cleanup_failure = true;
976  }
977  else
978  {
979  entry->have_prep_stmt = false;
980  entry->have_error = false;
981  /* Also reset per-connection state */
982  memset(&entry->state, 0, sizeof(entry->state));
983  }
984 
985  /* Disarm changing_xact_state if it all worked. */
986  entry->changing_xact_state = abort_cleanup_failure;
987  break;
988  }
989  }
990 
991  /* Reset state to show we're out of a transaction */
992  entry->xact_depth = 0;
993 
994  /*
995  * If the connection isn't in a good idle state, it is marked as
996  * invalid or keep_connections option of its server is disabled, then
997  * discard it to recover. Next GetConnection will open a new
998  * connection.
999  */
1000  if (PQstatus(entry->conn) != CONNECTION_OK ||
1001  PQtransactionStatus(entry->conn) != PQTRANS_IDLE ||
1002  entry->changing_xact_state ||
1003  entry->invalidated ||
1004  !entry->keep_connections)
1005  {
1006  elog(DEBUG3, "discarding connection %p", entry->conn);
1007  disconnect_pg_server(entry);
1008  }
1009  }
1010 
1011  /*
1012  * Regardless of the event type, we can now mark ourselves as out of the
1013  * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
1014  * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
1015  */
1016  xact_got_connection = false;
1017 
1018  /* Also reset cursor numbering for next transaction */
1019  cursor_number = 0;
1020 }
#define DEBUG3
Definition: elog.h:23
int errcode(int sqlerrcode)
Definition: elog.c:698
bool have_prep_stmt
Definition: connection.c:58
#define ERROR
Definition: elog.h:46
bool changing_xact_state
Definition: connection.c:60
bool keep_connections
Definition: connection.c:62
PgFdwConnState state
Definition: connection.c:67
static unsigned int cursor_number
Definition: connection.c:76
bool invalidated
Definition: connection.c:61
PGTransactionStatusType PQtransactionStatus(const PGconn *conn)
Definition: fe-connect.c:6682
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
Definition: connection.c:1270
static HTAB * ConnectionHash
Definition: connection.c:73
static bool pgfdw_cancel_query(PGconn *conn)
Definition: connection.c:1223
#define ereport(elevel,...)
Definition: elog.h:157
void PQclear(PGresult *res)
Definition: fe-exec.c:680
bool in_error_recursion_trouble(void)
Definition: elog.c:291
PGconn * conn
Definition: connection.c:54
static void disconnect_pg_server(ConnCacheEntry *entry)
Definition: connection.c:460
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
Definition: connection.c:1193
static bool xact_got_connection
Definition: connection.c:80
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1436
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1426
int errmsg(const char *fmt,...)
Definition: elog.c:909
void do_sql_command(PGconn *conn, const char *sql)
Definition: connection.c:571
#define elog(elevel,...)
Definition: elog.h:232
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2142
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:6674

◆ postgres_fdw_disconnect()

Datum postgres_fdw_disconnect ( PG_FUNCTION_ARGS  )

Definition at line 1536 of file connection.c.

References disconnect_cached_connections(), GetForeignServerByName(), PG_GETARG_TEXT_PP, PG_RETURN_BOOL, ForeignServer::serverid, and text_to_cstring().

1537 {
1538  ForeignServer *server;
1539  char *servername;
1540 
1541  servername = text_to_cstring(PG_GETARG_TEXT_PP(0));
1542  server = GetForeignServerByName(servername, false);
1543 
1544  PG_RETURN_BOOL(disconnect_cached_connections(server->serverid));
1545 }
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:309
ForeignServer * GetForeignServerByName(const char *srvname, bool missing_ok)
Definition: foreign.c:180
static bool disconnect_cached_connections(Oid serverid)
Definition: connection.c:1585
#define PG_RETURN_BOOL(x)
Definition: fmgr.h:359
char * text_to_cstring(const text *t)
Definition: varlena.c:223
Oid serverid
Definition: foreign.h:36

◆ postgres_fdw_disconnect_all()

Datum postgres_fdw_disconnect_all ( PG_FUNCTION_ARGS  )

Definition at line 1557 of file connection.c.

References disconnect_cached_connections(), InvalidOid, and PG_RETURN_BOOL.

1558 {
1559  PG_RETURN_BOOL(disconnect_cached_connections(InvalidOid));
1560 }
static bool disconnect_cached_connections(Oid serverid)
Definition: connection.c:1585
#define PG_RETURN_BOOL(x)
Definition: fmgr.h:359
#define InvalidOid
Definition: postgres_ext.h:36

◆ postgres_fdw_get_connections()

Datum postgres_fdw_get_connections ( PG_FUNCTION_ARGS  )

Definition at line 1409 of file connection.c.

References ReturnSetInfo::allowedModes, Assert, BoolGetDatum, CStringGetTextDatum, ReturnSetInfo::econtext, ExprContext::ecxt_per_query_memory, elog, ereport, errcode(), errmsg(), ERROR, FSV_MISSING_OK, get_call_result_type(), GetForeignServerExtended(), hash_seq_init(), hash_seq_search(), IsA, MemoryContextSwitchTo(), MemSet, PG_RETURN_VOID, POSTGRES_FDW_GET_CONNECTIONS_COLS, ReturnSetInfo::returnMode, ForeignServer::servername, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SFRM_Materialize, tuplestore_begin_heap(), tuplestore_donestoring, tuplestore_putvalues(), TYPEFUNC_COMPOSITE, values, and work_mem.

1410 {
1411 #define POSTGRES_FDW_GET_CONNECTIONS_COLS 2
1412  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1413  TupleDesc tupdesc;
1414  Tuplestorestate *tupstore;
1415  MemoryContext per_query_ctx;
1416  MemoryContext oldcontext;
1417  HASH_SEQ_STATUS scan;
1418  ConnCacheEntry *entry;
1419 
1420  /* check to see if caller supports us returning a tuplestore */
1421  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
1422  ereport(ERROR,
1423  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1424  errmsg("set-valued function called in context that cannot accept a set")));
1425  if (!(rsinfo->allowedModes & SFRM_Materialize))
1426  ereport(ERROR,
1427  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1428  errmsg("materialize mode required, but it is not allowed in this context")));
1429 
1430  /* Build a tuple descriptor for our result type */
1431  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1432  elog(ERROR, "return type must be a row type");
1433 
1434  /* Build tuplestore to hold the result rows */
1435  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1436  oldcontext = MemoryContextSwitchTo(per_query_ctx);
1437 
1438  tupstore = tuplestore_begin_heap(true, false, work_mem);
1439  rsinfo->returnMode = SFRM_Materialize;
1440  rsinfo->setResult = tupstore;
1441  rsinfo->setDesc = tupdesc;
1442 
1443  MemoryContextSwitchTo(oldcontext);
1444 
1445  /* If cache doesn't exist, we return no records */
1446  if (!ConnectionHash)
1447  {
1448  /* clean up and return the tuplestore */
1449  tuplestore_donestoring(tupstore);
1450 
1451  PG_RETURN_VOID();
1452  }
1453 
1454  hash_seq_init(&scan, ConnectionHash);
1455  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1456  {
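1457  ForeignServer *server;
1458  Datum values[POSTGRES_FDW_GET_CONNECTIONS_COLS];
1459  bool nulls[POSTGRES_FDW_GET_CONNECTIONS_COLS];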
1460 
1461  /* We only look for open remote connections */
1462  if (!entry->conn)
1463  continue;
1464 
1465  server = GetForeignServerExtended(entry->serverid, FSV_MISSING_OK);
1466 
1467  MemSet(values, 0, sizeof(values));
1468  MemSet(nulls, 0, sizeof(nulls));
1469 
1470  /*
1471  * The foreign server may have been dropped in current explicit
1472  * transaction. It is not possible to drop the server from another
1473  * session when the connection associated with it is in use in the
1474  * current transaction, if tried so, the drop query in another session
1475  * blocks until the current transaction finishes.
1476  *
1477  * Even though the server is dropped in the current transaction, the
1478  * cache can still have associated active connection entry, say we
1479  * call such connections dangling. Since we can not fetch the server
1480  * name from system catalogs for dangling connections, instead we show
1481  * NULL value for server name in output.
1482  *
1483  * We could have done better by storing the server name in the cache
1484  * entry instead of server oid so that it could be used in the output.
1485  * But the server name in each cache entry requires 64 bytes of
1486  * memory, which is huge, when there are many cached connections and
1487  * the use case i.e. dropping the foreign server within the explicit
1488  * current transaction seems rare. So, we chose to show NULL value for
1489  * server name in output.
1490  *
1491  * Such dangling connections get closed either in next use or at the
1492  * end of current explicit transaction in pgfdw_xact_callback.
1493  */
1494  if (!server)
1495  {
1496  /*
1497  * If the server has been dropped in the current explicit
1498  * transaction, then this entry would have been invalidated in
1499  * pgfdw_inval_callback at the end of drop server command. Note
1500  * that this connection would not have been closed in
1501  * pgfdw_inval_callback because it is still being used in the
1502  * current explicit transaction. So, assert that here.
1503  */
1504  Assert(entry->conn && entry->xact_depth > 0 && entry->invalidated);
1505 
1506  /* Show null, if no server name was found */
1507  nulls[0] = true;
1508  }
1509  else
1510  values[0] = CStringGetTextDatum(server->servername);
1511 
1512  values[1] = BoolGetDatum(!entry->invalidated);
1513 
1514  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
1515  }
1516 
1517  /* clean up and return the tuplestore */
1518  tuplestore_donestoring(tupstore);
1519 
1520  PG_RETURN_VOID();
1521 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
#define IsA(nodeptr, _type_)
Definition: nodes.h:590
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:207
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
int errcode(int sqlerrcode)
Definition: elog.c:698
#define MemSet(start, val, len)
Definition: c.h:1008
#define ERROR
Definition: elog.h:46
#define FSV_MISSING_OK
Definition: foreign.h:61
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
static HTAB * ConnectionHash
Definition: connection.c:73
uintptr_t Datum
Definition: postgres.h:411
int work_mem
Definition: globals.c:124
#define BoolGetDatum(X)
Definition: postgres.h:446
#define ereport(elevel,...)
Definition: elog.h:157
int allowedModes
Definition: execnodes.h:305
ForeignServer * GetForeignServerExtended(Oid serverid, bits16 flags)
Definition: foreign.c:121
#define PG_RETURN_VOID()
Definition: fmgr.h:349
SetFunctionReturnMode returnMode
Definition: execnodes.h:307
#define Assert(condition)
Definition: c.h:804
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1436
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:233
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1426
Tuplestorestate * setResult
Definition: execnodes.h:310
static Datum values[MAXATTR]
Definition: bootstrap.c:166
ExprContext * econtext
Definition: execnodes.h:303
TupleDesc setDesc
Definition: execnodes.h:311
int errmsg(const char *fmt,...)
Definition: elog.c:909
char * servername
Definition: foreign.h:39
#define elog(elevel,...)
Definition: elog.h:232
#define CStringGetTextDatum(s)
Definition: builtins.h:82
#define POSTGRES_FDW_GET_CONNECTIONS_COLS

◆ ReleaseConnection()

void ReleaseConnection ( PGconn * conn )

Definition at line 637 of file connection.c.

Referenced by estimate_path_cost_size(), finish_foreign_modify(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresEndDirectModify(), postgresEndForeignScan(), and postgresImportForeignSchema().

638 {
639  /*
640  * Currently, we don't actually track connection references because all
641  * cleanup is managed on a transaction or subtransaction basis instead. So
642  * there's nothing to do here.
643  */
644 }

◆ UserMappingPasswordRequired()

static bool UserMappingPasswordRequired ( UserMapping * user )
static

Definition at line 475 of file connection.c.

References defGetBoolean(), DefElem::defname, lfirst, and UserMapping::options.

Referenced by check_conn_params(), and connect_pg_server().

476 {
477  ListCell *cell;
478 
479  foreach(cell, user->options)
480  {
481  DefElem *def = (DefElem *) lfirst(cell);
482 
483  if (strcmp(def->defname, "password_required") == 0)
484  return defGetBoolean(def);
485  }
486 
487  return true;
488 }
bool defGetBoolean(DefElem *def)
Definition: define.c:111
List * options
Definition: foreign.h:50
#define lfirst(lc)
Definition: pg_list.h:169
char * defname
Definition: parsenodes.h:746

Variable Documentation

◆ ConnectionHash

HTAB* ConnectionHash = NULL
static

Definition at line 73 of file connection.c.

◆ cursor_number

unsigned int cursor_number = 0
static

Definition at line 76 of file connection.c.

Referenced by GetCursorNumber(), and pgfdw_xact_callback().

◆ prep_stmt_number

unsigned int prep_stmt_number = 0
static

Definition at line 77 of file connection.c.

Referenced by GetPrepStmtNumber().

◆ xact_got_connection

bool xact_got_connection = false
static

Definition at line 80 of file connection.c.

Referenced by GetConnection(), pgfdw_subxact_callback(), and pgfdw_xact_callback().