PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
connection.c File Reference
#include "postgres.h"
#include "postgres_fdw.h"
#include "access/xact.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "storage/latch.h"
#include "utils/hsearch.h"
#include "utils/memutils.h"
Include dependency graph for connection.c:

Go to the source code of this file.

Data Structures

struct  ConnCacheEntry
 

Typedefs

typedef Oid ConnCacheKey
 
typedef struct ConnCacheEntry ConnCacheEntry
 

Functions

static PGconn * connect_pg_server (ForeignServer *server, UserMapping *user)
 
static void check_conn_params (const char **keywords, const char **values)
 
static void configure_remote_session (PGconn *conn)
 
static void do_sql_command (PGconn *conn, const char *sql)
 
static void begin_remote_xact (ConnCacheEntry *entry)
 
static void pgfdw_xact_callback (XactEvent event, void *arg)
 
static void pgfdw_subxact_callback (SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg)
 
PGconn * GetConnection (UserMapping *user, bool will_prep_stmt)
 
void ReleaseConnection (PGconn *conn)
 
unsigned int GetCursorNumber (PGconn *conn)
 
unsigned int GetPrepStmtNumber (PGconn *conn)
 
PGresult * pgfdw_exec_query (PGconn *conn, const char *query)
 
PGresult * pgfdw_get_result (PGconn *conn, const char *query)
 
void pgfdw_report_error (int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
 

Variables

static HTAB * ConnectionHash = NULL
 
static unsigned int cursor_number = 0
 
static unsigned int prep_stmt_number = 0
 
static bool xact_got_connection = false
 

Typedef Documentation

typedef Oid ConnCacheKey

Definition at line 42 of file connection.c.

Function Documentation

static void begin_remote_xact ( ConnCacheEntry * entry)
static

Definition at line 363 of file connection.c.

References ConnCacheEntry::conn, DEBUG3, do_sql_command(), elog, GetCurrentTransactionNestLevel(), IsolationIsSerializable, snprintf(), and ConnCacheEntry::xact_depth.

Referenced by GetConnection().

364 {
365  int curlevel = GetCurrentTransactionNestLevel();
366 
367  /* Start main transaction if we haven't yet */
368  if (entry->xact_depth <= 0)
369  {
370  const char *sql;
371 
372  elog(DEBUG3, "starting remote transaction on connection %p",
373  entry->conn);
374 
375  if (IsolationIsSerializable())
376  sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
377  else
378  sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
379  do_sql_command(entry->conn, sql);
380  entry->xact_depth = 1;
381  }
382 
383  /*
384  * If we're in a subtransaction, stack up savepoints to match our level.
385  * This ensures we can rollback just the desired effects when a
386  * subtransaction aborts.
387  */
388  while (entry->xact_depth < curlevel)
389  {
390  char sql[64];
391 
392  snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
393  do_sql_command(entry->conn, sql);
394  entry->xact_depth++;
395  }
396 }
#define DEBUG3
Definition: elog.h:23
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
static void do_sql_command(PGconn *conn, const char *sql)
Definition: connection.c:342
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:761
PGconn * conn
Definition: connection.c:47
#define IsolationIsSerializable()
Definition: xact.h:44
#define elog
Definition: elog.h:219
static void check_conn_params ( const char **  keywords,
const char **  values 
)
static

Definition at line 273 of file connection.c.

References ereport, errcode(), errdetail(), errmsg(), ERROR, i, NULL, and superuser().

Referenced by connect_pg_server().

274 {
275  int i;
276 
277  /* no check required if superuser */
278  if (superuser())
279  return;
280 
281  /* ok if params contain a non-empty password */
282  for (i = 0; keywords[i] != NULL; i++)
283  {
284  if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
285  return;
286  }
287 
288  ereport(ERROR,
289  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
290  errmsg("password is required"),
291  errdetail("Non-superusers must provide a password in the user mapping.")));
292 }
int errcode(int sqlerrcode)
Definition: elog.c:575
bool superuser(void)
Definition: superuser.c:47
#define ERROR
Definition: elog.h:43
int errdetail(const char *fmt,...)
Definition: elog.c:873
#define ereport(elevel, rest)
Definition: elog.h:122
#define NULL
Definition: c.h:229
static Datum values[MAXATTR]
Definition: bootstrap.c:162
int errmsg(const char *fmt,...)
Definition: elog.c:797
int i
static void configure_remote_session ( PGconn * conn)
static

Definition at line 306 of file connection.c.

References do_sql_command(), and PQserverVersion().

Referenced by connect_pg_server().

307 {
308  int remoteversion = PQserverVersion(conn);
309 
310  /* Force the search path to contain only pg_catalog (see deparse.c) */
311  do_sql_command(conn, "SET search_path = pg_catalog");
312 
313  /*
314  * Set remote timezone; this is basically just cosmetic, since all
315  * transmitted and returned timestamptzs should specify a zone explicitly
316  * anyway. However it makes the regression test outputs more predictable.
317  *
318  * We don't risk setting remote zone equal to ours, since the remote
319  * server might use a different timezone database. Instead, use UTC
320  * (quoted, because very old servers are picky about case).
321  */
322  do_sql_command(conn, "SET timezone = 'UTC'");
323 
324  /*
325  * Set values needed to ensure unambiguous data output from remote. (This
326  * logic should match what pg_dump does. See also set_transmission_modes
327  * in postgres_fdw.c.)
328  */
329  do_sql_command(conn, "SET datestyle = ISO");
330  if (remoteversion >= 80400)
331  do_sql_command(conn, "SET intervalstyle = postgres");
332  if (remoteversion >= 90000)
333  do_sql_command(conn, "SET extra_float_digits = 3");
334  else
335  do_sql_command(conn, "SET extra_float_digits = 2");
336 }
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:6001
static void do_sql_command(PGconn *conn, const char *sql)
Definition: connection.c:342
static PGconn * connect_pg_server ( ForeignServer *  server,
UserMapping *  user 
)
static

Definition at line 183 of file connection.c.

References check_conn_params(), configure_remote_session(), conn, CONNECTION_OK, ereport, errcode(), errdetail(), errdetail_internal(), errhint(), errmsg(), ERROR, ExtractConnectionOptions(), GetDatabaseEncodingName(), list_length(), NULL, ForeignServer::options, UserMapping::options, palloc(), pchomp(), pfree(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, PQconnectdbParams(), PQconnectionUsedPassword(), PQerrorMessage(), PQfinish(), PQstatus(), ForeignServer::servername, superuser(), and values.

Referenced by GetConnection().

184 {
185  PGconn *volatile conn = NULL;
186 
187  /*
188  * Use PG_TRY block to ensure closing connection on error.
189  */
190  PG_TRY();
191  {
192  const char **keywords;
193  const char **values;
194  int n;
195 
196  /*
197  * Construct connection params from generic options of ForeignServer
198  * and UserMapping. (Some of them might not be libpq options, in
199  * which case we'll just waste a few array slots.) Add 3 extra slots
200  * for fallback_application_name, client_encoding, end marker.
201  */
202  n = list_length(server->options) + list_length(user->options) + 3;
203  keywords = (const char **) palloc(n * sizeof(char *));
204  values = (const char **) palloc(n * sizeof(char *));
205 
206  n = 0;
207  n += ExtractConnectionOptions(server->options,
208  keywords + n, values + n);
209  n += ExtractConnectionOptions(user->options,
210  keywords + n, values + n);
211 
212  /* Use "postgres_fdw" as fallback_application_name. */
213  keywords[n] = "fallback_application_name";
214  values[n] = "postgres_fdw";
215  n++;
216 
217  /* Set client_encoding so that libpq can convert encoding properly. */
218  keywords[n] = "client_encoding";
219  values[n] = GetDatabaseEncodingName();
220  n++;
221 
222  keywords[n] = values[n] = NULL;
223 
224  /* verify connection parameters and make connection */
225  check_conn_params(keywords, values);
226 
227  conn = PQconnectdbParams(keywords, values, false);
228  if (!conn || PQstatus(conn) != CONNECTION_OK)
229  ereport(ERROR,
230  (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
231  errmsg("could not connect to server \"%s\"",
232  server->servername),
233  errdetail_internal("%s", pchomp(PQerrorMessage(conn)))));
234 
235  /*
236  * Check that non-superuser has used password to establish connection;
237  * otherwise, he's piggybacking on the postgres server's user
238  * identity. See also dblink_security_check() in contrib/dblink.
239  */
240  if (!superuser() && !PQconnectionUsedPassword(conn))
241  ereport(ERROR,
242  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
243  errmsg("password is required"),
244  errdetail("Non-superuser cannot connect if the server does not request a password."),
245  errhint("Target server's authentication method must be changed.")));
246 
247  /* Prepare new session for use */
248  configure_remote_session(conn);
249 
250  pfree(keywords);
251  pfree(values);
252  }
253  PG_CATCH();
254  {
255  /* Release PGconn data structure if we managed to create one */
256  if (conn)
257  PQfinish(conn);
258  PG_RE_THROW();
259  }
260  PG_END_TRY();
261 
262  return conn;
263 }
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6011
static void configure_remote_session(PGconn *conn)
Definition: connection.c:306
int errhint(const char *fmt,...)
Definition: elog.c:987
int errcode(int sqlerrcode)
Definition: elog.c:575
void PQfinish(PGconn *conn)
Definition: fe-connect.c:3568
bool superuser(void)
Definition: superuser.c:47
PGconn * PQconnectdbParams(const char *const *keywords, const char *const *values, int expand_dbname)
Definition: fe-connect.c:470
int errdetail_internal(const char *fmt,...)
Definition: elog.c:900
char * pchomp(const char *in)
Definition: mcxt.c:1101
void pfree(void *pointer)
Definition: mcxt.c:950
int ExtractConnectionOptions(List *defelems, const char **keywords, const char **values)
Definition: option.c:297
#define ERROR
Definition: elog.h:43
PGconn * conn
Definition: streamutil.c:42
int errdetail(const char *fmt,...)
Definition: elog.c:873
List * options
Definition: foreign.h:61
#define ereport(elevel, rest)
Definition: elog.h:122
static void check_conn_params(const char **keywords, const char **values)
Definition: connection.c:273
#define PG_CATCH()
Definition: elog.h:293
#define NULL
Definition: c.h:229
const char * GetDatabaseEncodingName(void)
Definition: mbutils.c:1021
static int list_length(const List *l)
Definition: pg_list.h:89
#define PG_RE_THROW()
Definition: elog.h:314
static Datum values[MAXATTR]
Definition: bootstrap.c:162
int PQconnectionUsedPassword(const PGconn *conn)
Definition: fe-connect.c:6060
void * palloc(Size size)
Definition: mcxt.c:849
int errmsg(const char *fmt,...)
Definition: elog.c:797
char * servername
Definition: foreign.h:50
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:5958
#define PG_TRY()
Definition: elog.h:284
List * options
Definition: foreign.h:53
#define PG_END_TRY()
Definition: elog.h:300
static void do_sql_command ( PGconn *  conn,
const char *  sql 
)
static

Definition at line 342 of file connection.c.

References ERROR, pgfdw_report_error(), PGRES_COMMAND_OK, PQclear(), PQexec(), and PQresultStatus().

Referenced by begin_remote_xact(), configure_remote_session(), pgfdw_subxact_callback(), and pgfdw_xact_callback().

343 {
344  PGresult *res;
345 
346  res = PQexec(conn, sql);
347  if (PQresultStatus(res) != PGRES_COMMAND_OK)
348  pgfdw_report_error(ERROR, res, conn, true, sql);
349  PQclear(res);
350 }
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2596
#define ERROR
Definition: elog.h:43
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:528
void PQclear(PGresult *res)
Definition: fe-exec.c:650
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:1846
PGconn* GetConnection ( UserMapping *  user,
bool  will_prep_stmt 
)

Definition at line 97 of file connection.c.

References begin_remote_xact(), CacheMemoryContext, ConnCacheEntry::conn, connect_pg_server(), DEBUG3, elog, HASHCTL::entrysize, GetForeignServer(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, hash_search(), ConnCacheEntry::have_error, ConnCacheEntry::have_prep_stmt, HASHCTL::hcxt, HASHCTL::keysize, MemSet, NULL, pgfdw_subxact_callback(), pgfdw_xact_callback(), RegisterSubXactCallback(), RegisterXactCallback(), UserMapping::serverid, ForeignServer::servername, UserMapping::umid, UserMapping::userid, ConnCacheEntry::xact_depth, and xact_got_connection.

Referenced by dumpBlobs(), dumpDatabase(), dumpTableData_copy(), estimate_path_cost_size(), expand_schema_name_patterns(), expand_table_name_patterns(), getTables(), main(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresBeginDirectModify(), postgresBeginForeignModify(), postgresBeginForeignScan(), postgresImportForeignSchema(), setup_connection(), StartLogStreamer(), StreamLog(), and StreamLogicalLog().

98 {
99  bool found;
100  ConnCacheEntry *entry;
101  ConnCacheKey key;
102 
103  /* First time through, initialize connection cache hashtable */
104  if (ConnectionHash == NULL)
105  {
106  HASHCTL ctl;
107 
108  MemSet(&ctl, 0, sizeof(ctl));
109  ctl.keysize = sizeof(ConnCacheKey);
110  ctl.entrysize = sizeof(ConnCacheEntry);
111  /* allocate ConnectionHash in the cache context */
112  ctl.hcxt = CacheMemoryContext;
113  ConnectionHash = hash_create("postgres_fdw connections", 8,
114  &ctl,
115  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
116 
117  /*
118  * Register some callback functions that manage connection cleanup.
119  * This should be done just once in each backend.
120  */
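121  RegisterXactCallback(pgfdw_xact_callback, NULL);
122  RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
123  }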
124 
125  /* Set flag that we did GetConnection during the current transaction */
126  xact_got_connection = true;
127 
128  /* Create hash key for the entry. Assume no pad bytes in key struct */
129  key = user->umid;
130 
131  /*
132  * Find or create cached entry for requested connection.
133  */
134  entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
135  if (!found)
136  {
137  /* initialize new hashtable entry (key is already filled in) */
138  entry->conn = NULL;
139  entry->xact_depth = 0;
140  entry->have_prep_stmt = false;
141  entry->have_error = false;
142  }
143 
144  /*
145  * We don't check the health of cached connection here, because it would
146  * require some overhead. Broken connection will be detected when the
147  * connection is actually used.
148  */
149 
150  /*
151  * If cache entry doesn't have a connection, we have to establish a new
152  * connection. (If connect_pg_server throws an error, the cache entry
153  * will be left in a valid empty state.)
154  */
155  if (entry->conn == NULL)
156  {
157  ForeignServer *server = GetForeignServer(user->serverid);
158 
159  entry->xact_depth = 0; /* just to be sure */
160  entry->have_prep_stmt = false;
161  entry->have_error = false;
162  entry->conn = connect_pg_server(server, user);
163 
164  elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
165  entry->conn, server->servername, user->umid, user->userid);
166  }
167 
168  /*
169  * Start a new transaction or subtransaction if needed.
170  */
171  begin_remote_xact(entry);
172 
173  /* Remember if caller will prepare statements */
174  entry->have_prep_stmt |= will_prep_stmt;
175 
176  return entry->conn;
177 }
Oid umid
Definition: foreign.h:58
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
MemoryContext hcxt
Definition: hsearch.h:78
#define DEBUG3
Definition: elog.h:23
struct ConnCacheEntry ConnCacheEntry
Size entrysize
Definition: hsearch.h:73
#define MemSet(start, val, len)
Definition: c.h:857
bool have_prep_stmt
Definition: connection.c:50
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:885
Oid userid
Definition: foreign.h:59
#define HASH_BLOBS
Definition: hsearch.h:88
static void pgfdw_xact_callback(XactEvent event, void *arg)
Definition: connection.c:582
static HTAB * ConnectionHash
Definition: connection.c:57
ForeignServer * GetForeignServer(Oid serverid)
Definition: foreign.c:93
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:301
void RegisterSubXactCallback(SubXactCallback callback, void *arg)
Definition: xact.c:3360
Size keysize
Definition: hsearch.h:72
PGconn * conn
Definition: connection.c:47
#define NULL
Definition: c.h:229
void RegisterXactCallback(XactCallback callback, void *arg)
Definition: xact.c:3305
static bool xact_got_connection
Definition: connection.c:64
Oid serverid
Definition: foreign.h:60
char * servername
Definition: foreign.h:50
#define elog
Definition: elog.h:219
static void pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg)
Definition: connection.c:742
static void begin_remote_xact(ConnCacheEntry *entry)
Definition: connection.c:363
Oid ConnCacheKey
Definition: connection.c:42
MemoryContext CacheMemoryContext
Definition: mcxt.c:46
static PGconn * connect_pg_server(ForeignServer *server, UserMapping *user)
Definition: connection.c:183
unsigned int GetCursorNumber ( PGconn * conn)

Definition at line 423 of file connection.c.

References cursor_number.

Referenced by postgresAcquireSampleRowsFunc(), and postgresBeginForeignScan().

424 {
425  return ++cursor_number;
426 }
static unsigned int cursor_number
Definition: connection.c:60
unsigned int GetPrepStmtNumber ( PGconn * conn)

Definition at line 437 of file connection.c.

References prep_stmt_number.

Referenced by prepare_foreign_modify().

438 {
439  return ++prep_stmt_number;
440 }
static unsigned int prep_stmt_number
Definition: connection.c:61
PGresult* pgfdw_exec_query ( PGconn *  conn,
const char *  query 
)

Definition at line 450 of file connection.c.

References ERROR, NULL, pgfdw_get_result(), pgfdw_report_error(), and PQsendQuery().

Referenced by close_cursor(), fetch_more_data(), get_remote_estimate(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresEndForeignModify(), postgresImportForeignSchema(), and postgresReScanForeignScan().

451 {
452  /*
453  * Submit a query. Since we don't use non-blocking mode, this also can
454  * block. But its risk is relatively small, so we ignore that for now.
455  */
456  if (!PQsendQuery(conn, query))
457  pgfdw_report_error(ERROR, NULL, conn, false, query);
458 
459  /* Wait for the result. */
460  return pgfdw_get_result(conn, query);
461 }
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1132
#define ERROR
Definition: elog.h:43
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:528
PGresult * pgfdw_get_result(PGconn *conn, const char *query)
Definition: connection.c:474
#define NULL
Definition: c.h:229
PGresult* pgfdw_get_result ( PGconn *  conn,
const char *  query 
)

Definition at line 474 of file connection.c.

References CHECK_FOR_INTERRUPTS, ERROR, MyLatch, NULL, PG_WAIT_EXTENSION, pgfdw_report_error(), PQclear(), PQconsumeInput(), PQgetResult(), PQisBusy(), PQsocket(), ResetLatch(), WaitLatchOrSocket(), WL_LATCH_SET, and WL_SOCKET_READABLE.

Referenced by create_cursor(), execute_dml_stmt(), pgfdw_exec_query(), postgresExecForeignDelete(), postgresExecForeignInsert(), postgresExecForeignUpdate(), and prepare_foreign_modify().

475 {
476  PGresult *last_res = NULL;
477 
478  for (;;)
479  {
480  PGresult *res;
481 
482  while (PQisBusy(conn))
483  {
484  int wc;
485 
486  /* Sleep until there's something to do */
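487  wc = WaitLatchOrSocket(MyLatch,
488  WL_LATCH_SET | WL_SOCKET_READABLE,
489  PQsocket(conn),
490  -1L, PG_WAIT_EXTENSION);
491  ResetLatch(MyLatch);
492 
493  CHECK_FOR_INTERRUPTS();
494 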
495  /* Data available in socket */
496  if (wc & WL_SOCKET_READABLE)
497  {
498  if (!PQconsumeInput(conn))
499  pgfdw_report_error(ERROR, NULL, conn, false, query);
500  }
501  }
502 
503  res = PQgetResult(conn);
504  if (res == NULL)
505  break; /* query is complete */
506 
507  PQclear(last_res);
508  last_res = res;
509  }
510 
511  return last_res;
512 }
#define WL_SOCKET_READABLE
Definition: latch.h:125
void ResetLatch(volatile Latch *latch)
Definition: latch.c:461
#define ERROR
Definition: elog.h:43
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:528
#define PG_WAIT_EXTENSION
Definition: pgstat.h:742
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1631
void PQclear(PGresult *res)
Definition: fe-exec.c:650
#define NULL
Definition: c.h:229
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:320
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:1681
struct Latch * MyLatch
Definition: globals.c:51
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
#define WL_LATCH_SET
Definition: latch.h:124
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:6029
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1702
void pgfdw_report_error ( int  elevel,
PGresult *  res,
PGconn *  conn,
bool  clear,
const char *  sql 
)

Definition at line 528 of file connection.c.

References ereport, errcode(), errcontext, errdetail_internal(), errhint(), errmsg(), errmsg_internal(), MAKE_SQLSTATE, NULL, pchomp(), PG_CATCH, PG_DIAG_CONTEXT, PG_DIAG_MESSAGE_DETAIL, PG_DIAG_MESSAGE_HINT, PG_DIAG_MESSAGE_PRIMARY, PG_DIAG_SQLSTATE, PG_END_TRY, PG_RE_THROW, PG_TRY, PQclear(), PQerrorMessage(), and PQresultErrorField().

Referenced by close_cursor(), create_cursor(), do_sql_command(), execute_dml_stmt(), fetch_more_data(), get_remote_estimate(), pgfdw_exec_query(), pgfdw_get_result(), pgfdw_subxact_callback(), pgfdw_xact_callback(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresEndForeignModify(), postgresExecForeignDelete(), postgresExecForeignInsert(), postgresExecForeignUpdate(), postgresImportForeignSchema(), postgresReScanForeignScan(), and prepare_foreign_modify().

530 {
531  /* If requested, PGresult must be released before leaving this function. */
532  PG_TRY();
533  {
534  char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
535  char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
536  char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
537  char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
538  char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
539  int sqlstate;
540 
541  if (diag_sqlstate)
542  sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
543  diag_sqlstate[1],
544  diag_sqlstate[2],
545  diag_sqlstate[3],
546  diag_sqlstate[4]);
547  else
548  sqlstate = ERRCODE_CONNECTION_FAILURE;
549 
550  /*
551  * If we don't get a message from the PGresult, try the PGconn. This
552  * is needed because for connection-level failures, PQexec may just
553  * return NULL, not a PGresult at all.
554  */
555  if (message_primary == NULL)
556  message_primary = pchomp(PQerrorMessage(conn));
557 
558  ereport(elevel,
559  (errcode(sqlstate),
560  message_primary ? errmsg_internal("%s", message_primary) :
561  errmsg("could not obtain message string for remote error"),
562  message_detail ? errdetail_internal("%s", message_detail) : 0,
563  message_hint ? errhint("%s", message_hint) : 0,
564  message_context ? errcontext("%s", message_context) : 0,
565  sql ? errcontext("Remote SQL command: %s", sql) : 0));
566  }
567  PG_CATCH();
568  {
569  if (clear)
570  PQclear(res);
571  PG_RE_THROW();
572  }
573  PG_END_TRY();
574  if (clear)
575  PQclear(res);
576 }
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6011
int errhint(const char *fmt,...)
Definition: elog.c:987
#define PG_DIAG_MESSAGE_PRIMARY
Definition: postgres_ext.h:58
#define PG_DIAG_MESSAGE_DETAIL
Definition: postgres_ext.h:59
#define MAKE_SQLSTATE(ch1, ch2, ch3, ch4, ch5)
Definition: elog.h:62
int errcode(int sqlerrcode)
Definition: elog.c:575
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:57
int errdetail_internal(const char *fmt,...)
Definition: elog.c:900
char * pchomp(const char *in)
Definition: mcxt.c:1101
#define ereport(elevel, rest)
Definition: elog.h:122
static int elevel
Definition: vacuumlazy.c:137
#define PG_DIAG_MESSAGE_HINT
Definition: postgres_ext.h:60
void PQclear(PGresult *res)
Definition: fe-exec.c:650
int errmsg_internal(const char *fmt,...)
Definition: elog.c:827
#define PG_CATCH()
Definition: elog.h:293
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:2658
#define NULL
Definition: c.h:229
#define PG_RE_THROW()
Definition: elog.h:314
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define errcontext
Definition: elog.h:164
#define PG_TRY()
Definition: elog.h:284
#define PG_END_TRY()
Definition: elog.h:300
#define PG_DIAG_CONTEXT
Definition: postgres_ext.h:64
static void pgfdw_subxact_callback ( SubXactEvent  event,
SubTransactionId  mySubid,
SubTransactionId  parentSubid,
void *  arg 
)
static

Definition at line 742 of file connection.c.

References ConnCacheEntry::conn, do_sql_command(), elog, ereport, errcode(), errmsg(), ERROR, GetCurrentTransactionNestLevel(), hash_seq_init(), hash_seq_search(), ConnCacheEntry::have_error, NULL, pgfdw_report_error(), PGRES_COMMAND_OK, PQcancel(), PQclear(), PQexec(), PQfreeCancel(), PQgetCancel(), PQresultStatus(), PQTRANS_ACTIVE, PQtransactionStatus(), snprintf(), SUBXACT_EVENT_ABORT_SUB, SUBXACT_EVENT_PRE_COMMIT_SUB, WARNING, ConnCacheEntry::xact_depth, and xact_got_connection.

Referenced by GetConnection().

744 {
745  HASH_SEQ_STATUS scan;
746  ConnCacheEntry *entry;
747  int curlevel;
748 
749  /* Nothing to do at subxact start, nor after commit. */
750  if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
751  event == SUBXACT_EVENT_ABORT_SUB))
752  return;
753 
754  /* Quick exit if no connections were touched in this transaction. */
755  if (!xact_got_connection)
756  return;
757 
758  /*
759  * Scan all connection cache entries to find open remote subtransactions
760  * of the current level, and close them.
761  */
762  curlevel = GetCurrentTransactionNestLevel();
763  hash_seq_init(&scan, ConnectionHash);
764  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
765  {
766  PGresult *res;
767  char sql[100];
768 
769  /*
770  * We only care about connections with open remote subtransactions of
771  * the current level.
772  */
773  if (entry->conn == NULL || entry->xact_depth < curlevel)
774  continue;
775 
776  if (entry->xact_depth > curlevel)
777  elog(ERROR, "missed cleaning up remote subtransaction at level %d",
778  entry->xact_depth);
779 
780  if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
781  {
782  /* Commit all remote subtransactions during pre-commit */
783  snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
784  do_sql_command(entry->conn, sql);
785  }
786  else
787  {
788  /* Assume we might have lost track of prepared statements */
789  entry->have_error = true;
790 
791  /*
792  * If a command has been submitted to the remote server by using
793  * an asynchronous execution function, the command might not have
794  * yet completed. Check to see if a command is still being
795  * processed by the remote server, and if so, request cancellation
796  * of the command.
797  */
798  if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
799  {
800  PGcancel *cancel;
801  char errbuf[256];
802 
803  if ((cancel = PQgetCancel(entry->conn)))
804  {
805  if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
806  ereport(WARNING,
807  (errcode(ERRCODE_CONNECTION_FAILURE),
808  errmsg("could not send cancel request: %s",
809  errbuf)));
810  PQfreeCancel(cancel);
811  }
812  }
813 
814  /* Rollback all remote subtransactions during abort */
815  snprintf(sql, sizeof(sql),
816  "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
817  curlevel, curlevel);
818  res = PQexec(entry->conn, sql);
819  if (PQresultStatus(res) != PGRES_COMMAND_OK)
820  pgfdw_report_error(WARNING, res, entry->conn, true, sql);
821  else
822  PQclear(res);
823  }
824 
825  /* OK, we're outta that level of subtransaction */
826  entry->xact_depth--;
827  }
828 }
void PQfreeCancel(PGcancel *cancel)
Definition: fe-connect.c:3712
int errcode(int sqlerrcode)
Definition: elog.c:575
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2596
#define ERROR
Definition: elog.h:43
static void do_sql_command(PGconn *conn, const char *sql)
Definition: connection.c:342
PGcancel * PQgetCancel(PGconn *conn)
Definition: fe-connect.c:3689
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:528
#define ereport(elevel, rest)
Definition: elog.h:122
PGTransactionStatusType PQtransactionStatus(const PGconn *conn)
Definition: fe-connect.c:5966
#define WARNING
Definition: elog.h:40
static HTAB * ConnectionHash
Definition: connection.c:57
void PQclear(PGresult *res)
Definition: fe-exec.c:650
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:761
PGconn * conn
Definition: connection.c:47
#define NULL
Definition: c.h:229
static bool xact_got_connection
Definition: connection.c:64
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1353
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1343
int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize)
Definition: fe-connect.c:3844
int errmsg(const char *fmt,...)
Definition: elog.c:797
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:1846
#define elog
Definition: elog.h:219
static void pgfdw_xact_callback ( XactEvent  event,
void *  arg 
)
static

Definition at line 582 of file connection.c.

References ConnCacheEntry::conn, CONNECTION_OK, cursor_number, DEBUG3, do_sql_command(), elog, ereport, errcode(), errmsg(), ERROR, hash_seq_init(), hash_seq_search(), ConnCacheEntry::have_error, ConnCacheEntry::have_prep_stmt, NULL, pgfdw_report_error(), PGRES_COMMAND_OK, PQcancel(), PQclear(), PQexec(), PQfinish(), PQfreeCancel(), PQgetCancel(), PQresultStatus(), PQstatus(), PQTRANS_ACTIVE, PQTRANS_IDLE, PQtransactionStatus(), WARNING, ConnCacheEntry::xact_depth, XACT_EVENT_ABORT, XACT_EVENT_COMMIT, XACT_EVENT_PARALLEL_ABORT, XACT_EVENT_PARALLEL_COMMIT, XACT_EVENT_PARALLEL_PRE_COMMIT, XACT_EVENT_PRE_COMMIT, XACT_EVENT_PRE_PREPARE, XACT_EVENT_PREPARE, and xact_got_connection.

Referenced by GetConnection().

583 {
584  HASH_SEQ_STATUS scan;
585  ConnCacheEntry *entry;
586 
587  /* Quick exit if no connections were touched in this transaction. */
588  if (!xact_got_connection)
589  return;
590 
591  /*
592  * Scan all connection cache entries to find open remote transactions, and
593  * close them.
594  */
595  hash_seq_init(&scan, ConnectionHash);
596  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
597  {
598  PGresult *res;
599 
600  /* Ignore cache entry if no open connection right now */
601  if (entry->conn == NULL)
602  continue;
603 
604  /* If it has an open remote transaction, try to close it */
605  if (entry->xact_depth > 0)
606  {
607  elog(DEBUG3, "closing remote transaction on connection %p",
608  entry->conn);
609 
610  switch (event)
611  {
612  case XACT_EVENT_PARALLEL_PRE_COMMIT:
613  case XACT_EVENT_PRE_COMMIT:
614  /* Commit all remote transactions during pre-commit */
615  do_sql_command(entry->conn, "COMMIT TRANSACTION");
616 
617  /*
618  * If there were any errors in subtransactions, and we
619  * made prepared statements, do a DEALLOCATE ALL to make
620  * sure we get rid of all prepared statements. This is
621  * annoying and not terribly bulletproof, but it's
622  * probably not worth trying harder.
623  *
624  * DEALLOCATE ALL only exists in 8.3 and later, so this
625  * constrains how old a server postgres_fdw can
626  * communicate with. We intentionally ignore errors in
627  * the DEALLOCATE, so that we can hobble along to some
628  * extent with older servers (leaking prepared statements
629  * as we go; but we don't really support update operations
630  * pre-8.3 anyway).
631  */
632  if (entry->have_prep_stmt && entry->have_error)
633  {
634  res = PQexec(entry->conn, "DEALLOCATE ALL");
635  PQclear(res);
636  }
637  entry->have_prep_stmt = false;
638  entry->have_error = false;
639  break;
640  case XACT_EVENT_PRE_PREPARE:
641 
642  /*
643  * We disallow remote transactions that modified anything,
644  * since it's not very reasonable to hold them open until
645  * the prepared transaction is committed. For the moment,
646  * throw error unconditionally; later we might allow
647  * read-only cases. Note that the error will cause us to
648  * come right back here with event == XACT_EVENT_ABORT, so
649  * we'll clean up the connection state at that point.
650  */
651  ereport(ERROR,
652  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
653  errmsg("cannot prepare a transaction that modified remote tables")));
654  break;
655  case XACT_EVENT_PARALLEL_COMMIT:
656  case XACT_EVENT_COMMIT:
657  case XACT_EVENT_PREPARE:
658  /* Pre-commit should have closed the open transaction */
659  elog(ERROR, "missed cleaning up connection during pre-commit");
660  break;
661  case XACT_EVENT_PARALLEL_ABORT:
662  case XACT_EVENT_ABORT:
663  /* Assume we might have lost track of prepared statements */
664  entry->have_error = true;
665 
666  /*
667  * If a command has been submitted to the remote server by
668  * using an asynchronous execution function, the command
669  * might not have yet completed. Check to see if a
670  * command is still being processed by the remote server,
671  * and if so, request cancellation of the command.
672  */
673  if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
674  {
675  PGcancel *cancel;
676  char errbuf[256];
677 
678  if ((cancel = PQgetCancel(entry->conn)))
679  {
680  if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
681  ereport(WARNING,
682  (errcode(ERRCODE_CONNECTION_FAILURE),
683  errmsg("could not send cancel request: %s",
684  errbuf)));
685  PQfreeCancel(cancel);
686  }
687  }
688 
689  /* If we're aborting, abort all remote transactions too */
690  res = PQexec(entry->conn, "ABORT TRANSACTION");
691  /* Note: can't throw ERROR, it would be infinite loop */
692  if (PQresultStatus(res) != PGRES_COMMAND_OK)
693  pgfdw_report_error(WARNING, res, entry->conn, true,
694  "ABORT TRANSACTION");
695  else
696  {
697  PQclear(res);
698  /* As above, make sure to clear any prepared stmts */
699  if (entry->have_prep_stmt && entry->have_error)
700  {
701  res = PQexec(entry->conn, "DEALLOCATE ALL");
702  PQclear(res);
703  }
704  entry->have_prep_stmt = false;
705  entry->have_error = false;
706  }
707  break;
708  }
709  }
710 
711  /* Reset state to show we're out of a transaction */
712  entry->xact_depth = 0;
713 
714  /*
715  * If the connection isn't in a good idle state, discard it to
716  * recover. Next GetConnection will open a new connection.
717  */
718  if (PQstatus(entry->conn) != CONNECTION_OK ||
719  PQtransactionStatus(entry->conn) != PQTRANS_IDLE)
720  {
721  elog(DEBUG3, "discarding connection %p", entry->conn);
722  PQfinish(entry->conn);
723  entry->conn = NULL;
724  }
725  }
726 
727  /*
728  * Regardless of the event type, we can now mark ourselves as out of the
729  * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
730  * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
731  */
732  xact_got_connection = false;
733 
734  /* Also reset cursor numbering for next transaction */
735  cursor_number = 0;
736 }
#define DEBUG3
Definition: elog.h:23
void PQfreeCancel(PGcancel *cancel)
Definition: fe-connect.c:3712
int errcode(int sqlerrcode)
Definition: elog.c:575
void PQfinish(PGconn *conn)
Definition: fe-connect.c:3568
bool have_prep_stmt
Definition: connection.c:50
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2596
#define ERROR
Definition: elog.h:43
static void do_sql_command(PGconn *conn, const char *sql)
Definition: connection.c:342
PGcancel * PQgetCancel(PGconn *conn)
Definition: fe-connect.c:3689
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:528
static unsigned int cursor_number
Definition: connection.c:60
#define ereport(elevel, rest)
Definition: elog.h:122
PGTransactionStatusType PQtransactionStatus(const PGconn *conn)
Definition: fe-connect.c:5966
#define WARNING
Definition: elog.h:40
static HTAB * ConnectionHash
Definition: connection.c:57
void PQclear(PGresult *res)
Definition: fe-exec.c:650
PGconn * conn
Definition: connection.c:47
#define NULL
Definition: c.h:229
static bool xact_got_connection
Definition: connection.c:64
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1353
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1343
int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize)
Definition: fe-connect.c:3844
int errmsg(const char *fmt,...)
Definition: elog.c:797
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:1846
#define elog
Definition: elog.h:219
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:5958
void ReleaseConnection ( PGconn * conn)

Definition at line 402 of file connection.c.

Referenced by estimate_path_cost_size(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresEndDirectModify(), postgresEndForeignModify(), postgresEndForeignScan(), and postgresImportForeignSchema().

403 {
404  /*
405  * Currently, we don't actually track connection references because all
406  * cleanup is managed on a transaction or subtransaction basis instead. So
407  * there's nothing to do here.
408  */
409 }

Variable Documentation

HTAB* ConnectionHash = NULL
static

Definition at line 57 of file connection.c.

unsigned int cursor_number = 0
static

Definition at line 60 of file connection.c.

unsigned int prep_stmt_number = 0
static

Definition at line 61 of file connection.c.

Referenced by GetPrepStmtNumber().

bool xact_got_connection = false
static

Definition at line 64 of file connection.c.

Referenced by GetConnection(), pgfdw_subxact_callback(), and pgfdw_xact_callback().