98#define CONNECTION_CLEANUP_TIMEOUT 30000
105#define RETRY_CANCEL_TIMEOUT 1000
108#define CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel) \
111 snprintf((sql), sizeof(sql), \
112 "ABORT TRANSACTION"); \
114 snprintf((sql), sizeof(sql), \
115 "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", \
116 (entry)->xact_depth, (entry)->xact_depth); \
170 PGresult **result,
bool *timed_out);
173 List **pending_entries,
174 List **cancel_requested);
179 List *cancel_requested,
269 elog(
DEBUG3,
"closing connection %p for option changes to take effect",
279 if (entry->
conn == NULL)
314 if (errdata->
sqlerrcode != ERRCODE_CONNECTION_FAILURE ||
341 (
errmsg_internal(
"could not start remote transaction on connection %p",
345 elog(
DEBUG3,
"closing connection %p to reestablish a new one",
389 memset(&entry->
state, 0,
sizeof(entry->
state));
413 if (strcmp(def->
defname,
"keep_connections") == 0)
415 else if (strcmp(def->
defname,
"parallel_commit") == 0)
417 else if (strcmp(def->
defname,
"parallel_abort") == 0)
424 elog(
DEBUG3,
"new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
472 (
errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
473 errmsg(
"password or GSSAPI delegated credentials required"),
474 errdetail(
"Non-superuser cannot connect if the server does not request a password or use GSSAPI with delegated credentials."),
475 errhint(
"Target server's authentication method must be changed or password_required=false set in the user mapping attributes.")));
493 char *appname = NULL;
536 for (
int i = n - 1;
i >= 0;
i--)
538 if (strcmp(
keywords[
i],
"application_name") == 0 &&
546 if (appname[0] !=
'\0')
564 keywords[n] =
"fallback_application_name";
565 values[n] =
"postgres_fdw";
587 elog(
ERROR,
"could not encode SCRAM client key");
598 elog(
ERROR,
"could not encode SCRAM server key");
606 values[n] =
"scram-sha-256";
626 (
errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
627 errmsg(
"could not connect to server \"%s\"",
632 "received message via remote connection");
661 if (entry->
conn != NULL)
677 foreach(cell,
user->options)
681 if (strcmp(def->
defname,
"password_required") == 0)
697 if (strcmp(def->
defname,
"use_scram_passthrough") == 0)
701 foreach(cell,
user->options)
705 if (strcmp(def->
defname,
"use_scram_passthrough") == 0)
756 (
errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
757 errmsg(
"password or GSSAPI delegated credentials required"),
758 errdetail(
"Non-superusers must delegate GSSAPI credentials, provide a password, or enable SCRAM pass-through in user mapping.")));
799 if (remoteversion >= 80400)
801 if (remoteversion >= 90000)
863 elog(
DEBUG3,
"starting remote transaction on connection %p",
867 sql =
"START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
869 sql =
"START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
1019 sqlstate = ERRCODE_CONNECTION_FAILURE;
1026 if (message_primary == NULL)
1031 (message_primary != NULL && message_primary[0] !=
'\0') ?
1033 errmsg(
"could not obtain message string for remote error"),
1035 message_hint ?
errhint(
"%s", message_hint) : 0,
1036 message_context ?
errcontext(
"%s", message_context) : 0,
1037 sql ?
errcontext(
"remote SQL command: %s", sql) : 0));
1070 if (entry->
conn == NULL)
1076 elog(
DEBUG3,
"closing remote transaction on connection %p",
1095 pending_entries =
lappend(pending_entries, entry);
1137 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1138 errmsg(
"cannot PREPARE a transaction that has operated on postgres_fdw foreign tables")));
1144 elog(
ERROR,
"missed cleaning up connection during pre-commit");
1167 if (pending_entries || cancel_requested)
1235 elog(
ERROR,
"missed cleaning up remote subtransaction at level %d",
1247 snprintf(sql,
sizeof(sql),
"RELEASE SAVEPOINT s%d", curlevel);
1252 pending_entries =
lappend(pending_entries, entry);
1277 if (pending_entries || cancel_requested)
1317 Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
1324 if (entry->
conn == NULL)
1328 if (hashvalue == 0 ||
1329 (cacheid == FOREIGNSERVEROID &&
1331 (cacheid == USERMAPPINGOID &&
1377 (
errcode(ERRCODE_CONNECTION_EXCEPTION),
1378 errmsg(
"connection to server \"%s\" was lost",
1468 if (errormsg != NULL)
1470 errcode(ERRCODE_CONNECTION_FAILURE),
1471 errmsg(
"could not send cancel request: %s", errormsg));
1473 return errormsg == NULL;
1492 (
errcode(ERRCODE_CONNECTION_FAILURE),
1493 errmsg(
"could not get result of cancel request: %s",
1500 &result, &timed_out))
1504 (
errmsg(
"could not get result of cancel request due to timeout")));
1507 (
errcode(ERRCODE_CONNECTION_FAILURE),
1508 errmsg(
"could not get result of cancel request: %s",
1547 false, ignore_errors);
1595 (
errmsg(
"could not get query result due to timeout"),
1596 errcontext(
"remote SQL command: %s", query)));
1607 return ignore_errors;
1634 bool failed =
false;
1659 if (
now >= retrycanceltime)
1670 canceldelta += canceldelta;
1677 if (cur_timeout <= 0)
1792 memset(&entry->
state, 0,
sizeof(entry->
state));
1809 List **pending_entries,
List **cancel_requested)
1846 *cancel_requested =
lappend(*cancel_requested, entry);
1855 *pending_entries =
lappend(*pending_entries, entry);
1877 foreach(lc, pending_entries)
1896 pending_deallocs =
lappend(pending_deallocs, entry);
1907 if (!pending_deallocs)
1914 foreach(lc, pending_deallocs)
1951 snprintf(sql,
sizeof(sql),
"RELEASE SAVEPOINT s%d", curlevel);
1952 foreach(lc, pending_entries)
1984 if (cancel_requested)
1986 foreach(lc, cancel_requested)
2011 retrycanceltime,
true))
2026 pending_entries =
lappend(pending_entries, entry);
2031 if (!pending_entries)
2037 foreach(lc, pending_entries)
2074 pending_deallocs =
lappend(pending_deallocs, entry);
2083 memset(&entry->
state, 0,
sizeof(entry->
state));
2091 if (!pending_deallocs)
2099 foreach(lc, pending_deallocs)
2117 endtime,
true,
true))
2128 memset(&entry->
state, 0,
sizeof(entry->
state));
2137#define POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_1 2
2138#define POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_2 6
2139#define POSTGRES_FDW_GET_CONNECTIONS_COLS 6
2186 elog(
ERROR,
"incorrect number of output arguments");
2190 elog(
ERROR,
"incorrect number of output arguments");
2193 elog(
ERROR,
"incorrect number of output arguments");
2244 Assert(entry->conn && entry->xact_depth > 0 && entry->invalidated);
2270 Assert(entry->conn && entry->xact_depth > 0 &&
2271 entry->invalidated);
2402 bool result =
false;
2418 if (all || entry->
serverid == serverid)
2442 (
errmsg(
"cannot close dropped server connection because it is still in use")));
2446 (
errmsg(
"cannot close connection for server \"%s\" because it is still in use",
2476#if (defined(HAVE_POLL) && defined(POLLRDHUP))
2478 struct pollfd input_fd;
2482 input_fd.events = POLLRDHUP;
2483 input_fd.revents = 0;
2486 result = poll(&input_fd, 1, 0);
2487 while (result < 0 && errno ==
EINTR);
2492 return (input_fd.revents &
2493 (POLLRDHUP | POLLHUP | POLLERR | POLLNVAL)) ? 1 : 0;
2508#if (defined(HAVE_POLL) && defined(POLLRDHUP))
2526 bool has_scram_server_key =
false;
2527 bool has_scram_client_key =
false;
2528 bool has_require_auth =
false;
2529 bool has_scram_keys =
false;
2538 if (strcmp(
keywords[
i],
"scram_client_key") == 0)
2541 has_scram_client_key =
true;
2543 has_scram_client_key =
false;
2546 if (strcmp(
keywords[
i],
"scram_server_key") == 0)
2549 has_scram_server_key =
true;
2551 has_scram_server_key =
false;
2554 if (strcmp(
keywords[
i],
"require_auth") == 0)
2556 if (
values[
i] != NULL && strcmp(
values[
i],
"scram-sha-256") == 0)
2557 has_require_auth =
true;
2559 has_require_auth =
false;
2565 return (has_scram_keys && has_require_auth);
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
TimestampTz GetCurrentTimestamp(void)
Datum now(PG_FUNCTION_ARGS)
int pg_b64_enc_len(int srclen)
int pg_b64_encode(const uint8 *src, int len, char *dst, int dstlen)
bool be_gssapi_get_delegation(Port *port)
static Datum values[MAXATTR]
#define CStringGetTextDatum(s)
#define OidIsValid(objectId)
static unsigned int prep_stmt_number
unsigned int GetCursorNumber(PGconn *conn)
static bool UserMappingPasswordRequired(UserMapping *user)
Datum postgres_fdw_get_connections(PG_FUNCTION_ARGS)
void do_sql_command(PGconn *conn, const char *sql)
#define POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_2
PGresult * pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel)
static void disconnect_pg_server(ConnCacheEntry *entry)
void ReleaseConnection(PGconn *conn)
PGconn * GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
static uint32 pgfdw_we_get_result
PG_FUNCTION_INFO_V1(postgres_fdw_get_connections)
static bool UseScramPassthrough(ForeignServer *server, UserMapping *user)
#define RETRY_CANCEL_TIMEOUT
PGresult * pgfdw_get_result(PGconn *conn)
void pgfdw_report_error(PGresult *res, PGconn *conn, const char *sql)
static bool pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime)
static void pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested, bool toplevel)
static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
static int pgfdw_conn_check(PGconn *conn)
#define POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_1
static void configure_remote_session(PGconn *conn)
static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime, TimestampTz retrycanceltime, bool consume_input)
static bool xact_got_connection
#define POSTGRES_FDW_GET_CONNECTIONS_COLS
struct ConnCacheEntry ConnCacheEntry
void pgfdw_report(int elevel, PGresult *res, PGconn *conn, const char *sql)
Datum postgres_fdw_disconnect_all(PG_FUNCTION_ARGS)
static void do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
#define CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel)
static bool pgfdw_conn_checkable(void)
static uint32 pgfdw_we_cleanup_result
static bool pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel, List **pending_entries, List **cancel_requested)
static HTAB * ConnectionHash
static bool pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query, TimestampTz endtime, bool consume_input, bool ignore_errors)
static unsigned int cursor_number
static bool pgfdw_has_required_scram_options(const char **keywords, const char **values)
static void make_new_connection(ConnCacheEntry *entry, UserMapping *user)
static void pgfdw_security_check(const char **keywords, const char **values, UserMapping *user, PGconn *conn)
Datum postgres_fdw_disconnect(PG_FUNCTION_ARGS)
static void pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg)
static PGconn * connect_pg_server(ForeignServer *server, UserMapping *user)
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
unsigned int GetPrepStmtNumber(PGconn *conn)
Datum postgres_fdw_get_connections_1_2(PG_FUNCTION_ARGS)
static bool pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query)
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
static void check_conn_params(const char **keywords, const char **values, UserMapping *user)
static uint32 pgfdw_we_connect
static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
static void pgfdw_xact_callback(XactEvent event, void *arg)
static void postgres_fdw_get_connections_internal(FunctionCallInfo fcinfo, enum pgfdwVersion api_version)
static void pgfdw_report_internal(int elevel, PGresult *res, PGconn *conn, const char *sql)
#define CONNECTION_CLEANUP_TIMEOUT
static void do_sql_command_begin(PGconn *conn, const char *sql)
static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, TimestampTz retrycanceltime, PGresult **result, bool *timed_out)
static void begin_remote_xact(ConnCacheEntry *entry)
static bool pgfdw_cancel_query(PGconn *conn)
static void pgfdw_finish_pre_commit_cleanup(List *pending_entries)
static bool disconnect_cached_connections(Oid serverid)
int ExtractConnectionOptions(List *defelems, const char **keywords, const char **values)
char * pgfdw_application_name
char * process_pgfdw_appname(const char *appname)
bool defGetBoolean(DefElem *def)
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
HTAB * hash_create(const char *tabname, int64 nelem, const HASHCTL *info, int flags)
void * hash_seq_search(HASH_SEQ_STATUS *status)
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
int errmsg_internal(const char *fmt,...)
void FreeErrorData(ErrorData *edata)
int errdetail_internal(const char *fmt,...)
int errdetail(const char *fmt,...)
ErrorData * CopyErrorData(void)
void FlushErrorState(void)
int errhint(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
bool in_error_recursion_trouble(void)
#define MAKE_SQLSTATE(ch1, ch2, ch3, ch4, ch5)
#define ereport(elevel,...)
int PQserverVersion(const PGconn *conn)
PGTransactionStatusType PQtransactionStatus(const PGconn *conn)
int PQconnectionUsedPassword(const PGconn *conn)
int PQconnectionUsedGSSAPI(const PGconn *conn)
ConnStatusType PQstatus(const PGconn *conn)
PQnoticeReceiver PQsetNoticeReceiver(PGconn *conn, PQnoticeReceiver proc, void *arg)
int PQbackendPID(const PGconn *conn)
char * PQerrorMessage(const PGconn *conn)
int PQsocket(const PGconn *conn)
int PQconsumeInput(PGconn *conn)
int PQsendQuery(PGconn *conn, const char *query)
int PQisBusy(PGconn *conn)
#define PG_GETARG_TEXT_PP(n)
#define PG_GETARG_BOOL(n)
#define PG_RETURN_BOOL(x)
ForeignServer * GetForeignServerByName(const char *srvname, bool missing_ok)
ForeignServer * GetForeignServer(Oid serverid)
ForeignServer * GetForeignServerExtended(Oid serverid, bits16 flags)
#define MappingUserName(userid)
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
Assert(PointerIsAligned(start, uint64))
#define HeapTupleIsValid(tuple)
static void * GETSTRUCT(const HeapTupleData *tuple)
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
static const JsonPathKeyword keywords[]
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
void ResetLatch(Latch *latch)
static const char * libpqsrv_cancel(PGconn *conn, TimestampTz endtime)
static PGconn * libpqsrv_connect_params(const char *const *keywords, const char *const *values, int expand_dbname, uint32 wait_event_info)
static void libpqsrv_notice_receiver(void *arg, const PGresult *res)
static void libpqsrv_disconnect(PGconn *conn)
static PGresult * libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info)
#define PQresultErrorField
List * lappend(List *list, void *datum)
const char * GetDatabaseEncodingName(void)
void pfree(void *pointer)
void * palloc0(Size size)
char * pchomp(const char *in)
MemoryContext CurrentMemoryContext
#define CHECK_FOR_INTERRUPTS()
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
static int list_length(const List *l)
FormData_pg_user_mapping * Form_pg_user_mapping
static Datum BoolGetDatum(bool X)
static Datum ObjectIdGetDatum(Oid X)
static Datum Int32GetDatum(int32 X)
#define PG_DIAG_MESSAGE_HINT
#define PG_DIAG_MESSAGE_PRIMARY
#define PG_DIAG_MESSAGE_DETAIL
void process_pending_request(AsyncRequest *areq)
AsyncRequest * pendingAreq
uint8 scram_ServerKey[SCRAM_MAX_KEY_LEN]
uint8 scram_ClientKey[SCRAM_MAX_KEY_LEN]
Tuplestorestate * setResult
bool superuser_arg(Oid roleid)
void ReleaseSysCache(HeapTuple tuple)
HeapTuple SearchSysCache1(int cacheId, Datum key1)
#define GetSysCacheHashValue1(cacheId, key1)
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
#define TimestampTzPlusMilliseconds(tz, ms)
char * text_to_cstring(const text *t)
uint32 WaitEventExtensionNew(const char *wait_event_name)
#define WL_SOCKET_READABLE
#define WL_EXIT_ON_PM_DEATH
int GetCurrentTransactionNestLevel(void)
void RegisterXactCallback(XactCallback callback, void *arg)
void RegisterSubXactCallback(SubXactCallback callback, void *arg)
@ SUBXACT_EVENT_PRE_COMMIT_SUB
@ SUBXACT_EVENT_ABORT_SUB
@ XACT_EVENT_PARALLEL_PRE_COMMIT
@ XACT_EVENT_PARALLEL_COMMIT
@ XACT_EVENT_PARALLEL_ABORT
#define IsolationIsSerializable()