PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
connection.c File Reference
#include "postgres.h"
#include "postgres_fdw.h"
#include "access/xact.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "storage/latch.h"
#include "utils/hsearch.h"
#include "utils/memutils.h"
Include dependency graph for connection.c:

Go to the source code of this file.

Data Structures

struct  ConnCacheEntry
 

Typedefs

typedef Oid ConnCacheKey
 
typedef struct ConnCacheEntry ConnCacheEntry
 

Functions

static PGconnconnect_pg_server (ForeignServer *server, UserMapping *user)
 
static void check_conn_params (const char **keywords, const char **values)
 
static void configure_remote_session (PGconn *conn)
 
static void do_sql_command (PGconn *conn, const char *sql)
 
static void begin_remote_xact (ConnCacheEntry *entry)
 
static void pgfdw_xact_callback (XactEvent event, void *arg)
 
static void pgfdw_subxact_callback (SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg)
 
PGconnGetConnection (UserMapping *user, bool will_prep_stmt)
 
void ReleaseConnection (PGconn *conn)
 
unsigned int GetCursorNumber (PGconn *conn)
 
unsigned int GetPrepStmtNumber (PGconn *conn)
 
PGresultpgfdw_exec_query (PGconn *conn, const char *query)
 
PGresultpgfdw_get_result (PGconn *conn, const char *query)
 
void pgfdw_report_error (int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
 

Variables

static HTABConnectionHash = NULL
 
static unsigned int cursor_number = 0
 
static unsigned int prep_stmt_number = 0
 
static bool xact_got_connection = false
 

Typedef Documentation

Definition at line 42 of file connection.c.

Function Documentation

static void begin_remote_xact ( ConnCacheEntry entry)
static

Definition at line 373 of file connection.c.

References ConnCacheEntry::conn, DEBUG3, do_sql_command(), elog, GetCurrentTransactionNestLevel(), IsolationIsSerializable, snprintf(), and ConnCacheEntry::xact_depth.

Referenced by GetConnection().

374 {
375  int curlevel = GetCurrentTransactionNestLevel();
376 
377  /* Start main transaction if we haven't yet */
378  if (entry->xact_depth <= 0)
379  {
380  const char *sql;
381 
382  elog(DEBUG3, "starting remote transaction on connection %p",
383  entry->conn);
384 
386  sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
387  else
388  sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
389  do_sql_command(entry->conn, sql);
390  entry->xact_depth = 1;
391  }
392 
393  /*
394  * If we're in a subtransaction, stack up savepoints to match our level.
395  * This ensures we can rollback just the desired effects when a
396  * subtransaction aborts.
397  */
398  while (entry->xact_depth < curlevel)
399  {
400  char sql[64];
401 
402  snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
403  do_sql_command(entry->conn, sql);
404  entry->xact_depth++;
405  }
406 }
#define DEBUG3
Definition: elog.h:23
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
static void do_sql_command(PGconn *conn, const char *sql)
Definition: connection.c:352
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:760
PGconn * conn
Definition: connection.c:47
#define IsolationIsSerializable()
Definition: xact.h:44
#define elog
Definition: elog.h:219
static void check_conn_params ( const char **  keywords,
const char **  values 
)
static

Definition at line 283 of file connection.c.

References ereport, errcode(), errdetail(), errmsg(), ERROR, i, NULL, and superuser().

Referenced by connect_pg_server().

284 {
285  int i;
286 
287  /* no check required if superuser */
288  if (superuser())
289  return;
290 
291  /* ok if params contain a non-empty password */
292  for (i = 0; keywords[i] != NULL; i++)
293  {
294  if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
295  return;
296  }
297 
298  ereport(ERROR,
299  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
300  errmsg("password is required"),
301  errdetail("Non-superusers must provide a password in the user mapping.")));
302 }
int errcode(int sqlerrcode)
Definition: elog.c:575
bool superuser(void)
Definition: superuser.c:47
#define ERROR
Definition: elog.h:43
int errdetail(const char *fmt,...)
Definition: elog.c:873
#define ereport(elevel, rest)
Definition: elog.h:122
#define NULL
Definition: c.h:226
static Datum values[MAXATTR]
Definition: bootstrap.c:162
int errmsg(const char *fmt,...)
Definition: elog.c:797
int i
static void configure_remote_session ( PGconn conn)
static

Definition at line 316 of file connection.c.

References do_sql_command(), and PQserverVersion().

Referenced by connect_pg_server().

317 {
318  int remoteversion = PQserverVersion(conn);
319 
320  /* Force the search path to contain only pg_catalog (see deparse.c) */
321  do_sql_command(conn, "SET search_path = pg_catalog");
322 
323  /*
324  * Set remote timezone; this is basically just cosmetic, since all
325  * transmitted and returned timestamptzs should specify a zone explicitly
326  * anyway. However it makes the regression test outputs more predictable.
327  *
328  * We don't risk setting remote zone equal to ours, since the remote
329  * server might use a different timezone database. Instead, use UTC
330  * (quoted, because very old servers are picky about case).
331  */
332  do_sql_command(conn, "SET timezone = 'UTC'");
333 
334  /*
335  * Set values needed to ensure unambiguous data output from remote. (This
336  * logic should match what pg_dump does. See also set_transmission_modes
337  * in postgres_fdw.c.)
338  */
339  do_sql_command(conn, "SET datestyle = ISO");
340  if (remoteversion >= 80400)
341  do_sql_command(conn, "SET intervalstyle = postgres");
342  if (remoteversion >= 90000)
343  do_sql_command(conn, "SET extra_float_digits = 3");
344  else
345  do_sql_command(conn, "SET extra_float_digits = 2");
346 }
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:5949
static void do_sql_command(PGconn *conn, const char *sql)
Definition: connection.c:352
static PGconn * connect_pg_server ( ForeignServer server,
UserMapping user 
)
static

Definition at line 183 of file connection.c.

References check_conn_params(), configure_remote_session(), conn, CONNECTION_OK, ereport, errcode(), errdetail(), errdetail_internal(), errhint(), errmsg(), ERROR, ExtractConnectionOptions(), GetDatabaseEncodingName(), list_length(), NULL, ForeignServer::options, UserMapping::options, palloc(), pfree(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, PQconnectdbParams(), PQconnectionUsedPassword(), PQerrorMessage(), PQfinish(), PQstatus(), pstrdup(), ForeignServer::servername, superuser(), and values.

Referenced by GetConnection().

184 {
185  PGconn *volatile conn = NULL;
186 
187  /*
188  * Use PG_TRY block to ensure closing connection on error.
189  */
190  PG_TRY();
191  {
192  const char **keywords;
193  const char **values;
194  int n;
195 
196  /*
197  * Construct connection params from generic options of ForeignServer
198  * and UserMapping. (Some of them might not be libpq options, in
199  * which case we'll just waste a few array slots.) Add 3 extra slots
200  * for fallback_application_name, client_encoding, end marker.
201  */
202  n = list_length(server->options) + list_length(user->options) + 3;
203  keywords = (const char **) palloc(n * sizeof(char *));
204  values = (const char **) palloc(n * sizeof(char *));
205 
206  n = 0;
207  n += ExtractConnectionOptions(server->options,
208  keywords + n, values + n);
210  keywords + n, values + n);
211 
212  /* Use "postgres_fdw" as fallback_application_name. */
213  keywords[n] = "fallback_application_name";
214  values[n] = "postgres_fdw";
215  n++;
216 
217  /* Set client_encoding so that libpq can convert encoding properly. */
218  keywords[n] = "client_encoding";
219  values[n] = GetDatabaseEncodingName();
220  n++;
221 
222  keywords[n] = values[n] = NULL;
223 
224  /* verify connection parameters and make connection */
225  check_conn_params(keywords, values);
226 
227  conn = PQconnectdbParams(keywords, values, false);
228  if (!conn || PQstatus(conn) != CONNECTION_OK)
229  {
230  char *connmessage;
231  int msglen;
232 
233  /* libpq typically appends a newline, strip that */
234  connmessage = pstrdup(PQerrorMessage(conn));
235  msglen = strlen(connmessage);
236  if (msglen > 0 && connmessage[msglen - 1] == '\n')
237  connmessage[msglen - 1] = '\0';
238  ereport(ERROR,
239  (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
240  errmsg("could not connect to server \"%s\"",
241  server->servername),
242  errdetail_internal("%s", connmessage)));
243  }
244 
245  /*
246  * Check that non-superuser has used password to establish connection;
247  * otherwise, he's piggybacking on the postgres server's user
248  * identity. See also dblink_security_check() in contrib/dblink.
249  */
250  if (!superuser() && !PQconnectionUsedPassword(conn))
251  ereport(ERROR,
252  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
253  errmsg("password is required"),
254  errdetail("Non-superuser cannot connect if the server does not request a password."),
255  errhint("Target server's authentication method must be changed.")));
256 
257  /* Prepare new session for use */
259 
260  pfree(keywords);
261  pfree(values);
262  }
263  PG_CATCH();
264  {
265  /* Release PGconn data structure if we managed to create one */
266  if (conn)
267  PQfinish(conn);
268  PG_RE_THROW();
269  }
270  PG_END_TRY();
271 
272  return conn;
273 }
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:5959
static void configure_remote_session(PGconn *conn)
Definition: connection.c:316
int errhint(const char *fmt,...)
Definition: elog.c:987
char * pstrdup(const char *in)
Definition: mcxt.c:1165
int errcode(int sqlerrcode)
Definition: elog.c:575
void PQfinish(PGconn *conn)
Definition: fe-connect.c:3516
bool superuser(void)
Definition: superuser.c:47
PGconn * PQconnectdbParams(const char *const *keywords, const char *const *values, int expand_dbname)
Definition: fe-connect.c:470
int errdetail_internal(const char *fmt,...)
Definition: elog.c:900
void pfree(void *pointer)
Definition: mcxt.c:992
int ExtractConnectionOptions(List *defelems, const char **keywords, const char **values)
Definition: option.c:297
#define ERROR
Definition: elog.h:43
PGconn * conn
Definition: streamutil.c:42
int errdetail(const char *fmt,...)
Definition: elog.c:873
List * options
Definition: foreign.h:61
#define ereport(elevel, rest)
Definition: elog.h:122
static void check_conn_params(const char **keywords, const char **values)
Definition: connection.c:283
#define PG_CATCH()
Definition: elog.h:293
#define NULL
Definition: c.h:226
const char * GetDatabaseEncodingName(void)
Definition: mbutils.c:1021
static int list_length(const List *l)
Definition: pg_list.h:89
#define PG_RE_THROW()
Definition: elog.h:314
static Datum values[MAXATTR]
Definition: bootstrap.c:162
int PQconnectionUsedPassword(const PGconn *conn)
Definition: fe-connect.c:6008
void * palloc(Size size)
Definition: mcxt.c:891
int errmsg(const char *fmt,...)
Definition: elog.c:797
char * servername
Definition: foreign.h:50
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:5906
#define PG_TRY()
Definition: elog.h:284
List * options
Definition: foreign.h:53
#define PG_END_TRY()
Definition: elog.h:300
static void do_sql_command ( PGconn conn,
const char *  sql 
)
static

Definition at line 352 of file connection.c.

References ERROR, pgfdw_report_error(), PGRES_COMMAND_OK, PQclear(), PQexec(), and PQresultStatus().

Referenced by begin_remote_xact(), configure_remote_session(), pgfdw_subxact_callback(), and pgfdw_xact_callback().

353 {
354  PGresult *res;
355 
356  res = PQexec(conn, sql);
357  if (PQresultStatus(res) != PGRES_COMMAND_OK)
358  pgfdw_report_error(ERROR, res, conn, true, sql);
359  PQclear(res);
360 }
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2596
#define ERROR
Definition: elog.h:43
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:538
void PQclear(PGresult *res)
Definition: fe-exec.c:650
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:1846
PGconn* GetConnection ( UserMapping user,
bool  will_prep_stmt 
)

Definition at line 97 of file connection.c.

References begin_remote_xact(), CacheMemoryContext, ConnCacheEntry::conn, connect_pg_server(), DEBUG3, elog, HASHCTL::entrysize, GetForeignServer(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, hash_search(), ConnCacheEntry::have_error, ConnCacheEntry::have_prep_stmt, HASHCTL::hcxt, HASHCTL::keysize, MemSet, NULL, pgfdw_subxact_callback(), pgfdw_xact_callback(), RegisterSubXactCallback(), RegisterXactCallback(), UserMapping::serverid, ForeignServer::servername, UserMapping::umid, UserMapping::userid, ConnCacheEntry::xact_depth, and xact_got_connection.

Referenced by dumpBlobs(), dumpDatabase(), dumpTableData_copy(), estimate_path_cost_size(), expand_schema_name_patterns(), expand_table_name_patterns(), getTables(), main(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresBeginDirectModify(), postgresBeginForeignModify(), postgresBeginForeignScan(), postgresImportForeignSchema(), setup_connection(), StartLogStreamer(), StreamLog(), and StreamLogicalLog().

98 {
99  bool found;
100  ConnCacheEntry *entry;
101  ConnCacheKey key;
102 
103  /* First time through, initialize connection cache hashtable */
104  if (ConnectionHash == NULL)
105  {
106  HASHCTL ctl;
107 
108  MemSet(&ctl, 0, sizeof(ctl));
109  ctl.keysize = sizeof(ConnCacheKey);
110  ctl.entrysize = sizeof(ConnCacheEntry);
111  /* allocate ConnectionHash in the cache context */
112  ctl.hcxt = CacheMemoryContext;
113  ConnectionHash = hash_create("postgres_fdw connections", 8,
114  &ctl,
116 
117  /*
118  * Register some callback functions that manage connection cleanup.
119  * This should be done just once in each backend.
120  */
123  }
124 
125  /* Set flag that we did GetConnection during the current transaction */
126  xact_got_connection = true;
127 
128  /* Create hash key for the entry. Assume no pad bytes in key struct */
129  key = user->umid;
130 
131  /*
132  * Find or create cached entry for requested connection.
133  */
134  entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
135  if (!found)
136  {
137  /* initialize new hashtable entry (key is already filled in) */
138  entry->conn = NULL;
139  entry->xact_depth = 0;
140  entry->have_prep_stmt = false;
141  entry->have_error = false;
142  }
143 
144  /*
145  * We don't check the health of cached connection here, because it would
146  * require some overhead. Broken connection will be detected when the
147  * connection is actually used.
148  */
149 
150  /*
151  * If cache entry doesn't have a connection, we have to establish a new
152  * connection. (If connect_pg_server throws an error, the cache entry
153  * will be left in a valid empty state.)
154  */
155  if (entry->conn == NULL)
156  {
157  ForeignServer *server = GetForeignServer(user->serverid);
158 
159  entry->xact_depth = 0; /* just to be sure */
160  entry->have_prep_stmt = false;
161  entry->have_error = false;
162  entry->conn = connect_pg_server(server, user);
163 
164  elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
165  entry->conn, server->servername, user->umid, user->userid);
166  }
167 
168  /*
169  * Start a new transaction or subtransaction if needed.
170  */
171  begin_remote_xact(entry);
172 
173  /* Remember if caller will prepare statements */
174  entry->have_prep_stmt |= will_prep_stmt;
175 
176  return entry->conn;
177 }
Oid umid
Definition: foreign.h:58
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
MemoryContext hcxt
Definition: hsearch.h:78
#define DEBUG3
Definition: elog.h:23
struct ConnCacheEntry ConnCacheEntry
Size entrysize
Definition: hsearch.h:73
#define MemSet(start, val, len)
Definition: c.h:853
bool have_prep_stmt
Definition: connection.c:50
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:885
Oid userid
Definition: foreign.h:59
#define HASH_BLOBS
Definition: hsearch.h:88
static void pgfdw_xact_callback(XactEvent event, void *arg)
Definition: connection.c:592
static HTAB * ConnectionHash
Definition: connection.c:57
ForeignServer * GetForeignServer(Oid serverid)
Definition: foreign.c:93
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:301
void RegisterSubXactCallback(SubXactCallback callback, void *arg)
Definition: xact.c:3358
Size keysize
Definition: hsearch.h:72
PGconn * conn
Definition: connection.c:47
#define NULL
Definition: c.h:226
void RegisterXactCallback(XactCallback callback, void *arg)
Definition: xact.c:3303
static bool xact_got_connection
Definition: connection.c:64
Oid serverid
Definition: foreign.h:60
char * servername
Definition: foreign.h:50
#define elog
Definition: elog.h:219
static void pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg)
Definition: connection.c:752
static void begin_remote_xact(ConnCacheEntry *entry)
Definition: connection.c:373
Oid ConnCacheKey
Definition: connection.c:42
MemoryContext CacheMemoryContext
Definition: mcxt.c:46
static PGconn * connect_pg_server(ForeignServer *server, UserMapping *user)
Definition: connection.c:183
unsigned int GetCursorNumber ( PGconn conn)

Definition at line 433 of file connection.c.

References cursor_number.

Referenced by postgresAcquireSampleRowsFunc(), and postgresBeginForeignScan().

434 {
435  return ++cursor_number;
436 }
static unsigned int cursor_number
Definition: connection.c:60
unsigned int GetPrepStmtNumber ( PGconn conn)

Definition at line 447 of file connection.c.

References prep_stmt_number.

Referenced by prepare_foreign_modify().

448 {
449  return ++prep_stmt_number;
450 }
static unsigned int prep_stmt_number
Definition: connection.c:61
PGresult* pgfdw_exec_query ( PGconn conn,
const char *  query 
)

Definition at line 460 of file connection.c.

References ERROR, NULL, pgfdw_get_result(), pgfdw_report_error(), and PQsendQuery().

Referenced by close_cursor(), fetch_more_data(), get_remote_estimate(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresEndForeignModify(), postgresImportForeignSchema(), and postgresReScanForeignScan().

461 {
462  /*
463  * Submit a query. Since we don't use non-blocking mode, this also can
464  * block. But its risk is relatively small, so we ignore that for now.
465  */
466  if (!PQsendQuery(conn, query))
467  pgfdw_report_error(ERROR, NULL, conn, false, query);
468 
469  /* Wait for the result. */
470  return pgfdw_get_result(conn, query);
471 }
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1132
#define ERROR
Definition: elog.h:43
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:538
PGresult * pgfdw_get_result(PGconn *conn, const char *query)
Definition: connection.c:484
#define NULL
Definition: c.h:226
PGresult* pgfdw_get_result ( PGconn conn,
const char *  query 
)

Definition at line 484 of file connection.c.

References CHECK_FOR_INTERRUPTS, ERROR, MyLatch, NULL, PG_WAIT_EXTENSION, pgfdw_report_error(), PQclear(), PQconsumeInput(), PQgetResult(), PQisBusy(), PQsocket(), ResetLatch(), WaitLatchOrSocket(), WL_LATCH_SET, and WL_SOCKET_READABLE.

Referenced by create_cursor(), execute_dml_stmt(), pgfdw_exec_query(), postgresExecForeignDelete(), postgresExecForeignInsert(), postgresExecForeignUpdate(), and prepare_foreign_modify().

485 {
486  PGresult *last_res = NULL;
487 
488  for (;;)
489  {
490  PGresult *res;
491 
492  while (PQisBusy(conn))
493  {
494  int wc;
495 
496  /* Sleep until there's something to do */
499  PQsocket(conn),
500  -1L, PG_WAIT_EXTENSION);
502 
504 
505  /* Data available in socket */
506  if (wc & WL_SOCKET_READABLE)
507  {
508  if (!PQconsumeInput(conn))
509  pgfdw_report_error(ERROR, NULL, conn, false, query);
510  }
511  }
512 
513  res = PQgetResult(conn);
514  if (res == NULL)
515  break; /* query is complete */
516 
517  PQclear(last_res);
518  last_res = res;
519  }
520 
521  return last_res;
522 }
#define WL_SOCKET_READABLE
Definition: latch.h:125
void ResetLatch(volatile Latch *latch)
Definition: latch.c:461
#define ERROR
Definition: elog.h:43
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:538
#define PG_WAIT_EXTENSION
Definition: pgstat.h:723
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1631
void PQclear(PGresult *res)
Definition: fe-exec.c:650
#define NULL
Definition: c.h:226
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:320
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:1681
struct Latch * MyLatch
Definition: globals.c:51
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
#define WL_LATCH_SET
Definition: latch.h:124
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:5977
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1702
void pgfdw_report_error ( int  elevel,
PGresult res,
PGconn conn,
bool  clear,
const char *  sql 
)

Definition at line 538 of file connection.c.

References ereport, errcode(), errcontext, errdetail_internal(), errhint(), errmsg(), errmsg_internal(), MAKE_SQLSTATE, NULL, PG_CATCH, PG_DIAG_CONTEXT, PG_DIAG_MESSAGE_DETAIL, PG_DIAG_MESSAGE_HINT, PG_DIAG_MESSAGE_PRIMARY, PG_DIAG_SQLSTATE, PG_END_TRY, PG_RE_THROW, PG_TRY, PQclear(), PQerrorMessage(), and PQresultErrorField().

Referenced by close_cursor(), create_cursor(), do_sql_command(), execute_dml_stmt(), fetch_more_data(), get_remote_estimate(), pgfdw_exec_query(), pgfdw_get_result(), pgfdw_subxact_callback(), pgfdw_xact_callback(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresEndForeignModify(), postgresExecForeignDelete(), postgresExecForeignInsert(), postgresExecForeignUpdate(), postgresImportForeignSchema(), postgresReScanForeignScan(), and prepare_foreign_modify().

540 {
541  /* If requested, PGresult must be released before leaving this function. */
542  PG_TRY();
543  {
544  char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
545  char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
546  char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
547  char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
548  char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
549  int sqlstate;
550 
551  if (diag_sqlstate)
552  sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
553  diag_sqlstate[1],
554  diag_sqlstate[2],
555  diag_sqlstate[3],
556  diag_sqlstate[4]);
557  else
558  sqlstate = ERRCODE_CONNECTION_FAILURE;
559 
560  /*
561  * If we don't get a message from the PGresult, try the PGconn. This
562  * is needed because for connection-level failures, PQexec may just
563  * return NULL, not a PGresult at all.
564  */
565  if (message_primary == NULL)
566  message_primary = PQerrorMessage(conn);
567 
568  ereport(elevel,
569  (errcode(sqlstate),
570  message_primary ? errmsg_internal("%s", message_primary) :
571  errmsg("could not obtain message string for remote error"),
572  message_detail ? errdetail_internal("%s", message_detail) : 0,
573  message_hint ? errhint("%s", message_hint) : 0,
574  message_context ? errcontext("%s", message_context) : 0,
575  sql ? errcontext("Remote SQL command: %s", sql) : 0));
576  }
577  PG_CATCH();
578  {
579  if (clear)
580  PQclear(res);
581  PG_RE_THROW();
582  }
583  PG_END_TRY();
584  if (clear)
585  PQclear(res);
586 }
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:5959
int errhint(const char *fmt,...)
Definition: elog.c:987
#define PG_DIAG_MESSAGE_PRIMARY
Definition: postgres_ext.h:54
#define PG_DIAG_MESSAGE_DETAIL
Definition: postgres_ext.h:55
#define MAKE_SQLSTATE(ch1, ch2, ch3, ch4, ch5)
Definition: elog.h:62
int errcode(int sqlerrcode)
Definition: elog.c:575
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:53
int errdetail_internal(const char *fmt,...)
Definition: elog.c:900
#define ereport(elevel, rest)
Definition: elog.h:122
static int elevel
Definition: vacuumlazy.c:136
#define PG_DIAG_MESSAGE_HINT
Definition: postgres_ext.h:56
void PQclear(PGresult *res)
Definition: fe-exec.c:650
int errmsg_internal(const char *fmt,...)
Definition: elog.c:827
#define PG_CATCH()
Definition: elog.h:293
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:2658
#define NULL
Definition: c.h:226
#define PG_RE_THROW()
Definition: elog.h:314
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define errcontext
Definition: elog.h:164
#define PG_TRY()
Definition: elog.h:284
#define PG_END_TRY()
Definition: elog.h:300
#define PG_DIAG_CONTEXT
Definition: postgres_ext.h:60
static void pgfdw_subxact_callback ( SubXactEvent  event,
SubTransactionId  mySubid,
SubTransactionId  parentSubid,
void *  arg 
)
static

Definition at line 752 of file connection.c.

References ConnCacheEntry::conn, do_sql_command(), elog, ereport, errcode(), errmsg(), ERROR, GetCurrentTransactionNestLevel(), hash_seq_init(), hash_seq_search(), ConnCacheEntry::have_error, NULL, pgfdw_report_error(), PGRES_COMMAND_OK, PQcancel(), PQclear(), PQexec(), PQfreeCancel(), PQgetCancel(), PQresultStatus(), PQTRANS_ACTIVE, PQtransactionStatus(), snprintf(), SUBXACT_EVENT_ABORT_SUB, SUBXACT_EVENT_PRE_COMMIT_SUB, WARNING, ConnCacheEntry::xact_depth, and xact_got_connection.

Referenced by GetConnection().

754 {
755  HASH_SEQ_STATUS scan;
756  ConnCacheEntry *entry;
757  int curlevel;
758 
759  /* Nothing to do at subxact start, nor after commit. */
760  if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
761  event == SUBXACT_EVENT_ABORT_SUB))
762  return;
763 
764  /* Quick exit if no connections were touched in this transaction. */
765  if (!xact_got_connection)
766  return;
767 
768  /*
769  * Scan all connection cache entries to find open remote subtransactions
770  * of the current level, and close them.
771  */
772  curlevel = GetCurrentTransactionNestLevel();
774  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
775  {
776  PGresult *res;
777  char sql[100];
778 
779  /*
780  * We only care about connections with open remote subtransactions of
781  * the current level.
782  */
783  if (entry->conn == NULL || entry->xact_depth < curlevel)
784  continue;
785 
786  if (entry->xact_depth > curlevel)
787  elog(ERROR, "missed cleaning up remote subtransaction at level %d",
788  entry->xact_depth);
789 
790  if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
791  {
792  /* Commit all remote subtransactions during pre-commit */
793  snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
794  do_sql_command(entry->conn, sql);
795  }
796  else
797  {
798  /* Assume we might have lost track of prepared statements */
799  entry->have_error = true;
800 
801  /*
802  * If a command has been submitted to the remote server by using
803  * an asynchronous execution function, the command might not have
804  * yet completed. Check to see if a command is still being
805  * processed by the remote server, and if so, request cancellation
806  * of the command.
807  */
808  if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
809  {
810  PGcancel *cancel;
811  char errbuf[256];
812 
813  if ((cancel = PQgetCancel(entry->conn)))
814  {
815  if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
817  (errcode(ERRCODE_CONNECTION_FAILURE),
818  errmsg("could not send cancel request: %s",
819  errbuf)));
820  PQfreeCancel(cancel);
821  }
822  }
823 
824  /* Rollback all remote subtransactions during abort */
825  snprintf(sql, sizeof(sql),
826  "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
827  curlevel, curlevel);
828  res = PQexec(entry->conn, sql);
829  if (PQresultStatus(res) != PGRES_COMMAND_OK)
830  pgfdw_report_error(WARNING, res, entry->conn, true, sql);
831  else
832  PQclear(res);
833  }
834 
835  /* OK, we're outta that level of subtransaction */
836  entry->xact_depth--;
837  }
838 }
void PQfreeCancel(PGcancel *cancel)
Definition: fe-connect.c:3660
int errcode(int sqlerrcode)
Definition: elog.c:575
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2596
#define ERROR
Definition: elog.h:43
static void do_sql_command(PGconn *conn, const char *sql)
Definition: connection.c:352
PGcancel * PQgetCancel(PGconn *conn)
Definition: fe-connect.c:3637
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:538
#define ereport(elevel, rest)
Definition: elog.h:122
PGTransactionStatusType PQtransactionStatus(const PGconn *conn)
Definition: fe-connect.c:5914
#define WARNING
Definition: elog.h:40
static HTAB * ConnectionHash
Definition: connection.c:57
void PQclear(PGresult *res)
Definition: fe-exec.c:650
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:760
PGconn * conn
Definition: connection.c:47
#define NULL
Definition: c.h:226
static bool xact_got_connection
Definition: connection.c:64
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1353
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1343
int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize)
Definition: fe-connect.c:3792
int errmsg(const char *fmt,...)
Definition: elog.c:797
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:1846
#define elog
Definition: elog.h:219
static void pgfdw_xact_callback ( XactEvent  event,
void *  arg 
)
static

Definition at line 592 of file connection.c.

References ConnCacheEntry::conn, CONNECTION_OK, cursor_number, DEBUG3, do_sql_command(), elog, ereport, errcode(), errmsg(), ERROR, hash_seq_init(), hash_seq_search(), ConnCacheEntry::have_error, ConnCacheEntry::have_prep_stmt, NULL, pgfdw_report_error(), PGRES_COMMAND_OK, PQcancel(), PQclear(), PQexec(), PQfinish(), PQfreeCancel(), PQgetCancel(), PQresultStatus(), PQstatus(), PQTRANS_ACTIVE, PQTRANS_IDLE, PQtransactionStatus(), WARNING, ConnCacheEntry::xact_depth, XACT_EVENT_ABORT, XACT_EVENT_COMMIT, XACT_EVENT_PARALLEL_ABORT, XACT_EVENT_PARALLEL_COMMIT, XACT_EVENT_PARALLEL_PRE_COMMIT, XACT_EVENT_PRE_COMMIT, XACT_EVENT_PRE_PREPARE, XACT_EVENT_PREPARE, and xact_got_connection.

Referenced by GetConnection().

593 {
594  HASH_SEQ_STATUS scan;
595  ConnCacheEntry *entry;
596 
597  /* Quick exit if no connections were touched in this transaction. */
598  if (!xact_got_connection)
599  return;
600 
601  /*
602  * Scan all connection cache entries to find open remote transactions, and
603  * close them.
604  */
606  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
607  {
608  PGresult *res;
609 
610  /* Ignore cache entry if no open connection right now */
611  if (entry->conn == NULL)
612  continue;
613 
614  /* If it has an open remote transaction, try to close it */
615  if (entry->xact_depth > 0)
616  {
617  elog(DEBUG3, "closing remote transaction on connection %p",
618  entry->conn);
619 
620  switch (event)
621  {
624  /* Commit all remote transactions during pre-commit */
625  do_sql_command(entry->conn, "COMMIT TRANSACTION");
626 
627  /*
628  * If there were any errors in subtransactions, and we
629  * made prepared statements, do a DEALLOCATE ALL to make
630  * sure we get rid of all prepared statements. This is
631  * annoying and not terribly bulletproof, but it's
632  * probably not worth trying harder.
633  *
634  * DEALLOCATE ALL only exists in 8.3 and later, so this
635  * constrains how old a server postgres_fdw can
636  * communicate with. We intentionally ignore errors in
637  * the DEALLOCATE, so that we can hobble along to some
638  * extent with older servers (leaking prepared statements
639  * as we go; but we don't really support update operations
640  * pre-8.3 anyway).
641  */
642  if (entry->have_prep_stmt && entry->have_error)
643  {
644  res = PQexec(entry->conn, "DEALLOCATE ALL");
645  PQclear(res);
646  }
647  entry->have_prep_stmt = false;
648  entry->have_error = false;
649  break;
651 
652  /*
653  * We disallow remote transactions that modified anything,
654  * since it's not very reasonable to hold them open until
655  * the prepared transaction is committed. For the moment,
656  * throw error unconditionally; later we might allow
657  * read-only cases. Note that the error will cause us to
658  * come right back here with event == XACT_EVENT_ABORT, so
659  * we'll clean up the connection state at that point.
660  */
661  ereport(ERROR,
662  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
663  errmsg("cannot prepare a transaction that modified remote tables")));
664  break;
666  case XACT_EVENT_COMMIT:
667  case XACT_EVENT_PREPARE:
668  /* Pre-commit should have closed the open transaction */
669  elog(ERROR, "missed cleaning up connection during pre-commit");
670  break;
672  case XACT_EVENT_ABORT:
673  /* Assume we might have lost track of prepared statements */
674  entry->have_error = true;
675 
676  /*
677  * If a command has been submitted to the remote server by
678  * using an asynchronous execution function, the command
679  * might not have yet completed. Check to see if a
680  * command is still being processed by the remote server,
681  * and if so, request cancellation of the command.
682  */
683  if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
684  {
685  PGcancel *cancel;
686  char errbuf[256];
687 
688  if ((cancel = PQgetCancel(entry->conn)))
689  {
690  if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
692  (errcode(ERRCODE_CONNECTION_FAILURE),
693  errmsg("could not send cancel request: %s",
694  errbuf)));
695  PQfreeCancel(cancel);
696  }
697  }
698 
699  /* If we're aborting, abort all remote transactions too */
700  res = PQexec(entry->conn, "ABORT TRANSACTION");
701  /* Note: can't throw ERROR, it would be infinite loop */
702  if (PQresultStatus(res) != PGRES_COMMAND_OK)
703  pgfdw_report_error(WARNING, res, entry->conn, true,
704  "ABORT TRANSACTION");
705  else
706  {
707  PQclear(res);
708  /* As above, make sure to clear any prepared stmts */
709  if (entry->have_prep_stmt && entry->have_error)
710  {
711  res = PQexec(entry->conn, "DEALLOCATE ALL");
712  PQclear(res);
713  }
714  entry->have_prep_stmt = false;
715  entry->have_error = false;
716  }
717  break;
718  }
719  }
720 
721  /* Reset state to show we're out of a transaction */
722  entry->xact_depth = 0;
723 
724  /*
725  * If the connection isn't in a good idle state, discard it to
726  * recover. Next GetConnection will open a new connection.
727  */
728  if (PQstatus(entry->conn) != CONNECTION_OK ||
730  {
731  elog(DEBUG3, "discarding connection %p", entry->conn);
732  PQfinish(entry->conn);
733  entry->conn = NULL;
734  }
735  }
736 
737  /*
738  * Regardless of the event type, we can now mark ourselves as out of the
739  * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
740  * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
741  */
742  xact_got_connection = false;
743 
744  /* Also reset cursor numbering for next transaction */
745  cursor_number = 0;
746 }
#define DEBUG3
Definition: elog.h:23
void PQfreeCancel(PGcancel *cancel)
Definition: fe-connect.c:3660
int errcode(int sqlerrcode)
Definition: elog.c:575
void PQfinish(PGconn *conn)
Definition: fe-connect.c:3516
bool have_prep_stmt
Definition: connection.c:50
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2596
#define ERROR
Definition: elog.h:43
static void do_sql_command(PGconn *conn, const char *sql)
Definition: connection.c:352
PGcancel * PQgetCancel(PGconn *conn)
Definition: fe-connect.c:3637
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:538
static unsigned int cursor_number
Definition: connection.c:60
#define ereport(elevel, rest)
Definition: elog.h:122
PGTransactionStatusType PQtransactionStatus(const PGconn *conn)
Definition: fe-connect.c:5914
#define WARNING
Definition: elog.h:40
static HTAB * ConnectionHash
Definition: connection.c:57
void PQclear(PGresult *res)
Definition: fe-exec.c:650
PGconn * conn
Definition: connection.c:47
#define NULL
Definition: c.h:226
static bool xact_got_connection
Definition: connection.c:64
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1353
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1343
int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize)
Definition: fe-connect.c:3792
int errmsg(const char *fmt,...)
Definition: elog.c:797
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:1846
#define elog
Definition: elog.h:219
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:5906
void ReleaseConnection ( PGconn conn)

Definition at line 412 of file connection.c.

Referenced by estimate_path_cost_size(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresEndDirectModify(), postgresEndForeignModify(), postgresEndForeignScan(), and postgresImportForeignSchema().

413 {
414  /*
415  * Currently, we don't actually track connection references because all
416  * cleanup is managed on a transaction or subtransaction basis instead. So
417  * there's nothing to do here.
418  */
419 }

Variable Documentation

HTAB* ConnectionHash = NULL
static

Definition at line 57 of file connection.c.

unsigned int cursor_number = 0
static
unsigned int prep_stmt_number = 0
static

Definition at line 61 of file connection.c.

Referenced by GetPrepStmtNumber().

bool xact_got_connection = false
static

Definition at line 64 of file connection.c.

Referenced by GetConnection(), pgfdw_subxact_callback(), and pgfdw_xact_callback().