PostgreSQL Source Code  git master
connection.c File Reference
#include "postgres.h"
#include "access/htup_details.h"
#include "access/xact.h"
#include "catalog/pg_user_mapping.h"
#include "commands/defrem.h"
#include "funcapi.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postgres_fdw.h"
#include "storage/fd.h"
#include "storage/latch.h"
#include "utils/builtins.h"
#include "utils/datetime.h"
#include "utils/hsearch.h"
#include "utils/inval.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
Include dependency graph for connection.c:

Go to the source code of this file.

Data Structures

struct  ConnCacheEntry
 

Macros

#define POSTGRES_FDW_GET_CONNECTIONS_COLS   2
 

Typedefs

typedef Oid ConnCacheKey
 
typedef struct ConnCacheEntry ConnCacheEntry
 

Functions

 PG_FUNCTION_INFO_V1 (postgres_fdw_get_connections)
 
static void make_new_connection (ConnCacheEntry *entry, UserMapping *user)
 
static PGconn * connect_pg_server (ForeignServer *server, UserMapping *user)
 
static void disconnect_pg_server (ConnCacheEntry *entry)
 
static void check_conn_params (const char **keywords, const char **values, UserMapping *user)
 
static void configure_remote_session (PGconn *conn)
 
static void do_sql_command (PGconn *conn, const char *sql)
 
static void begin_remote_xact (ConnCacheEntry *entry)
 
static void pgfdw_xact_callback (XactEvent event, void *arg)
 
static void pgfdw_subxact_callback (SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg)
 
static void pgfdw_inval_callback (Datum arg, int cacheid, uint32 hashvalue)
 
static void pgfdw_reject_incomplete_xact_state_change (ConnCacheEntry *entry)
 
static bool pgfdw_cancel_query (PGconn *conn)
 
static bool pgfdw_exec_cleanup_query (PGconn *conn, const char *query, bool ignore_errors)
 
static bool pgfdw_get_cleanup_result (PGconn *conn, TimestampTz endtime, PGresult **result)
 
static bool UserMappingPasswordRequired (UserMapping *user)
 
PGconn * GetConnection (UserMapping *user, bool will_prep_stmt)
 
void ReleaseConnection (PGconn *conn)
 
unsigned int GetCursorNumber (PGconn *conn)
 
unsigned int GetPrepStmtNumber (PGconn *conn)
 
PGresult * pgfdw_exec_query (PGconn *conn, const char *query)
 
PGresult * pgfdw_get_result (PGconn *conn, const char *query)
 
void pgfdw_report_error (int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
 
Datum postgres_fdw_get_connections (PG_FUNCTION_ARGS)
 

Variables

static HTAB * ConnectionHash = NULL
 
static unsigned int cursor_number = 0
 
static unsigned int prep_stmt_number = 0
 
static bool xact_got_connection = false
 

Macro Definition Documentation

◆ POSTGRES_FDW_GET_CONNECTIONS_COLS

#define POSTGRES_FDW_GET_CONNECTIONS_COLS   2

Typedef Documentation

◆ ConnCacheEntry

typedef struct ConnCacheEntry ConnCacheEntry

◆ ConnCacheKey

typedef Oid ConnCacheKey

Definition at line 49 of file connection.c.

Function Documentation

◆ begin_remote_xact()

static void begin_remote_xact ( ConnCacheEntry * entry)
static

Definition at line 557 of file connection.c.

References ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, DEBUG3, do_sql_command(), elog, GetCurrentTransactionNestLevel(), IsolationIsSerializable, snprintf, and ConnCacheEntry::xact_depth.

Referenced by GetConnection().

558 {
559  int curlevel = GetCurrentTransactionNestLevel();
560 
561  /* Start main transaction if we haven't yet */
562  if (entry->xact_depth <= 0)
563  {
564  const char *sql;
565 
566  elog(DEBUG3, "starting remote transaction on connection %p",
567  entry->conn);
568 
569  if (IsolationIsSerializable())
570  sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
571  else
572  sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
573  entry->changing_xact_state = true;
574  do_sql_command(entry->conn, sql);
575  entry->xact_depth = 1;
576  entry->changing_xact_state = false;
577  }
578 
579  /*
580  * If we're in a subtransaction, stack up savepoints to match our level.
581  * This ensures we can rollback just the desired effects when a
582  * subtransaction aborts.
583  */
584  while (entry->xact_depth < curlevel)
585  {
586  char sql[64];
587 
588  snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
589  entry->changing_xact_state = true;
590  do_sql_command(entry->conn, sql);
591  entry->xact_depth++;
592  entry->changing_xact_state = false;
593  }
594 }
#define DEBUG3
Definition: elog.h:23
bool changing_xact_state
Definition: connection.c:60
static void do_sql_command(PGconn *conn, const char *sql)
Definition: connection.c:534
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:857
PGconn * conn
Definition: connection.c:54
#define IsolationIsSerializable()
Definition: xact.h:52
#define elog(elevel,...)
Definition: elog.h:228
#define snprintf
Definition: port.h:215

◆ check_conn_params()

static void check_conn_params ( const char **  keywords,
const char **  values,
UserMapping *  user 
)
static

Definition at line 461 of file connection.c.

References ereport, errcode(), errdetail(), errmsg(), ERROR, i, superuser_arg(), UserMapping::userid, and UserMappingPasswordRequired().

Referenced by connect_pg_server().

462 {
463  int i;
464 
465  /* no check required if superuser */
466  if (superuser_arg(user->userid))
467  return;
468 
469  /* ok if params contain a non-empty password */
470  for (i = 0; keywords[i] != NULL; i++)
471  {
472  if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
473  return;
474  }
475 
476  /* ok if the superuser explicitly said so at user mapping creation time */
477  if (!UserMappingPasswordRequired(user))
478  return;
479 
480  ereport(ERROR,
481  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
482  errmsg("password is required"),
483  errdetail("Non-superusers must provide a password in the user mapping.")));
484 }
int errcode(int sqlerrcode)
Definition: elog.c:704
Oid userid
Definition: foreign.h:48
#define ERROR
Definition: elog.h:45
int errdetail(const char *fmt,...)
Definition: elog.c:1048
bool superuser_arg(Oid roleid)
Definition: superuser.c:56
static bool UserMappingPasswordRequired(UserMapping *user)
Definition: connection.c:438
#define ereport(elevel,...)
Definition: elog.h:155
static Datum values[MAXATTR]
Definition: bootstrap.c:165
int errmsg(const char *fmt,...)
Definition: elog.c:915
int i

◆ configure_remote_session()

static void configure_remote_session ( PGconn * conn)
static

Definition at line 498 of file connection.c.

References do_sql_command(), and PQserverVersion().

Referenced by connect_pg_server().

499 {
500  int remoteversion = PQserverVersion(conn);
501 
502  /* Force the search path to contain only pg_catalog (see deparse.c) */
503  do_sql_command(conn, "SET search_path = pg_catalog");
504 
505  /*
506  * Set remote timezone; this is basically just cosmetic, since all
507  * transmitted and returned timestamptzs should specify a zone explicitly
508  * anyway. However it makes the regression test outputs more predictable.
509  *
510  * We don't risk setting remote zone equal to ours, since the remote
511  * server might use a different timezone database. Instead, use UTC
512  * (quoted, because very old servers are picky about case).
513  */
514  do_sql_command(conn, "SET timezone = 'UTC'");
515 
516  /*
517  * Set values needed to ensure unambiguous data output from remote. (This
518  * logic should match what pg_dump does. See also set_transmission_modes
519  * in postgres_fdw.c.)
520  */
521  do_sql_command(conn, "SET datestyle = ISO");
522  if (remoteversion >= 80400)
523  do_sql_command(conn, "SET intervalstyle = postgres");
524  if (remoteversion >= 90000)
525  do_sql_command(conn, "SET extra_float_digits = 3");
526  else
527  do_sql_command(conn, "SET extra_float_digits = 2");
528 }
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:6615
static void do_sql_command(PGconn *conn, const char *sql)
Definition: connection.c:534

◆ connect_pg_server()

static PGconn * connect_pg_server ( ForeignServer *  server,
UserMapping *  user 
)
static

Definition at line 303 of file connection.c.

References AcquireExternalFD(), check_conn_params(), configure_remote_session(), ConnCacheEntry::conn, CONNECTION_OK, ereport, errcode(), errdetail(), errdetail_internal(), errhint(), errmsg(), ERROR, ExtractConnectionOptions(), GetDatabaseEncodingName(), list_length(), ForeignServer::options, UserMapping::options, palloc(), pchomp(), pfree(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, PQconnectdbParams(), PQconnectionUsedPassword(), PQerrorMessage(), PQfinish(), PQstatus(), ReleaseExternalFD(), ForeignServer::servername, superuser_arg(), UserMapping::userid, UserMappingPasswordRequired(), and values.

Referenced by make_new_connection().

304 {
305  PGconn *volatile conn = NULL;
306 
307  /*
308  * Use PG_TRY block to ensure closing connection on error.
309  */
310  PG_TRY();
311  {
312  const char **keywords;
313  const char **values;
314  int n;
315 
316  /*
317  * Construct connection params from generic options of ForeignServer
318  * and UserMapping. (Some of them might not be libpq options, in
319  * which case we'll just waste a few array slots.) Add 3 extra slots
320  * for fallback_application_name, client_encoding, end marker.
321  */
322  n = list_length(server->options) + list_length(user->options) + 3;
323  keywords = (const char **) palloc(n * sizeof(char *));
324  values = (const char **) palloc(n * sizeof(char *));
325 
326  n = 0;
327  n += ExtractConnectionOptions(server->options,
328  keywords + n, values + n);
329  n += ExtractConnectionOptions(user->options,
330  keywords + n, values + n);
331 
332  /* Use "postgres_fdw" as fallback_application_name. */
333  keywords[n] = "fallback_application_name";
334  values[n] = "postgres_fdw";
335  n++;
336 
337  /* Set client_encoding so that libpq can convert encoding properly. */
338  keywords[n] = "client_encoding";
339  values[n] = GetDatabaseEncodingName();
340  n++;
341 
342  keywords[n] = values[n] = NULL;
343 
344  /* verify the set of connection parameters */
345  check_conn_params(keywords, values, user);
346 
347  /*
348  * We must obey fd.c's limit on non-virtual file descriptors. Assume
349  * that a PGconn represents one long-lived FD. (Doing this here also
350  * ensures that VFDs are closed if needed to make room.)
351  */
352  if (!AcquireExternalFD())
353  {
354 #ifndef WIN32 /* can't write #if within ereport() macro */
355  ereport(ERROR,
356  (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
357  errmsg("could not connect to server \"%s\"",
358  server->servername),
359  errdetail("There are too many open files on the local server."),
360  errhint("Raise the server's max_files_per_process and/or \"ulimit -n\" limits.")));
361 #else
362  ereport(ERROR,
363  (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
364  errmsg("could not connect to server \"%s\"",
365  server->servername),
366  errdetail("There are too many open files on the local server."),
367  errhint("Raise the server's max_files_per_process setting.")));
368 #endif
369  }
370 
371  /* OK to make connection */
372  conn = PQconnectdbParams(keywords, values, false);
373 
374  if (!conn)
375  ReleaseExternalFD(); /* because the PG_CATCH block won't */
376 
377  if (!conn || PQstatus(conn) != CONNECTION_OK)
378  ereport(ERROR,
379  (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
380  errmsg("could not connect to server \"%s\"",
381  server->servername),
382  errdetail_internal("%s", pchomp(PQerrorMessage(conn)))));
383 
384  /*
385  * Check that non-superuser has used password to establish connection;
386  * otherwise, he's piggybacking on the postgres server's user
387  * identity. See also dblink_security_check() in contrib/dblink and
388  * check_conn_params.
389  */
390  if (!superuser_arg(user->userid) && UserMappingPasswordRequired(user) &&
391  !PQconnectionUsedPassword(conn))
392  ereport(ERROR,
393  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
394  errmsg("password is required"),
395  errdetail("Non-superuser cannot connect if the server does not request a password."),
396  errhint("Target server's authentication method must be changed or password_required=false set in the user mapping attributes.")));
397 
398  /* Prepare new session for use */
399  configure_remote_session(conn);
400 
401  pfree(keywords);
402  pfree(values);
403  }
404  PG_CATCH();
405  {
406  /* Release PGconn data structure if we managed to create one */
407  if (conn)
408  {
409  PQfinish(conn);
410  ReleaseExternalFD();
411  }
412  PG_RE_THROW();
413  }
414  PG_END_TRY();
415 
416  return conn;
417 }
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6625
static void configure_remote_session(PGconn *conn)
Definition: connection.c:498
int errhint(const char *fmt,...)
Definition: elog.c:1162
static void check_conn_params(const char **keywords, const char **values, UserMapping *user)
Definition: connection.c:461
int errcode(int sqlerrcode)
Definition: elog.c:704
void PQfinish(PGconn *conn)
Definition: fe-connect.c:4120
Oid userid
Definition: foreign.h:48
PGconn * PQconnectdbParams(const char *const *keywords, const char *const *values, int expand_dbname)
Definition: fe-connect.c:647
int errdetail_internal(const char *fmt,...)
Definition: elog.c:1075
char * pchomp(const char *in)
Definition: mcxt.c:1215
void pfree(void *pointer)
Definition: mcxt.c:1057
int ExtractConnectionOptions(List *defelems, const char **keywords, const char **values)
Definition: option.c:333
#define ERROR
Definition: elog.h:45
PGconn * conn
Definition: streamutil.c:54
int errdetail(const char *fmt,...)
Definition: elog.c:1048
List * options
Definition: foreign.h:50
bool superuser_arg(Oid roleid)
Definition: superuser.c:56
static bool UserMappingPasswordRequired(UserMapping *user)
Definition: connection.c:438
bool AcquireExternalFD(void)
Definition: fd.c:1076
#define ereport(elevel,...)
Definition: elog.h:155
#define PG_CATCH()
Definition: elog.h:319
const char * GetDatabaseEncodingName(void)
Definition: mbutils.c:1157
static int list_length(const List *l)
Definition: pg_list.h:149
#define PG_RE_THROW()
Definition: elog.h:350
static Datum values[MAXATTR]
Definition: bootstrap.c:165
int PQconnectionUsedPassword(const PGconn *conn)
Definition: fe-connect.c:6674
void ReleaseExternalFD(void)
Definition: fd.c:1129
void * palloc(Size size)
Definition: mcxt.c:950
int errmsg(const char *fmt,...)
Definition: elog.c:915
char * servername
Definition: foreign.h:39
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:6572
#define PG_TRY()
Definition: elog.h:309
List * options
Definition: foreign.h:42
#define PG_END_TRY()
Definition: elog.h:334

◆ disconnect_pg_server()

static void disconnect_pg_server ( ConnCacheEntry * entry)
static

Definition at line 423 of file connection.c.

References ConnCacheEntry::conn, PQfinish(), and ReleaseExternalFD().

Referenced by GetConnection(), pgfdw_inval_callback(), pgfdw_reject_incomplete_xact_state_change(), and pgfdw_xact_callback().

424 {
425  if (entry->conn != NULL)
426  {
427  PQfinish(entry->conn);
428  entry->conn = NULL;
429  ReleaseExternalFD();
430  }
431 }
void PQfinish(PGconn *conn)
Definition: fe-connect.c:4120
PGconn * conn
Definition: connection.c:54
void ReleaseExternalFD(void)
Definition: fd.c:1129

◆ do_sql_command()

static void do_sql_command ( PGconn *  conn,
const char *  sql 
)
static

Definition at line 534 of file connection.c.

References ERROR, pgfdw_get_result(), pgfdw_report_error(), PGRES_COMMAND_OK, PQclear(), PQresultStatus(), and PQsendQuery().

Referenced by begin_remote_xact(), configure_remote_session(), pgfdw_subxact_callback(), and pgfdw_xact_callback().

535 {
536  PGresult *res;
537 
538  if (!PQsendQuery(conn, sql))
539  pgfdw_report_error(ERROR, NULL, conn, false, sql);
540  res = pgfdw_get_result(conn, sql);
541  if (PQresultStatus(res) != PGRES_COMMAND_OK)
542  pgfdw_report_error(ERROR, res, conn, true, sql);
543  PQclear(res);
544 }
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2651
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1187
#define ERROR
Definition: elog.h:45
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:737
void PQclear(PGresult *res)
Definition: fe-exec.c:676
PGresult * pgfdw_get_result(PGconn *conn, const char *query)
Definition: connection.c:672

◆ GetConnection()

PGconn* GetConnection ( UserMapping *  user,
bool  will_prep_stmt 
)

Definition at line 117 of file connection.c.

References Assert, begin_remote_xact(), CacheRegisterSyscacheCallback(), ConnCacheEntry::conn, CONNECTION_BAD, CopyErrorData(), CurrentMemoryContext, DEBUG3, disconnect_pg_server(), elog, HASHCTL::entrysize, ereport, errdetail_internal(), errmsg_internal(), FlushErrorState(), FOREIGNSERVEROID, FreeErrorData(), HASH_BLOBS, hash_create(), HASH_ELEM, HASH_ENTER, hash_search(), ConnCacheEntry::have_prep_stmt, ConnCacheEntry::invalidated, ConnCacheEntry::key, HASHCTL::keysize, make_new_connection(), MemoryContextSwitchTo(), pchomp(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pgfdw_inval_callback(), pgfdw_reject_incomplete_xact_state_change(), pgfdw_subxact_callback(), pgfdw_xact_callback(), PQerrorMessage(), PQstatus(), RegisterSubXactCallback(), RegisterXactCallback(), ErrorData::sqlerrcode, UserMapping::umid, USERMAPPINGOID, ConnCacheEntry::xact_depth, and xact_got_connection.

Referenced by create_foreign_modify(), dumpBlobs(), dumpDatabase(), dumpDatabaseConfig(), dumpTableData_copy(), estimate_path_cost_size(), expand_foreign_server_name_patterns(), expand_schema_name_patterns(), expand_table_name_patterns(), getTables(), main(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresBeginDirectModify(), postgresBeginForeignScan(), postgresImportForeignSchema(), setup_connection(), StartLogStreamer(), StreamLog(), and StreamLogicalLog().

118 {
119  bool found;
120  bool retry = false;
121  ConnCacheEntry *entry;
122  ConnCacheKey key;
123  MemoryContext ccxt = CurrentMemoryContext;
124 
125  /* First time through, initialize connection cache hashtable */
126  if (ConnectionHash == NULL)
127  {
128  HASHCTL ctl;
129 
130  ctl.keysize = sizeof(ConnCacheKey);
131  ctl.entrysize = sizeof(ConnCacheEntry);
132  ConnectionHash = hash_create("postgres_fdw connections", 8,
133  &ctl,
134  HASH_ELEM | HASH_BLOBS);
135 
136  /*
137  * Register some callback functions that manage connection cleanup.
138  * This should be done just once in each backend.
139  */
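140  RegisterXactCallback(pgfdw_xact_callback, NULL);
141  RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
142  CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
143  pgfdw_inval_callback, (Datum) 0);
144  CacheRegisterSyscacheCallback(USERMAPPINGOID,
145  pgfdw_inval_callback, (Datum) 0);
146  }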
147 
148  /* Set flag that we did GetConnection during the current transaction */
149  xact_got_connection = true;
150 
151  /* Create hash key for the entry. Assume no pad bytes in key struct */
152  key = user->umid;
153 
154  /*
155  * Find or create cached entry for requested connection.
156  */
157  entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
158  if (!found)
159  {
160  /*
161  * We need only clear "conn" here; remaining fields will be filled
162  * later when "conn" is set.
163  */
164  entry->conn = NULL;
165  }
166 
167  /* Reject further use of connections which failed abort cleanup. */
168  pgfdw_reject_incomplete_xact_state_change(entry);
169 
170  /*
171  * If the connection needs to be remade due to invalidation, disconnect as
172  * soon as we're out of all transactions.
173  */
174  if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
175  {
176  elog(DEBUG3, "closing connection %p for option changes to take effect",
177  entry->conn);
178  disconnect_pg_server(entry);
179  }
180 
181  /*
182  * If cache entry doesn't have a connection, we have to establish a new
183  * connection. (If connect_pg_server throws an error, the cache entry
184  * will remain in a valid empty state, ie conn == NULL.)
185  */
186  if (entry->conn == NULL)
187  make_new_connection(entry, user);
188 
189  /*
190  * We check the health of the cached connection here when starting a new
191  * remote transaction. If a broken connection is detected, we try to
192  * reestablish a new connection later.
193  */
194  PG_TRY();
195  {
196  /* Start a new transaction or subtransaction if needed. */
197  begin_remote_xact(entry);
198  }
199  PG_CATCH();
200  {
201  MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
202  ErrorData *errdata = CopyErrorData();
203 
204  /*
205  * If connection failure is reported when starting a new remote
206  * transaction (not subtransaction), new connection will be
207  * reestablished later.
208  *
209  * After a broken connection is detected in libpq, any error other
210  * than connection failure (e.g., out-of-memory) can be thrown
211  * somewhere between return from libpq and the expected ereport() call
212  * in pgfdw_report_error(). In this case, since PQstatus() indicates
213  * CONNECTION_BAD, checking only PQstatus() causes the false detection
214  * of connection failure. To avoid this, we also verify that the
215  * error's sqlstate is ERRCODE_CONNECTION_FAILURE. Note that also
216  * checking only the sqlstate can cause another false detection
217  * because pgfdw_report_error() may report ERRCODE_CONNECTION_FAILURE
218  * for any libpq-originated error condition.
219  */
220  if (errdata->sqlerrcode != ERRCODE_CONNECTION_FAILURE ||
221  PQstatus(entry->conn) != CONNECTION_BAD ||
222  entry->xact_depth > 0)
223  {
224  MemoryContextSwitchTo(ecxt);
225  PG_RE_THROW();
226  }
227 
228  /* Clean up the error state */
229  FlushErrorState();
230  FreeErrorData(errdata);
231  errdata = NULL;
232 
233  retry = true;
234  }
235  PG_END_TRY();
236 
237  /*
238  * If a broken connection is detected, disconnect it, reestablish a new
239  * connection and retry a new remote transaction. If connection failure is
240  * reported again, we give up getting a connection.
241  */
242  if (retry)
243  {
244  Assert(entry->xact_depth == 0);
245 
246  ereport(DEBUG3,
247  (errmsg_internal("could not start remote transaction on connection %p",
248  entry->conn)),
249  errdetail_internal("%s", pchomp(PQerrorMessage(entry->conn))));
250 
251  elog(DEBUG3, "closing connection %p to reestablish a new one",
252  entry->conn);
253  disconnect_pg_server(entry);
254 
255  if (entry->conn == NULL)
256  make_new_connection(entry, user);
257 
258  begin_remote_xact(entry);
259  }
260 
261  /* Remember if caller will prepare statements */
262  entry->have_prep_stmt |= will_prep_stmt;
263 
264  return entry->conn;
265 }
Oid umid
Definition: foreign.h:47
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6625
#define HASH_ELEM
Definition: hsearch.h:95
int sqlerrcode
Definition: elog.h:378
#define DEBUG3
Definition: elog.h:23
struct ConnCacheEntry ConnCacheEntry
ErrorData * CopyErrorData(void)
Definition: elog.c:1565
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Size entrysize
Definition: hsearch.h:76
bool have_prep_stmt
Definition: connection.c:58
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
void FlushErrorState(void)
Definition: elog.c:1659
int errdetail_internal(const char *fmt,...)
Definition: elog.c:1075
char * pchomp(const char *in)
Definition: mcxt.c:1215
void FreeErrorData(ErrorData *edata)
Definition: elog.c:1621
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:349
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
bool invalidated
Definition: connection.c:61
static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
Definition: connection.c:1098
#define HASH_BLOBS
Definition: hsearch.h:97
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1434
static void pgfdw_xact_callback(XactEvent event, void *arg)
Definition: connection.c:792
static HTAB * ConnectionHash
Definition: connection.c:70
uintptr_t Datum
Definition: postgres.h:367
void RegisterSubXactCallback(SubXactCallback callback, void *arg)
Definition: xact.c:3584
Size keysize
Definition: hsearch.h:75
#define ereport(elevel,...)
Definition: elog.h:155
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1002
#define PG_CATCH()
Definition: elog.h:319
PGconn * conn
Definition: connection.c:54
static void disconnect_pg_server(ConnCacheEntry *entry)
Definition: connection.c:423
#define Assert(condition)
Definition: c.h:792
void RegisterXactCallback(XactCallback callback, void *arg)
Definition: xact.c:3529
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
Definition: connection.c:1148
static bool xact_got_connection
Definition: connection.c:77
#define PG_RE_THROW()
Definition: elog.h:350
static void make_new_connection(ConnCacheEntry *entry, UserMapping *user)
Definition: connection.c:272
#define elog(elevel,...)
Definition: elog.h:228
static void pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg)
Definition: connection.c:981
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:6572
#define PG_TRY()
Definition: elog.h:309
static void begin_remote_xact(ConnCacheEntry *entry)
Definition: connection.c:557
Oid ConnCacheKey
Definition: connection.c:49
#define PG_END_TRY()
Definition: elog.h:334

◆ GetCursorNumber()

unsigned int GetCursorNumber ( PGconn * conn)

Definition at line 621 of file connection.c.

References cursor_number.

Referenced by postgresAcquireSampleRowsFunc(), and postgresBeginForeignScan().

622 {
623  return ++cursor_number;
624 }
static unsigned int cursor_number
Definition: connection.c:73

◆ GetPrepStmtNumber()

unsigned int GetPrepStmtNumber ( PGconn * conn)

Definition at line 635 of file connection.c.

References prep_stmt_number.

Referenced by prepare_foreign_modify().

636 {
637  return ++prep_stmt_number;
638 }
static unsigned int prep_stmt_number
Definition: connection.c:74

◆ make_new_connection()

static void make_new_connection ( ConnCacheEntry *  entry,
UserMapping *  user 
)
static

Definition at line 272 of file connection.c.

References Assert, ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, connect_pg_server(), DEBUG3, elog, FOREIGNSERVEROID, GetForeignServer(), GetSysCacheHashValue1, ConnCacheEntry::have_error, ConnCacheEntry::have_prep_stmt, ConnCacheEntry::invalidated, ConnCacheEntry::mapping_hashvalue, ObjectIdGetDatum, ConnCacheEntry::server_hashvalue, ForeignServer::serverid, UserMapping::serverid, ConnCacheEntry::serverid, ForeignServer::servername, UserMapping::umid, UserMapping::userid, USERMAPPINGOID, and ConnCacheEntry::xact_depth.

Referenced by GetConnection().

273 {
274  ForeignServer *server = GetForeignServer(user->serverid);
275 
276  Assert(entry->conn == NULL);
277 
278  /* Reset all transient state fields, to be sure all are clean */
279  entry->xact_depth = 0;
280  entry->have_prep_stmt = false;
281  entry->have_error = false;
282  entry->changing_xact_state = false;
283  entry->invalidated = false;
284  entry->serverid = server->serverid;
285  entry->server_hashvalue =
286  GetSysCacheHashValue1(FOREIGNSERVEROID,
287  ObjectIdGetDatum(server->serverid));
288  entry->mapping_hashvalue =
289  GetSysCacheHashValue1(USERMAPPINGOID,
290  ObjectIdGetDatum(user->umid));
291 
292  /* Now try to make the connection */
293  entry->conn = connect_pg_server(server, user);
294 
295  elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
296  entry->conn, server->servername, user->umid, user->userid);
297 }
Oid umid
Definition: foreign.h:47
#define DEBUG3
Definition: elog.h:23
bool have_prep_stmt
Definition: connection.c:58
#define GetSysCacheHashValue1(cacheId, key1)
Definition: syscache.h:202
uint32 server_hashvalue
Definition: connection.c:63
Oid userid
Definition: foreign.h:48
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
bool changing_xact_state
Definition: connection.c:60
bool invalidated
Definition: connection.c:61
ForeignServer * GetForeignServer(Oid serverid)
Definition: foreign.c:109
uint32 mapping_hashvalue
Definition: connection.c:64
PGconn * conn
Definition: connection.c:54
#define Assert(condition)
Definition: c.h:792
Oid serverid
Definition: foreign.h:49
char * servername
Definition: foreign.h:39
#define elog(elevel,...)
Definition: elog.h:228
Oid serverid
Definition: foreign.h:36
static PGconn * connect_pg_server(ForeignServer *server, UserMapping *user)
Definition: connection.c:303

◆ PG_FUNCTION_INFO_V1()

PG_FUNCTION_INFO_V1 ( postgres_fdw_get_connections  )

◆ pgfdw_cancel_query()

static bool pgfdw_cancel_query ( PGconn * conn)
static

Definition at line 1174 of file connection.c.

References ereport, errcode(), errmsg(), GetCurrentTimestamp(), pgfdw_get_cleanup_result(), PQcancel(), PQclear(), PQfreeCancel(), PQgetCancel(), TimestampTzPlusMilliseconds, and WARNING.

Referenced by pgfdw_subxact_callback(), and pgfdw_xact_callback().

1175 {
1176  PGcancel *cancel;
1177  char errbuf[256];
1178  PGresult *result = NULL;
1179  TimestampTz endtime;
1180 
1181  /*
1182  * If it takes too long to cancel the query and discard the result, assume
1183  * the connection is dead.
1184  */
1185  endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000);
1186 
1187  /*
1188  * Issue cancel request. Unfortunately, there's no good way to limit the
1189  * amount of time that we might block inside PQgetCancel().
1190  */
1191  if ((cancel = PQgetCancel(conn)))
1192  {
1193  if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
1194  {
1195  ereport(WARNING,
1196  (errcode(ERRCODE_CONNECTION_FAILURE),
1197  errmsg("could not send cancel request: %s",
1198  errbuf)));
1199  PQfreeCancel(cancel);
1200  return false;
1201  }
1202  PQfreeCancel(cancel);
1203  }
1204 
1205  /* Get and discard the result of the query. */
1206  if (pgfdw_get_cleanup_result(conn, endtime, &result))
1207  return false;
1208  PQclear(result);
1209 
1210  return true;
1211 }
static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result)
Definition: connection.c:1275
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1578
int64 TimestampTz
Definition: timestamp.h:39
void PQfreeCancel(PGcancel *cancel)
Definition: fe-connect.c:4264
int errcode(int sqlerrcode)
Definition: elog.c:704
PGcancel * PQgetCancel(PGconn *conn)
Definition: fe-connect.c:4241
#define WARNING
Definition: elog.h:40
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
#define ereport(elevel,...)
Definition: elog.h:155
void PQclear(PGresult *res)
Definition: fe-exec.c:676
int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize)
Definition: fe-connect.c:4396
int errmsg(const char *fmt,...)
Definition: elog.c:915

◆ pgfdw_exec_cleanup_query()

static bool pgfdw_exec_cleanup_query ( PGconn *  conn,
const char *  query,
bool  ignore_errors 
)
static

Definition at line 1221 of file connection.c.

References GetCurrentTimestamp(), pgfdw_get_cleanup_result(), pgfdw_report_error(), PGRES_COMMAND_OK, PQclear(), PQresultStatus(), PQsendQuery(), TimestampTzPlusMilliseconds, and WARNING.

Referenced by pgfdw_subxact_callback(), and pgfdw_xact_callback().

1222 {
1223  PGresult *result = NULL;
1224  TimestampTz endtime;
1225 
1226  /*
1227  * If it takes too long to execute a cleanup query, assume the connection
1228  * is dead. It's fairly likely that this is why we aborted in the first
1229  * place (e.g. statement timeout, user cancel), so the timeout shouldn't
1230  * be too long.
1231  */
1232  endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000);
1233 
1234  /*
1235  * Submit a query. Since we don't use non-blocking mode, this also can
1236  * block. But its risk is relatively small, so we ignore that for now.
1237  */
1238  if (!PQsendQuery(conn, query))
1239  {
1240  pgfdw_report_error(WARNING, NULL, conn, false, query);
1241  return false;
1242  }
1243 
1244  /* Get the result of the query. */
1245  if (pgfdw_get_cleanup_result(conn, endtime, &result))
1246  return false;
1247 
1248  /* Issue a warning if not successful. */
1249  if (PQresultStatus(result) != PGRES_COMMAND_OK)
1250  {
1251  pgfdw_report_error(WARNING, result, conn, true, query);
1252  return ignore_errors;
1253  }
1254  PQclear(result);
1255 
1256  return true;
1257 }
static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result)
Definition: connection.c:1275
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1578
int64 TimestampTz
Definition: timestamp.h:39
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2651
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1187
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:737
#define WARNING
Definition: elog.h:40
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
void PQclear(PGresult *res)
Definition: fe-exec.c:676

◆ pgfdw_exec_query()

PGresult* pgfdw_exec_query ( PGconn *  conn,
const char *  query 
)

Definition at line 648 of file connection.c.

References ERROR, pgfdw_get_result(), pgfdw_report_error(), and PQsendQuery().

Referenced by close_cursor(), fetch_more_data(), finish_foreign_modify(), get_remote_estimate(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresImportForeignSchema(), and postgresReScanForeignScan().

649 {
650  /*
651  * Submit a query. Since we don't use non-blocking mode, this also can
652  * block. But its risk is relatively small, so we ignore that for now.
653  */
654  if (!PQsendQuery(conn, query))
655  pgfdw_report_error(ERROR, NULL, conn, false, query);
656 
657  /* Wait for the result. */
658  return pgfdw_get_result(conn, query);
659 }
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1187
#define ERROR
Definition: elog.h:45
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:737
PGresult * pgfdw_get_result(PGconn *conn, const char *query)
Definition: connection.c:672

◆ pgfdw_get_cleanup_result()

static bool pgfdw_get_cleanup_result ( PGconn *  conn,
TimestampTz  endtime,
PGresult **  result 
)
static

Definition at line 1275 of file connection.c.

References CHECK_FOR_INTERRUPTS, GetCurrentTimestamp(), MyLatch, now(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, PG_WAIT_EXTENSION, PQclear(), PQconsumeInput(), PQgetResult(), PQisBusy(), PQsocket(), ResetLatch(), TimestampDifferenceMilliseconds(), WaitLatchOrSocket(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, and WL_TIMEOUT.

Referenced by pgfdw_cancel_query(), and pgfdw_exec_cleanup_query().

1276 {
1277  volatile bool timed_out = false;
1278  PGresult *volatile last_res = NULL;
1279 
1280  /* In what follows, do not leak any PGresults on an error. */
1281  PG_TRY();
1282  {
1283  for (;;)
1284  {
1285  PGresult *res;
1286 
1287  while (PQisBusy(conn))
1288  {
1289  int wc;
1290  TimestampTz now = GetCurrentTimestamp();
1291  long cur_timeout;
1292 
1293  /* If timeout has expired, give up, else get sleep time. */
1294  cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
1295  if (cur_timeout <= 0)
1296  {
1297  timed_out = true;
1298  goto exit;
1299  }
1300 
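1301  /* Sleep until there's something to do */
1302  wc = WaitLatchOrSocket(MyLatch,
1303  WL_LATCH_SET | WL_SOCKET_READABLE |
1304  WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1305  PQsocket(conn),
1306  cur_timeout, PG_WAIT_EXTENSION);
1307  ResetLatch(MyLatch);
1308 
1309  CHECK_FOR_INTERRUPTS();
1310 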
1311  /* Data available in socket? */
1312  if (wc & WL_SOCKET_READABLE)
1313  {
1314  if (!PQconsumeInput(conn))
1315  {
1316  /* connection trouble; treat the same as a timeout */
1317  timed_out = true;
1318  goto exit;
1319  }
1320  }
1321  }
1322 
1323  res = PQgetResult(conn);
1324  if (res == NULL)
1325  break; /* query is complete */
1326 
1327  PQclear(last_res);
1328  last_res = res;
1329  }
1330 exit: ;
1331  }
1332  PG_CATCH();
1333  {
1334  PQclear(last_res);
1335  PG_RE_THROW();
1336  }
1337  PG_END_TRY();
1338 
1339  if (timed_out)
1340  PQclear(last_res);
1341  else
1342  *result = last_res;
1343  return timed_out;
1344 }
#define WL_TIMEOUT
Definition: latch.h:127
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1578
int64 TimestampTz
Definition: timestamp.h:39
#define WL_SOCKET_READABLE
Definition: latch.h:125
void ResetLatch(Latch *latch)
Definition: latch.c:588
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:438
#define PG_WAIT_EXTENSION
Definition: pgstat.h:900
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1672
void PQclear(PGresult *res)
Definition: fe-exec.c:676
#define PG_CATCH()
Definition: elog.h:319
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:1722
#define PG_RE_THROW()
Definition: elog.h:350
struct Latch * MyLatch
Definition: globals.c:55
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:100
#define PG_TRY()
Definition: elog.h:309
#define WL_LATCH_SET
Definition: latch.h:124
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:6643
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1542
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1746
#define PG_END_TRY()
Definition: elog.h:334
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1691

◆ pgfdw_get_result()

PGresult* pgfdw_get_result ( PGconn *  conn,
const char *  query 
)

Definition at line 672 of file connection.c.

References CHECK_FOR_INTERRUPTS, ERROR, MyLatch, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, PG_WAIT_EXTENSION, pgfdw_report_error(), PQclear(), PQconsumeInput(), PQgetResult(), PQisBusy(), PQsocket(), ResetLatch(), WaitLatchOrSocket(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_SOCKET_READABLE.

Referenced by create_cursor(), do_sql_command(), execute_dml_stmt(), execute_foreign_modify(), pgfdw_exec_query(), and prepare_foreign_modify().

673 {
674  PGresult *volatile last_res = NULL;
675 
676  /* In what follows, do not leak any PGresults on an error. */
677  PG_TRY();
678  {
679  for (;;)
680  {
681  PGresult *res;
682 
683  while (PQisBusy(conn))
684  {
685  int wc;
686 
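687  /* Sleep until there's something to do */
688  wc = WaitLatchOrSocket(MyLatch,
689  WL_LATCH_SET | WL_SOCKET_READABLE |
690  WL_EXIT_ON_PM_DEATH,
691  PQsocket(conn),
692  -1L, PG_WAIT_EXTENSION);
693  ResetLatch(MyLatch);
694 
695  CHECK_FOR_INTERRUPTS();
696 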
697  /* Data available in socket? */
698  if (wc & WL_SOCKET_READABLE)
699  {
700  if (!PQconsumeInput(conn))
701  pgfdw_report_error(ERROR, NULL, conn, false, query);
702  }
703  }
704 
705  res = PQgetResult(conn);
706  if (res == NULL)
707  break; /* query is complete */
708 
709  PQclear(last_res);
710  last_res = res;
711  }
712  }
713  PG_CATCH();
714  {
715  PQclear(last_res);
716  PG_RE_THROW();
717  }
718  PG_END_TRY();
719 
720  return last_res;
721 }
#define WL_SOCKET_READABLE
Definition: latch.h:125
void ResetLatch(Latch *latch)
Definition: latch.c:588
#define ERROR
Definition: elog.h:45
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:438
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:737
#define PG_WAIT_EXTENSION
Definition: pgstat.h:900
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1672
void PQclear(PGresult *res)
Definition: fe-exec.c:676
#define PG_CATCH()
Definition: elog.h:319
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:1722
#define PG_RE_THROW()
Definition: elog.h:350
struct Latch * MyLatch
Definition: globals.c:55
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:100
#define PG_TRY()
Definition: elog.h:309
#define WL_LATCH_SET
Definition: latch.h:124
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:6643
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1746
#define PG_END_TRY()
Definition: elog.h:334
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129

◆ pgfdw_inval_callback()

static void pgfdw_inval_callback ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)
static

Definition at line 1098 of file connection.c.

References Assert, ConnCacheEntry::conn, DEBUG3, disconnect_pg_server(), elog, FOREIGNSERVEROID, hash_seq_init(), hash_seq_search(), ConnCacheEntry::invalidated, ConnCacheEntry::mapping_hashvalue, ConnCacheEntry::server_hashvalue, USERMAPPINGOID, and ConnCacheEntry::xact_depth.

Referenced by GetConnection().

1099 {
1100  HASH_SEQ_STATUS scan;
1101  ConnCacheEntry *entry;
1102 
1103  Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
1104 
1105  /* ConnectionHash must exist already, if we're registered */
1106  hash_seq_init(&scan, ConnectionHash);
1107  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1108  {
1109  /* Ignore invalid entries */
1110  if (entry->conn == NULL)
1111  continue;
1112 
1113  /* hashvalue == 0 means a cache reset, must clear all state */
1114  if (hashvalue == 0 ||
1115  (cacheid == FOREIGNSERVEROID &&
1116  entry->server_hashvalue == hashvalue) ||
1117  (cacheid == USERMAPPINGOID &&
1118  entry->mapping_hashvalue == hashvalue))
1119  {
1120  /*
1121  * Close the connection immediately if it's not used yet in this
1122  * transaction. Otherwise mark it as invalid so that
1123  * pgfdw_xact_callback() can close it at the end of this
1124  * transaction.
1125  */
1126  if (entry->xact_depth == 0)
1127  {
1128  elog(DEBUG3, "discarding connection %p", entry->conn);
1129  disconnect_pg_server(entry);
1130  }
1131  else
1132  entry->invalidated = true;
1133  }
1134  }
1135 }
#define DEBUG3
Definition: elog.h:23
uint32 server_hashvalue
Definition: connection.c:63
bool invalidated
Definition: connection.c:61
static HTAB * ConnectionHash
Definition: connection.c:70
uint32 mapping_hashvalue
Definition: connection.c:64
PGconn * conn
Definition: connection.c:54
static void disconnect_pg_server(ConnCacheEntry *entry)
Definition: connection.c:423
#define Assert(condition)
Definition: c.h:792
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1436
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1426
#define elog(elevel,...)
Definition: elog.h:228

◆ pgfdw_reject_incomplete_xact_state_change()

static void pgfdw_reject_incomplete_xact_state_change ( ConnCacheEntry *  entry)
static

Definition at line 1148 of file connection.c.

References ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, disconnect_pg_server(), ereport, errcode(), errmsg(), ERROR, GetForeignServer(), ConnCacheEntry::serverid, and ForeignServer::servername.

Referenced by GetConnection(), pgfdw_subxact_callback(), and pgfdw_xact_callback().

1149 {
1150  ForeignServer *server;
1151 
1152  /* nothing to do for inactive entries and entries of sane state */
1153  if (entry->conn == NULL || !entry->changing_xact_state)
1154  return;
1155 
1156  /* make sure this entry is inactive */
1157  disconnect_pg_server(entry);
1158 
1159  /* find server name to be shown in the message below */
1160  server = GetForeignServer(entry->serverid);
1161 
1162  ereport(ERROR,
1163  (errcode(ERRCODE_CONNECTION_EXCEPTION),
1164  errmsg("connection to server \"%s\" was lost",
1165  server->servername)));
1166 }
int errcode(int sqlerrcode)
Definition: elog.c:704
#define ERROR
Definition: elog.h:45
bool changing_xact_state
Definition: connection.c:60
ForeignServer * GetForeignServer(Oid serverid)
Definition: foreign.c:109
#define ereport(elevel,...)
Definition: elog.h:155
PGconn * conn
Definition: connection.c:54
static void disconnect_pg_server(ConnCacheEntry *entry)
Definition: connection.c:423
int errmsg(const char *fmt,...)
Definition: elog.c:915
char * servername
Definition: foreign.h:39

◆ pgfdw_report_error()

void pgfdw_report_error ( int  elevel,
PGresult *  res,
PGconn *  conn,
bool  clear,
const char *  sql 
)

Definition at line 737 of file connection.c.

References ereport, errcode(), errcontext, errdetail_internal(), errhint(), errmsg(), errmsg_internal(), MAKE_SQLSTATE, pchomp(), PG_DIAG_CONTEXT, PG_DIAG_MESSAGE_DETAIL, PG_DIAG_MESSAGE_HINT, PG_DIAG_MESSAGE_PRIMARY, PG_DIAG_SQLSTATE, PG_END_TRY, PG_FINALLY, PG_TRY, PQclear(), PQerrorMessage(), and PQresultErrorField().

Referenced by close_cursor(), create_cursor(), do_sql_command(), execute_dml_stmt(), execute_foreign_modify(), fetch_more_data(), finish_foreign_modify(), get_remote_estimate(), pgfdw_exec_cleanup_query(), pgfdw_exec_query(), pgfdw_get_result(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresImportForeignSchema(), postgresReScanForeignScan(), and prepare_foreign_modify().

739 {
740  /* If requested, PGresult must be released before leaving this function. */
741  PG_TRY();
742  {
743  char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
744  char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
745  char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
746  char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
747  char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
748  int sqlstate;
749 
750  if (diag_sqlstate)
751  sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
752  diag_sqlstate[1],
753  diag_sqlstate[2],
754  diag_sqlstate[3],
755  diag_sqlstate[4]);
756  else
757  sqlstate = ERRCODE_CONNECTION_FAILURE;
758 
759  /*
760  * If we don't get a message from the PGresult, try the PGconn. This
761  * is needed because for connection-level failures, PQexec may just
762  * return NULL, not a PGresult at all.
763  */
764  if (message_primary == NULL)
765  message_primary = pchomp(PQerrorMessage(conn));
766 
767  ereport(elevel,
768  (errcode(sqlstate),
769  message_primary ? errmsg_internal("%s", message_primary) :
770  errmsg("could not obtain message string for remote error"),
771  message_detail ? errdetail_internal("%s", message_detail) : 0,
772  message_hint ? errhint("%s", message_hint) : 0,
773  message_context ? errcontext("%s", message_context) : 0,
774  sql ? errcontext("remote SQL command: %s", sql) : 0));
775  }
776  PG_FINALLY();
777  {
778  if (clear)
779  PQclear(res);
780  }
781  PG_END_TRY();
782 }
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6625
int errhint(const char *fmt,...)
Definition: elog.c:1162
#define PG_DIAG_MESSAGE_PRIMARY
Definition: postgres_ext.h:58
#define PG_DIAG_MESSAGE_DETAIL
Definition: postgres_ext.h:59
#define MAKE_SQLSTATE(ch1, ch2, ch3, ch4, ch5)
Definition: elog.h:62
int errcode(int sqlerrcode)
Definition: elog.c:704
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:57
int errdetail_internal(const char *fmt,...)
Definition: elog.c:1075
char * pchomp(const char *in)
Definition: mcxt.c:1215
static int elevel
Definition: vacuumlazy.c:333
#define PG_FINALLY()
Definition: elog.h:326
#define PG_DIAG_MESSAGE_HINT
Definition: postgres_ext.h:60
#define ereport(elevel,...)
Definition: elog.h:155
void PQclear(PGresult *res)
Definition: fe-exec.c:676
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1002
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:2713
int errmsg(const char *fmt,...)
Definition: elog.c:915
#define errcontext
Definition: elog.h:199
#define PG_TRY()
Definition: elog.h:309
#define PG_END_TRY()
Definition: elog.h:334
#define PG_DIAG_CONTEXT
Definition: postgres_ext.h:64

◆ pgfdw_subxact_callback()

static void pgfdw_subxact_callback ( SubXactEvent  event,
SubTransactionId  mySubid,
SubTransactionId  parentSubid,
void *  arg 
)
static

Definition at line 981 of file connection.c.

References ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, do_sql_command(), elog, ERROR, GetCurrentTransactionNestLevel(), hash_seq_init(), hash_seq_search(), ConnCacheEntry::have_error, in_error_recursion_trouble(), pgfdw_cancel_query(), pgfdw_exec_cleanup_query(), pgfdw_reject_incomplete_xact_state_change(), PQTRANS_ACTIVE, PQtransactionStatus(), snprintf, SUBXACT_EVENT_ABORT_SUB, SUBXACT_EVENT_PRE_COMMIT_SUB, ConnCacheEntry::xact_depth, and xact_got_connection.

Referenced by GetConnection().

983 {
984  HASH_SEQ_STATUS scan;
985  ConnCacheEntry *entry;
986  int curlevel;
987 
988  /* Nothing to do at subxact start, nor after commit. */
989  if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
990  event == SUBXACT_EVENT_ABORT_SUB))
991  return;
992 
993  /* Quick exit if no connections were touched in this transaction. */
994  if (!xact_got_connection)
995  return;
996 
997  /*
998  * Scan all connection cache entries to find open remote subtransactions
999  * of the current level, and close them.
1000  */
1001  curlevel = GetCurrentTransactionNestLevel();
1002  hash_seq_init(&scan, ConnectionHash);
1003  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1004  {
1005  char sql[100];
1006 
1007  /*
1008  * We only care about connections with open remote subtransactions of
1009  * the current level.
1010  */
1011  if (entry->conn == NULL || entry->xact_depth < curlevel)
1012  continue;
1013 
1014  if (entry->xact_depth > curlevel)
1015  elog(ERROR, "missed cleaning up remote subtransaction at level %d",
1016  entry->xact_depth);
1017 
1018  if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
1019  {
1020  /*
1021  * If abort cleanup previously failed for this connection, we
1022  * can't issue any more commands against it.
1023  */
1024  pgfdw_reject_incomplete_xact_state_change(entry);
1025 
1026  /* Commit all remote subtransactions during pre-commit */
1027  snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
1028  entry->changing_xact_state = true;
1029  do_sql_command(entry->conn, sql);
1030  entry->changing_xact_state = false;
1031  }
1032  else if (in_error_recursion_trouble())
1033  {
1034  /*
1035  * Don't try to clean up the connection if we're already in error
1036  * recursion trouble.
1037  */
1038  entry->changing_xact_state = true;
1039  }
1040  else if (!entry->changing_xact_state)
1041  {
1042  bool abort_cleanup_failure = false;
1043 
1044  /* Remember that abort cleanup is in progress. */
1045  entry->changing_xact_state = true;
1046 
1047  /* Assume we might have lost track of prepared statements */
1048  entry->have_error = true;
1049 
1050  /*
1051  * If a command has been submitted to the remote server by using
1052  * an asynchronous execution function, the command might not have
1053  * yet completed. Check to see if a command is still being
1054  * processed by the remote server, and if so, request cancellation
1055  * of the command.
1056  */
1057  if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
1058  !pgfdw_cancel_query(entry->conn))
1059  abort_cleanup_failure = true;
1060  else
1061  {
1062  /* Rollback all remote subtransactions during abort */
1063  snprintf(sql, sizeof(sql),
1064  "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
1065  curlevel, curlevel);
1066  if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
1067  abort_cleanup_failure = true;
1068  }
1069 
1070  /* Disarm changing_xact_state if it all worked. */
1071  entry->changing_xact_state = abort_cleanup_failure;
1072  }
1073 
1074  /* OK, we're outta that level of subtransaction */
1075  entry->xact_depth--;
1076  }
1077 }
#define ERROR
Definition: elog.h:45
bool changing_xact_state
Definition: connection.c:60
static void do_sql_command(PGconn *conn, const char *sql)
Definition: connection.c:534
PGTransactionStatusType PQtransactionStatus(const PGconn *conn)
Definition: fe-connect.c:6580
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
Definition: connection.c:1221
static HTAB * ConnectionHash
Definition: connection.c:70
static bool pgfdw_cancel_query(PGconn *conn)
Definition: connection.c:1174
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:857
bool in_error_recursion_trouble(void)
Definition: elog.c:291
PGconn * conn
Definition: connection.c:54
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
Definition: connection.c:1148
static bool xact_got_connection
Definition: connection.c:77
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1436
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1426
#define elog(elevel,...)
Definition: elog.h:228
#define snprintf
Definition: port.h:215

◆ pgfdw_xact_callback()

static void pgfdw_xact_callback ( XactEvent  event,
void *  arg 
)
static

Definition at line 792 of file connection.c.

References ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, CONNECTION_OK, cursor_number, DEBUG3, disconnect_pg_server(), do_sql_command(), elog, ereport, errcode(), errmsg(), ERROR, hash_seq_init(), hash_seq_search(), ConnCacheEntry::have_error, ConnCacheEntry::have_prep_stmt, in_error_recursion_trouble(), ConnCacheEntry::invalidated, pgfdw_cancel_query(), pgfdw_exec_cleanup_query(), pgfdw_reject_incomplete_xact_state_change(), PQclear(), PQexec(), PQstatus(), PQTRANS_ACTIVE, PQTRANS_IDLE, PQtransactionStatus(), ConnCacheEntry::xact_depth, XACT_EVENT_ABORT, XACT_EVENT_COMMIT, XACT_EVENT_PARALLEL_ABORT, XACT_EVENT_PARALLEL_COMMIT, XACT_EVENT_PARALLEL_PRE_COMMIT, XACT_EVENT_PRE_COMMIT, XACT_EVENT_PRE_PREPARE, XACT_EVENT_PREPARE, and xact_got_connection.

Referenced by GetConnection().

793 {
794  HASH_SEQ_STATUS scan;
795  ConnCacheEntry *entry;
796 
797  /* Quick exit if no connections were touched in this transaction. */
798  if (!xact_got_connection)
799  return;
800 
801  /*
802  * Scan all connection cache entries to find open remote transactions, and
803  * close them.
804  */
805  hash_seq_init(&scan, ConnectionHash);
806  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
807  {
808  PGresult *res;
809 
810  /* Ignore cache entry if no open connection right now */
811  if (entry->conn == NULL)
812  continue;
813 
814  /* If it has an open remote transaction, try to close it */
815  if (entry->xact_depth > 0)
816  {
817  bool abort_cleanup_failure = false;
818 
819  elog(DEBUG3, "closing remote transaction on connection %p",
820  entry->conn);
821 
822  switch (event)
823  {
824  case XACT_EVENT_PARALLEL_PRE_COMMIT:
825  case XACT_EVENT_PRE_COMMIT:
826 
827  /*
828  * If abort cleanup previously failed for this connection,
829  * we can't issue any more commands against it.
830  */
831  pgfdw_reject_incomplete_xact_state_change(entry);
832 
833  /* Commit all remote transactions during pre-commit */
834  entry->changing_xact_state = true;
835  do_sql_command(entry->conn, "COMMIT TRANSACTION");
836  entry->changing_xact_state = false;
837 
838  /*
839  * If there were any errors in subtransactions, and we
840  * made prepared statements, do a DEALLOCATE ALL to make
841  * sure we get rid of all prepared statements. This is
842  * annoying and not terribly bulletproof, but it's
843  * probably not worth trying harder.
844  *
845  * DEALLOCATE ALL only exists in 8.3 and later, so this
846  * constrains how old a server postgres_fdw can
847  * communicate with. We intentionally ignore errors in
848  * the DEALLOCATE, so that we can hobble along to some
849  * extent with older servers (leaking prepared statements
850  * as we go; but we don't really support update operations
851  * pre-8.3 anyway).
852  */
853  if (entry->have_prep_stmt && entry->have_error)
854  {
855  res = PQexec(entry->conn, "DEALLOCATE ALL");
856  PQclear(res);
857  }
858  entry->have_prep_stmt = false;
859  entry->have_error = false;
860  break;
861  case XACT_EVENT_PRE_PREPARE:
862 
863  /*
864  * We disallow any remote transactions, since it's not
865  * very reasonable to hold them open until the prepared
866  * transaction is committed. For the moment, throw error
867  * unconditionally; later we might allow read-only cases.
868  * Note that the error will cause us to come right back
869  * here with event == XACT_EVENT_ABORT, so we'll clean up
870  * the connection state at that point.
871  */
872  ereport(ERROR,
873  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
874  errmsg("cannot PREPARE a transaction that has operated on postgres_fdw foreign tables")));
875  break;
876  case XACT_EVENT_PARALLEL_COMMIT:
877  case XACT_EVENT_COMMIT:
878  case XACT_EVENT_PREPARE:
879  /* Pre-commit should have closed the open transaction */
880  elog(ERROR, "missed cleaning up connection during pre-commit");
881  break;
882  case XACT_EVENT_PARALLEL_ABORT:
883  case XACT_EVENT_ABORT:
884 
885  /*
886  * Don't try to clean up the connection if we're already
887  * in error recursion trouble.
888  */
889  if (in_error_recursion_trouble())
890  entry->changing_xact_state = true;
891 
892  /*
893  * If connection is already unsalvageable, don't touch it
894  * further.
895  */
896  if (entry->changing_xact_state)
897  break;
898 
899  /*
900  * Mark this connection as in the process of changing
901  * transaction state.
902  */
903  entry->changing_xact_state = true;
904 
905  /* Assume we might have lost track of prepared statements */
906  entry->have_error = true;
907 
908  /*
909  * If a command has been submitted to the remote server by
910  * using an asynchronous execution function, the command
911  * might not have yet completed. Check to see if a
912  * command is still being processed by the remote server,
913  * and if so, request cancellation of the command.
914  */
915  if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
916  !pgfdw_cancel_query(entry->conn))
917  {
918  /* Unable to cancel running query. */
919  abort_cleanup_failure = true;
920  }
921  else if (!pgfdw_exec_cleanup_query(entry->conn,
922  "ABORT TRANSACTION",
923  false))
924  {
925  /* Unable to abort remote transaction. */
926  abort_cleanup_failure = true;
927  }
928  else if (entry->have_prep_stmt && entry->have_error &&
929  !pgfdw_exec_cleanup_query(entry->conn,
930  "DEALLOCATE ALL",
931  true))
932  {
933  /* Trouble clearing prepared statements. */
934  abort_cleanup_failure = true;
935  }
936  else
937  {
938  entry->have_prep_stmt = false;
939  entry->have_error = false;
940  }
941 
942  /* Disarm changing_xact_state if it all worked. */
943  entry->changing_xact_state = abort_cleanup_failure;
944  break;
945  }
946  }
947 
948  /* Reset state to show we're out of a transaction */
949  entry->xact_depth = 0;
950 
951  /*
952  * If the connection isn't in a good idle state or it is marked as
953  * invalid, then discard it to recover. Next GetConnection will open a
954  * new connection.
955  */
956  if (PQstatus(entry->conn) != CONNECTION_OK ||
957  PQtransactionStatus(entry->conn) != PQTRANS_IDLE ||
958  entry->changing_xact_state ||
959  entry->invalidated)
960  {
961  elog(DEBUG3, "discarding connection %p", entry->conn);
962  disconnect_pg_server(entry);
963  }
964  }
965 
966  /*
967  * Regardless of the event type, we can now mark ourselves as out of the
968  * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
969  * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
970  */
971  xact_got_connection = false;
972 
973  /* Also reset cursor numbering for next transaction */
974  cursor_number = 0;
975 }
#define DEBUG3
Definition: elog.h:23
int errcode(int sqlerrcode)
Definition: elog.c:704
bool have_prep_stmt
Definition: connection.c:58
#define ERROR
Definition: elog.h:45
bool changing_xact_state
Definition: connection.c:60
static void do_sql_command(PGconn *conn, const char *sql)
Definition: connection.c:534
static unsigned int cursor_number
Definition: connection.c:73
bool invalidated
Definition: connection.c:61
PGTransactionStatusType PQtransactionStatus(const PGconn *conn)
Definition: fe-connect.c:6580
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
Definition: connection.c:1221
static HTAB * ConnectionHash
Definition: connection.c:70
static bool pgfdw_cancel_query(PGconn *conn)
Definition: connection.c:1174
#define ereport(elevel,...)
Definition: elog.h:155
void PQclear(PGresult *res)
Definition: fe-exec.c:676
bool in_error_recursion_trouble(void)
Definition: elog.c:291
PGconn * conn
Definition: connection.c:54
static void disconnect_pg_server(ConnCacheEntry *entry)
Definition: connection.c:423
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
Definition: connection.c:1148
static bool xact_got_connection
Definition: connection.c:77
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1436
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1426
int errmsg(const char *fmt,...)
Definition: elog.c:915
#define elog(elevel,...)
Definition: elog.h:228
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:1907
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:6572

◆ postgres_fdw_get_connections()

Datum postgres_fdw_get_connections ( PG_FUNCTION_ARGS  )

Definition at line 1360 of file connection.c.

References ReturnSetInfo::allowedModes, Assert, BoolGetDatum, CStringGetTextDatum, ReturnSetInfo::econtext, ExprContext::ecxt_per_query_memory, elog, ereport, errcode(), errmsg(), ERROR, FSV_MISSING_OK, get_call_result_type(), GetForeignServerExtended(), hash_seq_init(), hash_seq_search(), IsA, MemoryContextSwitchTo(), MemSet, PG_RETURN_VOID, POSTGRES_FDW_GET_CONNECTIONS_COLS, ReturnSetInfo::returnMode, ForeignServer::servername, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SFRM_Materialize, tuplestore_begin_heap(), tuplestore_donestoring, tuplestore_putvalues(), TYPEFUNC_COMPOSITE, values, and work_mem.

1361 {
1362 #define POSTGRES_FDW_GET_CONNECTIONS_COLS 2
1363  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1364  TupleDesc tupdesc;
1365  Tuplestorestate *tupstore;
1366  MemoryContext per_query_ctx;
1367  MemoryContext oldcontext;
1368  HASH_SEQ_STATUS scan;
1369  ConnCacheEntry *entry;
1370 
1371  /* check to see if caller supports us returning a tuplestore */
1372  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
1373  ereport(ERROR,
1374  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1375  errmsg("set-valued function called in context that cannot accept a set")));
1376  if (!(rsinfo->allowedModes & SFRM_Materialize))
1377  ereport(ERROR,
1378  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1379  errmsg("materialize mode required, but it is not allowed in this context")));
1380 
1381  /* Build a tuple descriptor for our result type */
1382  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1383  elog(ERROR, "return type must be a row type");
1384 
1385  /* Build tuplestore to hold the result rows */
1386  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1387  oldcontext = MemoryContextSwitchTo(per_query_ctx);
1388 
1389  tupstore = tuplestore_begin_heap(true, false, work_mem);
1390  rsinfo->returnMode = SFRM_Materialize;
1391  rsinfo->setResult = tupstore;
1392  rsinfo->setDesc = tupdesc;
1393 
1394  MemoryContextSwitchTo(oldcontext);
1395 
1396  /* If cache doesn't exist, we return no records */
1397  if (!ConnectionHash)
1398  {
1399  /* clean up and return the tuplestore */
1400  tuplestore_donestoring(tupstore);
1401 
1402  PG_RETURN_VOID();
1403  }
1404 
1405  hash_seq_init(&scan, ConnectionHash);
1406  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1407  {
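1408  ForeignServer *server;
1409  Datum values[POSTGRES_FDW_GET_CONNECTIONS_COLS];
1410  bool nulls[POSTGRES_FDW_GET_CONNECTIONS_COLS];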
1411 
1412  /* We only look for open remote connections */
1413  if (!entry->conn)
1414  continue;
1415 
1416  server = GetForeignServerExtended(entry->serverid, FSV_MISSING_OK);
1417 
1418  MemSet(values, 0, sizeof(values));
1419  MemSet(nulls, 0, sizeof(nulls));
1420 
1421  /*
1422  * The foreign server may have been dropped in current explicit
1423  * transaction. It is not possible to drop the server from another
1424  * session when the connection associated with it is in use in the
1425  * current transaction, if tried so, the drop query in another session
1426  * blocks until the current transaction finishes.
1427  *
1428  * Even though the server is dropped in the current transaction, the
1429  * cache can still have associated active connection entry, say we
1430  * call such connections dangling. Since we can not fetch the server
1431  * name from system catalogs for dangling connections, instead we
1432  * show NULL value for server name in output.
1433  *
1434  * We could have done better by storing the server name in the cache
1435  * entry instead of server oid so that it could be used in the output.
1436  * But the server name in each cache entry requires 64 bytes of
1437  * memory, which is huge, when there are many cached connections and
1438  * the use case i.e. dropping the foreign server within the explicit
1439  * current transaction seems rare. So, we chose to show NULL value for
1440  * server name in output.
1441  *
1442  * Such dangling connections get closed either in next use or at the
1443  * end of current explicit transaction in pgfdw_xact_callback.
1444  */
1445  if (!server)
1446  {
1447  /*
1448  * If the server has been dropped in the current explicit
1449  * transaction, then this entry would have been invalidated in
1450  * pgfdw_inval_callback at the end of drop server command. Note
1451  * that this connection would not have been closed in
1452  * pgfdw_inval_callback because it is still being used in the
1453  * current explicit transaction. So, assert that here.
1454  */
1455  Assert(entry->conn && entry->xact_depth > 0 && entry->invalidated);
1456 
1457  /* Show null, if no server name was found */
1458  nulls[0] = true;
1459  }
1460  else
1461  values[0] = CStringGetTextDatum(server->servername);
1462 
1463  values[1] = BoolGetDatum(!entry->invalidated);
1464 
1465  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
1466  }
1467 
1468  /* clean up and return the tuplestore */
1469  tuplestore_donestoring(tupstore);
1470 
1471  PG_RETURN_VOID();
1472 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
#define IsA(nodeptr, _type_)
Definition: nodes.h:579
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:207
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
int errcode(int sqlerrcode)
Definition: elog.c:704
#define MemSet(start, val, len)
Definition: c.h:996
#define ERROR
Definition: elog.h:45
#define FSV_MISSING_OK
Definition: foreign.h:61
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
static HTAB * ConnectionHash
Definition: connection.c:70
uintptr_t Datum
Definition: postgres.h:367
int work_mem
Definition: globals.c:122
#define BoolGetDatum(X)
Definition: postgres.h:402
#define ereport(elevel,...)
Definition: elog.h:155
int allowedModes
Definition: execnodes.h:304
ForeignServer * GetForeignServerExtended(Oid serverid, bits16 flags)
Definition: foreign.c:121
#define PG_RETURN_VOID()
Definition: fmgr.h:349
SetFunctionReturnMode returnMode
Definition: execnodes.h:306
#define Assert(condition)
Definition: c.h:792
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1436
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:232
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1426
Tuplestorestate * setResult
Definition: execnodes.h:309
static Datum values[MAXATTR]
Definition: bootstrap.c:165
ExprContext * econtext
Definition: execnodes.h:302
TupleDesc setDesc
Definition: execnodes.h:310
int errmsg(const char *fmt,...)
Definition: elog.c:915
char * servername
Definition: foreign.h:39
#define elog(elevel,...)
Definition: elog.h:228
#define CStringGetTextDatum(s)
Definition: builtins.h:82
#define POSTGRES_FDW_GET_CONNECTIONS_COLS

◆ ReleaseConnection()

void ReleaseConnection ( PGconn * conn)

Definition at line 600 of file connection.c.

Referenced by estimate_path_cost_size(), finish_foreign_modify(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresEndDirectModify(), postgresEndForeignScan(), and postgresImportForeignSchema().

601 {
602  /*
603  * Intentionally a no-op.  Connection references are not actually tracked,
604  * because all cleanup is managed on a transaction or subtransaction basis
605  * instead, so there is nothing to do with the connection passed in here.
606  */
607 }

◆ UserMappingPasswordRequired()

static bool UserMappingPasswordRequired ( UserMapping * user)
static

Definition at line 438 of file connection.c.

References defGetBoolean(), DefElem::defname, lfirst, and UserMapping::options.

Referenced by check_conn_params(), and connect_pg_server().

439 {
440  ListCell *cell;
441  /* Scan the user mapping's options for an explicit "password_required" entry. */
442  foreach(cell, user->options)
443  {
444  DefElem *def = (DefElem *) lfirst(cell);
445  /* The first option with a matching name decides the result. */
446  if (strcmp(def->defname, "password_required") == 0)
447  return defGetBoolean(def);
448  }
449  /* No such option was found: report that a password is required. */
450  return true;
451 }
bool defGetBoolean(DefElem *def)
Definition: define.c:111
List * options
Definition: foreign.h:50
#define lfirst(lc)
Definition: pg_list.h:169
char * defname
Definition: parsenodes.h:733

Variable Documentation

◆ ConnectionHash

HTAB* ConnectionHash = NULL
static

Definition at line 70 of file connection.c.

◆ cursor_number

unsigned int cursor_number = 0
static

Definition at line 73 of file connection.c.

Referenced by GetCursorNumber(), and pgfdw_xact_callback().

◆ prep_stmt_number

unsigned int prep_stmt_number = 0
static

Definition at line 74 of file connection.c.

Referenced by GetPrepStmtNumber().

◆ xact_got_connection

bool xact_got_connection = false
static

Definition at line 77 of file connection.c.

Referenced by GetConnection(), pgfdw_subxact_callback(), and pgfdw_xact_callback().