PostgreSQL Source Code  git master
connection.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * connection.c
4  * Connection management functions for postgres_fdw
5  *
6  * Portions Copyright (c) 2012-2019, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * contrib/postgres_fdw/connection.c
10  *
11  *-------------------------------------------------------------------------
12  */
#include "postgres.h"

#include "access/htup_details.h"
#include "access/xact.h"
#include "catalog/pg_user_mapping.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postgres_fdw.h"
#include "storage/latch.h"
#include "utils/hsearch.h"
#include "utils/inval.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
27 
28 /*
29  * Connection cache hash table entry
30  *
31  * The lookup key in this hash table is the user mapping OID. We use just one
32  * connection per user mapping ID, which ensures that all the scans use the
33  * same snapshot during a query. Using the user mapping OID rather than
34  * the foreign server OID + user OID avoids creating multiple connections when
35  * the public user mapping applies to all user OIDs.
36  *
37  * The "conn" pointer can be NULL if we don't currently have a live connection.
38  * When we do have a connection, xact_depth tracks the current depth of
39  * transactions and subtransactions open on the remote side. We need to issue
40  * commands at the same nesting depth on the remote as we're executing at
41  * ourselves, so that rolling back a subtransaction will kill the right
42  * queries and not the wrong ones.
43  */
44 typedef Oid ConnCacheKey;
45 
46 typedef struct ConnCacheEntry
47 {
48  ConnCacheKey key; /* hash key (must be first) */
49  PGconn *conn; /* connection to foreign server, or NULL */
50  /* Remaining fields are invalid when conn is NULL: */
51  int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 =
52  * one level of subxact open, etc */
53  bool have_prep_stmt; /* have we prepared any stmts in this xact? */
54  bool have_error; /* have any subxacts aborted in this xact? */
55  bool changing_xact_state; /* xact state change in process */
56  bool invalidated; /* true if reconnect is pending */
57  uint32 server_hashvalue; /* hash value of foreign server OID */
58  uint32 mapping_hashvalue; /* hash value of user mapping OID */
60 
61 /*
62  * Connection cache (initialized on first use)
63  */
64 static HTAB *ConnectionHash = NULL;
65 
66 /* for assigning cursor numbers and prepared statement numbers */
67 static unsigned int cursor_number = 0;
68 static unsigned int prep_stmt_number = 0;
69 
70 /* tracks whether any work is needed in callback functions */
71 static bool xact_got_connection = false;
72 
73 /* prototypes of private functions */
75 static void disconnect_pg_server(ConnCacheEntry *entry);
76 static void check_conn_params(const char **keywords, const char **values, UserMapping *user);
78 static void do_sql_command(PGconn *conn, const char *sql);
79 static void begin_remote_xact(ConnCacheEntry *entry);
80 static void pgfdw_xact_callback(XactEvent event, void *arg);
81 static void pgfdw_subxact_callback(SubXactEvent event,
82  SubTransactionId mySubid,
83  SubTransactionId parentSubid,
84  void *arg);
85 static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
87 static bool pgfdw_cancel_query(PGconn *conn);
88 static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
89  bool ignore_errors);
90 static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
91  PGresult **result);
92 
93 
94 /*
95  * Get a PGconn which can be used to execute queries on the remote PostgreSQL
96  * server with the user's authorization. A new connection is established
97  * if we don't already have a suitable one, and a transaction is opened at
98  * the right subtransaction nesting depth if we didn't do that already.
99  *
100  * will_prep_stmt must be true if caller intends to create any prepared
101  * statements. Since those don't go away automatically at transaction end
102  * (not even on error), we need this flag to cue manual cleanup.
103  */
104 PGconn *
105 GetConnection(UserMapping *user, bool will_prep_stmt)
106 {
107  bool found;
108  ConnCacheEntry *entry;
110 
111  /* First time through, initialize connection cache hashtable */
112  if (ConnectionHash == NULL)
113  {
114  HASHCTL ctl;
115 
116  MemSet(&ctl, 0, sizeof(ctl));
117  ctl.keysize = sizeof(ConnCacheKey);
118  ctl.entrysize = sizeof(ConnCacheEntry);
119  /* allocate ConnectionHash in the cache context */
120  ctl.hcxt = CacheMemoryContext;
121  ConnectionHash = hash_create("postgres_fdw connections", 8,
122  &ctl,
124 
125  /*
126  * Register some callback functions that manage connection cleanup.
127  * This should be done just once in each backend.
128  */
135  }
136 
137  /* Set flag that we did GetConnection during the current transaction */
138  xact_got_connection = true;
139 
140  /* Create hash key for the entry. Assume no pad bytes in key struct */
141  key = user->umid;
142 
143  /*
144  * Find or create cached entry for requested connection.
145  */
146  entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
147  if (!found)
148  {
149  /*
150  * We need only clear "conn" here; remaining fields will be filled
151  * later when "conn" is set.
152  */
153  entry->conn = NULL;
154  }
155 
156  /* Reject further use of connections which failed abort cleanup. */
158 
159  /*
160  * If the connection needs to be remade due to invalidation, disconnect as
161  * soon as we're out of all transactions.
162  */
163  if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
164  {
165  elog(DEBUG3, "closing connection %p for option changes to take effect",
166  entry->conn);
167  disconnect_pg_server(entry);
168  }
169 
170  /*
171  * We don't check the health of cached connection here, because it would
172  * require some overhead. Broken connection will be detected when the
173  * connection is actually used.
174  */
175 
176  /*
177  * If cache entry doesn't have a connection, we have to establish a new
178  * connection. (If connect_pg_server throws an error, the cache entry
179  * will remain in a valid empty state, ie conn == NULL.)
180  */
181  if (entry->conn == NULL)
182  {
183  ForeignServer *server = GetForeignServer(user->serverid);
184 
185  /* Reset all transient state fields, to be sure all are clean */
186  entry->xact_depth = 0;
187  entry->have_prep_stmt = false;
188  entry->have_error = false;
189  entry->changing_xact_state = false;
190  entry->invalidated = false;
191  entry->server_hashvalue =
193  ObjectIdGetDatum(server->serverid));
194  entry->mapping_hashvalue =
196  ObjectIdGetDatum(user->umid));
197 
198  /* Now try to make the connection */
199  entry->conn = connect_pg_server(server, user);
200 
201  elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
202  entry->conn, server->servername, user->umid, user->userid);
203  }
204 
205  /*
206  * Start a new transaction or subtransaction if needed.
207  */
208  begin_remote_xact(entry);
209 
210  /* Remember if caller will prepare statements */
211  entry->have_prep_stmt |= will_prep_stmt;
212 
213  return entry->conn;
214 }
215 
216 /*
217  * Connect to remote server using specified server and user mapping properties.
218  */
219 static PGconn *
221 {
222  PGconn *volatile conn = NULL;
223 
224  /*
225  * Use PG_TRY block to ensure closing connection on error.
226  */
227  PG_TRY();
228  {
229  const char **keywords;
230  const char **values;
231  int n;
232 
233  /*
234  * Construct connection params from generic options of ForeignServer
235  * and UserMapping. (Some of them might not be libpq options, in
236  * which case we'll just waste a few array slots.) Add 3 extra slots
237  * for fallback_application_name, client_encoding, end marker.
238  */
239  n = list_length(server->options) + list_length(user->options) + 3;
240  keywords = (const char **) palloc(n * sizeof(char *));
241  values = (const char **) palloc(n * sizeof(char *));
242 
243  n = 0;
244  n += ExtractConnectionOptions(server->options,
245  keywords + n, values + n);
247  keywords + n, values + n);
248 
249  /* Use "postgres_fdw" as fallback_application_name. */
250  keywords[n] = "fallback_application_name";
251  values[n] = "postgres_fdw";
252  n++;
253 
254  /* Set client_encoding so that libpq can convert encoding properly. */
255  keywords[n] = "client_encoding";
256  values[n] = GetDatabaseEncodingName();
257  n++;
258 
259  keywords[n] = values[n] = NULL;
260 
261  /* verify connection parameters and make connection */
262  check_conn_params(keywords, values, user);
263 
264  conn = PQconnectdbParams(keywords, values, false);
265  if (!conn || PQstatus(conn) != CONNECTION_OK)
266  ereport(ERROR,
267  (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
268  errmsg("could not connect to server \"%s\"",
269  server->servername),
270  errdetail_internal("%s", pchomp(PQerrorMessage(conn)))));
271 
272  /*
273  * Check that non-superuser has used password to establish connection;
274  * otherwise, he's piggybacking on the postgres server's user
275  * identity. See also dblink_security_check() in contrib/dblink.
276  */
277  if (!superuser_arg(user->userid) && !PQconnectionUsedPassword(conn))
278  ereport(ERROR,
279  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
280  errmsg("password is required"),
281  errdetail("Non-superuser cannot connect if the server does not request a password."),
282  errhint("Target server's authentication method must be changed.")));
283 
284  /* Prepare new session for use */
286 
287  pfree(keywords);
288  pfree(values);
289  }
290  PG_CATCH();
291  {
292  /* Release PGconn data structure if we managed to create one */
293  if (conn)
294  PQfinish(conn);
295  PG_RE_THROW();
296  }
297  PG_END_TRY();
298 
299  return conn;
300 }
301 
302 /*
303  * Disconnect any open connection for a connection cache entry.
304  */
305 static void
307 {
308  if (entry->conn != NULL)
309  {
310  PQfinish(entry->conn);
311  entry->conn = NULL;
312  }
313 }
314 
315 /*
316  * For non-superusers, insist that the connstr specify a password. This
317  * prevents a password from being picked up from .pgpass, a service file,
318  * the environment, etc. We don't want the postgres user's passwords
319  * to be accessible to non-superusers. (See also dblink_connstr_check in
320  * contrib/dblink.)
321  */
322 static void
323 check_conn_params(const char **keywords, const char **values, UserMapping *user)
324 {
325  int i;
326 
327  /* no check required if superuser */
328  if (superuser_arg(user->userid))
329  return;
330 
331  /* ok if params contain a non-empty password */
332  for (i = 0; keywords[i] != NULL; i++)
333  {
334  if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
335  return;
336  }
337 
338  ereport(ERROR,
339  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
340  errmsg("password is required"),
341  errdetail("Non-superusers must provide a password in the user mapping.")));
342 }
343 
344 /*
345  * Issue SET commands to make sure remote session is configured properly.
346  *
347  * We do this just once at connection, assuming nothing will change the
348  * values later. Since we'll never send volatile function calls to the
349  * remote, there shouldn't be any way to break this assumption from our end.
350  * It's possible to think of ways to break it at the remote end, eg making
351  * a foreign table point to a view that includes a set_config call ---
352  * but once you admit the possibility of a malicious view definition,
353  * there are any number of ways to break things.
354  */
355 static void
357 {
358  int remoteversion = PQserverVersion(conn);
359 
360  /* Force the search path to contain only pg_catalog (see deparse.c) */
361  do_sql_command(conn, "SET search_path = pg_catalog");
362 
363  /*
364  * Set remote timezone; this is basically just cosmetic, since all
365  * transmitted and returned timestamptzs should specify a zone explicitly
366  * anyway. However it makes the regression test outputs more predictable.
367  *
368  * We don't risk setting remote zone equal to ours, since the remote
369  * server might use a different timezone database. Instead, use UTC
370  * (quoted, because very old servers are picky about case).
371  */
372  do_sql_command(conn, "SET timezone = 'UTC'");
373 
374  /*
375  * Set values needed to ensure unambiguous data output from remote. (This
376  * logic should match what pg_dump does. See also set_transmission_modes
377  * in postgres_fdw.c.)
378  */
379  do_sql_command(conn, "SET datestyle = ISO");
380  if (remoteversion >= 80400)
381  do_sql_command(conn, "SET intervalstyle = postgres");
382  if (remoteversion >= 90000)
383  do_sql_command(conn, "SET extra_float_digits = 3");
384  else
385  do_sql_command(conn, "SET extra_float_digits = 2");
386 }
387 
388 /*
389  * Convenience subroutine to issue a non-data-returning SQL command to remote
390  */
391 static void
392 do_sql_command(PGconn *conn, const char *sql)
393 {
394  PGresult *res;
395 
396  if (!PQsendQuery(conn, sql))
397  pgfdw_report_error(ERROR, NULL, conn, false, sql);
398  res = pgfdw_get_result(conn, sql);
399  if (PQresultStatus(res) != PGRES_COMMAND_OK)
400  pgfdw_report_error(ERROR, res, conn, true, sql);
401  PQclear(res);
402 }
403 
404 /*
405  * Start remote transaction or subtransaction, if needed.
406  *
407  * Note that we always use at least REPEATABLE READ in the remote session.
408  * This is so that, if a query initiates multiple scans of the same or
409  * different foreign tables, we will get snapshot-consistent results from
410  * those scans. A disadvantage is that we can't provide sane emulation of
411  * READ COMMITTED behavior --- it would be nice if we had some other way to
412  * control which remote queries share a snapshot.
413  */
414 static void
416 {
417  int curlevel = GetCurrentTransactionNestLevel();
418 
419  /* Start main transaction if we haven't yet */
420  if (entry->xact_depth <= 0)
421  {
422  const char *sql;
423 
424  elog(DEBUG3, "starting remote transaction on connection %p",
425  entry->conn);
426 
428  sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
429  else
430  sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
431  entry->changing_xact_state = true;
432  do_sql_command(entry->conn, sql);
433  entry->xact_depth = 1;
434  entry->changing_xact_state = false;
435  }
436 
437  /*
438  * If we're in a subtransaction, stack up savepoints to match our level.
439  * This ensures we can rollback just the desired effects when a
440  * subtransaction aborts.
441  */
442  while (entry->xact_depth < curlevel)
443  {
444  char sql[64];
445 
446  snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
447  entry->changing_xact_state = true;
448  do_sql_command(entry->conn, sql);
449  entry->xact_depth++;
450  entry->changing_xact_state = false;
451  }
452 }
453 
454 /*
455  * Release connection reference count created by calling GetConnection.
456  */
457 void
459 {
460  /*
461  * Currently, we don't actually track connection references because all
462  * cleanup is managed on a transaction or subtransaction basis instead. So
463  * there's nothing to do here.
464  */
465 }
466 
467 /*
468  * Assign a "unique" number for a cursor.
469  *
470  * These really only need to be unique per connection within a transaction.
471  * For the moment we ignore the per-connection point and assign them across
472  * all connections in the transaction, but we ask for the connection to be
473  * supplied in case we want to refine that.
474  *
475  * Note that even if wraparound happens in a very long transaction, actual
476  * collisions are highly improbable; just be sure to use %u not %d to print.
477  */
478 unsigned int
480 {
481  return ++cursor_number;
482 }
483 
484 /*
485  * Assign a "unique" number for a prepared statement.
486  *
487  * This works much like GetCursorNumber, except that we never reset the counter
488  * within a session. That's because we can't be 100% sure we've gotten rid
489  * of all prepared statements on all connections, and it's not really worth
490  * increasing the risk of prepared-statement name collisions by resetting.
491  */
492 unsigned int
494 {
495  return ++prep_stmt_number;
496 }
497 
498 /*
499  * Submit a query and wait for the result.
500  *
501  * This function is interruptible by signals.
502  *
503  * Caller is responsible for the error handling on the result.
504  */
505 PGresult *
506 pgfdw_exec_query(PGconn *conn, const char *query)
507 {
508  /*
509  * Submit a query. Since we don't use non-blocking mode, this also can
510  * block. But its risk is relatively small, so we ignore that for now.
511  */
512  if (!PQsendQuery(conn, query))
513  pgfdw_report_error(ERROR, NULL, conn, false, query);
514 
515  /* Wait for the result. */
516  return pgfdw_get_result(conn, query);
517 }
518 
519 /*
520  * Wait for the result from a prior asynchronous execution function call.
521  *
522  * This function offers quick responsiveness by checking for any interruptions.
523  *
524  * This function emulates PQexec()'s behavior of returning the last result
525  * when there are many.
526  *
527  * Caller is responsible for the error handling on the result.
528  */
529 PGresult *
530 pgfdw_get_result(PGconn *conn, const char *query)
531 {
532  PGresult *volatile last_res = NULL;
533 
534  /* In what follows, do not leak any PGresults on an error. */
535  PG_TRY();
536  {
537  for (;;)
538  {
539  PGresult *res;
540 
541  while (PQisBusy(conn))
542  {
543  int wc;
544 
545  /* Sleep until there's something to do */
549  PQsocket(conn),
550  -1L, PG_WAIT_EXTENSION);
552 
554 
555  /* Data available in socket? */
556  if (wc & WL_SOCKET_READABLE)
557  {
558  if (!PQconsumeInput(conn))
559  pgfdw_report_error(ERROR, NULL, conn, false, query);
560  }
561  }
562 
563  res = PQgetResult(conn);
564  if (res == NULL)
565  break; /* query is complete */
566 
567  PQclear(last_res);
568  last_res = res;
569  }
570  }
571  PG_CATCH();
572  {
573  PQclear(last_res);
574  PG_RE_THROW();
575  }
576  PG_END_TRY();
577 
578  return last_res;
579 }
580 
581 /*
582  * Report an error we got from the remote server.
583  *
584  * elevel: error level to use (typically ERROR, but might be less)
585  * res: PGresult containing the error
586  * conn: connection we did the query on
587  * clear: if true, PQclear the result (otherwise caller will handle it)
588  * sql: NULL, or text of remote command we tried to execute
589  *
590  * Note: callers that choose not to throw ERROR for a remote error are
591  * responsible for making sure that the associated ConnCacheEntry gets
592  * marked with have_error = true.
593  */
594 void
596  bool clear, const char *sql)
597 {
598  /* If requested, PGresult must be released before leaving this function. */
599  PG_TRY();
600  {
601  char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
602  char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
603  char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
604  char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
605  char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
606  int sqlstate;
607 
608  if (diag_sqlstate)
609  sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
610  diag_sqlstate[1],
611  diag_sqlstate[2],
612  diag_sqlstate[3],
613  diag_sqlstate[4]);
614  else
615  sqlstate = ERRCODE_CONNECTION_FAILURE;
616 
617  /*
618  * If we don't get a message from the PGresult, try the PGconn. This
619  * is needed because for connection-level failures, PQexec may just
620  * return NULL, not a PGresult at all.
621  */
622  if (message_primary == NULL)
623  message_primary = pchomp(PQerrorMessage(conn));
624 
625  ereport(elevel,
626  (errcode(sqlstate),
627  message_primary ? errmsg_internal("%s", message_primary) :
628  errmsg("could not obtain message string for remote error"),
629  message_detail ? errdetail_internal("%s", message_detail) : 0,
630  message_hint ? errhint("%s", message_hint) : 0,
631  message_context ? errcontext("%s", message_context) : 0,
632  sql ? errcontext("remote SQL command: %s", sql) : 0));
633  }
634  PG_FINALLY();
635  {
636  if (clear)
637  PQclear(res);
638  }
639  PG_END_TRY();
640 }
641 
642 /*
643  * pgfdw_xact_callback --- cleanup at main-transaction end.
644  */
645 static void
647 {
648  HASH_SEQ_STATUS scan;
649  ConnCacheEntry *entry;
650 
651  /* Quick exit if no connections were touched in this transaction. */
652  if (!xact_got_connection)
653  return;
654 
655  /*
656  * Scan all connection cache entries to find open remote transactions, and
657  * close them.
658  */
659  hash_seq_init(&scan, ConnectionHash);
660  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
661  {
662  PGresult *res;
663 
664  /* Ignore cache entry if no open connection right now */
665  if (entry->conn == NULL)
666  continue;
667 
668  /* If it has an open remote transaction, try to close it */
669  if (entry->xact_depth > 0)
670  {
671  bool abort_cleanup_failure = false;
672 
673  elog(DEBUG3, "closing remote transaction on connection %p",
674  entry->conn);
675 
676  switch (event)
677  {
680 
681  /*
682  * If abort cleanup previously failed for this connection,
683  * we can't issue any more commands against it.
684  */
686 
687  /* Commit all remote transactions during pre-commit */
688  entry->changing_xact_state = true;
689  do_sql_command(entry->conn, "COMMIT TRANSACTION");
690  entry->changing_xact_state = false;
691 
692  /*
693  * If there were any errors in subtransactions, and we
694  * made prepared statements, do a DEALLOCATE ALL to make
695  * sure we get rid of all prepared statements. This is
696  * annoying and not terribly bulletproof, but it's
697  * probably not worth trying harder.
698  *
699  * DEALLOCATE ALL only exists in 8.3 and later, so this
700  * constrains how old a server postgres_fdw can
701  * communicate with. We intentionally ignore errors in
702  * the DEALLOCATE, so that we can hobble along to some
703  * extent with older servers (leaking prepared statements
704  * as we go; but we don't really support update operations
705  * pre-8.3 anyway).
706  */
707  if (entry->have_prep_stmt && entry->have_error)
708  {
709  res = PQexec(entry->conn, "DEALLOCATE ALL");
710  PQclear(res);
711  }
712  entry->have_prep_stmt = false;
713  entry->have_error = false;
714  break;
716 
717  /*
718  * We disallow any remote transactions, since it's not
719  * very reasonable to hold them open until the prepared
720  * transaction is committed. For the moment, throw error
721  * unconditionally; later we might allow read-only cases.
722  * Note that the error will cause us to come right back
723  * here with event == XACT_EVENT_ABORT, so we'll clean up
724  * the connection state at that point.
725  */
726  ereport(ERROR,
727  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
728  errmsg("cannot PREPARE a transaction that has operated on postgres_fdw foreign tables")));
729  break;
731  case XACT_EVENT_COMMIT:
732  case XACT_EVENT_PREPARE:
733  /* Pre-commit should have closed the open transaction */
734  elog(ERROR, "missed cleaning up connection during pre-commit");
735  break;
737  case XACT_EVENT_ABORT:
738 
739  /*
740  * Don't try to clean up the connection if we're already
741  * in error recursion trouble.
742  */
744  entry->changing_xact_state = true;
745 
746  /*
747  * If connection is already unsalvageable, don't touch it
748  * further.
749  */
750  if (entry->changing_xact_state)
751  break;
752 
753  /*
754  * Mark this connection as in the process of changing
755  * transaction state.
756  */
757  entry->changing_xact_state = true;
758 
759  /* Assume we might have lost track of prepared statements */
760  entry->have_error = true;
761 
762  /*
763  * If a command has been submitted to the remote server by
764  * using an asynchronous execution function, the command
765  * might not have yet completed. Check to see if a
766  * command is still being processed by the remote server,
767  * and if so, request cancellation of the command.
768  */
769  if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
770  !pgfdw_cancel_query(entry->conn))
771  {
772  /* Unable to cancel running query. */
773  abort_cleanup_failure = true;
774  }
775  else if (!pgfdw_exec_cleanup_query(entry->conn,
776  "ABORT TRANSACTION",
777  false))
778  {
779  /* Unable to abort remote transaction. */
780  abort_cleanup_failure = true;
781  }
782  else if (entry->have_prep_stmt && entry->have_error &&
784  "DEALLOCATE ALL",
785  true))
786  {
787  /* Trouble clearing prepared statements. */
788  abort_cleanup_failure = true;
789  }
790  else
791  {
792  entry->have_prep_stmt = false;
793  entry->have_error = false;
794  }
795 
796  /* Disarm changing_xact_state if it all worked. */
797  entry->changing_xact_state = abort_cleanup_failure;
798  break;
799  }
800  }
801 
802  /* Reset state to show we're out of a transaction */
803  entry->xact_depth = 0;
804 
805  /*
806  * If the connection isn't in a good idle state, discard it to
807  * recover. Next GetConnection will open a new connection.
808  */
809  if (PQstatus(entry->conn) != CONNECTION_OK ||
811  entry->changing_xact_state)
812  {
813  elog(DEBUG3, "discarding connection %p", entry->conn);
814  disconnect_pg_server(entry);
815  }
816  }
817 
818  /*
819  * Regardless of the event type, we can now mark ourselves as out of the
820  * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
821  * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
822  */
823  xact_got_connection = false;
824 
825  /* Also reset cursor numbering for next transaction */
826  cursor_number = 0;
827 }
828 
829 /*
830  * pgfdw_subxact_callback --- cleanup at subtransaction end.
831  */
832 static void
834  SubTransactionId parentSubid, void *arg)
835 {
836  HASH_SEQ_STATUS scan;
837  ConnCacheEntry *entry;
838  int curlevel;
839 
840  /* Nothing to do at subxact start, nor after commit. */
841  if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
842  event == SUBXACT_EVENT_ABORT_SUB))
843  return;
844 
845  /* Quick exit if no connections were touched in this transaction. */
846  if (!xact_got_connection)
847  return;
848 
849  /*
850  * Scan all connection cache entries to find open remote subtransactions
851  * of the current level, and close them.
852  */
853  curlevel = GetCurrentTransactionNestLevel();
854  hash_seq_init(&scan, ConnectionHash);
855  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
856  {
857  char sql[100];
858 
859  /*
860  * We only care about connections with open remote subtransactions of
861  * the current level.
862  */
863  if (entry->conn == NULL || entry->xact_depth < curlevel)
864  continue;
865 
866  if (entry->xact_depth > curlevel)
867  elog(ERROR, "missed cleaning up remote subtransaction at level %d",
868  entry->xact_depth);
869 
870  if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
871  {
872  /*
873  * If abort cleanup previously failed for this connection, we
874  * can't issue any more commands against it.
875  */
877 
878  /* Commit all remote subtransactions during pre-commit */
879  snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
880  entry->changing_xact_state = true;
881  do_sql_command(entry->conn, sql);
882  entry->changing_xact_state = false;
883  }
884  else if (in_error_recursion_trouble())
885  {
886  /*
887  * Don't try to clean up the connection if we're already in error
888  * recursion trouble.
889  */
890  entry->changing_xact_state = true;
891  }
892  else if (!entry->changing_xact_state)
893  {
894  bool abort_cleanup_failure = false;
895 
896  /* Remember that abort cleanup is in progress. */
897  entry->changing_xact_state = true;
898 
899  /* Assume we might have lost track of prepared statements */
900  entry->have_error = true;
901 
902  /*
903  * If a command has been submitted to the remote server by using
904  * an asynchronous execution function, the command might not have
905  * yet completed. Check to see if a command is still being
906  * processed by the remote server, and if so, request cancellation
907  * of the command.
908  */
909  if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
910  !pgfdw_cancel_query(entry->conn))
911  abort_cleanup_failure = true;
912  else
913  {
914  /* Rollback all remote subtransactions during abort */
915  snprintf(sql, sizeof(sql),
916  "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
917  curlevel, curlevel);
918  if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
919  abort_cleanup_failure = true;
920  }
921 
922  /* Disarm changing_xact_state if it all worked. */
923  entry->changing_xact_state = abort_cleanup_failure;
924  }
925 
926  /* OK, we're outta that level of subtransaction */
927  entry->xact_depth--;
928  }
929 }
930 
931 /*
932  * Connection invalidation callback function
933  *
934  * After a change to a pg_foreign_server or pg_user_mapping catalog entry,
935  * mark connections depending on that entry as needing to be remade.
936  * We can't immediately destroy them, since they might be in the midst of
937  * a transaction, but we'll remake them at the next opportunity.
938  *
939  * Although most cache invalidation callbacks blow away all the related stuff
940  * regardless of the given hashvalue, connections are expensive enough that
941  * it's worth trying to avoid that.
942  *
943  * NB: We could avoid unnecessary disconnection more strictly by examining
944  * individual option values, but it seems too much effort for the gain.
945  */
946 static void
947 pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
948 {
949  HASH_SEQ_STATUS scan;
950  ConnCacheEntry *entry;
951 
952  Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
953 
954  /* ConnectionHash must exist already, if we're registered */
955  hash_seq_init(&scan, ConnectionHash);
956  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
957  {
958  /* Ignore invalid entries */
959  if (entry->conn == NULL)
960  continue;
961 
962  /* hashvalue == 0 means a cache reset, must clear all state */
963  if (hashvalue == 0 ||
964  (cacheid == FOREIGNSERVEROID &&
965  entry->server_hashvalue == hashvalue) ||
966  (cacheid == USERMAPPINGOID &&
967  entry->mapping_hashvalue == hashvalue))
968  entry->invalidated = true;
969  }
970 }
971 
972 /*
973  * Raise an error if the given connection cache entry is marked as being
974  * in the middle of an xact state change. This should be called at which no
975  * such change is expected to be in progress; if one is found to be in
976  * progress, it means that we aborted in the middle of a previous state change
977  * and now don't know what the remote transaction state actually is.
978  * Such connections can't safely be further used. Re-establishing the
979  * connection would change the snapshot and roll back any writes already
980  * performed, so that's not an option, either. Thus, we must abort.
981  */
982 static void
984 {
985  HeapTuple tup;
986  Form_pg_user_mapping umform;
987  ForeignServer *server;
988 
989  /* nothing to do for inactive entries and entries of sane state */
990  if (entry->conn == NULL || !entry->changing_xact_state)
991  return;
992 
993  /* make sure this entry is inactive */
994  disconnect_pg_server(entry);
995 
996  /* find server name to be shown in the message below */
998  ObjectIdGetDatum(entry->key));
999  if (!HeapTupleIsValid(tup))
1000  elog(ERROR, "cache lookup failed for user mapping %u", entry->key);
1001  umform = (Form_pg_user_mapping) GETSTRUCT(tup);
1002  server = GetForeignServer(umform->umserver);
1003  ReleaseSysCache(tup);
1004 
1005  ereport(ERROR,
1006  (errcode(ERRCODE_CONNECTION_EXCEPTION),
1007  errmsg("connection to server \"%s\" was lost",
1008  server->servername)));
1009 }
1010 
1011 /*
1012  * Cancel the currently-in-progress query (whose query text we do not have)
1013  * and ignore the result. Returns true if we successfully cancel the query
1014  * and discard any pending result, and false if not.
1015  */
1016 static bool
1018 {
1019  PGcancel *cancel;
1020  char errbuf[256];
1021  PGresult *result = NULL;
1022  TimestampTz endtime;
1023 
1024  /*
1025  * If it takes too long to cancel the query and discard the result, assume
1026  * the connection is dead.
1027  */
1029 
1030  /*
1031  * Issue cancel request. Unfortunately, there's no good way to limit the
1032  * amount of time that we might block inside PQgetCancel().
1033  */
1034  if ((cancel = PQgetCancel(conn)))
1035  {
1036  if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
1037  {
1038  ereport(WARNING,
1039  (errcode(ERRCODE_CONNECTION_FAILURE),
1040  errmsg("could not send cancel request: %s",
1041  errbuf)));
1042  PQfreeCancel(cancel);
1043  return false;
1044  }
1045  PQfreeCancel(cancel);
1046  }
1047 
1048  /* Get and discard the result of the query. */
1049  if (pgfdw_get_cleanup_result(conn, endtime, &result))
1050  return false;
1051  PQclear(result);
1052 
1053  return true;
1054 }
1055 
1056 /*
1057  * Submit a query during (sub)abort cleanup and wait up to 30 seconds for the
1058  * result. If the query is executed without error, the return value is true.
1059  * If the query is executed successfully but returns an error, the return
1060  * value is true if and only if ignore_errors is set. If the query can't be
1061  * sent or times out, the return value is false.
1062  */
1063 static bool
1064 pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
1065 {
1066  PGresult *result = NULL;
1067  TimestampTz endtime;
1068 
1069  /*
1070  * If it takes too long to execute a cleanup query, assume the connection
1071  * is dead. It's fairly likely that this is why we aborted in the first
1072  * place (e.g. statement timeout, user cancel), so the timeout shouldn't
1073  * be too long.
1074  */
1076 
1077  /*
1078  * Submit a query. Since we don't use non-blocking mode, this also can
1079  * block. But its risk is relatively small, so we ignore that for now.
1080  */
1081  if (!PQsendQuery(conn, query))
1082  {
1083  pgfdw_report_error(WARNING, NULL, conn, false, query);
1084  return false;
1085  }
1086 
1087  /* Get the result of the query. */
1088  if (pgfdw_get_cleanup_result(conn, endtime, &result))
1089  return false;
1090 
1091  /* Issue a warning if not successful. */
1092  if (PQresultStatus(result) != PGRES_COMMAND_OK)
1093  {
1094  pgfdw_report_error(WARNING, result, conn, true, query);
1095  return ignore_errors;
1096  }
1097  PQclear(result);
1098 
1099  return true;
1100 }
1101 
1102 /*
1103  * Get, during abort cleanup, the result of a query that is in progress. This
1104  * might be a query that is being interrupted by transaction abort, or it might
1105  * be a query that was initiated as part of transaction abort to get the remote
1106  * side back to the appropriate state.
1107  *
1108  * It's not a huge problem if we throw an ERROR here, but if we get into error
1109  * recursion trouble, we'll end up slamming the connection shut, which will
1110  * necessitate failing the entire toplevel transaction even if subtransactions
1111  * were used. Try to use WARNING where we can.
1112  *
1113  * endtime is the time at which we should give up and assume the remote
1114  * side is dead. Returns true if the timeout expired, otherwise false.
1115  * Sets *result except in case of a timeout.
1116  */
1117 static bool
1119 {
1120  volatile bool timed_out = false;
1121  PGresult *volatile last_res = NULL;
1122 
1123  /* In what follows, do not leak any PGresults on an error. */
1124  PG_TRY();
1125  {
1126  for (;;)
1127  {
1128  PGresult *res;
1129 
1130  while (PQisBusy(conn))
1131  {
1132  int wc;
1134  long secs;
1135  int microsecs;
1136  long cur_timeout;
1137 
1138  /* If timeout has expired, give up, else get sleep time. */
1139  if (now >= endtime)
1140  {
1141  timed_out = true;
1142  goto exit;
1143  }
1144  TimestampDifference(now, endtime, &secs, &microsecs);
1145 
1146  /* To protect against clock skew, limit sleep to one minute. */
1147  cur_timeout = Min(60000, secs * USECS_PER_SEC + microsecs);
1148 
1149  /* Sleep until there's something to do */
1153  PQsocket(conn),
1154  cur_timeout, PG_WAIT_EXTENSION);
1156 
1158 
1159  /* Data available in socket? */
1160  if (wc & WL_SOCKET_READABLE)
1161  {
1162  if (!PQconsumeInput(conn))
1163  {
1164  /* connection trouble; treat the same as a timeout */
1165  timed_out = true;
1166  goto exit;
1167  }
1168  }
1169  }
1170 
1171  res = PQgetResult(conn);
1172  if (res == NULL)
1173  break; /* query is complete */
1174 
1175  PQclear(last_res);
1176  last_res = res;
1177  }
1178 exit: ;
1179  }
1180  PG_CATCH();
1181  {
1182  PQclear(last_res);
1183  PG_RE_THROW();
1184  }
1185  PG_END_TRY();
1186 
1187  if (timed_out)
1188  PQclear(last_res);
1189  else
1190  *result = last_res;
1191  return timed_out;
1192 }
Oid umid
Definition: foreign.h:47
XactEvent
Definition: xact.h:108
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6623
static void configure_remote_session(PGconn *conn)
Definition: connection.c:356
int errhint(const char *fmt,...)
Definition: elog.c:1069
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
#define PG_DIAG_MESSAGE_PRIMARY
Definition: postgres_ext.h:58
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
#define WL_TIMEOUT
Definition: latch.h:127
MemoryContext hcxt
Definition: hsearch.h:78
static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result)
Definition: connection.c:1118
#define DEBUG3
Definition: elog.h:23
#define PG_DIAG_MESSAGE_DETAIL
Definition: postgres_ext.h:59
#define USECS_PER_SEC
Definition: timestamp.h:94
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
struct ConnCacheEntry ConnCacheEntry
int64 TimestampTz
Definition: timestamp.h:39
#define MAKE_SQLSTATE(ch1, ch2, ch3, ch4, ch5)
Definition: elog.h:62
void PQfreeCancel(PGcancel *cancel)
Definition: fe-connect.c:4269
ConnCacheKey key
Definition: connection.c:48
#define Min(x, y)
Definition: c.h:911
Size entrysize
Definition: hsearch.h:73
static void check_conn_params(const char **keywords, const char **values, UserMapping *user)
Definition: connection.c:323
int errcode(int sqlerrcode)
Definition: elog.c:608
void PQfinish(PGconn *conn)
Definition: fe-connect.c:4125
#define MemSet(start, val, len)
Definition: c.h:962
#define WL_SOCKET_READABLE
Definition: latch.h:125
bool have_prep_stmt
Definition: connection.c:53
#define GetSysCacheHashValue1(cacheId, key1)
Definition: syscache.h:201
uint32 server_hashvalue
Definition: connection.c:57
uint32 SubTransactionId
Definition: c.h:518
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:906
unsigned int Oid
Definition: postgres_ext.h:31
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:6613
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:57
Oid userid
Definition: foreign.h:48
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2692
void ResetLatch(Latch *latch)
Definition: latch.c:519
PGconn * PQconnectdbParams(const char *const *keywords, const char *const *values, int expand_dbname)
Definition: fe-connect.c:624
int errdetail_internal(const char *fmt,...)
Definition: elog.c:982
void ReleaseConnection(PGconn *conn)
Definition: connection.c:458
char * pchomp(const char *in)
Definition: mcxt.c:1214
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1234
Definition: dynahash.c:208
void pfree(void *pointer)
Definition: mcxt.c:1056
static unsigned int prep_stmt_number
Definition: connection.c:68
int ExtractConnectionOptions(List *defelems, const char **keywords, const char **values)
Definition: option.c:295
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
bool changing_xact_state
Definition: connection.c:55
static void do_sql_command(PGconn *conn, const char *sql)
Definition: connection.c:392
FormData_pg_user_mapping * Form_pg_user_mapping
PGcancel * PQgetCancel(PGconn *conn)
Definition: fe-connect.c:4246
int errdetail(const char *fmt,...)
Definition: elog.c:955
List * options
Definition: foreign.h:50
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:369
unsigned int uint32
Definition: c.h:359
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:595
static unsigned int cursor_number
Definition: connection.c:67
#define ereport(elevel, rest)
Definition: elog.h:141
bool invalidated
Definition: connection.c:56
bool superuser_arg(Oid roleid)
Definition: superuser.c:56
PGTransactionStatusType PQtransactionStatus(const PGconn *conn)
Definition: fe-connect.c:6578
#define WARNING
Definition: elog.h:40
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
Definition: connection.c:1064
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1116
static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
Definition: connection.c:947
static int elevel
Definition: vacuumlazy.c:143
#define HASH_BLOBS
Definition: hsearch.h:88
SubXactEvent
Definition: xact.h:122
#define PG_FINALLY()
Definition: elog.h:339
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1426
static void pgfdw_xact_callback(XactEvent event, void *arg)
Definition: connection.c:646
static HTAB * ConnectionHash
Definition: connection.c:64
ForeignServer * GetForeignServer(Oid serverid)
Definition: foreign.c:109
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:316
uintptr_t Datum
Definition: postgres.h:367
void RegisterSubXactCallback(SubXactCallback callback, void *arg)
Definition: xact.c:3534
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1164
static bool pgfdw_cancel_query(PGconn *conn)
Definition: connection.c:1017
#define PG_WAIT_EXTENSION
Definition: pgstat.h:759
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1704
Size keysize
Definition: hsearch.h:72
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
#define PG_DIAG_MESSAGE_HINT
Definition: postgres_ext.h:60
void PQclear(PGresult *res)
Definition: fe-exec.c:694
uint32 mapping_hashvalue
Definition: connection.c:58
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:841
bool in_error_recursion_trouble(void)
Definition: elog.c:197
PGconn * GetConnection(UserMapping *user, bool will_prep_stmt)
Definition: connection.c:105
int errmsg_internal(const char *fmt,...)
Definition: elog.c:909
#define PG_CATCH()
Definition: elog.h:332
PGconn * conn
Definition: connection.c:49
PGresult * pgfdw_get_result(PGconn *conn, const char *query)
Definition: connection.c:530
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:2754
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
static void disconnect_pg_server(ConnCacheEntry *entry)
Definition: connection.c:306
#define Assert(condition)
Definition: c.h:739
const char * GetDatabaseEncodingName(void)
Definition: mbutils.c:1052
void RegisterXactCallback(XactCallback callback, void *arg)
Definition: xact.c:3479
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:1754
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
Definition: connection.c:983
static bool xact_got_connection
Definition: connection.c:71
unsigned int GetCursorNumber(PGconn *conn)
Definition: connection.c:479
static int list_length(const List *l)
Definition: pg_list.h:169
Oid serverid
Definition: foreign.h:49
#define PG_RE_THROW()
Definition: elog.h:363
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1389
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1379
static Datum values[MAXATTR]
Definition: bootstrap.c:167
int PQconnectionUsedPassword(const PGconn *conn)
Definition: fe-connect.c:6672
int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize)
Definition: fe-connect.c:4401
static char * user
Definition: pg_regress.c:94
void * palloc(Size size)
Definition: mcxt.c:949
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define IsolationIsSerializable()
Definition: xact.h:52
char * servername
Definition: foreign.h:39
#define elog(elevel,...)
Definition: elog.h:228
int i
#define errcontext
Definition: elog.h:183
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:1939
void * arg
struct Latch * MyLatch
Definition: globals.c:54
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
unsigned int GetPrepStmtNumber(PGconn *conn)
Definition: connection.c:493
static void pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg)
Definition: connection.c:833
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:6570
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: timestamp.c:1657
#define PG_TRY()
Definition: elog.h:322
static void begin_remote_xact(ConnCacheEntry *entry)
Definition: connection.c:415
Oid ConnCacheKey
Definition: connection.c:44
PGresult * pgfdw_exec_query(PGconn *conn, const char *query)
Definition: connection.c:506
#define snprintf
Definition: port.h:192
List * options
Definition: foreign.h:42
#define WL_LATCH_SET
Definition: latch.h:124
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:6641
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1547
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1778
Oid serverid
Definition: foreign.h:36
#define PG_END_TRY()
Definition: elog.h:347
#define PG_DIAG_CONTEXT
Definition: postgres_ext.h:64
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129
MemoryContext CacheMemoryContext
Definition: mcxt.c:47
static PGconn * connect_pg_server(ForeignServer *server, UserMapping *user)
Definition: connection.c:220