PostgreSQL Source Code git master
Loading...
Searching...
No Matches
connection.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * connection.c
4 * Connection management functions for postgres_fdw
5 *
6 * Portions Copyright (c) 2012-2026, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * contrib/postgres_fdw/connection.c
10 *
11 *-------------------------------------------------------------------------
12 */
13#include "postgres.h"
14
15#if HAVE_POLL_H
16#include <poll.h>
17#endif
18
19#include "access/htup_details.h"
20#include "access/xact.h"
22#include "commands/defrem.h"
23#include "common/base64.h"
24#include "funcapi.h"
25#include "libpq/libpq-be.h"
27#include "mb/pg_wchar.h"
28#include "miscadmin.h"
29#include "pgstat.h"
30#include "postgres_fdw.h"
31#include "storage/latch.h"
32#include "utils/builtins.h"
33#include "utils/hsearch.h"
34#include "utils/inval.h"
35#include "utils/syscache.h"
36
37/*
38 * Connection cache hash table entry
39 *
40 * The lookup key in this hash table is the user mapping OID. We use just one
41 * connection per user mapping ID, which ensures that all the scans use the
42 * same snapshot during a query. Using the user mapping OID rather than
43 * the foreign server OID + user OID avoids creating multiple connections when
44 * the public user mapping applies to all user OIDs.
45 *
46 * The "conn" pointer can be NULL if we don't currently have a live connection.
47 * When we do have a connection, xact_depth tracks the current depth of
48 * transactions and subtransactions open on the remote side. We need to issue
49 * commands at the same nesting depth on the remote as we're executing at
50 * ourselves, so that rolling back a subtransaction will kill the right
51 * queries and not the wrong ones.
52 */
54
55typedef struct ConnCacheEntry
56{
57 ConnCacheKey key; /* hash key (must be first) */
58 PGconn *conn; /* connection to foreign server, or NULL */
59 /* Remaining fields are invalid when conn is NULL: */
60 int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 =
61 * one level of subxact open, etc */
62 bool have_prep_stmt; /* have we prepared any stmts in this xact? */
63 bool have_error; /* have any subxacts aborted in this xact? */
64 bool changing_xact_state; /* xact state change in process */
65 bool parallel_commit; /* do we commit (sub)xacts in parallel? */
66 bool parallel_abort; /* do we abort (sub)xacts in parallel? */
67 bool invalidated; /* true if reconnect is pending */
68 bool keep_connections; /* setting value of keep_connections
69 * server option */
70 Oid serverid; /* foreign server OID used to get server name */
71 uint32 server_hashvalue; /* hash value of foreign server OID */
72 uint32 mapping_hashvalue; /* hash value of user mapping OID */
73 PgFdwConnState state; /* extra per-connection state */
75
76/*
77 * Connection cache (initialized on first use)
78 */
80
81/*
 * Counters for assigning cursor numbers and prepared statement numbers.
 *
 * cursor_number is set back to zero by the main-transaction-end callback,
 * so cursor numbers only need to be unique within one transaction.
 * prep_stmt_number is never reset within a session (see the comment on the
 * prepared-statement numbering function for the reasoning).
 */
82static unsigned int cursor_number = 0;
83static unsigned int prep_stmt_number = 0;
84
85/*
 * Tracks whether any work is needed in callback functions.
 *
 * Set to true whenever a connection is requested during the current
 * transaction, and set back to false by the main-transaction-end callback
 * once it has processed the connection cache.
 */
86static bool xact_got_connection = false;
87
88/* custom wait event values, retrieved from shared memory */
92
93/*
94 * Milliseconds to wait to cancel an in-progress query or execute a cleanup
95 * query; if it takes longer than 30 seconds to do these, we assume the
96 * connection is dead.
97 */
98#define CONNECTION_CLEANUP_TIMEOUT 30000
99
100/*
101 * Milliseconds to wait before issuing another cancel request. This covers
102 * the race condition where the remote session ignored our cancel request
103 * because it arrived while idle.
104 */
105#define RETRY_CANCEL_TIMEOUT 1000
106
107/*
 * Macro for constructing abort command to be sent
 *
 * sql: destination buffer.  It must be a char array, not a pointer, because
 *		the buffer size is obtained with sizeof(sql).
 * entry: connection cache entry; its xact_depth supplies the savepoint
 *		number in the non-toplevel case.
 * toplevel: if true, the command is "ABORT TRANSACTION"; otherwise it rolls
 *		back to, and then releases, the savepoint named after the entry's
 *		current xact_depth.
 *
 * Note that "entry" appears twice in the expansion, so it should not be an
 * expression with side effects.
 */
108#define CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel) \
109 do { \
110 if (toplevel) \
111 snprintf((sql), sizeof(sql), \
112 "ABORT TRANSACTION"); \
113 else \
114 snprintf((sql), sizeof(sql), \
115 "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", \
116 (entry)->xact_depth, (entry)->xact_depth); \
117 } while(0)
118
119/*
120 * Extension version number, for supporting older extension versions' objects
121 */
127
128/*
129 * SQL functions
130 */
135
136/* prototypes of private functions */
139static void disconnect_pg_server(ConnCacheEntry *entry);
140static void check_conn_params(const char **keywords, const char **values, UserMapping *user);
142static void do_sql_command_begin(PGconn *conn, const char *sql);
143static void do_sql_command_end(PGconn *conn, const char *sql,
144 bool consume_input);
145static void begin_remote_xact(ConnCacheEntry *entry);
146static void pgfdw_report_internal(int elevel, PGresult *res, PGconn *conn,
147 const char *sql);
148static void pgfdw_xact_callback(XactEvent event, void *arg);
149static void pgfdw_subxact_callback(SubXactEvent event,
152 void *arg);
154 uint32 hashvalue);
156static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
157static bool pgfdw_cancel_query(PGconn *conn);
161 bool consume_input);
162static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
163 bool ignore_errors);
164static bool pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query);
165static bool pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
167 bool consume_input,
168 bool ignore_errors);
171 PGresult **result, bool *timed_out);
172static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel);
173static bool pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
178 int curlevel);
181 bool toplevel);
182static void pgfdw_security_check(const char **keywords, const char **values,
186static bool disconnect_cached_connections(Oid serverid);
188 enum pgfdwVersion api_version);
189static int pgfdw_conn_check(PGconn *conn);
190static bool pgfdw_conn_checkable(void);
191static bool pgfdw_has_required_scram_options(const char **keywords, const char **values);
192
193/*
194 * Get a PGconn which can be used to execute queries on the remote PostgreSQL
195 * server with the user's authorization. A new connection is established
196 * if we don't already have a suitable one, and a transaction is opened at
197 * the right subtransaction nesting depth if we didn't do that already.
198 *
199 * will_prep_stmt must be true if caller intends to create any prepared
200 * statements. Since those don't go away automatically at transaction end
201 * (not even on error), we need this flag to cue manual cleanup.
202 *
203 * If state is not NULL, *state receives the per-connection state associated
204 * with the PGconn.
205 */
206PGconn *
208{
209 bool found;
210 bool retry = false;
211 ConnCacheEntry *entry;
212 ConnCacheKey key;
214
215 /* First time through, initialize connection cache hashtable */
216 if (ConnectionHash == NULL)
217 {
218 HASHCTL ctl;
219
220 if (pgfdw_we_get_result == 0)
222 WaitEventExtensionNew("PostgresFdwGetResult");
223
224 ctl.keysize = sizeof(ConnCacheKey);
225 ctl.entrysize = sizeof(ConnCacheEntry);
226 ConnectionHash = hash_create("postgres_fdw connections", 8,
227 &ctl,
229
230 /*
231 * Register some callback functions that manage connection cleanup.
232 * This should be done just once in each backend.
233 */
240 }
241
242 /* Set flag that we did GetConnection during the current transaction */
243 xact_got_connection = true;
244
245 /* Create hash key for the entry. Assume no pad bytes in key struct */
246 key = user->umid;
247
248 /*
249 * Find or create cached entry for requested connection.
250 */
251 entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
252 if (!found)
253 {
254 /*
255 * We need only clear "conn" here; remaining fields will be filled
256 * later when "conn" is set.
257 */
258 entry->conn = NULL;
259 }
260
261 /* Reject further use of connections which failed abort cleanup. */
263
264 /*
265 * If the connection needs to be remade due to invalidation, disconnect as
266 * soon as we're out of all transactions.
267 */
268 if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
269 {
270 elog(DEBUG3, "closing connection %p for option changes to take effect",
271 entry->conn);
273 }
274
275 /*
276 * If cache entry doesn't have a connection, we have to establish a new
277 * connection. (If connect_pg_server throws an error, the cache entry
278 * will remain in a valid empty state, ie conn == NULL.)
279 */
280 if (entry->conn == NULL)
282
283 /*
284 * We check the health of the cached connection here when using it. In
285 * cases where we're out of all transactions, if a broken connection is
286 * detected, we try to reestablish a new connection later.
287 */
288 PG_TRY();
289 {
290 /* Process a pending asynchronous request if any. */
291 if (entry->state.pendingAreq)
293 /* Start a new transaction or subtransaction if needed. */
294 begin_remote_xact(entry);
295 }
296 PG_CATCH();
297 {
300
301 /*
302 * Determine whether to try to reestablish the connection.
303 *
304 * After a broken connection is detected in libpq, any error other
305 * than connection failure (e.g., out-of-memory) can be thrown
306 * somewhere between return from libpq and the expected ereport() call
307 * in pgfdw_report_error(). In this case, since PQstatus() indicates
308 * CONNECTION_BAD, checking only PQstatus() causes the false detection
309 * of connection failure. To avoid this, we also verify that the
310 * error's sqlstate is ERRCODE_CONNECTION_FAILURE. Note that also
311 * checking only the sqlstate can cause another false detection
312 * because pgfdw_report_error() may report ERRCODE_CONNECTION_FAILURE
313 * for any libpq-originated error condition.
314 */
315 if (errdata->sqlerrcode != ERRCODE_CONNECTION_FAILURE ||
316 PQstatus(entry->conn) != CONNECTION_BAD ||
317 entry->xact_depth > 0)
318 {
320 PG_RE_THROW();
321 }
322
323 /* Clean up the error state */
326 errdata = NULL;
327
328 retry = true;
329 }
330 PG_END_TRY();
331
332 /*
333 * If a broken connection is detected, disconnect it, reestablish a new
334 * connection and retry a new remote transaction. If connection failure is
335 * reported again, we give up getting a connection.
336 */
337 if (retry)
338 {
339 Assert(entry->xact_depth == 0);
340
342 (errmsg_internal("could not start remote transaction on connection %p",
343 entry->conn)),
345
346 elog(DEBUG3, "closing connection %p to reestablish a new one",
347 entry->conn);
349
351
352 begin_remote_xact(entry);
353 }
354
355 /* Remember if caller will prepare statements */
357
358 /* If caller needs access to the per-connection state, return it. */
359 if (state)
360 *state = &entry->state;
361
362 return entry->conn;
363}
364
365/*
366 * Reset all transient state fields in the cached connection entry and
367 * establish new connection to the remote server.
368 */
369static void
371{
372 ForeignServer *server = GetForeignServer(user->serverid);
373 ListCell *lc;
374
375 Assert(entry->conn == NULL);
376
377 /* Reset all transient state fields, to be sure all are clean */
378 entry->xact_depth = 0;
379 entry->have_prep_stmt = false;
380 entry->have_error = false;
381 entry->changing_xact_state = false;
382 entry->invalidated = false;
383 entry->serverid = server->serverid;
384 entry->server_hashvalue =
386 ObjectIdGetDatum(server->serverid));
387 entry->mapping_hashvalue =
389 ObjectIdGetDatum(user->umid));
390 memset(&entry->state, 0, sizeof(entry->state));
391
392 /*
393 * Determine whether to keep the connection that we're about to make here
394 * open even after the transaction using it ends, so that the subsequent
395 * transactions can re-use it.
396 *
397 * By default, all the connections to any foreign servers are kept open.
398 *
399 * Also determine whether to commit/abort (sub)transactions opened on the
400 * remote server in parallel at (sub)transaction end, which is disabled by
401 * default.
402 *
403 * Note: it's enough to determine these only when making a new connection
404 * because if these settings for it are changed, it will be closed and
405 * re-made later.
406 */
407 entry->keep_connections = true;
408 entry->parallel_commit = false;
409 entry->parallel_abort = false;
410 foreach(lc, server->options)
411 {
412 DefElem *def = (DefElem *) lfirst(lc);
413
414 if (strcmp(def->defname, "keep_connections") == 0)
415 entry->keep_connections = defGetBoolean(def);
416 else if (strcmp(def->defname, "parallel_commit") == 0)
417 entry->parallel_commit = defGetBoolean(def);
418 else if (strcmp(def->defname, "parallel_abort") == 0)
419 entry->parallel_abort = defGetBoolean(def);
420 }
421
422 /* Now try to make the connection */
423 entry->conn = connect_pg_server(server, user);
424
425 elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
426 entry->conn, server->servername, user->umid, user->userid);
427}
428
429/*
430 * Check that a non-superuser has used a password or delegated credentials
431 * to establish the connection; otherwise, they are piggybacking on the
432 * postgres server's user identity. See also dblink_security_check()
433 * in contrib/dblink and check_conn_params().
434 */
435static void
437{
438 /* Superusers bypass the check */
439 if (superuser_arg(user->userid))
440 return;
441
442#ifdef ENABLE_GSS
443 /* Connected via GSSAPI with delegated credentials- all good. */
445 return;
446#endif
447
448 /* Ok if superuser set PW required false. */
450 return;
451
452 /* Connected via PW, with PW required true, and provided non-empty PW. */
454 {
455 /* ok if params contain a non-empty password */
456 for (int i = 0; keywords[i] != NULL; i++)
457 {
458 if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
459 return;
460 }
461 }
462
463 /*
464 * Ok if SCRAM pass-through is being used and all required SCRAM options
465 * are set correctly. If pgfdw_has_required_scram_options returns true we
466 * assume that UseScramPassthrough is also true since SCRAM options are
467 * only set when UseScramPassthrough is enabled.
468 */
470 return;
471
474 errmsg("password or GSSAPI delegated credentials required"),
475 errdetail("Non-superuser cannot connect if the server does not request a password or use GSSAPI with delegated credentials."),
476 errhint("Target server's authentication method must be changed or password_required=false set in the user mapping attributes.")));
477}
478
479/*
480 * Connect to remote server using specified server and user mapping properties.
481 */
482static PGconn *
484{
485 PGconn *volatile conn = NULL;
486
487 /*
488 * Use PG_TRY block to ensure closing connection on error.
489 */
490 PG_TRY();
491 {
492 const char **keywords;
493 const char **values;
494 char *appname = NULL;
495 int n;
496
497 /*
498 * Construct connection params from generic options of ForeignServer
499 * and UserMapping. (Some of them might not be libpq options, in
500 * which case we'll just waste a few array slots.) Add 4 extra slots
501 * for application_name, fallback_application_name, client_encoding,
502 * end marker, and 3 extra slots for scram keys and required scram
503 * pass-through options.
504 */
505 n = list_length(server->options) + list_length(user->options) + 4 + 3;
506 keywords = (const char **) palloc(n * sizeof(char *));
507 values = (const char **) palloc(n * sizeof(char *));
508
509 n = 0;
511 keywords + n, values + n);
512 n += ExtractConnectionOptions(user->options,
513 keywords + n, values + n);
514
515 /*
516 * Use pgfdw_application_name as application_name if set.
517 *
518 * PQconnectdbParams() processes the parameter arrays from start to
519 * end. If any key word is repeated, the last value is used. Therefore
520 * note that pgfdw_application_name must be added to the arrays after
521 * options of ForeignServer are, so that it can override
522 * application_name set in ForeignServer.
523 */
525 {
526 keywords[n] = "application_name";
528 n++;
529 }
530
531 /*
532 * Search the parameter arrays to find application_name setting, and
533 * replace escape sequences in it with status information if found.
534 * The arrays are searched backwards because the last value is used if
535 * application_name is repeatedly set.
536 */
537 for (int i = n - 1; i >= 0; i--)
538 {
539 if (strcmp(keywords[i], "application_name") == 0 &&
540 *(values[i]) != '\0')
541 {
542 /*
543 * Use this application_name setting if it's not empty string
544 * even after any escape sequences in it are replaced.
545 */
546 appname = process_pgfdw_appname(values[i]);
547 if (appname[0] != '\0')
548 {
549 values[i] = appname;
550 break;
551 }
552
553 /*
554 * This empty application_name is not used, so we set
555 * values[i] to NULL and keep searching the array to find the
556 * next one.
557 */
558 values[i] = NULL;
559 pfree(appname);
560 appname = NULL;
561 }
562 }
563
564 /* Use "postgres_fdw" as fallback_application_name */
565 keywords[n] = "fallback_application_name";
566 values[n] = "postgres_fdw";
567 n++;
568
569 /* Set client_encoding so that libpq can convert encoding properly. */
570 keywords[n] = "client_encoding";
572 n++;
573
574 /* Add required SCRAM pass-through connection options if it's enabled. */
576 {
577 int len;
578 int encoded_len;
579
580 keywords[n] = "scram_client_key";
582 /* don't forget the zero-terminator */
583 values[n] = palloc0(len + 1);
586 (char *) values[n], len);
587 if (encoded_len < 0)
588 elog(ERROR, "could not encode SCRAM client key");
589 n++;
590
591 keywords[n] = "scram_server_key";
593 /* don't forget the zero-terminator */
594 values[n] = palloc0(len + 1);
597 (char *) values[n], len);
598 if (encoded_len < 0)
599 elog(ERROR, "could not encode SCRAM server key");
600 n++;
601
602 /*
603 * Require scram-sha-256 to ensure that no other auth method is
604 * used when connecting with foreign server.
605 */
606 keywords[n] = "require_auth";
607 values[n] = "scram-sha-256";
608 n++;
609 }
610
611 keywords[n] = values[n] = NULL;
612
613 /* Verify the set of connection parameters. */
615
616 /* first time, allocate or get the custom wait event */
617 if (pgfdw_we_connect == 0)
618 pgfdw_we_connect = WaitEventExtensionNew("PostgresFdwConnect");
619
620 /* OK to make connection */
622 false, /* expand_dbname */
624
625 if (!conn || PQstatus(conn) != CONNECTION_OK)
628 errmsg("could not connect to server \"%s\"",
629 server->servername),
631
633 "received message via remote connection");
634
635 /* Perform post-connection security checks. */
637
638 /* Prepare new session for use */
640
641 if (appname != NULL)
642 pfree(appname);
644 pfree(values);
645 }
646 PG_CATCH();
647 {
649 PG_RE_THROW();
650 }
651 PG_END_TRY();
652
653 return conn;
654}
655
656/*
657 * Disconnect any open connection for a connection cache entry.
658 */
659static void
661{
662 if (entry->conn != NULL)
663 {
665 entry->conn = NULL;
666 }
667}
668
669/*
670 * Check and return the value of password_required, if defined; otherwise,
671 * return true, which is the default value of it. The mapping has been
672 * pre-validated.
673 */
674static bool
676{
677 ListCell *cell;
678
679 foreach(cell, user->options)
680 {
681 DefElem *def = (DefElem *) lfirst(cell);
682
683 if (strcmp(def->defname, "password_required") == 0)
684 return defGetBoolean(def);
685 }
686
687 return true;
688}
689
690static bool
692{
693 ListCell *cell;
694
695 foreach(cell, server->options)
696 {
697 DefElem *def = (DefElem *) lfirst(cell);
698
699 if (strcmp(def->defname, "use_scram_passthrough") == 0)
700 return defGetBoolean(def);
701 }
702
703 foreach(cell, user->options)
704 {
705 DefElem *def = (DefElem *) lfirst(cell);
706
707 if (strcmp(def->defname, "use_scram_passthrough") == 0)
708 return defGetBoolean(def);
709 }
710
711 return false;
712}
713
714/*
715 * For non-superusers, insist that the connstr specify a password or that the
716 * user provided their own GSSAPI delegated credentials. This
717 * prevents a password from being picked up from .pgpass, a service file, the
718 * environment, etc. We don't want the postgres user's passwords,
719 * certificates, etc to be accessible to non-superusers. (See also
720 * dblink_connstr_check in contrib/dblink.)
721 */
722static void
723check_conn_params(const char **keywords, const char **values, UserMapping *user)
724{
725 int i;
726
727 /* no check required if superuser */
728 if (superuser_arg(user->userid))
729 return;
730
731#ifdef ENABLE_GSS
732 /* ok if the user provided their own delegated credentials */
734 return;
735#endif
736
737 /* ok if params contain a non-empty password */
738 for (i = 0; keywords[i] != NULL; i++)
739 {
740 if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
741 return;
742 }
743
744 /* ok if the superuser explicitly said so at user mapping creation time */
746 return;
747
748 /*
749 * Ok if SCRAM pass-through is being used and all required scram options
750 * are set correctly. If pgfdw_has_required_scram_options returns true we
751 * assume that UseScramPassthrough is also true since SCRAM options are
752 * only set when UseScramPassthrough is enabled.
753 */
755 return;
756
759 errmsg("password or GSSAPI delegated credentials required"),
760 errdetail("Non-superusers must delegate GSSAPI credentials, provide a password, or enable SCRAM pass-through in user mapping.")));
761}
762
763/*
764 * Issue SET commands to make sure remote session is configured properly.
765 *
766 * We do this just once at connection, assuming nothing will change the
767 * values later. Since we'll never send volatile function calls to the
768 * remote, there shouldn't be any way to break this assumption from our end.
769 * It's possible to think of ways to break it at the remote end, eg making
770 * a foreign table point to a view that includes a set_config call ---
771 * but once you admit the possibility of a malicious view definition,
772 * there are any number of ways to break things.
773 */
774static void
776{
778
779 /* Force the search path to contain only pg_catalog (see deparse.c) */
780 do_sql_command(conn, "SET search_path = pg_catalog");
781
782 /*
783 * Set remote timezone; this is basically just cosmetic, since all
784 * transmitted and returned timestamptzs should specify a zone explicitly
785 * anyway. However it makes the regression test outputs more predictable.
786 *
787 * We don't risk setting remote zone equal to ours, since the remote
788 * server might use a different timezone database. Instead, use GMT
789 * (quoted, because very old servers are picky about case). That's
790 * guaranteed to work regardless of the remote's timezone database,
791 * because pg_tzset() hard-wires it (at least in PG 9.2 and later).
792 */
793 do_sql_command(conn, "SET timezone = 'GMT'");
794
795 /*
796 * Set values needed to ensure unambiguous data output from remote. (This
797 * logic should match what pg_dump does. See also set_transmission_modes
798 * in postgres_fdw.c.)
799 */
800 do_sql_command(conn, "SET datestyle = ISO");
801 if (remoteversion >= 80400)
802 do_sql_command(conn, "SET intervalstyle = postgres");
803 if (remoteversion >= 90000)
804 do_sql_command(conn, "SET extra_float_digits = 3");
805 else
806 do_sql_command(conn, "SET extra_float_digits = 2");
807}
808
809/*
810 * Convenience subroutine to issue a non-data-returning SQL command to remote
811 */
812void
813do_sql_command(PGconn *conn, const char *sql)
814{
816 do_sql_command_end(conn, sql, false);
817}
818
819static void
821{
822 if (!PQsendQuery(conn, sql))
824}
825
826static void
828{
829 PGresult *res;
830
831 /*
832 * If requested, consume whatever data is available from the socket. (Note
833 * that if all data is available, this allows pgfdw_get_result to call
834 * PQgetResult without forcing the overhead of WaitLatchOrSocket, which
835 * would be large compared to the overhead of PQconsumeInput.)
836 */
839 res = pgfdw_get_result(conn);
841 pgfdw_report_error(res, conn, sql);
842 PQclear(res);
843}
844
845/*
846 * Start remote transaction or subtransaction, if needed.
847 *
848 * Note that we always use at least REPEATABLE READ in the remote session.
849 * This is so that, if a query initiates multiple scans of the same or
850 * different foreign tables, we will get snapshot-consistent results from
851 * those scans. A disadvantage is that we can't provide sane emulation of
852 * READ COMMITTED behavior --- it would be nice if we had some other way to
853 * control which remote queries share a snapshot.
854 */
855static void
857{
859
860 /* Start main transaction if we haven't yet */
861 if (entry->xact_depth <= 0)
862 {
863 const char *sql;
864
865 elog(DEBUG3, "starting remote transaction on connection %p",
866 entry->conn);
867
869 sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
870 else
871 sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
872 entry->changing_xact_state = true;
873 do_sql_command(entry->conn, sql);
874 entry->xact_depth = 1;
875 entry->changing_xact_state = false;
876 }
877
878 /*
879 * If we're in a subtransaction, stack up savepoints to match our level.
880 * This ensures we can rollback just the desired effects when a
881 * subtransaction aborts.
882 */
883 while (entry->xact_depth < curlevel)
884 {
885 char sql[64];
886
887 snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
888 entry->changing_xact_state = true;
889 do_sql_command(entry->conn, sql);
890 entry->xact_depth++;
891 entry->changing_xact_state = false;
892 }
893}
894
895/*
896 * Release connection reference count created by calling GetConnection.
897 */
898void
900{
901 /*
902 * Currently, we don't actually track connection references because all
903 * cleanup is managed on a transaction or subtransaction basis instead. So
904 * there's nothing to do here.
905 */
906}
907
908/*
909 * Assign a "unique" number for a cursor.
910 *
911 * These really only need to be unique per connection within a transaction.
912 * For the moment we ignore the per-connection point and assign them across
913 * all connections in the transaction, but we ask for the connection to be
914 * supplied in case we want to refine that.
915 *
916 * Note that even if wraparound happens in a very long transaction, actual
917 * collisions are highly improbable; just be sure to use %u not %d to print.
918 */
919unsigned int
921{
922 return ++cursor_number;
923}
924
925/*
926 * Assign a "unique" number for a prepared statement.
927 *
928 * This works much like GetCursorNumber, except that we never reset the counter
929 * within a session. That's because we can't be 100% sure we've gotten rid
930 * of all prepared statements on all connections, and it's not really worth
931 * increasing the risk of prepared-statement name collisions by resetting.
932 */
933unsigned int
938
939/*
940 * Submit a query and wait for the result.
941 *
942 * Since we don't use non-blocking mode, this can't process interrupts while
943 * pushing the query text to the server. That risk is relatively small, so we
944 * ignore that for now.
945 *
946 * Caller is responsible for the error handling on the result.
947 */
948PGresult *
950{
951 /* First, process a pending asynchronous request, if any. */
952 if (state && state->pendingAreq)
953 process_pending_request(state->pendingAreq);
954
955 if (!PQsendQuery(conn, query))
956 return NULL;
957 return pgfdw_get_result(conn);
958}
959
960/*
961 * Wrap libpqsrv_get_result_last(), adding wait event.
962 *
963 * Caller is responsible for the error handling on the result.
964 */
965PGresult *
970
971/*
972 * Report an error we got from the remote server.
973 *
974 * Callers should use pgfdw_report_error() to throw an error, or use
975 * pgfdw_report() for lesser message levels. (We make this distinction
976 * so that pgfdw_report_error() can be marked noreturn.)
977 *
978 * res: PGresult containing the error (might be NULL)
979 * conn: connection we did the query on
980 * sql: NULL, or text of remote command we tried to execute
981 *
982 * If "res" is not NULL, it'll be PQclear'ed here (unless we throw error,
983 * in which case memory context cleanup will clear it eventually).
984 *
985 * Note: callers that choose not to throw ERROR for a remote error are
986 * responsible for making sure that the associated ConnCacheEntry gets
987 * marked with have_error = true.
988 */
989void
990pgfdw_report_error(PGresult *res, PGconn *conn, const char *sql)
991{
992 pgfdw_report_internal(ERROR, res, conn, sql);
994}
995
/*
 * pgfdw_report --- report a remote error at a message level below ERROR.
 *
 * elevel: message level to report at; must be less than ERROR (asserted).
 *		Use pgfdw_report_error() to throw an ERROR instead.
 * res, conn, sql: passed through unchanged to pgfdw_report_internal().
 */
996void
997pgfdw_report(int elevel, PGresult *res, PGconn *conn, const char *sql)
998{
999 Assert(elevel < ERROR); /* use pgfdw_report_error for that */
1000 pgfdw_report_internal(elevel, res, conn, sql);
1001}
1002
1003static void
1005 const char *sql)
1006{
1012 int sqlstate;
1013
1014 if (diag_sqlstate)
1015 sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
1016 diag_sqlstate[1],
1017 diag_sqlstate[2],
1018 diag_sqlstate[3],
1019 diag_sqlstate[4]);
1020 else
1021 sqlstate = ERRCODE_CONNECTION_FAILURE;
1022
1023 /*
1024 * If we don't get a message from the PGresult, try the PGconn. This is
1025 * needed because for connection-level failures, PQgetResult may just
1026 * return NULL, not a PGresult at all.
1027 */
1028 if (message_primary == NULL)
1030
1031 ereport(elevel,
1032 (errcode(sqlstate),
1033 (message_primary != NULL && message_primary[0] != '\0') ?
1035 errmsg("could not obtain message string for remote error"),
1037 message_hint ? errhint("%s", message_hint) : 0,
1039 sql ? errcontext("remote SQL command: %s", sql) : 0));
1040 PQclear(res);
1041}
1042
1043/*
1044 * pgfdw_xact_callback --- cleanup at main-transaction end.
1045 *
1046 * This runs just late enough that it must not enter user-defined code
1047 * locally. (Entering such code on the remote side is fine. Its remote
1048 * COMMIT TRANSACTION may run deferred triggers.)
1049 */
1050static void
1052{
1053 HASH_SEQ_STATUS scan;
1054 ConnCacheEntry *entry;
1057
1058 /* Quick exit if no connections were touched in this transaction. */
1060 return;
1061
1062 /*
1063 * Scan all connection cache entries to find open remote transactions, and
1064 * close them.
1065 */
1067 while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1068 {
1069 PGresult *res;
1070
1071 /* Ignore cache entry if no open connection right now */
1072 if (entry->conn == NULL)
1073 continue;
1074
1075 /* If it has an open remote transaction, try to close it */
1076 if (entry->xact_depth > 0)
1077 {
1078 elog(DEBUG3, "closing remote transaction on connection %p",
1079 entry->conn);
1080
1081 switch (event)
1082 {
1085
1086 /*
1087 * If abort cleanup previously failed for this connection,
1088 * we can't issue any more commands against it.
1089 */
1091
1092 /* Commit all remote transactions during pre-commit */
1093 entry->changing_xact_state = true;
1094 if (entry->parallel_commit)
1095 {
1096 do_sql_command_begin(entry->conn, "COMMIT TRANSACTION");
1098 continue;
1099 }
1100 do_sql_command(entry->conn, "COMMIT TRANSACTION");
1101 entry->changing_xact_state = false;
1102
1103 /*
1104 * If there were any errors in subtransactions, and we
1105 * made prepared statements, do a DEALLOCATE ALL to make
1106 * sure we get rid of all prepared statements. This is
1107 * annoying and not terribly bulletproof, but it's
1108 * probably not worth trying harder.
1109 *
1110 * DEALLOCATE ALL only exists in 8.3 and later, so this
1111 * constrains how old a server postgres_fdw can
1112 * communicate with. We intentionally ignore errors in
1113 * the DEALLOCATE, so that we can hobble along to some
1114 * extent with older servers (leaking prepared statements
1115 * as we go; but we don't really support update operations
1116 * pre-8.3 anyway).
1117 */
1118 if (entry->have_prep_stmt && entry->have_error)
1119 {
1120 res = pgfdw_exec_query(entry->conn, "DEALLOCATE ALL",
1121 NULL);
1122 PQclear(res);
1123 }
1124 entry->have_prep_stmt = false;
1125 entry->have_error = false;
1126 break;
1128
1129 /*
1130 * We disallow any remote transactions, since it's not
1131 * very reasonable to hold them open until the prepared
1132 * transaction is committed. For the moment, throw error
1133 * unconditionally; later we might allow read-only cases.
1134 * Note that the error will cause us to come right back
1135 * here with event == XACT_EVENT_ABORT, so we'll clean up
1136 * the connection state at that point.
1137 */
1138 ereport(ERROR,
1140 errmsg("cannot PREPARE a transaction that has operated on postgres_fdw foreign tables")));
1141 break;
1143 case XACT_EVENT_COMMIT:
1144 case XACT_EVENT_PREPARE:
1145 /* Pre-commit should have closed the open transaction */
1146 elog(ERROR, "missed cleaning up connection during pre-commit");
1147 break;
1149 case XACT_EVENT_ABORT:
1150 /* Rollback all remote transactions during abort */
1151 if (entry->parallel_abort)
1152 {
1153 if (pgfdw_abort_cleanup_begin(entry, true,
1156 continue;
1157 }
1158 else
1159 pgfdw_abort_cleanup(entry, true);
1160 break;
1161 }
1162 }
1163
1164 /* Reset state to show we're out of a transaction */
1165 pgfdw_reset_xact_state(entry, true);
1166 }
1167
1168 /* If there are any pending connections, finish cleaning them up */
1170 {
1171 if (event == XACT_EVENT_PARALLEL_PRE_COMMIT ||
1172 event == XACT_EVENT_PRE_COMMIT)
1173 {
1176 }
1177 else
1178 {
1180 event == XACT_EVENT_ABORT);
1182 true);
1183 }
1184 }
1185
1186 /*
1187 * Regardless of the event type, we can now mark ourselves as out of the
1188 * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
1189 * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
1190 */
1191 xact_got_connection = false;
1192
1193 /* Also reset cursor numbering for next transaction */
1194 cursor_number = 0;
1195}
1196
1197/*
1198 * pgfdw_subxact_callback --- cleanup at subtransaction end.
1199 */
1200static void
1203{
1204 HASH_SEQ_STATUS scan;
1205 ConnCacheEntry *entry;
1206 int curlevel;
1209
1210 /* Nothing to do at subxact start, nor after commit. */
1211 if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
1212 event == SUBXACT_EVENT_ABORT_SUB))
1213 return;
1214
1215 /* Quick exit if no connections were touched in this transaction. */
1217 return;
1218
1219 /*
1220 * Scan all connection cache entries to find open remote subtransactions
1221 * of the current level, and close them.
1222 */
1225 while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1226 {
1227 char sql[100];
1228
1229 /*
1230 * We only care about connections with open remote subtransactions of
1231 * the current level.
1232 */
1233 if (entry->conn == NULL || entry->xact_depth < curlevel)
1234 continue;
1235
1236 if (entry->xact_depth > curlevel)
1237 elog(ERROR, "missed cleaning up remote subtransaction at level %d",
1238 entry->xact_depth);
1239
1240 if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
1241 {
1242 /*
1243 * If abort cleanup previously failed for this connection, we
1244 * can't issue any more commands against it.
1245 */
1247
1248 /* Commit all remote subtransactions during pre-commit */
1249 snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
1250 entry->changing_xact_state = true;
1251 if (entry->parallel_commit)
1252 {
1253 do_sql_command_begin(entry->conn, sql);
1255 continue;
1256 }
1257 do_sql_command(entry->conn, sql);
1258 entry->changing_xact_state = false;
1259 }
1260 else
1261 {
1262 /* Rollback all remote subtransactions during abort */
1263 if (entry->parallel_abort)
1264 {
1265 if (pgfdw_abort_cleanup_begin(entry, false,
1268 continue;
1269 }
1270 else
1271 pgfdw_abort_cleanup(entry, false);
1272 }
1273
1274 /* OK, we're outta that level of subtransaction */
1275 pgfdw_reset_xact_state(entry, false);
1276 }
1277
1278 /* If there are any pending connections, finish cleaning them up */
1280 {
1281 if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
1282 {
1285 }
1286 else
1287 {
1290 false);
1291 }
1292 }
1293}
1294
1295/*
1296 * Connection invalidation callback function
1297 *
1298 * After a change to a pg_foreign_server or pg_user_mapping catalog entry,
1299 * close connections depending on that entry immediately if current transaction
1300 * has not used those connections yet. Otherwise, mark those connections as
1301 * invalid and then make pgfdw_xact_callback() close them at the end of current
1302 * transaction, since they cannot be closed in the midst of the transaction
1303 * using them. Closed connections will be remade at the next opportunity if
1304 * necessary.
1305 *
1306 * Although most cache invalidation callbacks blow away all the related stuff
1307 * regardless of the given hashvalue, connections are expensive enough that
1308 * it's worth trying to avoid that.
1309 *
1310 * NB: We could avoid unnecessary disconnection more strictly by examining
1311 * individual option values, but it seems too much effort for the gain.
1312 */
1313static void
1315{
1316 HASH_SEQ_STATUS scan;
1317 ConnCacheEntry *entry;
1318
1320
1321 /* ConnectionHash must exist already, if we're registered */
1323 while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1324 {
1325 /* Ignore entries that have no open connection */
1326 if (entry->conn == NULL)
1327 continue;
1328
1329 /* hashvalue == 0 means a cache reset, must clear all state */
1330 if (hashvalue == 0 ||
1332 entry->server_hashvalue == hashvalue) ||
1333 (cacheid == USERMAPPINGOID &&
1334 entry->mapping_hashvalue == hashvalue))
1335 {
1336 /*
1337 * Close the connection immediately if it's not used yet in this
1338 * transaction (xact_depth == 0). Otherwise mark it as invalid so
1339 * that pgfdw_xact_callback() can close it at the end of this
1340 * transaction.
1341 */
1342 if (entry->xact_depth == 0)
1343 {
1344 elog(DEBUG3, "discarding connection %p", entry->conn);
1345 disconnect_pg_server(entry);
1346 }
1347 else
1348 entry->invalidated = true;
1349 }
1350 }
1351}
1352
1353/*
1354 * Raise an error if the given connection cache entry is marked as being
1355 * in the middle of an xact state change. This should be called at points
1356 * where no such change is expected to be in progress; if one is found to be
1357 * in progress, it means that we aborted in the middle of a previous state
1358 * change and now don't know what the remote transaction state actually is.
1359 * Such connections can't safely be further used. Re-establishing the
1360 * connection would change the snapshot and roll back any writes already
1361 * performed, so that's not an option, either. Thus, we must abort.
1362 */
1363static void
1365{
1366 ForeignServer *server;
1367
1368 /* nothing to do for entries without a connection or in a sane state */
1369 if (entry->conn == NULL || !entry->changing_xact_state)
1370 return;
1371
1372 /* disconnect, so that the unusable connection can't be used again */
1373 disconnect_pg_server(entry);
1374
1375 /* find server name to be shown in the message below */
1376 server = GetForeignServer(entry->serverid);
1377
1378 ereport(ERROR,
1380 errmsg("connection to server \"%s\" was lost",
1381 server->servername)));
1382}
1383
1384/*
1385 * Reset state to show we're out of a (sub)transaction.
 *
 * If toplevel is true, the remote nesting depth is reset to zero and the
 * connection is discarded when it can't or shouldn't be reused. Otherwise
 * only the nesting depth is decremented by one level.
1386 */
1387static void
1389{
1390 if (toplevel)
1391 {
1392 /* Reset state to show we're out of a transaction */
1393 entry->xact_depth = 0;
1394
1395 /*
1396 * Discard the connection if it isn't in a good idle state, if it has
1397 * been marked as invalid, or if the keep_connections option of its
1398 * server is disabled. The next GetConnection will open a new
1399 * connection.
1400 */
1401 if (PQstatus(entry->conn) != CONNECTION_OK ||
1403 entry->changing_xact_state ||
1404 entry->invalidated ||
1405 !entry->keep_connections)
1406 {
1407 elog(DEBUG3, "discarding connection %p", entry->conn);
1408 disconnect_pg_server(entry);
1409 }
1410 }
1411 else
1412 {
1413 /* Reset state to show we're out of a subtransaction */
1414 entry->xact_depth--;
1415 }
1416}
1417
1418/*
1419 * Cancel the currently-in-progress query (whose query text we do not have)
1420 * and ignore the result. Returns true if we successfully cancel the query
1421 * and discard any pending result, and false if not.
1422 *
1423 * It's not a huge problem if we throw an ERROR here, but if we get into error
1424 * recursion trouble, we'll end up slamming the connection shut, which will
1425 * necessitate failing the entire toplevel transaction even if subtransactions
1426 * were used. Try to use WARNING where we can.
1427 *
1428 * XXX: if the query was one sent by fetch_more_data_begin(), we could get the
1429 * query text from the pendingAreq saved in the per-connection state, then
1430 * report the query using it.
1431 */
1432static bool
1434{
1438
1439 /*
1440 * If it takes too long to cancel the query and discard the result, assume
1441 * the connection is dead.
1442 */
1444
1445 /*
1446 * Also, lose patience and re-issue the cancel request after a little bit.
1447 * (This serves to close some race conditions.)
1448 */
1450
1452 return false;
1454}
1455
1456/*
1457 * Submit a cancel request to the given connection, waiting only until
1458 * the given time.
1459 *
1460 * We sleep interruptibly until we receive confirmation that the cancel
1461 * request has been accepted, and if it is, return true; if the timeout
1462 * lapses without that, or the request fails for whatever reason, return
1463 * false.
1464 */
1465static bool
1467{
1468 const char *errormsg = libpqsrv_cancel(conn, endtime);
1469
1470 if (errormsg != NULL)
1473 errmsg("could not send cancel request: %s", errormsg));
1474
1475 return errormsg == NULL;
1476}
1477
/*
 * Get and discard the result of a query for which a cancel request has
 * been issued.
 *
 * Returns true if the result was obtained (it is then freed with PQclear).
 * Returns false if the result could not be obtained, either because of a
 * timeout or for some other reason; a message is reported in those cases.
 */
1478static bool
1481{
1482 PGresult *result;
1483 bool timed_out;
1484
1485 /*
1486 * If requested, consume whatever data is available from the socket. (Note
1487 * that if all data is available, this allows pgfdw_get_cleanup_result to
1488 * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
1489 * which would be large compared to the overhead of PQconsumeInput.)
1490 */
1492 {
1495 errmsg("could not get result of cancel request: %s",
1497 return false;
1498 }
1499
1500 /* Get and discard the result of the query. */
1502 &result, &timed_out))
1503 {
1504 if (timed_out)
1506 (errmsg("could not get result of cancel request due to timeout")));
1507 else
1510 errmsg("could not get result of cancel request: %s",
1512
1513 return false;
1514 }
1515 PQclear(result);
1516
1517 return true;
1518}
1519
1520/*
1521 * Submit a query during (sub)abort cleanup and wait up to 30 seconds for the
1522 * result. If the query is executed without error, the return value is true.
1523 * If the query is executed successfully but returns an error, the return
1524 * value is true if and only if ignore_errors is set. If the query can't be
1525 * sent or times out, the return value is false.
1526 *
1527 * It's not a huge problem if we throw an ERROR here, but if we get into error
1528 * recursion trouble, we'll end up slamming the connection shut, which will
1529 * necessitate failing the entire toplevel transaction even if subtransactions
1530 * were used. Try to use WARNING where we can.
1531 */
1532static bool
1534{
1536
1537 /*
1538 * If it takes too long to execute a cleanup query, assume the connection
1539 * is dead. It's fairly likely that this is why we aborted in the first
1540 * place (e.g. statement timeout, user cancel), so the timeout shouldn't
1541 * be too long.
1542 */
1545
1547 return false;
1549 false, ignore_errors);
1550}
1551
/*
 * Send a query to the remote server with PQsendQuery, without waiting for
 * its result.
 *
 * Returns true if the query was sent. If sending fails, the failure is
 * reported through pgfdw_report at WARNING level and false is returned.
 */
1552static bool
1554{
1555 Assert(query != NULL);
1556
1557 /*
1558 * Submit a query. Since we don't use non-blocking mode, this also can
1559 * block. But its risk is relatively small, so we ignore that for now.
1560 */
1561 if (!PQsendQuery(conn, query))
1562 {
1563 pgfdw_report(WARNING, NULL, conn, query);
1564 return false;
1565 }
1566
1567 return true;
1568}
1569
/*
 * Get the result of a cleanup query that has already been submitted to the
 * remote server.
 *
 * Returns true if the result status is PGRES_COMMAND_OK. If the result has
 * any other status, a warning is reported and the value of ignore_errors is
 * returned. Returns false if the result could not be obtained, either
 * because of a timeout or for some other reason.
 */
1570static bool
1573 bool ignore_errors)
1574{
1575 PGresult *result;
1576 bool timed_out;
1577
1578 Assert(query != NULL);
1579
1580 /*
1581 * If requested, consume whatever data is available from the socket. (Note
1582 * that if all data is available, this allows pgfdw_get_cleanup_result to
1583 * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
1584 * which would be large compared to the overhead of PQconsumeInput.)
1585 */
1587 {
1588 pgfdw_report(WARNING, NULL, conn, query);
1589 return false;
1590 }
1591
1592 /* Get the result of the query. */
1594 {
1595 if (timed_out)
1597 (errmsg("could not get query result due to timeout"),
1598 errcontext("remote SQL command: %s", query)));
1599 else
1600 pgfdw_report(WARNING, NULL, conn, query);
1601
1602 return false;
1603 }
1604
1605 /* Issue a warning if not successful. */
1606 if (PQresultStatus(result) != PGRES_COMMAND_OK)
1607 {
1608 pgfdw_report(WARNING, result, conn, query);
1609 return ignore_errors;
1610 }
1611 PQclear(result);
1612
1613 return true;
1614}
1615
1616/*
1617 * Get, during abort cleanup, the result of a query that is in progress.
1618 * This might be a query that is being interrupted by a cancel request or by
1619 * transaction abort, or it might be a query that was initiated as part of
1620 * transaction abort to get the remote side back to the appropriate state.
1621 *
1622 * endtime is the time at which we should give up and assume the remote side
1623 * is dead. retrycanceltime is the time at which we should issue a fresh
1624 * cancel request (pass the same value as endtime if this is not wanted).
1625 *
1626 * Returns true if the timeout expired or connection trouble occurred,
1627 * false otherwise. Sets *result except in case of a true result.
1628 * Sets *timed_out to true only when the timeout expired.
1629 */
1630static bool
1633 PGresult **result,
1634 bool *timed_out)
1635{
1636 bool failed = false;
1637 PGresult *last_res = NULL;
1639
1640 *result = NULL;
1641 *timed_out = false;
1642 for (;;)
1643 {
1644 PGresult *res;
1645
1646 while (PQisBusy(conn))
1647 {
1648 int wc;
1650 long cur_timeout;
1651
1652 /* If timeout has expired, give up. */
1653 if (now >= endtime)
1654 {
1655 *timed_out = true;
1656 failed = true;
1657 goto exit;
1658 }
1659
1660 /* If we need to re-issue the cancel request, do that. */
1661 if (now >= retrycanceltime)
1662 {
1663 /* We ignore failure to issue the repeated request. */
1665
1666 /* Recompute "now" in case that took measurable time. */
1668
1669 /* Adjust re-cancel timeout in increasing steps. */
1671 canceldelta);
1673 }
1674
1675 /* If timeout has expired, give up, else get sleep time. */
1677 Min(endtime,
1679 if (cur_timeout <= 0)
1680 {
1681 *timed_out = true;
1682 failed = true;
1683 goto exit;
1684 }
1685
1686 /* first time, allocate or get the custom wait event */
1687 if (pgfdw_we_cleanup_result == 0)
1688 pgfdw_we_cleanup_result = WaitEventExtensionNew("PostgresFdwCleanupResult");
1689
1690 /* Sleep until there's something to do */
1694 PQsocket(conn),
1697
1699
1700 /* Data available in socket? */
1701 if (wc & WL_SOCKET_READABLE)
1702 {
1703 if (!PQconsumeInput(conn))
1704 {
1705 /* connection trouble */
1706 failed = true;
1707 goto exit;
1708 }
1709 }
1710 }
1711
1712 res = PQgetResult(conn);
1713 if (res == NULL)
1714 break; /* query is complete */
1715
1716 PQclear(last_res);
1717 last_res = res;
1718 }
1719exit:
1720 if (failed)
1721 PQclear(last_res);
1722 else
1723 *result = last_res;
1724 return failed;
1725}
1726
1727/*
1728 * Abort remote transaction or subtransaction.
1729 *
1730 * "toplevel" should be set to true if toplevel (main) transaction is
1731 * rolled back, false otherwise.
1732 *
1733 * Set entry->changing_xact_state to false on success, true on failure.
1734 */
1735static void
1737{
1738 char sql[100];
1739
1740 /*
1741 * Don't try to clean up the connection if we're already in error
1742 * recursion trouble.
1743 */
1745 entry->changing_xact_state = true;
1746
1747 /*
1748 * If connection is already unsalvageable, don't touch it further.
1749 */
1750 if (entry->changing_xact_state)
1751 return;
1752
1753 /*
1754 * Mark this connection as in the process of changing transaction state.
1755 */
1756 entry->changing_xact_state = true;
1757
1758 /* Assume we might have lost track of prepared statements */
1759 entry->have_error = true;
1760
1761 /*
1762 * If a command has been submitted to the remote server by using an
1763 * asynchronous execution function, the command might not have yet
1764 * completed. Check to see if a command is still being processed by the
1765 * remote server, and if so, request cancellation of the command.
1766 */
1767 if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
1768 !pgfdw_cancel_query(entry->conn))
1769 return; /* Unable to cancel running query */
1770
1771 CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1772 if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
1773 return; /* Unable to abort remote (sub)transaction */
1774
1775 if (toplevel)
1776 {
1777 if (entry->have_prep_stmt && entry->have_error &&
1779 "DEALLOCATE ALL",
1780 true))
1781 return; /* Trouble clearing prepared statements */
1782
1783 entry->have_prep_stmt = false;
1784 entry->have_error = false;
1785 }
1786
1787 /*
1788 * If pendingAreq of the per-connection state is not NULL, it means that
1789 * an asynchronous fetch begun by fetch_more_data_begin() was not done
1790 * successfully and thus the per-connection state was not reset in
1791 * fetch_more_data(); in that case reset the per-connection state here.
1792 */
1793 if (entry->state.pendingAreq)
1794 memset(&entry->state, 0, sizeof(entry->state));
1795
1796 /* Disarm changing_xact_state if it all worked */
1797 entry->changing_xact_state = false;
1798}
1799
1800/*
1801 * Like pgfdw_abort_cleanup, submit an abort command or cancel request, but
1802 * don't wait for the result.
1803 *
1804 * Returns true if the abort command or cancel request is successfully issued,
1805 * false otherwise. If the abort command is successfully issued, the given
1806 * connection cache entry is appended to *pending_entries. Otherwise, if the
1807 * cancel request is successfully issued, it is appended to *cancel_requested.
1808 */
1809static bool
1812{
1813 /*
1814 * Don't try to clean up the connection if we're already in error
1815 * recursion trouble.
1816 */
1818 entry->changing_xact_state = true;
1819
1820 /*
1821 * If connection is already unsalvageable, don't touch it further.
1822 */
1823 if (entry->changing_xact_state)
1824 return false;
1825
1826 /*
1827 * Mark this connection as in the process of changing transaction state.
1828 */
1829 entry->changing_xact_state = true;
1830
1831 /* Assume we might have lost track of prepared statements */
1832 entry->have_error = true;
1833
1834 /*
1835 * If a command has been submitted to the remote server by using an
1836 * asynchronous execution function, the command might not have yet
1837 * completed. Check to see if a command is still being processed by the
1838 * remote server, and if so, request cancellation of the command.
1839 */
1841 {
1843
1847 return false; /* Unable to cancel running query */
1849 }
1850 else
1851 {
1852 char sql[100];
1853
1854 CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1855 if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
1856 return false; /* Unable to abort remote transaction */
1858 }
1859
1860 return true;
1861}
1862
1863/*
1864 * Finish pre-commit cleanup of connections on each of which we've sent a
1865 * COMMIT command to the remote server.
 *
 * This works in two passes. The first pass collects the COMMIT result for
 * each pending entry and, where needed, sends DEALLOCATE ALL. The second
 * pass collects the DEALLOCATE results. Entries that need no DEALLOCATE, or
 * for which it could not be sent, are finished in the first pass.
1866 */
1867static void
1869{
1870 ConnCacheEntry *entry;
1872 ListCell *lc;
1873
1875
1876 /*
1877 * Get the result of the COMMIT command for each of the pending entries
1878 */
1879 foreach(lc, pending_entries)
1880 {
1881 entry = (ConnCacheEntry *) lfirst(lc);
1882
1884
1885 /*
1886 * We might already have received the result on the socket, so pass
1887 * consume_input=true to try to consume it first
1888 */
1889 do_sql_command_end(entry->conn, "COMMIT TRANSACTION", true);
1890 entry->changing_xact_state = false;
1891
1892 /* Do a DEALLOCATE ALL in parallel if needed */
1893 if (entry->have_prep_stmt && entry->have_error)
1894 {
1895 /* Ignore errors (see notes in pgfdw_xact_callback) */
1896 if (PQsendQuery(entry->conn, "DEALLOCATE ALL"))
1897 {
1899 continue;
1900 }
1901 }
1902 entry->have_prep_stmt = false;
1903 entry->have_error = false;
1904
1905 pgfdw_reset_xact_state(entry, true);
1906 }
1907
1908 /* No further work if no DEALLOCATE commands are pending */
1909 if (!pending_deallocs)
1910 return;
1911
1912 /*
1913 * Get the result of the DEALLOCATE command for each of the pending
1914 * entries
1915 */
1916 foreach(lc, pending_deallocs)
1917 {
1918 PGresult *res;
1919
1920 entry = (ConnCacheEntry *) lfirst(lc);
1921
1922 /* Ignore errors (see notes in pgfdw_xact_callback) */
1923 while ((res = PQgetResult(entry->conn)) != NULL)
1924 {
1925 PQclear(res);
1926 /* Stop if the connection is lost (else we'll loop infinitely) */
1927 if (PQstatus(entry->conn) == CONNECTION_BAD)
1928 break;
1929 }
1930 entry->have_prep_stmt = false;
1931 entry->have_error = false;
1932
1933 pgfdw_reset_xact_state(entry, true);
1934 }
1935}
1936
1937/*
1938 * Finish pre-subcommit cleanup of connections on each of which we've sent a
1939 * RELEASE command to the remote server.
1940 */
1941static void
1943{
1944 ConnCacheEntry *entry;
1945 char sql[100];
1946 ListCell *lc;
1947
1949
1950 /*
1951 * Get the result of the RELEASE command for each of the pending entries
1952 */
1953 snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
1954 foreach(lc, pending_entries)
1955 {
1956 entry = (ConnCacheEntry *) lfirst(lc);
1957
1959
1960 /*
1961 * We might already have received the result on the socket, so pass
1962 * consume_input=true to try to consume it first
1963 */
1964 do_sql_command_end(entry->conn, sql, true);
1965 entry->changing_xact_state = false;
1966
1967 pgfdw_reset_xact_state(entry, false);
1968 }
1969}
1970
1971/*
1972 * Finish abort cleanup of connections on each of which we've sent an abort
1973 * command or cancel request to the remote server.
1974 */
1975static void
1977 bool toplevel)
1978{
1980 ListCell *lc;
1981
1982 /*
1983 * For each of the pending cancel requests (if any), get and discard the
1984 * result of the query, and submit an abort command to the remote server.
1985 */
1986 if (cancel_requested)
1987 {
1988 foreach(lc, cancel_requested)
1989 {
1990 ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
1994 char sql[100];
1995
1997
1998 /*
1999 * Set end time. You might think we should do this before issuing
2000 * cancel request like in normal mode, but that is problematic,
2001 * because if, for example, it took longer than 30 seconds to
2002 * process the first few entries in the cancel_requested list, it
2003 * would cause a timeout error when processing each of the
2004 * remaining entries in the list, leading to slamming that entry's
2005 * connection shut.
2006 */
2011
2012 if (!pgfdw_cancel_query_end(entry->conn, endtime,
2013 retrycanceltime, true))
2014 {
2015 /* Unable to cancel running query */
2016 pgfdw_reset_xact_state(entry, toplevel);
2017 continue;
2018 }
2019
2020 /* Send an abort command in parallel if needed */
2021 CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
2022 if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
2023 {
2024 /* Unable to abort remote (sub)transaction */
2025 pgfdw_reset_xact_state(entry, toplevel);
2026 }
2027 else
2029 }
2030 }
2031
2032 /* No further work if no pending entries */
2033 if (!pending_entries)
2034 return;
2035
2036 /*
2037 * Get the result of the abort command for each of the pending entries
2038 */
2039 foreach(lc, pending_entries)
2040 {
2041 ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
2043 char sql[100];
2044
2046
2047 /*
2048 * Set end time. We do this now, not before issuing the command like
2049 * in normal mode, for the same reason as for the cancel_requested
2050 * entries.
2051 */
2054
2055 CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
2056 if (!pgfdw_exec_cleanup_query_end(entry->conn, sql, endtime,
2057 true, false))
2058 {
2059 /* Unable to abort remote (sub)transaction */
2060 pgfdw_reset_xact_state(entry, toplevel);
2061 continue;
2062 }
2063
2064 if (toplevel)
2065 {
2066 /* Do a DEALLOCATE ALL in parallel if needed */
2067 if (entry->have_prep_stmt && entry->have_error)
2068 {
2070 "DEALLOCATE ALL"))
2071 {
2072 /* Trouble clearing prepared statements */
2073 pgfdw_reset_xact_state(entry, toplevel);
2074 }
2075 else
2077 continue;
2078 }
2079 entry->have_prep_stmt = false;
2080 entry->have_error = false;
2081 }
2082
2083 /* Reset the per-connection state if needed */
2084 if (entry->state.pendingAreq)
2085 memset(&entry->state, 0, sizeof(entry->state));
2086
2087 /* We're done with this entry; unset the changing_xact_state flag */
2088 entry->changing_xact_state = false;
2089 pgfdw_reset_xact_state(entry, toplevel);
2090 }
2091
2092 /* No further work if no DEALLOCATE commands are pending */
2093 if (!pending_deallocs)
2094 return;
2095 Assert(toplevel);
2096
2097 /*
2098 * Get the result of the DEALLOCATE command for each of the pending
2099 * entries
2100 */
2101 foreach(lc, pending_deallocs)
2102 {
2103 ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
2105
2107 Assert(entry->have_prep_stmt);
2108 Assert(entry->have_error);
2109
2110 /*
2111 * Set end time. We do this now, not before issuing the command like
2112 * in normal mode, for the same reason as for the cancel_requested
2113 * entries.
2114 */
2117
2118 if (!pgfdw_exec_cleanup_query_end(entry->conn, "DEALLOCATE ALL",
2119 endtime, true, true))
2120 {
2121 /* Trouble clearing prepared statements */
2122 pgfdw_reset_xact_state(entry, toplevel);
2123 continue;
2124 }
2125 entry->have_prep_stmt = false;
2126 entry->have_error = false;
2127
2128 /* Reset the per-connection state if needed */
2129 if (entry->state.pendingAreq)
2130 memset(&entry->state, 0, sizeof(entry->state));
2131
2132 /* We're done with this entry; unset the changing_xact_state flag */
2133 entry->changing_xact_state = false;
2134 pgfdw_reset_xact_state(entry, toplevel);
2135 }
2136}
2137
2138/* Number of output arguments (columns) for various API versions */
2139#define POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_1 2
2140#define POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_2 6
2141#define POSTGRES_FDW_GET_CONNECTIONS_COLS 6 /* maximum of above */
2142
2143/*
2144 * Internal function used by postgres_fdw_get_connections variants.
2145 *
2146 * For API version 1.1, this function takes no input parameter and
2147 * returns a set of records with the following values:
2148 *
2149 * - server_name - server name of active connection. In case the foreign server
2150 * is dropped but still the connection is active, then the server name will
2151 * be NULL in output.
2152 * - valid - true/false representing whether the connection is valid or not.
2153 * Note that connections can become invalid in pgfdw_inval_callback.
2154 *
2155 * For API version 1.2 and later, this function takes an input parameter
2156 * to check a connection status and returns the following
2157 * four additional values along with the two values from version 1.1:
2158 *
2159 * - user_name - the local user name of the active connection. In case the
2160 * user mapping is dropped but the connection is still active, then the
2161 * user name will be NULL in the output.
2162 * - used_in_xact - true if the connection is used in the current transaction.
2163 * - closed - true if the connection is closed.
2164 * - remote_backend_pid - process ID of the remote backend, on the foreign
2165 * server, handling the connection.
2166 *
2167 * No records are returned when there are no cached connections at all.
2168 */
2169static void
2171 enum pgfdwVersion api_version)
2172{
2174 HASH_SEQ_STATUS scan;
2175 ConnCacheEntry *entry;
2176
2177 InitMaterializedSRF(fcinfo, 0);
2178
2179 /* If cache doesn't exist, we return no records */
2180 if (!ConnectionHash)
2181 return;
2182
2183 /* Check we have the expected number of output arguments */
2184 switch (rsinfo->setDesc->natts)
2185 {
2187 if (api_version != PGFDW_V1_1)
2188 elog(ERROR, "incorrect number of output arguments");
2189 break;
2191 if (api_version != PGFDW_V1_2)
2192 elog(ERROR, "incorrect number of output arguments");
2193 break;
2194 default:
2195 elog(ERROR, "incorrect number of output arguments");
2196 }
2197
2199 while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
2200 {
2201 ForeignServer *server;
2203 bool nulls[POSTGRES_FDW_GET_CONNECTIONS_COLS] = {0};
2204 int i = 0;
2205
2206 /* We only look for open remote connections */
2207 if (!entry->conn)
2208 continue;
2209
2211
2212 /*
2213 * The foreign server may have been dropped in current explicit
2214 * transaction. It is not possible to drop the server from another
2215 * session when the connection associated with it is in use in the
2216 * current transaction; if that is attempted, the drop query in the other
2217 * session blocks until the current transaction finishes.
2218 *
2219 * Even though the server is dropped in the current transaction, the
2220 * cache can still have associated active connection entry, say we
2221 * call such connections dangling. Since we can not fetch the server
2222 * name from system catalogs for dangling connections, instead we show
2223 * NULL value for server name in output.
2224 *
2225 * We could have done better by storing the server name in the cache
2226 * entry instead of server oid so that it could be used in the output.
2227 * But the server name in each cache entry requires 64 bytes of
2228 * memory, which is huge, when there are many cached connections and
2229 * the use case i.e. dropping the foreign server within the explicit
2230 * current transaction seems rare. So, we chose to show NULL value for
2231 * server name in output.
2232 *
2233 * Such dangling connections get closed either in next use or at the
2234 * end of current explicit transaction in pgfdw_xact_callback.
2235 */
2236 if (!server)
2237 {
2238 /*
2239 * If the server has been dropped in the current explicit
2240 * transaction, then this entry would have been invalidated in
2241 * pgfdw_inval_callback at the end of drop server command. Note
2242 * that this connection would not have been closed in
2243 * pgfdw_inval_callback because it is still being used in the
2244 * current explicit transaction. So, assert that here.
2245 */
2246 Assert(entry->conn && entry->xact_depth > 0 && entry->invalidated);
2247
2248 /* Show null, if no server name was found */
2249 nulls[i++] = true;
2250 }
2251 else
2252 values[i++] = CStringGetTextDatum(server->servername);
2253
2254 if (api_version >= PGFDW_V1_2)
2255 {
2256 HeapTuple tp;
2257
2258 /* Use the system cache to obtain the user mapping */
2260
2261 /*
2262 * Just like in the foreign server case, user mappings can also be
2263 * dropped in the current explicit transaction. Therefore, the
2264 * similar check as in the server case is required.
2265 */
2266 if (!HeapTupleIsValid(tp))
2267 {
2268 /*
2269 * If we reach here, this entry must have been invalidated in
2270 * pgfdw_inval_callback, same as in the server case.
2271 */
2272 Assert(entry->conn && entry->xact_depth > 0 &&
2273 entry->invalidated);
2274
2275 nulls[i++] = true;
2276 }
2277 else
2278 {
2279 Oid userid;
2280
2281 userid = ((Form_pg_user_mapping) GETSTRUCT(tp))->umuser;
2283 ReleaseSysCache(tp);
2284 }
2285 }
2286
2287 values[i++] = BoolGetDatum(!entry->invalidated);
2288
2289 if (api_version >= PGFDW_V1_2)
2290 {
2291 bool check_conn = PG_GETARG_BOOL(0);
2292
2293 /* Is this connection used in the current transaction? */
2294 values[i++] = BoolGetDatum(entry->xact_depth > 0);
2295
2296 /*
2297 * If a connection status check is requested and supported, return
2298 * whether the connection is closed. Otherwise, return NULL.
2299 */
2301 values[i++] = BoolGetDatum(pgfdw_conn_check(entry->conn) != 0);
2302 else
2303 nulls[i++] = true;
2304
2305 /* Return process ID of remote backend */
2306 values[i++] = Int32GetDatum(PQbackendPID(entry->conn));
2307 }
2308
2309 tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
2310 }
2311}
2312
2313/*
2314 * List active foreign server connections.
2315 *
2316 * The SQL API of this function has changed multiple times, and will likely
2317 * do so again in future. To support the case where a newer version of this
2318 * loadable module is being used with an old SQL declaration of the function,
2319 * we continue to support the older API versions.
2320 */
2321Datum
2328
2329Datum
2336
2337/*
2338 * Disconnect the specified cached connections.
2339 *
2340 * This function discards the open connections that are established by
2341 * postgres_fdw from the local session to the foreign server with
2342 * the given name. Note that there can be multiple connections to
2343 * the given server using different user mappings. If the connections
2344 * are used in the current local transaction, they are not disconnected
2345 * and warning messages are reported. This function returns true
2346 * if it disconnects at least one connection, otherwise false. If no
2347 * foreign server with the given name is found, an error is reported.
2348 */
2349Datum
2351{
2352 ForeignServer *server;
2353 char *servername;
2354
2355 servername = text_to_cstring(PG_GETARG_TEXT_PP(0));
2356 server = GetForeignServerByName(servername, false);
2357
2359}
2360
2361/*
2362 * Disconnect all the cached connections.
2363 *
2364 * This function discards all the open connections that are established by
2365 * postgres_fdw from the local session to the foreign servers.
2366 * If the connections are used in the current local transaction, they are
2367 * not disconnected and warning messages are reported. This function
2368 * returns true if it disconnects at least one connection, otherwise false.
2369 */
2370Datum
2375
2376/*
2377 * Workhorse to disconnect cached connections.
2378 *
2379 * This function scans all the connection cache entries and disconnects
2380 * the open connections whose foreign server OID matches with
2381 * the specified one. If InvalidOid is specified, it disconnects all
2382 * the cached connections.
2383 *
2384 * This function emits a warning for each connection that's used in
2385 * the current transaction and doesn't close it. It returns true if
2386 * it disconnects at least one connection, otherwise false.
2387 *
2388 * Note that this function disconnects even the connections that are
2389 * established by other users in the same local session using different
2390 * user mappings. This leads even non-superuser to be able to close
2391 * the connections established by superusers in the same local session.
2392 *
2393 * XXX As of now we don't see any security risk doing this. But we should
2394 * set some restrictions on that, for example, prevent non-superuser
2395 * from closing the connections established by superusers even
2396 * in the same session?
2397 */
2398static bool
2400{
2401 HASH_SEQ_STATUS scan;
2402 ConnCacheEntry *entry;
2403 bool all = !OidIsValid(serverid);
2404 bool result = false;
2405
2406 /*
2407 * Connection cache hashtable has not been initialized yet in this
2408 * session, so return false.
2409 */
2410 if (!ConnectionHash)
2411 return false;
2412
2414 while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
2415 {
2416 /* Ignore cache entry if no open connection right now. */
2417 if (!entry->conn)
2418 continue;
2419
2420 if (all || entry->serverid == serverid)
2421 {
2422 /*
2423 * Emit a warning because the connection to close is used in the
2424 * current transaction and cannot be disconnected right now.
2425 */
2426 if (entry->xact_depth > 0)
2427 {
2428 ForeignServer *server;
2429
2430 server = GetForeignServerExtended(entry->serverid,
2432
2433 if (!server)
2434 {
2435 /*
2436 * If the foreign server was dropped while its connection
2437 * was used in the current transaction, the connection
2438 * must have been marked as invalid by
2439 * pgfdw_inval_callback at the end of DROP SERVER command.
2440 */
2441 Assert(entry->invalidated);
2442
2444 (errmsg("cannot close dropped server connection because it is still in use")));
2445 }
2446 else
2448 (errmsg("cannot close connection for server \"%s\" because it is still in use",
2449 server->servername)));
2450 }
2451 else
2452 {
2453 elog(DEBUG3, "discarding connection %p", entry->conn);
2454 disconnect_pg_server(entry);
2455 result = true;
2456 }
2457 }
2458 }
2459
2460 return result;
2461}
2462
2463/*
2464 * Check if the remote server closed the connection.
2465 *
2466 * Returns 1 if the connection is closed, -1 if an error occurred,
2467 * and 0 if it's not closed or if the connection check is unavailable
2468 * on this platform.
2469 */
2470static int
2472{
2473 int sock = PQsocket(conn);
2474
2475 if (PQstatus(conn) != CONNECTION_OK || sock == -1)
2476 return -1;
2477
2478#if (defined(HAVE_POLL) && defined(POLLRDHUP))
2479 {
2480 struct pollfd input_fd;
2481 int result;
2482
2483 input_fd.fd = sock;
2484 input_fd.events = POLLRDHUP;
2485 input_fd.revents = 0;
2486
2487 do
2488 result = poll(&input_fd, 1, 0);
2489 while (result < 0 && errno == EINTR);
2490
2491 if (result < 0)
2492 return -1;
2493
2494 return (input_fd.revents &
2495 (POLLRDHUP | POLLHUP | POLLERR | POLLNVAL)) ? 1 : 0;
2496 }
2497#else
2498 return 0;
2499#endif
2500}
2501
/*
 * Check if connection status checking is available on this platform.
 *
 * Returns true if available, false otherwise.  The condition must stay in
 * sync with the one guarding the poll()-based probe in pgfdw_conn_check().
 */
static bool
pgfdw_conn_checkable(void)
{
#if (defined(HAVE_POLL) && defined(POLLRDHUP))
	return true;
#else
	return false;
#endif
}
2516
2517/*
2518 * Ensure that require_auth and SCRAM keys are correctly set on values. SCRAM
2519 * keys used to pass-through are coming from the initial connection from the
2520 * client with the server.
2521 *
2522 * All required SCRAM options are set by postgres_fdw, so we just need to
2523 * ensure that these options are not overwritten by the user.
2524 */
2525static bool
2527{
2528 bool has_scram_server_key = false;
2529 bool has_scram_client_key = false;
2530 bool has_require_auth = false;
2531 bool has_scram_keys = false;
2532
2533 /*
2534 * Continue iterating even if we found the keys that we need to validate
2535 * to make sure that there is no other declaration of these keys that can
2536 * overwrite the first.
2537 */
2538 for (int i = 0; keywords[i] != NULL; i++)
2539 {
2540 if (strcmp(keywords[i], "scram_client_key") == 0)
2541 {
2542 if (values[i] != NULL && values[i][0] != '\0')
2543 has_scram_client_key = true;
2544 else
2545 has_scram_client_key = false;
2546 }
2547
2548 if (strcmp(keywords[i], "scram_server_key") == 0)
2549 {
2550 if (values[i] != NULL && values[i][0] != '\0')
2551 has_scram_server_key = true;
2552 else
2553 has_scram_server_key = false;
2554 }
2555
2556 if (strcmp(keywords[i], "require_auth") == 0)
2557 {
2558 if (values[i] != NULL && strcmp(values[i], "scram-sha-256") == 0)
2559 has_require_auth = true;
2560 else
2561 has_require_auth = false;
2562 }
2563 }
2564
2566
2567 return (has_scram_keys && has_require_auth);
2568}
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition timestamp.c:1757
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1645
Datum now(PG_FUNCTION_ARGS)
Definition timestamp.c:1609
int pg_b64_enc_len(int srclen)
Definition base64.c:224
int pg_b64_encode(const uint8 *src, int len, char *dst, int dstlen)
Definition base64.c:49
bool be_gssapi_get_delegation(Port *port)
static Datum values[MAXATTR]
Definition bootstrap.c:147
#define CStringGetTextDatum(s)
Definition builtins.h:98
#define Min(x, y)
Definition c.h:1019
uint32 SubTransactionId
Definition c.h:682
#define Assert(condition)
Definition c.h:885
#define pg_unreachable()
Definition c.h:353
uint32_t uint32
Definition c.h:558
#define OidIsValid(objectId)
Definition c.h:800
Oid ConnCacheKey
Definition connection.c:53
static unsigned int prep_stmt_number
Definition connection.c:83
unsigned int GetCursorNumber(PGconn *conn)
Definition connection.c:920
static bool UserMappingPasswordRequired(UserMapping *user)
Definition connection.c:675
Datum postgres_fdw_get_connections(PG_FUNCTION_ARGS)
void do_sql_command(PGconn *conn, const char *sql)
Definition connection.c:813
#define POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_2
PGresult * pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
Definition connection.c:949
static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel)
static void disconnect_pg_server(ConnCacheEntry *entry)
Definition connection.c:660
void ReleaseConnection(PGconn *conn)
Definition connection.c:899
static uint32 pgfdw_we_get_result
Definition connection.c:91
static bool UseScramPassthrough(ForeignServer *server, UserMapping *user)
Definition connection.c:691
#define RETRY_CANCEL_TIMEOUT
Definition connection.c:105
PGresult * pgfdw_get_result(PGconn *conn)
Definition connection.c:966
void pgfdw_report_error(PGresult *res, PGconn *conn, const char *sql)
Definition connection.c:990
static bool pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime)
static void pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested, bool toplevel)
static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
static int pgfdw_conn_check(PGconn *conn)
#define POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_1
static void configure_remote_session(PGconn *conn)
Definition connection.c:775
static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime, TimestampTz retrycanceltime, bool consume_input)
static bool xact_got_connection
Definition connection.c:86
#define POSTGRES_FDW_GET_CONNECTIONS_COLS
void pgfdw_report(int elevel, PGresult *res, PGconn *conn, const char *sql)
Definition connection.c:997
Datum postgres_fdw_disconnect_all(PG_FUNCTION_ARGS)
static void do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
Definition connection.c:827
#define CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel)
Definition connection.c:108
static bool pgfdw_conn_checkable(void)
static uint32 pgfdw_we_cleanup_result
Definition connection.c:89
static bool pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel, List **pending_entries, List **cancel_requested)
static HTAB * ConnectionHash
Definition connection.c:79
static bool pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query, TimestampTz endtime, bool consume_input, bool ignore_errors)
static unsigned int cursor_number
Definition connection.c:82
static bool pgfdw_has_required_scram_options(const char **keywords, const char **values)
static void make_new_connection(ConnCacheEntry *entry, UserMapping *user)
Definition connection.c:370
static void pgfdw_security_check(const char **keywords, const char **values, UserMapping *user, PGconn *conn)
Definition connection.c:436
Datum postgres_fdw_disconnect(PG_FUNCTION_ARGS)
static void pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg)
static PGconn * connect_pg_server(ForeignServer *server, UserMapping *user)
Definition connection.c:483
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
unsigned int GetPrepStmtNumber(PGconn *conn)
Definition connection.c:934
Datum postgres_fdw_get_connections_1_2(PG_FUNCTION_ARGS)
static bool pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query)
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
static void check_conn_params(const char **keywords, const char **values, UserMapping *user)
Definition connection.c:723
static uint32 pgfdw_we_connect
Definition connection.c:90
static void pgfdw_xact_callback(XactEvent event, void *arg)
static void postgres_fdw_get_connections_internal(FunctionCallInfo fcinfo, enum pgfdwVersion api_version)
static void pgfdw_report_internal(int elevel, PGresult *res, PGconn *conn, const char *sql)
#define CONNECTION_CLEANUP_TIMEOUT
Definition connection.c:98
static void do_sql_command_begin(PGconn *conn, const char *sql)
Definition connection.c:820
static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, TimestampTz retrycanceltime, PGresult **result, bool *timed_out)
static void begin_remote_xact(ConnCacheEntry *entry)
Definition connection.c:856
pgfdwVersion
Definition connection.c:123
@ PGFDW_V1_1
Definition connection.c:124
@ PGFDW_V1_2
Definition connection.c:125
static void pgfdw_inval_callback(Datum arg, SysCacheIdentifier cacheid, uint32 hashvalue)
static bool pgfdw_cancel_query(PGconn *conn)
static void pgfdw_finish_pre_commit_cleanup(List *pending_entries)
static bool disconnect_cached_connections(Oid serverid)
int ExtractConnectionOptions(List *defelems, const char **keywords, const char **values)
Definition option.c:414
char * pgfdw_application_name
Definition option.c:46
char * process_pgfdw_appname(const char *appname)
Definition option.c:491
int64 TimestampTz
Definition timestamp.h:39
bool defGetBoolean(DefElem *def)
Definition define.c:93
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition dynahash.c:952
HTAB * hash_create(const char *tabname, int64 nelem, const HASHCTL *info, int flags)
Definition dynahash.c:358
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition dynahash.c:1415
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition dynahash.c:1380
Datum arg
Definition elog.c:1322
void FreeErrorData(ErrorData *edata)
Definition elog.c:2013
ErrorData * CopyErrorData(void)
Definition elog.c:1941
void FlushErrorState(void)
Definition elog.c:2062
int errcode(int sqlerrcode)
Definition elog.c:874
int errmsg(const char *fmt,...)
Definition elog.c:1093
bool in_error_recursion_trouble(void)
Definition elog.c:305
#define PG_RE_THROW()
Definition elog.h:405
int int errdetail_internal(const char *fmt,...) pg_attribute_printf(1
#define errcontext
Definition elog.h:198
int errhint(const char *fmt,...) pg_attribute_printf(1
#define DEBUG3
Definition elog.h:28
int errdetail(const char *fmt,...) pg_attribute_printf(1
int int errmsg_internal(const char *fmt,...) pg_attribute_printf(1
#define PG_TRY(...)
Definition elog.h:372
#define WARNING
Definition elog.h:36
#define PG_END_TRY(...)
Definition elog.h:397
#define ERROR
Definition elog.h:39
#define PG_CATCH(...)
Definition elog.h:382
#define MAKE_SQLSTATE(ch1, ch2, ch3, ch4, ch5)
Definition elog.h:56
#define elog(elevel,...)
Definition elog.h:226
#define ereport(elevel,...)
Definition elog.h:150
int PQserverVersion(const PGconn *conn)
PGTransactionStatusType PQtransactionStatus(const PGconn *conn)
int PQconnectionUsedPassword(const PGconn *conn)
int PQconnectionUsedGSSAPI(const PGconn *conn)
ConnStatusType PQstatus(const PGconn *conn)
PQnoticeReceiver PQsetNoticeReceiver(PGconn *conn, PQnoticeReceiver proc, void *arg)
int PQbackendPID(const PGconn *conn)
char * PQerrorMessage(const PGconn *conn)
int PQsocket(const PGconn *conn)
int PQconsumeInput(PGconn *conn)
Definition fe-exec.c:2001
int PQsendQuery(PGconn *conn, const char *query)
Definition fe-exec.c:1433
int PQisBusy(PGconn *conn)
Definition fe-exec.c:2048
#define PG_RETURN_VOID()
Definition fmgr.h:350
#define PG_GETARG_TEXT_PP(n)
Definition fmgr.h:310
#define PG_FUNCTION_INFO_V1(funcname)
Definition fmgr.h:417
#define PG_GETARG_BOOL(n)
Definition fmgr.h:274
#define PG_FUNCTION_ARGS
Definition fmgr.h:193
#define PG_RETURN_BOOL(x)
Definition fmgr.h:360
ForeignServer * GetForeignServerByName(const char *srvname, bool missing_ok)
Definition foreign.c:183
ForeignServer * GetForeignServer(Oid serverid)
Definition foreign.c:112
ForeignServer * GetForeignServerExtended(Oid serverid, bits16 flags)
Definition foreign.c:124
#define MappingUserName(userid)
Definition foreign.h:20
#define FSV_MISSING_OK
Definition foreign.h:61
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
Definition funcapi.c:76
struct Port * MyProcPort
Definition globals.c:51
struct Latch * MyLatch
Definition globals.c:63
@ HASH_ENTER
Definition hsearch.h:114
#define HASH_ELEM
Definition hsearch.h:95
#define HASH_BLOBS
Definition hsearch.h:97
#define HeapTupleIsValid(tuple)
Definition htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
void CacheRegisterSyscacheCallback(SysCacheIdentifier cacheid, SyscacheCallbackFunction func, Datum arg)
Definition inval.c:1816
int i
Definition isn.c:77
static const JsonPathKeyword keywords[]
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition latch.c:223
void ResetLatch(Latch *latch)
Definition latch.c:374
static const char * libpqsrv_cancel(PGconn *conn, TimestampTz endtime)
static PGconn * libpqsrv_connect_params(const char *const *keywords, const char *const *values, int expand_dbname, uint32 wait_event_info)
static void libpqsrv_notice_receiver(void *arg, const PGresult *res)
static void libpqsrv_disconnect(PGconn *conn)
static PGresult * libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info)
#define PQgetResult
#define PQclear
#define PQresultErrorField
#define PQresultStatus
@ CONNECTION_BAD
Definition libpq-fe.h:85
@ CONNECTION_OK
Definition libpq-fe.h:84
@ PGRES_COMMAND_OK
Definition libpq-fe.h:125
@ PQTRANS_IDLE
Definition libpq-fe.h:147
@ PQTRANS_ACTIVE
Definition libpq-fe.h:148
List * lappend(List *list, void *datum)
Definition list.c:339
const char * GetDatabaseEncodingName(void)
Definition mbutils.c:1395
void pfree(void *pointer)
Definition mcxt.c:1616
void * palloc0(Size size)
Definition mcxt.c:1417
char * pchomp(const char *in)
Definition mcxt.c:1809
void * palloc(Size size)
Definition mcxt.c:1387
MemoryContext CurrentMemoryContext
Definition mcxt.c:160
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:124
const void size_t len
#define lfirst(lc)
Definition pg_list.h:172
static int list_length(const List *l)
Definition pg_list.h:152
#define NIL
Definition pg_list.h:68
static char * user
Definition pg_regress.c:119
END_CATALOG_STRUCT typedef FormData_pg_user_mapping * Form_pg_user_mapping
#define snprintf
Definition port.h:260
static Datum BoolGetDatum(bool X)
Definition postgres.h:112
static Datum ObjectIdGetDatum(Oid X)
Definition postgres.h:262
uint64_t Datum
Definition postgres.h:70
static Datum Int32GetDatum(int32 X)
Definition postgres.h:222
#define InvalidOid
unsigned int Oid
#define PG_DIAG_MESSAGE_HINT
#define PG_DIAG_SQLSTATE
#define PG_DIAG_MESSAGE_PRIMARY
#define PG_DIAG_MESSAGE_DETAIL
#define PG_DIAG_CONTEXT
void process_pending_request(AsyncRequest *areq)
static int fb(int x)
tree ctl
Definition radixtree.h:1838
PGconn * GetConnection(void)
Definition streamutil.c:60
PGconn * conn
Definition streamutil.c:52
PGconn * conn
Definition connection.c:58
bool have_prep_stmt
Definition connection.c:62
PgFdwConnState state
Definition connection.c:73
ConnCacheKey key
Definition connection.c:57
bool parallel_commit
Definition connection.c:65
uint32 server_hashvalue
Definition connection.c:71
uint32 mapping_hashvalue
Definition connection.c:72
bool keep_connections
Definition connection.c:68
bool parallel_abort
Definition connection.c:66
bool changing_xact_state
Definition connection.c:64
char * defname
Definition parsenodes.h:844
List * options
Definition foreign.h:42
char * servername
Definition foreign.h:39
Definition pg_list.h:54
AsyncRequest * pendingAreq
uint8 scram_ServerKey[SCRAM_MAX_KEY_LEN]
Definition libpq-be.h:187
bool has_scram_keys
Definition libpq-be.h:188
uint8 scram_ClientKey[SCRAM_MAX_KEY_LEN]
Definition libpq-be.h:186
bool superuser_arg(Oid roleid)
Definition superuser.c:57
void ReleaseSysCache(HeapTuple tuple)
Definition syscache.c:264
HeapTuple SearchSysCache1(SysCacheIdentifier cacheId, Datum key1)
Definition syscache.c:220
#define GetSysCacheHashValue1(cacheId, key1)
Definition syscache.h:118
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
Definition tuplestore.c:784
#define TimestampTzPlusMilliseconds(tz, ms)
Definition timestamp.h:85
char * text_to_cstring(const text *t)
Definition varlena.c:215
uint32 WaitEventExtensionNew(const char *wait_event_name)
Definition wait_event.c:163
#define WL_SOCKET_READABLE
#define WL_TIMEOUT
#define WL_EXIT_ON_PM_DEATH
#define WL_LATCH_SET
#define EINTR
Definition win32_port.h:361
int GetCurrentTransactionNestLevel(void)
Definition xact.c:930
void RegisterXactCallback(XactCallback callback, void *arg)
Definition xact.c:3826
void RegisterSubXactCallback(SubXactCallback callback, void *arg)
Definition xact.c:3886
SubXactEvent
Definition xact.h:142
@ SUBXACT_EVENT_PRE_COMMIT_SUB
Definition xact.h:146
@ SUBXACT_EVENT_ABORT_SUB
Definition xact.h:145
XactEvent
Definition xact.h:128
@ XACT_EVENT_PRE_PREPARE
Definition xact.h:136
@ XACT_EVENT_COMMIT
Definition xact.h:129
@ XACT_EVENT_PARALLEL_PRE_COMMIT
Definition xact.h:135
@ XACT_EVENT_PARALLEL_COMMIT
Definition xact.h:130
@ XACT_EVENT_ABORT
Definition xact.h:131
@ XACT_EVENT_PRE_COMMIT
Definition xact.h:134
@ XACT_EVENT_PARALLEL_ABORT
Definition xact.h:132
@ XACT_EVENT_PREPARE
Definition xact.h:133
#define IsolationIsSerializable()
Definition xact.h:53