PostgreSQL Source Code  git master
connection.c File Reference
#include "postgres.h"
#include "access/htup_details.h"
#include "access/xact.h"
#include "catalog/pg_user_mapping.h"
#include "commands/defrem.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postgres_fdw.h"
#include "storage/latch.h"
#include "utils/hsearch.h"
#include "utils/inval.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
Include dependency graph for connection.c:

Go to the source code of this file.

Data Structures

struct  ConnCacheEntry
 

Typedefs

typedef Oid ConnCacheKey
 
typedef struct ConnCacheEntry ConnCacheEntry
 

Functions

static PGconn * connect_pg_server (ForeignServer *server, UserMapping *user)
 
static void disconnect_pg_server (ConnCacheEntry *entry)
 
static void check_conn_params (const char **keywords, const char **values, UserMapping *user)
 
static void configure_remote_session (PGconn *conn)
 
static void do_sql_command (PGconn *conn, const char *sql)
 
static void begin_remote_xact (ConnCacheEntry *entry)
 
static void pgfdw_xact_callback (XactEvent event, void *arg)
 
static void pgfdw_subxact_callback (SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg)
 
static void pgfdw_inval_callback (Datum arg, int cacheid, uint32 hashvalue)
 
static void pgfdw_reject_incomplete_xact_state_change (ConnCacheEntry *entry)
 
static bool pgfdw_cancel_query (PGconn *conn)
 
static bool pgfdw_exec_cleanup_query (PGconn *conn, const char *query, bool ignore_errors)
 
static bool pgfdw_get_cleanup_result (PGconn *conn, TimestampTz endtime, PGresult **result)
 
static bool UserMappingPasswordRequired (UserMapping *user)
 
PGconn * GetConnection (UserMapping *user, bool will_prep_stmt)
 
void ReleaseConnection (PGconn *conn)
 
unsigned int GetCursorNumber (PGconn *conn)
 
unsigned int GetPrepStmtNumber (PGconn *conn)
 
PGresult * pgfdw_exec_query (PGconn *conn, const char *query)
 
PGresult * pgfdw_get_result (PGconn *conn, const char *query)
 
void pgfdw_report_error (int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
 

Variables

static HTAB * ConnectionHash = NULL
 
static unsigned int cursor_number = 0
 
static unsigned int prep_stmt_number = 0
 
static bool xact_got_connection = false
 

Typedef Documentation

◆ ConnCacheEntry

typedef struct ConnCacheEntry ConnCacheEntry

◆ ConnCacheKey

typedef Oid ConnCacheKey

Definition at line 45 of file connection.c.

Function Documentation

◆ begin_remote_xact()

static void begin_remote_xact ( ConnCacheEntry * entry)
static

Definition at line 441 of file connection.c.

References ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, DEBUG3, do_sql_command(), elog, GetCurrentTransactionNestLevel(), IsolationIsSerializable, snprintf, and ConnCacheEntry::xact_depth.

Referenced by GetConnection().

442 {
443  int curlevel = GetCurrentTransactionNestLevel();
444 
445  /* Start main transaction if we haven't yet */
446  if (entry->xact_depth <= 0)
447  {
448  const char *sql;
449 
450  elog(DEBUG3, "starting remote transaction on connection %p",
451  entry->conn);
452 
453  if (IsolationIsSerializable())
454  sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
455  else
456  sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
457  entry->changing_xact_state = true;
458  do_sql_command(entry->conn, sql);
459  entry->xact_depth = 1;
460  entry->changing_xact_state = false;
461  }
462 
463  /*
464  * If we're in a subtransaction, stack up savepoints to match our level.
465  * This ensures we can rollback just the desired effects when a
466  * subtransaction aborts.
467  */
468  while (entry->xact_depth < curlevel)
469  {
470  char sql[64];
471 
472  snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
473  entry->changing_xact_state = true;
474  do_sql_command(entry->conn, sql);
475  entry->xact_depth++;
476  entry->changing_xact_state = false;
477  }
478 }
#define DEBUG3
Definition: elog.h:23
bool changing_xact_state
Definition: connection.c:56
static void do_sql_command(PGconn *conn, const char *sql)
Definition: connection.c:418
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:841
PGconn * conn
Definition: connection.c:50
#define IsolationIsSerializable()
Definition: xact.h:52
#define elog(elevel,...)
Definition: elog.h:228
#define snprintf
Definition: port.h:192

◆ check_conn_params()

static void check_conn_params ( const char **  keywords,
const char **  values,
UserMapping *  user 
)
static

Definition at line 345 of file connection.c.

References ereport, errcode(), errdetail(), errmsg(), ERROR, i, superuser_arg(), UserMapping::userid, and UserMappingPasswordRequired().

Referenced by connect_pg_server().

346 {
347  int i;
348 
349  /* no check required if superuser */
350  if (superuser_arg(user->userid))
351  return;
352 
353  /* ok if params contain a non-empty password */
354  for (i = 0; keywords[i] != NULL; i++)
355  {
356  if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
357  return;
358  }
359 
360  /* ok if the superuser explicitly said so at user mapping creation time */
361  if (!UserMappingPasswordRequired(user))
362  return;
363 
364  ereport(ERROR,
365  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
366  errmsg("password is required"),
367  errdetail("Non-superusers must provide a password in the user mapping.")));
368 }
int errcode(int sqlerrcode)
Definition: elog.c:608
Oid userid
Definition: foreign.h:48
#define ERROR
Definition: elog.h:43
int errdetail(const char *fmt,...)
Definition: elog.c:955
#define ereport(elevel, rest)
Definition: elog.h:141
bool superuser_arg(Oid roleid)
Definition: superuser.c:56
static bool UserMappingPasswordRequired(UserMapping *user)
Definition: connection.c:323
static Datum values[MAXATTR]
Definition: bootstrap.c:167
int errmsg(const char *fmt,...)
Definition: elog.c:822
int i

◆ configure_remote_session()

static void configure_remote_session ( PGconn * conn)
static

Definition at line 382 of file connection.c.

References do_sql_command(), and PQserverVersion().

Referenced by connect_pg_server().

383 {
384  int remoteversion = PQserverVersion(conn);
385 
386  /* Force the search path to contain only pg_catalog (see deparse.c) */
387  do_sql_command(conn, "SET search_path = pg_catalog");
388 
389  /*
390  * Set remote timezone; this is basically just cosmetic, since all
391  * transmitted and returned timestamptzs should specify a zone explicitly
392  * anyway. However it makes the regression test outputs more predictable.
393  *
394  * We don't risk setting remote zone equal to ours, since the remote
395  * server might use a different timezone database. Instead, use UTC
396  * (quoted, because very old servers are picky about case).
397  */
398  do_sql_command(conn, "SET timezone = 'UTC'");
399 
400  /*
401  * Set values needed to ensure unambiguous data output from remote. (This
402  * logic should match what pg_dump does. See also set_transmission_modes
403  * in postgres_fdw.c.)
404  */
405  do_sql_command(conn, "SET datestyle = ISO");
406  if (remoteversion >= 80400)
407  do_sql_command(conn, "SET intervalstyle = postgres");
408  if (remoteversion >= 90000)
409  do_sql_command(conn, "SET extra_float_digits = 3");
410  else
411  do_sql_command(conn, "SET extra_float_digits = 2");
412 }
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:6674
static void do_sql_command(PGconn *conn, const char *sql)
Definition: connection.c:418

◆ connect_pg_server()

static PGconn * connect_pg_server ( ForeignServer *  server,
UserMapping *  user 
)
static

Definition at line 221 of file connection.c.

References check_conn_params(), configure_remote_session(), ConnCacheEntry::conn, CONNECTION_OK, ereport, errcode(), errdetail(), errdetail_internal(), errhint(), errmsg(), ERROR, ExtractConnectionOptions(), GetDatabaseEncodingName(), list_length(), ForeignServer::options, UserMapping::options, palloc(), pchomp(), pfree(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, PQconnectdbParams(), PQconnectionUsedPassword(), PQerrorMessage(), PQfinish(), PQstatus(), ForeignServer::servername, superuser_arg(), UserMapping::userid, UserMappingPasswordRequired(), and values.

Referenced by GetConnection().

222 {
223  PGconn *volatile conn = NULL;
224 
225  /*
226  * Use PG_TRY block to ensure closing connection on error.
227  */
228  PG_TRY();
229  {
230  const char **keywords;
231  const char **values;
232  int n;
233 
234  /*
235  * Construct connection params from generic options of ForeignServer
236  * and UserMapping. (Some of them might not be libpq options, in
237  * which case we'll just waste a few array slots.) Add 3 extra slots
238  * for fallback_application_name, client_encoding, end marker.
239  */
240  n = list_length(server->options) + list_length(user->options) + 3;
241  keywords = (const char **) palloc(n * sizeof(char *));
242  values = (const char **) palloc(n * sizeof(char *));
243 
244  n = 0;
245  n += ExtractConnectionOptions(server->options,
246  keywords + n, values + n);
247  n += ExtractConnectionOptions(user->options,
248  keywords + n, values + n);
249 
250  /* Use "postgres_fdw" as fallback_application_name. */
251  keywords[n] = "fallback_application_name";
252  values[n] = "postgres_fdw";
253  n++;
254 
255  /* Set client_encoding so that libpq can convert encoding properly. */
256  keywords[n] = "client_encoding";
257  values[n] = GetDatabaseEncodingName();
258  n++;
259 
260  keywords[n] = values[n] = NULL;
261 
262  /* verify connection parameters and make connection */
263  check_conn_params(keywords, values, user);
264 
265  conn = PQconnectdbParams(keywords, values, false);
266  if (!conn || PQstatus(conn) != CONNECTION_OK)
267  ereport(ERROR,
268  (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
269  errmsg("could not connect to server \"%s\"",
270  server->servername),
271  errdetail_internal("%s", pchomp(PQerrorMessage(conn)))));
272 
273  /*
274  * Check that non-superuser has used password to establish connection;
275  * otherwise, he's piggybacking on the postgres server's user
276  * identity. See also dblink_security_check() in contrib/dblink
277  * and check_conn_params.
278  */
279  if (!superuser_arg(user->userid) && UserMappingPasswordRequired(user) &&
280  !PQconnectionUsedPassword(conn))
281  ereport(ERROR,
282  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
283  errmsg("password is required"),
284  errdetail("Non-superuser cannot connect if the server does not request a password."),
285  errhint("Target server's authentication method must be changed or password_required=false set in the user mapping attributes.")));
286 
287  /* Prepare new session for use */
288  configure_remote_session(conn);
289 
290  pfree(keywords);
291  pfree(values);
292  }
293  PG_CATCH();
294  {
295  /* Release PGconn data structure if we managed to create one */
296  if (conn)
297  PQfinish(conn);
298  PG_RE_THROW();
299  }
300  PG_END_TRY();
301 
302  return conn;
303 }
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6684
static void configure_remote_session(PGconn *conn)
Definition: connection.c:382
int errhint(const char *fmt,...)
Definition: elog.c:1069
static void check_conn_params(const char **keywords, const char **values, UserMapping *user)
Definition: connection.c:345
int errcode(int sqlerrcode)
Definition: elog.c:608
void PQfinish(PGconn *conn)
Definition: fe-connect.c:4182
Oid userid
Definition: foreign.h:48
PGconn * PQconnectdbParams(const char *const *keywords, const char *const *values, int expand_dbname)
Definition: fe-connect.c:640
int errdetail_internal(const char *fmt,...)
Definition: elog.c:982
char * pchomp(const char *in)
Definition: mcxt.c:1214
void pfree(void *pointer)
Definition: mcxt.c:1056
int ExtractConnectionOptions(List *defelems, const char **keywords, const char **values)
Definition: option.c:333
#define ERROR
Definition: elog.h:43
PGconn * conn
Definition: streamutil.c:54
int errdetail(const char *fmt,...)
Definition: elog.c:955
List * options
Definition: foreign.h:50
#define ereport(elevel, rest)
Definition: elog.h:141
bool superuser_arg(Oid roleid)
Definition: superuser.c:56
static bool UserMappingPasswordRequired(UserMapping *user)
Definition: connection.c:323
#define PG_CATCH()
Definition: elog.h:332
const char * GetDatabaseEncodingName(void)
Definition: mbutils.c:1052
static int list_length(const List *l)
Definition: pg_list.h:169
#define PG_RE_THROW()
Definition: elog.h:363
static Datum values[MAXATTR]
Definition: bootstrap.c:167
int PQconnectionUsedPassword(const PGconn *conn)
Definition: fe-connect.c:6733
void * palloc(Size size)
Definition: mcxt.c:949
int errmsg(const char *fmt,...)
Definition: elog.c:822
char * servername
Definition: foreign.h:39
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:6631
#define PG_TRY()
Definition: elog.h:322
List * options
Definition: foreign.h:42
#define PG_END_TRY()
Definition: elog.h:347

◆ disconnect_pg_server()

static void disconnect_pg_server ( ConnCacheEntry * entry)
static

Definition at line 309 of file connection.c.

References ConnCacheEntry::conn, and PQfinish().

Referenced by GetConnection(), pgfdw_reject_incomplete_xact_state_change(), and pgfdw_xact_callback().

310 {
311  if (entry->conn != NULL)
312  {
313  PQfinish(entry->conn);
314  entry->conn = NULL;
315  }
316 }
void PQfinish(PGconn *conn)
Definition: fe-connect.c:4182
PGconn * conn
Definition: connection.c:50

◆ do_sql_command()

static void do_sql_command ( PGconn *  conn,
const char *  sql 
)
static

Definition at line 418 of file connection.c.

References ERROR, pgfdw_get_result(), pgfdw_report_error(), PGRES_COMMAND_OK, PQclear(), PQresultStatus(), and PQsendQuery().

Referenced by begin_remote_xact(), configure_remote_session(), pgfdw_subxact_callback(), and pgfdw_xact_callback().

419 {
420  PGresult *res;
421 
422  if (!PQsendQuery(conn, sql))
423  pgfdw_report_error(ERROR, NULL, conn, false, sql);
424  res = pgfdw_get_result(conn, sql);
425  if (PQresultStatus(res) != PGRES_COMMAND_OK)
426  pgfdw_report_error(ERROR, res, conn, true, sql);
427  PQclear(res);
428 }
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2692
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1234
#define ERROR
Definition: elog.h:43
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:621
void PQclear(PGresult *res)
Definition: fe-exec.c:694
PGresult * pgfdw_get_result(PGconn *conn, const char *query)
Definition: connection.c:556

◆ GetConnection()

PGconn* GetConnection ( UserMapping *  user,
bool  will_prep_stmt 
)

Definition at line 106 of file connection.c.

References begin_remote_xact(), CacheMemoryContext, CacheRegisterSyscacheCallback(), ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, connect_pg_server(), DEBUG3, disconnect_pg_server(), elog, HASHCTL::entrysize, FOREIGNSERVEROID, GetForeignServer(), GetSysCacheHashValue1, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, hash_search(), ConnCacheEntry::have_error, ConnCacheEntry::have_prep_stmt, HASHCTL::hcxt, ConnCacheEntry::invalidated, ConnCacheEntry::key, HASHCTL::keysize, ConnCacheEntry::mapping_hashvalue, MemSet, ObjectIdGetDatum, pgfdw_inval_callback(), pgfdw_reject_incomplete_xact_state_change(), pgfdw_subxact_callback(), pgfdw_xact_callback(), RegisterSubXactCallback(), RegisterXactCallback(), ConnCacheEntry::server_hashvalue, ForeignServer::serverid, UserMapping::serverid, ForeignServer::servername, UserMapping::umid, UserMapping::userid, USERMAPPINGOID, ConnCacheEntry::xact_depth, and xact_got_connection.

Referenced by create_foreign_modify(), dumpBlobs(), dumpDatabase(), dumpDatabaseConfig(), dumpTableData_copy(), estimate_path_cost_size(), expand_schema_name_patterns(), expand_table_name_patterns(), getTables(), main(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresBeginDirectModify(), postgresBeginForeignScan(), postgresImportForeignSchema(), setup_connection(), StartLogStreamer(), StreamLog(), and StreamLogicalLog().

107 {
108  bool found;
109  ConnCacheEntry *entry;
110  ConnCacheKey key;
111 
112  /* First time through, initialize connection cache hashtable */
113  if (ConnectionHash == NULL)
114  {
115  HASHCTL ctl;
116 
117  MemSet(&ctl, 0, sizeof(ctl));
118  ctl.keysize = sizeof(ConnCacheKey);
119  ctl.entrysize = sizeof(ConnCacheEntry);
120  /* allocate ConnectionHash in the cache context */
121  ctl.hcxt = CacheMemoryContext;
122  ConnectionHash = hash_create("postgres_fdw connections", 8,
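123  &ctl,
124  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
125 
126  /*
127  * Register some callback functions that manage connection cleanup.
128  * This should be done just once in each backend.
129  */
130  RegisterXactCallback(pgfdw_xact_callback, NULL);
131  RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
132  CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
133  pgfdw_inval_callback, (Datum) 0);
134  CacheRegisterSyscacheCallback(USERMAPPINGOID,
135  pgfdw_inval_callback, (Datum) 0);
136  }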
137 
138  /* Set flag that we did GetConnection during the current transaction */
139  xact_got_connection = true;
140 
141  /* Create hash key for the entry. Assume no pad bytes in key struct */
142  key = user->umid;
143 
144  /*
145  * Find or create cached entry for requested connection.
146  */
147  entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
148  if (!found)
149  {
150  /*
151  * We need only clear "conn" here; remaining fields will be filled
152  * later when "conn" is set.
153  */
154  entry->conn = NULL;
155  }
156 
157  /* Reject further use of connections which failed abort cleanup. */
158  pgfdw_reject_incomplete_xact_state_change(entry);
159 
160  /*
161  * If the connection needs to be remade due to invalidation, disconnect as
162  * soon as we're out of all transactions.
163  */
164  if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
165  {
166  elog(DEBUG3, "closing connection %p for option changes to take effect",
167  entry->conn);
168  disconnect_pg_server(entry);
169  }
170 
171  /*
172  * We don't check the health of cached connection here, because it would
173  * require some overhead. Broken connection will be detected when the
174  * connection is actually used.
175  */
176 
177  /*
178  * If cache entry doesn't have a connection, we have to establish a new
179  * connection. (If connect_pg_server throws an error, the cache entry
180  * will remain in a valid empty state, ie conn == NULL.)
181  */
182  if (entry->conn == NULL)
183  {
184  ForeignServer *server = GetForeignServer(user->serverid);
185 
186  /* Reset all transient state fields, to be sure all are clean */
187  entry->xact_depth = 0;
188  entry->have_prep_stmt = false;
189  entry->have_error = false;
190  entry->changing_xact_state = false;
191  entry->invalidated = false;
192  entry->server_hashvalue =
193  GetSysCacheHashValue1(FOREIGNSERVEROID,
194  ObjectIdGetDatum(server->serverid));
195  entry->mapping_hashvalue =
196  GetSysCacheHashValue1(USERMAPPINGOID,
197  ObjectIdGetDatum(user->umid));
198 
199  /* Now try to make the connection */
200  entry->conn = connect_pg_server(server, user);
201 
202  elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
203  entry->conn, server->servername, user->umid, user->userid);
204  }
205 
206  /*
207  * Start a new transaction or subtransaction if needed.
208  */
209  begin_remote_xact(entry);
210 
211  /* Remember if caller will prepare statements */
212  entry->have_prep_stmt |= will_prep_stmt;
213 
214  return entry->conn;
215 }
Oid umid
Definition: foreign.h:47
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
MemoryContext hcxt
Definition: hsearch.h:78
#define DEBUG3
Definition: elog.h:23
struct ConnCacheEntry ConnCacheEntry
Size entrysize
Definition: hsearch.h:73
#define MemSet(start, val, len)
Definition: c.h:971
bool have_prep_stmt
Definition: connection.c:54
#define GetSysCacheHashValue1(cacheId, key1)
Definition: syscache.h:201
uint32 server_hashvalue
Definition: connection.c:58
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:906
Oid userid
Definition: foreign.h:48
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
bool changing_xact_state
Definition: connection.c:56
bool invalidated
Definition: connection.c:57
static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
Definition: connection.c:973
#define HASH_BLOBS
Definition: hsearch.h:88
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1426
static void pgfdw_xact_callback(XactEvent event, void *arg)
Definition: connection.c:672
static HTAB * ConnectionHash
Definition: connection.c:65
ForeignServer * GetForeignServer(Oid serverid)
Definition: foreign.c:109
uintptr_t Datum
Definition: postgres.h:367
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:316
void RegisterSubXactCallback(SubXactCallback callback, void *arg)
Definition: xact.c:3534
Size keysize
Definition: hsearch.h:72
uint32 mapping_hashvalue
Definition: connection.c:59
PGconn * conn
Definition: connection.c:50
static void disconnect_pg_server(ConnCacheEntry *entry)
Definition: connection.c:309
void RegisterXactCallback(XactCallback callback, void *arg)
Definition: xact.c:3479
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
Definition: connection.c:1009
static bool xact_got_connection
Definition: connection.c:72
Oid serverid
Definition: foreign.h:49
char * servername
Definition: foreign.h:39
#define elog(elevel,...)
Definition: elog.h:228
static void pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg)
Definition: connection.c:859
static void begin_remote_xact(ConnCacheEntry *entry)
Definition: connection.c:441
Oid ConnCacheKey
Definition: connection.c:45
Oid serverid
Definition: foreign.h:36
MemoryContext CacheMemoryContext
Definition: mcxt.c:47
static PGconn * connect_pg_server(ForeignServer *server, UserMapping *user)
Definition: connection.c:221

◆ GetCursorNumber()

unsigned int GetCursorNumber ( PGconn * conn)

Definition at line 505 of file connection.c.

References cursor_number.

Referenced by postgresAcquireSampleRowsFunc(), and postgresBeginForeignScan().

506 {
507  return ++cursor_number;
508 }
static unsigned int cursor_number
Definition: connection.c:68

◆ GetPrepStmtNumber()

unsigned int GetPrepStmtNumber ( PGconn * conn)

Definition at line 519 of file connection.c.

References prep_stmt_number.

Referenced by prepare_foreign_modify().

520 {
521  return ++prep_stmt_number;
522 }
static unsigned int prep_stmt_number
Definition: connection.c:69

◆ pgfdw_cancel_query()

static bool pgfdw_cancel_query ( PGconn * conn)
static

Definition at line 1043 of file connection.c.

References ereport, errcode(), errmsg(), GetCurrentTimestamp(), pgfdw_get_cleanup_result(), PQcancel(), PQclear(), PQfreeCancel(), PQgetCancel(), TimestampTzPlusMilliseconds, and WARNING.

Referenced by pgfdw_subxact_callback(), and pgfdw_xact_callback().

1044 {
1045  PGcancel *cancel;
1046  char errbuf[256];
1047  PGresult *result = NULL;
1048  TimestampTz endtime;
1049 
1050  /*
1051  * If it takes too long to cancel the query and discard the result, assume
1052  * the connection is dead.
1053  */
1054  endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000);
1055 
1056  /*
1057  * Issue cancel request. Unfortunately, there's no good way to limit the
1058  * amount of time that we might block inside PQgetCancel().
1059  */
1060  if ((cancel = PQgetCancel(conn)))
1061  {
1062  if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
1063  {
1064  ereport(WARNING,
1065  (errcode(ERRCODE_CONNECTION_FAILURE),
1066  errmsg("could not send cancel request: %s",
1067  errbuf)));
1068  PQfreeCancel(cancel);
1069  return false;
1070  }
1071  PQfreeCancel(cancel);
1072  }
1073 
1074  /* Get and discard the result of the query. */
1075  if (pgfdw_get_cleanup_result(conn, endtime, &result))
1076  return false;
1077  PQclear(result);
1078 
1079  return true;
1080 }
static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result)
Definition: connection.c:1144
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
int64 TimestampTz
Definition: timestamp.h:39
void PQfreeCancel(PGcancel *cancel)
Definition: fe-connect.c:4326
int errcode(int sqlerrcode)
Definition: elog.c:608
PGcancel * PQgetCancel(PGconn *conn)
Definition: fe-connect.c:4303
#define ereport(elevel, rest)
Definition: elog.h:141
#define WARNING
Definition: elog.h:40
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
void PQclear(PGresult *res)
Definition: fe-exec.c:694
int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize)
Definition: fe-connect.c:4458
int errmsg(const char *fmt,...)
Definition: elog.c:822

◆ pgfdw_exec_cleanup_query()

static bool pgfdw_exec_cleanup_query ( PGconn *  conn,
const char *  query,
bool  ignore_errors 
)
static

Definition at line 1090 of file connection.c.

References GetCurrentTimestamp(), pgfdw_get_cleanup_result(), pgfdw_report_error(), PGRES_COMMAND_OK, PQclear(), PQresultStatus(), PQsendQuery(), TimestampTzPlusMilliseconds, and WARNING.

Referenced by pgfdw_subxact_callback(), and pgfdw_xact_callback().

1091 {
1092  PGresult *result = NULL;
1093  TimestampTz endtime;
1094 
1095  /*
1096  * If it takes too long to execute a cleanup query, assume the connection
1097  * is dead. It's fairly likely that this is why we aborted in the first
1098  * place (e.g. statement timeout, user cancel), so the timeout shouldn't
1099  * be too long.
1100  */
1101  endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000);
1102 
1103  /*
1104  * Submit a query. Since we don't use non-blocking mode, this also can
1105  * block. But its risk is relatively small, so we ignore that for now.
1106  */
1107  if (!PQsendQuery(conn, query))
1108  {
1109  pgfdw_report_error(WARNING, NULL, conn, false, query);
1110  return false;
1111  }
1112 
1113  /* Get the result of the query. */
1114  if (pgfdw_get_cleanup_result(conn, endtime, &result))
1115  return false;
1116 
1117  /* Issue a warning if not successful. */
1118  if (PQresultStatus(result) != PGRES_COMMAND_OK)
1119  {
1120  pgfdw_report_error(WARNING, result, conn, true, query);
1121  return ignore_errors;
1122  }
1123  PQclear(result);
1124 
1125  return true;
1126 }
static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result)
Definition: connection.c:1144
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
int64 TimestampTz
Definition: timestamp.h:39
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2692
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1234
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:621
#define WARNING
Definition: elog.h:40
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
void PQclear(PGresult *res)
Definition: fe-exec.c:694

◆ pgfdw_exec_query()

PGresult* pgfdw_exec_query ( PGconn *  conn,
const char *  query 
)

Definition at line 532 of file connection.c.

References ERROR, pgfdw_get_result(), pgfdw_report_error(), and PQsendQuery().

Referenced by close_cursor(), fetch_more_data(), finish_foreign_modify(), get_remote_estimate(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresImportForeignSchema(), and postgresReScanForeignScan().

533 {
534  /*
535  * Submit a query. Since we don't use non-blocking mode, this also can
536  * block. But its risk is relatively small, so we ignore that for now.
537  */
538  if (!PQsendQuery(conn, query))
539  pgfdw_report_error(ERROR, NULL, conn, false, query);
540 
541  /* Wait for the result. */
542  return pgfdw_get_result(conn, query);
543 }
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1234
#define ERROR
Definition: elog.h:43
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:621
PGresult * pgfdw_get_result(PGconn *conn, const char *query)
Definition: connection.c:556

◆ pgfdw_get_cleanup_result()

static bool pgfdw_get_cleanup_result ( PGconn *  conn,
TimestampTz  endtime,
PGresult **  result 
)
static

Definition at line 1144 of file connection.c.

References CHECK_FOR_INTERRUPTS, GetCurrentTimestamp(), Min, MyLatch, now(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, PG_WAIT_EXTENSION, PQclear(), PQconsumeInput(), PQgetResult(), PQisBusy(), PQsocket(), ResetLatch(), TimestampDifference(), USECS_PER_SEC, WaitLatchOrSocket(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, and WL_TIMEOUT.

Referenced by pgfdw_cancel_query(), and pgfdw_exec_cleanup_query().

1145 {
1146  volatile bool timed_out = false;
1147  PGresult *volatile last_res = NULL;
1148 
1149  /* In what follows, do not leak any PGresults on an error. */
1150  PG_TRY();
1151  {
1152  for (;;)
1153  {
1154  PGresult *res;
1155 
1156  while (PQisBusy(conn))
1157  {
1158  int wc;
1159  TimestampTz now = GetCurrentTimestamp();
1160  long secs;
1161  int microsecs;
1162  long cur_timeout;
1163 
1164  /* If timeout has expired, give up, else get sleep time. */
1165  if (now >= endtime)
1166  {
1167  timed_out = true;
1168  goto exit;
1169  }
1170  TimestampDifference(now, endtime, &secs, &microsecs);
1171 
1172  /* To protect against clock skew, limit sleep to one minute. */
1173  cur_timeout = Min(60000, secs * USECS_PER_SEC + microsecs);
1174 
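1175  /* Sleep until there's something to do */
1176  wc = WaitLatchOrSocket(MyLatch,
1177  WL_LATCH_SET | WL_SOCKET_READABLE |
1178  WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1179  PQsocket(conn),
1180  cur_timeout, PG_WAIT_EXTENSION);
1181  ResetLatch(MyLatch);
1182 
1183  CHECK_FOR_INTERRUPTS();
1184 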
1185  /* Data available in socket? */
1186  if (wc & WL_SOCKET_READABLE)
1187  {
1188  if (!PQconsumeInput(conn))
1189  {
1190  /* connection trouble; treat the same as a timeout */
1191  timed_out = true;
1192  goto exit;
1193  }
1194  }
1195  }
1196 
1197  res = PQgetResult(conn);
1198  if (res == NULL)
1199  break; /* query is complete */
1200 
1201  PQclear(last_res);
1202  last_res = res;
1203  }
1204 exit: ;
1205  }
1206  PG_CATCH();
1207  {
1208  PQclear(last_res);
1209  PG_RE_THROW();
1210  }
1211  PG_END_TRY();
1212 
1213  if (timed_out)
1214  PQclear(last_res);
1215  else
1216  *result = last_res;
1217  return timed_out;
1218 }
#define WL_TIMEOUT
Definition: latch.h:127
#define USECS_PER_SEC
Definition: timestamp.h:94
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
int64 TimestampTz
Definition: timestamp.h:39
#define Min(x, y)
Definition: c.h:920
#define WL_SOCKET_READABLE
Definition: latch.h:125
void ResetLatch(Latch *latch)
Definition: latch.c:531
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:381
#define PG_WAIT_EXTENSION
Definition: pgstat.h:759
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1704
void PQclear(PGresult *res)
Definition: fe-exec.c:694
#define PG_CATCH()
Definition: elog.h:332
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:1754
#define PG_RE_THROW()
Definition: elog.h:363
struct Latch * MyLatch
Definition: globals.c:54
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: timestamp.c:1657
#define PG_TRY()
Definition: elog.h:322
#define WL_LATCH_SET
Definition: latch.h:124
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:6702
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1547
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1778
#define PG_END_TRY()
Definition: elog.h:347
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129

◆ pgfdw_get_result()

PGresult* pgfdw_get_result ( PGconn *  conn,
const char *  query 
)

Definition at line 556 of file connection.c.

References CHECK_FOR_INTERRUPTS, ERROR, MyLatch, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, PG_WAIT_EXTENSION, pgfdw_report_error(), PQclear(), PQconsumeInput(), PQgetResult(), PQisBusy(), PQsocket(), ResetLatch(), WaitLatchOrSocket(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_SOCKET_READABLE.

Referenced by create_cursor(), do_sql_command(), execute_dml_stmt(), execute_foreign_modify(), pgfdw_exec_query(), and prepare_foreign_modify().

557 {
558  PGresult *volatile last_res = NULL;
559 
560  /* In what follows, do not leak any PGresults on an error. */
561  PG_TRY();
562  {
563  for (;;)
564  {
565  PGresult *res;
566 
567  while (PQisBusy(conn))
568  {
569  int wc;
570 
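571  /* Sleep until there's something to do */
572  wc = WaitLatchOrSocket(MyLatch,
573  WL_LATCH_SET | WL_SOCKET_READABLE |
574  WL_EXIT_ON_PM_DEATH,
575  PQsocket(conn),
576  -1L, PG_WAIT_EXTENSION);
577  ResetLatch(MyLatch);
578 
579  CHECK_FOR_INTERRUPTS();
580 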
581  /* Data available in socket? */
582  if (wc & WL_SOCKET_READABLE)
583  {
584  if (!PQconsumeInput(conn))
585  pgfdw_report_error(ERROR, NULL, conn, false, query);
586  }
587  }
588 
589  res = PQgetResult(conn);
590  if (res == NULL)
591  break; /* query is complete */
592 
593  PQclear(last_res);
594  last_res = res;
595  }
596  }
597  PG_CATCH();
598  {
599  PQclear(last_res);
600  PG_RE_THROW();
601  }
602  PG_END_TRY();
603 
604  return last_res;
605 }
#define WL_SOCKET_READABLE
Definition: latch.h:125
void ResetLatch(Latch *latch)
Definition: latch.c:531
#define ERROR
Definition: elog.h:43
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:381
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:621
#define PG_WAIT_EXTENSION
Definition: pgstat.h:759
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1704
void PQclear(PGresult *res)
Definition: fe-exec.c:694
#define PG_CATCH()
Definition: elog.h:332
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:1754
#define PG_RE_THROW()
Definition: elog.h:363
struct Latch * MyLatch
Definition: globals.c:54
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
#define PG_TRY()
Definition: elog.h:322
#define WL_LATCH_SET
Definition: latch.h:124
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:6702
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1778
#define PG_END_TRY()
Definition: elog.h:347
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129

◆ pgfdw_inval_callback()

static void pgfdw_inval_callback ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)
static

Definition at line 973 of file connection.c.

References Assert, ConnCacheEntry::conn, FOREIGNSERVEROID, hash_seq_init(), hash_seq_search(), ConnCacheEntry::invalidated, ConnCacheEntry::mapping_hashvalue, ConnCacheEntry::server_hashvalue, and USERMAPPINGOID.

Referenced by GetConnection().

974 {
975  HASH_SEQ_STATUS scan;
976  ConnCacheEntry *entry;
977 
978  Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
979 
980  /* ConnectionHash must exist already, if we're registered */
981  hash_seq_init(&scan, ConnectionHash);
982  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
983  {
984  /* Ignore invalid entries */
985  if (entry->conn == NULL)
986  continue;
987 
988  /* hashvalue == 0 means a cache reset, must clear all state */
989  if (hashvalue == 0 ||
990  (cacheid == FOREIGNSERVEROID &&
991  entry->server_hashvalue == hashvalue) ||
992  (cacheid == USERMAPPINGOID &&
993  entry->mapping_hashvalue == hashvalue))
994  entry->invalidated = true;
995  }
996 }
uint32 server_hashvalue
Definition: connection.c:58
bool invalidated
Definition: connection.c:57
static HTAB * ConnectionHash
Definition: connection.c:65
uint32 mapping_hashvalue
Definition: connection.c:59
PGconn * conn
Definition: connection.c:50
#define Assert(condition)
Definition: c.h:738
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1389
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1379

◆ pgfdw_reject_incomplete_xact_state_change()

static void pgfdw_reject_incomplete_xact_state_change ( ConnCacheEntry *  entry)
static

Definition at line 1009 of file connection.c.

References ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, disconnect_pg_server(), elog, ereport, errcode(), errmsg(), ERROR, GetForeignServer(), GETSTRUCT, HeapTupleIsValid, ConnCacheEntry::key, ObjectIdGetDatum, ReleaseSysCache(), SearchSysCache1(), ForeignServer::servername, and USERMAPPINGOID.

Referenced by GetConnection(), pgfdw_subxact_callback(), and pgfdw_xact_callback().

1010 {
1011  HeapTuple tup;
1012  Form_pg_user_mapping umform;
1013  ForeignServer *server;
1014 
1015  /* nothing to do for inactive entries and entries of sane state */
1016  if (entry->conn == NULL || !entry->changing_xact_state)
1017  return;
1018 
1019  /* make sure this entry is inactive */
1020  disconnect_pg_server(entry);
1021 
1022  /* find server name to be shown in the message below */
1023  tup = SearchSysCache1(USERMAPPINGOID,
1024  ObjectIdGetDatum(entry->key));
1025  if (!HeapTupleIsValid(tup))
1026  elog(ERROR, "cache lookup failed for user mapping %u", entry->key);
1027  umform = (Form_pg_user_mapping) GETSTRUCT(tup);
1028  server = GetForeignServer(umform->umserver);
1029  ReleaseSysCache(tup);
1030 
1031  ereport(ERROR,
1032  (errcode(ERRCODE_CONNECTION_EXCEPTION),
1033  errmsg("connection to server \"%s\" was lost",
1034  server->servername)));
1035 }
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
ConnCacheKey key
Definition: connection.c:49
int errcode(int sqlerrcode)
Definition: elog.c:608
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
bool changing_xact_state
Definition: connection.c:56
FormData_pg_user_mapping * Form_pg_user_mapping
#define ereport(elevel, rest)
Definition: elog.h:141
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1116
ForeignServer * GetForeignServer(Oid serverid)
Definition: foreign.c:109
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1164
PGconn * conn
Definition: connection.c:50
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
static void disconnect_pg_server(ConnCacheEntry *entry)
Definition: connection.c:309
int errmsg(const char *fmt,...)
Definition: elog.c:822
char * servername
Definition: foreign.h:39
#define elog(elevel,...)
Definition: elog.h:228

◆ pgfdw_report_error()

void pgfdw_report_error ( int  elevel,
PGresult *  res,
PGconn *  conn,
bool  clear,
const char *  sql 
)

Definition at line 621 of file connection.c.

References ereport, errcode(), errcontext, errdetail_internal(), errhint(), errmsg(), errmsg_internal(), MAKE_SQLSTATE, pchomp(), PG_DIAG_CONTEXT, PG_DIAG_MESSAGE_DETAIL, PG_DIAG_MESSAGE_HINT, PG_DIAG_MESSAGE_PRIMARY, PG_DIAG_SQLSTATE, PG_END_TRY, PG_FINALLY, PG_TRY, PQclear(), PQerrorMessage(), and PQresultErrorField().

Referenced by close_cursor(), create_cursor(), do_sql_command(), execute_dml_stmt(), execute_foreign_modify(), fetch_more_data(), finish_foreign_modify(), get_remote_estimate(), pgfdw_exec_cleanup_query(), pgfdw_exec_query(), pgfdw_get_result(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresImportForeignSchema(), postgresReScanForeignScan(), and prepare_foreign_modify().

623 {
624  /* If requested, PGresult must be released before leaving this function. */
625  PG_TRY();
626  {
627  char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
628  char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
629  char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
630  char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
631  char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
632  int sqlstate;
633 
634  if (diag_sqlstate)
635  sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
636  diag_sqlstate[1],
637  diag_sqlstate[2],
638  diag_sqlstate[3],
639  diag_sqlstate[4]);
640  else
641  sqlstate = ERRCODE_CONNECTION_FAILURE;
642 
643  /*
644  * If we don't get a message from the PGresult, try the PGconn. This
645  * is needed because for connection-level failures, PQexec may just
646  * return NULL, not a PGresult at all.
647  */
648  if (message_primary == NULL)
649  message_primary = pchomp(PQerrorMessage(conn));
650 
651  ereport(elevel,
652  (errcode(sqlstate),
653  message_primary ? errmsg_internal("%s", message_primary) :
654  errmsg("could not obtain message string for remote error"),
655  message_detail ? errdetail_internal("%s", message_detail) : 0,
656  message_hint ? errhint("%s", message_hint) : 0,
657  message_context ? errcontext("%s", message_context) : 0,
658  sql ? errcontext("remote SQL command: %s", sql) : 0));
659  }
660  PG_FINALLY();
661  {
662  if (clear)
663  PQclear(res);
664  }
665  PG_END_TRY();
666 }
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6684
int errhint(const char *fmt,...)
Definition: elog.c:1069
#define PG_DIAG_MESSAGE_PRIMARY
Definition: postgres_ext.h:58
#define PG_DIAG_MESSAGE_DETAIL
Definition: postgres_ext.h:59
#define MAKE_SQLSTATE(ch1, ch2, ch3, ch4, ch5)
Definition: elog.h:62
int errcode(int sqlerrcode)
Definition: elog.c:608
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:57
int errdetail_internal(const char *fmt,...)
Definition: elog.c:982
char * pchomp(const char *in)
Definition: mcxt.c:1214
#define ereport(elevel, rest)
Definition: elog.h:141
static int elevel
Definition: vacuumlazy.c:297
#define PG_FINALLY()
Definition: elog.h:339
#define PG_DIAG_MESSAGE_HINT
Definition: postgres_ext.h:60
void PQclear(PGresult *res)
Definition: fe-exec.c:694
int errmsg_internal(const char *fmt,...)
Definition: elog.c:909
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:2754
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define errcontext
Definition: elog.h:183
#define PG_TRY()
Definition: elog.h:322
#define PG_END_TRY()
Definition: elog.h:347
#define PG_DIAG_CONTEXT
Definition: postgres_ext.h:64

◆ pgfdw_subxact_callback()

static void pgfdw_subxact_callback ( SubXactEvent  event,
SubTransactionId  mySubid,
SubTransactionId  parentSubid,
void *  arg 
)
static

Definition at line 859 of file connection.c.

References ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, do_sql_command(), elog, ERROR, GetCurrentTransactionNestLevel(), hash_seq_init(), hash_seq_search(), ConnCacheEntry::have_error, in_error_recursion_trouble(), pgfdw_cancel_query(), pgfdw_exec_cleanup_query(), pgfdw_reject_incomplete_xact_state_change(), PQTRANS_ACTIVE, PQtransactionStatus(), snprintf, SUBXACT_EVENT_ABORT_SUB, SUBXACT_EVENT_PRE_COMMIT_SUB, ConnCacheEntry::xact_depth, and xact_got_connection.

Referenced by GetConnection().

861 {
862  HASH_SEQ_STATUS scan;
863  ConnCacheEntry *entry;
864  int curlevel;
865 
866  /* Nothing to do at subxact start, nor after commit. */
867  if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
868  event == SUBXACT_EVENT_ABORT_SUB))
869  return;
870 
871  /* Quick exit if no connections were touched in this transaction. */
872  if (!xact_got_connection)
873  return;
874 
875  /*
876  * Scan all connection cache entries to find open remote subtransactions
877  * of the current level, and close them.
878  */
879  curlevel = GetCurrentTransactionNestLevel();
880  hash_seq_init(&scan, ConnectionHash);
881  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
882  {
883  char sql[100];
884 
885  /*
886  * We only care about connections with open remote subtransactions of
887  * the current level.
888  */
889  if (entry->conn == NULL || entry->xact_depth < curlevel)
890  continue;
891 
892  if (entry->xact_depth > curlevel)
893  elog(ERROR, "missed cleaning up remote subtransaction at level %d",
894  entry->xact_depth);
895 
896  if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
897  {
898  /*
899  * If abort cleanup previously failed for this connection, we
900  * can't issue any more commands against it.
901  */
902  pgfdw_reject_incomplete_xact_state_change(entry);
903 
904  /* Commit all remote subtransactions during pre-commit */
905  snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
906  entry->changing_xact_state = true;
907  do_sql_command(entry->conn, sql);
908  entry->changing_xact_state = false;
909  }
910  else if (in_error_recursion_trouble())
911  {
912  /*
913  * Don't try to clean up the connection if we're already in error
914  * recursion trouble.
915  */
916  entry->changing_xact_state = true;
917  }
918  else if (!entry->changing_xact_state)
919  {
920  bool abort_cleanup_failure = false;
921 
922  /* Remember that abort cleanup is in progress. */
923  entry->changing_xact_state = true;
924 
925  /* Assume we might have lost track of prepared statements */
926  entry->have_error = true;
927 
928  /*
929  * If a command has been submitted to the remote server by using
930  * an asynchronous execution function, the command might not have
931  * yet completed. Check to see if a command is still being
932  * processed by the remote server, and if so, request cancellation
933  * of the command.
934  */
935  if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
936  !pgfdw_cancel_query(entry->conn))
937  abort_cleanup_failure = true;
938  else
939  {
940  /* Rollback all remote subtransactions during abort */
941  snprintf(sql, sizeof(sql),
942  "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
943  curlevel, curlevel);
944  if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
945  abort_cleanup_failure = true;
946  }
947 
948  /* Disarm changing_xact_state if it all worked. */
949  entry->changing_xact_state = abort_cleanup_failure;
950  }
951 
952  /* OK, we're outta that level of subtransaction */
953  entry->xact_depth--;
954  }
955 }
#define ERROR
Definition: elog.h:43
bool changing_xact_state
Definition: connection.c:56
static void do_sql_command(PGconn *conn, const char *sql)
Definition: connection.c:418
PGTransactionStatusType PQtransactionStatus(const PGconn *conn)
Definition: fe-connect.c:6639
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
Definition: connection.c:1090
static HTAB * ConnectionHash
Definition: connection.c:65
static bool pgfdw_cancel_query(PGconn *conn)
Definition: connection.c:1043
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:841
bool in_error_recursion_trouble(void)
Definition: elog.c:197
PGconn * conn
Definition: connection.c:50
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
Definition: connection.c:1009
static bool xact_got_connection
Definition: connection.c:72
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1389
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1379
#define elog(elevel,...)
Definition: elog.h:228
#define snprintf
Definition: port.h:192

◆ pgfdw_xact_callback()

static void pgfdw_xact_callback ( XactEvent  event,
void *  arg 
)
static

Definition at line 672 of file connection.c.

References ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, CONNECTION_OK, cursor_number, DEBUG3, disconnect_pg_server(), do_sql_command(), elog, ereport, errcode(), errmsg(), ERROR, hash_seq_init(), hash_seq_search(), ConnCacheEntry::have_error, ConnCacheEntry::have_prep_stmt, in_error_recursion_trouble(), pgfdw_cancel_query(), pgfdw_exec_cleanup_query(), pgfdw_reject_incomplete_xact_state_change(), PQclear(), PQexec(), PQstatus(), PQTRANS_ACTIVE, PQTRANS_IDLE, PQtransactionStatus(), ConnCacheEntry::xact_depth, XACT_EVENT_ABORT, XACT_EVENT_COMMIT, XACT_EVENT_PARALLEL_ABORT, XACT_EVENT_PARALLEL_COMMIT, XACT_EVENT_PARALLEL_PRE_COMMIT, XACT_EVENT_PRE_COMMIT, XACT_EVENT_PRE_PREPARE, XACT_EVENT_PREPARE, and xact_got_connection.

Referenced by GetConnection().

673 {
674  HASH_SEQ_STATUS scan;
675  ConnCacheEntry *entry;
676 
677  /* Quick exit if no connections were touched in this transaction. */
678  if (!xact_got_connection)
679  return;
680 
681  /*
682  * Scan all connection cache entries to find open remote transactions, and
683  * close them.
684  */
685  hash_seq_init(&scan, ConnectionHash);
686  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
687  {
688  PGresult *res;
689 
690  /* Ignore cache entry if no open connection right now */
691  if (entry->conn == NULL)
692  continue;
693 
694  /* If it has an open remote transaction, try to close it */
695  if (entry->xact_depth > 0)
696  {
697  bool abort_cleanup_failure = false;
698 
699  elog(DEBUG3, "closing remote transaction on connection %p",
700  entry->conn);
701 
702  switch (event)
703  {
704  case XACT_EVENT_PARALLEL_PRE_COMMIT:
705  case XACT_EVENT_PRE_COMMIT:
706 
707  /*
708  * If abort cleanup previously failed for this connection,
709  * we can't issue any more commands against it.
710  */
711  pgfdw_reject_incomplete_xact_state_change(entry);
712 
713  /* Commit all remote transactions during pre-commit */
714  entry->changing_xact_state = true;
715  do_sql_command(entry->conn, "COMMIT TRANSACTION");
716  entry->changing_xact_state = false;
717 
718  /*
719  * If there were any errors in subtransactions, and we
720  * made prepared statements, do a DEALLOCATE ALL to make
721  * sure we get rid of all prepared statements. This is
722  * annoying and not terribly bulletproof, but it's
723  * probably not worth trying harder.
724  *
725  * DEALLOCATE ALL only exists in 8.3 and later, so this
726  * constrains how old a server postgres_fdw can
727  * communicate with. We intentionally ignore errors in
728  * the DEALLOCATE, so that we can hobble along to some
729  * extent with older servers (leaking prepared statements
730  * as we go; but we don't really support update operations
731  * pre-8.3 anyway).
732  */
733  if (entry->have_prep_stmt && entry->have_error)
734  {
735  res = PQexec(entry->conn, "DEALLOCATE ALL");
736  PQclear(res);
737  }
738  entry->have_prep_stmt = false;
739  entry->have_error = false;
740  break;
741  case XACT_EVENT_PRE_PREPARE:
742 
743  /*
744  * We disallow any remote transactions, since it's not
745  * very reasonable to hold them open until the prepared
746  * transaction is committed. For the moment, throw error
747  * unconditionally; later we might allow read-only cases.
748  * Note that the error will cause us to come right back
749  * here with event == XACT_EVENT_ABORT, so we'll clean up
750  * the connection state at that point.
751  */
752  ereport(ERROR,
753  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
754  errmsg("cannot PREPARE a transaction that has operated on postgres_fdw foreign tables")));
755  break;
756  case XACT_EVENT_PARALLEL_COMMIT:
757  case XACT_EVENT_COMMIT:
758  case XACT_EVENT_PREPARE:
759  /* Pre-commit should have closed the open transaction */
760  elog(ERROR, "missed cleaning up connection during pre-commit");
761  break;
762  case XACT_EVENT_PARALLEL_ABORT:
763  case XACT_EVENT_ABORT:
764 
765  /*
766  * Don't try to clean up the connection if we're already
767  * in error recursion trouble.
768  */
769  if (in_error_recursion_trouble())
770  entry->changing_xact_state = true;
771 
772  /*
773  * If connection is already unsalvageable, don't touch it
774  * further.
775  */
776  if (entry->changing_xact_state)
777  break;
778 
779  /*
780  * Mark this connection as in the process of changing
781  * transaction state.
782  */
783  entry->changing_xact_state = true;
784 
785  /* Assume we might have lost track of prepared statements */
786  entry->have_error = true;
787 
788  /*
789  * If a command has been submitted to the remote server by
790  * using an asynchronous execution function, the command
791  * might not have yet completed. Check to see if a
792  * command is still being processed by the remote server,
793  * and if so, request cancellation of the command.
794  */
795  if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
796  !pgfdw_cancel_query(entry->conn))
797  {
798  /* Unable to cancel running query. */
799  abort_cleanup_failure = true;
800  }
801  else if (!pgfdw_exec_cleanup_query(entry->conn,
802  "ABORT TRANSACTION",
803  false))
804  {
805  /* Unable to abort remote transaction. */
806  abort_cleanup_failure = true;
807  }
808  else if (entry->have_prep_stmt && entry->have_error &&
809  !pgfdw_exec_cleanup_query(entry->conn,
810  "DEALLOCATE ALL",
811  true))
812  {
813  /* Trouble clearing prepared statements. */
814  abort_cleanup_failure = true;
815  }
816  else
817  {
818  entry->have_prep_stmt = false;
819  entry->have_error = false;
820  }
821 
822  /* Disarm changing_xact_state if it all worked. */
823  entry->changing_xact_state = abort_cleanup_failure;
824  break;
825  }
826  }
827 
828  /* Reset state to show we're out of a transaction */
829  entry->xact_depth = 0;
830 
831  /*
832  * If the connection isn't in a good idle state, discard it to
833  * recover. Next GetConnection will open a new connection.
834  */
835  if (PQstatus(entry->conn) != CONNECTION_OK ||
836  PQtransactionStatus(entry->conn) != PQTRANS_IDLE ||
837  entry->changing_xact_state)
838  {
839  elog(DEBUG3, "discarding connection %p", entry->conn);
840  disconnect_pg_server(entry);
841  }
842  }
843 
844  /*
845  * Regardless of the event type, we can now mark ourselves as out of the
846  * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
847  * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
848  */
849  xact_got_connection = false;
850 
851  /* Also reset cursor numbering for next transaction */
852  cursor_number = 0;
853 }
#define DEBUG3
Definition: elog.h:23
int errcode(int sqlerrcode)
Definition: elog.c:608
bool have_prep_stmt
Definition: connection.c:54
#define ERROR
Definition: elog.h:43
bool changing_xact_state
Definition: connection.c:56
static void do_sql_command(PGconn *conn, const char *sql)
Definition: connection.c:418
static unsigned int cursor_number
Definition: connection.c:68
#define ereport(elevel, rest)
Definition: elog.h:141
PGTransactionStatusType PQtransactionStatus(const PGconn *conn)
Definition: fe-connect.c:6639
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
Definition: connection.c:1090
static HTAB * ConnectionHash
Definition: connection.c:65
static bool pgfdw_cancel_query(PGconn *conn)
Definition: connection.c:1043
void PQclear(PGresult *res)
Definition: fe-exec.c:694
bool in_error_recursion_trouble(void)
Definition: elog.c:197
PGconn * conn
Definition: connection.c:50
static void disconnect_pg_server(ConnCacheEntry *entry)
Definition: connection.c:309
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
Definition: connection.c:1009
static bool xact_got_connection
Definition: connection.c:72
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1389
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1379
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define elog(elevel,...)
Definition: elog.h:228
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:1939
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:6631

◆ ReleaseConnection()

void ReleaseConnection ( PGconn *  conn)

Definition at line 484 of file connection.c.

Referenced by estimate_path_cost_size(), finish_foreign_modify(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresEndDirectModify(), postgresEndForeignScan(), and postgresImportForeignSchema().

485 {
486  /*
487  * Currently, we don't actually track connection references because all
488  * cleanup is managed on a transaction or subtransaction basis instead. So
489  * there's nothing to do here.
490  */
491 }

◆ UserMappingPasswordRequired()

static bool UserMappingPasswordRequired ( UserMapping *  user)
static

Definition at line 323 of file connection.c.

References defGetBoolean(), DefElem::defname, lfirst, and UserMapping::options.

Referenced by check_conn_params(), and connect_pg_server().

324 {
325  ListCell *cell;
326 
327  foreach(cell, user->options)
328  {
329  DefElem *def = (DefElem *) lfirst(cell);
330  if (strcmp(def->defname, "password_required") == 0)
331  return defGetBoolean(def);
332  }
333 
334  return true;
335 }
bool defGetBoolean(DefElem *def)
Definition: define.c:111
List * options
Definition: foreign.h:50
#define lfirst(lc)
Definition: pg_list.h:190
char * defname
Definition: parsenodes.h:730

Variable Documentation

◆ ConnectionHash

HTAB* ConnectionHash = NULL
static

Definition at line 65 of file connection.c.

◆ cursor_number

unsigned int cursor_number = 0
static

Definition at line 68 of file connection.c.

Referenced by GetCursorNumber(), and pgfdw_xact_callback().

◆ prep_stmt_number

unsigned int prep_stmt_number = 0
static

Definition at line 69 of file connection.c.

Referenced by GetPrepStmtNumber().

◆ xact_got_connection

bool xact_got_connection = false
static

Definition at line 72 of file connection.c.

Referenced by GetConnection(), pgfdw_subxact_callback(), and pgfdw_xact_callback().