PostgreSQL Source Code  git master
connection.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * connection.c
4  * Connection management functions for postgres_fdw
5  *
6  * Portions Copyright (c) 2012-2020, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * contrib/postgres_fdw/connection.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "access/htup_details.h"
16 #include "access/xact.h"
18 #include "commands/defrem.h"
19 #include "mb/pg_wchar.h"
20 #include "miscadmin.h"
21 #include "pgstat.h"
22 #include "postgres_fdw.h"
23 #include "storage/fd.h"
24 #include "storage/latch.h"
25 #include "utils/datetime.h"
26 #include "utils/hsearch.h"
27 #include "utils/inval.h"
28 #include "utils/memutils.h"
29 #include "utils/syscache.h"
30 
31 /*
32  * Connection cache hash table entry
33  *
34  * The lookup key in this hash table is the user mapping OID. We use just one
35  * connection per user mapping ID, which ensures that all the scans use the
36  * same snapshot during a query. Using the user mapping OID rather than
37  * the foreign server OID + user OID avoids creating multiple connections when
38  * the public user mapping applies to all user OIDs.
39  *
40  * The "conn" pointer can be NULL if we don't currently have a live connection.
41  * When we do have a connection, xact_depth tracks the current depth of
42  * transactions and subtransactions open on the remote side. We need to issue
43  * commands at the same nesting depth on the remote as we're executing at
44  * ourselves, so that rolling back a subtransaction will kill the right
45  * queries and not the wrong ones.
46  */
47 typedef Oid ConnCacheKey;
48 
49 typedef struct ConnCacheEntry
50 {
51  ConnCacheKey key; /* hash key (must be first) */
52  PGconn *conn; /* connection to foreign server, or NULL */
53  /* Remaining fields are invalid when conn is NULL: */
54  int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 =
55  * one level of subxact open, etc */
56  bool have_prep_stmt; /* have we prepared any stmts in this xact? */
57  bool have_error; /* have any subxacts aborted in this xact? */
58  bool changing_xact_state; /* xact state change in process */
59  bool invalidated; /* true if reconnect is pending */
60  uint32 server_hashvalue; /* hash value of foreign server OID */
61  uint32 mapping_hashvalue; /* hash value of user mapping OID */
63 
64 /*
65  * Connection cache (initialized on first use)
66  */
67 static HTAB *ConnectionHash = NULL;
68 
69 /* for assigning cursor numbers and prepared statement numbers */
70 static unsigned int cursor_number = 0;
71 static unsigned int prep_stmt_number = 0;
72 
73 /* tracks whether any work is needed in callback functions */
74 static bool xact_got_connection = false;
75 
76 /* prototypes of private functions */
78 static void disconnect_pg_server(ConnCacheEntry *entry);
79 static void check_conn_params(const char **keywords, const char **values, UserMapping *user);
81 static void do_sql_command(PGconn *conn, const char *sql);
82 static void begin_remote_xact(ConnCacheEntry *entry);
83 static void pgfdw_xact_callback(XactEvent event, void *arg);
84 static void pgfdw_subxact_callback(SubXactEvent event,
85  SubTransactionId mySubid,
86  SubTransactionId parentSubid,
87  void *arg);
88 static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
90 static bool pgfdw_cancel_query(PGconn *conn);
91 static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
92  bool ignore_errors);
93 static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
94  PGresult **result);
96 
97 /*
98  * Get a PGconn which can be used to execute queries on the remote PostgreSQL
99  * server with the user's authorization. A new connection is established
100  * if we don't already have a suitable one, and a transaction is opened at
101  * the right subtransaction nesting depth if we didn't do that already.
102  *
103  * will_prep_stmt must be true if caller intends to create any prepared
104  * statements. Since those don't go away automatically at transaction end
105  * (not even on error), we need this flag to cue manual cleanup.
106  */
107 PGconn *
108 GetConnection(UserMapping *user, bool will_prep_stmt)
109 {
110  bool found;
111  ConnCacheEntry *entry;
113 
114  /* First time through, initialize connection cache hashtable */
115  if (ConnectionHash == NULL)
116  {
117  HASHCTL ctl;
118 
119  MemSet(&ctl, 0, sizeof(ctl));
120  ctl.keysize = sizeof(ConnCacheKey);
121  ctl.entrysize = sizeof(ConnCacheEntry);
122  /* allocate ConnectionHash in the cache context */
123  ctl.hcxt = CacheMemoryContext;
124  ConnectionHash = hash_create("postgres_fdw connections", 8,
125  &ctl,
127 
128  /*
129  * Register some callback functions that manage connection cleanup.
130  * This should be done just once in each backend.
131  */
138  }
139 
140  /* Set flag that we did GetConnection during the current transaction */
141  xact_got_connection = true;
142 
143  /* Create hash key for the entry. Assume no pad bytes in key struct */
144  key = user->umid;
145 
146  /*
147  * Find or create cached entry for requested connection.
148  */
149  entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
150  if (!found)
151  {
152  /*
153  * We need only clear "conn" here; remaining fields will be filled
154  * later when "conn" is set.
155  */
156  entry->conn = NULL;
157  }
158 
159  /* Reject further use of connections which failed abort cleanup. */
161 
162  /*
163  * If the connection needs to be remade due to invalidation, disconnect as
164  * soon as we're out of all transactions.
165  */
166  if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
167  {
168  elog(DEBUG3, "closing connection %p for option changes to take effect",
169  entry->conn);
170  disconnect_pg_server(entry);
171  }
172 
173  /*
174  * We don't check the health of cached connection here, because it would
175  * require some overhead. Broken connection will be detected when the
176  * connection is actually used.
177  */
178 
179  /*
180  * If cache entry doesn't have a connection, we have to establish a new
181  * connection. (If connect_pg_server throws an error, the cache entry
182  * will remain in a valid empty state, ie conn == NULL.)
183  */
184  if (entry->conn == NULL)
185  {
186  ForeignServer *server = GetForeignServer(user->serverid);
187 
188  /* Reset all transient state fields, to be sure all are clean */
189  entry->xact_depth = 0;
190  entry->have_prep_stmt = false;
191  entry->have_error = false;
192  entry->changing_xact_state = false;
193  entry->invalidated = false;
194  entry->server_hashvalue =
196  ObjectIdGetDatum(server->serverid));
197  entry->mapping_hashvalue =
199  ObjectIdGetDatum(user->umid));
200 
201  /* Now try to make the connection */
202  entry->conn = connect_pg_server(server, user);
203 
204  elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
205  entry->conn, server->servername, user->umid, user->userid);
206  }
207 
208  /*
209  * Start a new transaction or subtransaction if needed.
210  */
211  begin_remote_xact(entry);
212 
213  /* Remember if caller will prepare statements */
214  entry->have_prep_stmt |= will_prep_stmt;
215 
216  return entry->conn;
217 }
218 
219 /*
220  * Connect to remote server using specified server and user mapping properties.
221  */
222 static PGconn *
224 {
225  PGconn *volatile conn = NULL;
226 
227  /*
228  * Use PG_TRY block to ensure closing connection on error.
229  */
230  PG_TRY();
231  {
232  const char **keywords;
233  const char **values;
234  int n;
235 
236  /*
237  * Construct connection params from generic options of ForeignServer
238  * and UserMapping. (Some of them might not be libpq options, in
239  * which case we'll just waste a few array slots.) Add 3 extra slots
240  * for fallback_application_name, client_encoding, end marker.
241  */
242  n = list_length(server->options) + list_length(user->options) + 3;
243  keywords = (const char **) palloc(n * sizeof(char *));
244  values = (const char **) palloc(n * sizeof(char *));
245 
246  n = 0;
247  n += ExtractConnectionOptions(server->options,
248  keywords + n, values + n);
250  keywords + n, values + n);
251 
252  /* Use "postgres_fdw" as fallback_application_name. */
253  keywords[n] = "fallback_application_name";
254  values[n] = "postgres_fdw";
255  n++;
256 
257  /* Set client_encoding so that libpq can convert encoding properly. */
258  keywords[n] = "client_encoding";
259  values[n] = GetDatabaseEncodingName();
260  n++;
261 
262  keywords[n] = values[n] = NULL;
263 
264  /* verify the set of connection parameters */
265  check_conn_params(keywords, values, user);
266 
267  /*
268  * We must obey fd.c's limit on non-virtual file descriptors. Assume
269  * that a PGconn represents one long-lived FD. (Doing this here also
270  * ensures that VFDs are closed if needed to make room.)
271  */
272  if (!AcquireExternalFD())
273  {
274 #ifndef WIN32 /* can't write #if within ereport() macro */
275  ereport(ERROR,
276  (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
277  errmsg("could not connect to server \"%s\"",
278  server->servername),
279  errdetail("There are too many open files on the local server."),
280  errhint("Raise the server's max_files_per_process and/or \"ulimit -n\" limits.")));
281 #else
282  ereport(ERROR,
283  (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
284  errmsg("could not connect to server \"%s\"",
285  server->servername),
286  errdetail("There are too many open files on the local server."),
287  errhint("Raise the server's max_files_per_process setting.")));
288 #endif
289  }
290 
291  /* OK to make connection */
292  conn = PQconnectdbParams(keywords, values, false);
293 
294  if (!conn)
295  ReleaseExternalFD(); /* because the PG_CATCH block won't */
296 
297  if (!conn || PQstatus(conn) != CONNECTION_OK)
298  ereport(ERROR,
299  (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
300  errmsg("could not connect to server \"%s\"",
301  server->servername),
302  errdetail_internal("%s", pchomp(PQerrorMessage(conn)))));
303 
304  /*
305  * Check that non-superuser has used password to establish connection;
306  * otherwise, he's piggybacking on the postgres server's user
307  * identity. See also dblink_security_check() in contrib/dblink and
308  * check_conn_params.
309  */
310  if (!superuser_arg(user->userid) && UserMappingPasswordRequired(user) &&
312  ereport(ERROR,
313  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
314  errmsg("password is required"),
315  errdetail("Non-superuser cannot connect if the server does not request a password."),
316  errhint("Target server's authentication method must be changed or password_required=false set in the user mapping attributes.")));
317 
318  /* Prepare new session for use */
320 
321  pfree(keywords);
322  pfree(values);
323  }
324  PG_CATCH();
325  {
326  /* Release PGconn data structure if we managed to create one */
327  if (conn)
328  {
329  PQfinish(conn);
331  }
332  PG_RE_THROW();
333  }
334  PG_END_TRY();
335 
336  return conn;
337 }
338 
339 /*
340  * Disconnect any open connection for a connection cache entry.
341  */
342 static void
344 {
345  if (entry->conn != NULL)
346  {
347  PQfinish(entry->conn);
348  entry->conn = NULL;
350  }
351 }
352 
353 /*
354  * Return true if the password_required is defined and false for this user
355  * mapping, otherwise false. The mapping has been pre-validated.
356  */
357 static bool
359 {
360  ListCell *cell;
361 
362  foreach(cell, user->options)
363  {
364  DefElem *def = (DefElem *) lfirst(cell);
365 
366  if (strcmp(def->defname, "password_required") == 0)
367  return defGetBoolean(def);
368  }
369 
370  return true;
371 }
372 
373 /*
374  * For non-superusers, insist that the connstr specify a password. This
375  * prevents a password from being picked up from .pgpass, a service file, the
376  * environment, etc. We don't want the postgres user's passwords,
377  * certificates, etc to be accessible to non-superusers. (See also
378  * dblink_connstr_check in contrib/dblink.)
379  */
380 static void
381 check_conn_params(const char **keywords, const char **values, UserMapping *user)
382 {
383  int i;
384 
385  /* no check required if superuser */
386  if (superuser_arg(user->userid))
387  return;
388 
389  /* ok if params contain a non-empty password */
390  for (i = 0; keywords[i] != NULL; i++)
391  {
392  if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
393  return;
394  }
395 
396  /* ok if the superuser explicitly said so at user mapping creation time */
397  if (!UserMappingPasswordRequired(user))
398  return;
399 
400  ereport(ERROR,
401  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
402  errmsg("password is required"),
403  errdetail("Non-superusers must provide a password in the user mapping.")));
404 }
405 
406 /*
407  * Issue SET commands to make sure remote session is configured properly.
408  *
409  * We do this just once at connection, assuming nothing will change the
410  * values later. Since we'll never send volatile function calls to the
411  * remote, there shouldn't be any way to break this assumption from our end.
412  * It's possible to think of ways to break it at the remote end, eg making
413  * a foreign table point to a view that includes a set_config call ---
414  * but once you admit the possibility of a malicious view definition,
415  * there are any number of ways to break things.
416  */
417 static void
419 {
420  int remoteversion = PQserverVersion(conn);
421 
422  /* Force the search path to contain only pg_catalog (see deparse.c) */
423  do_sql_command(conn, "SET search_path = pg_catalog");
424 
425  /*
426  * Set remote timezone; this is basically just cosmetic, since all
427  * transmitted and returned timestamptzs should specify a zone explicitly
428  * anyway. However it makes the regression test outputs more predictable.
429  *
430  * We don't risk setting remote zone equal to ours, since the remote
431  * server might use a different timezone database. Instead, use UTC
432  * (quoted, because very old servers are picky about case).
433  */
434  do_sql_command(conn, "SET timezone = 'UTC'");
435 
436  /*
437  * Set values needed to ensure unambiguous data output from remote. (This
438  * logic should match what pg_dump does. See also set_transmission_modes
439  * in postgres_fdw.c.)
440  */
441  do_sql_command(conn, "SET datestyle = ISO");
442  if (remoteversion >= 80400)
443  do_sql_command(conn, "SET intervalstyle = postgres");
444  if (remoteversion >= 90000)
445  do_sql_command(conn, "SET extra_float_digits = 3");
446  else
447  do_sql_command(conn, "SET extra_float_digits = 2");
448 }
449 
450 /*
451  * Convenience subroutine to issue a non-data-returning SQL command to remote
452  */
453 static void
454 do_sql_command(PGconn *conn, const char *sql)
455 {
456  PGresult *res;
457 
458  if (!PQsendQuery(conn, sql))
459  pgfdw_report_error(ERROR, NULL, conn, false, sql);
460  res = pgfdw_get_result(conn, sql);
461  if (PQresultStatus(res) != PGRES_COMMAND_OK)
462  pgfdw_report_error(ERROR, res, conn, true, sql);
463  PQclear(res);
464 }
465 
466 /*
467  * Start remote transaction or subtransaction, if needed.
468  *
469  * Note that we always use at least REPEATABLE READ in the remote session.
470  * This is so that, if a query initiates multiple scans of the same or
471  * different foreign tables, we will get snapshot-consistent results from
472  * those scans. A disadvantage is that we can't provide sane emulation of
473  * READ COMMITTED behavior --- it would be nice if we had some other way to
474  * control which remote queries share a snapshot.
475  */
476 static void
478 {
479  int curlevel = GetCurrentTransactionNestLevel();
480 
481  /* Start main transaction if we haven't yet */
482  if (entry->xact_depth <= 0)
483  {
484  const char *sql;
485 
486  elog(DEBUG3, "starting remote transaction on connection %p",
487  entry->conn);
488 
490  sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
491  else
492  sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
493  entry->changing_xact_state = true;
494  do_sql_command(entry->conn, sql);
495  entry->xact_depth = 1;
496  entry->changing_xact_state = false;
497  }
498 
499  /*
500  * If we're in a subtransaction, stack up savepoints to match our level.
501  * This ensures we can rollback just the desired effects when a
502  * subtransaction aborts.
503  */
504  while (entry->xact_depth < curlevel)
505  {
506  char sql[64];
507 
508  snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
509  entry->changing_xact_state = true;
510  do_sql_command(entry->conn, sql);
511  entry->xact_depth++;
512  entry->changing_xact_state = false;
513  }
514 }
515 
516 /*
517  * Release connection reference count created by calling GetConnection.
518  */
519 void
521 {
522  /*
523  * Currently, we don't actually track connection references because all
524  * cleanup is managed on a transaction or subtransaction basis instead. So
525  * there's nothing to do here.
526  */
527 }
528 
529 /*
530  * Assign a "unique" number for a cursor.
531  *
532  * These really only need to be unique per connection within a transaction.
533  * For the moment we ignore the per-connection point and assign them across
534  * all connections in the transaction, but we ask for the connection to be
535  * supplied in case we want to refine that.
536  *
537  * Note that even if wraparound happens in a very long transaction, actual
538  * collisions are highly improbable; just be sure to use %u not %d to print.
539  */
540 unsigned int
542 {
543  return ++cursor_number;
544 }
545 
546 /*
547  * Assign a "unique" number for a prepared statement.
548  *
549  * This works much like GetCursorNumber, except that we never reset the counter
550  * within a session. That's because we can't be 100% sure we've gotten rid
551  * of all prepared statements on all connections, and it's not really worth
552  * increasing the risk of prepared-statement name collisions by resetting.
553  */
554 unsigned int
556 {
557  return ++prep_stmt_number;
558 }
559 
560 /*
561  * Submit a query and wait for the result.
562  *
563  * This function is interruptible by signals.
564  *
565  * Caller is responsible for the error handling on the result.
566  */
567 PGresult *
568 pgfdw_exec_query(PGconn *conn, const char *query)
569 {
570  /*
571  * Submit a query. Since we don't use non-blocking mode, this also can
572  * block. But its risk is relatively small, so we ignore that for now.
573  */
574  if (!PQsendQuery(conn, query))
575  pgfdw_report_error(ERROR, NULL, conn, false, query);
576 
577  /* Wait for the result. */
578  return pgfdw_get_result(conn, query);
579 }
580 
581 /*
582  * Wait for the result from a prior asynchronous execution function call.
583  *
584  * This function offers quick responsiveness by checking for any interruptions.
585  *
586  * This function emulates PQexec()'s behavior of returning the last result
587  * when there are many.
588  *
589  * Caller is responsible for the error handling on the result.
590  */
591 PGresult *
592 pgfdw_get_result(PGconn *conn, const char *query)
593 {
594  PGresult *volatile last_res = NULL;
595 
596  /* In what follows, do not leak any PGresults on an error. */
597  PG_TRY();
598  {
599  for (;;)
600  {
601  PGresult *res;
602 
603  while (PQisBusy(conn))
604  {
605  int wc;
606 
607  /* Sleep until there's something to do */
611  PQsocket(conn),
612  -1L, PG_WAIT_EXTENSION);
614 
616 
617  /* Data available in socket? */
618  if (wc & WL_SOCKET_READABLE)
619  {
620  if (!PQconsumeInput(conn))
621  pgfdw_report_error(ERROR, NULL, conn, false, query);
622  }
623  }
624 
625  res = PQgetResult(conn);
626  if (res == NULL)
627  break; /* query is complete */
628 
629  PQclear(last_res);
630  last_res = res;
631  }
632  }
633  PG_CATCH();
634  {
635  PQclear(last_res);
636  PG_RE_THROW();
637  }
638  PG_END_TRY();
639 
640  return last_res;
641 }
642 
643 /*
644  * Report an error we got from the remote server.
645  *
646  * elevel: error level to use (typically ERROR, but might be less)
647  * res: PGresult containing the error
648  * conn: connection we did the query on
649  * clear: if true, PQclear the result (otherwise caller will handle it)
650  * sql: NULL, or text of remote command we tried to execute
651  *
652  * Note: callers that choose not to throw ERROR for a remote error are
653  * responsible for making sure that the associated ConnCacheEntry gets
654  * marked with have_error = true.
655  */
656 void
658  bool clear, const char *sql)
659 {
660  /* If requested, PGresult must be released before leaving this function. */
661  PG_TRY();
662  {
663  char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
664  char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
665  char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
666  char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
667  char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
668  int sqlstate;
669 
670  if (diag_sqlstate)
671  sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
672  diag_sqlstate[1],
673  diag_sqlstate[2],
674  diag_sqlstate[3],
675  diag_sqlstate[4]);
676  else
677  sqlstate = ERRCODE_CONNECTION_FAILURE;
678 
679  /*
680  * If we don't get a message from the PGresult, try the PGconn. This
681  * is needed because for connection-level failures, PQexec may just
682  * return NULL, not a PGresult at all.
683  */
684  if (message_primary == NULL)
685  message_primary = pchomp(PQerrorMessage(conn));
686 
687  ereport(elevel,
688  (errcode(sqlstate),
689  message_primary ? errmsg_internal("%s", message_primary) :
690  errmsg("could not obtain message string for remote error"),
691  message_detail ? errdetail_internal("%s", message_detail) : 0,
692  message_hint ? errhint("%s", message_hint) : 0,
693  message_context ? errcontext("%s", message_context) : 0,
694  sql ? errcontext("remote SQL command: %s", sql) : 0));
695  }
696  PG_FINALLY();
697  {
698  if (clear)
699  PQclear(res);
700  }
701  PG_END_TRY();
702 }
703 
704 /*
705  * pgfdw_xact_callback --- cleanup at main-transaction end.
706  */
707 static void
709 {
710  HASH_SEQ_STATUS scan;
711  ConnCacheEntry *entry;
712 
713  /* Quick exit if no connections were touched in this transaction. */
714  if (!xact_got_connection)
715  return;
716 
717  /*
718  * Scan all connection cache entries to find open remote transactions, and
719  * close them.
720  */
721  hash_seq_init(&scan, ConnectionHash);
722  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
723  {
724  PGresult *res;
725 
726  /* Ignore cache entry if no open connection right now */
727  if (entry->conn == NULL)
728  continue;
729 
730  /* If it has an open remote transaction, try to close it */
731  if (entry->xact_depth > 0)
732  {
733  bool abort_cleanup_failure = false;
734 
735  elog(DEBUG3, "closing remote transaction on connection %p",
736  entry->conn);
737 
738  switch (event)
739  {
742 
743  /*
744  * If abort cleanup previously failed for this connection,
745  * we can't issue any more commands against it.
746  */
748 
749  /* Commit all remote transactions during pre-commit */
750  entry->changing_xact_state = true;
751  do_sql_command(entry->conn, "COMMIT TRANSACTION");
752  entry->changing_xact_state = false;
753 
754  /*
755  * If there were any errors in subtransactions, and we
756  * made prepared statements, do a DEALLOCATE ALL to make
757  * sure we get rid of all prepared statements. This is
758  * annoying and not terribly bulletproof, but it's
759  * probably not worth trying harder.
760  *
761  * DEALLOCATE ALL only exists in 8.3 and later, so this
762  * constrains how old a server postgres_fdw can
763  * communicate with. We intentionally ignore errors in
764  * the DEALLOCATE, so that we can hobble along to some
765  * extent with older servers (leaking prepared statements
766  * as we go; but we don't really support update operations
767  * pre-8.3 anyway).
768  */
769  if (entry->have_prep_stmt && entry->have_error)
770  {
771  res = PQexec(entry->conn, "DEALLOCATE ALL");
772  PQclear(res);
773  }
774  entry->have_prep_stmt = false;
775  entry->have_error = false;
776  break;
778 
779  /*
780  * We disallow any remote transactions, since it's not
781  * very reasonable to hold them open until the prepared
782  * transaction is committed. For the moment, throw error
783  * unconditionally; later we might allow read-only cases.
784  * Note that the error will cause us to come right back
785  * here with event == XACT_EVENT_ABORT, so we'll clean up
786  * the connection state at that point.
787  */
788  ereport(ERROR,
789  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
790  errmsg("cannot PREPARE a transaction that has operated on postgres_fdw foreign tables")));
791  break;
793  case XACT_EVENT_COMMIT:
794  case XACT_EVENT_PREPARE:
795  /* Pre-commit should have closed the open transaction */
796  elog(ERROR, "missed cleaning up connection during pre-commit");
797  break;
799  case XACT_EVENT_ABORT:
800 
801  /*
802  * Don't try to clean up the connection if we're already
803  * in error recursion trouble.
804  */
806  entry->changing_xact_state = true;
807 
808  /*
809  * If connection is already unsalvageable, don't touch it
810  * further.
811  */
812  if (entry->changing_xact_state)
813  break;
814 
815  /*
816  * Mark this connection as in the process of changing
817  * transaction state.
818  */
819  entry->changing_xact_state = true;
820 
821  /* Assume we might have lost track of prepared statements */
822  entry->have_error = true;
823 
824  /*
825  * If a command has been submitted to the remote server by
826  * using an asynchronous execution function, the command
827  * might not have yet completed. Check to see if a
828  * command is still being processed by the remote server,
829  * and if so, request cancellation of the command.
830  */
831  if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
832  !pgfdw_cancel_query(entry->conn))
833  {
834  /* Unable to cancel running query. */
835  abort_cleanup_failure = true;
836  }
837  else if (!pgfdw_exec_cleanup_query(entry->conn,
838  "ABORT TRANSACTION",
839  false))
840  {
841  /* Unable to abort remote transaction. */
842  abort_cleanup_failure = true;
843  }
844  else if (entry->have_prep_stmt && entry->have_error &&
846  "DEALLOCATE ALL",
847  true))
848  {
849  /* Trouble clearing prepared statements. */
850  abort_cleanup_failure = true;
851  }
852  else
853  {
854  entry->have_prep_stmt = false;
855  entry->have_error = false;
856  }
857 
858  /* Disarm changing_xact_state if it all worked. */
859  entry->changing_xact_state = abort_cleanup_failure;
860  break;
861  }
862  }
863 
864  /* Reset state to show we're out of a transaction */
865  entry->xact_depth = 0;
866 
867  /*
868  * If the connection isn't in a good idle state, discard it to
869  * recover. Next GetConnection will open a new connection.
870  */
871  if (PQstatus(entry->conn) != CONNECTION_OK ||
873  entry->changing_xact_state)
874  {
875  elog(DEBUG3, "discarding connection %p", entry->conn);
876  disconnect_pg_server(entry);
877  }
878  }
879 
880  /*
881  * Regardless of the event type, we can now mark ourselves as out of the
882  * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
883  * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
884  */
885  xact_got_connection = false;
886 
887  /* Also reset cursor numbering for next transaction */
888  cursor_number = 0;
889 }
890 
891 /*
892  * pgfdw_subxact_callback --- cleanup at subtransaction end.
893  */
894 static void
896  SubTransactionId parentSubid, void *arg)
897 {
898  HASH_SEQ_STATUS scan;
899  ConnCacheEntry *entry;
900  int curlevel;
901 
902  /* Nothing to do at subxact start, nor after commit. */
903  if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
904  event == SUBXACT_EVENT_ABORT_SUB))
905  return;
906 
907  /* Quick exit if no connections were touched in this transaction. */
908  if (!xact_got_connection)
909  return;
910 
911  /*
912  * Scan all connection cache entries to find open remote subtransactions
913  * of the current level, and close them.
914  */
915  curlevel = GetCurrentTransactionNestLevel();
916  hash_seq_init(&scan, ConnectionHash);
917  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
918  {
919  char sql[100];
920 
921  /*
922  * We only care about connections with open remote subtransactions of
923  * the current level.
924  */
925  if (entry->conn == NULL || entry->xact_depth < curlevel)
926  continue;
927 
928  if (entry->xact_depth > curlevel)
929  elog(ERROR, "missed cleaning up remote subtransaction at level %d",
930  entry->xact_depth);
931 
932  if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
933  {
934  /*
935  * If abort cleanup previously failed for this connection, we
936  * can't issue any more commands against it.
937  */
939 
940  /* Commit all remote subtransactions during pre-commit */
941  snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
942  entry->changing_xact_state = true;
943  do_sql_command(entry->conn, sql);
944  entry->changing_xact_state = false;
945  }
946  else if (in_error_recursion_trouble())
947  {
948  /*
949  * Don't try to clean up the connection if we're already in error
950  * recursion trouble.
951  */
952  entry->changing_xact_state = true;
953  }
954  else if (!entry->changing_xact_state)
955  {
956  bool abort_cleanup_failure = false;
957 
958  /* Remember that abort cleanup is in progress. */
959  entry->changing_xact_state = true;
960 
961  /* Assume we might have lost track of prepared statements */
962  entry->have_error = true;
963 
964  /*
965  * If a command has been submitted to the remote server by using
966  * an asynchronous execution function, the command might not have
967  * yet completed. Check to see if a command is still being
968  * processed by the remote server, and if so, request cancellation
969  * of the command.
970  */
971  if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
972  !pgfdw_cancel_query(entry->conn))
973  abort_cleanup_failure = true;
974  else
975  {
976  /* Rollback all remote subtransactions during abort */
977  snprintf(sql, sizeof(sql),
978  "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
979  curlevel, curlevel);
980  if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
981  abort_cleanup_failure = true;
982  }
983 
984  /* Disarm changing_xact_state if it all worked. */
985  entry->changing_xact_state = abort_cleanup_failure;
986  }
987 
988  /* OK, we're outta that level of subtransaction */
989  entry->xact_depth--;
990  }
991 }
992 
993 /*
994  * Connection invalidation callback function
995  *
996  * After a change to a pg_foreign_server or pg_user_mapping catalog entry,
997  * mark connections depending on that entry as needing to be remade.
998  * We can't immediately destroy them, since they might be in the midst of
999  * a transaction, but we'll remake them at the next opportunity.
1000  *
1001  * Although most cache invalidation callbacks blow away all the related stuff
1002  * regardless of the given hashvalue, connections are expensive enough that
1003  * it's worth trying to avoid that.
1004  *
1005  * NB: We could avoid unnecessary disconnection more strictly by examining
1006  * individual option values, but it seems too much effort for the gain.
1007  */
1008 static void
1009 pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
1010 {
1011  HASH_SEQ_STATUS scan;
1012  ConnCacheEntry *entry;
1013 
1014  Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
1015 
1016  /* ConnectionHash must exist already, if we're registered */
1017  hash_seq_init(&scan, ConnectionHash);
1018  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1019  {
1020  /* Ignore invalid entries */
1021  if (entry->conn == NULL)
1022  continue;
1023 
1024  /* hashvalue == 0 means a cache reset, must clear all state */
1025  if (hashvalue == 0 ||
1026  (cacheid == FOREIGNSERVEROID &&
1027  entry->server_hashvalue == hashvalue) ||
1028  (cacheid == USERMAPPINGOID &&
1029  entry->mapping_hashvalue == hashvalue))
1030  entry->invalidated = true;
1031  }
1032 }
1033 
1034 /*
1035  * Raise an error if the given connection cache entry is marked as being
1036  * in the middle of an xact state change. This should be called at which no
1037  * such change is expected to be in progress; if one is found to be in
1038  * progress, it means that we aborted in the middle of a previous state change
1039  * and now don't know what the remote transaction state actually is.
1040  * Such connections can't safely be further used. Re-establishing the
1041  * connection would change the snapshot and roll back any writes already
1042  * performed, so that's not an option, either. Thus, we must abort.
1043  */
1044 static void
1046 {
1047  HeapTuple tup;
1048  Form_pg_user_mapping umform;
1049  ForeignServer *server;
1050 
1051  /* nothing to do for inactive entries and entries of sane state */
1052  if (entry->conn == NULL || !entry->changing_xact_state)
1053  return;
1054 
1055  /* make sure this entry is inactive */
1056  disconnect_pg_server(entry);
1057 
1058  /* find server name to be shown in the message below */
1060  ObjectIdGetDatum(entry->key));
1061  if (!HeapTupleIsValid(tup))
1062  elog(ERROR, "cache lookup failed for user mapping %u", entry->key);
1063  umform = (Form_pg_user_mapping) GETSTRUCT(tup);
1064  server = GetForeignServer(umform->umserver);
1065  ReleaseSysCache(tup);
1066 
1067  ereport(ERROR,
1068  (errcode(ERRCODE_CONNECTION_EXCEPTION),
1069  errmsg("connection to server \"%s\" was lost",
1070  server->servername)));
1071 }
1072 
1073 /*
1074  * Cancel the currently-in-progress query (whose query text we do not have)
1075  * and ignore the result. Returns true if we successfully cancel the query
1076  * and discard any pending result, and false if not.
1077  */
1078 static bool
1080 {
1081  PGcancel *cancel;
1082  char errbuf[256];
1083  PGresult *result = NULL;
1084  TimestampTz endtime;
1085 
1086  /*
1087  * If it takes too long to cancel the query and discard the result, assume
1088  * the connection is dead.
1089  */
1091 
1092  /*
1093  * Issue cancel request. Unfortunately, there's no good way to limit the
1094  * amount of time that we might block inside PQgetCancel().
1095  */
1096  if ((cancel = PQgetCancel(conn)))
1097  {
1098  if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
1099  {
1100  ereport(WARNING,
1101  (errcode(ERRCODE_CONNECTION_FAILURE),
1102  errmsg("could not send cancel request: %s",
1103  errbuf)));
1104  PQfreeCancel(cancel);
1105  return false;
1106  }
1107  PQfreeCancel(cancel);
1108  }
1109 
1110  /* Get and discard the result of the query. */
1111  if (pgfdw_get_cleanup_result(conn, endtime, &result))
1112  return false;
1113  PQclear(result);
1114 
1115  return true;
1116 }
1117 
1118 /*
1119  * Submit a query during (sub)abort cleanup and wait up to 30 seconds for the
1120  * result. If the query is executed without error, the return value is true.
1121  * If the query is executed successfully but returns an error, the return
1122  * value is true if and only if ignore_errors is set. If the query can't be
1123  * sent or times out, the return value is false.
1124  */
1125 static bool
1126 pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
1127 {
1128  PGresult *result = NULL;
1129  TimestampTz endtime;
1130 
1131  /*
1132  * If it takes too long to execute a cleanup query, assume the connection
1133  * is dead. It's fairly likely that this is why we aborted in the first
1134  * place (e.g. statement timeout, user cancel), so the timeout shouldn't
1135  * be too long.
1136  */
1138 
1139  /*
1140  * Submit a query. Since we don't use non-blocking mode, this also can
1141  * block. But its risk is relatively small, so we ignore that for now.
1142  */
1143  if (!PQsendQuery(conn, query))
1144  {
1145  pgfdw_report_error(WARNING, NULL, conn, false, query);
1146  return false;
1147  }
1148 
1149  /* Get the result of the query. */
1150  if (pgfdw_get_cleanup_result(conn, endtime, &result))
1151  return false;
1152 
1153  /* Issue a warning if not successful. */
1154  if (PQresultStatus(result) != PGRES_COMMAND_OK)
1155  {
1156  pgfdw_report_error(WARNING, result, conn, true, query);
1157  return ignore_errors;
1158  }
1159  PQclear(result);
1160 
1161  return true;
1162 }
1163 
1164 /*
1165  * Get, during abort cleanup, the result of a query that is in progress. This
1166  * might be a query that is being interrupted by transaction abort, or it might
1167  * be a query that was initiated as part of transaction abort to get the remote
1168  * side back to the appropriate state.
1169  *
1170  * It's not a huge problem if we throw an ERROR here, but if we get into error
1171  * recursion trouble, we'll end up slamming the connection shut, which will
1172  * necessitate failing the entire toplevel transaction even if subtransactions
1173  * were used. Try to use WARNING where we can.
1174  *
1175  * endtime is the time at which we should give up and assume the remote
1176  * side is dead. Returns true if the timeout expired, otherwise false.
1177  * Sets *result except in case of a timeout.
1178  */
1179 static bool
1181 {
1182  volatile bool timed_out = false;
1183  PGresult *volatile last_res = NULL;
1184 
1185  /* In what follows, do not leak any PGresults on an error. */
1186  PG_TRY();
1187  {
1188  for (;;)
1189  {
1190  PGresult *res;
1191 
1192  while (PQisBusy(conn))
1193  {
1194  int wc;
1196  long secs;
1197  int microsecs;
1198  long cur_timeout;
1199 
1200  /* If timeout has expired, give up, else get sleep time. */
1201  if (now >= endtime)
1202  {
1203  timed_out = true;
1204  goto exit;
1205  }
1206  TimestampDifference(now, endtime, &secs, &microsecs);
1207 
1208  /* To protect against clock skew, limit sleep to one minute. */
1209  cur_timeout = Min(60000, secs * USECS_PER_SEC + microsecs);
1210 
1211  /* Sleep until there's something to do */
1215  PQsocket(conn),
1216  cur_timeout, PG_WAIT_EXTENSION);
1218 
1220 
1221  /* Data available in socket? */
1222  if (wc & WL_SOCKET_READABLE)
1223  {
1224  if (!PQconsumeInput(conn))
1225  {
1226  /* connection trouble; treat the same as a timeout */
1227  timed_out = true;
1228  goto exit;
1229  }
1230  }
1231  }
1232 
1233  res = PQgetResult(conn);
1234  if (res == NULL)
1235  break; /* query is complete */
1236 
1237  PQclear(last_res);
1238  last_res = res;
1239  }
1240 exit: ;
1241  }
1242  PG_CATCH();
1243  {
1244  PQclear(last_res);
1245  PG_RE_THROW();
1246  }
1247  PG_END_TRY();
1248 
1249  if (timed_out)
1250  PQclear(last_res);
1251  else
1252  *result = last_res;
1253  return timed_out;
1254 }
Oid umid
Definition: foreign.h:47
XactEvent
Definition: xact.h:112
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6666
static void configure_remote_session(PGconn *conn)
Definition: connection.c:418
int errhint(const char *fmt,...)
Definition: elog.c:1071
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
#define PG_DIAG_MESSAGE_PRIMARY
Definition: postgres_ext.h:58
#define HASH_CONTEXT
Definition: hsearch.h:91
#define HASH_ELEM
Definition: hsearch.h:85
#define WL_TIMEOUT
Definition: latch.h:127
MemoryContext hcxt
Definition: hsearch.h:77
static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result)
Definition: connection.c:1180
#define DEBUG3
Definition: elog.h:23
#define PG_DIAG_MESSAGE_DETAIL
Definition: postgres_ext.h:59
#define USECS_PER_SEC
Definition: timestamp.h:94
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1574
struct ConnCacheEntry ConnCacheEntry
int64 TimestampTz
Definition: timestamp.h:39
#define MAKE_SQLSTATE(ch1, ch2, ch3, ch4, ch5)
Definition: elog.h:62
void PQfreeCancel(PGcancel *cancel)
Definition: fe-connect.c:4315
ConnCacheKey key
Definition: connection.c:51
#define Min(x, y)
Definition: c.h:927
Size entrysize
Definition: hsearch.h:72
static void check_conn_params(const char **keywords, const char **values, UserMapping *user)
Definition: connection.c:381
int errcode(int sqlerrcode)
Definition: elog.c:610
void PQfinish(PGconn *conn)
Definition: fe-connect.c:4171
#define MemSet(start, val, len)
Definition: c.h:949
#define WL_SOCKET_READABLE
Definition: latch.h:125
bool have_prep_stmt
Definition: connection.c:56
#define GetSysCacheHashValue1(cacheId, key1)
Definition: syscache.h:201
uint32 server_hashvalue
Definition: connection.c:60
uint32 SubTransactionId
Definition: c.h:524
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:919
unsigned int Oid
Definition: postgres_ext.h:31
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:6656
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:57
Oid userid
Definition: foreign.h:48
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2692
void ResetLatch(Latch *latch)
Definition: latch.c:588
PGconn * PQconnectdbParams(const char *const *keywords, const char *const *values, int expand_dbname)
Definition: fe-connect.c:647
int errdetail_internal(const char *fmt,...)
Definition: elog.c:984
void ReleaseConnection(PGconn *conn)
Definition: connection.c:520
char * pchomp(const char *in)
Definition: mcxt.c:1215
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1234
Definition: dynahash.c:218
bool defGetBoolean(DefElem *def)
Definition: define.c:111
void pfree(void *pointer)
Definition: mcxt.c:1057
static unsigned int prep_stmt_number
Definition: connection.c:71
int ExtractConnectionOptions(List *defelems, const char **keywords, const char **values)
Definition: option.c:333
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
bool changing_xact_state
Definition: connection.c:58
static void do_sql_command(PGconn *conn, const char *sql)
Definition: connection.c:454
FormData_pg_user_mapping * Form_pg_user_mapping
PGcancel * PQgetCancel(PGconn *conn)
Definition: fe-connect.c:4292
int errdetail(const char *fmt,...)
Definition: elog.c:957
List * options
Definition: foreign.h:50
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:438
unsigned int uint32
Definition: c.h:374
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:657
static unsigned int cursor_number
Definition: connection.c:70
bool invalidated
Definition: connection.c:59
bool superuser_arg(Oid roleid)
Definition: superuser.c:56
PGTransactionStatusType PQtransactionStatus(const PGconn *conn)
Definition: fe-connect.c:6621
#define WARNING
Definition: elog.h:40
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
Definition: connection.c:1126
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1116
static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
Definition: connection.c:1009
static int elevel
Definition: vacuumlazy.c:333
#define HASH_BLOBS
Definition: hsearch.h:86
SubXactEvent
Definition: xact.h:126
#define PG_FINALLY()
Definition: elog.h:312
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1434
static void pgfdw_xact_callback(XactEvent event, void *arg)
Definition: connection.c:708
static HTAB * ConnectionHash
Definition: connection.c:67
ForeignServer * GetForeignServer(Oid serverid)
Definition: foreign.c:109
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:326
uintptr_t Datum
Definition: postgres.h:367
void RegisterSubXactCallback(SubXactCallback callback, void *arg)
Definition: xact.c:3583
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1164
static bool pgfdw_cancel_query(PGconn *conn)
Definition: connection.c:1079
#define PG_WAIT_EXTENSION
Definition: pgstat.h:789
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1704
Size keysize
Definition: hsearch.h:71
static bool UserMappingPasswordRequired(UserMapping *user)
Definition: connection.c:358
bool AcquireExternalFD(void)
Definition: fd.c:1048
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
#define PG_DIAG_MESSAGE_HINT
Definition: postgres_ext.h:60
#define ereport(elevel,...)
Definition: elog.h:144
void PQclear(PGresult *res)
Definition: fe-exec.c:694
uint32 mapping_hashvalue
Definition: connection.c:61
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:857
bool in_error_recursion_trouble(void)
Definition: elog.c:198
PGconn * GetConnection(UserMapping *user, bool will_prep_stmt)
Definition: connection.c:108
int errmsg_internal(const char *fmt,...)
Definition: elog.c:911
#define PG_CATCH()
Definition: elog.h:305
PGconn * conn
Definition: connection.c:52
PGresult * pgfdw_get_result(PGconn *conn, const char *query)
Definition: connection.c:592
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:2754
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
static void disconnect_pg_server(ConnCacheEntry *entry)
Definition: connection.c:343
#define Assert(condition)
Definition: c.h:745
#define lfirst(lc)
Definition: pg_list.h:190
const char * GetDatabaseEncodingName(void)
Definition: mbutils.c:1157
void RegisterXactCallback(XactCallback callback, void *arg)
Definition: xact.c:3528
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:1754
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
Definition: connection.c:1045
static bool xact_got_connection
Definition: connection.c:74
unsigned int GetCursorNumber(PGconn *conn)
Definition: connection.c:541
static int list_length(const List *l)
Definition: pg_list.h:169
Oid serverid
Definition: foreign.h:49
#define PG_RE_THROW()
Definition: elog.h:336
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1401
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1391
static Datum values[MAXATTR]
Definition: bootstrap.c:165
int PQconnectionUsedPassword(const PGconn *conn)
Definition: fe-connect.c:6715
int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize)
Definition: fe-connect.c:4447
void ReleaseExternalFD(void)
Definition: fd.c:1101
static char * user
Definition: pg_regress.c:95
void * palloc(Size size)
Definition: mcxt.c:950
int errmsg(const char *fmt,...)
Definition: elog.c:824
#define IsolationIsSerializable()
Definition: xact.h:52
char * servername
Definition: foreign.h:39
#define elog(elevel,...)
Definition: elog.h:214
int i
#define errcontext
Definition: elog.h:185
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:1939
void * arg
struct Latch * MyLatch
Definition: globals.c:54
char * defname
Definition: parsenodes.h:733
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
unsigned int GetPrepStmtNumber(PGconn *conn)
Definition: connection.c:555
static void pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg)
Definition: connection.c:895
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:6613
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: timestamp.c:1648
#define PG_TRY()
Definition: elog.h:295
static void begin_remote_xact(ConnCacheEntry *entry)
Definition: connection.c:477
Oid ConnCacheKey
Definition: connection.c:47
PGresult * pgfdw_exec_query(PGconn *conn, const char *query)
Definition: connection.c:568
#define snprintf
Definition: port.h:193
List * options
Definition: foreign.h:42
#define WL_LATCH_SET
Definition: latch.h:124
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:6684
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1538
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1778
Oid serverid
Definition: foreign.h:36
#define PG_END_TRY()
Definition: elog.h:320
#define PG_DIAG_CONTEXT
Definition: postgres_ext.h:64
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129
MemoryContext CacheMemoryContext
Definition: mcxt.c:47
static PGconn * connect_pg_server(ForeignServer *server, UserMapping *user)
Definition: connection.c:223