PostgreSQL Source Code git master
Loading...
Searching...
No Matches
connection.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * connection.c
4 * Connection management functions for postgres_fdw
5 *
6 * Portions Copyright (c) 2012-2026, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * contrib/postgres_fdw/connection.c
10 *
11 *-------------------------------------------------------------------------
12 */
13#include "postgres.h"
14
15#if HAVE_POLL_H
16#include <poll.h>
17#endif
18
19#include "access/htup_details.h"
20#include "access/xact.h"
22#include "commands/defrem.h"
23#include "common/base64.h"
24#include "funcapi.h"
25#include "libpq/libpq-be.h"
27#include "mb/pg_wchar.h"
28#include "miscadmin.h"
29#include "pgstat.h"
30#include "postgres_fdw.h"
31#include "storage/latch.h"
32#include "utils/builtins.h"
33#include "utils/hsearch.h"
34#include "utils/inval.h"
35#include "utils/syscache.h"
36#include "utils/tuplestore.h"
37
38/*
39 * Connection cache hash table entry
40 *
41 * The lookup key in this hash table is the user mapping OID. We use just one
42 * connection per user mapping ID, which ensures that all the scans use the
43 * same snapshot during a query. Using the user mapping OID rather than
44 * the foreign server OID + user OID avoids creating multiple connections when
45 * the public user mapping applies to all user OIDs.
46 *
47 * The "conn" pointer can be NULL if we don't currently have a live connection.
48 * When we do have a connection, xact_depth tracks the current depth of
49 * transactions and subtransactions open on the remote side. We need to issue
50 * commands at the same nesting depth on the remote as we're executing at
51 * ourselves, so that rolling back a subtransaction will kill the right
52 * queries and not the wrong ones.
53 */
/*
 * NOTE(review): this copy is a line-numbered extraction with lines missing.
 * The numbering skips 54 (presumably the definition of ConnCacheKey, which
 * GetConnection fills from user->umid) and 76 (presumably the line that
 * closes this struct and names the typedef).  As shown, the struct is
 * unterminated and will not compile until those lines are restored.
 */
55
56typedef struct ConnCacheEntry
57{
58 ConnCacheKey key; /* hash key (must be first) */
59 PGconn *conn; /* connection to foreign server, or NULL */
60 /* Remaining fields are invalid when conn is NULL: */
61 int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 =
62 * one level of subxact open, etc */
63 bool xact_read_only; /* xact r/o state */
64 bool have_prep_stmt; /* have we prepared any stmts in this xact? */
65 bool have_error; /* have any subxacts aborted in this xact? */
	/*
	 * changing_xact_state is set just before a remote transaction-control
	 * command is sent and cleared only after that command succeeds.
	 */
66 bool changing_xact_state; /* xact state change in process */
67 bool parallel_commit; /* do we commit (sub)xacts in parallel? */
68 bool parallel_abort; /* do we abort (sub)xacts in parallel? */
69 bool invalidated; /* true if reconnect is pending */
70 bool keep_connections; /* setting value of keep_connections
71 * server option */
72 Oid serverid; /* foreign server OID used to get server name */
73 uint32 server_hashvalue; /* hash value of foreign server OID */
74 uint32 mapping_hashvalue; /* hash value of user mapping OID */
75 PgFdwConnState state; /* extra per-connection state */
	/* NOTE(review): original line 76 (end of struct) is missing here. */
77
78/*
79 * Connection cache (initialized on first use)
80 */
/*
 * NOTE(review): original line 81 is missing from this copy.  It presumably
 * declares the cache itself (ConnectionHash, which GetConnection tests
 * against NULL and creates with hash_create()).
 */
82
83/* for assigning cursor numbers and prepared statement numbers */
/*
 * cursor_number is advanced by GetCursorNumber() and prep_stmt_number by
 * GetPrepStmtNumber(); both functions return the pre-incremented value.
 */
84static unsigned int cursor_number = 0;
85static unsigned int prep_stmt_number = 0;
86
87/* tracks whether any work is needed in callback functions */
/* Set to true by GetConnection() each time it is called. */
88static bool xact_got_connection = false;
89
90/*
91 * tracks the topmost read-only local transaction's nesting level determined
92 * by GetTopReadOnlyTransactionNestLevel()
93 */
/* 0 means no read-only level has been recorded (see begin_remote_xact). */
94static int read_only_level = 0;
95
96/* custom wait event values, retrieved from shared memory */
/*
 * NOTE(review): original lines 97-99 are missing from this copy.  They
 * presumably declare pgfdw_we_get_result and pgfdw_we_connect, which
 * GetConnection and connect_pg_server lazily initialize with
 * WaitEventExtensionNew().
 */
100
101/*
102 * Milliseconds to wait to cancel an in-progress query or execute a cleanup
103 * query; if it takes longer than 30 seconds to do these, we assume the
104 * connection is dead.
105 */
106#define CONNECTION_CLEANUP_TIMEOUT 30000
107
108/*
109 * Milliseconds to wait before issuing another cancel request. This covers
110 * the race condition where the remote session ignored our cancel request
111 * because it arrived while idle.
112 */
113#define RETRY_CANCEL_TIMEOUT 1000
114
115/* Macro for constructing abort command to be sent */
/*
 * sql must be a char array, not a pointer.  sizeof(sql) is used as the
 * buffer size, so passing a pointer would silently truncate the command to
 * the pointer's size.
 *
 * At top level this produces "ABORT TRANSACTION".  Otherwise it rolls back
 * to and releases savepoint s<xact_depth>.  That is the name
 * begin_remote_xact() gives the savepoint that created the current remote
 * nesting depth.
 */
116#define CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel) \
117 do { \
118 if (toplevel) \
119 snprintf((sql), sizeof(sql), \
120 "ABORT TRANSACTION"); \
121 else \
122 snprintf((sql), sizeof(sql), \
123 "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", \
124 (entry)->xact_depth, (entry)->xact_depth); \
125 } while(0)
126
127/*
128 * Extension version number, for supporting older extension versions' objects
129 */
/*
 * NOTE(review): original lines 130-134, which this comment describes, are
 * missing from this copy.  They are presumably the enum pgfdwVersion
 * definition that the api_version parameter further down refers to.
 */
135
136/*
137 * SQL functions
138 */
/*
 * NOTE(review): original lines 139-143 (the SQL function declarations this
 * comment introduces) are missing from this copy.
 */
144
145/* prototypes of private functions */
/*
 * NOTE(review): many prototype lines are missing from this copy.  The
 * numbering skips 146-147, 150, 159-160, 162, 164, 167-169, 175, 178-179,
 * 183-186, 188-189, 192-194 and 196.  Several declarations below are
 * therefore truncated mid-parameter-list; for example,
 * pgfdw_security_check() stops after "const char **values,".
 */
148static void disconnect_pg_server(ConnCacheEntry *entry);
149static void check_conn_params(const char **keywords, const char **values, UserMapping *user);
151static void do_sql_command_begin(PGconn *conn, const char *sql);
152static void do_sql_command_end(PGconn *conn, const char *sql,
153 bool consume_input);
154static void begin_remote_xact(ConnCacheEntry *entry);
155static void pgfdw_report_internal(int elevel, PGresult *res, PGconn *conn,
156 const char *sql);
157static void pgfdw_xact_callback(XactEvent event, void *arg);
158static void pgfdw_subxact_callback(SubXactEvent event,
161 void *arg);
163 uint32 hashvalue);
165static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
166static bool pgfdw_cancel_query(PGconn *conn);
170 bool consume_input);
171static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
172 bool ignore_errors);
173static bool pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query);
174static bool pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
176 bool consume_input,
177 bool ignore_errors);
180 PGresult **result, bool *timed_out);
181static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel);
182static bool pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
187 int curlevel);
190 bool toplevel);
191static void pgfdw_security_check(const char **keywords, const char **values,
195static bool disconnect_cached_connections(Oid serverid);
197 enum pgfdwVersion api_version);
198static int pgfdw_conn_check(PGconn *conn);
199static bool pgfdw_conn_checkable(void);
200static bool pgfdw_has_required_scram_options(const char **keywords, const char **values);
201
202/*
203 * Get a PGconn which can be used to execute queries on the remote PostgreSQL
204 * server with the user's authorization. A new connection is established
205 * if we don't already have a suitable one, and a transaction is opened at
206 * the right subtransaction nesting depth if we didn't do that already.
207 *
208 * will_prep_stmt must be true if caller intends to create any prepared
209 * statements. Since those don't go away automatically at transaction end
210 * (not even on error), we need this flag to cue manual cleanup.
211 *
212 * If state is not NULL, *state receives the per-connection state associated
213 * with the PGconn.
214 */
/*
 * NOTE(review): this copy is missing original lines 216, 222, 230, 237,
 * 243-248, 271, 281, 290, 301, 307-308, 328, 333-334, 350, 353, 357, 359
 * and 365.  Line 216 is the one carrying the function name and parameter
 * list.  From the header comment and the body, the parameters appear to be
 * the user mapping `user`, the `will_prep_stmt` flag and the optional
 * `state` out-parameter.  Several statements below are incomplete (for
 * example, the hash_create() call and the registration of cleanup
 * callbacks).  Restore the missing lines before building.
 */
215PGconn *
217{
218 bool found;
	/*
	 * retry is set by the PG_CATCH block below when a broken connection was
	 * detected outside any remote transaction and may be replaced.
	 */
219 bool retry = false;
220 ConnCacheEntry *entry;
221 ConnCacheKey key;
223
224 /* First time through, initialize connection cache hashtable */
225 if (ConnectionHash == NULL)
226 {
227 HASHCTL ctl;
228
229 if (pgfdw_we_get_result == 0)
231 WaitEventExtensionNew("PostgresFdwGetResult");
232
233 ctl.keysize = sizeof(ConnCacheKey);
234 ctl.entrysize = sizeof(ConnCacheEntry);
235 ConnectionHash = hash_create("postgres_fdw connections", 8,
236 &ctl,
238
239 /*
240 * Register some callback functions that manage connection cleanup.
241 * This should be done just once in each backend.
242 */
249 }
250
251 /* Set flag that we did GetConnection during the current transaction */
252 xact_got_connection = true;
253
254 /* Create hash key for the entry. Assume no pad bytes in key struct */
255 key = user->umid;
256
257 /*
258 * Find or create cached entry for requested connection.
259 */
260 entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
261 if (!found)
262 {
263 /*
264 * We need only clear "conn" here; remaining fields will be filled
265 * later when "conn" is set.
266 */
267 entry->conn = NULL;
268 }
269
270 /* Reject further use of connections which failed abort cleanup. */
272
273 /*
274 * If the connection needs to be remade due to invalidation, disconnect as
275 * soon as we're out of all transactions.
276 */
277 if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
278 {
279 elog(DEBUG3, "closing connection %p for option changes to take effect",
280 entry->conn);
282 }
283
284 /*
285 * If cache entry doesn't have a connection, we have to establish a new
286 * connection. (If connect_pg_server throws an error, the cache entry
287 * will remain in a valid empty state, ie conn == NULL.)
288 */
289 if (entry->conn == NULL)
291
292 /*
293 * We check the health of the cached connection here when using it. In
294 * cases where we're out of all transactions, if a broken connection is
295 * detected, we try to reestablish a new connection later.
296 */
297 PG_TRY();
298 {
299 /* Process a pending asynchronous request if any. */
300 if (entry->state.pendingAreq)
302 /* Start a new transaction or subtransaction if needed. */
303 begin_remote_xact(entry);
304 }
305 PG_CATCH();
306 {
309
310 /*
311 * Determine whether to try to reestablish the connection.
312 *
313 * After a broken connection is detected in libpq, any error other
314 * than connection failure (e.g., out-of-memory) can be thrown
315 * somewhere between return from libpq and the expected ereport() call
316 * in pgfdw_report_error(). In this case, since PQstatus() indicates
317 * CONNECTION_BAD, checking only PQstatus() causes the false detection
318 * of connection failure. To avoid this, we also verify that the
319 * error's sqlstate is ERRCODE_CONNECTION_FAILURE. Note that also
320 * checking only the sqlstate can cause another false detection
321 * because pgfdw_report_error() may report ERRCODE_CONNECTION_FAILURE
322 * for any libpq-originated error condition.
323 */
324 if (errdata->sqlerrcode != ERRCODE_CONNECTION_FAILURE ||
325 PQstatus(entry->conn) != CONNECTION_BAD ||
326 entry->xact_depth > 0)
327 {
329 PG_RE_THROW();
330 }
331
332 /* Clean up the error state */
335 errdata = NULL;
336
337 retry = true;
338 }
339 PG_END_TRY();
340
341 /*
342 * If a broken connection is detected, disconnect it, reestablish a new
343 * connection and retry a new remote transaction. If connection failure is
344 * reported again, we give up getting a connection.
345 */
346 if (retry)
347 {
		/* Guaranteed: the PG_CATCH test above rethrows when xact_depth > 0. */
348 Assert(entry->xact_depth == 0);
349
351 (errmsg_internal("could not start remote transaction on connection %p",
352 entry->conn)),
354
355 elog(DEBUG3, "closing connection %p to reestablish a new one",
356 entry->conn);
358
360
361 begin_remote_xact(entry);
362 }
363
364 /* Remember if caller will prepare statements */
366
367 /* If caller needs access to the per-connection state, return it. */
368 if (state)
369 *state = &entry->state;
370
371 return entry->conn;
372}
373
374/*
375 * Reset all transient state fields in the cached connection entry and
376 * establish new connection to the remote server.
377 */
/*
 * NOTE(review): original line 379 (name and parameter list) and lines 395
 * and 398 (the calls that compute server_hashvalue and mapping_hashvalue)
 * are missing from this copy.  From the body, the parameters appear to be
 * the cache entry `entry` and the user mapping `user`.
 *
 * Precondition: entry->conn must be NULL (asserted below).
 *
 * The keep_connections, parallel_commit and parallel_abort flags are first
 * reset to their defaults and then overridden from the foreign server's
 * options.  If an option appears more than once, the last occurrence wins.
 */
378static void
380{
381 ForeignServer *server = GetForeignServer(user->serverid);
382 ListCell *lc;
383
384 Assert(entry->conn == NULL);
385
386 /* Reset all transient state fields, to be sure all are clean */
387 entry->xact_depth = 0;
388 entry->xact_read_only = false;
389 entry->have_prep_stmt = false;
390 entry->have_error = false;
391 entry->changing_xact_state = false;
392 entry->invalidated = false;
393 entry->serverid = server->serverid;
394 entry->server_hashvalue =
396 ObjectIdGetDatum(server->serverid));
397 entry->mapping_hashvalue =
399 ObjectIdGetDatum(user->umid));
400 memset(&entry->state, 0, sizeof(entry->state));
401
402 /*
403 * Determine whether to keep the connection that we're about to make here
404 * open even after the transaction using it ends, so that the subsequent
405 * transactions can re-use it.
406 *
407 * By default, all the connections to any foreign servers are kept open.
408 *
409 * Also determine whether to commit/abort (sub)transactions opened on the
410 * remote server in parallel at (sub)transaction end, which is disabled by
411 * default.
412 *
413 * Note: it's enough to determine these only when making a new connection
414 * because if these settings for it are changed, it will be closed and
415 * re-made later.
416 */
417 entry->keep_connections = true;
418 entry->parallel_commit = false;
419 entry->parallel_abort = false;
420 foreach(lc, server->options)
421 {
422 DefElem *def = (DefElem *) lfirst(lc);
423
424 if (strcmp(def->defname, "keep_connections") == 0)
425 entry->keep_connections = defGetBoolean(def);
426 else if (strcmp(def->defname, "parallel_commit") == 0)
427 entry->parallel_commit = defGetBoolean(def);
428 else if (strcmp(def->defname, "parallel_abort") == 0)
429 entry->parallel_abort = defGetBoolean(def);
430 }
431
432 /* Now try to make the connection */
433 entry->conn = connect_pg_server(server, user);
434
435 elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
436 entry->conn, server->servername, user->umid, user->userid);
437}
438
439/*
440 * Check that non-superuser has used password or delegated credentials
441 * to establish connection; otherwise, it is piggybacking on the
442 * postgres server's user identity. See also dblink_security_check()
443 * in contrib/dblink and check_conn_params.
444 */
/*
 * NOTE(review): original lines 446 (remainder of the parameter list), 454,
 * 459, 463, 479 and 482-483 are missing from this copy.  These are the
 * conditions guarding the early returns below and the start of the final
 * ereport(), whose level is on a missing line (presumably ERROR).  The
 * prototype shows keywords and values parameters; the body also reads
 * user->userid.
 *
 * The checks run in order, and the first one that passes returns early:
 *   1. the user is a superuser;
 *   2. GSSAPI delegation (only when built with ENABLE_GSS);
 *   3. password_required is disabled;
 *   4. a non-empty "password" parameter, subject to the condition on
 *      missing line 463;
 *   5. SCRAM pass-through.
 * If none passes, the error below is reported.
 */
445static void
447{
448 /* Superusers bypass the check */
449 if (superuser_arg(user->userid))
450 return;
451
452#ifdef ENABLE_GSS
453 /* Connected via GSSAPI with delegated credentials: all good. */
455 return;
456#endif
457
458 /* Ok if superuser set PW required false. */
460 return;
461
462 /* Connected via PW, with PW required true, and provided non-empty PW. */
464 {
465 /* ok if params contain a non-empty password */
466 for (int i = 0; keywords[i] != NULL; i++)
467 {
468 if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
469 return;
470 }
471 }
472
473 /*
474 * Ok if SCRAM pass-through is being used and all required SCRAM options
475 * are set correctly. If pgfdw_has_required_scram_options returns true we
476 * assume that UseScramPassthrough is also true since SCRAM options are
477 * only set when UseScramPassthrough is enabled.
478 */
480 return;
481
484 errmsg("password or GSSAPI delegated credentials required"),
485 errdetail("Non-superuser cannot connect if the server does not request a password or use GSSAPI with delegated credentials."),
486 errhint("Target server's authentication method must be changed or password_required=false set in the user mapping attributes.")));
487}
488
489/*
490 * Construct connection params from generic options of ForeignServer and
491 * UserMapping. (Some of them might not be libpq options, in which case we'll
492 * just waste a few array slots.)
493 */
/*
 * Outputs:
 *   *p_keywords, *p_values - palloc'd, NULL-terminated parallel arrays.
 *   *p_appname - the palloc'd, escape-processed application_name that was
 *                actually used, or NULL if none was.
 * connect_pg_server() pfrees appname and values after connecting.
 *
 * NOTE(review): original lines 495 (start of the parameter list; the body
 * uses `server` and `user`), 514, 528, 531, 576, 580, 586, 589-590, 597,
 * 600-601, 619 and 621 are missing from this copy.  Several statements
 * below are therefore incomplete.
 */
494static void
496 const char ***p_keywords, const char ***p_values,
497 char **p_appname)
498{
499 const char **keywords;
500 const char **values;
501 char *appname = NULL;
502 int n;
503
504 /*
505 * Add 4 extra slots for application_name, fallback_application_name,
506 * client_encoding, end marker, and 3 extra slots for scram keys and
507 * required scram pass-through options.
508 */
	/*
	 * These 4 + 3 slots exactly cover the most this function appends after
	 * the extracted options.  The 4 are: application_name (only when
	 * pgfdw_application_name is set), fallback_application_name,
	 * client_encoding and the NULL terminator.  The 3 are the SCRAM
	 * entries: scram_client_key, scram_server_key and require_auth.
	 */
509 n = list_length(server->options) + list_length(user->options) + 4 + 3;
510 keywords = (const char **) palloc(n * sizeof(char *));
511 values = (const char **) palloc(n * sizeof(char *));
512
513 n = 0;
515 keywords + n, values + n);
516 n += ExtractConnectionOptions(user->options,
517 keywords + n, values + n);
518
519 /*
520 * Use pgfdw_application_name as application_name if set.
521 *
522 * PQconnectdbParams() processes the parameter arrays from start to end.
523 * If any key word is repeated, the last value is used. Therefore note
524 * that pgfdw_application_name must be added to the arrays after options
525 * of ForeignServer are, so that it can override application_name set in
526 * ForeignServer.
527 */
529 {
530 keywords[n] = "application_name";
532 n++;
533 }
534
535 /*
536 * Search the parameter arrays to find application_name setting, and
537 * replace escape sequences in it with status information if found. The
538 * arrays are searched backwards because the last value is used if
539 * application_name is repeatedly set.
540 */
541 for (int i = n - 1; i >= 0; i--)
542 {
543 if (strcmp(keywords[i], "application_name") == 0 &&
544 *(values[i]) != '\0')
545 {
546 /*
547 * Use this application_name setting if it's not empty string even
548 * after any escape sequences in it are replaced.
549 */
550 appname = process_pgfdw_appname(values[i]);
551 if (appname[0] != '\0')
552 {
553 values[i] = appname;
554 break;
555 }
556
557 /*
558 * This empty application_name is not used, so we set values[i] to
559 * NULL and keep searching the array to find the next one.
560 */
561 values[i] = NULL;
562 pfree(appname);
563 appname = NULL;
564 }
565 }
566
567 *p_appname = appname;
568
569 /* Use "postgres_fdw" as fallback_application_name */
570 keywords[n] = "fallback_application_name";
571 values[n] = "postgres_fdw";
572 n++;
573
574 /* Set client_encoding so that libpq can convert encoding properly. */
575 keywords[n] = "client_encoding";
577 n++;
578
579 /* Add required SCRAM pass-through connection options if it's enabled. */
581 {
582 int len;
583 int encoded_len;
584
585 keywords[n] = "scram_client_key";
587 /* don't forget the zero-terminator */
588 values[n] = palloc0(len + 1);
591 (char *) values[n], len);
592 if (encoded_len < 0)
593 elog(ERROR, "could not encode SCRAM client key");
594 n++;
595
596 keywords[n] = "scram_server_key";
598 /* don't forget the zero-terminator */
599 values[n] = palloc0(len + 1);
602 (char *) values[n], len);
603 if (encoded_len < 0)
604 elog(ERROR, "could not encode SCRAM server key");
605 n++;
606
607 /*
608 * Require scram-sha-256 to ensure that no other auth method is used
609 * when connecting with foreign server.
610 */
611 keywords[n] = "require_auth";
612 values[n] = "scram-sha-256";
613 n++;
614 }
615
616 keywords[n] = values[n] = NULL;
617
618 /* Verify the set of connection parameters. */
620
622 *p_values = values;
623}
624
625/*
626 * Connect to remote server using specified server and user mapping properties.
627 */
/*
 * NOTE(review): original lines 629 (name and parameter list; the body uses
 * `server` and `user`), 641, 650, 652, 654-655, 658-659, 662, 665, 668,
 * 672 and 677 are missing from this copy.  They include the connect call
 * itself, the ereport() opening for the connection-failure error, and the
 * PG_CATCH cleanup.
 *
 * conn is declared volatile because it is assigned inside PG_TRY.  It is
 * presumably read again by the PG_CATCH cleanup (missing line 677) to close
 * the connection.  A non-volatile local modified between setjmp and longjmp
 * has an indeterminate value after the longjmp.
 */
628static PGconn *
630{
631 PGconn *volatile conn = NULL;
632
633 /*
634 * Use PG_TRY block to ensure closing connection on error.
635 */
636 PG_TRY();
637 {
638 const char **keywords;
639 const char **values;
640 char *appname;
642
643 construct_connection_params(server, user, &keywords, &values, &appname);
644
645 /* first time, allocate or get the custom wait event */
646 if (pgfdw_we_connect == 0)
647 pgfdw_we_connect = WaitEventExtensionNew("PostgresFdwConnect");
648
649 /* OK to make connection */
651 /* expand_dbname = */ false);
653 "received message via remote connection");
656
657 if (!conn || PQstatus(conn) != CONNECTION_OK)
660 errmsg("could not connect to server \"%s\"",
661 server->servername),
663
664 /* Perform post-connection security checks. */
666
667 /* Prepare new session for use */
669
		/* appname may be NULL when no application_name was used. */
670 if (appname != NULL)
671 pfree(appname);
673 pfree(values);
674 }
675 PG_CATCH();
676 {
678 PG_RE_THROW();
679 }
680 PG_END_TRY();
681
682 return conn;
683}
684
685/*
686 * Disconnect any open connection for a connection cache entry.
687 */
/*
 * After this returns, entry->conn is NULL whether or not a connection was
 * open.
 *
 * NOTE(review): original lines 689 (name and parameter list; the prototype
 * is disconnect_pg_server(ConnCacheEntry *entry)) and 693 (the statement
 * that actually closes entry->conn) are missing from this copy.
 */
688static void
690{
691 if (entry->conn != NULL)
692 {
694 entry->conn = NULL;
695 }
696}
697
698/*
699 * Check and return the value of password_required, if defined; otherwise,
700 * return true, which is the default value of it. The mapping has been
701 * pre-validated.
702 */
/*
 * Only the user mapping's options are searched; the first password_required
 * entry found decides the result.
 *
 * NOTE(review): original line 704 (name and parameter list; the body reads
 * user->options) is missing from this copy.
 */
703static bool
705{
706 ListCell *cell;
707
708 foreach(cell, user->options)
709 {
710 DefElem *def = (DefElem *) lfirst(cell);
711
712 if (strcmp(def->defname, "password_required") == 0)
713 return defGetBoolean(def);
714 }
715
716 return true;
717}
718
/*
 * Return the value of the use_scram_passthrough option, or false if it is
 * not set anywhere.
 *
 * The foreign server's options are searched first and then the user
 * mapping's.  A setting on the server therefore takes precedence over one
 * on the user mapping, and within each list the first entry found wins.
 *
 * NOTE(review): original line 720 (name and parameter list; the body uses
 * `server` and `user`) is missing from this copy.
 */
719static bool
721{
722 ListCell *cell;
723
724 foreach(cell, server->options)
725 {
726 DefElem *def = (DefElem *) lfirst(cell);
727
728 if (strcmp(def->defname, "use_scram_passthrough") == 0)
729 return defGetBoolean(def);
730 }
731
732 foreach(cell, user->options)
733 {
734 DefElem *def = (DefElem *) lfirst(cell);
735
736 if (strcmp(def->defname, "use_scram_passthrough") == 0)
737 return defGetBoolean(def);
738 }
739
740 return false;
741}
742
743/*
744 * For non-superusers, insist that the connstr specify a password or that the
745 * user provided their own GSSAPI delegated credentials. This
746 * prevents a password from being picked up from .pgpass, a service file, the
747 * environment, etc. We don't want the postgres user's passwords,
748 * certificates, etc to be accessible to non-superusers. (See also
749 * dblink_connstr_check in contrib/dblink.)
750 */
/*
 * keywords/values are the NULL-terminated parallel arrays built by
 * construct_connection_params().  Each check below returns early on
 * success; if none passes, the final ereport() fires.
 *
 * NOTE(review): original lines 762 (GSSAPI delegation condition), 774
 * (condition for the user-mapping override), 783 (SCRAM pass-through
 * condition) and 786-787 (start of the ereport()) are missing from this
 * copy.
 */
751static void
752check_conn_params(const char **keywords, const char **values, UserMapping *user)
753{
754 int i;
755
756 /* no check required if superuser */
757 if (superuser_arg(user->userid))
758 return;
759
760#ifdef ENABLE_GSS
761 /* ok if the user provided their own delegated credentials */
763 return;
764#endif
765
766 /* ok if params contain a non-empty password */
	/*
	 * values[i] is dereferenced only when keywords[i] is "password".  Entries
	 * whose value was set to NULL (such as an unused empty application_name)
	 * are never touched.
	 */
767 for (i = 0; keywords[i] != NULL; i++)
768 {
769 if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
770 return;
771 }
772
773 /* ok if the superuser explicitly said so at user mapping creation time */
775 return;
776
777 /*
778 * Ok if SCRAM pass-through is being used and all required scram options
779 * are set correctly. If pgfdw_has_required_scram_options returns true we
780 * assume that UseScramPassthrough is also true since SCRAM options are
781 * only set when UseScramPassthrough is enabled.
782 */
784 return;
785
788 errmsg("password or GSSAPI delegated credentials required"),
789 errdetail("Non-superusers must delegate GSSAPI credentials, provide a password, or enable SCRAM pass-through in user mapping.")));
790}
791
792/*
793 * Issue SET commands to make sure remote session is configured properly.
794 *
795 * We do this just once at connection, assuming nothing will change the
796 * values later. Since we'll never send volatile function calls to the
797 * remote, there shouldn't be any way to break this assumption from our end.
798 * It's possible to think of ways to break it at the remote end, eg making
799 * a foreign table point to a view that includes a set_config call ---
800 * but once you admit the possibility of a malicious view definition,
801 * there are any number of ways to break things.
802 */
/*
 * Each SET goes through do_sql_command(), so a failure on the remote side
 * is reported as an error.
 *
 * NOTE(review): original lines 804 (name and parameter list; the body uses
 * `conn`) and 806 (the declaration of remoteversion, presumably obtained
 * from PQserverVersion(conn)) are missing from this copy.  The thresholds
 * 80400 and 90000 presumably correspond to remote server versions 8.4 and
 * 9.0 in PQserverVersion() numbering.
 */
803static void
805{
807
808 /* Force the search path to contain only pg_catalog (see deparse.c) */
809 do_sql_command(conn, "SET search_path = pg_catalog");
810
811 /*
812 * Set remote timezone; this is basically just cosmetic, since all
813 * transmitted and returned timestamptzs should specify a zone explicitly
814 * anyway. However it makes the regression test outputs more predictable.
815 *
816 * We don't risk setting remote zone equal to ours, since the remote
817 * server might use a different timezone database. Instead, use GMT
818 * (quoted, because very old servers are picky about case). That's
819 * guaranteed to work regardless of the remote's timezone database,
820 * because pg_tzset() hard-wires it (at least in PG 9.2 and later).
821 */
822 do_sql_command(conn, "SET timezone = 'GMT'");
823
824 /*
825 * Set values needed to ensure unambiguous data output from remote. (This
826 * logic should match what pg_dump does. See also set_transmission_modes
827 * in postgres_fdw.c.)
828 */
829 do_sql_command(conn, "SET datestyle = ISO");
830 if (remoteversion >= 80400)
831 do_sql_command(conn, "SET intervalstyle = postgres");
832 if (remoteversion >= 90000)
833 do_sql_command(conn, "SET extra_float_digits = 3");
834 else
835 do_sql_command(conn, "SET extra_float_digits = 2");
836}
837
838/*
839 * Convenience subroutine to issue a non-data-returning SQL command to remote
840 */
/*
 * This waits for the command's result through do_sql_command_end() with
 * consume_input = false.
 *
 * NOTE(review): original line 844 is missing from this copy.  It is
 * presumably do_sql_command_begin(conn, sql), which sends the command
 * before the wait below.
 */
841void
842do_sql_command(PGconn *conn, const char *sql)
843{
845 do_sql_command_end(conn, sql, false);
846}
847
/*
 * Send sql to the remote server without waiting for its result.  Pair this
 * with do_sql_command_end() to collect and check the result.
 *
 * NOTE(review): two original lines are missing from this copy:
 *   - line 849: the name and parameter list.  The prototype is
 *     do_sql_command_begin(PGconn *conn, const char *sql).
 *   - line 852: the action taken when PQsendQuery() fails, presumably an
 *     error report.
 */
848static void
850{
851 if (!PQsendQuery(conn, sql))
853}
854
/*
 * Wait for and check the result of a command previously sent with
 * do_sql_command_begin(), then PQclear() it.  If consume_input is true,
 * available socket data is consumed first (see below).
 *
 * NOTE(review): the following original lines are missing from this copy:
 *   - line 856: the name and parameter list.  The prototype is
 *     do_sql_command_end(PGconn *conn, const char *sql, bool consume_input).
 *   - lines 866-867: the conditional input consumption.
 *   - line 869: the condition deciding whether pgfdw_report_error() is
 *     called.
 * As shown, pgfdw_report_error() would be called unconditionally.
 */
855static void
857{
858 PGresult *res;
859
860 /*
861 * If requested, consume whatever data is available from the socket. (Note
862 * that if all data is available, this allows pgfdw_get_result to call
863 * PQgetResult without forcing the overhead of WaitLatchOrSocket, which
864 * would be large compared to the overhead of PQconsumeInput.)
865 */
868 res = pgfdw_get_result(conn);
870 pgfdw_report_error(res, conn, sql);
871 PQclear(res);
872}
873
874/*
875 * Start remote transaction or subtransaction, if needed.
876 *
877 * Note that we always use at least REPEATABLE READ in the remote session.
878 * This is so that, if a query initiates multiple scans of the same or
879 * different foreign tables, we will get snapshot-consistent results from
880 * those scans. A disadvantage is that we can't provide sane emulation of
881 * READ COMMITTED behavior --- it would be nice if we had some other way to
882 * control which remote queries share a snapshot.
883 *
884 * Note also that we always start the remote transaction with the same
885 * read/write and deferrable properties as the local transaction, and start
886 * the remote subtransaction with the same read/write property as the local
887 * subtransaction.
888 */
/*
 * NOTE(review): the following original lines are missing from this copy:
 *   - 890: the name and parameter list (the body uses `entry`);
 *   - 892: presumably the declaration of curlevel, the local transaction
 *     nesting level used by the savepoint loop;
 *   - 905-906: the body of the read_only_level assignment;
 *   - 909: the body of the else branch;
 *   - 928: the condition choosing SERIALIZABLE;
 *   - 974: the start of an Assert.
 *
 * changing_xact_state is set before each remote transaction-control command
 * and cleared only after that command succeeds.  If do_sql_command() throws,
 * the flag is left set, presumably so that later cleanup knows the remote
 * transaction state is uncertain.
 */
889static void
891{
893
894 /*
895 * If the current local (sub)transaction is read-only, set the topmost
896 * read-only local transaction's nesting level if we haven't yet.
897 *
898 * Note: once it's set, it's retained until the topmost read-only local
899 * transaction is committed/aborted (see pgfdw_xact_callback and
900 * pgfdw_subxact_callback).
901 */
902 if (XactReadOnly)
903 {
904 if (read_only_level == 0)
907 }
908 else
910
911 /*
912 * Start main transaction if we haven't yet; otherwise, change the current
913 * remote (sub)transaction's read/write mode if needed.
914 */
915 if (entry->xact_depth <= 0)
916 {
917 /*
918 * This is the case when we haven't yet started a main transaction.
919 */
920 StringInfoData sql;
921 bool ro = (read_only_level == 1);
922
923 elog(DEBUG3, "starting remote transaction on connection %p",
924 entry->conn);
925
926 initStringInfo(&sql);
927 appendStringInfoString(&sql, "START TRANSACTION ISOLATION LEVEL ");
929 appendStringInfoString(&sql, "SERIALIZABLE");
930 else
931 appendStringInfoString(&sql, "REPEATABLE READ");
932 if (ro)
933 appendStringInfoString(&sql, " READ ONLY");
934 if (XactDeferrable)
935 appendStringInfoString(&sql, " DEFERRABLE");
936 entry->changing_xact_state = true;
937 do_sql_command(entry->conn, sql.data);
938 entry->xact_depth = 1;
939 if (ro)
940 {
941 Assert(!entry->xact_read_only);
942 entry->xact_read_only = true;
943 }
944 entry->changing_xact_state = false;
945 }
946 else if (!entry->xact_read_only)
947 {
948 /*
949 * The remote (sub)transaction has been opened in read-write mode.
950 */
951 Assert(read_only_level == 0 ||
952 entry->xact_depth <= read_only_level);
953
954 /*
955 * If its nesting depth matches read_only_level, it means that the
956 * local read-write (sub)transaction that started it has changed to
957 * read-only after that; in which case change it to read-only as well.
958 * Otherwise, the local (sub)transaction is still read-write, so there
959 * is no need to do anything.
960 */
961 if (entry->xact_depth == read_only_level)
962 {
963 entry->changing_xact_state = true;
964 do_sql_command(entry->conn, "SET transaction_read_only = on");
965 entry->xact_read_only = true;
966 entry->changing_xact_state = false;
967 }
968 }
969 else
970 {
971 /*
972 * The remote (sub)transaction has been opened in read-only mode.
973 */
975 entry->xact_depth >= read_only_level);
976
977 /*
978 * The local read-only (sub)transaction that started it is guaranteed
979 * to be still read-only (see check_transaction_read_only), so there
980 * is no need to do anything.
981 */
982 }
983
984 /*
985 * If we're in a subtransaction, stack up savepoints to match our level.
986 * This ensures we can rollback just the desired effects when a
987 * subtransaction aborts.
988 */
	/*
	 * Each savepoint is named s<N>, where N is the remote nesting depth it
	 * creates.  The abort command built by CONSTRUCT_ABORT_COMMAND relies on
	 * this naming.
	 */
989 while (entry->xact_depth < curlevel)
990 {
991 StringInfoData sql;
992 bool ro = (entry->xact_depth + 1 == read_only_level);
993
994 initStringInfo(&sql);
995 appendStringInfo(&sql, "SAVEPOINT s%d", entry->xact_depth + 1);
996 if (ro)
997 appendStringInfoString(&sql, "; SET transaction_read_only = on");
998 entry->changing_xact_state = true;
999 do_sql_command(entry->conn, sql.data);
1000 entry->xact_depth++;
1001 if (ro)
1002 {
1003 Assert(!entry->xact_read_only);
1004 entry->xact_read_only = true;
1005 }
1006 entry->changing_xact_state = false;
1007 }
1008}
1009
1010/*
1011 * Release connection reference count created by calling GetConnection.
1012 */
/*
 * NOTE(review): original line 1014 (name and parameter list) is missing
 * from this copy.  The body is intentionally empty.
 */
1013void
1015{
1016 /*
1017 * Currently, we don't actually track connection references because all
1018 * cleanup is managed on a transaction or subtransaction basis instead. So
1019 * there's nothing to do here.
1020 */
1021}
1022
1023/*
1024 * Assign a "unique" number for a cursor.
1025 *
1026 * These really only need to be unique per connection within a transaction.
1027 * For the moment we ignore the per-connection point and assign them across
1028 * all connections in the transaction, but we ask for the connection to be
1029 * supplied in case we want to refine that.
1030 *
1031 * Note that even if wraparound happens in a very long transaction, actual
1032 * collisions are highly improbable; just be sure to use %u not %d to print.
1033 */
/*
 * Returns the pre-incremented value of the file-level cursor_number
 * counter.  Unsigned arithmetic makes the wraparound mentioned above
 * well-defined.
 *
 * NOTE(review): original line 1035 (name and parameter list) is missing
 * from this copy.
 */
1034unsigned int
1036{
1037 return ++cursor_number;
1038}
1039
1040/*
1041 * Assign a "unique" number for a prepared statement.
1042 *
1043 * This works much like GetCursorNumber, except that we never reset the counter
1044 * within a session. That's because we can't be 100% sure we've gotten rid
1045 * of all prepared statements on all connections, and it's not really worth
1046 * increasing the risk of prepared-statement name collisions by resetting.
1047 */
/*
 * Returns the pre-incremented value of the file-level prep_stmt_number
 * counter.
 *
 * NOTE(review): original line 1049 (name and parameter list) is missing
 * from this copy.
 */
1048unsigned int
1050{
1051 return ++prep_stmt_number;
1052}
1053
1054/*
1055 * Submit a query and wait for the result.
1056 *
1057 * Since we don't use non-blocking mode, this can't process interrupts while
1058 * pushing the query text to the server. That risk is relatively small, so we
1059 * ignore that for now.
1060 *
1061 * Caller is responsible for the error handling on the result.
1062 */
/*
 * Returns NULL without waiting if PQsendQuery() fails.  pgfdw_report_error()
 * accepts a NULL result and then takes the message from the connection.
 * state may be NULL.
 *
 * NOTE(review): original line 1064 (name and parameter list; the body uses
 * `conn`, `query` and `state`) is missing from this copy.
 */
1063PGresult *
1065{
1066 /* First, process a pending asynchronous request, if any. */
1067 if (state && state->pendingAreq)
1068 process_pending_request(state->pendingAreq);
1069
1070 if (!PQsendQuery(conn, query))
1071 return NULL;
1072 return pgfdw_get_result(conn);
1073}
1074
1075/*
1076 * Wrap libpqsrv_get_result_last(), adding wait event.
1077 *
1078 * Caller is responsible for the error handling on the result.
1079 */
/*
 * NOTE(review): original lines 1081-1084 are missing from this copy.  They
 * contain this function's name, its parameter list and its entire body; it
 * is called elsewhere as pgfdw_get_result(conn).  The wait event is
 * presumably pgfdw_we_get_result, which GetConnection initializes.
 */
1080PGresult *
1085
1086/*
1087 * Report an error we got from the remote server.
1088 *
1089 * Callers should use pgfdw_report_error() to throw an error, or use
1090 * pgfdw_report() for lesser message levels. (We make this distinction
1091 * so that pgfdw_report_error() can be marked noreturn.)
1092 *
1093 * res: PGresult containing the error (might be NULL)
1094 * conn: connection we did the query on
1095 * sql: NULL, or text of remote command we tried to execute
1096 *
1097 * If "res" is not NULL, it'll be PQclear'ed here (unless we throw error,
1098 * in which case memory context cleanup will clear it eventually).
1099 *
1100 * Note: callers that choose not to throw ERROR for a remote error are
1101 * responsible for making sure that the associated ConnCacheEntry gets
1102 * marked with have_error = true.
1103 */
/*
 * NOTE(review): original line 1108 is missing from this copy.  It
 * presumably marks the point after pgfdw_report_internal(ERROR, ...) as
 * unreachable, consistent with this function being noreturn.
 */
1104void
1105pgfdw_report_error(PGresult *res, PGconn *conn, const char *sql)
1106{
1107 pgfdw_report_internal(ERROR, res, conn, sql);
1109}
1110
/*
 * Report a remote-server message at a level below ERROR; control returns to
 * the caller.
 *
 * ERROR-level reports must go through pgfdw_report_error() instead.  The
 * Assert enforces this in assert-enabled builds.  If res is not NULL, it is
 * PQclear'ed by pgfdw_report_internal().
 */
1111void
1112pgfdw_report(int elevel, PGresult *res, PGconn *conn, const char *sql)
1113{
1114 Assert(elevel < ERROR); /* use pgfdw_report_error for that */
1115 pgfdw_report_internal(elevel, res, conn, sql);
1116}
1117
/*
 * Common worker for pgfdw_report_error() and pgfdw_report().
 *
 * Emits an ereport() at "elevel" built from the error fields of "res":
 * - the remote SQLSTATE is used if present, else ERRCODE_CONNECTION_FAILURE;
 * - if the PGresult carries no primary message, the PGconn's error message
 *   is used instead (connection-level failures may yield no PGresult);
 * - "sql", when not NULL, is attached as error context.
 * Afterwards "res" is PQclear'ed; that point is only reached when elevel is
 * below ERROR.
 */
1118static void
1120 const char *sql)
1121{
1127 int sqlstate;
1128
1129 if (diag_sqlstate)
1130 sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
1131 diag_sqlstate[1],
1132 diag_sqlstate[2],
1133 diag_sqlstate[3],
1134 diag_sqlstate[4]);
1135 else
1136 sqlstate = ERRCODE_CONNECTION_FAILURE;
1137
1138 /*
1139 * If we don't get a message from the PGresult, try the PGconn. This is
1140 * needed because for connection-level failures, PQgetResult may just
1141 * return NULL, not a PGresult at all.
1142 */
1143 if (message_primary == NULL)
1145
1146 ereport(elevel,
1147 (errcode(sqlstate),
1148 (message_primary != NULL && message_primary[0] != '\0') ?
1150 errmsg("could not obtain message string for remote error"),
1152 message_hint ? errhint("%s", message_hint) : 0,
1154 sql ? errcontext("remote SQL command: %s", sql) : 0));
1155 PQclear(res);
1156}
1157
1158/*
1159 * pgfdw_xact_callback --- cleanup at main-transaction end.
1160 *
1161 * This runs just late enough that it must not enter user-defined code
1162 * locally. (Entering such code on the remote side is fine. Its remote
1163 * COMMIT TRANSACTION may run deferred triggers.)
 *
 * For every cached connection with an open remote transaction, this commits
 * the remote transaction during pre-commit, refuses PREPARE with an error,
 * and rolls the remote transaction back during abort.  For connections using
 * parallel commit/abort, the loop only issues the command; the results are
 * collected in the "pending connections" step after the loop.  Finally the
 * per-transaction globals (xact_got_connection, cursor_number,
 * read_only_level) are reset.
1164 */
1165static void
1167{
1168 HASH_SEQ_STATUS scan;
1169 ConnCacheEntry *entry;
1172
1173 /* Quick exit if no connections were touched in this transaction. */
1175 return;
1176
1177 /*
1178 * Scan all connection cache entries to find open remote transactions, and
1179 * close them.
1180 */
1182 while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1183 {
1184 PGresult *res;
1185
1186 /* Ignore cache entry if no open connection right now */
1187 if (entry->conn == NULL)
1188 continue;
1189
1190 /* If it has an open remote transaction, try to close it */
1191 if (entry->xact_depth > 0)
1192 {
1193 elog(DEBUG3, "closing remote transaction on connection %p",
1194 entry->conn);
1195
1196 switch (event)
1197 {
1200
1201 /*
1202 * If abort cleanup previously failed for this connection,
1203 * we can't issue any more commands against it.
1204 */
1206
1207 /* Commit all remote transactions during pre-commit */
1208 entry->changing_xact_state = true;
 /*
  * With parallel commit, only send COMMIT here; its result is
  * read in the pending-connections step after this loop.
  */
1209 if (entry->parallel_commit)
1210 {
1211 do_sql_command_begin(entry->conn, "COMMIT TRANSACTION");
1213 continue;
1214 }
1215 do_sql_command(entry->conn, "COMMIT TRANSACTION");
1216 entry->changing_xact_state = false;
1217
1218 /*
1219 * If there were any errors in subtransactions, and we
1220 * made prepared statements, do a DEALLOCATE ALL to make
1221 * sure we get rid of all prepared statements. This is
1222 * annoying and not terribly bulletproof, but it's
1223 * probably not worth trying harder.
1224 *
1225 * DEALLOCATE ALL only exists in 8.3 and later, so this
1226 * constrains how old a server postgres_fdw can
1227 * communicate with. We intentionally ignore errors in
1228 * the DEALLOCATE, so that we can hobble along to some
1229 * extent with older servers (leaking prepared statements
1230 * as we go; but we don't really support update operations
1231 * pre-8.3 anyway).
1232 */
1233 if (entry->have_prep_stmt && entry->have_error)
1234 {
1235 res = pgfdw_exec_query(entry->conn, "DEALLOCATE ALL",
1236 NULL);
1237 PQclear(res);
1238 }
1239 entry->have_prep_stmt = false;
1240 entry->have_error = false;
1241 break;
1243
1244 /*
1245 * We disallow any remote transactions, since it's not
1246 * very reasonable to hold them open until the prepared
1247 * transaction is committed. For the moment, throw error
1248 * unconditionally; later we might allow read-only cases.
1249 * Note that the error will cause us to come right back
1250 * here with event == XACT_EVENT_ABORT, so we'll clean up
1251 * the connection state at that point.
1252 */
1253 ereport(ERROR,
1255 errmsg("cannot PREPARE a transaction that has operated on postgres_fdw foreign tables")));
1256 break;
1258 case XACT_EVENT_COMMIT:
1259 case XACT_EVENT_PREPARE:
1260 /* Pre-commit should have closed the open transaction */
1261 elog(ERROR, "missed cleaning up connection during pre-commit");
1262 break;
1264 case XACT_EVENT_ABORT:
1265 /* Rollback all remote transactions during abort */
1266 if (entry->parallel_abort)
1267 {
1268 if (pgfdw_abort_cleanup_begin(entry, true,
1271 continue;
1272 }
1273 else
1274 pgfdw_abort_cleanup(entry, true);
1275 break;
1276 }
1277 }
1278
1279 /* Reset state to show we're out of a transaction */
1280 pgfdw_reset_xact_state(entry, true);
1281 }
1282
1283 /* If there are any pending connections, finish cleaning them up */
1285 {
1286 if (event == XACT_EVENT_PARALLEL_PRE_COMMIT ||
1287 event == XACT_EVENT_PRE_COMMIT)
1288 {
1291 }
1292 else
1293 {
1295 event == XACT_EVENT_ABORT);
1297 true);
1298 }
1299 }
1300
1301 /*
1302 * Regardless of the event type, we can now mark ourselves as out of the
1303 * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
1304 * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
1305 */
1306 xact_got_connection = false;
1307
1308 /* Also reset cursor numbering for next transaction */
1309 cursor_number = 0;
1310
1311 /* Likewise for read_only_level */
1312 read_only_level = 0;
1313}
1314
1315/*
1316 * pgfdw_subxact_callback --- cleanup at subtransaction end.
 *
 * Only SUBXACT_EVENT_PRE_COMMIT_SUB and SUBXACT_EVENT_ABORT_SUB are handled.
 * For each connection whose remote subtransaction depth equals the current
 * level, this issues "RELEASE SAVEPOINT s<level>" at pre-commit, or rolls the
 * remote subtransaction back at abort.  A remote depth deeper than the
 * current level is reported as an error.  For connections using parallel
 * commit/abort, the loop only issues the command; the results are collected
 * in the "pending connections" step after the loop.
1317 */
1318static void
1321{
1322 HASH_SEQ_STATUS scan;
1323 ConnCacheEntry *entry;
1324 int curlevel;
1327
1328 /* Nothing to do at subxact start, nor after commit. */
1329 if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
1330 event == SUBXACT_EVENT_ABORT_SUB))
1331 return;
1332
1333 /* Quick exit if no connections were touched in this transaction. */
1335 return;
1336
1337 /*
1338 * Scan all connection cache entries to find open remote subtransactions
1339 * of the current level, and close them.
1340 */
1343 while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1344 {
1345 char sql[100];
1346
1347 /*
1348 * We only care about connections with open remote subtransactions of
1349 * the current level.
1350 */
1351 if (entry->conn == NULL || entry->xact_depth < curlevel)
1352 continue;
1353
1354 if (entry->xact_depth > curlevel)
1355 elog(ERROR, "missed cleaning up remote subtransaction at level %d",
1356 entry->xact_depth);
1357
1358 if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
1359 {
1360 /*
1361 * If abort cleanup previously failed for this connection, we
1362 * can't issue any more commands against it.
1363 */
1365
1366 /* Commit all remote subtransactions during pre-commit */
1367 snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
1368 entry->changing_xact_state = true;
 /* With parallel commit, only send RELEASE here; see below. */
1369 if (entry->parallel_commit)
1370 {
1371 do_sql_command_begin(entry->conn, sql);
1373 continue;
1374 }
1375 do_sql_command(entry->conn, sql);
1376 entry->changing_xact_state = false;
1377 }
1378 else
1379 {
1380 /* Rollback all remote subtransactions during abort */
1381 if (entry->parallel_abort)
1382 {
1383 if (pgfdw_abort_cleanup_begin(entry, false,
1386 continue;
1387 }
1388 else
1389 pgfdw_abort_cleanup(entry, false);
1390 }
1391
1392 /* OK, we're outta that level of subtransaction */
1393 pgfdw_reset_xact_state(entry, false);
1394 }
1395
1396 /* If there are any pending connections, finish cleaning them up */
1398 {
1399 if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
1400 {
1403 }
1404 else
1405 {
1408 false);
1409 }
1410 }
1411
1412 /* If in read_only_level, reset it */
1414 read_only_level = 0;
1415}
1416
1417/*
1418 * Connection invalidation callback function
1419 *
1420 * After a change to a pg_foreign_server or pg_user_mapping catalog entry,
1421 * close connections depending on that entry immediately if current transaction
1422 * has not used those connections yet. Otherwise, mark those connections as
1423 * invalid and then make pgfdw_xact_callback() close them at the end of current
1424 * transaction, since they cannot be closed in the midst of the transaction
1425 * using them. Closed connections will be remade at the next opportunity if
1426 * necessary.
1427 *
1428 * Although most cache invalidation callbacks blow away all the related stuff
1429 * regardless of the given hashvalue, connections are expensive enough that
1430 * it's worth trying to avoid that.
1431 *
1432 * NB: We could avoid unnecessary disconnection more strictly by examining
1433 * individual option values, but it seems too much effort for the gain.
1434 */
1435static void
1437{
1438 HASH_SEQ_STATUS scan;
1439 ConnCacheEntry *entry;
1440
1442
1443 /* ConnectionHash must exist already, if we're registered */
1445 while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1446 {
1447 /* Ignore entries that have no open connection */
1448 if (entry->conn == NULL)
1449 continue;
1450
1451 /* hashvalue == 0 means a cache reset, must clear all state */
1452 if (hashvalue == 0 ||
1454 entry->server_hashvalue == hashvalue) ||
1455 (cacheid == USERMAPPINGOID &&
1456 entry->mapping_hashvalue == hashvalue))
1457 {
1458 /*
1459 * Close the connection immediately if it's not used yet in this
1460 * transaction (xact_depth == 0). Otherwise mark it as invalid so
1461 * that pgfdw_xact_callback() can close it at the end of this
1462 * transaction.
1463 */
1464 if (entry->xact_depth == 0)
1465 {
1466 elog(DEBUG3, "discarding connection %p", entry->conn);
1467 disconnect_pg_server(entry);
1468 }
1469 else
1470 entry->invalidated = true;
1471 }
1472 }
1473}
1474
1475/*
1476 * Raise an error if the given connection cache entry is marked as being
1477 * in the middle of an xact state change. This should be called at points
1478 * where no such change is expected to be in progress; if one is found to be in
1479 * progress, it means that we aborted in the middle of a previous state change
1480 * and now don't know what the remote transaction state actually is.
1481 * Such connections can't safely be further used. Re-establishing the
1482 * connection would change the snapshot and roll back any writes already
1483 * performed, so that's not an option, either. Thus, we must abort.
1484 *
1485 * Note: there might be open cursors that use the connection, so even if the
1486 * connection cache entry is marked as such, we will retain it until abort
1487 * cleanup of the main transaction, to ensure such open cursors can safely
1488 * refer to the PGconn for the connection.
1489 */
1490static void
1492{
1493 ForeignServer *server;
1494
1495 /* nothing to do for inactive entries and entries of sane state */
1496 if (entry->conn == NULL || !entry->changing_xact_state)
1497 return;
1498
1499 /* find server name to be shown in the message below */
1500 server = GetForeignServer(entry->serverid);
1501
1502 ereport(ERROR,
1504 errmsg("connection to server \"%s\" cannot be used due to abort cleanup failure",
1505 server->servername)));
1506}
1507
1508/*
1509 * Reset state to show we're out of a (sub)transaction.
 *
 * toplevel: true when leaving the main transaction.  The remote depth is
 * reset to 0, and the connection may be discarded (see below).  false when
 * leaving one subtransaction level, in which case the remote depth is
 * decremented by one.
1510 */
1511static void
1513{
1514 if (toplevel)
1515 {
1516 /* Reset state to show we're out of a transaction */
1517 entry->xact_depth = 0;
1518
1519 /* Reset xact r/o state */
1520 entry->xact_read_only = false;
1521
1522 /*
1523 * Discard the connection to recover if it isn't in a good idle state,
1524 * was left mid-way through a transaction state change, is marked as
1525 * invalid, or its server's keep_connections option is disabled. The
1526 * next GetConnection will open a new connection.
1527 */
1528 if (PQstatus(entry->conn) != CONNECTION_OK ||
1530 entry->changing_xact_state ||
1531 entry->invalidated ||
1532 !entry->keep_connections)
1533 {
1534 elog(DEBUG3, "discarding connection %p", entry->conn);
1535 disconnect_pg_server(entry);
1536 }
1537 }
1538 else
1539 {
1540 /* Reset state to show we're out of a subtransaction */
1541 entry->xact_depth--;
1542
1543 /* If the level just left is read_only_level, reset xact r/o state */
1544 if (entry->xact_depth + 1 == read_only_level)
1545 entry->xact_read_only = false;
1546 }
1547}
1548
1549/*
1550 * Cancel the currently-in-progress query (whose query text we do not have)
1551 * and ignore the result. Returns true if we successfully cancel the query
1552 * and discard any pending result, and false if not.
1553 *
 * Two deadlines are used: an overall one after which the connection is
 * assumed dead, and an earlier one after which the cancel request is
 * re-issued.
 *
1554 * It's not a huge problem if we throw an ERROR here, but if we get into error
1555 * recursion trouble, we'll end up slamming the connection shut, which will
1556 * necessitate failing the entire toplevel transaction even if subtransactions
1557 * were used. Try to use WARNING where we can.
1558 *
1559 * XXX: if the query was one sent by fetch_more_data_begin(), we could get the
1560 * query text from the pendingAreq saved in the per-connection state, then
1561 * report the query using it.
1562 */
1563static bool
1565{
1569
1570 /*
1571 * If it takes too long to cancel the query and discard the result, assume
1572 * the connection is dead.
1573 */
1575
1576 /*
1577 * Also, lose patience and re-issue the cancel request after a little bit.
1578 * (This serves to close some race conditions.)
1579 */
1581
1583 return false;
1585}
1586
1587/*
1588 * Submit a cancel request to the given connection, waiting only until
1589 * the given time.
1590 *
1591 * We sleep interruptibly until we receive confirmation that the cancel
1592 * request has been accepted, and if it is, return true; if the timeout
1593 * lapses without that, or the request fails for whatever reason, return
1594 * false.
1595 */
1596static bool
1598{
 /* libpqsrv_cancel() yields NULL on success, else an error message. */
1599 const char *errormsg = libpqsrv_cancel(conn, endtime);
1600
1601 if (errormsg != NULL)
1604 errmsg("could not send cancel request: %s", errormsg));
1605
1606 return errormsg == NULL;
1607}
1608
/*
 * Second half of a cancel, used once a cancel request has been sent (see
 * pgfdw_cancel_query_begin()).
 *
 * Optionally consumes already-arrived input first.  It then waits (see
 * pgfdw_get_cleanup_result()) for the interrupted query's result and discards
 * it.
 *
 * Returns true if the result was obtained and discarded.  Returns false,
 * after reporting the problem, if input could not be consumed, the wait
 * timed out, or connection trouble occurred.
 */
1609static bool
1612{
1614 bool timed_out;
1615
1616 /*
1617 * If requested, consume whatever data is available from the socket. (Note
1618 * that if all data is available, this allows pgfdw_get_cleanup_result to
1619 * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
1620 * which would be large compared to the overhead of PQconsumeInput.)
1621 */
1623 {
1626 errmsg("could not get result of cancel request: %s",
1628 return false;
1629 }
1630
1631 /* Get and discard the result of the query. */
1633 &result, &timed_out))
1634 {
1635 if (timed_out)
1637 (errmsg("could not get result of cancel request due to timeout")));
1638 else
1641 errmsg("could not get result of cancel request: %s",
1643
1644 return false;
1645 }
1646 PQclear(result);
1647
1648 return true;
1649}
1650
1651/*
1652 * Submit a query during (sub)abort cleanup and wait up to 30 seconds for the
1653 * result. If the query is executed without error, the return value is true.
1654 * If the query is sent and completes but the remote server reports an error,
1655 * the return value is true if and only if ignore_errors is set. If the query can't be
1656 * sent or times out, the return value is false.
1657 *
1658 * It's not a huge problem if we throw an ERROR here, but if we get into error
1659 * recursion trouble, we'll end up slamming the connection shut, which will
1660 * necessitate failing the entire toplevel transaction even if subtransactions
1661 * were used. Try to use WARNING where we can.
1662 */
1663static bool
1665{
1667
1668 /*
1669 * If it takes too long to execute a cleanup query, assume the connection
1670 * is dead. It's fairly likely that this is why we aborted in the first
1671 * place (e.g. statement timeout, user cancel), so the timeout shouldn't
1672 * be too long.
1673 */
1676
1678 return false;
1680 false, ignore_errors);
1681}
1682
/*
 * Send a cleanup query on "conn" without waiting for its result.  The result
 * must be collected afterwards with pgfdw_exec_cleanup_query_end().
 *
 * Returns true if the query was sent.  If sending fails, a WARNING is
 * reported (with the query text as context) and false is returned.
 */
1683static bool
1685{
1686 Assert(query != NULL);
1687
1688 /*
1689 * Submit a query. Since we don't use non-blocking mode, this also can
1690 * block. But its risk is relatively small, so we ignore that for now.
1691 */
1692 if (!PQsendQuery(conn, query))
1693 {
1694 pgfdw_report(WARNING, NULL, conn, query);
1695 return false;
1696 }
1697
1698 return true;
1699}
1700
/*
 * Wait for, and check, the result of a cleanup query already sent on "conn"
 * (for example by pgfdw_exec_cleanup_query_begin()), giving up at the given
 * end time.
 *
 * Returns true if the query completed successfully.  If it completed but was
 * not successful, a WARNING is reported and ignore_errors is returned.  If
 * input could not be consumed, the wait timed out, or the connection failed,
 * the problem is reported and false is returned.
 */
1701static bool
1704 bool ignore_errors)
1705{
1707 bool timed_out;
1708
1709 Assert(query != NULL);
1710
1711 /*
1712 * If requested, consume whatever data is available from the socket. (Note
1713 * that if all data is available, this allows pgfdw_get_cleanup_result to
1714 * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
1715 * which would be large compared to the overhead of PQconsumeInput.)
1716 */
1718 {
1719 pgfdw_report(WARNING, NULL, conn, query);
1720 return false;
1721 }
1722
1723 /* Get the result of the query. */
1725 {
1726 if (timed_out)
1728 (errmsg("could not get query result due to timeout"),
1729 errcontext("remote SQL command: %s", query)));
1730 else
1731 pgfdw_report(WARNING, NULL, conn, query);
1732
1733 return false;
1734 }
1735
1736 /* Issue a warning if not successful. */
1738 {
1739 pgfdw_report(WARNING, result, conn, query);
1740 return ignore_errors;
1741 }
1742 PQclear(result);
1743
1744 return true;
1745}
1746
1747/*
1748 * Get, during abort cleanup, the result of a query that is in progress.
1749 * This might be a query that is being interrupted by a cancel request or by
1750 * transaction abort, or it might be a query that was initiated as part of
1751 * transaction abort to get the remote side back to the appropriate state.
1752 *
1753 * endtime is the time at which we should give up and assume the remote side
1754 * is dead. retrycanceltime is the time at which we should issue a fresh
1755 * cancel request (pass the same value as endtime if this is not wanted).
1756 *
1757 * Returns true if the timeout expired or connection trouble occurred,
1758 * false otherwise. Sets *result except in case of a true result.
1759 * Sets *timed_out to true only when the timeout expired.
 *
 * Note the inverted sense: a true return means failure.  When several
 * results arrive, only the last one is handed back in *result; earlier ones
 * are freed.
1760 */
1761static bool
1764 PGresult **result,
1765 bool *timed_out)
1766{
1767 bool failed = false;
1768 PGresult *last_res = NULL;
1770
1771 *result = NULL;
1772 *timed_out = false;
1773 for (;;)
1774 {
1775 PGresult *res;
1776
1777 while (PQisBusy(conn))
1778 {
1779 int wc;
1781 long cur_timeout;
1782
1783 /* If timeout has expired, give up. */
1784 if (now >= endtime)
1785 {
1786 *timed_out = true;
1787 failed = true;
1788 goto exit;
1789 }
1790
1791 /* If we need to re-issue the cancel request, do that. */
1792 if (now >= retrycanceltime)
1793 {
1794 /* We ignore failure to issue the repeated request. */
1796
1797 /* Recompute "now" in case that took measurable time. */
1799
1800 /* Adjust re-cancel timeout in increasing steps. */
1802 canceldelta);
1804 }
1805
1806 /* If timeout has expired, give up, else get sleep time. */
1808 Min(endtime,
1810 if (cur_timeout <= 0)
1811 {
1812 *timed_out = true;
1813 failed = true;
1814 goto exit;
1815 }
1816
1817 /* first time, allocate or get the custom wait event */
1818 if (pgfdw_we_cleanup_result == 0)
1819 pgfdw_we_cleanup_result = WaitEventExtensionNew("PostgresFdwCleanupResult");
1820
1821 /* Sleep until there's something to do */
1825 PQsocket(conn),
1828
1830
1831 /* Data available in socket? */
1832 if (wc & WL_SOCKET_READABLE)
1833 {
1834 if (!PQconsumeInput(conn))
1835 {
1836 /* connection trouble */
1837 failed = true;
1838 goto exit;
1839 }
1840 }
1841 }
1842
1843 res = PQgetResult(conn);
1844 if (res == NULL)
1845 break; /* query is complete */
1846
 /* Keep only the most recent result; free any earlier one. */
1847 PQclear(last_res);
1848 last_res = res;
1849 }
1850exit:
 /* On failure, free any partial result and leave *result NULL. */
1851 if (failed)
1852 PQclear(last_res);
1853 else
1854 *result = last_res;
1855 return failed;
1856}
1857
1858/*
1859 * Abort remote transaction or subtransaction.
1860 *
1861 * "toplevel" should be set to true if the toplevel (main) transaction is
1862 * rolled back, false otherwise.
1863 *
1864 * On return, entry->changing_xact_state is false on success and true on
 * failure, so that a connection whose remote state is uncertain is not
 * reused.
1865 */
1866static void
1868{
1869 char sql[100];
1870
1871 /*
1872 * Don't try to clean up the connection if we're already in error
1873 * recursion trouble.
1874 */
1876 entry->changing_xact_state = true;
1877
1878 /*
1879 * If connection is already unsalvageable, don't touch it further.
1880 */
1881 if (entry->changing_xact_state)
1882 return;
1883
1884 /*
1885 * Mark this connection as in the process of changing transaction state.
1886 */
1887 entry->changing_xact_state = true;
1888
1889 /* Assume we might have lost track of prepared statements */
1890 entry->have_error = true;
1891
1892 /*
1893 * If a command has been submitted to the remote server by using an
1894 * asynchronous execution function, the command might not have yet
1895 * completed. Check to see if a command is still being processed by the
1896 * remote server, and if so, request cancellation of the command.
1897 */
1898 if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
1899 !pgfdw_cancel_query(entry->conn))
1900 return; /* Unable to cancel running query */
1901
1902 CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1903 if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
1904 return; /* Unable to abort remote (sub)transaction */
1905
1906 if (toplevel)
1907 {
1908 if (entry->have_prep_stmt && entry->have_error &&
1910 "DEALLOCATE ALL",
1911 true))
1912 return; /* Trouble clearing prepared statements */
1913
1914 entry->have_prep_stmt = false;
1915 entry->have_error = false;
1916 }
1917
1918 /*
1919 * If pendingAreq of the per-connection state is not NULL, it means that
1920 * an asynchronous fetch begun by fetch_more_data_begin() was not done
1921 * successfully and thus the per-connection state was not reset in
1922 * fetch_more_data(); in that case reset the per-connection state here.
1923 */
1924 if (entry->state.pendingAreq)
1925 memset(&entry->state, 0, sizeof(entry->state));
1926
1927 /* Disarm changing_xact_state if it all worked */
1928 entry->changing_xact_state = false;
1929}
1930
1931/*
1932 * Like pgfdw_abort_cleanup, submit an abort command or cancel request, but
1933 * don't wait for the result.
1934 *
1935 * Returns true if the abort command or cancel request is successfully issued,
1936 * false otherwise. If the abort command was issued, the given connection
1937 * cache entry is appended to *pending_entries; if instead a cancel request was
1938 * issued (because a command was still running), it is appended to *cancel_requested.
 * pgfdw_finish_abort_cleanup() later collects the results for both lists.
1939 */
1940static bool
1943{
1944 /*
1945 * Don't try to clean up the connection if we're already in error
1946 * recursion trouble.
1947 */
1949 entry->changing_xact_state = true;
1950
1951 /*
1952 * If connection is already unsalvageable, don't touch it further.
1953 */
1954 if (entry->changing_xact_state)
1955 return false;
1956
1957 /*
1958 * Mark this connection as in the process of changing transaction state.
1959 */
1960 entry->changing_xact_state = true;
1961
1962 /* Assume we might have lost track of prepared statements */
1963 entry->have_error = true;
1964
1965 /*
1966 * If a command has been submitted to the remote server by using an
1967 * asynchronous execution function, the command might not have yet
1968 * completed. Check to see if a command is still being processed by the
1969 * remote server, and if so, request cancellation of the command.
1970 */
1972 {
1974
1978 return false; /* Unable to cancel running query */
1980 }
1981 else
1982 {
1983 char sql[100];
1984
1985 CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1986 if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
1987 return false; /* Unable to abort remote transaction */
1989 }
1990
1991 return true;
1992}
1993
1994/*
1995 * Finish pre-commit cleanup of connections on each of which we've sent a
1996 * COMMIT command to the remote server.
 *
 * A first pass reads each COMMIT result.  Where a DEALLOCATE ALL is needed,
 * it sends that command and defers the entry.  A second pass then drains
 * those DEALLOCATE ALL results, so those commands also run in parallel.  Each
 * entry's transaction state is reset once its work is done.
1997 */
1998static void
2000{
2001 ConnCacheEntry *entry;
2003 ListCell *lc;
2004
2006
2007 /*
2008 * Get the result of the COMMIT command for each of the pending entries
2009 */
2010 foreach(lc, pending_entries)
2011 {
2012 entry = (ConnCacheEntry *) lfirst(lc);
2013
2015
2016 /*
2017 * We might already have received the result on the socket, so pass
2018 * consume_input=true to try to consume it first
2019 */
2020 do_sql_command_end(entry->conn, "COMMIT TRANSACTION", true);
2021 entry->changing_xact_state = false;
2022
2023 /* Do a DEALLOCATE ALL in parallel if needed */
2024 if (entry->have_prep_stmt && entry->have_error)
2025 {
2026 /* Ignore errors (see notes in pgfdw_xact_callback) */
2027 if (PQsendQuery(entry->conn, "DEALLOCATE ALL"))
2028 {
2030 continue;
2031 }
2032 }
2033 entry->have_prep_stmt = false;
2034 entry->have_error = false;
2035
2036 pgfdw_reset_xact_state(entry, true);
2037 }
2038
2039 /* No further work if no DEALLOCATE ALL commands are pending */
2040 if (!pending_deallocs)
2041 return;
2042
2043 /*
2044 * Get the result of the DEALLOCATE command for each of the pending
2045 * entries
2046 */
2047 foreach(lc, pending_deallocs)
2048 {
2049 PGresult *res;
2050
2051 entry = (ConnCacheEntry *) lfirst(lc);
2052
2053 /* Ignore errors (see notes in pgfdw_xact_callback) */
2054 while ((res = PQgetResult(entry->conn)) != NULL)
2055 {
2056 PQclear(res);
2057 /* Stop if the connection is lost (else we'll loop infinitely) */
2058 if (PQstatus(entry->conn) == CONNECTION_BAD)
2059 break;
2060 }
2061 entry->have_prep_stmt = false;
2062 entry->have_error = false;
2063
2064 pgfdw_reset_xact_state(entry, true);
2065 }
2066}
2067
2068/*
2069 * Finish pre-subcommit cleanup of connections on each of which we've sent a
2070 * RELEASE command to the remote server.
 *
 * "curlevel" is the subtransaction level being released.  After each RELEASE
 * result is read, the entry's state is reset for leaving one subtransaction
 * level.
2071 */
2072static void
2074{
2075 ConnCacheEntry *entry;
2076 char sql[100];
2077 ListCell *lc;
2078
2080
2081 /*
2082 * Get the result of the RELEASE command for each of the pending entries
2083 */
 /* Rebuild the same command text that pgfdw_subxact_callback() sent. */
2084 snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
2085 foreach(lc, pending_entries)
2086 {
2087 entry = (ConnCacheEntry *) lfirst(lc);
2088
2090
2091 /*
2092 * We might already have received the result on the socket, so pass
2093 * consume_input=true to try to consume it first
2094 */
2095 do_sql_command_end(entry->conn, sql, true);
2096 entry->changing_xact_state = false;
2097
2098 pgfdw_reset_xact_state(entry, false);
2099 }
2100}
2101
2102/*
2103 * Finish abort cleanup of connections on each of which we've sent an abort
2104 * command or cancel request to the remote server.
 *
 * This proceeds in three phases:
 * 1. For entries in cancel_requested, wait for the cancelled query, then send
 *    the abort command.  These entries join pending_entries.
 * 2. For entries in pending_entries, wait for the abort command.  At top
 *    level, send DEALLOCATE ALL where needed; those entries join
 *    pending_deallocs.
 * 3. Wait for those DEALLOCATE ALL commands (top level only).
 * An entry that runs into trouble in any phase is reset via
 * pgfdw_reset_xact_state() with changing_xact_state still set.  Only entries
 * that complete every phase have changing_xact_state cleared.
2105 */
2106static void
2108 bool toplevel)
2109{
2111 ListCell *lc;
2112
2113 /*
2114 * For each of the pending cancel requests (if any), get and discard the
2115 * result of the query, and submit an abort command to the remote server.
2116 */
2117 if (cancel_requested)
2118 {
2119 foreach(lc, cancel_requested)
2120 {
2121 ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
2125 char sql[100];
2126
2128
2129 /*
2130 * Set end time. You might think we should do this before issuing
2131 * cancel request like in normal mode, but that is problematic,
2132 * because if, for example, it took longer than 30 seconds to
2133 * process the first few entries in the cancel_requested list, it
2134 * would cause a timeout error when processing each of the
2135 * remaining entries in the list, leading to slamming that entry's
2136 * connection shut.
2137 */
2142
2143 if (!pgfdw_cancel_query_end(entry->conn, endtime,
2144 retrycanceltime, true))
2145 {
2146 /* Unable to cancel running query */
2147 pgfdw_reset_xact_state(entry, toplevel);
2148 continue;
2149 }
2150
2151 /* Send an abort command in parallel if needed */
2152 CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
2153 if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
2154 {
2155 /* Unable to abort remote (sub)transaction */
2156 pgfdw_reset_xact_state(entry, toplevel);
2157 }
2158 else
2160 }
2161 }
2162
2163 /* No further work if no pending entries */
2164 if (!pending_entries)
2165 return;
2166
2167 /*
2168 * Get the result of the abort command for each of the pending entries
2169 */
2170 foreach(lc, pending_entries)
2171 {
2172 ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
2174 char sql[100];
2175
2177
2178 /*
2179 * Set end time. We do this now, not before issuing the command like
2180 * in normal mode, for the same reason as for the cancel_requested
2181 * entries.
2182 */
2185
2186 CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
2187 if (!pgfdw_exec_cleanup_query_end(entry->conn, sql, endtime,
2188 true, false))
2189 {
2190 /* Unable to abort remote (sub)transaction */
2191 pgfdw_reset_xact_state(entry, toplevel);
2192 continue;
2193 }
2194
2195 if (toplevel)
2196 {
2197 /* Do a DEALLOCATE ALL in parallel if needed */
2198 if (entry->have_prep_stmt && entry->have_error)
2199 {
2201 "DEALLOCATE ALL"))
2202 {
2203 /* Trouble clearing prepared statements */
2204 pgfdw_reset_xact_state(entry, toplevel);
2205 }
2206 else
2208 continue;
2209 }
2210 entry->have_prep_stmt = false;
2211 entry->have_error = false;
2212 }
2213
2214 /* Reset the per-connection state if needed */
2215 if (entry->state.pendingAreq)
2216 memset(&entry->state, 0, sizeof(entry->state));
2217
2218 /* We're done with this entry; unset the changing_xact_state flag */
2219 entry->changing_xact_state = false;
2220 pgfdw_reset_xact_state(entry, toplevel);
2221 }
2222
2223 /* No further work if no DEALLOCATE ALL commands are pending */
2224 if (!pending_deallocs)
2225 return;
2226 Assert(toplevel);
2227
2228 /*
2229 * Get the result of the DEALLOCATE command for each of the pending
2230 * entries
2231 */
2232 foreach(lc, pending_deallocs)
2233 {
2234 ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
2236
2238 Assert(entry->have_prep_stmt);
2239 Assert(entry->have_error);
2240
2241 /*
2242 * Set end time. We do this now, not before issuing the command like
2243 * in normal mode, for the same reason as for the cancel_requested
2244 * entries.
2245 */
2248
2249 if (!pgfdw_exec_cleanup_query_end(entry->conn, "DEALLOCATE ALL",
2250 endtime, true, true))
2251 {
2252 /* Trouble clearing prepared statements */
2253 pgfdw_reset_xact_state(entry, toplevel);
2254 continue;
2255 }
2256 entry->have_prep_stmt = false;
2257 entry->have_error = false;
2258
2259 /* Reset the per-connection state if needed */
2260 if (entry->state.pendingAreq)
2261 memset(&entry->state, 0, sizeof(entry->state));
2262
2263 /* We're done with this entry; unset the changing_xact_state flag */
2264 entry->changing_xact_state = false;
2265 pgfdw_reset_xact_state(entry, toplevel);
2266 }
2267}
2268
2269/* Number of output arguments (columns) for various API versions */
/*
 * v1.1: server_name, valid.
 * v1.2: adds user_name, used_in_xact, closed, remote_backend_pid.
 * POSTGRES_FDW_GET_CONNECTIONS_COLS sizes the per-row values/nulls arrays.
 */
2270#define POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_1 2
2271#define POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_2 6
2272#define POSTGRES_FDW_GET_CONNECTIONS_COLS 6 /* maximum of above */
2273
2274/*
2275 * Internal function used by postgres_fdw_get_connections variants.
2276 *
2277 * For API version 1.1, this function takes no input parameter and
2278 * returns a set of records with the following values:
2279 *
2280 * - server_name - server name of active connection. In case the foreign server
2281 * is dropped but still the connection is active, then the server name will
2282 * be NULL in output.
2283 * - valid - true/false representing whether the connection is valid or not.
2284 * Note that connections can become invalid in pgfdw_inval_callback.
2285 *
2286 * For API version 1.2 and later, this function takes an input parameter
2287 * to check a connection status and returns the following
2288 * additional values along with the two values from version 1.1:
2289 *
2290 * - user_name - the local user name of the active connection. In case the
2291 * user mapping is dropped but the connection is still active, then the
2292 * user name will be NULL in the output.
2293 * - used_in_xact - true if the connection is used in the current transaction.
2294 * - closed - true if the connection is closed.
2295 * - remote_backend_pid - process ID of the remote backend, on the foreign
2296 * server, handling the connection.
2297 *
2298 * No records are returned when there are no cached connections at all.
2299 */
2300static void
2302 enum pgfdwVersion api_version)
2303{
2305 HASH_SEQ_STATUS scan;
2306 ConnCacheEntry *entry;
2307
2308 InitMaterializedSRF(fcinfo, 0);
2309
2310 /* If cache doesn't exist, we return no records */
2311 if (!ConnectionHash)
2312 return;
2313
2314 /* Check we have the expected number of output arguments */
2315 switch (rsinfo->setDesc->natts)
2316 {
2318 if (api_version != PGFDW_V1_1)
2319 elog(ERROR, "incorrect number of output arguments");
2320 break;
2322 if (api_version != PGFDW_V1_2)
2323 elog(ERROR, "incorrect number of output arguments");
2324 break;
2325 default:
2326 elog(ERROR, "incorrect number of output arguments");
2327 }
2328
2330 while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
2331 {
2332 ForeignServer *server;
2334 bool nulls[POSTGRES_FDW_GET_CONNECTIONS_COLS] = {0};
2335 int i = 0;
2336
2337 /* We only look for open remote connections */
2338 if (!entry->conn)
2339 continue;
2340
2342
2343 /*
2344 * The foreign server may have been dropped in current explicit
2345 * transaction. It is not possible to drop the server from another
2346 * session when the connection associated with it is in use in the
2347 * current transaction, if tried so, the drop query in another session
2348 * blocks until the current transaction finishes.
2349 *
2350 * Even though the server is dropped in the current transaction, the
2351 * cache can still have associated active connection entry, say we
2352 * call such connections dangling. Since we can not fetch the server
2353 * name from system catalogs for dangling connections, instead we show
2354 * NULL value for server name in output.
2355 *
2356 * We could have done better by storing the server name in the cache
2357 * entry instead of server oid so that it could be used in the output.
2358 * But the server name in each cache entry requires 64 bytes of
2359 * memory, which is huge, when there are many cached connections and
2360 * the use case i.e. dropping the foreign server within the explicit
2361 * current transaction seems rare. So, we chose to show NULL value for
2362 * server name in output.
2363 *
2364 * Such dangling connections get closed either in next use or at the
2365 * end of current explicit transaction in pgfdw_xact_callback.
2366 */
2367 if (!server)
2368 {
2369 /*
2370 * If the server has been dropped in the current explicit
2371 * transaction, then this entry would have been invalidated in
2372 * pgfdw_inval_callback at the end of drop server command. Note
2373 * that this connection would not have been closed in
2374 * pgfdw_inval_callback because it is still being used in the
2375 * current explicit transaction. So, assert that here.
2376 */
2377 Assert(entry->conn && entry->xact_depth > 0 && entry->invalidated);
2378
2379 /* Show null, if no server name was found */
2380 nulls[i++] = true;
2381 }
2382 else
2383 values[i++] = CStringGetTextDatum(server->servername);
2384
2385 if (api_version >= PGFDW_V1_2)
2386 {
2387 HeapTuple tp;
2388
2389 /* Use the system cache to obtain the user mapping */
2391
2392 /*
2393 * Just like in the foreign server case, user mappings can also be
2394 * dropped in the current explicit transaction. Therefore, the
2395 * similar check as in the server case is required.
2396 */
2397 if (!HeapTupleIsValid(tp))
2398 {
2399 /*
2400 * If we reach here, this entry must have been invalidated in
2401 * pgfdw_inval_callback, same as in the server case.
2402 */
2403 Assert(entry->conn && entry->xact_depth > 0 &&
2404 entry->invalidated);
2405
2406 nulls[i++] = true;
2407 }
2408 else
2409 {
2410 Oid userid;
2411
2412 userid = ((Form_pg_user_mapping) GETSTRUCT(tp))->umuser;
2414 ReleaseSysCache(tp);
2415 }
2416 }
2417
2418 values[i++] = BoolGetDatum(!entry->invalidated);
2419
2420 if (api_version >= PGFDW_V1_2)
2421 {
2422 bool check_conn = PG_GETARG_BOOL(0);
2423
2424 /* Is this connection used in the current transaction? */
2425 values[i++] = BoolGetDatum(entry->xact_depth > 0);
2426
2427 /*
2428 * If a connection status check is requested and supported, return
2429 * whether the connection is closed. Otherwise, return NULL.
2430 */
2432 values[i++] = BoolGetDatum(pgfdw_conn_check(entry->conn) != 0);
2433 else
2434 nulls[i++] = true;
2435
2436 /* Return process ID of remote backend */
2437 values[i++] = Int32GetDatum(PQbackendPID(entry->conn));
2438 }
2439
2440 tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
2441 }
2442}
2443
2444/*
2445 * Values in connection strings must be enclosed in single quotes. Single
2446 * quotes and backslashes must be escaped with backslash. NB: these rules are
2447 * different from the rules for escaping a SQL literal.
2448 */
2449static void
2451{
2453 for (int i = 0; val[i] != '\0'; i++)
2454 {
2455 if (val[i] == '\\' || val[i] == '\'')
2458 }
2460}
2461
2462Datum
2464{
2465 Oid userid = PG_GETARG_OID(0);
2466 Oid serverid = PG_GETARG_OID(1);
2467 ForeignServer *server = GetForeignServer(serverid);
2468 UserMapping *user = GetUserMapping(userid, serverid);
2470 const char **keywords;
2471 const char **values;
2472 char *appname;
2473 char *sep = "";
2474
2475 construct_connection_params(server, user, &keywords, &values, &appname);
2476
2478 for (int i = 0; keywords[i] != NULL; i++)
2479 {
2480 if (values[i] == NULL)
2481 continue;
2482 appendStringInfo(&str, "%s%s = ", sep, keywords[i]);
2484 sep = " ";
2485 }
2486
2487 if (appname != NULL)
2488 pfree(appname);
2489 pfree(keywords);
2490 pfree(values);
2492}
2493
2494/*
2495 * List active foreign server connections.
2496 *
2497 * The SQL API of this function has changed multiple times, and will likely
2498 * do so again in future. To support the case where a newer version of this
2499 * loadable module is being used with an old SQL declaration of the function,
2500 * we continue to support the older API versions.
2501 */
2502Datum
2509
2510Datum
2517
2518/*
2519 * Disconnect the specified cached connections.
2520 *
2521 * This function discards the open connections that are established by
2522 * postgres_fdw from the local session to the foreign server with
2523 * the given name. Note that there can be multiple connections to
2524 * the given server using different user mappings. If the connections
2525 * are used in the current local transaction, they are not disconnected
2526 * and warning messages are reported. This function returns true
2527 * if it disconnects at least one connection, otherwise false. If no
2528 * foreign server with the given name is found, an error is reported.
2529 */
2530Datum
2532{
2533 ForeignServer *server;
2534 char *servername;
2535
2536 servername = text_to_cstring(PG_GETARG_TEXT_PP(0));
2537 server = GetForeignServerByName(servername, false);
2538
2540}
2541
2542/*
2543 * Disconnect all the cached connections.
2544 *
2545 * This function discards all the open connections that are established by
2546 * postgres_fdw from the local session to the foreign servers.
2547 * If the connections are used in the current local transaction, they are
2548 * not disconnected and warning messages are reported. This function
2549 * returns true if it disconnects at least one connection, otherwise false.
2550 */
2551Datum
2556
2557/*
2558 * Workhorse to disconnect cached connections.
2559 *
2560 * This function scans all the connection cache entries and disconnects
2561 * the open connections whose foreign server OID matches with
2562 * the specified one. If InvalidOid is specified, it disconnects all
2563 * the cached connections.
2564 *
2565 * This function emits a warning for each connection that's used in
2566 * the current transaction and doesn't close it. It returns true if
2567 * it disconnects at least one connection, otherwise false.
2568 *
2569 * Note that this function disconnects even the connections that are
2570 * established by other users in the same local session using different
2571 * user mappings. This leads even non-superuser to be able to close
2572 * the connections established by superusers in the same local session.
2573 *
2574 * XXX As of now we don't see any security risk doing this. But we should
2575 * set some restrictions on that, for example, prevent non-superuser
2576 * from closing the connections established by superusers even
2577 * in the same session?
2578 */
2579static bool
2581{
2582 HASH_SEQ_STATUS scan;
2583 ConnCacheEntry *entry;
2584 bool all = !OidIsValid(serverid);
2585 bool result = false;
2586
2587 /*
2588 * Connection cache hashtable has not been initialized yet in this
2589 * session, so return false.
2590 */
2591 if (!ConnectionHash)
2592 return false;
2593
2595 while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
2596 {
2597 /* Ignore cache entry if no open connection right now. */
2598 if (!entry->conn)
2599 continue;
2600
2601 if (all || entry->serverid == serverid)
2602 {
2603 /*
2604 * Emit a warning because the connection to close is used in the
2605 * current transaction and cannot be disconnected right now.
2606 */
2607 if (entry->xact_depth > 0)
2608 {
2609 ForeignServer *server;
2610
2611 server = GetForeignServerExtended(entry->serverid,
2613
2614 if (!server)
2615 {
2616 /*
2617 * If the foreign server was dropped while its connection
2618 * was used in the current transaction, the connection
2619 * must have been marked as invalid by
2620 * pgfdw_inval_callback at the end of DROP SERVER command.
2621 */
2622 Assert(entry->invalidated);
2623
2625 (errmsg("cannot close dropped server connection because it is still in use")));
2626 }
2627 else
2629 (errmsg("cannot close connection for server \"%s\" because it is still in use",
2630 server->servername)));
2631 }
2632 else
2633 {
2634 elog(DEBUG3, "discarding connection %p", entry->conn);
2635 disconnect_pg_server(entry);
2636 result = true;
2637 }
2638 }
2639 }
2640
2641 return result;
2642}
2643
2644/*
2645 * Check if the remote server closed the connection.
2646 *
2647 * Returns 1 if the connection is closed, -1 if an error occurred,
2648 * and 0 if it's not closed or if the connection check is unavailable
2649 * on this platform.
2650 */
2651static int
2653{
2654 int sock = PQsocket(conn);
2655
2656 if (PQstatus(conn) != CONNECTION_OK || sock == -1)
2657 return -1;
2658
2659#if (defined(HAVE_POLL) && defined(POLLRDHUP))
2660 {
2661 struct pollfd input_fd;
2662 int result;
2663
2664 input_fd.fd = sock;
2665 input_fd.events = POLLRDHUP;
2666 input_fd.revents = 0;
2667
2668 do
2669 result = poll(&input_fd, 1, 0);
2670 while (result < 0 && errno == EINTR);
2671
2672 if (result < 0)
2673 return -1;
2674
2675 return (input_fd.revents &
2676 (POLLRDHUP | POLLHUP | POLLERR | POLLNVAL)) ? 1 : 0;
2677 }
2678#else
2679 return 0;
2680#endif
2681}
2682
2683/*
2684 * Check if connection status checking is available on this platform.
2685 *
2686 * Returns true if available, false otherwise.
2687 */
2688static bool
2690{
2691#if (defined(HAVE_POLL) && defined(POLLRDHUP))
2692 return true;
2693#else
2694 return false;
2695#endif
2696}
2697
2698/*
2699 * Ensure that require_auth and SCRAM keys are correctly set on values. SCRAM
2700 * keys used to pass-through are coming from the initial connection from the
2701 * client with the server.
2702 *
2703 * All required SCRAM options are set by postgres_fdw, so we just need to
2704 * ensure that these options are not overwritten by the user.
2705 */
2706static bool
2708{
2709 bool has_scram_server_key = false;
2710 bool has_scram_client_key = false;
2711 bool has_require_auth = false;
2712 bool has_scram_keys = false;
2713
2714 /*
2715 * Continue iterating even if we found the keys that we need to validate
2716 * to make sure that there is no other declaration of these keys that can
2717 * overwrite the first.
2718 */
2719 for (int i = 0; keywords[i] != NULL; i++)
2720 {
2721 if (strcmp(keywords[i], "scram_client_key") == 0)
2722 {
2723 if (values[i] != NULL && values[i][0] != '\0')
2724 has_scram_client_key = true;
2725 else
2726 has_scram_client_key = false;
2727 }
2728
2729 if (strcmp(keywords[i], "scram_server_key") == 0)
2730 {
2731 if (values[i] != NULL && values[i][0] != '\0')
2732 has_scram_server_key = true;
2733 else
2734 has_scram_server_key = false;
2735 }
2736
2737 if (strcmp(keywords[i], "require_auth") == 0)
2738 {
2739 if (values[i] != NULL && strcmp(values[i], "scram-sha-256") == 0)
2740 has_require_auth = true;
2741 else
2742 has_require_auth = false;
2743 }
2744 }
2745
2747
2748 return (has_scram_keys && has_require_auth);
2749}
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition timestamp.c:1765
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1649
Datum now(PG_FUNCTION_ARGS)
Definition timestamp.c:1613
int pg_b64_enc_len(int srclen)
Definition base64.c:224
int pg_b64_encode(const uint8 *src, int len, char *dst, int dstlen)
Definition base64.c:49
bool be_gssapi_get_delegation(Port *port)
static Datum values[MAXATTR]
Definition bootstrap.c:190
#define CStringGetTextDatum(s)
Definition builtins.h:98
#define Min(x, y)
Definition c.h:1091
uint32 SubTransactionId
Definition c.h:740
#define Assert(condition)
Definition c.h:943
#define pg_unreachable()
Definition c.h:367
uint32_t uint32
Definition c.h:624
#define OidIsValid(objectId)
Definition c.h:858
uint32 result
Oid ConnCacheKey
Definition connection.c:54
static unsigned int prep_stmt_number
Definition connection.c:85
unsigned int GetCursorNumber(PGconn *conn)
static bool UserMappingPasswordRequired(UserMapping *user)
Definition connection.c:704
Datum postgres_fdw_get_connections(PG_FUNCTION_ARGS)
void do_sql_command(PGconn *conn, const char *sql)
Definition connection.c:842
#define POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_2
static void construct_connection_params(ForeignServer *server, UserMapping *user, const char ***p_keywords, const char ***p_values, char **p_appname)
Definition connection.c:495
PGresult * pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel)
static void disconnect_pg_server(ConnCacheEntry *entry)
Definition connection.c:689
void ReleaseConnection(PGconn *conn)
static uint32 pgfdw_we_get_result
Definition connection.c:99
static bool UseScramPassthrough(ForeignServer *server, UserMapping *user)
Definition connection.c:720
#define RETRY_CANCEL_TIMEOUT
Definition connection.c:113
PGresult * pgfdw_get_result(PGconn *conn)
void pgfdw_report_error(PGresult *res, PGconn *conn, const char *sql)
static bool pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime)
static void pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested, bool toplevel)
static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
static int pgfdw_conn_check(PGconn *conn)
#define POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_1
static void configure_remote_session(PGconn *conn)
Definition connection.c:804
static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime, TimestampTz retrycanceltime, bool consume_input)
static bool xact_got_connection
Definition connection.c:88
#define POSTGRES_FDW_GET_CONNECTIONS_COLS
static int read_only_level
Definition connection.c:94
void pgfdw_report(int elevel, PGresult *res, PGconn *conn, const char *sql)
Datum postgres_fdw_disconnect_all(PG_FUNCTION_ARGS)
static void do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
Definition connection.c:856
#define CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel)
Definition connection.c:116
static bool pgfdw_conn_checkable(void)
static uint32 pgfdw_we_cleanup_result
Definition connection.c:97
static bool pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel, List **pending_entries, List **cancel_requested)
static void appendEscapedValue(StringInfo str, const char *val)
static HTAB * ConnectionHash
Definition connection.c:81
static bool pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query, TimestampTz endtime, bool consume_input, bool ignore_errors)
static unsigned int cursor_number
Definition connection.c:84
static bool pgfdw_has_required_scram_options(const char **keywords, const char **values)
static void make_new_connection(ConnCacheEntry *entry, UserMapping *user)
Definition connection.c:379
static void pgfdw_security_check(const char **keywords, const char **values, UserMapping *user, PGconn *conn)
Definition connection.c:446
Datum postgres_fdw_disconnect(PG_FUNCTION_ARGS)
static void pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg)
static PGconn * connect_pg_server(ForeignServer *server, UserMapping *user)
Definition connection.c:629
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
Datum postgres_fdw_connection(PG_FUNCTION_ARGS)
unsigned int GetPrepStmtNumber(PGconn *conn)
Datum postgres_fdw_get_connections_1_2(PG_FUNCTION_ARGS)
static bool pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query)
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
static void check_conn_params(const char **keywords, const char **values, UserMapping *user)
Definition connection.c:752
static uint32 pgfdw_we_connect
Definition connection.c:98
static void pgfdw_xact_callback(XactEvent event, void *arg)
static void postgres_fdw_get_connections_internal(FunctionCallInfo fcinfo, enum pgfdwVersion api_version)
static void pgfdw_report_internal(int elevel, PGresult *res, PGconn *conn, const char *sql)
#define CONNECTION_CLEANUP_TIMEOUT
Definition connection.c:106
static void do_sql_command_begin(PGconn *conn, const char *sql)
Definition connection.c:849
static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, TimestampTz retrycanceltime, PGresult **result, bool *timed_out)
static void begin_remote_xact(ConnCacheEntry *entry)
Definition connection.c:890
pgfdwVersion
Definition connection.c:131
@ PGFDW_V1_1
Definition connection.c:132
@ PGFDW_V1_2
Definition connection.c:133
static void pgfdw_inval_callback(Datum arg, SysCacheIdentifier cacheid, uint32 hashvalue)
static bool pgfdw_cancel_query(PGconn *conn)
static void pgfdw_finish_pre_commit_cleanup(List *pending_entries)
static bool disconnect_cached_connections(Oid serverid)
int ExtractConnectionOptions(List *defelems, const char **keywords, const char **values)
Definition option.c:418
char * pgfdw_application_name
Definition option.c:46
char * process_pgfdw_appname(const char *appname)
Definition option.c:495
int64 TimestampTz
Definition timestamp.h:39
bool defGetBoolean(DefElem *def)
Definition define.c:93
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition dynahash.c:889
HTAB * hash_create(const char *tabname, int64 nelem, const HASHCTL *info, int flags)
Definition dynahash.c:360
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition dynahash.c:1352
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition dynahash.c:1317
Datum arg
Definition elog.c:1323
void FreeErrorData(ErrorData *edata)
Definition elog.c:2014
ErrorData * CopyErrorData(void)
Definition elog.c:1942
void FlushErrorState(void)
Definition elog.c:2063
int errcode(int sqlerrcode)
Definition elog.c:875
bool in_error_recursion_trouble(void)
Definition elog.c:306
#define PG_RE_THROW()
Definition elog.h:407
int int errdetail_internal(const char *fmt,...) pg_attribute_printf(1
#define errcontext
Definition elog.h:200
int errhint(const char *fmt,...) pg_attribute_printf(1
#define DEBUG3
Definition elog.h:29
int errdetail(const char *fmt,...) pg_attribute_printf(1
int int errmsg_internal(const char *fmt,...) pg_attribute_printf(1
#define PG_TRY(...)
Definition elog.h:374
#define WARNING
Definition elog.h:37
#define PG_END_TRY(...)
Definition elog.h:399
#define ERROR
Definition elog.h:40
#define PG_CATCH(...)
Definition elog.h:384
#define MAKE_SQLSTATE(ch1, ch2, ch3, ch4, ch5)
Definition elog.h:58
#define elog(elevel,...)
Definition elog.h:228
#define ereport(elevel,...)
Definition elog.h:152
int PQserverVersion(const PGconn *conn)
PGTransactionStatusType PQtransactionStatus(const PGconn *conn)
int PQconnectionUsedPassword(const PGconn *conn)
int PQconnectionUsedGSSAPI(const PGconn *conn)
ConnStatusType PQstatus(const PGconn *conn)
PQnoticeReceiver PQsetNoticeReceiver(PGconn *conn, PQnoticeReceiver proc, void *arg)
int PQbackendPID(const PGconn *conn)
char * PQerrorMessage(const PGconn *conn)
int PQsocket(const PGconn *conn)
int PQconsumeInput(PGconn *conn)
Definition fe-exec.c:2001
int PQsendQuery(PGconn *conn, const char *query)
Definition fe-exec.c:1433
int PQisBusy(PGconn *conn)
Definition fe-exec.c:2048
#define PG_RETURN_VOID()
Definition fmgr.h:350
#define PG_GETARG_OID(n)
Definition fmgr.h:275
#define PG_GETARG_TEXT_PP(n)
Definition fmgr.h:310
#define PG_FUNCTION_INFO_V1(funcname)
Definition fmgr.h:417
#define PG_RETURN_TEXT_P(x)
Definition fmgr.h:374
#define PG_GETARG_BOOL(n)
Definition fmgr.h:274
#define PG_FUNCTION_ARGS
Definition fmgr.h:193
#define PG_RETURN_BOOL(x)
Definition fmgr.h:360
ForeignServer * GetForeignServerByName(const char *srvname, bool missing_ok)
Definition foreign.c:185
UserMapping * GetUserMapping(Oid userid, Oid serverid)
Definition foreign.c:232
ForeignServer * GetForeignServer(Oid serverid)
Definition foreign.c:114
ForeignServer * GetForeignServerExtended(Oid serverid, uint16 flags)
Definition foreign.c:126
#define MappingUserName(userid)
Definition foreign.h:20
#define FSV_MISSING_OK
Definition foreign.h:62
void InitMaterializedSRF(FunctionCallInfo fcinfo, uint32 flags)
Definition funcapi.c:76
struct Port * MyProcPort
Definition globals.c:53
struct Latch * MyLatch
Definition globals.c:65
const char * str
@ HASH_ENTER
Definition hsearch.h:109
#define HASH_ELEM
Definition hsearch.h:90
#define HASH_BLOBS
Definition hsearch.h:92
#define HeapTupleIsValid(tuple)
Definition htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
long val
Definition informix.c:689
void CacheRegisterSyscacheCallback(SysCacheIdentifier cacheid, SyscacheCallbackFunction func, Datum arg)
Definition inval.c:1813
int i
Definition isn.c:77
static const JsonPathKeyword keywords[]
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition latch.c:223
void ResetLatch(Latch *latch)
Definition latch.c:374
static const char * libpqsrv_cancel(PGconn *conn, TimestampTz endtime)
static void libpqsrv_connect_complete(PGconn *conn, uint32 wait_event_info)
static void libpqsrv_notice_receiver(void *arg, const PGresult *res)
static void libpqsrv_disconnect(PGconn *conn)
static PGresult * libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info)
static PGconn * libpqsrv_connect_params_start(const char *const *keywords, const char *const *values, int expand_dbname)
#define PQgetResult
#define PQclear
#define PQresultErrorField
#define PQresultStatus
@ CONNECTION_BAD
Definition libpq-fe.h:91
@ CONNECTION_OK
Definition libpq-fe.h:90
@ PGRES_COMMAND_OK
Definition libpq-fe.h:131
@ PQTRANS_IDLE
Definition libpq-fe.h:153
@ PQTRANS_ACTIVE
Definition libpq-fe.h:154
List * lappend(List *list, void *datum)
Definition list.c:339
const char * GetDatabaseEncodingName(void)
Definition mbutils.c:1395
void pfree(void *pointer)
Definition mcxt.c:1619
void * palloc0(Size size)
Definition mcxt.c:1420
char * pchomp(const char *in)
Definition mcxt.c:1938
void * palloc(Size size)
Definition mcxt.c:1390
MemoryContext CurrentMemoryContext
Definition mcxt.c:161
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:125
static char * errmsg
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:138
const void size_t len
#define lfirst(lc)
Definition pg_list.h:172
static int list_length(const List *l)
Definition pg_list.h:152
#define NIL
Definition pg_list.h:68
static char * user
Definition pg_regress.c:121
END_CATALOG_STRUCT typedef FormData_pg_user_mapping * Form_pg_user_mapping
#define snprintf
Definition port.h:261
static Datum BoolGetDatum(bool X)
Definition postgres.h:112
static Datum ObjectIdGetDatum(Oid X)
Definition postgres.h:252
uint64_t Datum
Definition postgres.h:70
static Datum Int32GetDatum(int32 X)
Definition postgres.h:212
#define InvalidOid
unsigned int Oid
#define PG_DIAG_MESSAGE_HINT
#define PG_DIAG_SQLSTATE
#define PG_DIAG_MESSAGE_PRIMARY
#define PG_DIAG_MESSAGE_DETAIL
#define PG_DIAG_CONTEXT
void process_pending_request(AsyncRequest *areq)
static int fb(int x)
tree ctl
Definition radixtree.h:1838
PGconn * GetConnection(void)
Definition streamutil.c:60
PGconn * conn
Definition streamutil.c:52
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition stringinfo.c:145
void appendStringInfoString(StringInfo str, const char *s)
Definition stringinfo.c:230
void appendStringInfoChar(StringInfo str, char ch)
Definition stringinfo.c:242
void initStringInfo(StringInfo str)
Definition stringinfo.c:97
PGconn * conn
Definition connection.c:59
bool have_prep_stmt
Definition connection.c:64
PgFdwConnState state
Definition connection.c:75
ConnCacheKey key
Definition connection.c:58
bool parallel_commit
Definition connection.c:67
uint32 server_hashvalue
Definition connection.c:73
uint32 mapping_hashvalue
Definition connection.c:74
bool keep_connections
Definition connection.c:70
bool parallel_abort
Definition connection.c:68
bool changing_xact_state
Definition connection.c:66
bool xact_read_only
Definition connection.c:63
char * defname
Definition parsenodes.h:860
List * options
Definition foreign.h:43
char * servername
Definition foreign.h:40
Definition pg_list.h:54
AsyncRequest * pendingAreq
uint8 scram_ServerKey[SCRAM_MAX_KEY_LEN]
Definition libpq-be.h:187
bool has_scram_keys
Definition libpq-be.h:188
uint8 scram_ClientKey[SCRAM_MAX_KEY_LEN]
Definition libpq-be.h:186
bool superuser_arg(Oid roleid)
Definition superuser.c:57
void ReleaseSysCache(HeapTuple tuple)
Definition syscache.c:265
HeapTuple SearchSysCache1(SysCacheIdentifier cacheId, Datum key1)
Definition syscache.c:221
#define GetSysCacheHashValue1(cacheId, key1)
Definition syscache.h:118
static void start_conn(const ClusterInfo *cluster, UpgradeTaskSlot *slot)
Definition task.c:174
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
Definition tuplestore.c:785
#define TimestampTzPlusMilliseconds(tz, ms)
Definition timestamp.h:85
text * cstring_to_text(const char *s)
Definition varlena.c:184
char * text_to_cstring(const text *t)
Definition varlena.c:217
uint32 WaitEventExtensionNew(const char *wait_event_name)
Definition wait_event.c:149
#define WL_SOCKET_READABLE
#define WL_TIMEOUT
#define WL_EXIT_ON_PM_DEATH
#define WL_LATCH_SET
#define EINTR
Definition win32_port.h:361
int GetCurrentTransactionNestLevel(void)
Definition xact.c:931
bool XactDeferrable
Definition xact.c:87
bool XactReadOnly
Definition xact.c:84
int GetTopReadOnlyTransactionNestLevel(void)
Definition xact.c:1062
void RegisterXactCallback(XactCallback callback, void *arg)
Definition xact.c:3855
void RegisterSubXactCallback(SubXactCallback callback, void *arg)
Definition xact.c:3915
SubXactEvent
Definition xact.h:142
@ SUBXACT_EVENT_PRE_COMMIT_SUB
Definition xact.h:146
@ SUBXACT_EVENT_ABORT_SUB
Definition xact.h:145
XactEvent
Definition xact.h:128
@ XACT_EVENT_PRE_PREPARE
Definition xact.h:136
@ XACT_EVENT_COMMIT
Definition xact.h:129
@ XACT_EVENT_PARALLEL_PRE_COMMIT
Definition xact.h:135
@ XACT_EVENT_PARALLEL_COMMIT
Definition xact.h:130
@ XACT_EVENT_ABORT
Definition xact.h:131
@ XACT_EVENT_PRE_COMMIT
Definition xact.h:134
@ XACT_EVENT_PARALLEL_ABORT
Definition xact.h:132
@ XACT_EVENT_PREPARE
Definition xact.h:133
#define IsolationIsSerializable()
Definition xact.h:53