PostgreSQL Source Code  git master
connection.c File Reference
#include "postgres.h"
#include "access/htup_details.h"
#include "access/xact.h"
#include "catalog/pg_user_mapping.h"
#include "commands/defrem.h"
#include "funcapi.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postgres_fdw.h"
#include "storage/fd.h"
#include "storage/latch.h"
#include "utils/builtins.h"
#include "utils/datetime.h"
#include "utils/hsearch.h"
#include "utils/inval.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
Include dependency graph for connection.c:

Go to the source code of this file.

Data Structures

struct  ConnCacheEntry
 

Macros

#define POSTGRES_FDW_GET_CONNECTIONS_COLS   2
 

Typedefs

typedef Oid ConnCacheKey
 
typedef struct ConnCacheEntry ConnCacheEntry
 

Functions

 PG_FUNCTION_INFO_V1 (postgres_fdw_get_connections)
 
 PG_FUNCTION_INFO_V1 (postgres_fdw_disconnect)
 
 PG_FUNCTION_INFO_V1 (postgres_fdw_disconnect_all)
 
static void make_new_connection (ConnCacheEntry *entry, UserMapping *user)
 
static PGconn * connect_pg_server (ForeignServer *server, UserMapping *user)
 
static void disconnect_pg_server (ConnCacheEntry *entry)
 
static void check_conn_params (const char **keywords, const char **values, UserMapping *user)
 
static void configure_remote_session (PGconn *conn)
 
static void begin_remote_xact (ConnCacheEntry *entry)
 
static void pgfdw_xact_callback (XactEvent event, void *arg)
 
static void pgfdw_subxact_callback (SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg)
 
static void pgfdw_inval_callback (Datum arg, int cacheid, uint32 hashvalue)
 
static void pgfdw_reject_incomplete_xact_state_change (ConnCacheEntry *entry)
 
static bool pgfdw_cancel_query (PGconn *conn)
 
static bool pgfdw_exec_cleanup_query (PGconn *conn, const char *query, bool ignore_errors)
 
static bool pgfdw_get_cleanup_result (PGconn *conn, TimestampTz endtime, PGresult **result, bool *timed_out)
 
static void pgfdw_abort_cleanup (ConnCacheEntry *entry, const char *sql, bool toplevel)
 
static bool UserMappingPasswordRequired (UserMapping *user)
 
static bool disconnect_cached_connections (Oid serverid)
 
PGconn * GetConnection (UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
 
void do_sql_command (PGconn *conn, const char *sql)
 
void ReleaseConnection (PGconn *conn)
 
unsigned int GetCursorNumber (PGconn *conn)
 
unsigned int GetPrepStmtNumber (PGconn *conn)
 
PGresult * pgfdw_exec_query (PGconn *conn, const char *query, PgFdwConnState *state)
 
PGresult * pgfdw_get_result (PGconn *conn, const char *query)
 
void pgfdw_report_error (int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
 
Datum postgres_fdw_get_connections (PG_FUNCTION_ARGS)
 
Datum postgres_fdw_disconnect (PG_FUNCTION_ARGS)
 
Datum postgres_fdw_disconnect_all (PG_FUNCTION_ARGS)
 

Variables

static HTAB * ConnectionHash = NULL
 
static unsigned int cursor_number = 0
 
static unsigned int prep_stmt_number = 0
 
static bool xact_got_connection = false
 

Macro Definition Documentation

◆ POSTGRES_FDW_GET_CONNECTIONS_COLS

#define POSTGRES_FDW_GET_CONNECTIONS_COLS   2

Typedef Documentation

◆ ConnCacheEntry

◆ ConnCacheKey

typedef Oid ConnCacheKey

Definition at line 49 of file connection.c.

Function Documentation

◆ begin_remote_xact()

static void begin_remote_xact ( ConnCacheEntry * entry)
static

Definition at line 647 of file connection.c.

648 {
649  int curlevel = GetCurrentTransactionNestLevel();
650 
651  /* Start main transaction if we haven't yet */
652  if (entry->xact_depth <= 0)
653  {
654  const char *sql;
655 
656  elog(DEBUG3, "starting remote transaction on connection %p",
657  entry->conn);
658 
659  if (IsolationIsSerializable())
660  sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
661  else
662  sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
663  entry->changing_xact_state = true;
664  do_sql_command(entry->conn, sql);
665  entry->xact_depth = 1;
666  entry->changing_xact_state = false;
667  }
668 
669  /*
670  * If we're in a subtransaction, stack up savepoints to match our level.
671  * This ensures we can rollback just the desired effects when a
672  * subtransaction aborts.
673  */
674  while (entry->xact_depth < curlevel)
675  {
676  char sql[64];
677 
678  snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
679  entry->changing_xact_state = true;
680  do_sql_command(entry->conn, sql);
681  entry->xact_depth++;
682  entry->changing_xact_state = false;
683  }
684 }
void do_sql_command(PGconn *conn, const char *sql)
Definition: connection.c:624
#define DEBUG3
Definition: elog.h:22
#define elog(elevel,...)
Definition: elog.h:218
#define snprintf
Definition: port.h:225
PGconn * conn
Definition: connection.c:54
bool changing_xact_state
Definition: connection.c:60
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:909
#define IsolationIsSerializable()
Definition: xact.h:52

References ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, DEBUG3, do_sql_command(), elog, GetCurrentTransactionNestLevel(), IsolationIsSerializable, snprintf, and ConnCacheEntry::xact_depth.

Referenced by GetConnection().

◆ check_conn_params()

static void check_conn_params ( const char **  keywords,
const char **  values,
UserMapping * user 
)
static

Definition at line 551 of file connection.c.

552 {
553  int i;
554 
555  /* no check required if superuser */
556  if (superuser_arg(user->userid))
557  return;
558 
559  /* ok if params contain a non-empty password */
560  for (i = 0; keywords[i] != NULL; i++)
561  {
562  if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
563  return;
564  }
565 
566  /* ok if the superuser explicitly said so at user mapping creation time */
567  if (!UserMappingPasswordRequired(user))
568  return;
569 
570  ereport(ERROR,
571  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
572  errmsg("password is required"),
573  errdetail("Non-superusers must provide a password in the user mapping.")));
574 }
static Datum values[MAXATTR]
Definition: bootstrap.c:156
static bool UserMappingPasswordRequired(UserMapping *user)
Definition: connection.c:528
int errdetail(const char *fmt,...)
Definition: elog.c:1037
int errcode(int sqlerrcode)
Definition: elog.c:693
int errmsg(const char *fmt,...)
Definition: elog.c:904
#define ERROR
Definition: elog.h:33
#define ereport(elevel,...)
Definition: elog.h:143
int i
Definition: isn.c:73
static char * user
Definition: pg_regress.c:95
bool superuser_arg(Oid roleid)
Definition: superuser.c:56

References ereport, errcode(), errdetail(), errmsg(), ERROR, i, superuser_arg(), user, UserMappingPasswordRequired(), and values.

Referenced by connect_pg_server().

◆ configure_remote_session()

static void configure_remote_session ( PGconn * conn)
static

Definition at line 588 of file connection.c.

589 {
590  int remoteversion = PQserverVersion(conn);
591 
592  /* Force the search path to contain only pg_catalog (see deparse.c) */
593  do_sql_command(conn, "SET search_path = pg_catalog");
594 
595  /*
596  * Set remote timezone; this is basically just cosmetic, since all
597  * transmitted and returned timestamptzs should specify a zone explicitly
598  * anyway. However it makes the regression test outputs more predictable.
599  *
600  * We don't risk setting remote zone equal to ours, since the remote
601  * server might use a different timezone database. Instead, use UTC
602  * (quoted, because very old servers are picky about case).
603  */
604  do_sql_command(conn, "SET timezone = 'UTC'");
605 
606  /*
607  * Set values needed to ensure unambiguous data output from remote. (This
608  * logic should match what pg_dump does. See also set_transmission_modes
609  * in postgres_fdw.c.)
610  */
611  do_sql_command(conn, "SET datestyle = ISO");
612  if (remoteversion >= 80400)
613  do_sql_command(conn, "SET intervalstyle = postgres");
614  if (remoteversion >= 90000)
615  do_sql_command(conn, "SET extra_float_digits = 3");
616  else
617  do_sql_command(conn, "SET extra_float_digits = 2");
618 }
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:6905
PGconn * conn
Definition: streamutil.c:54

References conn, do_sql_command(), and PQserverVersion().

Referenced by connect_pg_server().

◆ connect_pg_server()

static PGconn * connect_pg_server ( ForeignServer * server,
UserMapping * user 
)
static

Definition at line 340 of file connection.c.

341 {
342  PGconn *volatile conn = NULL;
343 
344  /*
345  * Use PG_TRY block to ensure closing connection on error.
346  */
347  PG_TRY();
348  {
349  const char **keywords;
350  const char **values;
351  char *appname = NULL;
352  int n;
353 
354  /*
355  * Construct connection params from generic options of ForeignServer
356  * and UserMapping. (Some of them might not be libpq options, in
357  * which case we'll just waste a few array slots.) Add 4 extra slots
358  * for application_name, fallback_application_name, client_encoding,
359  * end marker.
360  */
361  n = list_length(server->options) + list_length(user->options) + 4;
362  keywords = (const char **) palloc(n * sizeof(char *));
363  values = (const char **) palloc(n * sizeof(char *));
364 
365  n = 0;
366  n += ExtractConnectionOptions(server->options,
367  keywords + n, values + n);
368  n += ExtractConnectionOptions(user->options,
369  keywords + n, values + n);
370 
371  /*
372  * Use pgfdw_application_name as application_name if set.
373  *
374  * PQconnectdbParams() processes the parameter arrays from start to
375  * end. If any key word is repeated, the last value is used. Therefore
376  * note that pgfdw_application_name must be added to the arrays after
377  * options of ForeignServer are, so that it can override
378  * application_name set in ForeignServer.
379  */
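380  if (pgfdw_application_name && *pgfdw_application_name != '\0')
381  {
382  keywords[n] = "application_name";
383  values[n] = pgfdw_application_name;
384  n++;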
385  }
386 
387  /*
388  * Search the parameter arrays to find application_name setting, and
389  * replace escape sequences in it with status information if found.
390  * The arrays are searched backwards because the last value is used if
391  * application_name is repeatedly set.
392  */
393  for (int i = n - 1; i >= 0; i--)
394  {
395  if (strcmp(keywords[i], "application_name") == 0 &&
396  *(values[i]) != '\0')
397  {
398  /*
399  * Use this application_name setting if it's not empty string
400  * even after any escape sequences in it are replaced.
401  */
402  appname = process_pgfdw_appname(values[i]);
403  if (appname[0] != '\0')
404  {
405  values[i] = appname;
406  break;
407  }
408 
409  /*
410  * This empty application_name is not used, so we set
411  * values[i] to NULL and keep searching the array to find the
412  * next one.
413  */
414  values[i] = NULL;
415  pfree(appname);
416  appname = NULL;
417  }
418  }
419 
420  /* Use "postgres_fdw" as fallback_application_name */
421  keywords[n] = "fallback_application_name";
422  values[n] = "postgres_fdw";
423  n++;
424 
425  /* Set client_encoding so that libpq can convert encoding properly. */
426  keywords[n] = "client_encoding";
427  values[n] = GetDatabaseEncodingName();
428  n++;
429 
430  keywords[n] = values[n] = NULL;
431 
432  /* verify the set of connection parameters */
433  check_conn_params(keywords, values, user);
434 
435  /*
436  * We must obey fd.c's limit on non-virtual file descriptors. Assume
437  * that a PGconn represents one long-lived FD. (Doing this here also
438  * ensures that VFDs are closed if needed to make room.)
439  */
440  if (!AcquireExternalFD())
441  {
442 #ifndef WIN32 /* can't write #if within ereport() macro */
443  ereport(ERROR,
444  (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
445  errmsg("could not connect to server \"%s\"",
446  server->servername),
447  errdetail("There are too many open files on the local server."),
448  errhint("Raise the server's max_files_per_process and/or \"ulimit -n\" limits.")));
449 #else
450  ereport(ERROR,
451  (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
452  errmsg("could not connect to server \"%s\"",
453  server->servername),
454  errdetail("There are too many open files on the local server."),
455  errhint("Raise the server's max_files_per_process setting.")));
456 #endif
457  }
458 
459  /* OK to make connection */
460  conn = PQconnectdbParams(keywords, values, false);
461 
462  if (!conn)
463  ReleaseExternalFD(); /* because the PG_CATCH block won't */
464 
465  if (!conn || PQstatus(conn) != CONNECTION_OK)
466  ereport(ERROR,
467  (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
468  errmsg("could not connect to server \"%s\"",
469  server->servername),
470  errdetail_internal("%s", pchomp(PQerrorMessage(conn)))));
471 
472  /*
473  * Check that non-superuser has used password to establish connection;
474  * otherwise, he's piggybacking on the postgres server's user
475  * identity. See also dblink_security_check() in contrib/dblink and
476  * check_conn_params.
477  */
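478  if (!superuser_arg(user->userid) && UserMappingPasswordRequired(user) &&
479  !PQconnectionUsedPassword(conn))
480  ereport(ERROR,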
481  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
482  errmsg("password is required"),
483  errdetail("Non-superuser cannot connect if the server does not request a password."),
484  errhint("Target server's authentication method must be changed or password_required=false set in the user mapping attributes.")));
485 
486  /* Prepare new session for use */
487  configure_remote_session(conn);
488 
489  if (appname != NULL)
490  pfree(appname);
491  pfree(keywords);
492  pfree(values);
493  }
494  PG_CATCH();
495  {
496  /* Release PGconn data structure if we managed to create one */
497  if (conn)
498  {
499  PQfinish(conn);
500  ReleaseExternalFD();
501  }
502  PG_RE_THROW();
503  }
504  PG_END_TRY();
505 
506  return conn;
507 }
static void configure_remote_session(PGconn *conn)
Definition: connection.c:588
static void check_conn_params(const char **keywords, const char **values, UserMapping *user)
Definition: connection.c:551
char * process_pgfdw_appname(const char *appname)
Definition: option.c:457
int ExtractConnectionOptions(List *defelems, const char **keywords, const char **values)
Definition: option.c:380
char * pgfdw_application_name
Definition: option.c:52
int errdetail_internal(const char *fmt,...)
Definition: elog.c:1064
int errhint(const char *fmt,...)
Definition: elog.c:1151
#define PG_RE_THROW()
Definition: elog.h:340
#define PG_END_TRY()
Definition: elog.h:324
#define PG_TRY()
Definition: elog.h:299
#define PG_CATCH()
Definition: elog.h:309
void ReleaseExternalFD(void)
Definition: fd.c:1230
bool AcquireExternalFD(void)
Definition: fd.c:1177
PGconn * PQconnectdbParams(const char *const *keywords, const char *const *values, int expand_dbname)
Definition: fe-connect.c:657
int PQconnectionUsedPassword(const PGconn *conn)
Definition: fe-connect.c:6981
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6915
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:6862
void PQfinish(PGconn *conn)
Definition: fe-connect.c:4256
@ CONNECTION_OK
Definition: libpq-fe.h:58
const char * GetDatabaseEncodingName(void)
Definition: mbutils.c:1216
char * pchomp(const char *in)
Definition: mcxt.c:1327
void pfree(void *pointer)
Definition: mcxt.c:1169
void * palloc(Size size)
Definition: mcxt.c:1062
static int list_length(const List *l)
Definition: pg_list.h:149
List * options
Definition: foreign.h:42
char * servername
Definition: foreign.h:39

References AcquireExternalFD(), check_conn_params(), configure_remote_session(), conn, CONNECTION_OK, ereport, errcode(), errdetail(), errdetail_internal(), errhint(), errmsg(), ERROR, ExtractConnectionOptions(), GetDatabaseEncodingName(), i, list_length(), ForeignServer::options, palloc(), pchomp(), pfree(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pgfdw_application_name, PQconnectdbParams(), PQconnectionUsedPassword(), PQerrorMessage(), PQfinish(), PQstatus(), process_pgfdw_appname(), ReleaseExternalFD(), ForeignServer::servername, superuser_arg(), user, UserMappingPasswordRequired(), and values.

Referenced by make_new_connection().

◆ disconnect_cached_connections()

static bool disconnect_cached_connections ( Oid  serverid)
static

Definition at line 1642 of file connection.c.

1643 {
1644  HASH_SEQ_STATUS scan;
1645  ConnCacheEntry *entry;
1646  bool all = !OidIsValid(serverid);
1647  bool result = false;
1648 
1649  /*
1650  * Connection cache hashtable has not been initialized yet in this
1651  * session, so return false.
1652  */
1653  if (!ConnectionHash)
1654  return false;
1655 
1656  hash_seq_init(&scan, ConnectionHash);
1657  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1658  {
1659  /* Ignore cache entry if no open connection right now. */
1660  if (!entry->conn)
1661  continue;
1662 
1663  if (all || entry->serverid == serverid)
1664  {
1665  /*
1666  * Emit a warning because the connection to close is used in the
1667  * current transaction and cannot be disconnected right now.
1668  */
1669  if (entry->xact_depth > 0)
1670  {
1671  ForeignServer *server;
1672 
1673  server = GetForeignServerExtended(entry->serverid,
1674  FSV_MISSING_OK);
1675 
1676  if (!server)
1677  {
1678  /*
1679  * If the foreign server was dropped while its connection
1680  * was used in the current transaction, the connection
1681  * must have been marked as invalid by
1682  * pgfdw_inval_callback at the end of DROP SERVER command.
1683  */
1684  Assert(entry->invalidated);
1685 
1686  ereport(WARNING,
1687  (errmsg("cannot close dropped server connection because it is still in use")));
1688  }
1689  else
1690  ereport(WARNING,
1691  (errmsg("cannot close connection for server \"%s\" because it is still in use",
1692  server->servername)));
1693  }
1694  else
1695  {
1696  elog(DEBUG3, "discarding connection %p", entry->conn);
1697  disconnect_pg_server(entry);
1698  result = true;
1699  }
1700  }
1701  }
1702 
1703  return result;
1704 }
#define OidIsValid(objectId)
Definition: c.h:710
static void disconnect_pg_server(ConnCacheEntry *entry)
Definition: connection.c:513
static HTAB * ConnectionHash
Definition: connection.c:73
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1436
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1426
#define WARNING
Definition: elog.h:30
ForeignServer * GetForeignServerExtended(Oid serverid, bits16 flags)
Definition: foreign.c:121
#define FSV_MISSING_OK
Definition: foreign.h:61
Assert(fmt[strlen(fmt) - 1] !='\n')
bool invalidated
Definition: connection.c:61

References Assert(), ConnCacheEntry::conn, ConnectionHash, DEBUG3, disconnect_pg_server(), elog, ereport, errmsg(), FSV_MISSING_OK, GetForeignServerExtended(), hash_seq_init(), hash_seq_search(), ConnCacheEntry::invalidated, OidIsValid, ConnCacheEntry::serverid, ForeignServer::servername, WARNING, and ConnCacheEntry::xact_depth.

Referenced by postgres_fdw_disconnect(), and postgres_fdw_disconnect_all().

◆ disconnect_pg_server()

static void disconnect_pg_server ( ConnCacheEntry * entry)
static

Definition at line 513 of file connection.c.

514 {
515  if (entry->conn != NULL)
516  {
517  PQfinish(entry->conn);
518  entry->conn = NULL;
519  ReleaseExternalFD();
520  }
521 }

References ConnCacheEntry::conn, PQfinish(), and ReleaseExternalFD().

Referenced by disconnect_cached_connections(), GetConnection(), pgfdw_inval_callback(), pgfdw_reject_incomplete_xact_state_change(), and pgfdw_xact_callback().

◆ do_sql_command()

void do_sql_command ( PGconn * conn,
const char *  sql 
)

Definition at line 624 of file connection.c.

625 {
626  PGresult *res;
627 
628  if (!PQsendQuery(conn, sql))
629  pgfdw_report_error(ERROR, NULL, conn, false, sql);
630  res = pgfdw_get_result(conn, sql);
631  if (PQresultStatus(res) != PGRES_COMMAND_OK)
632  pgfdw_report_error(ERROR, res, conn, true, sql);
633  PQclear(res);
634 }
PGresult * pgfdw_get_result(PGconn *conn, const char *query)
Definition: connection.c:766
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:831
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3178
void PQclear(PGresult *res)
Definition: fe-exec.c:694
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1326
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:95

References conn, ERROR, pgfdw_get_result(), pgfdw_report_error(), PGRES_COMMAND_OK, PQclear(), PQresultStatus(), PQsendQuery(), and res.

Referenced by begin_remote_xact(), configure_remote_session(), pgfdw_subxact_callback(), pgfdw_xact_callback(), and postgresExecForeignTruncate().

◆ GetConnection()

PGconn* GetConnection ( UserMapping * user,
bool  will_prep_stmt,
PgFdwConnState **  state 
)

Definition at line 127 of file connection.c.

128 {
129  bool found;
130  bool retry = false;
131  ConnCacheEntry *entry;
132  ConnCacheKey key;
133  MemoryContext ccxt = CurrentMemoryContext;
134 
135  /* First time through, initialize connection cache hashtable */
136  if (ConnectionHash == NULL)
137  {
138  HASHCTL ctl;
139 
140  ctl.keysize = sizeof(ConnCacheKey);
141  ctl.entrysize = sizeof(ConnCacheEntry);
142  ConnectionHash = hash_create("postgres_fdw connections", 8,
143  &ctl,
144  HASH_ELEM | HASH_BLOBS);
145 
146  /*
147  * Register some callback functions that manage connection cleanup.
148  * This should be done just once in each backend.
149  */
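150  RegisterXactCallback(pgfdw_xact_callback, NULL);
151  RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
152  CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
153  pgfdw_inval_callback, (Datum) 0);
154  CacheRegisterSyscacheCallback(USERMAPPINGOID,
155  pgfdw_inval_callback, (Datum) 0);
156  }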
157 
158  /* Set flag that we did GetConnection during the current transaction */
159  xact_got_connection = true;
160 
161  /* Create hash key for the entry. Assume no pad bytes in key struct */
162  key = user->umid;
163 
164  /*
165  * Find or create cached entry for requested connection.
166  */
167  entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
168  if (!found)
169  {
170  /*
171  * We need only clear "conn" here; remaining fields will be filled
172  * later when "conn" is set.
173  */
174  entry->conn = NULL;
175  }
176 
177  /* Reject further use of connections which failed abort cleanup. */
178  pgfdw_reject_incomplete_xact_state_change(entry);
179 
180  /*
181  * If the connection needs to be remade due to invalidation, disconnect as
182  * soon as we're out of all transactions.
183  */
184  if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
185  {
186  elog(DEBUG3, "closing connection %p for option changes to take effect",
187  entry->conn);
188  disconnect_pg_server(entry);
189  }
190 
191  /*
192  * If cache entry doesn't have a connection, we have to establish a new
193  * connection. (If connect_pg_server throws an error, the cache entry
194  * will remain in a valid empty state, ie conn == NULL.)
195  */
196  if (entry->conn == NULL)
197  make_new_connection(entry, user);
198 
199  /*
200  * We check the health of the cached connection here when using it. In
201  * cases where we're out of all transactions, if a broken connection is
202  * detected, we try to reestablish a new connection later.
203  */
204  PG_TRY();
205  {
206  /* Process a pending asynchronous request if any. */
207  if (entry->state.pendingAreq)
208  process_pending_request(entry->state.pendingAreq);
209  /* Start a new transaction or subtransaction if needed. */
210  begin_remote_xact(entry);
211  }
212  PG_CATCH();
213  {
214  MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
215  ErrorData *errdata = CopyErrorData();
216 
217  /*
218  * Determine whether to try to reestablish the connection.
219  *
220  * After a broken connection is detected in libpq, any error other
221  * than connection failure (e.g., out-of-memory) can be thrown
222  * somewhere between return from libpq and the expected ereport() call
223  * in pgfdw_report_error(). In this case, since PQstatus() indicates
224  * CONNECTION_BAD, checking only PQstatus() causes the false detection
225  * of connection failure. To avoid this, we also verify that the
226  * error's sqlstate is ERRCODE_CONNECTION_FAILURE. Note that also
227  * checking only the sqlstate can cause another false detection
228  * because pgfdw_report_error() may report ERRCODE_CONNECTION_FAILURE
229  * for any libpq-originated error condition.
230  */
231  if (errdata->sqlerrcode != ERRCODE_CONNECTION_FAILURE ||
232  PQstatus(entry->conn) != CONNECTION_BAD ||
233  entry->xact_depth > 0)
234  {
235  MemoryContextSwitchTo(ecxt);
236  PG_RE_THROW();
237  }
238 
239  /* Clean up the error state */
240  FlushErrorState();
241  FreeErrorData(errdata);
242  errdata = NULL;
243 
244  retry = true;
245  }
246  PG_END_TRY();
247 
248  /*
249  * If a broken connection is detected, disconnect it, reestablish a new
250  * connection and retry a new remote transaction. If connection failure is
251  * reported again, we give up getting a connection.
252  */
253  if (retry)
254  {
255  Assert(entry->xact_depth == 0);
256 
257  ereport(DEBUG3,
258  (errmsg_internal("could not start remote transaction on connection %p",
259  entry->conn)),
260  errdetail_internal("%s", pchomp(PQerrorMessage(entry->conn))));
261 
262  elog(DEBUG3, "closing connection %p to reestablish a new one",
263  entry->conn);
264  disconnect_pg_server(entry);
265 
266  if (entry->conn == NULL)
267  make_new_connection(entry, user);
268 
269  begin_remote_xact(entry);
270  }
271 
272  /* Remember if caller will prepare statements */
273  entry->have_prep_stmt |= will_prep_stmt;
274 
275  /* If caller needs access to the per-connection state, return it. */
276  if (state)
277  *state = &entry->state;
278 
279  return entry->conn;
280 }
Oid ConnCacheKey
Definition: connection.c:49
static bool xact_got_connection
Definition: connection.c:80
struct ConnCacheEntry ConnCacheEntry
static void make_new_connection(ConnCacheEntry *entry, UserMapping *user)
Definition: connection.c:287
static void pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg)
Definition: connection.c:1018
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
Definition: connection.c:1152
static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
Definition: connection.c:1102
static void pgfdw_xact_callback(XactEvent event, void *arg)
Definition: connection.c:887
static void begin_remote_xact(ConnCacheEntry *entry)
Definition: connection.c:647
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:349
int errmsg_internal(const char *fmt,...)
Definition: elog.c:991
void FreeErrorData(ErrorData *edata)
Definition: elog.c:1611
void FlushErrorState(void)
Definition: elog.c:1649
ErrorData * CopyErrorData(void)
Definition: elog.c:1555
@ HASH_ENTER
Definition: hsearch.h:114
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1518
@ CONNECTION_BAD
Definition: libpq-fe.h:59
MemoryContext CurrentMemoryContext
Definition: mcxt.c:42
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
uintptr_t Datum
Definition: postgres.h:411
void process_pending_request(AsyncRequest *areq)
bool have_prep_stmt
Definition: connection.c:58
PgFdwConnState state
Definition: connection.c:67
int sqlerrcode
Definition: elog.h:367
Size keysize
Definition: hsearch.h:75
Size entrysize
Definition: hsearch.h:76
AsyncRequest * pendingAreq
Definition: postgres_fdw.h:134
Definition: regguts.h:318
@ FOREIGNSERVEROID
Definition: syscache.h:64
@ USERMAPPINGOID
Definition: syscache.h:113
void RegisterXactCallback(XactCallback callback, void *arg)
Definition: xact.c:3598
void RegisterSubXactCallback(SubXactCallback callback, void *arg)
Definition: xact.c:3653

References Assert(), begin_remote_xact(), CacheRegisterSyscacheCallback(), ConnCacheEntry::conn, CONNECTION_BAD, ConnectionHash, CopyErrorData(), CurrentMemoryContext, DEBUG3, disconnect_pg_server(), elog, HASHCTL::entrysize, ereport, errdetail_internal(), errmsg_internal(), FlushErrorState(), FOREIGNSERVEROID, FreeErrorData(), HASH_BLOBS, hash_create(), HASH_ELEM, HASH_ENTER, hash_search(), ConnCacheEntry::have_prep_stmt, ConnCacheEntry::invalidated, sort-test::key, HASHCTL::keysize, make_new_connection(), MemoryContextSwitchTo(), pchomp(), PgFdwConnState::pendingAreq, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pgfdw_inval_callback(), pgfdw_reject_incomplete_xact_state_change(), pgfdw_subxact_callback(), pgfdw_xact_callback(), PQerrorMessage(), PQstatus(), process_pending_request(), RegisterSubXactCallback(), RegisterXactCallback(), ErrorData::sqlerrcode, ConnCacheEntry::state, user, USERMAPPINGOID, ConnCacheEntry::xact_depth, and xact_got_connection.

Referenced by create_foreign_modify(), dumpBlobs(), dumpDatabase(), dumpDatabaseConfig(), dumpTableData_copy(), estimate_path_cost_size(), expand_extension_name_patterns(), expand_foreign_server_name_patterns(), expand_schema_name_patterns(), expand_table_name_patterns(), getTables(), main(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresBeginDirectModify(), postgresBeginForeignScan(), postgresExecForeignTruncate(), postgresImportForeignSchema(), setup_connection(), StartLogStreamer(), StreamLog(), and StreamLogicalLog().

◆ GetCursorNumber()

unsigned int GetCursorNumber ( PGconn * conn)

Definition at line 711 of file connection.c.

712 {
713  return ++cursor_number;
714 }
static unsigned int cursor_number
Definition: connection.c:76

References cursor_number.

Referenced by postgresAcquireSampleRowsFunc(), and postgresBeginForeignScan().

◆ GetPrepStmtNumber()

unsigned int GetPrepStmtNumber ( PGconn * conn)

Definition at line 725 of file connection.c.

726 {
727  return ++prep_stmt_number;
728 }
static unsigned int prep_stmt_number
Definition: connection.c:77

References prep_stmt_number.

Referenced by prepare_foreign_modify().

◆ make_new_connection()

static void make_new_connection ( ConnCacheEntry * entry,
UserMapping * user 
)
static

Definition at line 287 of file connection.c.

288 {
289  ForeignServer *server = GetForeignServer(user->serverid);
290  ListCell *lc;
291 
292  Assert(entry->conn == NULL);
293 
294  /* Reset all transient state fields, to be sure all are clean */
295  entry->xact_depth = 0;
296  entry->have_prep_stmt = false;
297  entry->have_error = false;
298  entry->changing_xact_state = false;
299  entry->invalidated = false;
300  entry->serverid = server->serverid;
301  entry->server_hashvalue =
302  GetSysCacheHashValue1(FOREIGNSERVEROID,
303  ObjectIdGetDatum(server->serverid));
304  entry->mapping_hashvalue =
305  GetSysCacheHashValue1(USERMAPPINGOID,
306  ObjectIdGetDatum(user->umid));
307  memset(&entry->state, 0, sizeof(entry->state));
308 
309  /*
310  * Determine whether to keep the connection that we're about to make here
311  * open even after the transaction using it ends, so that the subsequent
312  * transactions can re-use it.
313  *
314  * It's enough to determine this only when making new connection because
315  * all the connections to the foreign server whose keep_connections option
316  * is changed will be closed and re-made later.
317  *
318  * By default, all the connections to any foreign servers are kept open.
319  */
320  entry->keep_connections = true;
321  foreach(lc, server->options)
322  {
323  DefElem *def = (DefElem *) lfirst(lc);
324 
325  if (strcmp(def->defname, "keep_connections") == 0)
326  entry->keep_connections = defGetBoolean(def);
327  }
328 
329  /* Now try to make the connection */
330  entry->conn = connect_pg_server(server, user);
331 
332  elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
333  entry->conn, server->servername, user->umid, user->userid);
334 }
static PGconn * connect_pg_server(ForeignServer *server, UserMapping *user)
Definition: connection.c:340
bool defGetBoolean(DefElem *def)
Definition: define.c:108
ForeignServer * GetForeignServer(Oid serverid)
Definition: foreign.c:109
#define lfirst(lc)
Definition: pg_list.h:169
#define ObjectIdGetDatum(X)
Definition: postgres.h:551
uint32 server_hashvalue
Definition: connection.c:65
uint32 mapping_hashvalue
Definition: connection.c:66
bool keep_connections
Definition: connection.c:62
char * defname
Definition: parsenodes.h:758
Oid serverid
Definition: foreign.h:36
#define GetSysCacheHashValue1(cacheId, key1)
Definition: syscache.h:204

References Assert(), ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, connect_pg_server(), DEBUG3, defGetBoolean(), DefElem::defname, elog, FOREIGNSERVEROID, GetForeignServer(), GetSysCacheHashValue1, ConnCacheEntry::have_error, ConnCacheEntry::have_prep_stmt, ConnCacheEntry::invalidated, ConnCacheEntry::keep_connections, lfirst, ConnCacheEntry::mapping_hashvalue, ObjectIdGetDatum, ForeignServer::options, ConnCacheEntry::server_hashvalue, ConnCacheEntry::serverid, ForeignServer::serverid, ForeignServer::servername, ConnCacheEntry::state, user, USERMAPPINGOID, and ConnCacheEntry::xact_depth.

Referenced by GetConnection().

◆ PG_FUNCTION_INFO_V1() [1/3]

PG_FUNCTION_INFO_V1 ( postgres_fdw_disconnect  )

◆ PG_FUNCTION_INFO_V1() [2/3]

PG_FUNCTION_INFO_V1 ( postgres_fdw_disconnect_all  )

◆ PG_FUNCTION_INFO_V1() [3/3]

PG_FUNCTION_INFO_V1 ( postgres_fdw_get_connections  )

◆ pgfdw_abort_cleanup()

static void pgfdw_abort_cleanup ( ConnCacheEntry *  entry,
const char *  sql,
bool  toplevel 
)
static

Definition at line 1398 of file connection.c.

1399 {
1400  /*
1401  * Don't try to clean up the connection if we're already in error
1402  * recursion trouble.
1403  */
1404  if (in_error_recursion_trouble())
1405  entry->changing_xact_state = true;
1406 
1407  /*
1408  * If connection is already unsalvageable, don't touch it further.
1409  */
1410  if (entry->changing_xact_state)
1411  return;
1412 
1413  /*
1414  * Mark this connection as in the process of changing transaction state.
1415  */
1416  entry->changing_xact_state = true;
1417 
1418  /* Assume we might have lost track of prepared statements */
1419  entry->have_error = true;
1420 
1421  /*
1422  * If a command has been submitted to the remote server by using an
1423  * asynchronous execution function, the command might not have yet
1424  * completed. Check to see if a command is still being processed by the
1425  * remote server, and if so, request cancellation of the command.
1426  */
1427  if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
1428  !pgfdw_cancel_query(entry->conn))
1429  return; /* Unable to cancel running query */
1430 
1431  if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
1432  return; /* Unable to abort remote transaction */
1433 
1434  if (toplevel)
1435  {
1436  if (entry->have_prep_stmt && entry->have_error &&
1437  !pgfdw_exec_cleanup_query(entry->conn,
1438  "DEALLOCATE ALL",
1439  true))
1440  return; /* Trouble clearing prepared statements */
1441 
1442  entry->have_prep_stmt = false;
1443  entry->have_error = false;
1444  /* Also reset per-connection state */
1445  memset(&entry->state, 0, sizeof(entry->state));
1446  }
1447 
1448  /* Disarm changing_xact_state if it all worked */
1449  entry->changing_xact_state = false;
1450 }
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
Definition: connection.c:1251
static bool pgfdw_cancel_query(PGconn *conn)
Definition: connection.c:1187
bool in_error_recursion_trouble(void)
Definition: elog.c:286
PGTransactionStatusType PQtransactionStatus(const PGconn *conn)
Definition: fe-connect.c:6870
@ PQTRANS_ACTIVE
Definition: libpq-fe.h:117

References ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, ConnCacheEntry::have_error, ConnCacheEntry::have_prep_stmt, in_error_recursion_trouble(), pgfdw_cancel_query(), pgfdw_exec_cleanup_query(), PQTRANS_ACTIVE, PQtransactionStatus(), and ConnCacheEntry::state.

Referenced by pgfdw_subxact_callback(), and pgfdw_xact_callback().

◆ pgfdw_cancel_query()

static bool pgfdw_cancel_query ( PGconn *  conn)
static

Definition at line 1187 of file connection.c.

1188 {
1189  PGcancel *cancel;
1190  char errbuf[256];
1191  PGresult *result = NULL;
1192  TimestampTz endtime;
1193  bool timed_out;
1194 
1195  /*
1196  * If it takes too long to cancel the query and discard the result, assume
1197  * the connection is dead.
1198  */
1199  endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000);
1200 
1201  /*
1202  * Issue cancel request. Unfortunately, there's no good way to limit the
1203  * amount of time that we might block inside PQgetCancel().
1204  */
1205  if ((cancel = PQgetCancel(conn)))
1206  {
1207  if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
1208  {
1209  ereport(WARNING,
1210  (errcode(ERRCODE_CONNECTION_FAILURE),
1211  errmsg("could not send cancel request: %s",
1212  errbuf)));
1213  PQfreeCancel(cancel);
1214  return false;
1215  }
1216  PQfreeCancel(cancel);
1217  }
1218 
1219  /* Get and discard the result of the query. */
1220  if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
1221  {
1222  if (timed_out)
1223  ereport(WARNING,
1224  (errmsg("could not get result of cancel request due to timeout")));
1225  else
1226  ereport(WARNING,
1227  (errcode(ERRCODE_CONNECTION_FAILURE),
1228  errmsg("could not get result of cancel request: %s",
1229  pchomp(PQerrorMessage(conn)))));
1230 
1231  return false;
1232  }
1233  PQclear(result);
1234 
1235  return true;
1236 }
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result, bool *timed_out)
Definition: connection.c:1311
int64 TimestampTz
Definition: timestamp.h:39
PGcancel * PQgetCancel(PGconn *conn)
Definition: fe-connect.c:4377
int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize)
Definition: fe-connect.c:4492
void PQfreeCancel(PGcancel *cancel)
Definition: fe-connect.c:4445
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56

References conn, ereport, errcode(), errmsg(), GetCurrentTimestamp(), pchomp(), pgfdw_get_cleanup_result(), PQcancel(), PQclear(), PQerrorMessage(), PQfreeCancel(), PQgetCancel(), TimestampTzPlusMilliseconds, and WARNING.

Referenced by pgfdw_abort_cleanup().

◆ pgfdw_exec_cleanup_query()

static bool pgfdw_exec_cleanup_query ( PGconn *  conn,
const char *  query,
bool  ignore_errors 
)
static

Definition at line 1251 of file connection.c.

1252 {
1253  PGresult *result = NULL;
1254  TimestampTz endtime;
1255  bool timed_out;
1256 
1257  /*
1258  * If it takes too long to execute a cleanup query, assume the connection
1259  * is dead. It's fairly likely that this is why we aborted in the first
1260  * place (e.g. statement timeout, user cancel), so the timeout shouldn't
1261  * be too long.
1262  */
1263  endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000);
1264 
1265  /*
1266  * Submit a query. Since we don't use non-blocking mode, this also can
1267  * block. But its risk is relatively small, so we ignore that for now.
1268  */
1269  if (!PQsendQuery(conn, query))
1270  {
1271  pgfdw_report_error(WARNING, NULL, conn, false, query);
1272  return false;
1273  }
1274 
1275  /* Get the result of the query. */
1276  if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
1277  {
1278  if (timed_out)
1279  ereport(WARNING,
1280  (errmsg("could not get query result due to timeout"),
1281  query ? errcontext("remote SQL command: %s", query) : 0));
1282  else
1283  pgfdw_report_error(WARNING, NULL, conn, false, query);
1284 
1285  return false;
1286  }
1287 
1288  /* Issue a warning if not successful. */
1289  if (PQresultStatus(result) != PGRES_COMMAND_OK)
1290  {
1291  pgfdw_report_error(WARNING, result, conn, true, query);
1292  return ignore_errors;
1293  }
1294  PQclear(result);
1295 
1296  return true;
1297 }
#define errcontext
Definition: elog.h:190

References conn, ereport, errcontext, errmsg(), GetCurrentTimestamp(), pgfdw_get_cleanup_result(), pgfdw_report_error(), PGRES_COMMAND_OK, PQclear(), PQresultStatus(), PQsendQuery(), TimestampTzPlusMilliseconds, and WARNING.

Referenced by pgfdw_abort_cleanup().

◆ pgfdw_exec_query()

PGresult* pgfdw_exec_query ( PGconn *  conn,
const char *  query,
PgFdwConnState *  state 
)

Definition at line 738 of file connection.c.

739 {
740  /* First, process a pending asynchronous request, if any. */
741  if (state && state->pendingAreq)
742  process_pending_request(state->pendingAreq);
743 
744  /*
745  * Submit a query. Since we don't use non-blocking mode, this also can
746  * block. But its risk is relatively small, so we ignore that for now.
747  */
748  if (!PQsendQuery(conn, query))
749  pgfdw_report_error(ERROR, NULL, conn, false, query);
750 
751  /* Wait for the result. */
752  return pgfdw_get_result(conn, query);
753 }

References conn, ERROR, pgfdw_get_result(), pgfdw_report_error(), PQsendQuery(), and process_pending_request().

Referenced by close_cursor(), deallocate_query(), fetch_more_data(), get_remote_estimate(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresImportForeignSchema(), and postgresReScanForeignScan().

◆ pgfdw_get_cleanup_result()

static bool pgfdw_get_cleanup_result ( PGconn *  conn,
TimestampTz  endtime,
PGresult **  result,
bool *  timed_out 
)
static

Definition at line 1311 of file connection.c.

1313 {
1314  volatile bool failed = false;
1315  PGresult *volatile last_res = NULL;
1316 
1317  *timed_out = false;
1318 
1319  /* In what follows, do not leak any PGresults on an error. */
1320  PG_TRY();
1321  {
1322  for (;;)
1323  {
1324  PGresult *res;
1325 
1326  while (PQisBusy(conn))
1327  {
1328  int wc;
1329  TimestampTz now = GetCurrentTimestamp();
1330  long cur_timeout;
1331 
1332  /* If timeout has expired, give up, else get sleep time. */
1333  cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
1334  if (cur_timeout <= 0)
1335  {
1336  *timed_out = true;
1337  failed = true;
1338  goto exit;
1339  }
1340 
1341  /* Sleep until there's something to do */
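1342  wc = WaitLatchOrSocket(MyLatch,
1343  WL_LATCH_SET | WL_SOCKET_READABLE |
1344  WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1345  PQsocket(conn),
1346  cur_timeout, PG_WAIT_EXTENSION);
1347  ResetLatch(MyLatch);
1348 
1349  CHECK_FOR_INTERRUPTS();
1350 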
1351  /* Data available in socket? */
1352  if (wc & WL_SOCKET_READABLE)
1353  {
1354  if (!PQconsumeInput(conn))
1355  {
1356  /* connection trouble */
1357  failed = true;
1358  goto exit;
1359  }
1360  }
1361  }
1362 
1363  res = PQgetResult(conn);
1364  if (res == NULL)
1365  break; /* query is complete */
1366 
1367  PQclear(last_res);
1368  last_res = res;
1369  }
1370 exit: ;
1371  }
1372  PG_CATCH();
1373  {
1374  PQclear(last_res);
1375  PG_RE_THROW();
1376  }
1377  PG_END_TRY();
1378 
1379  if (failed)
1380  PQclear(last_res);
1381  else
1382  *result = last_res;
1383  return failed;
1384 }
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1693
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1544
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:6941
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1904
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:1951
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1978
struct Latch * MyLatch
Definition: globals.c:57
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:500
void ResetLatch(Latch *latch)
Definition: latch.c:660
#define WL_SOCKET_READABLE
Definition: latch.h:126
#define WL_TIMEOUT
Definition: latch.h:128
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130
#define WL_LATCH_SET
Definition: latch.h:125
exit(1)
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:120
#define PG_WAIT_EXTENSION
Definition: wait_event.h:23

References CHECK_FOR_INTERRUPTS, conn, exit(), GetCurrentTimestamp(), MyLatch, now(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, PG_WAIT_EXTENSION, PQclear(), PQconsumeInput(), PQgetResult(), PQisBusy(), PQsocket(), res, ResetLatch(), TimestampDifferenceMilliseconds(), WaitLatchOrSocket(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, and WL_TIMEOUT.

Referenced by pgfdw_cancel_query(), and pgfdw_exec_cleanup_query().

◆ pgfdw_get_result()

PGresult* pgfdw_get_result ( PGconn *  conn,
const char *  query 
)

Definition at line 766 of file connection.c.

767 {
768  PGresult *volatile last_res = NULL;
769 
770  /* In what follows, do not leak any PGresults on an error. */
771  PG_TRY();
772  {
773  for (;;)
774  {
775  PGresult *res;
776 
777  while (PQisBusy(conn))
778  {
779  int wc;
780 
781  /* Sleep until there's something to do */
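782  wc = WaitLatchOrSocket(MyLatch,
783  WL_LATCH_SET | WL_SOCKET_READABLE |
784  WL_EXIT_ON_PM_DEATH,
785  PQsocket(conn),
786  -1L, PG_WAIT_EXTENSION);
787  ResetLatch(MyLatch);
788 
789  CHECK_FOR_INTERRUPTS();
790 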
791  /* Data available in socket? */
792  if (wc & WL_SOCKET_READABLE)
793  {
794  if (!PQconsumeInput(conn))
795  pgfdw_report_error(ERROR, NULL, conn, false, query);
796  }
797  }
798 
799  res = PQgetResult(conn);
800  if (res == NULL)
801  break; /* query is complete */
802 
803  PQclear(last_res);
804  last_res = res;
805  }
806  }
807  PG_CATCH();
808  {
809  PQclear(last_res);
810  PG_RE_THROW();
811  }
812  PG_END_TRY();
813 
814  return last_res;
815 }

References CHECK_FOR_INTERRUPTS, conn, ERROR, MyLatch, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, PG_WAIT_EXTENSION, pgfdw_report_error(), PQclear(), PQconsumeInput(), PQgetResult(), PQisBusy(), PQsocket(), res, ResetLatch(), WaitLatchOrSocket(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_SOCKET_READABLE.

Referenced by create_cursor(), do_sql_command(), execute_dml_stmt(), execute_foreign_modify(), fetch_more_data(), pgfdw_exec_query(), and prepare_foreign_modify().

◆ pgfdw_inval_callback()

static void pgfdw_inval_callback ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)
static

Definition at line 1102 of file connection.c.

1103 {
1104  HASH_SEQ_STATUS scan;
1105  ConnCacheEntry *entry;
1106 
1107  Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
1108 
1109  /* ConnectionHash must exist already, if we're registered */
1110  hash_seq_init(&scan, ConnectionHash);
1111  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1112  {
1113  /* Ignore invalid entries */
1114  if (entry->conn == NULL)
1115  continue;
1116 
1117  /* hashvalue == 0 means a cache reset, must clear all state */
1118  if (hashvalue == 0 ||
1119  (cacheid == FOREIGNSERVEROID &&
1120  entry->server_hashvalue == hashvalue) ||
1121  (cacheid == USERMAPPINGOID &&
1122  entry->mapping_hashvalue == hashvalue))
1123  {
1124  /*
1125  * Close the connection immediately if it's not used yet in this
1126  * transaction. Otherwise mark it as invalid so that
1127  * pgfdw_xact_callback() can close it at the end of this
1128  * transaction.
1129  */
1130  if (entry->xact_depth == 0)
1131  {
1132  elog(DEBUG3, "discarding connection %p", entry->conn);
1133  disconnect_pg_server(entry);
1134  }
1135  else
1136  entry->invalidated = true;
1137  }
1138  }
1139 }

References Assert(), ConnCacheEntry::conn, ConnectionHash, DEBUG3, disconnect_pg_server(), elog, FOREIGNSERVEROID, hash_seq_init(), hash_seq_search(), ConnCacheEntry::invalidated, ConnCacheEntry::mapping_hashvalue, ConnCacheEntry::server_hashvalue, USERMAPPINGOID, and ConnCacheEntry::xact_depth.

Referenced by GetConnection().

◆ pgfdw_reject_incomplete_xact_state_change()

static void pgfdw_reject_incomplete_xact_state_change ( ConnCacheEntry *  entry)
static

Definition at line 1152 of file connection.c.

1153 {
1154  ForeignServer *server;
1155 
1156  /* nothing to do for inactive entries and entries of sane state */
1157  if (entry->conn == NULL || !entry->changing_xact_state)
1158  return;
1159 
1160  /* make sure this entry is inactive */
1161  disconnect_pg_server(entry);
1162 
1163  /* find server name to be shown in the message below */
1164  server = GetForeignServer(entry->serverid);
1165 
1166  ereport(ERROR,
1167  (errcode(ERRCODE_CONNECTION_EXCEPTION),
1168  errmsg("connection to server \"%s\" was lost",
1169  server->servername)));
1170 }

References ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, disconnect_pg_server(), ereport, errcode(), errmsg(), ERROR, GetForeignServer(), ConnCacheEntry::serverid, and ForeignServer::servername.

Referenced by GetConnection(), pgfdw_subxact_callback(), and pgfdw_xact_callback().

◆ pgfdw_report_error()

void pgfdw_report_error ( int  elevel,
PGresult *  res,
PGconn *  conn,
bool  clear,
const char *  sql 
)

Definition at line 831 of file connection.c.

833 {
834  /* If requested, PGresult must be released before leaving this function. */
835  PG_TRY();
836  {
837  char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
838  char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
839  char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
840  char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
841  char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
842  int sqlstate;
843 
844  if (diag_sqlstate)
845  sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
846  diag_sqlstate[1],
847  diag_sqlstate[2],
848  diag_sqlstate[3],
849  diag_sqlstate[4]);
850  else
851  sqlstate = ERRCODE_CONNECTION_FAILURE;
852 
853  /*
854  * If we don't get a message from the PGresult, try the PGconn. This
855  * is needed because for connection-level failures, PQexec may just
856  * return NULL, not a PGresult at all.
857  */
858  if (message_primary == NULL)
859  message_primary = pchomp(PQerrorMessage(conn));
860 
861  ereport(elevel,
862  (errcode(sqlstate),
863  (message_primary != NULL && message_primary[0] != '\0') ?
864  errmsg_internal("%s", message_primary) :
865  errmsg("could not obtain message string for remote error"),
866  message_detail ? errdetail_internal("%s", message_detail) : 0,
867  message_hint ? errhint("%s", message_hint) : 0,
868  message_context ? errcontext("%s", message_context) : 0,
869  sql ? errcontext("remote SQL command: %s", sql) : 0));
870  }
871  PG_FINALLY();
872  {
873  if (clear)
874  PQclear(res);
875  }
876  PG_END_TRY();
877 }
#define PG_FINALLY()
Definition: elog.h:316
#define MAKE_SQLSTATE(ch1, ch2, ch3, ch4, ch5)
Definition: elog.h:50
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:3233
#define PG_DIAG_MESSAGE_HINT
Definition: postgres_ext.h:60
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:57
#define PG_DIAG_MESSAGE_PRIMARY
Definition: postgres_ext.h:58
#define PG_DIAG_MESSAGE_DETAIL
Definition: postgres_ext.h:59
#define PG_DIAG_CONTEXT
Definition: postgres_ext.h:64

References conn, ereport, errcode(), errcontext, errdetail_internal(), errhint(), errmsg(), errmsg_internal(), MAKE_SQLSTATE, pchomp(), PG_DIAG_CONTEXT, PG_DIAG_MESSAGE_DETAIL, PG_DIAG_MESSAGE_HINT, PG_DIAG_MESSAGE_PRIMARY, PG_DIAG_SQLSTATE, PG_END_TRY, PG_FINALLY, PG_TRY, PQclear(), PQerrorMessage(), PQresultErrorField(), and res.

Referenced by close_cursor(), create_cursor(), deallocate_query(), do_sql_command(), execute_dml_stmt(), execute_foreign_modify(), fetch_more_data(), fetch_more_data_begin(), get_remote_estimate(), pgfdw_exec_cleanup_query(), pgfdw_exec_query(), pgfdw_get_result(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresForeignAsyncNotify(), postgresImportForeignSchema(), postgresReScanForeignScan(), and prepare_foreign_modify().

◆ pgfdw_subxact_callback()

static void pgfdw_subxact_callback ( SubXactEvent  event,
SubTransactionId  mySubid,
SubTransactionId  parentSubid,
void *  arg 
)
static

Definition at line 1018 of file connection.c.

1020 {
1021  HASH_SEQ_STATUS scan;
1022  ConnCacheEntry *entry;
1023  int curlevel;
1024 
1025  /* Nothing to do at subxact start, nor after commit. */
1026  if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
1027  event == SUBXACT_EVENT_ABORT_SUB))
1028  return;
1029 
1030  /* Quick exit if no connections were touched in this transaction. */
1031  if (!xact_got_connection)
1032  return;
1033 
1034  /*
1035  * Scan all connection cache entries to find open remote subtransactions
1036  * of the current level, and close them.
1037  */
1038  curlevel = GetCurrentTransactionNestLevel();
1039  hash_seq_init(&scan, ConnectionHash);
1040  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1041  {
1042  char sql[100];
1043 
1044  /*
1045  * We only care about connections with open remote subtransactions of
1046  * the current level.
1047  */
1048  if (entry->conn == NULL || entry->xact_depth < curlevel)
1049  continue;
1050 
1051  if (entry->xact_depth > curlevel)
1052  elog(ERROR, "missed cleaning up remote subtransaction at level %d",
1053  entry->xact_depth);
1054 
1055  if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
1056  {
1057  /*
1058  * If abort cleanup previously failed for this connection, we
1059  * can't issue any more commands against it.
1060  */
1061  pgfdw_reject_incomplete_xact_state_change(entry);
1062 
1063  /* Commit all remote subtransactions during pre-commit */
1064  snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
1065  entry->changing_xact_state = true;
1066  do_sql_command(entry->conn, sql);
1067  entry->changing_xact_state = false;
1068  }
1069  else
1070  {
1071  /* Rollback all remote subtransactions during abort */
1072  snprintf(sql, sizeof(sql),
1073  "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
1074  curlevel, curlevel);
1075  pgfdw_abort_cleanup(entry, sql, false);
1076  }
1077 
1078  /* OK, we're outta that level of subtransaction */
1079  entry->xact_depth--;
1080  }
1081 }
static void pgfdw_abort_cleanup(ConnCacheEntry *entry, const char *sql, bool toplevel)
Definition: connection.c:1398
@ SUBXACT_EVENT_PRE_COMMIT_SUB
Definition: xact.h:132
@ SUBXACT_EVENT_ABORT_SUB
Definition: xact.h:131

References ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, ConnectionHash, do_sql_command(), elog, ERROR, GetCurrentTransactionNestLevel(), hash_seq_init(), hash_seq_search(), pgfdw_abort_cleanup(), pgfdw_reject_incomplete_xact_state_change(), snprintf, SUBXACT_EVENT_ABORT_SUB, SUBXACT_EVENT_PRE_COMMIT_SUB, ConnCacheEntry::xact_depth, and xact_got_connection.

Referenced by GetConnection().

◆ pgfdw_xact_callback()

static void pgfdw_xact_callback ( XactEvent  event,
void *  arg 
)
static

Definition at line 887 of file connection.c.

888 {
889  HASH_SEQ_STATUS scan;
890  ConnCacheEntry *entry;
891 
892  /* Quick exit if no connections were touched in this transaction. */
893  if (!xact_got_connection)
894  return;
895 
896  /*
897  * Scan all connection cache entries to find open remote transactions, and
898  * close them.
899  */
900  hash_seq_init(&scan, ConnectionHash);
901  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
902  {
903  PGresult *res;
904 
905  /* Ignore cache entry if no open connection right now */
906  if (entry->conn == NULL)
907  continue;
908 
909  /* If it has an open remote transaction, try to close it */
910  if (entry->xact_depth > 0)
911  {
912  elog(DEBUG3, "closing remote transaction on connection %p",
913  entry->conn);
914 
915  switch (event)
916  {
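917  case XACT_EVENT_PARALLEL_PRE_COMMIT:
918  case XACT_EVENT_PRE_COMMIT:
919 
920  /*
921  * If abort cleanup previously failed for this connection,
922  * we can't issue any more commands against it.
923  */
924  pgfdw_reject_incomplete_xact_state_change(entry);
925 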
926  /* Commit all remote transactions during pre-commit */
927  entry->changing_xact_state = true;
928  do_sql_command(entry->conn, "COMMIT TRANSACTION");
929  entry->changing_xact_state = false;
930 
931  /*
932  * If there were any errors in subtransactions, and we
933  * made prepared statements, do a DEALLOCATE ALL to make
934  * sure we get rid of all prepared statements. This is
935  * annoying and not terribly bulletproof, but it's
936  * probably not worth trying harder.
937  *
938  * DEALLOCATE ALL only exists in 8.3 and later, so this
939  * constrains how old a server postgres_fdw can
940  * communicate with. We intentionally ignore errors in
941  * the DEALLOCATE, so that we can hobble along to some
942  * extent with older servers (leaking prepared statements
943  * as we go; but we don't really support update operations
944  * pre-8.3 anyway).
945  */
946  if (entry->have_prep_stmt && entry->have_error)
947  {
948  res = PQexec(entry->conn, "DEALLOCATE ALL");
949  PQclear(res);
950  }
951  entry->have_prep_stmt = false;
952  entry->have_error = false;
953  break;
954  case XACT_EVENT_PRE_PREPARE:
955 
956  /*
957  * We disallow any remote transactions, since it's not
958  * very reasonable to hold them open until the prepared
959  * transaction is committed. For the moment, throw error
960  * unconditionally; later we might allow read-only cases.
961  * Note that the error will cause us to come right back
962  * here with event == XACT_EVENT_ABORT, so we'll clean up
963  * the connection state at that point.
964  */
965  ereport(ERROR,
966  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
967  errmsg("cannot PREPARE a transaction that has operated on postgres_fdw foreign tables")));
968  break;
969  case XACT_EVENT_PARALLEL_COMMIT:
970  case XACT_EVENT_COMMIT:
971  case XACT_EVENT_PREPARE:
972  /* Pre-commit should have closed the open transaction */
973  elog(ERROR, "missed cleaning up connection during pre-commit");
974  break;
975  case XACT_EVENT_PARALLEL_ABORT:
976  case XACT_EVENT_ABORT:
977 
978  pgfdw_abort_cleanup(entry, "ABORT TRANSACTION", true);
979  break;
980  }
981  }
982 
983  /* Reset state to show we're out of a transaction */
984  entry->xact_depth = 0;
985 
986  /*
987  * If the connection isn't in a good idle state, it is marked as
988  * invalid or keep_connections option of its server is disabled, then
989  * discard it to recover. Next GetConnection will open a new
990  * connection.
991  */
992  if (PQstatus(entry->conn) != CONNECTION_OK ||
993  PQtransactionStatus(entry->conn) != PQTRANS_IDLE ||
994  entry->changing_xact_state ||
995  entry->invalidated ||
996  !entry->keep_connections)
997  {
998  elog(DEBUG3, "discarding connection %p", entry->conn);
999  disconnect_pg_server(entry);
1000  }
1001  }
1002 
1003  /*
1004  * Regardless of the event type, we can now mark ourselves as out of the
1005  * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
1006  * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
1007  */
1008  xact_got_connection = false;
1009 
1010  /* Also reset cursor numbering for next transaction */
1011  cursor_number = 0;
1012 }
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2193
@ PQTRANS_IDLE
Definition: libpq-fe.h:116
@ XACT_EVENT_PRE_PREPARE
Definition: xact.h:122
@ XACT_EVENT_COMMIT
Definition: xact.h:115
@ XACT_EVENT_PARALLEL_PRE_COMMIT
Definition: xact.h:121
@ XACT_EVENT_PARALLEL_COMMIT
Definition: xact.h:116
@ XACT_EVENT_ABORT
Definition: xact.h:117
@ XACT_EVENT_PRE_COMMIT
Definition: xact.h:120
@ XACT_EVENT_PARALLEL_ABORT
Definition: xact.h:118
@ XACT_EVENT_PREPARE
Definition: xact.h:119

References ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, CONNECTION_OK, ConnectionHash, cursor_number, DEBUG3, disconnect_pg_server(), do_sql_command(), elog, ereport, errcode(), errmsg(), ERROR, hash_seq_init(), hash_seq_search(), ConnCacheEntry::have_error, ConnCacheEntry::have_prep_stmt, ConnCacheEntry::invalidated, ConnCacheEntry::keep_connections, pgfdw_abort_cleanup(), pgfdw_reject_incomplete_xact_state_change(), PQclear(), PQexec(), PQstatus(), PQTRANS_IDLE, PQtransactionStatus(), res, ConnCacheEntry::xact_depth, XACT_EVENT_ABORT, XACT_EVENT_COMMIT, XACT_EVENT_PARALLEL_ABORT, XACT_EVENT_PARALLEL_COMMIT, XACT_EVENT_PARALLEL_PRE_COMMIT, XACT_EVENT_PRE_COMMIT, XACT_EVENT_PRE_PREPARE, XACT_EVENT_PREPARE, and xact_got_connection.

Referenced by GetConnection().

◆ postgres_fdw_disconnect()

Datum postgres_fdw_disconnect ( PG_FUNCTION_ARGS  )

Definition at line 1593 of file connection.c.

1594 {
1595  ForeignServer *server;
1596  char *servername;
1597 
1598  servername = text_to_cstring(PG_GETARG_TEXT_PP(0));
1599  server = GetForeignServerByName(servername, false);
1600 
1601  PG_RETURN_BOOL(disconnect_cached_connections(server->serverid));
1602 }
static bool disconnect_cached_connections(Oid serverid)
Definition: connection.c:1642
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:309
#define PG_RETURN_BOOL(x)
Definition: fmgr.h:359
ForeignServer * GetForeignServerByName(const char *srvname, bool missing_ok)
Definition: foreign.c:180
char * text_to_cstring(const text *t)
Definition: varlena.c:222

References disconnect_cached_connections(), GetForeignServerByName(), PG_GETARG_TEXT_PP, PG_RETURN_BOOL, ForeignServer::serverid, and text_to_cstring().

◆ postgres_fdw_disconnect_all()

Datum postgres_fdw_disconnect_all ( PG_FUNCTION_ARGS  )

Definition at line 1614 of file connection.c.

1615 {
1616  PG_RETURN_BOOL(disconnect_cached_connections(InvalidOid));
1617 }
#define InvalidOid
Definition: postgres_ext.h:36

References disconnect_cached_connections(), InvalidOid, and PG_RETURN_BOOL.

◆ postgres_fdw_get_connections()

Datum postgres_fdw_get_connections ( PG_FUNCTION_ARGS  )

Definition at line 1466 of file connection.c.

1467 {
1468 #define POSTGRES_FDW_GET_CONNECTIONS_COLS 2
1469  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1470  TupleDesc tupdesc;
1471  Tuplestorestate *tupstore;
1472  MemoryContext per_query_ctx;
1473  MemoryContext oldcontext;
1474  HASH_SEQ_STATUS scan;
1475  ConnCacheEntry *entry;
1476 
1477  /* check to see if caller supports us returning a tuplestore */
1478  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
1479  ereport(ERROR,
1480  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1481  errmsg("set-valued function called in context that cannot accept a set")));
1482  if (!(rsinfo->allowedModes & SFRM_Materialize))
1483  ereport(ERROR,
1484  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1485  errmsg("materialize mode required, but it is not allowed in this context")));
1486 
1487  /* Build a tuple descriptor for our result type */
1488  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1489  elog(ERROR, "return type must be a row type");
1490 
1491  /* Build tuplestore to hold the result rows */
1492  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1493  oldcontext = MemoryContextSwitchTo(per_query_ctx);
1494 
1495  tupstore = tuplestore_begin_heap(true, false, work_mem);
1496  rsinfo->returnMode = SFRM_Materialize;
1497  rsinfo->setResult = tupstore;
1498  rsinfo->setDesc = tupdesc;
1499 
1500  MemoryContextSwitchTo(oldcontext);
1501 
1502  /* If cache doesn't exist, we return no records */
1503  if (!ConnectionHash)
1504  {
1505  /* clean up and return the tuplestore */
1506  tuplestore_donestoring(tupstore);
1507 
1508  PG_RETURN_VOID();
1509  }
1510 
1511  hash_seq_init(&scan, ConnectionHash);
1512  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1513  {
1514  ForeignServer *server;
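1515  Datum values[POSTGRES_FDW_GET_CONNECTIONS_COLS];
1516  bool nulls[POSTGRES_FDW_GET_CONNECTIONS_COLS];
1517 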
1518  /* We only look for open remote connections */
1519  if (!entry->conn)
1520  continue;
1521 
1522  server = GetForeignServerExtended(entry->serverid, FSV_MISSING_OK);
1523 
1524  MemSet(values, 0, sizeof(values));
1525  MemSet(nulls, 0, sizeof(nulls));
1526 
1527  /*
1528  * The foreign server may have been dropped in current explicit
1529  * transaction. It is not possible to drop the server from another
1530  * session when the connection associated with it is in use in the
1531  * current transaction, if tried so, the drop query in another session
1532  * blocks until the current transaction finishes.
1533  *
1534  * Even though the server is dropped in the current transaction, the
1535  * cache can still have associated active connection entry, say we
1536  * call such connections dangling. Since we can not fetch the server
1537  * name from system catalogs for dangling connections, instead we show
1538  * NULL value for server name in output.
1539  *
1540  * We could have done better by storing the server name in the cache
1541  * entry instead of server oid so that it could be used in the output.
1542  * But the server name in each cache entry requires 64 bytes of
1543  * memory, which is huge, when there are many cached connections and
1544  * the use case i.e. dropping the foreign server within the explicit
1545  * current transaction seems rare. So, we chose to show NULL value for
1546  * server name in output.
1547  *
1548  * Such dangling connections get closed either in next use or at the
1549  * end of current explicit transaction in pgfdw_xact_callback.
1550  */
1551  if (!server)
1552  {
1553  /*
1554  * If the server has been dropped in the current explicit
1555  * transaction, then this entry would have been invalidated in
1556  * pgfdw_inval_callback at the end of drop server command. Note
1557  * that this connection would not have been closed in
1558  * pgfdw_inval_callback because it is still being used in the
1559  * current explicit transaction. So, assert that here.
1560  */
1561  Assert(entry->conn && entry->xact_depth > 0 && entry->invalidated);
1562 
1563  /* Show null, if no server name was found */
1564  nulls[0] = true;
1565  }
1566  else
1567  values[0] = CStringGetTextDatum(server->servername);
1568 
1569  values[1] = BoolGetDatum(!entry->invalidated);
1570 
1571  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
1572  }
1573 
1574  /* clean up and return the tuplestore */
1575  tuplestore_donestoring(tupstore);
1576 
1577  PG_RETURN_VOID();
1578 }
#define CStringGetTextDatum(s)
Definition: builtins.h:85
#define MemSet(start, val, len)
Definition: c.h:1008
#define POSTGRES_FDW_GET_CONNECTIONS_COLS
@ SFRM_Materialize
Definition: execnodes.h:293
#define PG_RETURN_VOID()
Definition: fmgr.h:349
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:207
@ TYPEFUNC_COMPOSITE
Definition: funcapi.h:149
int work_mem
Definition: globals.c:124
#define IsA(nodeptr, _type_)
Definition: nodes.h:590
#define BoolGetDatum(X)
Definition: postgres.h:446
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:238
SetFunctionReturnMode returnMode
Definition: execnodes.h:312
ExprContext * econtext
Definition: execnodes.h:308
TupleDesc setDesc
Definition: execnodes.h:316
Tuplestorestate * setResult
Definition: execnodes.h:315
int allowedModes
Definition: execnodes.h:310
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60

References ReturnSetInfo::allowedModes, Assert(), BoolGetDatum, ConnectionHash, CStringGetTextDatum, ReturnSetInfo::econtext, ExprContext::ecxt_per_query_memory, elog, ereport, errcode(), errmsg(), ERROR, FSV_MISSING_OK, get_call_result_type(), GetForeignServerExtended(), hash_seq_init(), hash_seq_search(), if(), IsA, MemoryContextSwitchTo(), MemSet, PG_RETURN_VOID, POSTGRES_FDW_GET_CONNECTIONS_COLS, ReturnSetInfo::returnMode, ForeignServer::servername, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SFRM_Materialize, tuplestore_begin_heap(), tuplestore_donestoring, tuplestore_putvalues(), TYPEFUNC_COMPOSITE, values, and work_mem.

◆ ReleaseConnection()

void ReleaseConnection ( PGconn * conn)

Definition at line 690 of file connection.c.

691 {
692  /*
693  * Intentionally a no-op: connection references are not tracked, because
694  * all cleanup is managed on a transaction or subtransaction basis
695  * instead. The conn argument is therefore unused here.
696  */
697 }

Referenced by estimate_path_cost_size(), finish_foreign_modify(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresEndDirectModify(), postgresEndForeignScan(), and postgresImportForeignSchema().

◆ UserMappingPasswordRequired()

static bool UserMappingPasswordRequired ( UserMapping * user)
static

Definition at line 528 of file connection.c.

529 {
530  ListCell *cell;
531  /* Report whether this user mapping requires a password. */
532  foreach(cell, user->options)
533  {
534  DefElem *def = (DefElem *) lfirst(cell);
535  /* An explicit "password_required" option decides the answer. */
536  if (strcmp(def->defname, "password_required") == 0)
537  return defGetBoolean(def);
538  }
539  /* Option not present in the mapping: a password is required by default. */
540  return true;
541 }

References defGetBoolean(), DefElem::defname, lfirst, and user.

Referenced by check_conn_params(), and connect_pg_server().

Variable Documentation

◆ ConnectionHash

◆ cursor_number

unsigned int cursor_number = 0
static

◆ prep_stmt_number

unsigned int prep_stmt_number = 0
static

Definition at line 77 of file connection.c.

Referenced by GetPrepStmtNumber().

◆ xact_got_connection

bool xact_got_connection = false
static

Definition at line 80 of file connection.c.

Referenced by GetConnection(), pgfdw_subxact_callback(), and pgfdw_xact_callback().