PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
dblink.c
Go to the documentation of this file.
1/*
2 * dblink.c
3 *
4 * Functions returning results from a remote database
5 *
6 * Joe Conway <mail@joeconway.com>
7 * And contributors:
8 * Darko Prenosil <Darko.Prenosil@finteh.hr>
9 * Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
10 *
11 * contrib/dblink/dblink.c
12 * Copyright (c) 2001-2025, PostgreSQL Global Development Group
13 * ALL RIGHTS RESERVED;
14 *
15 * Permission to use, copy, modify, and distribute this software and its
16 * documentation for any purpose, without fee, and without a written agreement
17 * is hereby granted, provided that the above copyright notice and this
18 * paragraph and the following two paragraphs appear in all copies.
19 *
20 * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
21 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
22 * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
23 * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
24 * POSSIBILITY OF SUCH DAMAGE.
25 *
26 * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
27 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
28 * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
29 * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
30 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
31 *
32 */
33#include "postgres.h"
34
35#include <limits.h>
36
37#include "access/htup_details.h"
38#include "access/relation.h"
39#include "access/reloptions.h"
40#include "access/table.h"
41#include "catalog/namespace.h"
44#include "catalog/pg_type.h"
46#include "commands/defrem.h"
47#include "common/base64.h"
48#include "executor/spi.h"
49#include "foreign/foreign.h"
50#include "funcapi.h"
51#include "lib/stringinfo.h"
52#include "libpq-fe.h"
53#include "libpq/libpq-be.h"
55#include "mb/pg_wchar.h"
56#include "miscadmin.h"
57#include "parser/scansup.h"
58#include "utils/acl.h"
59#include "utils/builtins.h"
60#include "utils/fmgroids.h"
61#include "utils/guc.h"
62#include "utils/lsyscache.h"
63#include "utils/memutils.h"
64#include "utils/rel.h"
65#include "utils/varlena.h"
66#include "utils/wait_event.h"
67
69 .name = "dblink",
70 .version = PG_VERSION
71);
72
73typedef struct remoteConn
74{
75 PGconn *conn; /* Hold the remote connection */
76 int openCursorCount; /* The number of open cursors */
77 bool newXactForCursor; /* Opened a transaction for a cursor */
79
80typedef struct storeInfo
81{
86 char **cstrs;
87 /* temp storage for results to avoid leaks on exception */
91
92/*
93 * Internal declarations
94 */
95static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
96static void prepTuplestoreResult(FunctionCallInfo fcinfo);
98 PGresult *res);
100 PGconn *conn,
101 const char *conname,
102 const char *sql,
103 bool fail);
104static PGresult *storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql);
105static void storeRow(volatile storeInfo *sinfo, PGresult *res, bool first);
106static remoteConn *getConnectionByName(const char *name);
107static HTAB *createConnHash(void);
108static void createNewConnection(const char *name, remoteConn *rconn);
109static void deleteConnection(const char *name);
110static char **get_pkey_attnames(Relation rel, int16 *indnkeyatts);
111static char **get_text_array_contents(ArrayType *array, int *numitems);
112static char *get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
113static char *get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals);
114static char *get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
115static char *quote_ident_cstr(char *rawstr);
116static int get_attnum_pk_pos(int *pkattnums, int pknumatts, int key);
117static HeapTuple get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals);
118static Relation get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode);
119static char *generate_relation_name(Relation rel);
120static void dblink_connstr_check(const char *connstr);
121static bool dblink_connstr_has_pw(const char *connstr);
122static void dblink_security_check(PGconn *conn, remoteConn *rconn, const char *connstr);
123static void dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
124 bool fail, const char *fmt,...) pg_attribute_printf(5, 6);
125static char *get_connect_string(const char *servername);
126static char *escape_param_str(const char *str);
127static void validate_pkattnums(Relation rel,
128 int2vector *pkattnums_arg, int32 pknumatts_arg,
129 int **pkattnums, int *pknumatts);
131 const char *option, Oid context);
132static int applyRemoteGucs(PGconn *conn);
133static void restoreLocalGucs(int nestlevel);
134static bool UseScramPassthrough(ForeignServer *foreign_server, UserMapping *user);
136static bool is_valid_dblink_fdw_option(const PQconninfoOption *options, const char *option,
137 Oid context);
139
140/* Global */
141static remoteConn *pconn = NULL;
142static HTAB *remoteConnHash = NULL;
143
144/* custom wait event values, retrieved from shared memory */
148
149/*
150 * Following is list that holds multiple remote connections.
151 * Calling convention of each dblink function changes to accept
152 * connection name as the first parameter. The connection list is
153 * much like ecpg e.g. a mapping between a name and a PGconn object.
154 */
155
156typedef struct remoteConnHashEnt
157{
161
162/* initial number of connection hashes */
163#define NUMCONN 16
164
165static char *
166xpstrdup(const char *in)
167{
168 if (in == NULL)
169 return NULL;
170 return pstrdup(in);
171}
172
173pg_noreturn static void
175{
176 char *msg = pchomp(PQerrorMessage(conn));
177
178 PQclear(res);
179 elog(ERROR, "%s: %s", p2, msg);
180}
181
182pg_noreturn static void
183dblink_conn_not_avail(const char *conname)
184{
185 if (conname)
187 (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
188 errmsg("connection \"%s\" not available", conname)));
189 else
191 (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
192 errmsg("connection not available")));
193}
194
195static void
196dblink_get_conn(char *conname_or_str,
197 PGconn *volatile *conn_p, char **conname_p, volatile bool *freeconn_p)
198{
199 remoteConn *rconn = getConnectionByName(conname_or_str);
200 PGconn *conn;
201 char *conname;
202 bool freeconn;
203
204 if (rconn)
205 {
206 conn = rconn->conn;
207 conname = conname_or_str;
208 freeconn = false;
209 }
210 else
211 {
212 const char *connstr;
213
214 connstr = get_connect_string(conname_or_str);
215 if (connstr == NULL)
216 connstr = conname_or_str;
218
219 /* first time, allocate or get the custom wait event */
220 if (dblink_we_get_conn == 0)
221 dblink_we_get_conn = WaitEventExtensionNew("DblinkGetConnect");
222
223 /* OK to make connection */
225
227 {
228 char *msg = pchomp(PQerrorMessage(conn));
229
232 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
233 errmsg("could not establish connection"),
234 errdetail_internal("%s", msg)));
235 }
239 freeconn = true;
240 conname = NULL;
241 }
242
243 *conn_p = conn;
244 *conname_p = conname;
245 *freeconn_p = freeconn;
246}
247
248static PGconn *
249dblink_get_named_conn(const char *conname)
250{
251 remoteConn *rconn = getConnectionByName(conname);
252
253 if (rconn)
254 return rconn->conn;
255
256 dblink_conn_not_avail(conname);
257 return NULL; /* keep compiler quiet */
258}
259
260static void
262{
263 if (!pconn)
264 {
265 if (dblink_we_get_result == 0)
266 dblink_we_get_result = WaitEventExtensionNew("DblinkGetResult");
267
269 pconn->conn = NULL;
271 pconn->newXactForCursor = false;
272 }
273}
274
275/*
276 * Create a persistent connection to another database
277 */
279Datum
281{
282 char *conname_or_str = NULL;
283 char *connstr = NULL;
284 char *connname = NULL;
285 char *msg;
286 PGconn *conn = NULL;
287 remoteConn *rconn = NULL;
288
289 dblink_init();
290
291 if (PG_NARGS() == 2)
292 {
293 conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(1));
294 connname = text_to_cstring(PG_GETARG_TEXT_PP(0));
295 }
296 else if (PG_NARGS() == 1)
297 conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0));
298
299 if (connname)
300 {
302 sizeof(remoteConn));
303 rconn->conn = NULL;
304 rconn->openCursorCount = 0;
305 rconn->newXactForCursor = false;
306 }
307
308 /* first check for valid foreign data server */
309 connstr = get_connect_string(conname_or_str);
310 if (connstr == NULL)
311 connstr = conname_or_str;
312
313 /* check password in connection string if not superuser */
315
316 /* first time, allocate or get the custom wait event */
317 if (dblink_we_connect == 0)
318 dblink_we_connect = WaitEventExtensionNew("DblinkConnect");
319
320 /* OK to make connection */
322
324 {
325 msg = pchomp(PQerrorMessage(conn));
327 if (rconn)
328 pfree(rconn);
329
331 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
332 errmsg("could not establish connection"),
333 errdetail_internal("%s", msg)));
334 }
335
336 /* check password actually used if not superuser */
338
339 /* attempt to set client encoding to match server encoding, if needed */
342
343 if (connname)
344 {
345 rconn->conn = conn;
346 createNewConnection(connname, rconn);
347 }
348 else
349 {
350 if (pconn->conn)
352 pconn->conn = conn;
353 }
354
356}
357
358/*
359 * Clear a persistent connection to another database
360 */
362Datum
364{
365 char *conname = NULL;
366 remoteConn *rconn = NULL;
367 PGconn *conn = NULL;
368
369 dblink_init();
370
371 if (PG_NARGS() == 1)
372 {
374 rconn = getConnectionByName(conname);
375 if (rconn)
376 conn = rconn->conn;
377 }
378 else
379 conn = pconn->conn;
380
381 if (!conn)
382 dblink_conn_not_avail(conname);
383
385 if (rconn)
386 {
387 deleteConnection(conname);
388 pfree(rconn);
389 }
390 else
391 pconn->conn = NULL;
392
394}
395
396/*
397 * opens a cursor using a persistent connection
398 */
400Datum
402{
403 PGresult *res = NULL;
404 PGconn *conn;
405 char *curname = NULL;
406 char *sql = NULL;
407 char *conname = NULL;
409 remoteConn *rconn = NULL;
410 bool fail = true; /* default to backward compatible behavior */
411
412 dblink_init();
414
415 if (PG_NARGS() == 2)
416 {
417 /* text,text */
420 rconn = pconn;
421 }
422 else if (PG_NARGS() == 3)
423 {
424 /* might be text,text,text or text,text,bool */
425 if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
426 {
429 fail = PG_GETARG_BOOL(2);
430 rconn = pconn;
431 }
432 else
433 {
437 rconn = getConnectionByName(conname);
438 }
439 }
440 else if (PG_NARGS() == 4)
441 {
442 /* text,text,text,bool */
446 fail = PG_GETARG_BOOL(3);
447 rconn = getConnectionByName(conname);
448 }
449
450 if (!rconn || !rconn->conn)
451 dblink_conn_not_avail(conname);
452
453 conn = rconn->conn;
454
455 /* If we are not in a transaction, start one */
457 {
458 res = libpqsrv_exec(conn, "BEGIN", dblink_we_get_result);
460 dblink_res_internalerror(conn, res, "begin error");
461 PQclear(res);
462 rconn->newXactForCursor = true;
463
464 /*
465 * Since transaction state was IDLE, we force cursor count to
466 * initially be 0. This is needed as a previous ABORT might have wiped
467 * out our transaction without maintaining the cursor count for us.
468 */
469 rconn->openCursorCount = 0;
470 }
471
472 /* if we started a transaction, increment cursor count */
473 if (rconn->newXactForCursor)
474 (rconn->openCursorCount)++;
475
476 appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql);
478 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
479 {
480 dblink_res_error(conn, conname, res, fail,
481 "while opening cursor \"%s\"", curname);
483 }
484
485 PQclear(res);
487}
488
489/*
490 * closes a cursor
491 */
493Datum
495{
496 PGconn *conn;
497 PGresult *res = NULL;
498 char *curname = NULL;
499 char *conname = NULL;
501 remoteConn *rconn = NULL;
502 bool fail = true; /* default to backward compatible behavior */
503
504 dblink_init();
506
507 if (PG_NARGS() == 1)
508 {
509 /* text */
511 rconn = pconn;
512 }
513 else if (PG_NARGS() == 2)
514 {
515 /* might be text,text or text,bool */
516 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
517 {
519 fail = PG_GETARG_BOOL(1);
520 rconn = pconn;
521 }
522 else
523 {
526 rconn = getConnectionByName(conname);
527 }
528 }
529 if (PG_NARGS() == 3)
530 {
531 /* text,text,bool */
534 fail = PG_GETARG_BOOL(2);
535 rconn = getConnectionByName(conname);
536 }
537
538 if (!rconn || !rconn->conn)
539 dblink_conn_not_avail(conname);
540
541 conn = rconn->conn;
542
543 appendStringInfo(&buf, "CLOSE %s", curname);
544
545 /* close the cursor */
547 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
548 {
549 dblink_res_error(conn, conname, res, fail,
550 "while closing cursor \"%s\"", curname);
552 }
553
554 PQclear(res);
555
556 /* if we started a transaction, decrement cursor count */
557 if (rconn->newXactForCursor)
558 {
559 (rconn->openCursorCount)--;
560
561 /* if count is zero, commit the transaction */
562 if (rconn->openCursorCount == 0)
563 {
564 rconn->newXactForCursor = false;
565
566 res = libpqsrv_exec(conn, "COMMIT", dblink_we_get_result);
568 dblink_res_internalerror(conn, res, "commit error");
569 PQclear(res);
570 }
571 }
572
574}
575
576/*
577 * Fetch results from an open cursor
578 */
580Datum
582{
583 PGresult *res = NULL;
584 char *conname = NULL;
585 remoteConn *rconn = NULL;
586 PGconn *conn = NULL;
588 char *curname = NULL;
589 int howmany = 0;
590 bool fail = true; /* default to backward compatible */
591
592 prepTuplestoreResult(fcinfo);
593
594 dblink_init();
595
596 if (PG_NARGS() == 4)
597 {
598 /* text,text,int,bool */
601 howmany = PG_GETARG_INT32(2);
602 fail = PG_GETARG_BOOL(3);
603
604 rconn = getConnectionByName(conname);
605 if (rconn)
606 conn = rconn->conn;
607 }
608 else if (PG_NARGS() == 3)
609 {
610 /* text,text,int or text,int,bool */
611 if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
612 {
614 howmany = PG_GETARG_INT32(1);
615 fail = PG_GETARG_BOOL(2);
616 conn = pconn->conn;
617 }
618 else
619 {
622 howmany = PG_GETARG_INT32(2);
623
624 rconn = getConnectionByName(conname);
625 if (rconn)
626 conn = rconn->conn;
627 }
628 }
629 else if (PG_NARGS() == 2)
630 {
631 /* text,int */
633 howmany = PG_GETARG_INT32(1);
634 conn = pconn->conn;
635 }
636
637 if (!conn)
638 dblink_conn_not_avail(conname);
639
641 appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
642
643 /*
644 * Try to execute the query. Note that since libpq uses malloc, the
645 * PGresult will be long-lived even though we are still in a short-lived
646 * memory context.
647 */
649 if (!res ||
652 {
653 dblink_res_error(conn, conname, res, fail,
654 "while fetching from cursor \"%s\"", curname);
655 return (Datum) 0;
656 }
657 else if (PQresultStatus(res) == PGRES_COMMAND_OK)
658 {
659 /* cursor does not exist - closed already or bad name */
660 PQclear(res);
662 (errcode(ERRCODE_INVALID_CURSOR_NAME),
663 errmsg("cursor \"%s\" does not exist", curname)));
664 }
665
666 materializeResult(fcinfo, conn, res);
667 return (Datum) 0;
668}
669
670/*
671 * Note: this is the new preferred version of dblink
672 */
674Datum
676{
677 return dblink_record_internal(fcinfo, false);
678}
679
681Datum
683{
684 PGconn *conn;
685 char *sql;
686 int retval;
687
688 if (PG_NARGS() == 2)
689 {
692 }
693 else
694 /* shouldn't happen */
695 elog(ERROR, "wrong number of arguments");
696
697 /* async query send */
698 retval = PQsendQuery(conn, sql);
699 if (retval != 1)
700 elog(NOTICE, "could not send query: %s", pchomp(PQerrorMessage(conn)));
701
702 PG_RETURN_INT32(retval);
703}
704
706Datum
708{
709 return dblink_record_internal(fcinfo, true);
710}
711
712static Datum
714{
715 PGconn *volatile conn = NULL;
716 volatile bool freeconn = false;
717
718 prepTuplestoreResult(fcinfo);
719
720 dblink_init();
721
722 PG_TRY();
723 {
724 char *sql = NULL;
725 char *conname = NULL;
726 bool fail = true; /* default to backward compatible */
727
728 if (!is_async)
729 {
730 if (PG_NARGS() == 3)
731 {
732 /* text,text,bool */
735 fail = PG_GETARG_BOOL(2);
736 dblink_get_conn(conname, &conn, &conname, &freeconn);
737 }
738 else if (PG_NARGS() == 2)
739 {
740 /* text,text or text,bool */
741 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
742 {
744 fail = PG_GETARG_BOOL(1);
745 conn = pconn->conn;
746 }
747 else
748 {
751 dblink_get_conn(conname, &conn, &conname, &freeconn);
752 }
753 }
754 else if (PG_NARGS() == 1)
755 {
756 /* text */
757 conn = pconn->conn;
759 }
760 else
761 /* shouldn't happen */
762 elog(ERROR, "wrong number of arguments");
763 }
764 else /* is_async */
765 {
766 /* get async result */
768
769 if (PG_NARGS() == 2)
770 {
771 /* text,bool */
772 fail = PG_GETARG_BOOL(1);
773 conn = dblink_get_named_conn(conname);
774 }
775 else if (PG_NARGS() == 1)
776 {
777 /* text */
778 conn = dblink_get_named_conn(conname);
779 }
780 else
781 /* shouldn't happen */
782 elog(ERROR, "wrong number of arguments");
783 }
784
785 if (!conn)
786 dblink_conn_not_avail(conname);
787
788 if (!is_async)
789 {
790 /* synchronous query, use efficient tuple collection method */
791 materializeQueryResult(fcinfo, conn, conname, sql, fail);
792 }
793 else
794 {
795 /* async result retrieval, do it the old way */
797
798 /* NULL means we're all done with the async results */
799 if (res)
800 {
801 if (PQresultStatus(res) != PGRES_COMMAND_OK &&
803 {
804 dblink_res_error(conn, conname, res, fail,
805 "while executing query");
806 /* if fail isn't set, we'll return an empty query result */
807 }
808 else
809 {
810 materializeResult(fcinfo, conn, res);
811 }
812 }
813 }
814 }
815 PG_FINALLY();
816 {
817 /* if needed, close the connection to the database */
818 if (freeconn)
820 }
821 PG_END_TRY();
822
823 return (Datum) 0;
824}
825
826/*
827 * Verify function caller can handle a tuplestore result, and set up for that.
828 *
829 * Note: if the caller returns without actually creating a tuplestore, the
830 * executor will treat the function result as an empty set.
831 */
832static void
834{
835 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
836
837 /* check to see if query supports us returning a tuplestore */
838 if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
840 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
841 errmsg("set-valued function called in context that cannot accept a set")));
842 if (!(rsinfo->allowedModes & SFRM_Materialize))
844 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
845 errmsg("materialize mode required, but it is not allowed in this context")));
846
847 /* let the executor know we're sending back a tuplestore */
849
850 /* caller must fill these to return a non-empty result */
851 rsinfo->setResult = NULL;
852 rsinfo->setDesc = NULL;
853}
854
855/*
856 * Copy the contents of the PGresult into a tuplestore to be returned
857 * as the result of the current function.
858 * The PGresult will be released in this function.
859 */
860static void
862{
863 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
864
865 /* prepTuplestoreResult must have been called previously */
867
868 PG_TRY();
869 {
870 TupleDesc tupdesc;
871 bool is_sql_cmd;
872 int ntuples;
873 int nfields;
874
876 {
877 is_sql_cmd = true;
878
879 /*
880 * need a tuple descriptor representing one TEXT column to return
881 * the command status string as our result tuple
882 */
883 tupdesc = CreateTemplateTupleDesc(1);
884 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
885 TEXTOID, -1, 0);
886 ntuples = 1;
887 nfields = 1;
888 }
889 else
890 {
892
893 is_sql_cmd = false;
894
895 /* get a tuple descriptor for our result type */
896 switch (get_call_result_type(fcinfo, NULL, &tupdesc))
897 {
899 /* success */
900 break;
901 case TYPEFUNC_RECORD:
902 /* failed to determine actual type of RECORD */
904 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
905 errmsg("function returning record called in context "
906 "that cannot accept type record")));
907 break;
908 default:
909 /* result type isn't composite */
910 elog(ERROR, "return type must be a row type");
911 break;
912 }
913
914 /* make sure we have a persistent copy of the tupdesc */
915 tupdesc = CreateTupleDescCopy(tupdesc);
916 ntuples = PQntuples(res);
917 nfields = PQnfields(res);
918 }
919
920 /*
921 * check result and tuple descriptor have the same number of columns
922 */
923 if (nfields != tupdesc->natts)
925 (errcode(ERRCODE_DATATYPE_MISMATCH),
926 errmsg("remote query result rowtype does not match "
927 "the specified FROM clause rowtype")));
928
929 if (ntuples > 0)
930 {
931 AttInMetadata *attinmeta;
932 int nestlevel = -1;
933 Tuplestorestate *tupstore;
934 MemoryContext oldcontext;
935 int row;
936 char **values;
937
938 attinmeta = TupleDescGetAttInMetadata(tupdesc);
939
940 /* Set GUCs to ensure we read GUC-sensitive data types correctly */
941 if (!is_sql_cmd)
942 nestlevel = applyRemoteGucs(conn);
943
945 tupstore = tuplestore_begin_heap(true, false, work_mem);
946 rsinfo->setResult = tupstore;
947 rsinfo->setDesc = tupdesc;
948 MemoryContextSwitchTo(oldcontext);
949
950 values = palloc_array(char *, nfields);
951
952 /* put all tuples into the tuplestore */
953 for (row = 0; row < ntuples; row++)
954 {
955 HeapTuple tuple;
956
957 if (!is_sql_cmd)
958 {
959 int i;
960
961 for (i = 0; i < nfields; i++)
962 {
963 if (PQgetisnull(res, row, i))
964 values[i] = NULL;
965 else
966 values[i] = PQgetvalue(res, row, i);
967 }
968 }
969 else
970 {
971 values[0] = PQcmdStatus(res);
972 }
973
974 /* build the tuple and put it into the tuplestore. */
975 tuple = BuildTupleFromCStrings(attinmeta, values);
976 tuplestore_puttuple(tupstore, tuple);
977 }
978
979 /* clean up GUC settings, if we changed any */
980 restoreLocalGucs(nestlevel);
981 }
982 }
983 PG_FINALLY();
984 {
985 /* be sure to release the libpq result */
986 PQclear(res);
987 }
988 PG_END_TRY();
989}
990
991/*
992 * Execute the given SQL command and store its results into a tuplestore
993 * to be returned as the result of the current function.
994 *
995 * This is equivalent to PQexec followed by materializeResult, but we make
996 * use of libpq's single-row mode to avoid accumulating the whole result
997 * inside libpq before it gets transferred to the tuplestore.
998 */
999static void
1001 PGconn *conn,
1002 const char *conname,
1003 const char *sql,
1004 bool fail)
1005{
1006 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1007 PGresult *volatile res = NULL;
1008 volatile storeInfo sinfo = {0};
1009
1010 /* prepTuplestoreResult must have been called previously */
1011 Assert(rsinfo->returnMode == SFRM_Materialize);
1012
1013 sinfo.fcinfo = fcinfo;
1014
1015 PG_TRY();
1016 {
1017 /* Create short-lived memory context for data conversions */
1019 "dblink temporary context",
1021
1022 /* execute query, collecting any tuples into the tuplestore */
1023 res = storeQueryResult(&sinfo, conn, sql);
1024
1025 if (!res ||
1028 {
1029 /*
1030 * dblink_res_error will clear the passed PGresult, so we need
1031 * this ugly dance to avoid doing so twice during error exit
1032 */
1033 PGresult *res1 = res;
1034
1035 res = NULL;
1036 dblink_res_error(conn, conname, res1, fail,
1037 "while executing query");
1038 /* if fail isn't set, we'll return an empty query result */
1039 }
1040 else if (PQresultStatus(res) == PGRES_COMMAND_OK)
1041 {
1042 /*
1043 * storeRow didn't get called, so we need to convert the command
1044 * status string to a tuple manually
1045 */
1046 TupleDesc tupdesc;
1047 AttInMetadata *attinmeta;
1048 Tuplestorestate *tupstore;
1049 HeapTuple tuple;
1050 char *values[1];
1051 MemoryContext oldcontext;
1052
1053 /*
1054 * need a tuple descriptor representing one TEXT column to return
1055 * the command status string as our result tuple
1056 */
1057 tupdesc = CreateTemplateTupleDesc(1);
1058 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
1059 TEXTOID, -1, 0);
1060 attinmeta = TupleDescGetAttInMetadata(tupdesc);
1061
1062 oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
1063 tupstore = tuplestore_begin_heap(true, false, work_mem);
1064 rsinfo->setResult = tupstore;
1065 rsinfo->setDesc = tupdesc;
1066 MemoryContextSwitchTo(oldcontext);
1067
1068 values[0] = PQcmdStatus(res);
1069
1070 /* build the tuple and put it into the tuplestore. */
1071 tuple = BuildTupleFromCStrings(attinmeta, values);
1072 tuplestore_puttuple(tupstore, tuple);
1073
1074 PQclear(res);
1075 res = NULL;
1076 }
1077 else
1078 {
1080 /* storeRow should have created a tuplestore */
1081 Assert(rsinfo->setResult != NULL);
1082
1083 PQclear(res);
1084 res = NULL;
1085 }
1086
1087 /* clean up data conversion short-lived memory context */
1088 if (sinfo.tmpcontext != NULL)
1089 MemoryContextDelete(sinfo.tmpcontext);
1090 sinfo.tmpcontext = NULL;
1091
1092 PQclear(sinfo.last_res);
1093 sinfo.last_res = NULL;
1094 PQclear(sinfo.cur_res);
1095 sinfo.cur_res = NULL;
1096 }
1097 PG_CATCH();
1098 {
1099 /* be sure to release any libpq result we collected */
1100 PQclear(res);
1101 PQclear(sinfo.last_res);
1102 PQclear(sinfo.cur_res);
1103 /* and clear out any pending data in libpq */
1105 NULL)
1106 PQclear(res);
1107 PG_RE_THROW();
1108 }
1109 PG_END_TRY();
1110}
1111
1112/*
1113 * Execute query, and send any result rows to sinfo->tuplestore.
1114 */
1115static PGresult *
1116storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql)
1117{
1118 bool first = true;
1119 int nestlevel = -1;
1120 PGresult *res;
1121
1122 if (!PQsendQuery(conn, sql))
1123 elog(ERROR, "could not send query: %s", pchomp(PQerrorMessage(conn)));
1124
1125 if (!PQsetSingleRowMode(conn)) /* shouldn't fail */
1126 elog(ERROR, "failed to set single-row mode for dblink query");
1127
1128 for (;;)
1129 {
1131
1133 if (!sinfo->cur_res)
1134 break;
1135
1137 {
1138 /* got one row from possibly-bigger resultset */
1139
1140 /*
1141 * Set GUCs to ensure we read GUC-sensitive data types correctly.
1142 * We shouldn't do this until we have a row in hand, to ensure
1143 * libpq has seen any earlier ParameterStatus protocol messages.
1144 */
1145 if (first && nestlevel < 0)
1146 nestlevel = applyRemoteGucs(conn);
1147
1148 storeRow(sinfo, sinfo->cur_res, first);
1149
1150 PQclear(sinfo->cur_res);
1151 sinfo->cur_res = NULL;
1152 first = false;
1153 }
1154 else
1155 {
1156 /* if empty resultset, fill tuplestore header */
1157 if (first && PQresultStatus(sinfo->cur_res) == PGRES_TUPLES_OK)
1158 storeRow(sinfo, sinfo->cur_res, first);
1159
1160 /* store completed result at last_res */
1161 PQclear(sinfo->last_res);
1162 sinfo->last_res = sinfo->cur_res;
1163 sinfo->cur_res = NULL;
1164 first = true;
1165 }
1166 }
1167
1168 /* clean up GUC settings, if we changed any */
1169 restoreLocalGucs(nestlevel);
1170
1171 /* return last_res */
1172 res = sinfo->last_res;
1173 sinfo->last_res = NULL;
1174 return res;
1175}
1176
1177/*
1178 * Send single row to sinfo->tuplestore.
1179 *
1180 * If "first" is true, create the tuplestore using PGresult's metadata
1181 * (in this case the PGresult might contain either zero or one row).
1182 */
1183static void
1184storeRow(volatile storeInfo *sinfo, PGresult *res, bool first)
1185{
1186 int nfields = PQnfields(res);
1187 HeapTuple tuple;
1188 int i;
1189 MemoryContext oldcontext;
1190
1191 if (first)
1192 {
1193 /* Prepare for new result set */
1194 ReturnSetInfo *rsinfo = (ReturnSetInfo *) sinfo->fcinfo->resultinfo;
1195 TupleDesc tupdesc;
1196
1197 /*
1198 * It's possible to get more than one result set if the query string
1199 * contained multiple SQL commands. In that case, we follow PQexec's
1200 * traditional behavior of throwing away all but the last result.
1201 */
1202 if (sinfo->tuplestore)
1203 tuplestore_end(sinfo->tuplestore);
1204 sinfo->tuplestore = NULL;
1205
1206 /* get a tuple descriptor for our result type */
1207 switch (get_call_result_type(sinfo->fcinfo, NULL, &tupdesc))
1208 {
1209 case TYPEFUNC_COMPOSITE:
1210 /* success */
1211 break;
1212 case TYPEFUNC_RECORD:
1213 /* failed to determine actual type of RECORD */
1214 ereport(ERROR,
1215 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1216 errmsg("function returning record called in context "
1217 "that cannot accept type record")));
1218 break;
1219 default:
1220 /* result type isn't composite */
1221 elog(ERROR, "return type must be a row type");
1222 break;
1223 }
1224
1225 /* make sure we have a persistent copy of the tupdesc */
1226 tupdesc = CreateTupleDescCopy(tupdesc);
1227
1228 /* check result and tuple descriptor have the same number of columns */
1229 if (nfields != tupdesc->natts)
1230 ereport(ERROR,
1231 (errcode(ERRCODE_DATATYPE_MISMATCH),
1232 errmsg("remote query result rowtype does not match "
1233 "the specified FROM clause rowtype")));
1234
1235 /* Prepare attinmeta for later data conversions */
1236 sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
1237
1238 /* Create a new, empty tuplestore */
1240 sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
1241 rsinfo->setResult = sinfo->tuplestore;
1242 rsinfo->setDesc = tupdesc;
1243 MemoryContextSwitchTo(oldcontext);
1244
1245 /* Done if empty resultset */
1246 if (PQntuples(res) == 0)
1247 return;
1248
1249 /*
1250 * Set up sufficiently-wide string pointers array; this won't change
1251 * in size so it's easy to preallocate.
1252 */
1253 if (sinfo->cstrs)
1254 pfree(sinfo->cstrs);
1255 sinfo->cstrs = palloc_array(char *, nfields);
1256 }
1257
1258 /* Should have a single-row result if we get here */
1259 Assert(PQntuples(res) == 1);
1260
1261 /*
1262 * Do the following work in a temp context that we reset after each tuple.
1263 * This cleans up not only the data we have direct access to, but any
1264 * cruft the I/O functions might leak.
1265 */
1266 oldcontext = MemoryContextSwitchTo(sinfo->tmpcontext);
1267
1268 /*
1269 * Fill cstrs with null-terminated strings of column values.
1270 */
1271 for (i = 0; i < nfields; i++)
1272 {
1273 if (PQgetisnull(res, 0, i))
1274 sinfo->cstrs[i] = NULL;
1275 else
1276 sinfo->cstrs[i] = PQgetvalue(res, 0, i);
1277 }
1278
1279 /* Convert row to a tuple, and add it to the tuplestore */
1280 tuple = BuildTupleFromCStrings(sinfo->attinmeta, sinfo->cstrs);
1281
1282 tuplestore_puttuple(sinfo->tuplestore, tuple);
1283
1284 /* Clean up */
1285 MemoryContextSwitchTo(oldcontext);
1287}
1288
1289/*
1290 * List all open dblink connections by name.
1291 * Returns an array of all connection names.
1292 * Takes no params
1293 */
1295Datum
1297{
1298 HASH_SEQ_STATUS status;
1299 remoteConnHashEnt *hentry;
1300 ArrayBuildState *astate = NULL;
1301
1302 if (remoteConnHash)
1303 {
1304 hash_seq_init(&status, remoteConnHash);
1305 while ((hentry = (remoteConnHashEnt *) hash_seq_search(&status)) != NULL)
1306 {
1307 /* stash away current value */
1308 astate = accumArrayResult(astate,
1309 CStringGetTextDatum(hentry->name),
1310 false, TEXTOID, CurrentMemoryContext);
1311 }
1312 }
1313
1314 if (astate)
1317 else
1319}
1320
1321/*
1322 * Checks if a given remote connection is busy
1323 *
1324 * Returns 1 if the connection is busy, 0 otherwise
1325 * Params:
1326 * text connection_name - name of the connection to check
1327 *
1328 */
1330Datum
1332{
1333 PGconn *conn;
1334
1335 dblink_init();
1337
1340}
1341
1342/*
1343 * Cancels a running request on a connection
1344 *
1345 * Returns text:
1346 * "OK" if the cancel request has been sent correctly,
1347 * an error message otherwise
1348 *
1349 * Params:
1350 * text connection_name - name of the connection to check
1351 *
1352 */
1354Datum
1356{
1357 PGconn *conn;
1358 const char *msg;
1359 TimestampTz endtime;
1360
1361 dblink_init();
1364 30000);
1365 msg = libpqsrv_cancel(conn, endtime);
1366 if (msg == NULL)
1367 msg = "OK";
1368
1370}
1371
1372
1373/*
1374 * Get error message from a connection
1375 *
1376 * Returns text:
1377 * "OK" if no error, an error message otherwise
1378 *
1379 * Params:
1380 * text connection_name - name of the connection to check
1381 *
1382 */
1384Datum
1386{
1387 char *msg;
1388 PGconn *conn;
1389
1390 dblink_init();
1392
1393 msg = PQerrorMessage(conn);
1394 if (msg == NULL || msg[0] == '\0')
1396 else
1398}
1399
1400/*
1401 * Execute an SQL non-SELECT command
1402 */
1404Datum
1406{
1407 text *volatile sql_cmd_status = NULL;
1408 PGconn *volatile conn = NULL;
1409 volatile bool freeconn = false;
1410
1411 dblink_init();
1412
1413 PG_TRY();
1414 {
1415 PGresult *res = NULL;
1416 char *sql = NULL;
1417 char *conname = NULL;
1418 bool fail = true; /* default to backward compatible behavior */
1419
1420 if (PG_NARGS() == 3)
1421 {
1422 /* must be text,text,bool */
1423 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
1425 fail = PG_GETARG_BOOL(2);
1426 dblink_get_conn(conname, &conn, &conname, &freeconn);
1427 }
1428 else if (PG_NARGS() == 2)
1429 {
1430 /* might be text,text or text,bool */
1431 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
1432 {
1434 fail = PG_GETARG_BOOL(1);
1435 conn = pconn->conn;
1436 }
1437 else
1438 {
1439 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
1441 dblink_get_conn(conname, &conn, &conname, &freeconn);
1442 }
1443 }
1444 else if (PG_NARGS() == 1)
1445 {
1446 /* must be single text argument */
1447 conn = pconn->conn;
1449 }
1450 else
1451 /* shouldn't happen */
1452 elog(ERROR, "wrong number of arguments");
1453
1454 if (!conn)
1455 dblink_conn_not_avail(conname);
1456
1458 if (!res ||
1461 {
1462 dblink_res_error(conn, conname, res, fail,
1463 "while executing command");
1464
1465 /*
1466 * and save a copy of the command status string to return as our
1467 * result tuple
1468 */
1469 sql_cmd_status = cstring_to_text("ERROR");
1470 }
1471 else if (PQresultStatus(res) == PGRES_COMMAND_OK)
1472 {
1473 /*
1474 * and save a copy of the command status string to return as our
1475 * result tuple
1476 */
1477 sql_cmd_status = cstring_to_text(PQcmdStatus(res));
1478 PQclear(res);
1479 }
1480 else
1481 {
1482 PQclear(res);
1483 ereport(ERROR,
1484 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
1485 errmsg("statement returning results not allowed")));
1486 }
1487 }
1488 PG_FINALLY();
1489 {
1490 /* if needed, close the connection to the database */
1491 if (freeconn)
1493 }
1494 PG_END_TRY();
1495
1496 PG_RETURN_TEXT_P(sql_cmd_status);
1497}
1498
1499
1500/*
1501 * dblink_get_pkey
1502 *
1503 * Return list of primary key fields for the supplied relation,
1504 * or NULL if none exists.
1505 */
1507Datum
1509{
1510 int16 indnkeyatts;
1511 char **results;
1512 FuncCallContext *funcctx;
1513 int32 call_cntr;
1514 int32 max_calls;
1515 AttInMetadata *attinmeta;
1516 MemoryContext oldcontext;
1517
1518 /* stuff done only on the first call of the function */
1519 if (SRF_IS_FIRSTCALL())
1520 {
1521 Relation rel;
1522 TupleDesc tupdesc;
1523
1524 /* create a function context for cross-call persistence */
1525 funcctx = SRF_FIRSTCALL_INIT();
1526
1527 /*
1528 * switch to memory context appropriate for multiple function calls
1529 */
1530 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1531
1532 /* open target relation */
1534
1535 /* get the array of attnums */
1536 results = get_pkey_attnames(rel, &indnkeyatts);
1537
1539
1540 /*
1541 * need a tuple descriptor representing one INT and one TEXT column
1542 */
1543 tupdesc = CreateTemplateTupleDesc(2);
1544 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "position",
1545 INT4OID, -1, 0);
1546 TupleDescInitEntry(tupdesc, (AttrNumber) 2, "colname",
1547 TEXTOID, -1, 0);
1548
1549 /*
1550 * Generate attribute metadata needed later to produce tuples from raw
1551 * C strings
1552 */
1553 attinmeta = TupleDescGetAttInMetadata(tupdesc);
1554 funcctx->attinmeta = attinmeta;
1555
1556 if ((results != NULL) && (indnkeyatts > 0))
1557 {
1558 funcctx->max_calls = indnkeyatts;
1559
1560 /* got results, keep track of them */
1561 funcctx->user_fctx = results;
1562 }
1563 else
1564 {
1565 /* fast track when no results */
1566 MemoryContextSwitchTo(oldcontext);
1567 SRF_RETURN_DONE(funcctx);
1568 }
1569
1570 MemoryContextSwitchTo(oldcontext);
1571 }
1572
1573 /* stuff done on every call of the function */
1574 funcctx = SRF_PERCALL_SETUP();
1575
1576 /*
1577 * initialize per-call variables
1578 */
1579 call_cntr = funcctx->call_cntr;
1580 max_calls = funcctx->max_calls;
1581
1582 results = (char **) funcctx->user_fctx;
1583 attinmeta = funcctx->attinmeta;
1584
1585 if (call_cntr < max_calls) /* do when there is more left to send */
1586 {
1587 char **values;
1588 HeapTuple tuple;
1589 Datum result;
1590
1591 values = palloc_array(char *, 2);
1592 values[0] = psprintf("%d", call_cntr + 1);
1593 values[1] = results[call_cntr];
1594
1595 /* build the tuple */
1596 tuple = BuildTupleFromCStrings(attinmeta, values);
1597
1598 /* make the tuple into a datum */
1599 result = HeapTupleGetDatum(tuple);
1600
1601 SRF_RETURN_NEXT(funcctx, result);
1602 }
1603 else
1604 {
1605 /* do when there is no more left */
1606 SRF_RETURN_DONE(funcctx);
1607 }
1608}
1609
1610
1611/*
1612 * dblink_build_sql_insert
1613 *
1614 * Used to generate an SQL insert statement
1615 * based on an existing tuple in a local relation.
1616 * This is useful for selectively replicating data
1617 * to another server via dblink.
1618 *
1619 * API:
1620 * <relname> - name of local table of interest
1621 * <pkattnums> - an int2vector of attnums which will be used
1622 * to identify the local tuple of interest
1623 * <pknumatts> - number of attnums in pkattnums
1624 * <src_pkattvals_arry> - text array of key values which will be used
1625 * to identify the local tuple of interest
1626 * <tgt_pkattvals_arry> - text array of key values which will be used
1627 * to build the string for execution remotely. These are substituted
1628 * for their counterparts in src_pkattvals_arry
1629 */
1631Datum
1633{
1634 text *relname_text = PG_GETARG_TEXT_PP(0);
1635 int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1636 int32 pknumatts_arg = PG_GETARG_INT32(2);
1637 ArrayType *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1638 ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
1639 Relation rel;
1640 int *pkattnums;
1641 int pknumatts;
1642 char **src_pkattvals;
1643 char **tgt_pkattvals;
1644 int src_nitems;
1645 int tgt_nitems;
1646 char *sql;
1647
1648 /*
1649 * Open target relation.
1650 */
1651 rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1652
1653 /*
1654 * Process pkattnums argument.
1655 */
1656 validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
1657 &pkattnums, &pknumatts);
1658
1659 /*
1660 * Source array is made up of key values that will be used to locate the
1661 * tuple of interest from the local system.
1662 */
1663 src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
1664
1665 /*
1666 * There should be one source array key value for each key attnum
1667 */
1668 if (src_nitems != pknumatts)
1669 ereport(ERROR,
1670 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1671 errmsg("source key array length must match number of key attributes")));
1672
1673 /*
1674 * Target array is made up of key values that will be used to build the
1675 * SQL string for use on the remote system.
1676 */
1677 tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1678
1679 /*
1680 * There should be one target array key value for each key attnum
1681 */
1682 if (tgt_nitems != pknumatts)
1683 ereport(ERROR,
1684 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1685 errmsg("target key array length must match number of key attributes")));
1686
1687 /*
1688 * Prep work is finally done. Go get the SQL string.
1689 */
1690 sql = get_sql_insert(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
1691
1692 /*
1693 * Now we can close the relation.
1694 */
1696
1697 /*
1698 * And send it
1699 */
1701}
1702
1703
1704/*
1705 * dblink_build_sql_delete
1706 *
1707 * Used to generate an SQL delete statement.
1708 * This is useful for selectively replicating a
1709 * delete to another server via dblink.
1710 *
1711 * API:
1712 * <relname> - name of remote table of interest
1713 * <pkattnums> - an int2vector of attnums which will be used
1714 * to identify the remote tuple of interest
1715 * <pknumatts> - number of attnums in pkattnums
1716 * <tgt_pkattvals_arry> - text array of key values which will be used
1717 * to build the string for execution remotely.
1718 */
1720Datum
1722{
1723 text *relname_text = PG_GETARG_TEXT_PP(0);
1724 int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1725 int32 pknumatts_arg = PG_GETARG_INT32(2);
1726 ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1727 Relation rel;
1728 int *pkattnums;
1729 int pknumatts;
1730 char **tgt_pkattvals;
1731 int tgt_nitems;
1732 char *sql;
1733
1734 /*
1735 * Open target relation.
1736 */
1737 rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1738
1739 /*
1740 * Process pkattnums argument.
1741 */
1742 validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
1743 &pkattnums, &pknumatts);
1744
1745 /*
1746 * Target array is made up of key values that will be used to build the
1747 * SQL string for use on the remote system.
1748 */
1749 tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1750
1751 /*
1752 * There should be one target array key value for each key attnum
1753 */
1754 if (tgt_nitems != pknumatts)
1755 ereport(ERROR,
1756 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1757 errmsg("target key array length must match number of key attributes")));
1758
1759 /*
1760 * Prep work is finally done. Go get the SQL string.
1761 */
1762 sql = get_sql_delete(rel, pkattnums, pknumatts, tgt_pkattvals);
1763
1764 /*
1765 * Now we can close the relation.
1766 */
1768
1769 /*
1770 * And send it
1771 */
1773}
1774
1775
1776/*
1777 * dblink_build_sql_update
1778 *
1779 * Used to generate an SQL update statement
1780 * based on an existing tuple in a local relation.
1781 * This is useful for selectively replicating data
1782 * to another server via dblink.
1783 *
1784 * API:
1785 * <relname> - name of local table of interest
1786 * <pkattnums> - an int2vector of attnums which will be used
1787 * to identify the local tuple of interest
1788 * <pknumatts> - number of attnums in pkattnums
1789 * <src_pkattvals_arry> - text array of key values which will be used
1790 * to identify the local tuple of interest
1791 * <tgt_pkattvals_arry> - text array of key values which will be used
1792 * to build the string for execution remotely. These are substituted
1793 * for their counterparts in src_pkattvals_arry
1794 */
1796Datum
1798{
1799 text *relname_text = PG_GETARG_TEXT_PP(0);
1800 int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1801 int32 pknumatts_arg = PG_GETARG_INT32(2);
1802 ArrayType *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1803 ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
1804 Relation rel;
1805 int *pkattnums;
1806 int pknumatts;
1807 char **src_pkattvals;
1808 char **tgt_pkattvals;
1809 int src_nitems;
1810 int tgt_nitems;
1811 char *sql;
1812
1813 /*
1814 * Open target relation.
1815 */
1816 rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1817
1818 /*
1819 * Process pkattnums argument.
1820 */
1821 validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
1822 &pkattnums, &pknumatts);
1823
1824 /*
1825 * Source array is made up of key values that will be used to locate the
1826 * tuple of interest from the local system.
1827 */
1828 src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
1829
1830 /*
1831 * There should be one source array key value for each key attnum
1832 */
1833 if (src_nitems != pknumatts)
1834 ereport(ERROR,
1835 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1836 errmsg("source key array length must match number of key attributes")));
1837
1838 /*
1839 * Target array is made up of key values that will be used to build the
1840 * SQL string for use on the remote system.
1841 */
1842 tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1843
1844 /*
1845 * There should be one target array key value for each key attnum
1846 */
1847 if (tgt_nitems != pknumatts)
1848 ereport(ERROR,
1849 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1850 errmsg("target key array length must match number of key attributes")));
1851
1852 /*
1853 * Prep work is finally done. Go get the SQL string.
1854 */
1855 sql = get_sql_update(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
1856
1857 /*
1858 * Now we can close the relation.
1859 */
1861
1862 /*
1863 * And send it
1864 */
1866}
1867
1868/*
1869 * dblink_current_query
1870 * return the current query string
1871 * to allow its use in (among other things)
1872 * rewrite rules
1873 */
1875Datum
1877{
1878 /* This is now just an alias for the built-in function current_query() */
1880}
1881
1882/*
1883 * Retrieve async notifications for a connection.
1884 *
1885 * Returns a setof record of notifications, or an empty set if none received.
1886 * Can optionally take a named connection as parameter, but uses the unnamed
1887 * connection per default.
1888 *
1889 */
1890#define DBLINK_NOTIFY_COLS 3
1891
1893Datum
1895{
1896 PGconn *conn;
1897 PGnotify *notify;
1898 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1899
1900 dblink_init();
1901 if (PG_NARGS() == 1)
1903 else
1904 conn = pconn->conn;
1905
1906 InitMaterializedSRF(fcinfo, 0);
1907
1909 while ((notify = PQnotifies(conn)) != NULL)
1910 {
1912 bool nulls[DBLINK_NOTIFY_COLS];
1913
1914 memset(values, 0, sizeof(values));
1915 memset(nulls, 0, sizeof(nulls));
1916
1917 if (notify->relname != NULL)
1918 values[0] = CStringGetTextDatum(notify->relname);
1919 else
1920 nulls[0] = true;
1921
1922 values[1] = Int32GetDatum(notify->be_pid);
1923
1924 if (notify->extra != NULL)
1925 values[2] = CStringGetTextDatum(notify->extra);
1926 else
1927 nulls[2] = true;
1928
1929 tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
1930
1931 PQfreemem(notify);
1933 }
1934
1935 return (Datum) 0;
1936}
1937
1938/*
1939 * Validate the options given to a dblink foreign server or user mapping.
1940 * Raise an error if any option is invalid.
1941 *
1942 * We just check the names of options here, so semantic errors in options,
1943 * such as invalid numeric format, will be detected at the attempt to connect.
1944 */
1946Datum
1948{
1949 List *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
1950 Oid context = PG_GETARG_OID(1);
1951 ListCell *cell;
1952
1953 static const PQconninfoOption *options = NULL;
1954
1955 /*
1956 * Get list of valid libpq options.
1957 *
1958 * To avoid unnecessary work, we get the list once and use it throughout
1959 * the lifetime of this backend process. We don't need to care about
1960 * memory context issues, because PQconndefaults allocates with malloc.
1961 */
1962 if (!options)
1963 {
1965 if (!options) /* assume reason for failure is OOM */
1966 ereport(ERROR,
1967 (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
1968 errmsg("out of memory"),
1969 errdetail("Could not get libpq's default connection options.")));
1970 }
1971
1972 /* Validate each supplied option. */
1973 foreach(cell, options_list)
1974 {
1975 DefElem *def = (DefElem *) lfirst(cell);
1976
1977 if (!is_valid_dblink_fdw_option(options, def->defname, context))
1978 {
1979 /*
1980 * Unknown option, or invalid option for the context specified, so
1981 * complain about it. Provide a hint with a valid option that
1982 * looks similar, if there is one.
1983 */
1984 const PQconninfoOption *opt;
1985 const char *closest_match;
1987 bool has_valid_options = false;
1988
1990 for (opt = options; opt->keyword; opt++)
1991 {
1992 if (is_valid_dblink_option(options, opt->keyword, context))
1993 {
1994 has_valid_options = true;
1996 }
1997 }
1998
1999 closest_match = getClosestMatch(&match_state);
2000 ereport(ERROR,
2001 (errcode(ERRCODE_FDW_OPTION_NAME_NOT_FOUND),
2002 errmsg("invalid option \"%s\"", def->defname),
2003 has_valid_options ? closest_match ?
2004 errhint("Perhaps you meant the option \"%s\".",
2005 closest_match) : 0 :
2006 errhint("There are no valid options in this context.")));
2007 }
2008 }
2009
2011}
2012
2013
2014/*************************************************************
2015 * internal functions
2016 */
2017
2018
2019/*
2020 * get_pkey_attnames
2021 *
2022 * Get the primary key attnames for the given relation.
2023 * Return NULL, and set indnkeyatts = 0, if no primary key exists.
2024 */
2025static char **
2027{
2028 Relation indexRelation;
2029 ScanKeyData skey;
2030 SysScanDesc scan;
2031 HeapTuple indexTuple;
2032 int i;
2033 char **result = NULL;
2034 TupleDesc tupdesc;
2035
2036 /* initialize indnkeyatts to 0 in case no primary key exists */
2037 *indnkeyatts = 0;
2038
2039 tupdesc = rel->rd_att;
2040
2041 /* Prepare to scan pg_index for entries having indrelid = this rel. */
2042 indexRelation = table_open(IndexRelationId, AccessShareLock);
2043 ScanKeyInit(&skey,
2044 Anum_pg_index_indrelid,
2045 BTEqualStrategyNumber, F_OIDEQ,
2047
2048 scan = systable_beginscan(indexRelation, IndexIndrelidIndexId, true,
2049 NULL, 1, &skey);
2050
2051 while (HeapTupleIsValid(indexTuple = systable_getnext(scan)))
2052 {
2054
2055 /* we're only interested if it is the primary key */
2056 if (index->indisprimary)
2057 {
2058 *indnkeyatts = index->indnkeyatts;
2059 if (*indnkeyatts > 0)
2060 {
2061 result = palloc_array(char *, *indnkeyatts);
2062
2063 for (i = 0; i < *indnkeyatts; i++)
2064 result[i] = SPI_fname(tupdesc, index->indkey.values[i]);
2065 }
2066 break;
2067 }
2068 }
2069
2070 systable_endscan(scan);
2071 table_close(indexRelation, AccessShareLock);
2072
2073 return result;
2074}
2075
2076/*
2077 * Deconstruct a text[] into C-strings (note any NULL elements will be
2078 * returned as NULL pointers)
2079 */
2080static char **
2082{
2083 int ndim = ARR_NDIM(array);
2084 int *dims = ARR_DIMS(array);
2085 int nitems;
2086 int16 typlen;
2087 bool typbyval;
2088 char typalign;
2089 char **values;
2090 char *ptr;
2091 bits8 *bitmap;
2092 int bitmask;
2093 int i;
2094
2095 Assert(ARR_ELEMTYPE(array) == TEXTOID);
2096
2097 *numitems = nitems = ArrayGetNItems(ndim, dims);
2098
2100 &typlen, &typbyval, &typalign);
2101
2102 values = palloc_array(char *, nitems);
2103
2104 ptr = ARR_DATA_PTR(array);
2105 bitmap = ARR_NULLBITMAP(array);
2106 bitmask = 1;
2107
2108 for (i = 0; i < nitems; i++)
2109 {
2110 if (bitmap && (*bitmap & bitmask) == 0)
2111 {
2112 values[i] = NULL;
2113 }
2114 else
2115 {
2117 ptr = att_addlength_pointer(ptr, typlen, ptr);
2118 ptr = (char *) att_align_nominal(ptr, typalign);
2119 }
2120
2121 /* advance bitmap pointer if any */
2122 if (bitmap)
2123 {
2124 bitmask <<= 1;
2125 if (bitmask == 0x100)
2126 {
2127 bitmap++;
2128 bitmask = 1;
2129 }
2130 }
2131 }
2132
2133 return values;
2134}
2135
2136static char *
2137get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
2138{
2139 char *relname;
2140 HeapTuple tuple;
2141 TupleDesc tupdesc;
2142 int natts;
2144 char *val;
2145 int key;
2146 int i;
2147 bool needComma;
2148
2150
2151 /* get relation name including any needed schema prefix and quoting */
2153
2154 tupdesc = rel->rd_att;
2155 natts = tupdesc->natts;
2156
2157 tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
2158 if (!tuple)
2159 ereport(ERROR,
2160 (errcode(ERRCODE_CARDINALITY_VIOLATION),
2161 errmsg("source row not found")));
2162
2163 appendStringInfo(&buf, "INSERT INTO %s(", relname);
2164
2165 needComma = false;
2166 for (i = 0; i < natts; i++)
2167 {
2168 Form_pg_attribute att = TupleDescAttr(tupdesc, i);
2169
2170 if (att->attisdropped)
2171 continue;
2172
2173 if (needComma)
2175
2177 quote_ident_cstr(NameStr(att->attname)));
2178 needComma = true;
2179 }
2180
2181 appendStringInfoString(&buf, ") VALUES(");
2182
2183 /*
2184 * Note: i is physical column number (counting from 0).
2185 */
2186 needComma = false;
2187 for (i = 0; i < natts; i++)
2188 {
2189 if (TupleDescAttr(tupdesc, i)->attisdropped)
2190 continue;
2191
2192 if (needComma)
2194
2195 key = get_attnum_pk_pos(pkattnums, pknumatts, i);
2196
2197 if (key >= 0)
2198 val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
2199 else
2200 val = SPI_getvalue(tuple, tupdesc, i + 1);
2201
2202 if (val != NULL)
2203 {
2205 pfree(val);
2206 }
2207 else
2208 appendStringInfoString(&buf, "NULL");
2209 needComma = true;
2210 }
2212
2213 return buf.data;
2214}
2215
2216static char *
2217get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals)
2218{
2219 char *relname;
2220 TupleDesc tupdesc;
2222 int i;
2223
2225
2226 /* get relation name including any needed schema prefix and quoting */
2228
2229 tupdesc = rel->rd_att;
2230
2231 appendStringInfo(&buf, "DELETE FROM %s WHERE ", relname);
2232 for (i = 0; i < pknumatts; i++)
2233 {
2234 int pkattnum = pkattnums[i];
2235 Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
2236
2237 if (i > 0)
2238 appendStringInfoString(&buf, " AND ");
2239
2241 quote_ident_cstr(NameStr(attr->attname)));
2242
2243 if (tgt_pkattvals[i] != NULL)
2244 appendStringInfo(&buf, " = %s",
2245 quote_literal_cstr(tgt_pkattvals[i]));
2246 else
2247 appendStringInfoString(&buf, " IS NULL");
2248 }
2249
2250 return buf.data;
2251}
2252
2253static char *
2254get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
2255{
2256 char *relname;
2257 HeapTuple tuple;
2258 TupleDesc tupdesc;
2259 int natts;
2261 char *val;
2262 int key;
2263 int i;
2264 bool needComma;
2265
2267
2268 /* get relation name including any needed schema prefix and quoting */
2270
2271 tupdesc = rel->rd_att;
2272 natts = tupdesc->natts;
2273
2274 tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
2275 if (!tuple)
2276 ereport(ERROR,
2277 (errcode(ERRCODE_CARDINALITY_VIOLATION),
2278 errmsg("source row not found")));
2279
2280 appendStringInfo(&buf, "UPDATE %s SET ", relname);
2281
2282 /*
2283 * Note: i is physical column number (counting from 0).
2284 */
2285 needComma = false;
2286 for (i = 0; i < natts; i++)
2287 {
2288 Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
2289
2290 if (attr->attisdropped)
2291 continue;
2292
2293 if (needComma)
2295
2296 appendStringInfo(&buf, "%s = ",
2297 quote_ident_cstr(NameStr(attr->attname)));
2298
2299 key = get_attnum_pk_pos(pkattnums, pknumatts, i);
2300
2301 if (key >= 0)
2302 val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
2303 else
2304 val = SPI_getvalue(tuple, tupdesc, i + 1);
2305
2306 if (val != NULL)
2307 {
2309 pfree(val);
2310 }
2311 else
2312 appendStringInfoString(&buf, "NULL");
2313 needComma = true;
2314 }
2315
2316 appendStringInfoString(&buf, " WHERE ");
2317
2318 for (i = 0; i < pknumatts; i++)
2319 {
2320 int pkattnum = pkattnums[i];
2321 Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
2322
2323 if (i > 0)
2324 appendStringInfoString(&buf, " AND ");
2325
2327 quote_ident_cstr(NameStr(attr->attname)));
2328
2329 val = tgt_pkattvals[i];
2330
2331 if (val != NULL)
2333 else
2334 appendStringInfoString(&buf, " IS NULL");
2335 }
2336
2337 return buf.data;
2338}
2339
2340/*
2341 * Return a properly quoted identifier.
2342 * Uses quote_ident in quote.c
2343 */
2344static char *
2345quote_ident_cstr(char *rawstr)
2346{
2347 text *rawstr_text;
2348 text *result_text;
2349 char *result;
2350
2351 rawstr_text = cstring_to_text(rawstr);
2353 PointerGetDatum(rawstr_text)));
2354 result = text_to_cstring(result_text);
2355
2356 return result;
2357}
2358
2359static int
2360get_attnum_pk_pos(int *pkattnums, int pknumatts, int key)
2361{
2362 int i;
2363
2364 /*
2365 * Not likely a long list anyway, so just scan for the value
2366 */
2367 for (i = 0; i < pknumatts; i++)
2368 if (key == pkattnums[i])
2369 return i;
2370
2371 return -1;
2372}
2373
2374static HeapTuple
2375get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals)
2376{
2377 char *relname;
2378 TupleDesc tupdesc;
2379 int natts;
2381 int ret;
2382 HeapTuple tuple;
2383 int i;
2384
2385 /*
2386 * Connect to SPI manager
2387 */
2388 SPI_connect();
2389
2391
2392 /* get relation name including any needed schema prefix and quoting */
2394
2395 tupdesc = rel->rd_att;
2396 natts = tupdesc->natts;
2397
2398 /*
2399 * Build sql statement to look up tuple of interest, ie, the one matching
2400 * src_pkattvals. We used to use "SELECT *" here, but it's simpler to
2401 * generate a result tuple that matches the table's physical structure,
2402 * with NULLs for any dropped columns. Otherwise we have to deal with two
2403 * different tupdescs and everything's very confusing.
2404 */
2405 appendStringInfoString(&buf, "SELECT ");
2406
2407 for (i = 0; i < natts; i++)
2408 {
2409 Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
2410
2411 if (i > 0)
2413
2414 if (attr->attisdropped)
2415 appendStringInfoString(&buf, "NULL");
2416 else
2418 quote_ident_cstr(NameStr(attr->attname)));
2419 }
2420
2421 appendStringInfo(&buf, " FROM %s WHERE ", relname);
2422
2423 for (i = 0; i < pknumatts; i++)
2424 {
2425 int pkattnum = pkattnums[i];
2426 Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
2427
2428 if (i > 0)
2429 appendStringInfoString(&buf, " AND ");
2430
2432 quote_ident_cstr(NameStr(attr->attname)));
2433
2434 if (src_pkattvals[i] != NULL)
2435 appendStringInfo(&buf, " = %s",
2436 quote_literal_cstr(src_pkattvals[i]));
2437 else
2438 appendStringInfoString(&buf, " IS NULL");
2439 }
2440
2441 /*
2442 * Retrieve the desired tuple
2443 */
2444 ret = SPI_exec(buf.data, 0);
2445 pfree(buf.data);
2446
2447 /*
2448 * Only allow one qualifying tuple
2449 */
2450 if ((ret == SPI_OK_SELECT) && (SPI_processed > 1))
2451 ereport(ERROR,
2452 (errcode(ERRCODE_CARDINALITY_VIOLATION),
2453 errmsg("source criteria matched more than one record")));
2454
2455 else if (ret == SPI_OK_SELECT && SPI_processed == 1)
2456 {
2457 SPITupleTable *tuptable = SPI_tuptable;
2458
2459 tuple = SPI_copytuple(tuptable->vals[0]);
2460 SPI_finish();
2461
2462 return tuple;
2463 }
2464 else
2465 {
2466 /*
2467 * no qualifying tuples
2468 */
2469 SPI_finish();
2470
2471 return NULL;
2472 }
2473
2474 /*
2475 * never reached, but keep compiler quiet
2476 */
2477 return NULL;
2478}
2479
2480/*
2481 * Open the relation named by relname_text, acquire specified type of lock,
2482 * verify we have specified permissions.
2483 * Caller must close rel when done with it.
2484 */
2485static Relation
2486get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode)
2487{
2488 RangeVar *relvar;
2489 Relation rel;
2490 AclResult aclresult;
2491
2492 relvar = makeRangeVarFromNameList(textToQualifiedNameList(relname_text));
2493 rel = table_openrv(relvar, lockmode);
2494
2495 aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
2496 aclmode);
2497 if (aclresult != ACLCHECK_OK)
2498 aclcheck_error(aclresult, get_relkind_objtype(rel->rd_rel->relkind),
2500
2501 return rel;
2502}
2503
2504/*
2505 * generate_relation_name - copied from ruleutils.c
2506 * Compute the name to display for a relation
2507 *
2508 * The result includes all necessary quoting and schema-prefixing.
2509 */
2510static char *
2512{
2513 char *nspname;
2514 char *result;
2515
2516 /* Qualify the name if not visible in search path */
2518 nspname = NULL;
2519 else
2520 nspname = get_namespace_name(rel->rd_rel->relnamespace);
2521
2523
2524 return result;
2525}
2526
2527
2528static remoteConn *
2530{
2531 remoteConnHashEnt *hentry;
2532 char *key;
2533
2534 if (!remoteConnHash)
2536
2537 key = pstrdup(name);
2538 truncate_identifier(key, strlen(key), false);
2540 key, HASH_FIND, NULL);
2541
2542 if (hentry)
2543 return hentry->rconn;
2544
2545 return NULL;
2546}
2547
2548static HTAB *
2550{
2551 HASHCTL ctl;
2552
2553 ctl.keysize = NAMEDATALEN;
2554 ctl.entrysize = sizeof(remoteConnHashEnt);
2555
2556 return hash_create("Remote Con hash", NUMCONN, &ctl,
2558}
2559
2560static void
2562{
2563 remoteConnHashEnt *hentry;
2564 bool found;
2565 char *key;
2566
2567 if (!remoteConnHash)
2569
2570 key = pstrdup(name);
2571 truncate_identifier(key, strlen(key), true);
2573 HASH_ENTER, &found);
2574
2575 if (found)
2576 {
2577 libpqsrv_disconnect(rconn->conn);
2578 pfree(rconn);
2579
2580 ereport(ERROR,
2582 errmsg("duplicate connection name")));
2583 }
2584
2585 hentry->rconn = rconn;
2586}
2587
2588static void
2590{
2591 remoteConnHashEnt *hentry;
2592 bool found;
2593 char *key;
2594
2595 if (!remoteConnHash)
2597
2598 key = pstrdup(name);
2599 truncate_identifier(key, strlen(key), false);
2601 key, HASH_REMOVE, &found);
2602
2603 if (!hentry)
2604 ereport(ERROR,
2605 (errcode(ERRCODE_UNDEFINED_OBJECT),
2606 errmsg("undefined connection name")));
2607}
2608
2609 /*
2610 * Ensure that require_auth and SCRAM keys are correctly set on connstr.
2611 * SCRAM keys used to pass-through are coming from the initial connection
2612 * from the client with the server.
2613 *
2614 * All required SCRAM options are set by dblink, so we just need to ensure
2615 * that these options are not overwritten by the user.
2616 *
2617 * See appendSCRAMKeysInfo and its usage for more.
2618 */
2619bool
2621{
2623 bool has_scram_server_key = false;
2624 bool has_scram_client_key = false;
2625 bool has_require_auth = false;
2626 bool has_scram_keys = false;
2627
2629 if (options)
2630 {
2631 /*
2632 * Continue iterating even if we found the keys that we need to
2633 * validate to make sure that there is no other declaration of these
2634 * keys that can overwrite the first.
2635 */
2636 for (PQconninfoOption *option = options; option->keyword != NULL; option++)
2637 {
2638 if (strcmp(option->keyword, "require_auth") == 0)
2639 {
2640 if (option->val != NULL && strcmp(option->val, "scram-sha-256") == 0)
2641 has_require_auth = true;
2642 else
2643 has_require_auth = false;
2644 }
2645
2646 if (strcmp(option->keyword, "scram_client_key") == 0)
2647 {
2648 if (option->val != NULL && option->val[0] != '\0')
2649 has_scram_client_key = true;
2650 else
2651 has_scram_client_key = false;
2652 }
2653
2654 if (strcmp(option->keyword, "scram_server_key") == 0)
2655 {
2656 if (option->val != NULL && option->val[0] != '\0')
2657 has_scram_server_key = true;
2658 else
2659 has_scram_server_key = false;
2660 }
2661 }
2663 }
2664
2665 has_scram_keys = has_scram_client_key && has_scram_server_key && MyProcPort->has_scram_keys;
2666
2667 return (has_scram_keys && has_require_auth);
2668}
2669
2670/*
2671 * We need to make sure that the connection made used credentials
2672 * which were provided by the user, so check what credentials were
2673 * used to connect and then make sure that they came from the user.
2674 */
2675static void
2677{
2678 /* Superuser bypasses security check */
2679 if (superuser())
2680 return;
2681
2682 /* If password was used to connect, make sure it was one provided */
2684 return;
2685
2686 /*
2687 * Password was not used to connect, check if SCRAM pass-through is in
2688 * use.
2689 *
2690 * If dblink_connstr_has_required_scram_options is true we assume that
2691 * UseScramPassthrough is also true because the required SCRAM keys are
2692 * only added if UseScramPassthrough is set, and the user is not allowed
2693 * to add the SCRAM keys on fdw and user mapping options.
2694 */
2696 return;
2697
2698#ifdef ENABLE_GSS
2699 /* If GSSAPI creds used to connect, make sure it was one delegated */
2701 return;
2702#endif
2703
2704 /* Otherwise, fail out */
2706 if (rconn)
2707 pfree(rconn);
2708
2709 ereport(ERROR,
2710 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
2711 errmsg("password or GSSAPI delegated credentials required"),
2712 errdetail("Non-superusers may only connect using credentials they provide, eg: password in connection string or delegated GSSAPI credentials"),
2713 errhint("Ensure provided credentials match target server's authentication method.")));
2714}
2715
2716/*
2717 * Function to check if the connection string includes an explicit
2718 * password, needed to ensure that non-superuser password-based auth
2719 * is using a provided password and not one picked up from the
2720 * environment.
2721 */
2722static bool
2724{
2727 bool connstr_gives_password = false;
2728
2730 if (options)
2731 {
2732 for (option = options; option->keyword != NULL; option++)
2733 {
2734 if (strcmp(option->keyword, "password") == 0)
2735 {
2736 if (option->val != NULL && option->val[0] != '\0')
2737 {
2738 connstr_gives_password = true;
2739 break;
2740 }
2741 }
2742 }
2744 }
2745
2746 return connstr_gives_password;
2747}
2748
2749/*
2750 * For non-superusers, insist that the connstr specify a password, except if
2751 * GSSAPI credentials have been delegated (and we check that they are used for
2752 * the connection in dblink_security_check later) or if SCRAM pass-through is
2753 * being used. This prevents a password or GSSAPI credentials from being
2754 * picked up from .pgpass, a service file, the environment, etc. We don't want
2755 * the postgres user's passwords or Kerberos credentials to be accessible to
2756 * non-superusers. In case of SCRAM pass-through insist that the connstr
2757 * has the required SCRAM pass-through options.
2758 */
2759static void
2761{
2762 if (superuser())
2763 return;
2764
2766 return;
2767
2769 return;
2770
2771#ifdef ENABLE_GSS
2773 return;
2774#endif
2775
2776 ereport(ERROR,
2777 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
2778 errmsg("password or GSSAPI delegated credentials required"),
2779 errdetail("Non-superusers must provide a password in the connection string or send delegated GSSAPI credentials.")));
2780}
2781
2782/*
2783 * Report an error received from the remote server
2784 *
2785 * res: the received error result (will be freed)
2786 * fail: true for ERROR ereport, false for NOTICE
2787 * fmt and following args: sprintf-style format and values for errcontext;
2788 * the resulting string should be worded like "while <some action>"
2789 */
2790static void
2791dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
2792 bool fail, const char *fmt,...)
2793{
2794 int level;
2795 char *pg_diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
2796 char *pg_diag_message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
2797 char *pg_diag_message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
2798 char *pg_diag_message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
2799 char *pg_diag_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
2800 int sqlstate;
2801 char *message_primary;
2802 char *message_detail;
2803 char *message_hint;
2804 char *message_context;
2805 va_list ap;
2806 char dblink_context_msg[512];
2807
2808 if (fail)
2809 level = ERROR;
2810 else
2811 level = NOTICE;
2812
2813 if (pg_diag_sqlstate)
2814 sqlstate = MAKE_SQLSTATE(pg_diag_sqlstate[0],
2815 pg_diag_sqlstate[1],
2816 pg_diag_sqlstate[2],
2817 pg_diag_sqlstate[3],
2818 pg_diag_sqlstate[4]);
2819 else
2820 sqlstate = ERRCODE_CONNECTION_FAILURE;
2821
2822 message_primary = xpstrdup(pg_diag_message_primary);
2823 message_detail = xpstrdup(pg_diag_message_detail);
2824 message_hint = xpstrdup(pg_diag_message_hint);
2825 message_context = xpstrdup(pg_diag_context);
2826
2827 /*
2828 * If we don't get a message from the PGresult, try the PGconn. This is
2829 * needed because for connection-level failures, PQgetResult may just
2830 * return NULL, not a PGresult at all.
2831 */
2832 if (message_primary == NULL)
2833 message_primary = pchomp(PQerrorMessage(conn));
2834
2835 /*
2836 * Now that we've copied all the data we need out of the PGresult, it's
2837 * safe to free it. We must do this to avoid PGresult leakage. We're
2838 * leaking all the strings too, but those are in palloc'd memory that will
2839 * get cleaned up eventually.
2840 */
2841 PQclear(res);
2842
2843 /*
2844 * Format the basic errcontext string. Below, we'll add on something
2845 * about the connection name. That's a violation of the translatability
2846 * guidelines about constructing error messages out of parts, but since
2847 * there's no translation support for dblink, there's no need to worry
2848 * about that (yet).
2849 */
2850 va_start(ap, fmt);
2851 vsnprintf(dblink_context_msg, sizeof(dblink_context_msg), fmt, ap);
2852 va_end(ap);
2853
2854 ereport(level,
2855 (errcode(sqlstate),
2856 (message_primary != NULL && message_primary[0] != '\0') ?
2857 errmsg_internal("%s", message_primary) :
2858 errmsg("could not obtain message string for remote error"),
2859 message_detail ? errdetail_internal("%s", message_detail) : 0,
2860 message_hint ? errhint("%s", message_hint) : 0,
2861 message_context ? (errcontext("%s", message_context)) : 0,
2862 conname ?
2863 (errcontext("%s on dblink connection named \"%s\"",
2864 dblink_context_msg, conname)) :
2865 (errcontext("%s on unnamed dblink connection",
2866 dblink_context_msg))));
2867}
2868
2869/*
2870 * Obtain connection string for a foreign server
2871 */
2872static char *
2873get_connect_string(const char *servername)
2874{
2875 ForeignServer *foreign_server = NULL;
2876 UserMapping *user_mapping;
2877 ListCell *cell;
2879 ForeignDataWrapper *fdw;
2880 AclResult aclresult;
2881 char *srvname;
2882
2883 static const PQconninfoOption *options = NULL;
2884
2886
2887 /*
2888 * Get list of valid libpq options.
2889 *
2890 * To avoid unnecessary work, we get the list once and use it throughout
2891 * the lifetime of this backend process. We don't need to care about
2892 * memory context issues, because PQconndefaults allocates with malloc.
2893 */
2894 if (!options)
2895 {
2897 if (!options) /* assume reason for failure is OOM */
2898 ereport(ERROR,
2899 (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
2900 errmsg("out of memory"),
2901 errdetail("Could not get libpq's default connection options.")));
2902 }
2903
2904 /* first gather the server connstr options */
2905 srvname = pstrdup(servername);
2906 truncate_identifier(srvname, strlen(srvname), false);
2907 foreign_server = GetForeignServerByName(srvname, true);
2908
2909 if (foreign_server)
2910 {
2911 Oid serverid = foreign_server->serverid;
2912 Oid fdwid = foreign_server->fdwid;
2913 Oid userid = GetUserId();
2914
2915 user_mapping = GetUserMapping(userid, serverid);
2916 fdw = GetForeignDataWrapper(fdwid);
2917
2918 /* Check permissions, user must have usage on the server. */
2919 aclresult = object_aclcheck(ForeignServerRelationId, serverid, userid, ACL_USAGE);
2920 if (aclresult != ACLCHECK_OK)
2921 aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, foreign_server->servername);
2922
2923 /*
2924 * First append hardcoded options needed for SCRAM pass-through, so if
2925 * the user overwrites these options we can ereport on
2926 * dblink_connstr_check and dblink_security_check.
2927 */
2928 if (MyProcPort->has_scram_keys && UseScramPassthrough(foreign_server, user_mapping))
2930
2931 foreach(cell, fdw->options)
2932 {
2933 DefElem *def = lfirst(cell);
2934
2935 if (is_valid_dblink_option(options, def->defname, ForeignDataWrapperRelationId))
2936 appendStringInfo(&buf, "%s='%s' ", def->defname,
2937 escape_param_str(strVal(def->arg)));
2938 }
2939
2940 foreach(cell, foreign_server->options)
2941 {
2942 DefElem *def = lfirst(cell);
2943
2944 if (is_valid_dblink_option(options, def->defname, ForeignServerRelationId))
2945 appendStringInfo(&buf, "%s='%s' ", def->defname,
2946 escape_param_str(strVal(def->arg)));
2947 }
2948
2949 foreach(cell, user_mapping->options)
2950 {
2951
2952 DefElem *def = lfirst(cell);
2953
2954 if (is_valid_dblink_option(options, def->defname, UserMappingRelationId))
2955 appendStringInfo(&buf, "%s='%s' ", def->defname,
2956 escape_param_str(strVal(def->arg)));
2957 }
2958
2959 return buf.data;
2960 }
2961 else
2962 return NULL;
2963}
2964
2965/*
2966 * Escaping libpq connect parameter strings.
2967 *
2968 * Replaces "'" with "\'" and "\" with "\\".
2969 */
2970static char *
2972{
2973 const char *cp;
2975
2977
2978 for (cp = str; *cp; cp++)
2979 {
2980 if (*cp == '\\' || *cp == '\'')
2981 appendStringInfoChar(&buf, '\\');
2983 }
2984
2985 return buf.data;
2986}
2987
2988/*
2989 * Validate the PK-attnums argument for dblink_build_sql_insert() and related
2990 * functions, and translate to the internal representation.
2991 *
2992 * The user supplies an int2vector of 1-based logical attnums, plus a count
2993 * argument (the need for the separate count argument is historical, but we
2994 * still check it). We check that each attnum corresponds to a valid,
2995 * non-dropped attribute of the rel. We do *not* prevent attnums from being
2996 * listed twice, though the actual use-case for such things is dubious.
2997 * Note that before Postgres 9.0, the user's attnums were interpreted as
2998 * physical not logical column numbers; this was changed for future-proofing.
2999 *
3000 * The internal representation is a palloc'd int array of 0-based physical
3001 * attnums.
3002 */
3003static void
3005 int2vector *pkattnums_arg, int32 pknumatts_arg,
3006 int **pkattnums, int *pknumatts)
3007{
3008 TupleDesc tupdesc = rel->rd_att;
3009 int natts = tupdesc->natts;
3010 int i;
3011
3012 /* Don't take more array elements than there are */
3013 pknumatts_arg = Min(pknumatts_arg, pkattnums_arg->dim1);
3014
3015 /* Must have at least one pk attnum selected */
3016 if (pknumatts_arg <= 0)
3017 ereport(ERROR,
3018 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3019 errmsg("number of key attributes must be > 0")));
3020
3021 /* Allocate output array */
3022 *pkattnums = palloc_array(int, pknumatts_arg);
3023 *pknumatts = pknumatts_arg;
3024
3025 /* Validate attnums and convert to internal form */
3026 for (i = 0; i < pknumatts_arg; i++)
3027 {
3028 int pkattnum = pkattnums_arg->values[i];
3029 int lnum;
3030 int j;
3031
3032 /* Can throw error immediately if out of range */
3033 if (pkattnum <= 0 || pkattnum > natts)
3034 ereport(ERROR,
3035 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3036 errmsg("invalid attribute number %d", pkattnum)));
3037
3038 /* Identify which physical column has this logical number */
3039 lnum = 0;
3040 for (j = 0; j < natts; j++)
3041 {
3042 /* dropped columns don't count */
3043 if (TupleDescAttr(tupdesc, j)->attisdropped)
3044 continue;
3045
3046 if (++lnum == pkattnum)
3047 break;
3048 }
3049
3050 if (j < natts)
3051 (*pkattnums)[i] = j;
3052 else
3053 ereport(ERROR,
3054 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3055 errmsg("invalid attribute number %d", pkattnum)));
3056 }
3057}
3058
3059/*
3060 * Check if the specified connection option is valid.
3061 *
3062 * We basically allow whatever libpq thinks is an option, with these
3063 * restrictions:
3064 * debug options: disallowed
3065 * "client_encoding": disallowed
3066 * "user": valid only in USER MAPPING options
3067 * secure options (eg password): valid only in USER MAPPING options
3068 * others: valid only in FOREIGN SERVER options
3069 *
3070 * We disallow client_encoding because it would be overridden anyway via
3071 * PQclientEncoding; allowing it to be specified would merely promote
3072 * confusion.
3073 */
3074static bool
3076 Oid context)
3077{
3078 const PQconninfoOption *opt;
3079
3080 /* Look up the option in libpq result */
3081 for (opt = options; opt->keyword; opt++)
3082 {
3083 if (strcmp(opt->keyword, option) == 0)
3084 break;
3085 }
3086 if (opt->keyword == NULL)
3087 return false;
3088
3089 /* Disallow debug options (particularly "replication") */
3090 if (strchr(opt->dispchar, 'D'))
3091 return false;
3092
3093 /* Disallow "client_encoding" */
3094 if (strcmp(opt->keyword, "client_encoding") == 0)
3095 return false;
3096
3097 /*
3098 * If the option is "user" or marked secure, it should be specified only
3099 * in USER MAPPING. Others should be specified only in SERVER.
3100 */
3101 if (strcmp(opt->keyword, "user") == 0 || strchr(opt->dispchar, '*'))
3102 {
3103 if (context != UserMappingRelationId)
3104 return false;
3105 }
3106 else
3107 {
3108 if (context != ForeignServerRelationId)
3109 return false;
3110 }
3111
3112 return true;
3113}
3114
3115/*
3116 * Same as is_valid_dblink_option but also check for only dblink_fdw specific
3117 * options.
3118 */
3119static bool
3121 Oid context)
3122{
3123 if (strcmp(option, "use_scram_passthrough") == 0)
3124 return true;
3125
3126 return is_valid_dblink_option(options, option, context);
3127}
3128
3129/*
3130 * Copy the remote session's values of GUCs that affect datatype I/O
3131 * and apply them locally in a new GUC nesting level. Returns the new
3132 * nestlevel (which is needed by restoreLocalGucs to undo the settings),
3133 * or -1 if no new nestlevel was needed.
3134 *
3135 * We use the equivalent of a function SET option to allow the settings to
3136 * persist only until the caller calls restoreLocalGucs. If an error is
3137 * thrown in between, guc.c will take care of undoing the settings.
3138 */
3139static int
3141{
3142 static const char *const GUCsAffectingIO[] = {
3143 "DateStyle",
3144 "IntervalStyle"
3145 };
3146
3147 int nestlevel = -1;
3148 int i;
3149
3150 for (i = 0; i < lengthof(GUCsAffectingIO); i++)
3151 {
3152 const char *gucName = GUCsAffectingIO[i];
3153 const char *remoteVal = PQparameterStatus(conn, gucName);
3154 const char *localVal;
3155
3156 /*
3157 * If the remote server is pre-8.4, it won't have IntervalStyle, but
3158 * that's okay because its output format won't be ambiguous. So just
3159 * skip the GUC if we don't get a value for it. (We might eventually
3160 * need more complicated logic with remote-version checks here.)
3161 */
3162 if (remoteVal == NULL)
3163 continue;
3164
3165 /*
3166 * Avoid GUC-setting overhead if the remote and local GUCs already
3167 * have the same value.
3168 */
3169 localVal = GetConfigOption(gucName, false, false);
3170 Assert(localVal != NULL);
3171
3172 if (strcmp(remoteVal, localVal) == 0)
3173 continue;
3174
3175 /* Create new GUC nest level if we didn't already */
3176 if (nestlevel < 0)
3177 nestlevel = NewGUCNestLevel();
3178
3179 /* Apply the option (this will throw error on failure) */
3180 (void) set_config_option(gucName, remoteVal,
3182 GUC_ACTION_SAVE, true, 0, false);
3183 }
3184
3185 return nestlevel;
3186}
3187
3188/*
3189 * Restore local GUCs after they have been overlaid with remote settings.
3190 */
3191static void
3192restoreLocalGucs(int nestlevel)
3193{
3194 /* Do nothing if no new nestlevel was created */
3195 if (nestlevel > 0)
3196 AtEOXact_GUC(true, nestlevel);
3197}
3198
3199/*
3200 * Append SCRAM client key and server key information from the global
3201 * MyProcPort into the given StringInfo buffer.
3202 */
3203static void
3205{
3206 int len;
3207 int encoded_len;
3208 char *client_key;
3209 char *server_key;
3210
3212 /* don't forget the zero-terminator */
3213 client_key = palloc0(len + 1);
3214 encoded_len = pg_b64_encode((const char *) MyProcPort->scram_ClientKey,
3215 sizeof(MyProcPort->scram_ClientKey),
3216 client_key, len);
3217 if (encoded_len < 0)
3218 elog(ERROR, "could not encode SCRAM client key");
3219
3221 /* don't forget the zero-terminator */
3222 server_key = palloc0(len + 1);
3223 encoded_len = pg_b64_encode((const char *) MyProcPort->scram_ServerKey,
3224 sizeof(MyProcPort->scram_ServerKey),
3225 server_key, len);
3226 if (encoded_len < 0)
3227 elog(ERROR, "could not encode SCRAM server key");
3228
3229 appendStringInfo(buf, "scram_client_key='%s' ", client_key);
3230 appendStringInfo(buf, "scram_server_key='%s' ", server_key);
3231 appendStringInfoString(buf, "require_auth='scram-sha-256' ");
3232
3233 pfree(client_key);
3234 pfree(server_key);
3235}
3236
3237
3238static bool
3240{
3241 ListCell *cell;
3242
3243 foreach(cell, foreign_server->options)
3244 {
3245 DefElem *def = lfirst(cell);
3246
3247 if (strcmp(def->defname, "use_scram_passthrough") == 0)
3248 return defGetBoolean(def);
3249 }
3250
3251 foreach(cell, user->options)
3252 {
3253 DefElem *def = (DefElem *) lfirst(cell);
3254
3255 if (strcmp(def->defname, "use_scram_passthrough") == 0)
3256 return defGetBoolean(def);
3257 }
3258
3259 return false;
3260}
AclResult
Definition: acl.h:182
@ ACLCHECK_OK
Definition: acl.h:183
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:2639
AclResult object_aclcheck(Oid classid, Oid objectid, Oid roleid, AclMode mode)
Definition: aclchk.c:3821
AclResult pg_class_aclcheck(Oid table_oid, Oid roleid, AclMode mode)
Definition: aclchk.c:4024
#define ARR_NDIM(a)
Definition: array.h:290
#define PG_GETARG_ARRAYTYPE_P(n)
Definition: array.h:263
#define ARR_DATA_PTR(a)
Definition: array.h:322
#define ARR_NULLBITMAP(a)
Definition: array.h:300
#define ARR_ELEMTYPE(a)
Definition: array.h:292
#define ARR_DIMS(a)
Definition: array.h:294
ArrayBuildState * accumArrayResult(ArrayBuildState *astate, Datum dvalue, bool disnull, Oid element_type, MemoryContext rcontext)
Definition: arrayfuncs.c:5350
Datum makeArrayResult(ArrayBuildState *astate, MemoryContext rcontext)
Definition: arrayfuncs.c:5420
int ArrayGetNItems(int ndim, const int *dims)
Definition: arrayutils.c:57
int16 AttrNumber
Definition: attnum.h:21
Datum current_query(PG_FUNCTION_ARGS)
Definition: misc.c:212
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1645
int pg_b64_enc_len(int srclen)
Definition: base64.c:224
int pg_b64_encode(const char *src, int len, char *dst, int dstlen)
Definition: base64.c:49
bool be_gssapi_get_delegation(Port *port)
static Datum values[MAXATTR]
Definition: bootstrap.c:151
#define CStringGetTextDatum(s)
Definition: builtins.h:97
#define TextDatumGetCString(d)
Definition: builtins.h:98
#define NameStr(name)
Definition: c.h:717
#define Min(x, y)
Definition: c.h:975
#define pg_noreturn
Definition: c.h:165
#define pg_attribute_printf(f, a)
Definition: c.h:233
int16_t int16
Definition: c.h:497
uint8 bits8
Definition: c.h:509
int32_t int32
Definition: c.h:498
uint32_t uint32
Definition: c.h:502
#define lengthof(array)
Definition: c.h:759
int64 TimestampTz
Definition: timestamp.h:39
bool defGetBoolean(DefElem *def)
Definition: define.c:94
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:955
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1420
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:352
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1385
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1158
int errdetail_internal(const char *fmt,...)
Definition: elog.c:1231
int errdetail(const char *fmt,...)
Definition: elog.c:1204
int errhint(const char *fmt,...)
Definition: elog.c:1318
int errcode(int sqlerrcode)
Definition: elog.c:854
int errmsg(const char *fmt,...)
Definition: elog.c:1071
#define PG_RE_THROW()
Definition: elog.h:405
#define errcontext
Definition: elog.h:197
#define PG_TRY(...)
Definition: elog.h:372
#define PG_END_TRY(...)
Definition: elog.h:397
#define ERROR
Definition: elog.h:39
#define PG_CATCH(...)
Definition: elog.h:382
#define MAKE_SQLSTATE(ch1, ch2, ch3, ch4, ch5)
Definition: elog.h:56
#define elog(elevel,...)
Definition: elog.h:226
#define NOTICE
Definition: elog.h:35
#define PG_FINALLY(...)
Definition: elog.h:389
#define ereport(elevel,...)
Definition: elog.h:149
HeapTuple BuildTupleFromCStrings(AttInMetadata *attinmeta, char **values)
Definition: execTuples.c:2324
AttInMetadata * TupleDescGetAttInMetadata(TupleDesc tupdesc)
Definition: execTuples.c:2275
@ SFRM_Materialize
Definition: execnodes.h:336
PGTransactionStatusType PQtransactionStatus(const PGconn *conn)
Definition: fe-connect.c:7564
int PQconnectionUsedPassword(const PGconn *conn)
Definition: fe-connect.c:7687
void PQconninfoFree(PQconninfoOption *connOptions)
Definition: fe-connect.c:7434
PQconninfoOption * PQconninfoParse(const char *conninfo, char **errmsg)
Definition: fe-connect.c:6150
const char * PQparameterStatus(const PGconn *conn, const char *paramName)
Definition: fe-connect.c:7574
int PQconnectionUsedGSSAPI(const PGconn *conn)
Definition: fe-connect.c:7698
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:7556
int PQclientEncoding(const PGconn *conn)
Definition: fe-connect.c:7709
PQconninfoOption * PQconndefaults(void)
Definition: fe-connect.c:2190
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7619
int PQsetClientEncoding(PGconn *conn, const char *encoding)
Definition: fe-connect.c:7717
int PQsetSingleRowMode(PGconn *conn)
Definition: fe-exec.c:1948
void PQfreemem(void *ptr)
Definition: fe-exec.c:4032
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3876
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3411
void PQclear(PGresult *res)
Definition: fe-exec.c:721
int PQntuples(const PGresult *res)
Definition: fe-exec.c:3481
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1984
int PQgetisnull(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3901
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:3466
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1416
char * PQcmdStatus(PGresult *res)
Definition: fe-exec.c:3752
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:2031
int PQnfields(const PGresult *res)
Definition: fe-exec.c:3489
PGnotify * PQnotifies(PGconn *conn)
Definition: fe-exec.c:2667
#define palloc_array(type, count)
Definition: fe_memutils.h:76
Oid get_fn_expr_argtype(FmgrInfo *flinfo, int argnum)
Definition: fmgr.c:1910
#define PG_RETURN_VOID()
Definition: fmgr.h:349
#define PG_GETARG_OID(n)
Definition: fmgr.h:275
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:309
#define DatumGetTextPP(X)
Definition: fmgr.h:292
#define PG_GETARG_POINTER(n)
Definition: fmgr.h:276
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:682
#define PG_GETARG_DATUM(n)
Definition: fmgr.h:268
#define PG_NARGS()
Definition: fmgr.h:203
#define PG_RETURN_NULL()
Definition: fmgr.h:345
#define PG_RETURN_TEXT_P(x)
Definition: fmgr.h:372
#define PG_RETURN_INT32(x)
Definition: fmgr.h:354
#define PG_GETARG_INT32(n)
Definition: fmgr.h:269
#define PG_GETARG_BOOL(n)
Definition: fmgr.h:274
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:353
#define PG_FUNCTION_ARGS
Definition: fmgr.h:193
ForeignDataWrapper * GetForeignDataWrapper(Oid fdwid)
Definition: foreign.c:37
ForeignServer * GetForeignServerByName(const char *srvname, bool missing_ok)
Definition: foreign.c:182
UserMapping * GetUserMapping(Oid userid, Oid serverid)
Definition: foreign.c:200
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
Definition: funcapi.c:76
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:276
#define SRF_IS_FIRSTCALL()
Definition: funcapi.h:304
#define SRF_PERCALL_SETUP()
Definition: funcapi.h:308
@ TYPEFUNC_COMPOSITE
Definition: funcapi.h:149
@ TYPEFUNC_RECORD
Definition: funcapi.h:151
#define SRF_RETURN_NEXT(_funcctx, _result)
Definition: funcapi.h:310
#define SRF_FIRSTCALL_INIT()
Definition: funcapi.h:306
static Datum HeapTupleGetDatum(const HeapTupleData *tuple)
Definition: funcapi.h:230
#define SRF_RETURN_DONE(_funcctx)
Definition: funcapi.h:328
void systable_endscan(SysScanDesc sysscan)
Definition: genam.c:603
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition: genam.c:514
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition: genam.c:388
struct Port * MyProcPort
Definition: globals.c:52
int work_mem
Definition: globals.c:132
int NewGUCNestLevel(void)
Definition: guc.c:2235
const char * GetConfigOption(const char *name, bool missing_ok, bool restrict_privileged)
Definition: guc.c:4355
void AtEOXact_GUC(bool isCommit, int nestLevel)
Definition: guc.c:2262
int set_config_option(const char *name, const char *value, GucContext context, GucSource source, GucAction action, bool changeVal, int elevel, bool is_reload)
Definition: guc.c:3342
@ GUC_ACTION_SAVE
Definition: guc.h:205
@ PGC_S_SESSION
Definition: guc.h:126
@ PGC_USERSET
Definition: guc.h:79
Assert(PointerIsAligned(start, uint64))
const char * str
#define HASH_STRINGS
Definition: hsearch.h:96
@ HASH_FIND
Definition: hsearch.h:113
@ HASH_REMOVE
Definition: hsearch.h:115
@ HASH_ENTER
Definition: hsearch.h:114
#define HASH_ELEM
Definition: hsearch.h:95
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
Definition: htup_details.h:728
#define nitems(x)
Definition: indent.h:31
struct parser_state match_state[5]
long val
Definition: informix.c:689
int j
Definition: isn.c:78
int i
Definition: isn.c:77
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:81
static const char * libpqsrv_cancel(PGconn *conn, TimestampTz endtime)
static PGresult * libpqsrv_exec(PGconn *conn, const char *query, uint32 wait_event_info)
static PGconn * libpqsrv_connect(const char *conninfo, uint32 wait_event_info)
static PGresult * libpqsrv_get_result(PGconn *conn, uint32 wait_event_info)
static void libpqsrv_disconnect(PGconn *conn)
@ CONNECTION_BAD
Definition: libpq-fe.h:85
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:125
@ PGRES_SINGLE_TUPLE
Definition: libpq-fe.h:138
@ PGRES_TUPLES_OK
Definition: libpq-fe.h:128
@ PQTRANS_IDLE
Definition: libpq-fe.h:147
int LOCKMODE
Definition: lockdefs.h:26
#define AccessShareLock
Definition: lockdefs.h:36
void get_typlenbyvalalign(Oid typid, int16 *typlen, bool *typbyval, char *typalign)
Definition: lsyscache.c:2411
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3506
int GetDatabaseEncoding(void)
Definition: mbutils.c:1261
const char * GetDatabaseEncodingName(void)
Definition: mbutils.c:1267
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:1256
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:414
char * pstrdup(const char *in)
Definition: mcxt.c:2322
void pfree(void *pointer)
Definition: mcxt.c:2147
void * palloc0(Size size)
Definition: mcxt.c:1970
MemoryContext TopMemoryContext
Definition: mcxt.c:165
char * pchomp(const char *in)
Definition: mcxt.c:2350
MemoryContext CurrentMemoryContext
Definition: mcxt.c:159
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:485
#define AllocSetContextCreate
Definition: memutils.h:149
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:180
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:123
Oid GetUserId(void)
Definition: miscinit.c:520
bool RelationIsVisible(Oid relid)
Definition: namespace.c:913
RangeVar * makeRangeVarFromNameList(const List *names)
Definition: namespace.c:3554
#define IsA(nodeptr, _type_)
Definition: nodes.h:164
ObjectType get_relkind_objtype(char relkind)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
uint64 AclMode
Definition: parsenodes.h:74
#define ACL_USAGE
Definition: parsenodes.h:84
@ OBJECT_FOREIGN_SERVER
Definition: parsenodes.h:2334
#define ACL_SELECT
Definition: parsenodes.h:77
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:202
NameData relname
Definition: pg_class.h:38
#define NAMEDATALEN
const void size_t len
static const char * connstr
Definition: pg_dumpall.c:84
FormData_pg_index * Form_pg_index
Definition: pg_index.h:70
#define lfirst(lc)
Definition: pg_list.h:172
static char ** options
static char * user
Definition: pg_regress.c:119
static char * buf
Definition: pg_test_fsync.c:72
char typalign
Definition: pg_type.h:176
#define vsnprintf
Definition: port.h:238
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:327
uintptr_t Datum
Definition: postgres.h:69
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:257
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:217
unsigned int Oid
Definition: postgres_ext.h:30
#define PG_DIAG_MESSAGE_HINT
Definition: postgres_ext.h:55
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:52
#define PG_DIAG_MESSAGE_PRIMARY
Definition: postgres_ext.h:53
#define PG_DIAG_MESSAGE_DETAIL
Definition: postgres_ext.h:54
#define PG_DIAG_CONTEXT
Definition: postgres_ext.h:59
char * psprintf(const char *fmt,...)
Definition: psprintf.c:43
char * quote_literal_cstr(const char *rawstr)
Definition: quote.c:103
Datum quote_ident(PG_FUNCTION_ARGS)
Definition: quote.c:25
tree ctl
Definition: radixtree.h:1838
#define RelationGetRelid(relation)
Definition: rel.h:516
#define RelationGetRelationName(relation)
Definition: rel.h:550
List * untransformRelOptions(Datum options)
Definition: reloptions.c:1342
char * quote_qualified_identifier(const char *qualifier, const char *ident)
Definition: ruleutils.c:13103
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition: scankey.c:76
void truncate_identifier(char *ident, int len, bool warn)
Definition: scansup.c:93
uint64 SPI_processed
Definition: spi.c:44
SPITupleTable * SPI_tuptable
Definition: spi.c:45
int SPI_connect(void)
Definition: spi.c:95
int SPI_finish(void)
Definition: spi.c:183
int SPI_exec(const char *src, long tcount)
Definition: spi.c:631
char * SPI_getvalue(HeapTuple tuple, TupleDesc tupdesc, int fnumber)
Definition: spi.c:1221
HeapTuple SPI_copytuple(HeapTuple tuple)
Definition: spi.c:1048
char * SPI_fname(TupleDesc tupdesc, int fnumber)
Definition: spi.c:1199
#define SPI_OK_SELECT
Definition: spi.h:86
void relation_close(Relation relation, LOCKMODE lockmode)
Definition: relation.c:205
#define BTEqualStrategyNumber
Definition: stratnum.h:31
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:30
PGconn * conn
Definition: streamutil.c:52
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:145
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:230
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:242
void initStringInfo(StringInfo str)
Definition: stringinfo.c:97
char * defname
Definition: parsenodes.h:826
Node * arg
Definition: parsenodes.h:827
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:275
List * options
Definition: foreign.h:31
List * options
Definition: foreign.h:42
char * servername
Definition: foreign.h:39
Oid serverid
Definition: foreign.h:36
void * user_fctx
Definition: funcapi.h:82
uint64 max_calls
Definition: funcapi.h:74
uint64 call_cntr
Definition: funcapi.h:65
AttInMetadata * attinmeta
Definition: funcapi.h:91
MemoryContext multi_call_memory_ctx
Definition: funcapi.h:101
fmNodePtr resultinfo
Definition: fmgr.h:89
FmgrInfo * flinfo
Definition: fmgr.h:87
Definition: dynahash.c:220
Definition: pg_list.h:54
uint8 scram_ServerKey[SCRAM_MAX_KEY_LEN]
Definition: libpq-be.h:187
bool has_scram_keys
Definition: libpq-be.h:188
uint8 scram_ClientKey[SCRAM_MAX_KEY_LEN]
Definition: libpq-be.h:186
TupleDesc rd_att
Definition: rel.h:112
Form_pg_class rd_rel
Definition: rel.h:111
SetFunctionReturnMode returnMode
Definition: execnodes.h:355
ExprContext * econtext
Definition: execnodes.h:351
TupleDesc setDesc
Definition: execnodes.h:359
Tuplestorestate * setResult
Definition: execnodes.h:358
int allowedModes
Definition: execnodes.h:353
HeapTuple * vals
Definition: spi.h:26
List * options
Definition: foreign.h:50
Definition: type.h:96
Definition: c.h:686
int dim1
Definition: c.h:691
int16 values[FLEXIBLE_ARRAY_MEMBER]
Definition: c.h:693
int val
Definition: getopt_long.h:22
int be_pid
Definition: libpq-fe.h:231
char * relname
Definition: libpq-fe.h:230
char * extra
Definition: libpq-fe.h:232
char name[NAMEDATALEN]
Definition: dblink.c:158
remoteConn * rconn
Definition: dblink.c:159
bool newXactForCursor
Definition: dblink.c:77
int openCursorCount
Definition: dblink.c:76
PGconn * conn
Definition: dblink.c:75
char ** cstrs
Definition: dblink.c:86
MemoryContext tmpcontext
Definition: dblink.c:85
PGresult * cur_res
Definition: dblink.c:89
Tuplestorestate * tuplestore
Definition: dblink.c:83
FunctionCallInfo fcinfo
Definition: dblink.c:82
PGresult * last_res
Definition: dblink.c:88
AttInMetadata * attinmeta
Definition: dblink.c:84
Definition: c.h:658
bool superuser(void)
Definition: superuser.c:46
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
Relation table_openrv(const RangeVar *relation, LOCKMODE lockmode)
Definition: table.c:83
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:175
TupleDesc CreateTupleDescCopy(TupleDesc tupdesc)
Definition: tupdesc.c:245
void TupleDescInitEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:835
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
Definition: tupdesc.h:160
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:330
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
Definition: tuplestore.c:784
void tuplestore_end(Tuplestorestate *state)
Definition: tuplestore.c:492
void tuplestore_puttuple(Tuplestorestate *state, HeapTuple tuple)
Definition: tuplestore.c:764
#define att_align_nominal(cur_offset, attalign)
Definition: tupmacs.h:150
#define att_addlength_pointer(cur_offset, attlen, attptr)
Definition: tupmacs.h:185
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:85
#define strVal(v)
Definition: value.h:82
const char * getClosestMatch(ClosestMatchState *state)
Definition: varlena.c:6445
text * cstring_to_text(const char *s)
Definition: varlena.c:192
void initClosestMatch(ClosestMatchState *state, const char *source, int max_d)
Definition: varlena.c:6390
void updateClosestMatch(ClosestMatchState *state, const char *candidate)
Definition: varlena.c:6410
char * text_to_cstring(const text *t)
Definition: varlena.c:225
List * textToQualifiedNameList(text *textval)
Definition: varlena.c:3467
uint32 WaitEventExtensionNew(const char *wait_event_name)
Definition: wait_event.c:163
const char * name