PostgreSQL Source Code  git master
connection.c File Reference
#include "postgres.h"
#include "access/htup_details.h"
#include "access/xact.h"
#include "catalog/pg_user_mapping.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postgres_fdw.h"
#include "storage/latch.h"
#include "utils/hsearch.h"
#include "utils/inval.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
Include dependency graph for connection.c:

Go to the source code of this file.

Data Structures

struct  ConnCacheEntry
 

Typedefs

typedef Oid ConnCacheKey
 
typedef struct ConnCacheEntry ConnCacheEntry
 

Functions

static PGconnconnect_pg_server (ForeignServer *server, UserMapping *user)
 
static void disconnect_pg_server (ConnCacheEntry *entry)
 
static void check_conn_params (const char **keywords, const char **values, UserMapping *user)
 
static void configure_remote_session (PGconn *conn)
 
static void do_sql_command (PGconn *conn, const char *sql)
 
static void begin_remote_xact (ConnCacheEntry *entry)
 
static void pgfdw_xact_callback (XactEvent event, void *arg)
 
static void pgfdw_subxact_callback (SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg)
 
static void pgfdw_inval_callback (Datum arg, int cacheid, uint32 hashvalue)
 
static void pgfdw_reject_incomplete_xact_state_change (ConnCacheEntry *entry)
 
static bool pgfdw_cancel_query (PGconn *conn)
 
static bool pgfdw_exec_cleanup_query (PGconn *conn, const char *query, bool ignore_errors)
 
static bool pgfdw_get_cleanup_result (PGconn *conn, TimestampTz endtime, PGresult **result)
 
PGconnGetConnection (UserMapping *user, bool will_prep_stmt)
 
void ReleaseConnection (PGconn *conn)
 
unsigned int GetCursorNumber (PGconn *conn)
 
unsigned int GetPrepStmtNumber (PGconn *conn)
 
PGresultpgfdw_exec_query (PGconn *conn, const char *query)
 
PGresultpgfdw_get_result (PGconn *conn, const char *query)
 
void pgfdw_report_error (int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
 

Variables

static HTABConnectionHash = NULL
 
static unsigned int cursor_number = 0
 
static unsigned int prep_stmt_number = 0
 
static bool xact_got_connection = false
 

Typedef Documentation

◆ ConnCacheEntry

◆ ConnCacheKey

typedef Oid ConnCacheKey

Definition at line 44 of file connection.c.

Function Documentation

◆ begin_remote_xact()

static void begin_remote_xact ( ConnCacheEntry entry)
static

Definition at line 415 of file connection.c.

References ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, DEBUG3, do_sql_command(), elog, GetCurrentTransactionNestLevel(), IsolationIsSerializable, snprintf, and ConnCacheEntry::xact_depth.

Referenced by GetConnection().

416 {
417  int curlevel = GetCurrentTransactionNestLevel();
418 
419  /* Start main transaction if we haven't yet */
420  if (entry->xact_depth <= 0)
421  {
422  const char *sql;
423 
424  elog(DEBUG3, "starting remote transaction on connection %p",
425  entry->conn);
426 
428  sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
429  else
430  sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
431  entry->changing_xact_state = true;
432  do_sql_command(entry->conn, sql);
433  entry->xact_depth = 1;
434  entry->changing_xact_state = false;
435  }
436 
437  /*
438  * If we're in a subtransaction, stack up savepoints to match our level.
439  * This ensures we can rollback just the desired effects when a
440  * subtransaction aborts.
441  */
442  while (entry->xact_depth < curlevel)
443  {
444  char sql[64];
445 
446  snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
447  entry->changing_xact_state = true;
448  do_sql_command(entry->conn, sql);
449  entry->xact_depth++;
450  entry->changing_xact_state = false;
451  }
452 }
#define DEBUG3
Definition: elog.h:23
bool changing_xact_state
Definition: connection.c:55
static void do_sql_command(PGconn *conn, const char *sql)
Definition: connection.c:392
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:841
PGconn * conn
Definition: connection.c:49
#define IsolationIsSerializable()
Definition: xact.h:52
#define elog(elevel,...)
Definition: elog.h:228
#define snprintf
Definition: port.h:192

◆ check_conn_params()

static void check_conn_params ( const char **  keywords,
const char **  values,
UserMapping user 
)
static

Definition at line 323 of file connection.c.

References ereport, errcode(), errdetail(), errmsg(), ERROR, i, superuser_arg(), and UserMapping::userid.

Referenced by connect_pg_server().

324 {
325  int i;
326 
327  /* no check required if superuser */
328  if (superuser_arg(user->userid))
329  return;
330 
331  /* ok if params contain a non-empty password */
332  for (i = 0; keywords[i] != NULL; i++)
333  {
334  if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
335  return;
336  }
337 
338  ereport(ERROR,
339  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
340  errmsg("password is required"),
341  errdetail("Non-superusers must provide a password in the user mapping.")));
342 }
int errcode(int sqlerrcode)
Definition: elog.c:608
Oid userid
Definition: foreign.h:48
#define ERROR
Definition: elog.h:43
int errdetail(const char *fmt,...)
Definition: elog.c:955
#define ereport(elevel, rest)
Definition: elog.h:141
bool superuser_arg(Oid roleid)
Definition: superuser.c:56
static Datum values[MAXATTR]
Definition: bootstrap.c:167
int errmsg(const char *fmt,...)
Definition: elog.c:822
int i

◆ configure_remote_session()

static void configure_remote_session ( PGconn conn)
static

Definition at line 356 of file connection.c.

References do_sql_command(), and PQserverVersion().

Referenced by connect_pg_server().

357 {
358  int remoteversion = PQserverVersion(conn);
359 
360  /* Force the search path to contain only pg_catalog (see deparse.c) */
361  do_sql_command(conn, "SET search_path = pg_catalog");
362 
363  /*
364  * Set remote timezone; this is basically just cosmetic, since all
365  * transmitted and returned timestamptzs should specify a zone explicitly
366  * anyway. However it makes the regression test outputs more predictable.
367  *
368  * We don't risk setting remote zone equal to ours, since the remote
369  * server might use a different timezone database. Instead, use UTC
370  * (quoted, because very old servers are picky about case).
371  */
372  do_sql_command(conn, "SET timezone = 'UTC'");
373 
374  /*
375  * Set values needed to ensure unambiguous data output from remote. (This
376  * logic should match what pg_dump does. See also set_transmission_modes
377  * in postgres_fdw.c.)
378  */
379  do_sql_command(conn, "SET datestyle = ISO");
380  if (remoteversion >= 80400)
381  do_sql_command(conn, "SET intervalstyle = postgres");
382  if (remoteversion >= 90000)
383  do_sql_command(conn, "SET extra_float_digits = 3");
384  else
385  do_sql_command(conn, "SET extra_float_digits = 2");
386 }
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:6613
static void do_sql_command(PGconn *conn, const char *sql)
Definition: connection.c:392

◆ connect_pg_server()

static PGconn * connect_pg_server ( ForeignServer server,
UserMapping user 
)
static

Definition at line 220 of file connection.c.

References check_conn_params(), configure_remote_session(), ConnCacheEntry::conn, CONNECTION_OK, ereport, errcode(), errdetail(), errdetail_internal(), errhint(), errmsg(), ERROR, ExtractConnectionOptions(), GetDatabaseEncodingName(), list_length(), ForeignServer::options, UserMapping::options, palloc(), pchomp(), pfree(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, PQconnectdbParams(), PQconnectionUsedPassword(), PQerrorMessage(), PQfinish(), PQstatus(), ForeignServer::servername, superuser_arg(), UserMapping::userid, and values.

Referenced by GetConnection().

221 {
222  PGconn *volatile conn = NULL;
223 
224  /*
225  * Use PG_TRY block to ensure closing connection on error.
226  */
227  PG_TRY();
228  {
229  const char **keywords;
230  const char **values;
231  int n;
232 
233  /*
234  * Construct connection params from generic options of ForeignServer
235  * and UserMapping. (Some of them might not be libpq options, in
236  * which case we'll just waste a few array slots.) Add 3 extra slots
237  * for fallback_application_name, client_encoding, end marker.
238  */
239  n = list_length(server->options) + list_length(user->options) + 3;
240  keywords = (const char **) palloc(n * sizeof(char *));
241  values = (const char **) palloc(n * sizeof(char *));
242 
243  n = 0;
244  n += ExtractConnectionOptions(server->options,
245  keywords + n, values + n);
247  keywords + n, values + n);
248 
249  /* Use "postgres_fdw" as fallback_application_name. */
250  keywords[n] = "fallback_application_name";
251  values[n] = "postgres_fdw";
252  n++;
253 
254  /* Set client_encoding so that libpq can convert encoding properly. */
255  keywords[n] = "client_encoding";
256  values[n] = GetDatabaseEncodingName();
257  n++;
258 
259  keywords[n] = values[n] = NULL;
260 
261  /* verify connection parameters and make connection */
262  check_conn_params(keywords, values, user);
263 
264  conn = PQconnectdbParams(keywords, values, false);
265  if (!conn || PQstatus(conn) != CONNECTION_OK)
266  ereport(ERROR,
267  (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
268  errmsg("could not connect to server \"%s\"",
269  server->servername),
270  errdetail_internal("%s", pchomp(PQerrorMessage(conn)))));
271 
272  /*
273  * Check that non-superuser has used password to establish connection;
274  * otherwise, he's piggybacking on the postgres server's user
275  * identity. See also dblink_security_check() in contrib/dblink.
276  */
277  if (!superuser_arg(user->userid) && !PQconnectionUsedPassword(conn))
278  ereport(ERROR,
279  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
280  errmsg("password is required"),
281  errdetail("Non-superuser cannot connect if the server does not request a password."),
282  errhint("Target server's authentication method must be changed.")));
283 
284  /* Prepare new session for use */
286 
287  pfree(keywords);
288  pfree(values);
289  }
290  PG_CATCH();
291  {
292  /* Release PGconn data structure if we managed to create one */
293  if (conn)
294  PQfinish(conn);
295  PG_RE_THROW();
296  }
297  PG_END_TRY();
298 
299  return conn;
300 }
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6623
static void configure_remote_session(PGconn *conn)
Definition: connection.c:356
int errhint(const char *fmt,...)
Definition: elog.c:1069
static void check_conn_params(const char **keywords, const char **values, UserMapping *user)
Definition: connection.c:323
int errcode(int sqlerrcode)
Definition: elog.c:608
void PQfinish(PGconn *conn)
Definition: fe-connect.c:4125
Oid userid
Definition: foreign.h:48
PGconn * PQconnectdbParams(const char *const *keywords, const char *const *values, int expand_dbname)
Definition: fe-connect.c:624
int errdetail_internal(const char *fmt,...)
Definition: elog.c:982
char * pchomp(const char *in)
Definition: mcxt.c:1214
void pfree(void *pointer)
Definition: mcxt.c:1056
int ExtractConnectionOptions(List *defelems, const char **keywords, const char **values)
Definition: option.c:295
#define ERROR
Definition: elog.h:43
PGconn * conn
Definition: streamutil.c:54
int errdetail(const char *fmt,...)
Definition: elog.c:955
List * options
Definition: foreign.h:50
#define ereport(elevel, rest)
Definition: elog.h:141
bool superuser_arg(Oid roleid)
Definition: superuser.c:56
#define PG_CATCH()
Definition: elog.h:332
const char * GetDatabaseEncodingName(void)
Definition: mbutils.c:1052
static int list_length(const List *l)
Definition: pg_list.h:169
#define PG_RE_THROW()
Definition: elog.h:363
static Datum values[MAXATTR]
Definition: bootstrap.c:167
int PQconnectionUsedPassword(const PGconn *conn)
Definition: fe-connect.c:6672
void * palloc(Size size)
Definition: mcxt.c:949
int errmsg(const char *fmt,...)
Definition: elog.c:822
char * servername
Definition: foreign.h:39
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:6570
#define PG_TRY()
Definition: elog.h:322
List * options
Definition: foreign.h:42
#define PG_END_TRY()
Definition: elog.h:347

◆ disconnect_pg_server()

static void disconnect_pg_server ( ConnCacheEntry entry)
static

Definition at line 306 of file connection.c.

References ConnCacheEntry::conn, and PQfinish().

Referenced by GetConnection(), pgfdw_reject_incomplete_xact_state_change(), and pgfdw_xact_callback().

307 {
308  if (entry->conn != NULL)
309  {
310  PQfinish(entry->conn);
311  entry->conn = NULL;
312  }
313 }
void PQfinish(PGconn *conn)
Definition: fe-connect.c:4125
PGconn * conn
Definition: connection.c:49

◆ do_sql_command()

static void do_sql_command ( PGconn conn,
const char *  sql 
)
static

Definition at line 392 of file connection.c.

References ERROR, pgfdw_get_result(), pgfdw_report_error(), PGRES_COMMAND_OK, PQclear(), PQresultStatus(), and PQsendQuery().

Referenced by begin_remote_xact(), configure_remote_session(), pgfdw_subxact_callback(), and pgfdw_xact_callback().

393 {
394  PGresult *res;
395 
396  if (!PQsendQuery(conn, sql))
397  pgfdw_report_error(ERROR, NULL, conn, false, sql);
398  res = pgfdw_get_result(conn, sql);
399  if (PQresultStatus(res) != PGRES_COMMAND_OK)
400  pgfdw_report_error(ERROR, res, conn, true, sql);
401  PQclear(res);
402 }
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2692
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1234
#define ERROR
Definition: elog.h:43
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:595
void PQclear(PGresult *res)
Definition: fe-exec.c:694
PGresult * pgfdw_get_result(PGconn *conn, const char *query)
Definition: connection.c:530

◆ GetConnection()

PGconn* GetConnection ( UserMapping user,
bool  will_prep_stmt 
)

Definition at line 105 of file connection.c.

References begin_remote_xact(), CacheMemoryContext, CacheRegisterSyscacheCallback(), ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, connect_pg_server(), DEBUG3, disconnect_pg_server(), elog, HASHCTL::entrysize, FOREIGNSERVEROID, GetForeignServer(), GetSysCacheHashValue1, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, hash_search(), ConnCacheEntry::have_error, ConnCacheEntry::have_prep_stmt, HASHCTL::hcxt, ConnCacheEntry::invalidated, ConnCacheEntry::key, HASHCTL::keysize, ConnCacheEntry::mapping_hashvalue, MemSet, ObjectIdGetDatum, pgfdw_inval_callback(), pgfdw_reject_incomplete_xact_state_change(), pgfdw_subxact_callback(), pgfdw_xact_callback(), RegisterSubXactCallback(), RegisterXactCallback(), ConnCacheEntry::server_hashvalue, ForeignServer::serverid, UserMapping::serverid, ForeignServer::servername, UserMapping::umid, UserMapping::userid, USERMAPPINGOID, ConnCacheEntry::xact_depth, and xact_got_connection.

Referenced by create_foreign_modify(), dumpBlobs(), dumpDatabase(), dumpDatabaseConfig(), dumpTableData_copy(), estimate_path_cost_size(), expand_schema_name_patterns(), expand_table_name_patterns(), getTables(), main(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresBeginDirectModify(), postgresBeginForeignScan(), postgresImportForeignSchema(), setup_connection(), StartLogStreamer(), StreamLog(), and StreamLogicalLog().

106 {
107  bool found;
108  ConnCacheEntry *entry;
110 
111  /* First time through, initialize connection cache hashtable */
112  if (ConnectionHash == NULL)
113  {
114  HASHCTL ctl;
115 
116  MemSet(&ctl, 0, sizeof(ctl));
117  ctl.keysize = sizeof(ConnCacheKey);
118  ctl.entrysize = sizeof(ConnCacheEntry);
119  /* allocate ConnectionHash in the cache context */
120  ctl.hcxt = CacheMemoryContext;
121  ConnectionHash = hash_create("postgres_fdw connections", 8,
122  &ctl,
124 
125  /*
126  * Register some callback functions that manage connection cleanup.
127  * This should be done just once in each backend.
128  */
135  }
136 
137  /* Set flag that we did GetConnection during the current transaction */
138  xact_got_connection = true;
139 
140  /* Create hash key for the entry. Assume no pad bytes in key struct */
141  key = user->umid;
142 
143  /*
144  * Find or create cached entry for requested connection.
145  */
146  entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
147  if (!found)
148  {
149  /*
150  * We need only clear "conn" here; remaining fields will be filled
151  * later when "conn" is set.
152  */
153  entry->conn = NULL;
154  }
155 
156  /* Reject further use of connections which failed abort cleanup. */
158 
159  /*
160  * If the connection needs to be remade due to invalidation, disconnect as
161  * soon as we're out of all transactions.
162  */
163  if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
164  {
165  elog(DEBUG3, "closing connection %p for option changes to take effect",
166  entry->conn);
167  disconnect_pg_server(entry);
168  }
169 
170  /*
171  * We don't check the health of cached connection here, because it would
172  * require some overhead. Broken connection will be detected when the
173  * connection is actually used.
174  */
175 
176  /*
177  * If cache entry doesn't have a connection, we have to establish a new
178  * connection. (If connect_pg_server throws an error, the cache entry
179  * will remain in a valid empty state, ie conn == NULL.)
180  */
181  if (entry->conn == NULL)
182  {
183  ForeignServer *server = GetForeignServer(user->serverid);
184 
185  /* Reset all transient state fields, to be sure all are clean */
186  entry->xact_depth = 0;
187  entry->have_prep_stmt = false;
188  entry->have_error = false;
189  entry->changing_xact_state = false;
190  entry->invalidated = false;
191  entry->server_hashvalue =
193  ObjectIdGetDatum(server->serverid));
194  entry->mapping_hashvalue =
196  ObjectIdGetDatum(user->umid));
197 
198  /* Now try to make the connection */
199  entry->conn = connect_pg_server(server, user);
200 
201  elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
202  entry->conn, server->servername, user->umid, user->userid);
203  }
204 
205  /*
206  * Start a new transaction or subtransaction if needed.
207  */
208  begin_remote_xact(entry);
209 
210  /* Remember if caller will prepare statements */
211  entry->have_prep_stmt |= will_prep_stmt;
212 
213  return entry->conn;
214 }
Oid umid
Definition: foreign.h:47
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
MemoryContext hcxt
Definition: hsearch.h:78
#define DEBUG3
Definition: elog.h:23
struct ConnCacheEntry ConnCacheEntry
Size entrysize
Definition: hsearch.h:73
#define MemSet(start, val, len)
Definition: c.h:962
bool have_prep_stmt
Definition: connection.c:53
#define GetSysCacheHashValue1(cacheId, key1)
Definition: syscache.h:201
uint32 server_hashvalue
Definition: connection.c:57
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:906
Oid userid
Definition: foreign.h:48
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
bool changing_xact_state
Definition: connection.c:55
bool invalidated
Definition: connection.c:56
static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
Definition: connection.c:947
#define HASH_BLOBS
Definition: hsearch.h:88
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1426
static void pgfdw_xact_callback(XactEvent event, void *arg)
Definition: connection.c:646
static HTAB * ConnectionHash
Definition: connection.c:64
ForeignServer * GetForeignServer(Oid serverid)
Definition: foreign.c:109
uintptr_t Datum
Definition: postgres.h:367
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:316
void RegisterSubXactCallback(SubXactCallback callback, void *arg)
Definition: xact.c:3534
Size keysize
Definition: hsearch.h:72
uint32 mapping_hashvalue
Definition: connection.c:58
PGconn * conn
Definition: connection.c:49
static void disconnect_pg_server(ConnCacheEntry *entry)
Definition: connection.c:306
void RegisterXactCallback(XactCallback callback, void *arg)
Definition: xact.c:3479
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
Definition: connection.c:983
static bool xact_got_connection
Definition: connection.c:71
Oid serverid
Definition: foreign.h:49
char * servername
Definition: foreign.h:39
#define elog(elevel,...)
Definition: elog.h:228
static void pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg)
Definition: connection.c:833
static void begin_remote_xact(ConnCacheEntry *entry)
Definition: connection.c:415
Oid ConnCacheKey
Definition: connection.c:44
Oid serverid
Definition: foreign.h:36
MemoryContext CacheMemoryContext
Definition: mcxt.c:47
static PGconn * connect_pg_server(ForeignServer *server, UserMapping *user)
Definition: connection.c:220

◆ GetCursorNumber()

unsigned int GetCursorNumber ( PGconn conn)

Definition at line 479 of file connection.c.

References cursor_number.

Referenced by postgresAcquireSampleRowsFunc(), and postgresBeginForeignScan().

480 {
481  return ++cursor_number;
482 }
static unsigned int cursor_number
Definition: connection.c:67

◆ GetPrepStmtNumber()

unsigned int GetPrepStmtNumber ( PGconn conn)

Definition at line 493 of file connection.c.

References prep_stmt_number.

Referenced by prepare_foreign_modify().

494 {
495  return ++prep_stmt_number;
496 }
static unsigned int prep_stmt_number
Definition: connection.c:68

◆ pgfdw_cancel_query()

static bool pgfdw_cancel_query ( PGconn conn)
static

Definition at line 1017 of file connection.c.

References ereport, errcode(), errmsg(), GetCurrentTimestamp(), pgfdw_get_cleanup_result(), PQcancel(), PQclear(), PQfreeCancel(), PQgetCancel(), TimestampTzPlusMilliseconds, and WARNING.

Referenced by pgfdw_subxact_callback(), and pgfdw_xact_callback().

1018 {
1019  PGcancel *cancel;
1020  char errbuf[256];
1021  PGresult *result = NULL;
1022  TimestampTz endtime;
1023 
1024  /*
1025  * If it takes too long to cancel the query and discard the result, assume
1026  * the connection is dead.
1027  */
1029 
1030  /*
1031  * Issue cancel request. Unfortunately, there's no good way to limit the
1032  * amount of time that we might block inside PQgetCancel().
1033  */
1034  if ((cancel = PQgetCancel(conn)))
1035  {
1036  if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
1037  {
1038  ereport(WARNING,
1039  (errcode(ERRCODE_CONNECTION_FAILURE),
1040  errmsg("could not send cancel request: %s",
1041  errbuf)));
1042  PQfreeCancel(cancel);
1043  return false;
1044  }
1045  PQfreeCancel(cancel);
1046  }
1047 
1048  /* Get and discard the result of the query. */
1049  if (pgfdw_get_cleanup_result(conn, endtime, &result))
1050  return false;
1051  PQclear(result);
1052 
1053  return true;
1054 }
static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result)
Definition: connection.c:1118
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
int64 TimestampTz
Definition: timestamp.h:39
void PQfreeCancel(PGcancel *cancel)
Definition: fe-connect.c:4269
int errcode(int sqlerrcode)
Definition: elog.c:608
PGcancel * PQgetCancel(PGconn *conn)
Definition: fe-connect.c:4246
#define ereport(elevel, rest)
Definition: elog.h:141
#define WARNING
Definition: elog.h:40
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
void PQclear(PGresult *res)
Definition: fe-exec.c:694
int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize)
Definition: fe-connect.c:4401
int errmsg(const char *fmt,...)
Definition: elog.c:822

◆ pgfdw_exec_cleanup_query()

static bool pgfdw_exec_cleanup_query ( PGconn conn,
const char *  query,
bool  ignore_errors 
)
static

Definition at line 1064 of file connection.c.

References GetCurrentTimestamp(), pgfdw_get_cleanup_result(), pgfdw_report_error(), PGRES_COMMAND_OK, PQclear(), PQresultStatus(), PQsendQuery(), TimestampTzPlusMilliseconds, and WARNING.

Referenced by pgfdw_subxact_callback(), and pgfdw_xact_callback().

1065 {
1066  PGresult *result = NULL;
1067  TimestampTz endtime;
1068 
1069  /*
1070  * If it takes too long to execute a cleanup query, assume the connection
1071  * is dead. It's fairly likely that this is why we aborted in the first
1072  * place (e.g. statement timeout, user cancel), so the timeout shouldn't
1073  * be too long.
1074  */
1076 
1077  /*
1078  * Submit a query. Since we don't use non-blocking mode, this also can
1079  * block. But its risk is relatively small, so we ignore that for now.
1080  */
1081  if (!PQsendQuery(conn, query))
1082  {
1083  pgfdw_report_error(WARNING, NULL, conn, false, query);
1084  return false;
1085  }
1086 
1087  /* Get the result of the query. */
1088  if (pgfdw_get_cleanup_result(conn, endtime, &result))
1089  return false;
1090 
1091  /* Issue a warning if not successful. */
1092  if (PQresultStatus(result) != PGRES_COMMAND_OK)
1093  {
1094  pgfdw_report_error(WARNING, result, conn, true, query);
1095  return ignore_errors;
1096  }
1097  PQclear(result);
1098 
1099  return true;
1100 }
static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result)
Definition: connection.c:1118
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
int64 TimestampTz
Definition: timestamp.h:39
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2692
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1234
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:595
#define WARNING
Definition: elog.h:40
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
void PQclear(PGresult *res)
Definition: fe-exec.c:694

◆ pgfdw_exec_query()

PGresult* pgfdw_exec_query ( PGconn conn,
const char *  query 
)

Definition at line 506 of file connection.c.

References ERROR, pgfdw_get_result(), pgfdw_report_error(), and PQsendQuery().

Referenced by close_cursor(), fetch_more_data(), finish_foreign_modify(), get_remote_estimate(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresImportForeignSchema(), and postgresReScanForeignScan().

507 {
508  /*
509  * Submit a query. Since we don't use non-blocking mode, this also can
510  * block. But its risk is relatively small, so we ignore that for now.
511  */
512  if (!PQsendQuery(conn, query))
513  pgfdw_report_error(ERROR, NULL, conn, false, query);
514 
515  /* Wait for the result. */
516  return pgfdw_get_result(conn, query);
517 }
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1234
#define ERROR
Definition: elog.h:43
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:595
PGresult * pgfdw_get_result(PGconn *conn, const char *query)
Definition: connection.c:530

◆ pgfdw_get_cleanup_result()

static bool pgfdw_get_cleanup_result ( PGconn conn,
TimestampTz  endtime,
PGresult **  result 
)
static

Definition at line 1118 of file connection.c.

References CHECK_FOR_INTERRUPTS, GetCurrentTimestamp(), Min, MyLatch, now(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, PG_WAIT_EXTENSION, PQclear(), PQconsumeInput(), PQgetResult(), PQisBusy(), PQsocket(), ResetLatch(), TimestampDifference(), USECS_PER_SEC, WaitLatchOrSocket(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, and WL_TIMEOUT.

Referenced by pgfdw_cancel_query(), and pgfdw_exec_cleanup_query().

1119 {
1120  volatile bool timed_out = false;
1121  PGresult *volatile last_res = NULL;
1122 
1123  /* In what follows, do not leak any PGresults on an error. */
1124  PG_TRY();
1125  {
1126  for (;;)
1127  {
1128  PGresult *res;
1129 
1130  while (PQisBusy(conn))
1131  {
1132  int wc;
1134  long secs;
1135  int microsecs;
1136  long cur_timeout;
1137 
1138  /* If timeout has expired, give up, else get sleep time. */
1139  if (now >= endtime)
1140  {
1141  timed_out = true;
1142  goto exit;
1143  }
1144  TimestampDifference(now, endtime, &secs, &microsecs);
1145 
1146  /* To protect against clock skew, limit sleep to one minute. */
1147  cur_timeout = Min(60000, secs * USECS_PER_SEC + microsecs);
1148 
1149  /* Sleep until there's something to do */
1153  PQsocket(conn),
1154  cur_timeout, PG_WAIT_EXTENSION);
1156 
1158 
1159  /* Data available in socket? */
1160  if (wc & WL_SOCKET_READABLE)
1161  {
1162  if (!PQconsumeInput(conn))
1163  {
1164  /* connection trouble; treat the same as a timeout */
1165  timed_out = true;
1166  goto exit;
1167  }
1168  }
1169  }
1170 
1171  res = PQgetResult(conn);
1172  if (res == NULL)
1173  break; /* query is complete */
1174 
1175  PQclear(last_res);
1176  last_res = res;
1177  }
1178 exit: ;
1179  }
1180  PG_CATCH();
1181  {
1182  PQclear(last_res);
1183  PG_RE_THROW();
1184  }
1185  PG_END_TRY();
1186 
1187  if (timed_out)
1188  PQclear(last_res);
1189  else
1190  *result = last_res;
1191  return timed_out;
1192 }
#define WL_TIMEOUT
Definition: latch.h:127
#define USECS_PER_SEC
Definition: timestamp.h:94
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
int64 TimestampTz
Definition: timestamp.h:39
#define Min(x, y)
Definition: c.h:911
#define WL_SOCKET_READABLE
Definition: latch.h:125
void ResetLatch(Latch *latch)
Definition: latch.c:519
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:369
#define PG_WAIT_EXTENSION
Definition: pgstat.h:759
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1704
void PQclear(PGresult *res)
Definition: fe-exec.c:694
#define PG_CATCH()
Definition: elog.h:332
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:1754
#define PG_RE_THROW()
Definition: elog.h:363
struct Latch * MyLatch
Definition: globals.c:54
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: timestamp.c:1657
#define PG_TRY()
Definition: elog.h:322
#define WL_LATCH_SET
Definition: latch.h:124
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:6641
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1547
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1778
#define PG_END_TRY()
Definition: elog.h:347
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129

◆ pgfdw_get_result()

PGresult* pgfdw_get_result ( PGconn conn,
const char *  query 
)

Definition at line 530 of file connection.c.

References CHECK_FOR_INTERRUPTS, ERROR, MyLatch, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, PG_WAIT_EXTENSION, pgfdw_report_error(), PQclear(), PQconsumeInput(), PQgetResult(), PQisBusy(), PQsocket(), ResetLatch(), WaitLatchOrSocket(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_SOCKET_READABLE.

Referenced by create_cursor(), do_sql_command(), execute_dml_stmt(), execute_foreign_modify(), pgfdw_exec_query(), and prepare_foreign_modify().

531 {
532  PGresult *volatile last_res = NULL;
533 
534  /* In what follows, do not leak any PGresults on an error. */
535  PG_TRY();
536  {
537  for (;;)
538  {
539  PGresult *res;
540 
541  while (PQisBusy(conn))
542  {
543  int wc;
544 
545  /* Sleep until there's something to do */
549  PQsocket(conn),
550  -1L, PG_WAIT_EXTENSION);
552 
554 
555  /* Data available in socket? */
556  if (wc & WL_SOCKET_READABLE)
557  {
558  if (!PQconsumeInput(conn))
559  pgfdw_report_error(ERROR, NULL, conn, false, query);
560  }
561  }
562 
563  res = PQgetResult(conn);
564  if (res == NULL)
565  break; /* query is complete */
566 
567  PQclear(last_res);
568  last_res = res;
569  }
570  }
571  PG_CATCH();
572  {
573  PQclear(last_res);
574  PG_RE_THROW();
575  }
576  PG_END_TRY();
577 
578  return last_res;
579 }
#define WL_SOCKET_READABLE
Definition: latch.h:125
void ResetLatch(Latch *latch)
Definition: latch.c:519
#define ERROR
Definition: elog.h:43
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:369
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:595
#define PG_WAIT_EXTENSION
Definition: pgstat.h:759
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1704
void PQclear(PGresult *res)
Definition: fe-exec.c:694
#define PG_CATCH()
Definition: elog.h:332
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:1754
#define PG_RE_THROW()
Definition: elog.h:363
struct Latch * MyLatch
Definition: globals.c:54
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
#define PG_TRY()
Definition: elog.h:322
#define WL_LATCH_SET
Definition: latch.h:124
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:6641
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1778
#define PG_END_TRY()
Definition: elog.h:347
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129

◆ pgfdw_inval_callback()

static void pgfdw_inval_callback ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)
static

Definition at line 947 of file connection.c.

References Assert, ConnCacheEntry::conn, FOREIGNSERVEROID, hash_seq_init(), hash_seq_search(), ConnCacheEntry::invalidated, ConnCacheEntry::mapping_hashvalue, ConnCacheEntry::server_hashvalue, and USERMAPPINGOID.

Referenced by GetConnection().

948 {
949  HASH_SEQ_STATUS scan;
950  ConnCacheEntry *entry;
951 
952  Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
953 
954  /* ConnectionHash must exist already, if we're registered */
956  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
957  {
958  /* Ignore invalid entries */
959  if (entry->conn == NULL)
960  continue;
961 
962  /* hashvalue == 0 means a cache reset, must clear all state */
963  if (hashvalue == 0 ||
964  (cacheid == FOREIGNSERVEROID &&
965  entry->server_hashvalue == hashvalue) ||
966  (cacheid == USERMAPPINGOID &&
967  entry->mapping_hashvalue == hashvalue))
968  entry->invalidated = true;
969  }
970 }
uint32 server_hashvalue
Definition: connection.c:57
bool invalidated
Definition: connection.c:56
static HTAB * ConnectionHash
Definition: connection.c:64
uint32 mapping_hashvalue
Definition: connection.c:58
PGconn * conn
Definition: connection.c:49
#define Assert(condition)
Definition: c.h:739
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1389
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1379

◆ pgfdw_reject_incomplete_xact_state_change()

static void pgfdw_reject_incomplete_xact_state_change ( ConnCacheEntry entry)
static

Definition at line 983 of file connection.c.

References ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, disconnect_pg_server(), elog, ereport, errcode(), errmsg(), ERROR, GetForeignServer(), GETSTRUCT, HeapTupleIsValid, ConnCacheEntry::key, ObjectIdGetDatum, ReleaseSysCache(), SearchSysCache1(), ForeignServer::servername, and USERMAPPINGOID.

Referenced by GetConnection(), pgfdw_subxact_callback(), and pgfdw_xact_callback().

984 {
985  HeapTuple tup;
986  Form_pg_user_mapping umform;
987  ForeignServer *server;
988 
989  /* nothing to do for inactive entries and entries of sane state */
990  if (entry->conn == NULL || !entry->changing_xact_state)
991  return;
992 
993  /* make sure this entry is inactive */
994  disconnect_pg_server(entry);
995 
996  /* find server name to be shown in the message below */
998  ObjectIdGetDatum(entry->key));
999  if (!HeapTupleIsValid(tup))
1000  elog(ERROR, "cache lookup failed for user mapping %u", entry->key);
1001  umform = (Form_pg_user_mapping) GETSTRUCT(tup);
1002  server = GetForeignServer(umform->umserver);
1003  ReleaseSysCache(tup);
1004 
1005  ereport(ERROR,
1006  (errcode(ERRCODE_CONNECTION_EXCEPTION),
1007  errmsg("connection to server \"%s\" was lost",
1008  server->servername)));
1009 }
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
ConnCacheKey key
Definition: connection.c:48
int errcode(int sqlerrcode)
Definition: elog.c:608
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
bool changing_xact_state
Definition: connection.c:55
FormData_pg_user_mapping * Form_pg_user_mapping
#define ereport(elevel, rest)
Definition: elog.h:141
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1116
ForeignServer * GetForeignServer(Oid serverid)
Definition: foreign.c:109
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1164
PGconn * conn
Definition: connection.c:49
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
static void disconnect_pg_server(ConnCacheEntry *entry)
Definition: connection.c:306
int errmsg(const char *fmt,...)
Definition: elog.c:822
char * servername
Definition: foreign.h:39
#define elog(elevel,...)
Definition: elog.h:228

◆ pgfdw_report_error()

void pgfdw_report_error ( int  elevel,
PGresult res,
PGconn conn,
bool  clear,
const char *  sql 
)

Definition at line 595 of file connection.c.

References ereport, errcode(), errcontext, errdetail_internal(), errhint(), errmsg(), errmsg_internal(), MAKE_SQLSTATE, pchomp(), PG_DIAG_CONTEXT, PG_DIAG_MESSAGE_DETAIL, PG_DIAG_MESSAGE_HINT, PG_DIAG_MESSAGE_PRIMARY, PG_DIAG_SQLSTATE, PG_END_TRY, PG_FINALLY, PG_TRY, PQclear(), PQerrorMessage(), and PQresultErrorField().

Referenced by close_cursor(), create_cursor(), do_sql_command(), execute_dml_stmt(), execute_foreign_modify(), fetch_more_data(), finish_foreign_modify(), get_remote_estimate(), pgfdw_exec_cleanup_query(), pgfdw_exec_query(), pgfdw_get_result(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresImportForeignSchema(), postgresReScanForeignScan(), and prepare_foreign_modify().

597 {
598  /* If requested, PGresult must be released before leaving this function. */
599  PG_TRY();
600  {
601  char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
602  char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
603  char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
604  char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
605  char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
606  int sqlstate;
607 
608  if (diag_sqlstate)
609  sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
610  diag_sqlstate[1],
611  diag_sqlstate[2],
612  diag_sqlstate[3],
613  diag_sqlstate[4]);
614  else
615  sqlstate = ERRCODE_CONNECTION_FAILURE;
616 
617  /*
618  * If we don't get a message from the PGresult, try the PGconn. This
619  * is needed because for connection-level failures, PQexec may just
620  * return NULL, not a PGresult at all.
621  */
622  if (message_primary == NULL)
623  message_primary = pchomp(PQerrorMessage(conn));
624 
625  ereport(elevel,
626  (errcode(sqlstate),
627  message_primary ? errmsg_internal("%s", message_primary) :
628  errmsg("could not obtain message string for remote error"),
629  message_detail ? errdetail_internal("%s", message_detail) : 0,
630  message_hint ? errhint("%s", message_hint) : 0,
631  message_context ? errcontext("%s", message_context) : 0,
632  sql ? errcontext("remote SQL command: %s", sql) : 0));
633  }
634  PG_FINALLY();
635  {
636  if (clear)
637  PQclear(res);
638  }
639  PG_END_TRY();
640 }
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6623
int errhint(const char *fmt,...)
Definition: elog.c:1069
#define PG_DIAG_MESSAGE_PRIMARY
Definition: postgres_ext.h:58
#define PG_DIAG_MESSAGE_DETAIL
Definition: postgres_ext.h:59
#define MAKE_SQLSTATE(ch1, ch2, ch3, ch4, ch5)
Definition: elog.h:62
int errcode(int sqlerrcode)
Definition: elog.c:608
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:57
int errdetail_internal(const char *fmt,...)
Definition: elog.c:982
char * pchomp(const char *in)
Definition: mcxt.c:1214
#define ereport(elevel, rest)
Definition: elog.h:141
static int elevel
Definition: vacuumlazy.c:143
#define PG_FINALLY()
Definition: elog.h:339
#define PG_DIAG_MESSAGE_HINT
Definition: postgres_ext.h:60
void PQclear(PGresult *res)
Definition: fe-exec.c:694
int errmsg_internal(const char *fmt,...)
Definition: elog.c:909
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:2754
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define errcontext
Definition: elog.h:183
#define PG_TRY()
Definition: elog.h:322
#define PG_END_TRY()
Definition: elog.h:347
#define PG_DIAG_CONTEXT
Definition: postgres_ext.h:64

◆ pgfdw_subxact_callback()

static void pgfdw_subxact_callback ( SubXactEvent  event,
SubTransactionId  mySubid,
SubTransactionId  parentSubid,
void *  arg 
)
static

Definition at line 833 of file connection.c.

References ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, do_sql_command(), elog, ERROR, GetCurrentTransactionNestLevel(), hash_seq_init(), hash_seq_search(), ConnCacheEntry::have_error, in_error_recursion_trouble(), pgfdw_cancel_query(), pgfdw_exec_cleanup_query(), pgfdw_reject_incomplete_xact_state_change(), PQTRANS_ACTIVE, PQtransactionStatus(), snprintf, SUBXACT_EVENT_ABORT_SUB, SUBXACT_EVENT_PRE_COMMIT_SUB, ConnCacheEntry::xact_depth, and xact_got_connection.

Referenced by GetConnection().

835 {
836  HASH_SEQ_STATUS scan;
837  ConnCacheEntry *entry;
838  int curlevel;
839 
840  /* Nothing to do at subxact start, nor after commit. */
841  if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
842  event == SUBXACT_EVENT_ABORT_SUB))
843  return;
844 
845  /* Quick exit if no connections were touched in this transaction. */
846  if (!xact_got_connection)
847  return;
848 
849  /*
850  * Scan all connection cache entries to find open remote subtransactions
851  * of the current level, and close them.
852  */
853  curlevel = GetCurrentTransactionNestLevel();
855  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
856  {
857  char sql[100];
858 
859  /*
860  * We only care about connections with open remote subtransactions of
861  * the current level.
862  */
863  if (entry->conn == NULL || entry->xact_depth < curlevel)
864  continue;
865 
866  if (entry->xact_depth > curlevel)
867  elog(ERROR, "missed cleaning up remote subtransaction at level %d",
868  entry->xact_depth);
869 
870  if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
871  {
872  /*
873  * If abort cleanup previously failed for this connection, we
874  * can't issue any more commands against it.
875  */
877 
878  /* Commit all remote subtransactions during pre-commit */
879  snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
880  entry->changing_xact_state = true;
881  do_sql_command(entry->conn, sql);
882  entry->changing_xact_state = false;
883  }
884  else if (in_error_recursion_trouble())
885  {
886  /*
887  * Don't try to clean up the connection if we're already in error
888  * recursion trouble.
889  */
890  entry->changing_xact_state = true;
891  }
892  else if (!entry->changing_xact_state)
893  {
894  bool abort_cleanup_failure = false;
895 
896  /* Remember that abort cleanup is in progress. */
897  entry->changing_xact_state = true;
898 
899  /* Assume we might have lost track of prepared statements */
900  entry->have_error = true;
901 
902  /*
903  * If a command has been submitted to the remote server by using
904  * an asynchronous execution function, the command might not have
905  * yet completed. Check to see if a command is still being
906  * processed by the remote server, and if so, request cancellation
907  * of the command.
908  */
909  if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
910  !pgfdw_cancel_query(entry->conn))
911  abort_cleanup_failure = true;
912  else
913  {
914  /* Rollback all remote subtransactions during abort */
915  snprintf(sql, sizeof(sql),
916  "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
917  curlevel, curlevel);
918  if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
919  abort_cleanup_failure = true;
920  }
921 
922  /* Disarm changing_xact_state if it all worked. */
923  entry->changing_xact_state = abort_cleanup_failure;
924  }
925 
926  /* OK, we're outta that level of subtransaction */
927  entry->xact_depth--;
928  }
929 }
#define ERROR
Definition: elog.h:43
bool changing_xact_state
Definition: connection.c:55
static void do_sql_command(PGconn *conn, const char *sql)
Definition: connection.c:392
PGTransactionStatusType PQtransactionStatus(const PGconn *conn)
Definition: fe-connect.c:6578
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
Definition: connection.c:1064
static HTAB * ConnectionHash
Definition: connection.c:64
static bool pgfdw_cancel_query(PGconn *conn)
Definition: connection.c:1017
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:841
bool in_error_recursion_trouble(void)
Definition: elog.c:197
PGconn * conn
Definition: connection.c:49
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
Definition: connection.c:983
static bool xact_got_connection
Definition: connection.c:71
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1389
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1379
#define elog(elevel,...)
Definition: elog.h:228
#define snprintf
Definition: port.h:192

◆ pgfdw_xact_callback()

static void pgfdw_xact_callback ( XactEvent  event,
void *  arg 
)
static

Definition at line 646 of file connection.c.

References ConnCacheEntry::changing_xact_state, ConnCacheEntry::conn, CONNECTION_OK, cursor_number, DEBUG3, disconnect_pg_server(), do_sql_command(), elog, ereport, errcode(), errmsg(), ERROR, hash_seq_init(), hash_seq_search(), ConnCacheEntry::have_error, ConnCacheEntry::have_prep_stmt, in_error_recursion_trouble(), pgfdw_cancel_query(), pgfdw_exec_cleanup_query(), pgfdw_reject_incomplete_xact_state_change(), PQclear(), PQexec(), PQstatus(), PQTRANS_ACTIVE, PQTRANS_IDLE, PQtransactionStatus(), ConnCacheEntry::xact_depth, XACT_EVENT_ABORT, XACT_EVENT_COMMIT, XACT_EVENT_PARALLEL_ABORT, XACT_EVENT_PARALLEL_COMMIT, XACT_EVENT_PARALLEL_PRE_COMMIT, XACT_EVENT_PRE_COMMIT, XACT_EVENT_PRE_PREPARE, XACT_EVENT_PREPARE, and xact_got_connection.

Referenced by GetConnection().

647 {
648  HASH_SEQ_STATUS scan;
649  ConnCacheEntry *entry;
650 
651  /* Quick exit if no connections were touched in this transaction. */
652  if (!xact_got_connection)
653  return;
654 
655  /*
656  * Scan all connection cache entries to find open remote transactions, and
657  * close them.
658  */
660  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
661  {
662  PGresult *res;
663 
664  /* Ignore cache entry if no open connection right now */
665  if (entry->conn == NULL)
666  continue;
667 
668  /* If it has an open remote transaction, try to close it */
669  if (entry->xact_depth > 0)
670  {
671  bool abort_cleanup_failure = false;
672 
673  elog(DEBUG3, "closing remote transaction on connection %p",
674  entry->conn);
675 
676  switch (event)
677  {
680 
681  /*
682  * If abort cleanup previously failed for this connection,
683  * we can't issue any more commands against it.
684  */
686 
687  /* Commit all remote transactions during pre-commit */
688  entry->changing_xact_state = true;
689  do_sql_command(entry->conn, "COMMIT TRANSACTION");
690  entry->changing_xact_state = false;
691 
692  /*
693  * If there were any errors in subtransactions, and we
694  * made prepared statements, do a DEALLOCATE ALL to make
695  * sure we get rid of all prepared statements. This is
696  * annoying and not terribly bulletproof, but it's
697  * probably not worth trying harder.
698  *
699  * DEALLOCATE ALL only exists in 8.3 and later, so this
700  * constrains how old a server postgres_fdw can
701  * communicate with. We intentionally ignore errors in
702  * the DEALLOCATE, so that we can hobble along to some
703  * extent with older servers (leaking prepared statements
704  * as we go; but we don't really support update operations
705  * pre-8.3 anyway).
706  */
707  if (entry->have_prep_stmt && entry->have_error)
708  {
709  res = PQexec(entry->conn, "DEALLOCATE ALL");
710  PQclear(res);
711  }
712  entry->have_prep_stmt = false;
713  entry->have_error = false;
714  break;
716 
717  /*
718  * We disallow any remote transactions, since it's not
719  * very reasonable to hold them open until the prepared
720  * transaction is committed. For the moment, throw error
721  * unconditionally; later we might allow read-only cases.
722  * Note that the error will cause us to come right back
723  * here with event == XACT_EVENT_ABORT, so we'll clean up
724  * the connection state at that point.
725  */
726  ereport(ERROR,
727  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
728  errmsg("cannot PREPARE a transaction that has operated on postgres_fdw foreign tables")));
729  break;
731  case XACT_EVENT_COMMIT:
732  case XACT_EVENT_PREPARE:
733  /* Pre-commit should have closed the open transaction */
734  elog(ERROR, "missed cleaning up connection during pre-commit");
735  break;
737  case XACT_EVENT_ABORT:
738 
739  /*
740  * Don't try to clean up the connection if we're already
741  * in error recursion trouble.
742  */
744  entry->changing_xact_state = true;
745 
746  /*
747  * If connection is already unsalvageable, don't touch it
748  * further.
749  */
750  if (entry->changing_xact_state)
751  break;
752 
753  /*
754  * Mark this connection as in the process of changing
755  * transaction state.
756  */
757  entry->changing_xact_state = true;
758 
759  /* Assume we might have lost track of prepared statements */
760  entry->have_error = true;
761 
762  /*
763  * If a command has been submitted to the remote server by
764  * using an asynchronous execution function, the command
765  * might not have yet completed. Check to see if a
766  * command is still being processed by the remote server,
767  * and if so, request cancellation of the command.
768  */
769  if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
770  !pgfdw_cancel_query(entry->conn))
771  {
772  /* Unable to cancel running query. */
773  abort_cleanup_failure = true;
774  }
775  else if (!pgfdw_exec_cleanup_query(entry->conn,
776  "ABORT TRANSACTION",
777  false))
778  {
779  /* Unable to abort remote transaction. */
780  abort_cleanup_failure = true;
781  }
782  else if (entry->have_prep_stmt && entry->have_error &&
784  "DEALLOCATE ALL",
785  true))
786  {
787  /* Trouble clearing prepared statements. */
788  abort_cleanup_failure = true;
789  }
790  else
791  {
792  entry->have_prep_stmt = false;
793  entry->have_error = false;
794  }
795 
796  /* Disarm changing_xact_state if it all worked. */
797  entry->changing_xact_state = abort_cleanup_failure;
798  break;
799  }
800  }
801 
802  /* Reset state to show we're out of a transaction */
803  entry->xact_depth = 0;
804 
805  /*
806  * If the connection isn't in a good idle state, discard it to
807  * recover. Next GetConnection will open a new connection.
808  */
809  if (PQstatus(entry->conn) != CONNECTION_OK ||
811  entry->changing_xact_state)
812  {
813  elog(DEBUG3, "discarding connection %p", entry->conn);
814  disconnect_pg_server(entry);
815  }
816  }
817 
818  /*
819  * Regardless of the event type, we can now mark ourselves as out of the
820  * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
821  * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
822  */
823  xact_got_connection = false;
824 
825  /* Also reset cursor numbering for next transaction */
826  cursor_number = 0;
827 }
#define DEBUG3
Definition: elog.h:23
int errcode(int sqlerrcode)
Definition: elog.c:608
bool have_prep_stmt
Definition: connection.c:53
#define ERROR
Definition: elog.h:43
bool changing_xact_state
Definition: connection.c:55
static void do_sql_command(PGconn *conn, const char *sql)
Definition: connection.c:392
static unsigned int cursor_number
Definition: connection.c:67
#define ereport(elevel, rest)
Definition: elog.h:141
PGTransactionStatusType PQtransactionStatus(const PGconn *conn)
Definition: fe-connect.c:6578
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
Definition: connection.c:1064
static HTAB * ConnectionHash
Definition: connection.c:64
static bool pgfdw_cancel_query(PGconn *conn)
Definition: connection.c:1017
void PQclear(PGresult *res)
Definition: fe-exec.c:694
bool in_error_recursion_trouble(void)
Definition: elog.c:197
PGconn * conn
Definition: connection.c:49
static void disconnect_pg_server(ConnCacheEntry *entry)
Definition: connection.c:306
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
Definition: connection.c:983
static bool xact_got_connection
Definition: connection.c:71
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1389
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1379
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define elog(elevel,...)
Definition: elog.h:228
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:1939
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:6570

◆ ReleaseConnection()

void ReleaseConnection ( PGconn conn)

Definition at line 458 of file connection.c.

Referenced by estimate_path_cost_size(), finish_foreign_modify(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresEndDirectModify(), postgresEndForeignScan(), and postgresImportForeignSchema().

459 {
460  /*
461  * Currently, we don't actually track connection references because all
462  * cleanup is managed on a transaction or subtransaction basis instead. So
463  * there's nothing to do here.
464  */
465 }

Variable Documentation

◆ ConnectionHash

HTAB* ConnectionHash = NULL
static

Definition at line 64 of file connection.c.

◆ cursor_number

unsigned int cursor_number = 0
static

Definition at line 67 of file connection.c.

Referenced by GetCursorNumber(), and pgfdw_xact_callback().

◆ prep_stmt_number

unsigned int prep_stmt_number = 0
static

Definition at line 68 of file connection.c.

Referenced by GetPrepStmtNumber().

◆ xact_got_connection

bool xact_got_connection = false
static

Definition at line 71 of file connection.c.

Referenced by GetConnection(), pgfdw_subxact_callback(), and pgfdw_xact_callback().