PostgreSQL Source Code git master
Loading...
Searching...
No Matches
postgres_fdw.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * postgres_fdw.c
4 * Foreign-data wrapper for remote PostgreSQL servers
5 *
6 * Portions Copyright (c) 2012-2026, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * contrib/postgres_fdw/postgres_fdw.c
10 *
11 *-------------------------------------------------------------------------
12 */
13#include "postgres.h"
14
15#include <limits.h>
16
17#include "access/htup_details.h"
18#include "access/sysattr.h"
19#include "access/table.h"
20#include "catalog/pg_opfamily.h"
21#include "commands/defrem.h"
24#include "executor/execAsync.h"
25#include "executor/instrument.h"
26#include "foreign/fdwapi.h"
27#include "funcapi.h"
28#include "miscadmin.h"
29#include "nodes/makefuncs.h"
30#include "nodes/nodeFuncs.h"
32#include "optimizer/cost.h"
33#include "optimizer/inherit.h"
34#include "optimizer/optimizer.h"
35#include "optimizer/pathnode.h"
36#include "optimizer/paths.h"
37#include "optimizer/planmain.h"
38#include "optimizer/prep.h"
40#include "optimizer/tlist.h"
41#include "parser/parsetree.h"
42#include "postgres_fdw.h"
43#include "storage/latch.h"
44#include "utils/builtins.h"
45#include "utils/float.h"
46#include "utils/guc.h"
47#include "utils/lsyscache.h"
48#include "utils/memutils.h"
49#include "utils/rel.h"
50#include "utils/sampling.h"
51#include "utils/selfuncs.h"
52
54 .name = "postgres_fdw",
55 .version = PG_VERSION
56);
57
58/* Default CPU cost to start up a foreign query. */
59#define DEFAULT_FDW_STARTUP_COST 100.0
60
61/* Default CPU cost to process 1 row (above and beyond cpu_tuple_cost). */
62#define DEFAULT_FDW_TUPLE_COST 0.2
63
64/* If no remote estimates, assume a sort costs 20% extra */
65#define DEFAULT_FDW_SORT_MULTIPLIER 1.2
66
67/*
68 * Indexes of FDW-private information stored in fdw_private lists.
69 *
70 * These items are indexed with the enum FdwScanPrivateIndex, so an item
71 * can be fetched with list_nth(). For example, to get the SELECT statement:
72 * sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
73 */
75{
76 /* SQL statement to execute remotely (as a String node) */
78 /* Integer list of attribute numbers retrieved by the SELECT */
80 /* Integer representing the desired fetch_size */
82
83 /*
84 * String describing join i.e. names of relations being joined and types
85 * of join, added when the scan is join
86 */
88};
89
90/*
91 * Similarly, this enum describes what's kept in the fdw_private list for
92 * a ModifyTable node referencing a postgres_fdw foreign table. We store:
93 *
94 * 1) INSERT/UPDATE/DELETE statement text to be sent to the remote server
95 * 2) Integer list of target attribute numbers for INSERT/UPDATE
96 * (NIL for a DELETE)
97 * 3) Length till the end of VALUES clause for INSERT
98 * (-1 for a DELETE/UPDATE)
99 * 4) Boolean flag showing if the remote query has a RETURNING clause
100 * 5) Integer list of attribute numbers retrieved by RETURNING, if any
101 */
103{
104 /* SQL statement to execute remotely (as a String node) */
106 /* Integer list of target attribute numbers for INSERT/UPDATE */
108 /* Length till the end of VALUES clause (as an Integer node) */
110 /* has-returning flag (as a Boolean node) */
112 /* Integer list of attribute numbers retrieved by RETURNING */
114};
115
116/*
117 * Similarly, this enum describes what's kept in the fdw_private list for
118 * a ForeignScan node that modifies a foreign table directly. We store:
119 *
120 * 1) UPDATE/DELETE statement text to be sent to the remote server
121 * 2) Boolean flag showing if the remote query has a RETURNING clause
122 * 3) Integer list of attribute numbers retrieved by RETURNING, if any
123 * 4) Boolean flag showing if we set the command es_processed
124 */
126{
127 /* SQL statement to execute remotely (as a String node) */
129 /* has-returning flag (as a Boolean node) */
131 /* Integer list of attribute numbers retrieved by RETURNING */
133 /* set-processed flag (as a Boolean node) */
135};
136
137/*
138 * Execution state of a foreign scan using postgres_fdw.
139 */
140typedef struct PgFdwScanState
141{
142 Relation rel; /* relcache entry for the foreign table. NULL
143 * for a foreign join scan. */
144 TupleDesc tupdesc; /* tuple descriptor of scan */
145 AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
146
147 /* extracted fdw_private data */
148 char *query; /* text of SELECT command */
149 List *retrieved_attrs; /* list of retrieved attribute numbers */
150
151 /* for remote query execution */
152 PGconn *conn; /* connection for the scan */
153 PgFdwConnState *conn_state; /* extra per-connection state */
154 unsigned int cursor_number; /* quasi-unique ID for my cursor */
155 bool cursor_exists; /* have we created the cursor? */
156 int numParams; /* number of parameters passed to query */
157 FmgrInfo *param_flinfo; /* output conversion functions for them */
158 List *param_exprs; /* executable expressions for param values */
159 const char **param_values; /* textual values of query parameters */
160
161 /* for storing result tuples */
162 HeapTuple *tuples; /* array of currently-retrieved tuples */
163 int num_tuples; /* # of tuples in array */
164 int next_tuple; /* index of next one to return */
165
166 /* batch-level state, for optimizing rewinds and avoiding useless fetch */
167 int fetch_ct_2; /* Min(# of fetches done, 2) */
168 bool eof_reached; /* true if last fetch reached EOF */
169
170 /* for asynchronous execution */
171 bool async_capable; /* engage asynchronous-capable logic? */
172
173 /* working memory contexts */
174 MemoryContext batch_cxt; /* context holding current batch of tuples */
175 MemoryContext temp_cxt; /* context for per-tuple temporary data */
176
177 int fetch_size; /* number of tuples per fetch */
179
180/*
181 * Execution state of a foreign insert/update/delete operation.
182 */
183typedef struct PgFdwModifyState
184{
185 Relation rel; /* relcache entry for the foreign table */
186 AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
187
188 /* for remote query execution */
189 PGconn *conn; /* connection for the scan */
190 PgFdwConnState *conn_state; /* extra per-connection state */
191 char *p_name; /* name of prepared statement, if created */
192
193 /* extracted fdw_private data */
194 char *query; /* text of INSERT/UPDATE/DELETE command */
195 char *orig_query; /* original text of INSERT command */
196 List *target_attrs; /* list of target attribute numbers */
197 int values_end; /* length up to the end of VALUES */
198 int batch_size; /* value of FDW option "batch_size" */
199 bool has_returning; /* is there a RETURNING clause? */
200 List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
201
202 /* info about parameters for prepared statement */
203 AttrNumber ctidAttno; /* attnum of input resjunk ctid column */
204 int p_nums; /* number of parameters to transmit */
205 FmgrInfo *p_flinfo; /* output conversion functions for them */
206
207 /* batch operation stuff */
208 int num_slots; /* number of slots to insert */
209
210 /* working memory context */
211 MemoryContext temp_cxt; /* context for per-tuple temporary data */
212
213 /* for update row movement if subplan result rel */
214 struct PgFdwModifyState *aux_fmstate; /* foreign-insert state, if
215 * created */
217
218/*
219 * Execution state of a foreign scan that modifies a foreign table directly.
220 */
222{
223 Relation rel; /* relcache entry for the foreign table */
224 AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
225
226 /* extracted fdw_private data */
227 char *query; /* text of UPDATE/DELETE command */
228 bool has_returning; /* is there a RETURNING clause? */
229 List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
230 bool set_processed; /* do we set the command es_processed? */
231
232 /* for remote query execution */
233 PGconn *conn; /* connection for the update */
234 PgFdwConnState *conn_state; /* extra per-connection state */
235 int numParams; /* number of parameters passed to query */
236 FmgrInfo *param_flinfo; /* output conversion functions for them */
237 List *param_exprs; /* executable expressions for param values */
238 const char **param_values; /* textual values of query parameters */
239
240 /* for storing result tuples */
241 PGresult *result; /* result for query */
242 int num_tuples; /* # of result tuples */
243 int next_tuple; /* index of next one to return */
244 Relation resultRel; /* relcache entry for the target relation */
245 AttrNumber *attnoMap; /* array of attnums of input user columns */
246 AttrNumber ctidAttno; /* attnum of input ctid column */
247 AttrNumber oidAttno; /* attnum of input oid column */
248 bool hasSystemCols; /* are there system columns of resultRel? */
249
250 /* working memory context */
251 MemoryContext temp_cxt; /* context for per-tuple temporary data */
253
254/*
255 * Workspace for analyzing a foreign table.
256 */
257typedef struct PgFdwAnalyzeState
258{
259 Relation rel; /* relcache entry for the foreign table */
260 AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
261 List *retrieved_attrs; /* attr numbers retrieved by query */
262
263 /* collected sample rows */
264 HeapTuple *rows; /* array of size targrows */
265 int targrows; /* target # of sample rows */
266 int numrows; /* # of sample rows collected */
267
268 /* for random sampling */
269 double samplerows; /* # of rows fetched */
270 double rowstoskip; /* # of rows to skip before next sample */
271 ReservoirStateData rstate; /* state for reservoir sampling */
272
273 /* working memory contexts */
274 MemoryContext anl_cxt; /* context for per-analyze lifespan data */
275 MemoryContext temp_cxt; /* context for per-tuple temporary data */
277
278/*
279 * This enum describes what's kept in the fdw_private list for a ForeignPath.
280 * We store:
281 *
282 * 1) Boolean flag showing if the remote query has the final sort
283 * 2) Boolean flag showing if the remote query has the LIMIT clause
284 */
286{
287 /* has-final-sort flag (as a Boolean node) */
289 /* has-limit flag (as a Boolean node) */
291};
292
293/* Struct for extra information passed to estimate_path_cost_size() */
303
304/*
305 * Identify the attribute where data conversion fails.
306 */
307typedef struct ConversionLocation
308{
309 AttrNumber cur_attno; /* attribute number being processed, or 0 */
310 Relation rel; /* foreign table being processed, or NULL */
311 ForeignScanState *fsstate; /* plan node being processed, or NULL */
313
314/* Callback argument for ec_member_matches_foreign */
315typedef struct
316{
317 Expr *current; /* current expr, or NULL if not yet found */
318 List *already_used; /* expressions already dealt with */
320
321/*
322 * SQL functions
323 */
325
326/*
327 * FDW callback routines
328 */
336 RelOptInfo *foreignrel,
339 List *tlist,
341 Plan *outer_plan);
342static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
347 Index rtindex,
348 RangeTblEntry *target_rte,
352 Index resultRelation,
353 int subplan_index);
355 ResultRelInfo *resultRelInfo,
356 List *fdw_private,
357 int subplan_index,
358 int eflags);
360 ResultRelInfo *resultRelInfo,
361 TupleTableSlot *slot,
362 TupleTableSlot *planSlot);
364 ResultRelInfo *resultRelInfo,
365 TupleTableSlot **slots,
367 int *numSlots);
368static int postgresGetForeignModifyBatchSize(ResultRelInfo *resultRelInfo);
370 ResultRelInfo *resultRelInfo,
371 TupleTableSlot *slot,
372 TupleTableSlot *planSlot);
374 ResultRelInfo *resultRelInfo,
375 TupleTableSlot *slot,
376 TupleTableSlot *planSlot);
377static void postgresEndForeignModify(EState *estate,
378 ResultRelInfo *resultRelInfo);
380 ResultRelInfo *resultRelInfo);
381static void postgresEndForeignInsert(EState *estate,
382 ResultRelInfo *resultRelInfo);
386 Index resultRelation,
387 int subplan_index);
388static void postgresBeginDirectModify(ForeignScanState *node, int eflags);
392 ExplainState *es);
394 ResultRelInfo *rinfo,
395 List *fdw_private,
396 int subplan_index,
397 ExplainState *es);
399 ExplainState *es);
400static void postgresExecForeignTruncate(List *rels,
401 DropBehavior behavior,
402 bool restart_seqs);
403static bool postgresAnalyzeForeignTable(Relation relation,
407 Oid serverOid);
409 RelOptInfo *joinrel,
410 RelOptInfo *outerrel,
411 RelOptInfo *innerrel,
412 JoinType jointype,
413 JoinPathExtraData *extra);
415 TupleTableSlot *slot);
417 UpperRelationKind stage,
420 void *extra);
425
426/*
427 * Helper functions
428 */
430 RelOptInfo *foreignrel,
432 List *pathkeys,
434 double *p_rows, int *p_width,
435 int *p_disabled_nodes,
437static void get_remote_estimate(const char *sql,
438 PGconn *conn,
439 double *rows,
440 int *width,
441 Cost *startup_cost,
442 Cost *total_cost);
444 List *pathkeys,
445 double retrieved_rows,
446 double width,
447 double limit_tuples,
448 int *p_disabled_nodes,
453 void *arg);
454static void create_cursor(ForeignScanState *node);
455static void fetch_more_data(ForeignScanState *node);
456static void close_cursor(PGconn *conn, unsigned int cursor_number,
457 PgFdwConnState *conn_state);
460 ResultRelInfo *resultRelInfo,
461 CmdType operation,
462 Plan *subplan,
463 char *query,
464 List *target_attrs,
465 int values_end,
466 bool has_returning,
467 List *retrieved_attrs);
469 ResultRelInfo *resultRelInfo,
470 CmdType operation,
471 TupleTableSlot **slots,
473 int *numSlots);
477 TupleTableSlot **slots,
478 int numSlots);
480 TupleTableSlot *slot, PGresult *res);
483static List *build_remote_returning(Index rtindex, Relation rel,
484 List *returningList);
485static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist);
486static void execute_dml_stmt(ForeignScanState *node);
489 List *fdw_scan_tlist,
490 Index rtindex);
492 ResultRelInfo *resultRelInfo,
493 TupleTableSlot *slot,
494 EState *estate);
495static void prepare_query_params(PlanState *node,
496 List *fdw_exprs,
497 int numParams,
498 FmgrInfo **param_flinfo,
499 List **param_exprs,
500 const char ***param_values);
501static void process_query_params(ExprContext *econtext,
502 FmgrInfo *param_flinfo,
503 List *param_exprs,
504 const char **param_values);
505static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
506 HeapTuple *rows, int targrows,
507 double *totalrows,
508 double *totaldeadrows);
509static void analyze_row_processor(PGresult *res, int row,
510 PgFdwAnalyzeState *astate);
511static void produce_tuple_asynchronously(AsyncRequest *areq, bool fetch);
515 int row,
516 Relation rel,
517 AttInMetadata *attinmeta,
518 List *retrieved_attrs,
519 ForeignScanState *fsstate,
521static void conversion_error_callback(void *arg);
522static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel,
523 JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel,
524 JoinPathExtraData *extra);
525static bool foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
526 Node *havingQual);
528 RelOptInfo *rel);
531 Path *epq_path, List *restrictlist);
534 RelOptInfo *grouped_rel,
535 GroupPathExtraData *extra);
542 FinalPathExtraData *extra);
548static int get_batch_size_option(Relation rel);
549
550
551/*
552 * Foreign-data wrapper handler function: return a struct with pointers
553 * to my callback routines.
554 */
555Datum
557{
558 FdwRoutine *routine = makeNode(FdwRoutine);
559
560 /* Functions for scanning foreign tables */
568
569 /* Functions for updating foreign tables */
586
587 /* Function for EvalPlanQual rechecks */
589 /* Support functions for EXPLAIN */
593
594 /* Support function for TRUNCATE */
596
597 /* Support functions for ANALYZE */
599
600 /* Support functions for IMPORT FOREIGN SCHEMA */
602
603 /* Support functions for join push-down */
605
606 /* Support functions for upper relation push-down */
608
609 /* Support functions for asynchronous execution */
614
615 PG_RETURN_POINTER(routine);
616}
617
618/*
619 * postgresGetForeignRelSize
620 * Estimate # of rows and width of the result of the scan
621 *
622 * We should consider the effect of all baserestrictinfo clauses here, but
623 * not any join clauses.
624 */
625static void
629{
631 ListCell *lc;
632
633 /*
634 * We use PgFdwRelationInfo to pass various information to subsequent
635 * functions.
636 */
638 baserel->fdw_private = fpinfo;
639
640 /* Base foreign tables need to be pushed down always. */
641 fpinfo->pushdown_safe = true;
642
643 /* Look up foreign-table catalog info. */
645 fpinfo->server = GetForeignServer(fpinfo->table->serverid);
646
647 /*
648 * Extract user-settable option values. Note that per-table settings of
649 * use_remote_estimate, fetch_size and async_capable override per-server
650 * settings of them, respectively.
651 */
652 fpinfo->use_remote_estimate = false;
653 fpinfo->fdw_startup_cost = DEFAULT_FDW_STARTUP_COST;
654 fpinfo->fdw_tuple_cost = DEFAULT_FDW_TUPLE_COST;
655 fpinfo->shippable_extensions = NIL;
656 fpinfo->fetch_size = 100;
657 fpinfo->async_capable = false;
658
661
662 /*
663 * If the table or the server is configured to use remote estimates,
664 * identify which user to do remote access as during planning. This
665 * should match what ExecCheckPermissions() does. If we fail due to lack
666 * of permissions, the query would have failed at runtime anyway.
667 */
668 if (fpinfo->use_remote_estimate)
669 {
670 Oid userid;
671
672 userid = OidIsValid(baserel->userid) ? baserel->userid : GetUserId();
673 fpinfo->user = GetUserMapping(userid, fpinfo->server->serverid);
674 }
675 else
676 fpinfo->user = NULL;
677
678 /*
679 * Identify which baserestrictinfo clauses can be sent to the remote
680 * server and which can't.
681 */
682 classifyConditions(root, baserel, baserel->baserestrictinfo,
683 &fpinfo->remote_conds, &fpinfo->local_conds);
684
685 /*
686 * Identify which attributes will need to be retrieved from the remote
687 * server. These include all attrs needed for joins or final output, plus
688 * all attrs used in the local_conds. (Note: if we end up using a
689 * parameterized scan, it's possible that some of the join clauses will be
690 * sent to the remote and thus we wouldn't really need to retrieve the
691 * columns used in them. Doesn't seem worth detecting that case though.)
692 */
693 fpinfo->attrs_used = NULL;
694 pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid,
695 &fpinfo->attrs_used);
696 foreach(lc, fpinfo->local_conds)
697 {
699
700 pull_varattnos((Node *) rinfo->clause, baserel->relid,
701 &fpinfo->attrs_used);
702 }
703
704 /*
705 * Compute the selectivity and cost of the local_conds, so we don't have
706 * to do it over again for each path. The best we can do for these
707 * conditions is to estimate selectivity on the basis of local statistics.
708 */
709 fpinfo->local_conds_sel = clauselist_selectivity(root,
710 fpinfo->local_conds,
711 baserel->relid,
713 NULL);
714
715 cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
716
717 /*
718 * Set # of retrieved rows and cached relation costs to some negative
719 * value, so that we can detect when they are set to some sensible values,
720 * during one (usually the first) of the calls to estimate_path_cost_size.
721 */
722 fpinfo->retrieved_rows = -1;
723 fpinfo->rel_startup_cost = -1;
724 fpinfo->rel_total_cost = -1;
725
726 /*
727 * If the table or the server is configured to use remote estimates,
728 * connect to the foreign server and execute EXPLAIN to estimate the
729 * number of rows selected by the restriction clauses, as well as the
730 * average row width. Otherwise, estimate using whatever statistics we
731 * have locally, in a way similar to ordinary tables.
732 */
733 if (fpinfo->use_remote_estimate)
734 {
735 /*
736 * Get cost/size estimates with help of remote server. Save the
737 * values in fpinfo so we don't need to do it again to generate the
738 * basic foreign path.
739 */
741 &fpinfo->rows, &fpinfo->width,
742 &fpinfo->disabled_nodes,
743 &fpinfo->startup_cost, &fpinfo->total_cost);
744
745 /* Report estimated baserel size to planner. */
746 baserel->rows = fpinfo->rows;
747 baserel->reltarget->width = fpinfo->width;
748 }
749 else
750 {
751 /*
752 * If the foreign table has never been ANALYZEd, it will have
753 * reltuples < 0, meaning "unknown". We can't do much if we're not
754 * allowed to consult the remote server, but we can use a hack similar
755 * to plancat.c's treatment of empty relations: use a minimum size
756 * estimate of 10 pages, and divide by the column-datatype-based width
757 * estimate to get the corresponding number of tuples.
758 */
759 if (baserel->tuples < 0)
760 {
761 baserel->pages = 10;
762 baserel->tuples =
763 (10 * BLCKSZ) / (baserel->reltarget->width +
765 }
766
767 /* Estimate baserel size as best we can with local statistics. */
769
770 /* Fill in basically-bogus cost estimates for use later. */
772 &fpinfo->rows, &fpinfo->width,
773 &fpinfo->disabled_nodes,
774 &fpinfo->startup_cost, &fpinfo->total_cost);
775 }
776
777 /*
778 * fpinfo->relation_name gets the numeric rangetable index of the foreign
779 * table RTE. (If this query gets EXPLAIN'd, we'll convert that to a
780 * human-readable string at that time.)
781 */
782 fpinfo->relation_name = psprintf("%u", baserel->relid);
783
784 /* No outer and inner relations. */
785 fpinfo->make_outerrel_subquery = false;
786 fpinfo->make_innerrel_subquery = false;
787 fpinfo->lower_subquery_rels = NULL;
788 fpinfo->hidden_subquery_rels = NULL;
789 /* Set the relation index. */
790 fpinfo->relation_index = baserel->relid;
791}
792
793/*
794 * get_useful_ecs_for_relation
795 * Determine which EquivalenceClasses might be involved in useful
796 * orderings of this relation.
797 *
798 * This function is in some respects a mirror image of the core function
799 * pathkeys_useful_for_merging: for a regular table, we know what indexes
800 * we have and want to test whether any of them are useful. For a foreign
801 * table, we don't know what indexes are present on the remote side but
802 * want to speculate about which ones we'd like to use if they existed.
803 *
804 * This function returns a list of potentially-useful equivalence classes,
805 * but it does not guarantee that an EquivalenceMember exists which contains
806 * Vars only from the given relation. For example, given ft1 JOIN t1 ON
807 * ft1.x + t1.x = 0, this function will say that the equivalence class
808 * containing ft1.x + t1.x is potentially useful. Supposing ft1 is remote and
809 * t1 is local (or on a different server), it will turn out that no useful
810 * ORDER BY clause can be generated. It's not our job to figure that out
811 * here; we're only interested in identifying relevant ECs.
812 */
813static List *
815{
817 ListCell *lc;
818 Relids relids;
819
820 /*
821 * First, consider whether any active EC is potentially useful for a merge
822 * join against this relation.
823 */
824 if (rel->has_eclass_joins)
825 {
826 foreach(lc, root->eq_classes)
827 {
829
832 }
833 }
834
835 /*
836 * Next, consider whether there are any non-EC derivable join clauses that
837 * are merge-joinable. If the joininfo list is empty, we can exit
838 * quickly.
839 */
840 if (rel->joininfo == NIL)
841 return useful_eclass_list;
842
843 /* If this is a child rel, we must use the topmost parent rel to search. */
844 if (IS_OTHER_REL(rel))
845 {
847 relids = rel->top_parent_relids;
848 }
849 else
850 relids = rel->relids;
851
852 /* Check each join clause in turn. */
853 foreach(lc, rel->joininfo)
854 {
856
857 /* Consider only mergejoinable clauses */
858 if (restrictinfo->mergeopfamilies == NIL)
859 continue;
860
861 /* Make sure we've got canonical ECs. */
863
864 /*
865 * restrictinfo->mergeopfamilies != NIL is sufficient to guarantee
866 * that left_ec and right_ec will be initialized, per comments in
867 * distribute_qual_to_rels.
868 *
869 * We want to identify which side of this merge-joinable clause
870 * contains columns from the relation produced by this RelOptInfo. We
871 * test for overlap, not containment, because there could be extra
872 * relations on either side. For example, suppose we've got something
873 * like ((A JOIN B ON A.x = B.x) JOIN C ON A.y = C.y) LEFT JOIN D ON
874 * A.y = D.y. The input rel might be the joinrel between A and B, and
875 * we'll consider the join clause A.y = D.y. relids contains a
876 * relation not involved in the join class (B) and the equivalence
877 * class for the left-hand side of the clause contains a relation not
878 * involved in the input rel (C). Despite the fact that we have only
879 * overlap and not containment in either direction, A.y is potentially
880 * useful as a sort column.
881 *
882 * Note that it's even possible that relids overlaps neither side of
883 * the join clause. For example, consider A LEFT JOIN B ON A.x = B.x
884 * AND A.x = 1. The clause A.x = 1 will appear in B's joininfo list,
885 * but overlaps neither side of B. In that case, we just skip this
886 * join clause, since it doesn't suggest a useful sort order for this
887 * relation.
888 */
889 if (bms_overlap(relids, restrictinfo->right_ec->ec_relids))
891 restrictinfo->right_ec);
892 else if (bms_overlap(relids, restrictinfo->left_ec->ec_relids))
894 restrictinfo->left_ec);
895 }
896
897 return useful_eclass_list;
898}
899
900/*
901 * get_useful_pathkeys_for_relation
902 * Determine which orderings of a relation might be useful.
903 *
904 * Getting data in sorted order can be useful either because the requested
905 * order matches the final output ordering for the overall query we're
906 * planning, or because it enables an efficient merge join. Here, we try
907 * to figure out which pathkeys to consider.
908 */
909static List *
911{
914 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private;
916 ListCell *lc;
917
918 /*
919 * Pushing the query_pathkeys to the remote server is always worth
920 * considering, because it might let us avoid a local sort.
921 */
922 fpinfo->qp_is_pushdown_safe = false;
923 if (root->query_pathkeys)
924 {
925 bool query_pathkeys_ok = true;
926
927 foreach(lc, root->query_pathkeys)
928 {
930
931 /*
932 * The planner and executor don't have any clever strategy for
933 * taking data sorted by a prefix of the query's pathkeys and
934 * getting it to be sorted by all of those pathkeys. We'll just
935 * end up resorting the entire data set. So, unless we can push
936 * down all of the query pathkeys, forget it.
937 */
938 if (!is_foreign_pathkey(root, rel, pathkey))
939 {
940 query_pathkeys_ok = false;
941 break;
942 }
943 }
944
946 {
947 useful_pathkeys_list = list_make1(list_copy(root->query_pathkeys));
948 fpinfo->qp_is_pushdown_safe = true;
949 }
950 }
951
952 /*
953 * Even if we're not using remote estimates, having the remote side do the
954 * sort generally won't be any worse than doing it locally, and it might
955 * be much better if the remote side can generate data in the right order
956 * without needing a sort at all. However, what we're going to do next is
957 * try to generate pathkeys that seem promising for possible merge joins,
958 * and that's more speculative. A wrong choice might hurt quite a bit, so
959 * bail out if we can't use remote estimates.
960 */
961 if (!fpinfo->use_remote_estimate)
963
964 /* Get the list of interesting EquivalenceClasses. */
966
967 /* Extract unique EC for query, if any, so we don't consider it again. */
968 if (list_length(root->query_pathkeys) == 1)
969 {
970 PathKey *query_pathkey = linitial(root->query_pathkeys);
971
972 query_ec = query_pathkey->pk_eclass;
973 }
974
975 /*
976 * As a heuristic, the only pathkeys we consider here are those of length
977 * one. It's surely possible to consider more, but since each one we
978 * choose to consider will generate a round-trip to the remote side, we
979 * need to be a bit cautious here. It would sure be nice to have a local
980 * cache of information about remote index definitions...
981 */
982 foreach(lc, useful_eclass_list)
983 {
986
987 /* If redundant with what we did above, skip it. */
988 if (cur_ec == query_ec)
989 continue;
990
991 /* Can't push down the sort if the EC's opfamily is not shippable. */
992 if (!is_shippable(linitial_oid(cur_ec->ec_opfamilies),
994 continue;
995
996 /* If no pushable expression for this rel, skip it. */
997 if (find_em_for_rel(root, cur_ec, rel) == NULL)
998 continue;
999
1000 /* Looks like we can generate a pathkey, so let's do it. */
1002 linitial_oid(cur_ec->ec_opfamilies),
1003 COMPARE_LT,
1004 false);
1007 }
1008
1009 return useful_pathkeys_list;
1010}
1011
1012/*
1013 * postgresGetForeignPaths
1014 * Create possible scan paths for a scan on the foreign table
1015 */
1016static void
1020{
1022 ForeignPath *path;
1023 List *ppi_list;
1024 ListCell *lc;
1025
1026 /*
1027 * Create simplest ForeignScan path node and add it to baserel. This path
1028 * corresponds to SeqScan path of regular tables (though depending on what
1029 * baserestrict conditions we were able to send to remote, there might
1030 * actually be an indexscan happening there). We already did all the work
1031 * to estimate cost and size of this path.
1032 *
1033 * Although this path uses no join clauses, it could still have required
1034 * parameterization due to LATERAL refs in its tlist.
1035 */
1037 NULL, /* default pathtarget */
1038 fpinfo->rows,
1039 fpinfo->disabled_nodes,
1040 fpinfo->startup_cost,
1041 fpinfo->total_cost,
1042 NIL, /* no pathkeys */
1043 baserel->lateral_relids,
1044 NULL, /* no extra plan */
1045 NIL, /* no fdw_restrictinfo list */
1046 NIL); /* no fdw_private list */
1047 add_path(baserel, (Path *) path);
1048
1049 /* Add paths with pathkeys */
1051
1052 /*
1053 * If we're not using remote estimates, stop here. We have no way to
1054 * estimate whether any join clauses would be worth sending across, so
1055 * don't bother building parameterized paths.
1056 */
1057 if (!fpinfo->use_remote_estimate)
1058 return;
1059
1060 /*
1061 * Thumb through all join clauses for the rel to identify which outer
1062 * relations could supply one or more safe-to-send-to-remote join clauses.
1063 * We'll build a parameterized path for each such outer relation.
1064 *
1065 * It's convenient to manage this by representing each candidate outer
1066 * relation by the ParamPathInfo node for it. We can then use the
1067 * ppi_clauses list in the ParamPathInfo node directly as a list of the
1068 * interesting join clauses for that rel. This takes care of the
1069 * possibility that there are multiple safe join clauses for such a rel,
1070 * and also ensures that we account for unsafe join clauses that we'll
1071 * still have to enforce locally (since the parameterized-path machinery
1072 * insists that we handle all movable clauses).
1073 */
1074 ppi_list = NIL;
1075 foreach(lc, baserel->joininfo)
1076 {
1077 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
1080
1081 /* Check if clause can be moved to this rel */
1083 continue;
1084
1085 /* See if it is safe to send to remote */
1086 if (!is_foreign_expr(root, baserel, rinfo->clause))
1087 continue;
1088
1089 /* Calculate required outer rels for the resulting path */
1090 required_outer = bms_union(rinfo->clause_relids,
1091 baserel->lateral_relids);
1092 /* We do not want the foreign rel itself listed in required_outer */
1094
1095 /*
1096 * required_outer probably can't be empty here, but if it were, we
1097 * couldn't make a parameterized path.
1098 */
1100 continue;
1101
1102 /* Get the ParamPathInfo */
1106
1107 /*
1108 * Add it to list unless we already have it. Testing pointer equality
1109 * is OK since get_baserel_parampathinfo won't make duplicates.
1110 */
1112 }
1113
1114 /*
1115 * The above scan examined only "generic" join clauses, not those that
1116 * were absorbed into EquivalenceClauses. See if we can make anything out
1117 * of EquivalenceClauses.
1118 */
1119 if (baserel->has_eclass_joins)
1120 {
1121 /*
1122 * We repeatedly scan the eclass list looking for column references
1123 * (or expressions) belonging to the foreign rel. Each time we find
1124 * one, we generate a list of equivalence joinclauses for it, and then
1125 * see if any are safe to send to the remote. Repeat till there are
1126 * no more candidate EC members.
1127 */
1129
1131 for (;;)
1132 {
1133 List *clauses;
1134
1135 /* Make clauses, skipping any that join to lateral_referencers */
1136 arg.current = NULL;
1138 baserel,
1140 &arg,
1141 baserel->lateral_referencers);
1142
1143 /* Done if there are no more expressions in the foreign rel */
1144 if (arg.current == NULL)
1145 {
1146 Assert(clauses == NIL);
1147 break;
1148 }
1149
1150 /* Scan the extracted join clauses */
1151 foreach(lc, clauses)
1152 {
1153 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
1156
1157 /* Check if clause can be moved to this rel */
1159 continue;
1160
1161 /* See if it is safe to send to remote */
1162 if (!is_foreign_expr(root, baserel, rinfo->clause))
1163 continue;
1164
1165 /* Calculate required outer rels for the resulting path */
1166 required_outer = bms_union(rinfo->clause_relids,
1167 baserel->lateral_relids);
1170 continue;
1171
1172 /* Get the ParamPathInfo */
1176
1177 /* Add it to list unless we already have it */
1179 }
1180
1181 /* Try again, now ignoring the expression we found this time */
1182 arg.already_used = lappend(arg.already_used, arg.current);
1183 }
1184 }
1185
1186 /*
1187 * Now build a path for each useful outer relation.
1188 */
1189 foreach(lc, ppi_list)
1190 {
1192 double rows;
1193 int width;
1194 int disabled_nodes;
1195 Cost startup_cost;
1196 Cost total_cost;
1197
1198 /* Get a cost estimate from the remote */
1200 param_info->ppi_clauses, NIL, NULL,
1201 &rows, &width, &disabled_nodes,
1202 &startup_cost, &total_cost);
1203
1204 /*
1205 * ppi_rows currently won't get looked at by anything, but still we
1206 * may as well ensure that it matches our idea of the rowcount.
1207 */
1208 param_info->ppi_rows = rows;
1209
1210 /* Make the path */
1212 NULL, /* default pathtarget */
1213 rows,
1214 disabled_nodes,
1215 startup_cost,
1216 total_cost,
1217 NIL, /* no pathkeys */
1218 param_info->ppi_req_outer,
1219 NULL,
1220 NIL, /* no fdw_restrictinfo list */
1221 NIL); /* no fdw_private list */
1222 add_path(baserel, (Path *) path);
1223 }
1224}
1225
1226/*
1227 * postgresGetForeignPlan
1228 * Create ForeignScan plan node which implements selected best path
1229 */
1230static ForeignScan *
1232 RelOptInfo *foreignrel,
1235 List *tlist,
1237 Plan *outer_plan)
1238{
1239 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
1241 List *fdw_private;
1243 List *local_exprs = NIL;
1244 List *params_list = NIL;
1245 List *fdw_scan_tlist = NIL;
1246 List *fdw_recheck_quals = NIL;
1247 List *retrieved_attrs;
1248 StringInfoData sql;
1249 bool has_final_sort = false;
1250 bool has_limit = false;
1251 ListCell *lc;
1252
1253 /*
1254 * Get FDW private data created by postgresGetForeignUpperPaths(), if any.
1255 */
1256 if (best_path->fdw_private)
1257 {
1258 has_final_sort = boolVal(list_nth(best_path->fdw_private,
1260 has_limit = boolVal(list_nth(best_path->fdw_private,
1262 }
1263
1264 if (IS_SIMPLE_REL(foreignrel))
1265 {
1266 /*
1267 * For base relations, set scan_relid as the relid of the relation.
1268 */
1269 scan_relid = foreignrel->relid;
1270
1271 /*
1272 * In a base-relation scan, we must apply the given scan_clauses.
1273 *
1274 * Separate the scan_clauses into those that can be executed remotely
1275 * and those that can't. baserestrictinfo clauses that were
1276 * previously determined to be safe or unsafe by classifyConditions
1277 * are found in fpinfo->remote_conds and fpinfo->local_conds. Anything
1278 * else in the scan_clauses list will be a join clause, which we have
1279 * to check for remote-safety.
1280 *
1281 * Note: the join clauses we see here should be the exact same ones
1282 * previously examined by postgresGetForeignPaths. Possibly it'd be
1283 * worth passing forward the classification work done then, rather
1284 * than repeating it here.
1285 *
1286 * This code must match "extract_actual_clauses(scan_clauses, false)"
1287 * except for the additional decision about remote versus local
1288 * execution.
1289 */
1290 foreach(lc, scan_clauses)
1291 {
1293
1294 /* Ignore any pseudoconstants, they're dealt with elsewhere */
1295 if (rinfo->pseudoconstant)
1296 continue;
1297
1298 if (list_member_ptr(fpinfo->remote_conds, rinfo))
1300 else if (list_member_ptr(fpinfo->local_conds, rinfo))
1302 else if (is_foreign_expr(root, foreignrel, rinfo->clause))
1304 else
1306 }
1307
1308 /*
1309 * For a base-relation scan, we have to support EPQ recheck, which
1310 * should recheck all the remote quals.
1311 */
1312 fdw_recheck_quals = remote_exprs;
1313 }
1314 else
1315 {
1316 /*
1317 * Join relation or upper relation - set scan_relid to 0.
1318 */
1319 scan_relid = 0;
1320
1321 /*
1322 * For a join rel, baserestrictinfo is NIL and we are not considering
1323 * parameterization right now, so there should be no scan_clauses for
1324 * a joinrel or an upper rel either.
1325 */
1327
1328 /*
1329 * Instead we get the conditions to apply from the fdw_private
1330 * structure.
1331 */
1332 remote_exprs = extract_actual_clauses(fpinfo->remote_conds, false);
1333 local_exprs = extract_actual_clauses(fpinfo->local_conds, false);
1334
1335 /*
1336 * We leave fdw_recheck_quals empty in this case, since we never need
1337 * to apply EPQ recheck clauses. In the case of a joinrel, EPQ
1338 * recheck is handled elsewhere --- see postgresGetForeignJoinPaths().
1339 * If we're planning an upperrel (ie, remote grouping or aggregation)
1340 * then there's no EPQ to do because SELECT FOR UPDATE wouldn't be
1341 * allowed, and indeed we *can't* put the remote clauses into
1342 * fdw_recheck_quals because the unaggregated Vars won't be available
1343 * locally.
1344 */
1345
1346 /* Build the list of columns to be fetched from the foreign server. */
1347 fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
1348
1349 /*
1350 * Ensure that the outer plan produces a tuple whose descriptor
1351 * matches our scan tuple slot. Also, remove the local conditions
1352 * from outer plan's quals, lest they be evaluated twice, once by the
1353 * local plan and once by the scan.
1354 */
1355 if (outer_plan)
1356 {
1357 /*
1358 * Right now, we only consider grouping and aggregation beyond
1359 * joins. Queries involving aggregates or grouping do not require
1360 * EPQ mechanism, hence should not have an outer plan here.
1361 */
1362 Assert(!IS_UPPER_REL(foreignrel));
1363
1364 /*
1365 * First, update the plan's qual list if possible. In some cases
1366 * the quals might be enforced below the topmost plan level, in
1367 * which case we'll fail to remove them; it's not worth working
1368 * harder than this.
1369 */
1370 foreach(lc, local_exprs)
1371 {
1372 Node *qual = lfirst(lc);
1373
1374 outer_plan->qual = list_delete(outer_plan->qual, qual);
1375
1376 /*
1377 * For an inner join the local conditions of foreign scan plan
1378 * can be part of the joinquals as well. (They might also be
1379 * in the mergequals or hashquals, but we can't touch those
1380 * without breaking the plan.)
1381 */
1382 if (IsA(outer_plan, NestLoop) ||
1383 IsA(outer_plan, MergeJoin) ||
1384 IsA(outer_plan, HashJoin))
1385 {
1386 Join *join_plan = (Join *) outer_plan;
1387
1388 if (join_plan->jointype == JOIN_INNER)
1389 join_plan->joinqual = list_delete(join_plan->joinqual,
1390 qual);
1391 }
1392 }
1393
1394 /*
1395 * Now fix the subplan's tlist --- this might result in inserting
1396 * a Result node atop the plan tree.
1397 */
1398 outer_plan = change_plan_targetlist(outer_plan, fdw_scan_tlist,
1399 best_path->path.parallel_safe);
1400 }
1401 }
1402
1403 /*
1404 * Build the query string to be sent for execution, and identify
1405 * expressions to be sent as parameters.
1406 */
1407 initStringInfo(&sql);
1408 deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
1409 remote_exprs, best_path->path.pathkeys,
1410 has_final_sort, has_limit, false,
1411 &retrieved_attrs, &params_list);
1412
1413 /* Remember remote_exprs for possible use by postgresPlanDirectModify */
1414 fpinfo->final_remote_exprs = remote_exprs;
1415
1416 /*
1417 * Build the fdw_private list that will be available to the executor.
1418 * Items in the list must match order in enum FdwScanPrivateIndex.
1419 */
1420 fdw_private = list_make3(makeString(sql.data),
1421 retrieved_attrs,
1422 makeInteger(fpinfo->fetch_size));
1423 if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
1424 fdw_private = lappend(fdw_private,
1425 makeString(fpinfo->relation_name));
1426
1427 /*
1428 * Create the ForeignScan node for the given relation.
1429 *
1430 * Note that the remote parameter expressions are stored in the fdw_exprs
1431 * field of the finished plan node; we can't keep them in private state
1432 * because then they wouldn't be subject to later planner processing.
1433 */
1434 return make_foreignscan(tlist,
1436 scan_relid,
1437 params_list,
1438 fdw_private,
1439 fdw_scan_tlist,
1440 fdw_recheck_quals,
1441 outer_plan);
1442}
1443
1444/*
1445 * Construct a tuple descriptor for the scan tuples handled by a foreign join.
1446 */
1447static TupleDesc
1449{
1450 ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
1451 EState *estate = node->ss.ps.state;
1452 TupleDesc tupdesc;
1453
1454 /*
1455 * The core code has already set up a scan tuple slot based on
1456 * fsplan->fdw_scan_tlist, and this slot's tupdesc is mostly good enough,
1457 * but there's one case where it isn't. If we have any whole-row row
1458 * identifier Vars, they may have vartype RECORD, and we need to replace
1459 * that with the associated table's actual composite type. This ensures
1460 * that when we read those ROW() expression values from the remote server,
1461 * we can convert them to a composite type the local server knows.
1462 */
1464 for (int i = 0; i < tupdesc->natts; i++)
1465 {
1467 Var *var;
1469 Oid reltype;
1470
1471 /* Nothing to do if it's not a generic RECORD attribute */
1472 if (att->atttypid != RECORDOID || att->atttypmod >= 0)
1473 continue;
1474
1475 /*
1476 * If we can't identify the referenced table, do nothing. This'll
1477 * likely lead to failure later, but perhaps we can muddle through.
1478 */
1479 var = (Var *) list_nth_node(TargetEntry, fsplan->fdw_scan_tlist,
1480 i)->expr;
1481 if (!IsA(var, Var) || var->varattno != 0)
1482 continue;
1483 rte = list_nth(estate->es_range_table, var->varno - 1);
1484 if (rte->rtekind != RTE_RELATION)
1485 continue;
1486 reltype = get_rel_type_id(rte->relid);
1487 if (!OidIsValid(reltype))
1488 continue;
1489 att->atttypid = reltype;
1490 /* shouldn't need to change anything else */
1491 }
1492 return tupdesc;
1493}
1494
1495/*
1496 * postgresBeginForeignScan
1497 * Initiate an executor scan of a foreign PostgreSQL table.
1498 */
1499static void
1501{
1502 ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
1503 EState *estate = node->ss.ps.state;
1504 PgFdwScanState *fsstate;
1506 Oid userid;
1509 int rtindex;
1510 int numParams;
1511
1512 /*
1513 * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
1514 */
1515 if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1516 return;
1517
1518 /*
1519 * We'll save private state in node->fdw_state.
1520 */
1521 fsstate = palloc0_object(PgFdwScanState);
1522 node->fdw_state = fsstate;
1523
1524 /*
1525 * Identify which user to do the remote access as. This should match what
1526 * ExecCheckPermissions() does.
1527 */
1528 userid = OidIsValid(fsplan->checkAsUser) ? fsplan->checkAsUser : GetUserId();
1529 if (fsplan->scan.scanrelid > 0)
1530 rtindex = fsplan->scan.scanrelid;
1531 else
1532 rtindex = bms_next_member(fsplan->fs_base_relids, -1);
1533 rte = exec_rt_fetch(rtindex, estate);
1534
1535 /* Get info about foreign table. */
1536 table = GetForeignTable(rte->relid);
1537 user = GetUserMapping(userid, table->serverid);
1538
1539 /*
1540 * Get connection to the foreign server. Connection manager will
1541 * establish new connection if necessary.
1542 */
1543 fsstate->conn = GetConnection(user, false, &fsstate->conn_state);
1544
1545 /* Assign a unique ID for my cursor */
1546 fsstate->cursor_number = GetCursorNumber(fsstate->conn);
1547 fsstate->cursor_exists = false;
1548
1549 /* Get private info created by planner functions. */
1550 fsstate->query = strVal(list_nth(fsplan->fdw_private,
1552 fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
1554 fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private,
1556
1557 /* Create contexts for batches of tuples and per-tuple temp workspace. */
1558 fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
1559 "postgres_fdw tuple data",
1561 fsstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
1562 "postgres_fdw temporary data",
1564
1565 /*
1566 * Get info we'll need for converting data fetched from the foreign server
1567 * into local representation and error reporting during that process.
1568 */
1569 if (fsplan->scan.scanrelid > 0)
1570 {
1571 fsstate->rel = node->ss.ss_currentRelation;
1572 fsstate->tupdesc = RelationGetDescr(fsstate->rel);
1573 }
1574 else
1575 {
1576 fsstate->rel = NULL;
1578 }
1579
1580 fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc);
1581
1582 /*
1583 * Prepare for processing of parameters used in remote query, if any.
1584 */
1585 numParams = list_length(fsplan->fdw_exprs);
1586 fsstate->numParams = numParams;
1587 if (numParams > 0)
1589 fsplan->fdw_exprs,
1590 numParams,
1591 &fsstate->param_flinfo,
1592 &fsstate->param_exprs,
1593 &fsstate->param_values);
1594
1595 /* Set the async-capable flag */
1596 fsstate->async_capable = node->ss.ps.async_capable;
1597}
1598
1599/*
1600 * postgresIterateForeignScan
1601 * Retrieve next row from the result set, or clear tuple slot to indicate
1602 * EOF.
1603 */
1604static TupleTableSlot *
1606{
1607 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1608 TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
1609
1610 /*
1611 * In sync mode, if this is the first call after Begin or ReScan, we need
1612 * to create the cursor on the remote side. In async mode, we would have
1613 * already created the cursor before we get here, even if this is the
1614 * first call after Begin or ReScan.
1615 */
1616 if (!fsstate->cursor_exists)
1617 create_cursor(node);
1618
1619 /*
1620 * Get some more tuples, if we've run out.
1621 */
1622 if (fsstate->next_tuple >= fsstate->num_tuples)
1623 {
1624 /* In async mode, just clear tuple slot. */
1625 if (fsstate->async_capable)
1626 return ExecClearTuple(slot);
1627 /* No point in another fetch if we already detected EOF, though. */
1628 if (!fsstate->eof_reached)
1629 fetch_more_data(node);
1630 /* If we didn't get any tuples, must be end of data. */
1631 if (fsstate->next_tuple >= fsstate->num_tuples)
1632 return ExecClearTuple(slot);
1633 }
1634
1635 /*
1636 * Return the next tuple.
1637 */
1638 ExecStoreHeapTuple(fsstate->tuples[fsstate->next_tuple++],
1639 slot,
1640 false);
1641
1642 return slot;
1643}
1644
1645/*
1646 * postgresReScanForeignScan
1647 * Restart the scan.
1648 */
1649static void
1651{
1652 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1653 char sql[64];
1654 PGresult *res;
1655
1656 /* If we haven't created the cursor yet, nothing to do. */
1657 if (!fsstate->cursor_exists)
1658 return;
1659
1660 /*
1661 * If the node is async-capable, and an asynchronous fetch for it has
1662 * begun, the asynchronous fetch might not have yet completed. Check if
1663 * the node is async-capable, and an asynchronous fetch for it is still in
1664 * progress; if so, complete the asynchronous fetch before restarting the
1665 * scan.
1666 */
1667 if (fsstate->async_capable &&
1668 fsstate->conn_state->pendingAreq &&
1669 fsstate->conn_state->pendingAreq->requestee == (PlanState *) node)
1670 fetch_more_data(node);
1671
1672 /*
1673 * If any internal parameters affecting this node have changed, we'd
1674 * better destroy and recreate the cursor. Otherwise, if the remote
1675 * server is v14 or older, rewinding it should be good enough; if not,
1676 * rewind is only allowed for scrollable cursors, but we don't have a way
1677 * to check the scrollability of it, so destroy and recreate it in any
1678 * case. If we've only fetched zero or one batch, we needn't even rewind
1679 * the cursor, just rescan what we have.
1680 */
1681 if (node->ss.ps.chgParam != NULL)
1682 {
1683 fsstate->cursor_exists = false;
1684 snprintf(sql, sizeof(sql), "CLOSE c%u",
1685 fsstate->cursor_number);
1686 }
1687 else if (fsstate->fetch_ct_2 > 1)
1688 {
1689 if (PQserverVersion(fsstate->conn) < 150000)
1690 snprintf(sql, sizeof(sql), "MOVE BACKWARD ALL IN c%u",
1691 fsstate->cursor_number);
1692 else
1693 {
1694 fsstate->cursor_exists = false;
1695 snprintf(sql, sizeof(sql), "CLOSE c%u",
1696 fsstate->cursor_number);
1697 }
1698 }
1699 else
1700 {
1701 /* Easy: just rescan what we already have in memory, if anything */
1702 fsstate->next_tuple = 0;
1703 return;
1704 }
1705
1706 res = pgfdw_exec_query(fsstate->conn, sql, fsstate->conn_state);
1707 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1708 pgfdw_report_error(res, fsstate->conn, sql);
1709 PQclear(res);
1710
1711 /* Now force a fresh FETCH. */
1712 fsstate->tuples = NULL;
1713 fsstate->num_tuples = 0;
1714 fsstate->next_tuple = 0;
1715 fsstate->fetch_ct_2 = 0;
1716 fsstate->eof_reached = false;
1717}
1718
1719/*
1720 * postgresEndForeignScan
1721 * Finish scanning foreign table and dispose objects used for this scan
1722 */
1723static void
1725{
1726 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1727
1728 /* if fsstate is NULL, we are in EXPLAIN; nothing to do */
1729 if (fsstate == NULL)
1730 return;
1731
1732 /* Close the cursor if open, to prevent accumulation of cursors */
1733 if (fsstate->cursor_exists)
1734 close_cursor(fsstate->conn, fsstate->cursor_number,
1735 fsstate->conn_state);
1736
1737 /* Release remote connection */
1738 ReleaseConnection(fsstate->conn);
1739 fsstate->conn = NULL;
1740
1741 /* MemoryContexts will be deleted automatically. */
1742}
1743
1744/*
1745 * postgresAddForeignUpdateTargets
1746 * Add resjunk column(s) needed for update/delete on a foreign table
1747 */
1748static void
1750 Index rtindex,
1751 RangeTblEntry *target_rte,
1753{
1754 Var *var;
1755
1756 /*
1757 * In postgres_fdw, what we need is the ctid, same as for a regular table.
1758 */
1759
1760 /* Make a Var representing the desired value */
1761 var = makeVar(rtindex,
1763 TIDOID,
1764 -1,
1765 InvalidOid,
1766 0);
1767
1768 /* Register it as a row-identity column needed by this target rel */
1769 add_row_identity_var(root, var, rtindex, "ctid");
1770}
1771
1772/*
1773 * postgresPlanForeignModify
1774 * Plan an insert/update/delete operation on a foreign table
1775 */
1776static List *
1779 Index resultRelation,
1780 int subplan_index)
1781{
1782 CmdType operation = plan->operation;
1783 RangeTblEntry *rte = planner_rt_fetch(resultRelation, root);
1784 Relation rel;
1785 StringInfoData sql;
1786 List *targetAttrs = NIL;
1788 List *returningList = NIL;
1789 List *retrieved_attrs = NIL;
1790 bool doNothing = false;
1791 int values_end_len = -1;
1792
1793 initStringInfo(&sql);
1794
1795 /*
1796 * Core code already has some lock on each rel being planned, so we can
1797 * use NoLock here.
1798 */
1799 rel = table_open(rte->relid, NoLock);
1800
1801 /*
1802 * In an INSERT, we transmit all columns that are defined in the foreign
1803 * table. In an UPDATE, if there are BEFORE ROW UPDATE triggers on the
1804 * foreign table, we transmit all columns like INSERT; else we transmit
1805 * only columns that were explicitly targets of the UPDATE, so as to avoid
1806 * unnecessary data transmission. (We can't do that for INSERT since we
1807 * would miss sending default values for columns not listed in the source
1808 * statement, and for UPDATE if there are BEFORE ROW UPDATE triggers since
1809 * those triggers might change values for non-target columns, in which
1810 * case we would miss sending changed values for those columns.)
1811 */
1812 if (operation == CMD_INSERT ||
1813 (operation == CMD_UPDATE &&
1814 rel->trigdesc &&
1816 {
1817 TupleDesc tupdesc = RelationGetDescr(rel);
1818 int attnum;
1819
1820 for (attnum = 1; attnum <= tupdesc->natts; attnum++)
1821 {
1822 CompactAttribute *attr = TupleDescCompactAttr(tupdesc, attnum - 1);
1823
1824 if (!attr->attisdropped)
1826 }
1827 }
1828 else if (operation == CMD_UPDATE)
1829 {
1830 int col;
1831 RelOptInfo *rel = find_base_rel(root, resultRelation);
1833
1834 col = -1;
1835 while ((col = bms_next_member(allUpdatedCols, col)) >= 0)
1836 {
1837 /* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
1839
1840 if (attno <= InvalidAttrNumber) /* shouldn't happen */
1841 elog(ERROR, "system-column update is not supported");
1843 }
1844 }
1845
1846 /*
1847 * Extract the relevant WITH CHECK OPTION list if any.
1848 */
1849 if (plan->withCheckOptionLists)
1850 withCheckOptionList = (List *) list_nth(plan->withCheckOptionLists,
1852
1853 /*
1854 * Extract the relevant RETURNING list if any.
1855 */
1856 if (plan->returningLists)
1857 returningList = (List *) list_nth(plan->returningLists, subplan_index);
1858
1859 /*
1860 * ON CONFLICT DO NOTHING/SELECT/UPDATE with inference specification
1861 * should have already been rejected in the optimizer, as presently there
1862 * is no way to recognize an arbiter index on a foreign table. Only DO
1863 * NOTHING is supported without an inference specification.
1864 */
1865 if (plan->onConflictAction == ONCONFLICT_NOTHING)
1866 doNothing = true;
1867 else if (plan->onConflictAction != ONCONFLICT_NONE)
1868 elog(ERROR, "unexpected ON CONFLICT specification: %d",
1869 (int) plan->onConflictAction);
1870
1871 /*
1872 * Construct the SQL command string.
1873 */
1874 switch (operation)
1875 {
1876 case CMD_INSERT:
1877 deparseInsertSql(&sql, rte, resultRelation, rel,
1879 withCheckOptionList, returningList,
1880 &retrieved_attrs, &values_end_len);
1881 break;
1882 case CMD_UPDATE:
1883 deparseUpdateSql(&sql, rte, resultRelation, rel,
1885 withCheckOptionList, returningList,
1886 &retrieved_attrs);
1887 break;
1888 case CMD_DELETE:
1889 deparseDeleteSql(&sql, rte, resultRelation, rel,
1890 returningList,
1891 &retrieved_attrs);
1892 break;
1893 default:
1894 elog(ERROR, "unexpected operation: %d", (int) operation);
1895 break;
1896 }
1897
1898 table_close(rel, NoLock);
1899
1900 /*
1901 * Build the fdw_private list that will be available to the executor.
1902 * Items in the list must match enum FdwModifyPrivateIndex, above.
1903 */
1904 return list_make5(makeString(sql.data),
1907 makeBoolean((retrieved_attrs != NIL)),
1908 retrieved_attrs);
1909}
1910
1911/*
1912 * postgresBeginForeignModify
1913 * Begin an insert/update/delete operation on a foreign table
1914 */
1915static void
1917 ResultRelInfo *resultRelInfo,
1918 List *fdw_private,
1919 int subplan_index,
1920 int eflags)
1921{
1923 char *query;
1924 List *target_attrs;
1925 bool has_returning;
1926 int values_end_len;
1927 List *retrieved_attrs;
1929
1930 /*
1931 * Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState
1932 * stays NULL.
1933 */
1934 if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1935 return;
1936
1937 /* Deconstruct fdw_private data. */
1938 query = strVal(list_nth(fdw_private,
1940 target_attrs = (List *) list_nth(fdw_private,
1942 values_end_len = intVal(list_nth(fdw_private,
1944 has_returning = boolVal(list_nth(fdw_private,
1946 retrieved_attrs = (List *) list_nth(fdw_private,
1948
1949 /* Find RTE. */
1950 rte = exec_rt_fetch(resultRelInfo->ri_RangeTableIndex,
1951 mtstate->ps.state);
1952
1953 /* Construct an execution state. */
1955 rte,
1956 resultRelInfo,
1957 mtstate->operation,
1958 outerPlanState(mtstate)->plan,
1959 query,
1960 target_attrs,
1962 has_returning,
1963 retrieved_attrs);
1964
1965 resultRelInfo->ri_FdwState = fmstate;
1966}
1967
1968/*
1969 * postgresExecForeignInsert
1970 * Insert one row into a foreign table
1971 */
1972static TupleTableSlot *
1974 ResultRelInfo *resultRelInfo,
1975 TupleTableSlot *slot,
1976 TupleTableSlot *planSlot)
1977{
1980 int numSlots = 1;
1981
1982 /*
1983 * If the fmstate has aux_fmstate set, use the aux_fmstate (see
1984 * postgresBeginForeignInsert())
1985 */
1986 if (fmstate->aux_fmstate)
1987 resultRelInfo->ri_FdwState = fmstate->aux_fmstate;
1988 rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT,
1989 &slot, &planSlot, &numSlots);
1990 /* Revert that change */
1991 if (fmstate->aux_fmstate)
1992 resultRelInfo->ri_FdwState = fmstate;
1993
1994 return rslot ? *rslot : NULL;
1995}
1996
1997/*
1998 * postgresExecForeignBatchInsert
1999 * Insert multiple rows into a foreign table
2000 */
2001static TupleTableSlot **
2003 ResultRelInfo *resultRelInfo,
2004 TupleTableSlot **slots,
2006 int *numSlots)
2007{
2010
2011 /*
2012 * If the fmstate has aux_fmstate set, use the aux_fmstate (see
2013 * postgresBeginForeignInsert())
2014 */
2015 if (fmstate->aux_fmstate)
2016 resultRelInfo->ri_FdwState = fmstate->aux_fmstate;
2017 rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT,
2018 slots, planSlots, numSlots);
2019 /* Revert that change */
2020 if (fmstate->aux_fmstate)
2021 resultRelInfo->ri_FdwState = fmstate;
2022
2023 return rslot;
2024}
2025
2026/*
2027 * postgresGetForeignModifyBatchSize
2028 * Determine the maximum number of tuples that can be inserted in bulk
2029 *
2030 * Returns the batch size specified for server or table. When batching is not
2031 * allowed (e.g. for tables with BEFORE/AFTER ROW triggers or with RETURNING
2032 * clause), returns 1.
2033 */
2034static int
2036{
2037 int batch_size;
2039
2040 /* should be called only once */
2041 Assert(resultRelInfo->ri_BatchSize == 0);
2042
2043 /*
2044 * Should never get called when the insert is being performed on a table
2045 * that is also among the target relations of an UPDATE operation, because
2046 * postgresBeginForeignInsert() currently rejects such insert attempts.
2047 */
2048 Assert(fmstate == NULL || fmstate->aux_fmstate == NULL);
2049
2050 /*
2051 * In EXPLAIN without ANALYZE, ri_FdwState is NULL, so we have to lookup
2052 * the option directly in server/table options. Otherwise just use the
2053 * value we determined earlier.
2054 */
2055 if (fmstate)
2056 batch_size = fmstate->batch_size;
2057 else
2058 batch_size = get_batch_size_option(resultRelInfo->ri_RelationDesc);
2059
2060 /*
2061 * Disable batching when we have to use RETURNING, there are any
2062 * BEFORE/AFTER ROW INSERT triggers on the foreign table, or there are any
2063 * WITH CHECK OPTION constraints from parent views.
2064 *
2065 * When there are any BEFORE ROW INSERT triggers on the table, we can't
2066 * support it, because such triggers might query the table we're inserting
2067 * into and act differently if the tuples that have already been processed
2068 * and prepared for insertion are not there.
2069 */
2070 if (resultRelInfo->ri_projectReturning != NULL ||
2071 resultRelInfo->ri_WithCheckOptions != NIL ||
2072 (resultRelInfo->ri_TrigDesc &&
2073 (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
2074 resultRelInfo->ri_TrigDesc->trig_insert_after_row)))
2075 return 1;
2076
2077 /*
2078 * If the foreign table has no columns, disable batching as the INSERT
2079 * syntax doesn't allow batching multiple empty rows into a zero-column
2080 * table in a single statement. This is needed for COPY FROM, in which
2081 * case fmstate must be non-NULL.
2082 */
2083 if (fmstate && list_length(fmstate->target_attrs) == 0)
2084 return 1;
2085
2086 /*
2087 * Otherwise use the batch size specified for server/table. The number of
2088 * parameters in a batch is limited to 65535 (uint16), so make sure we
2089 * don't exceed this limit by using the maximum batch_size possible.
2090 */
2091 if (fmstate && fmstate->p_nums > 0)
2092 batch_size = Min(batch_size, PQ_QUERY_PARAM_MAX_LIMIT / fmstate->p_nums);
2093
2094 return batch_size;
2095}
2096
2097/*
2098 * postgresExecForeignUpdate
2099 * Update one row in a foreign table
2100 */
2101static TupleTableSlot *
2103 ResultRelInfo *resultRelInfo,
2104 TupleTableSlot *slot,
2105 TupleTableSlot *planSlot)
2106{
2108 int numSlots = 1;
2109
2110 rslot = execute_foreign_modify(estate, resultRelInfo, CMD_UPDATE,
2111 &slot, &planSlot, &numSlots);
2112
2113 return rslot ? rslot[0] : NULL;
2114}
2115
2116/*
2117 * postgresExecForeignDelete
2118 * Delete one row from a foreign table
2119 */
2120static TupleTableSlot *
2122 ResultRelInfo *resultRelInfo,
2123 TupleTableSlot *slot,
2124 TupleTableSlot *planSlot)
2125{
2127 int numSlots = 1;
2128
2129 rslot = execute_foreign_modify(estate, resultRelInfo, CMD_DELETE,
2130 &slot, &planSlot, &numSlots);
2131
2132 return rslot ? rslot[0] : NULL;
2133}
2134
2135/*
2136 * postgresEndForeignModify
2137 * Finish an insert/update/delete operation on a foreign table
2138 */
2139static void
2141 ResultRelInfo *resultRelInfo)
2142{
2144
2145 /* If fmstate is NULL, we are in EXPLAIN; nothing to do */
2146 if (fmstate == NULL)
2147 return;
2148
2149 /* Destroy the execution state */
2151}
2152
2153/*
2154 * postgresBeginForeignInsert
2155 * Begin an insert operation on a foreign table
2156 */
2157static void
2159 ResultRelInfo *resultRelInfo)
2160{
2163 EState *estate = mtstate->ps.state;
2164 Index resultRelation;
2165 Relation rel = resultRelInfo->ri_RelationDesc;
2167 TupleDesc tupdesc = RelationGetDescr(rel);
2168 int attnum;
2169 int values_end_len;
2170 StringInfoData sql;
2171 List *targetAttrs = NIL;
2172 List *retrieved_attrs = NIL;
2173 bool doNothing = false;
2174
2175 /*
2176 * If the foreign table we are about to insert routed rows into is also an
2177 * UPDATE subplan result rel that will be updated later, proceeding with
2178 * the INSERT will result in the later UPDATE incorrectly modifying those
2179 * routed rows, so prevent the INSERT --- it would be nice if we could
2180 * handle this case; but for now, throw an error for safety.
2181 */
2182 if (plan && plan->operation == CMD_UPDATE &&
2183 (resultRelInfo->ri_usesFdwDirectModify ||
2184 resultRelInfo->ri_FdwState))
2185 ereport(ERROR,
2187 errmsg("cannot route tuples into foreign table to be updated \"%s\"",
2189
2190 initStringInfo(&sql);
2191
2192 /* We transmit all columns that are defined in the foreign table. */
2193 for (attnum = 1; attnum <= tupdesc->natts; attnum++)
2194 {
2195 CompactAttribute *attr = TupleDescCompactAttr(tupdesc, attnum - 1);
2196
2197 if (!attr->attisdropped)
2199 }
2200
2201 /* Check if we add the ON CONFLICT clause to the remote query. */
2202 if (plan)
2203 {
2204 OnConflictAction onConflictAction = plan->onConflictAction;
2205
2206 /* We only support DO NOTHING without an inference specification. */
2207 if (onConflictAction == ONCONFLICT_NOTHING)
2208 doNothing = true;
2209 else if (onConflictAction != ONCONFLICT_NONE)
2210 elog(ERROR, "unexpected ON CONFLICT specification: %d",
2211 (int) onConflictAction);
2212 }
2213
2214 /*
2215 * If the foreign table is a partition that doesn't have a corresponding
2216 * RTE entry, we need to create a new RTE describing the foreign table for
2217 * use by deparseInsertSql and create_foreign_modify() below, after first
2218 * copying the parent's RTE and modifying some fields to describe the
2219 * foreign partition to work on. However, if this is invoked by UPDATE,
2220 * the existing RTE may already correspond to this partition if it is one
2221 * of the UPDATE subplan target rels; in that case, we can just use the
2222 * existing RTE as-is.
2223 */
2224 if (resultRelInfo->ri_RangeTableIndex == 0)
2225 {
2226 ResultRelInfo *rootResultRelInfo = resultRelInfo->ri_RootResultRelInfo;
2227
2228 rte = exec_rt_fetch(rootResultRelInfo->ri_RangeTableIndex, estate);
2229 rte = copyObject(rte);
2230 rte->relid = RelationGetRelid(rel);
2231 rte->relkind = RELKIND_FOREIGN_TABLE;
2232
2233 /*
2234 * For UPDATE, we must use the RT index of the first subplan target
2235 * rel's RTE, because the core code would have built expressions for
2236 * the partition, such as RETURNING, using that RT index as varno of
2237 * Vars contained in those expressions.
2238 */
2239 if (plan && plan->operation == CMD_UPDATE &&
2240 rootResultRelInfo->ri_RangeTableIndex == plan->rootRelation)
2241 resultRelation = mtstate->resultRelInfo[0].ri_RangeTableIndex;
2242 else
2243 resultRelation = rootResultRelInfo->ri_RangeTableIndex;
2244 }
2245 else
2246 {
2247 resultRelation = resultRelInfo->ri_RangeTableIndex;
2248 rte = exec_rt_fetch(resultRelation, estate);
2249 }
2250
2251 /* Construct the SQL command string. */
2252 deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing,
2253 resultRelInfo->ri_WithCheckOptions,
2254 resultRelInfo->ri_returningList,
2255 &retrieved_attrs, &values_end_len);
2256
2257 /* Construct an execution state. */
2259 rte,
2260 resultRelInfo,
2261 CMD_INSERT,
2262 NULL,
2263 sql.data,
2266 retrieved_attrs != NIL,
2267 retrieved_attrs);
2268
2269 /*
2270 * If the given resultRelInfo already has PgFdwModifyState set, it means
2271 * the foreign table is an UPDATE subplan result rel; in which case, store
2272 * the resulting state into the aux_fmstate of the PgFdwModifyState.
2273 */
2274 if (resultRelInfo->ri_FdwState)
2275 {
2276 Assert(plan && plan->operation == CMD_UPDATE);
2277 Assert(resultRelInfo->ri_usesFdwDirectModify == false);
2278 ((PgFdwModifyState *) resultRelInfo->ri_FdwState)->aux_fmstate = fmstate;
2279 }
2280 else
2281 resultRelInfo->ri_FdwState = fmstate;
2282}
2283
2284/*
2285 * postgresEndForeignInsert
2286 * Finish an insert operation on a foreign table
2287 */
2288static void
2290 ResultRelInfo *resultRelInfo)
2291{
2293
2294 Assert(fmstate != NULL);
2295
2296 /*
2297 * If the fmstate has aux_fmstate set, get the aux_fmstate (see
2298 * postgresBeginForeignInsert())
2299 */
2300 if (fmstate->aux_fmstate)
2301 fmstate = fmstate->aux_fmstate;
2302
2303 /* Destroy the execution state */
2305}
2306
2307/*
2308 * postgresIsForeignRelUpdatable
2309 * Determine whether a foreign table supports INSERT, UPDATE and/or
2310 * DELETE.
2311 */
2312static int
2314{
2315 bool updatable;
2317 ForeignServer *server;
2318 ListCell *lc;
2319
2320 /*
2321 * By default, all postgres_fdw foreign tables are assumed updatable. This
2322 * can be overridden by a per-server setting, which in turn can be
2323 * overridden by a per-table setting.
2324 */
2325 updatable = true;
2326
2328 server = GetForeignServer(table->serverid);
2329
2330 foreach(lc, server->options)
2331 {
2332 DefElem *def = (DefElem *) lfirst(lc);
2333
2334 if (strcmp(def->defname, "updatable") == 0)
2335 updatable = defGetBoolean(def);
2336 }
2337 foreach(lc, table->options)
2338 {
2339 DefElem *def = (DefElem *) lfirst(lc);
2340
2341 if (strcmp(def->defname, "updatable") == 0)
2342 updatable = defGetBoolean(def);
2343 }
2344
2345 /*
2346 * Currently "updatable" means support for INSERT, UPDATE and DELETE.
2347 */
2348 return updatable ?
2349 (1 << CMD_INSERT) | (1 << CMD_UPDATE) | (1 << CMD_DELETE) : 0;
2350}
2351
2352/*
2353 * postgresRecheckForeignScan
2354 * Execute a local join execution plan for a foreign join
2355 */
2356static bool
2358{
2359 Index scanrelid = ((Scan *) node->ss.ps.plan)->scanrelid;
2361 TupleTableSlot *result;
2362
2363 /* For base foreign relations, it suffices to set fdw_recheck_quals */
2364 if (scanrelid > 0)
2365 return true;
2366
2367 Assert(outerPlan != NULL);
2368
2369 /* Execute a local join execution plan */
2370 result = ExecProcNode(outerPlan);
2371 if (TupIsNull(result))
2372 return false;
2373
2374 /* Store result in the given slot */
2375 ExecCopySlot(slot, result);
2376
2377 return true;
2378}
2379
2380/*
2381 * find_modifytable_subplan
2382 * Helper routine for postgresPlanDirectModify to find the
2383 * ModifyTable subplan node that scans the specified RTI.
2384 *
2385 * Returns NULL if the subplan couldn't be identified. That's not a fatal
2386 * error condition, we just abandon trying to do the update directly.
2387 */
2388static ForeignScan *
2391 Index rtindex,
2392 int subplan_index)
2393{
2394 Plan *subplan = outerPlan(plan);
2395
2396 /*
2397 * The cases we support are (1) the desired ForeignScan is the immediate
2398 * child of ModifyTable, or (2) it is the subplan_index'th child of an
2399 * Append node that is the immediate child of ModifyTable. There is no
2400 * point in looking further down, as that would mean that local joins are
2401 * involved, so we can't do the update directly.
2402 *
2403 * There could be a Result atop the Append too, acting to compute the
2404 * UPDATE targetlist values. We ignore that here; the tlist will be
2405 * checked by our caller.
2406 *
2407 * In principle we could examine all the children of the Append, but it's
2408 * currently unlikely that the core planner would generate such a plan
2409 * with the children out-of-order. Moreover, such a search risks costing
2410 * O(N^2) time when there are a lot of children.
2411 */
2412 if (IsA(subplan, Append))
2413 {
2414 Append *appendplan = (Append *) subplan;
2415
2416 if (subplan_index < list_length(appendplan->appendplans))
2417 subplan = (Plan *) list_nth(appendplan->appendplans, subplan_index);
2418 }
2419 else if (IsA(subplan, Result) &&
2420 outerPlan(subplan) != NULL &&
2421 IsA(outerPlan(subplan), Append))
2422 {
2423 Append *appendplan = (Append *) outerPlan(subplan);
2424
2425 if (subplan_index < list_length(appendplan->appendplans))
2426 subplan = (Plan *) list_nth(appendplan->appendplans, subplan_index);
2427 }
2428
2429 /* Now, have we got a ForeignScan on the desired rel? */
2430 if (IsA(subplan, ForeignScan))
2431 {
2432 ForeignScan *fscan = (ForeignScan *) subplan;
2433
2434 if (bms_is_member(rtindex, fscan->fs_base_relids))
2435 return fscan;
2436 }
2437
2438 return NULL;
2439}
2440
2441/*
2442 * postgresPlanDirectModify
2443 * Consider a direct foreign table modification
2444 *
2445 * Decide whether it is safe to modify a foreign table directly, and if so,
2446 * rewrite subplan accordingly.
2447 */
2448static bool
2451 Index resultRelation,
2452 int subplan_index)
2453{
2454 CmdType operation = plan->operation;
2455 RelOptInfo *foreignrel;
2458 Relation rel;
2459 StringInfoData sql;
2461 List *processed_tlist = NIL;
2462 List *targetAttrs = NIL;
2464 List *params_list = NIL;
2465 List *returningList = NIL;
2466 List *retrieved_attrs = NIL;
2467
2468 /*
2469 * Decide whether it is safe to modify a foreign table directly.
2470 */
2471
2472 /*
2473 * The table modification must be an UPDATE or DELETE.
2474 */
2475 if (operation != CMD_UPDATE && operation != CMD_DELETE)
2476 return false;
2477
2478 /*
2479 * Try to locate the ForeignScan subplan that's scanning resultRelation.
2480 */
2482 if (!fscan)
2483 return false;
2484
2485 /*
2486 * It's unsafe to modify a foreign table directly if there are any quals
2487 * that should be evaluated locally.
2488 */
2489 if (fscan->scan.plan.qual != NIL)
2490 return false;
2491
2492 /* Safe to fetch data about the target foreign rel */
2493 if (fscan->scan.scanrelid == 0)
2494 {
2495 foreignrel = find_join_rel(root, fscan->fs_relids);
2496 /* We should have a rel for this foreign join. */
2497 Assert(foreignrel);
2498 }
2499 else
2500 foreignrel = root->simple_rel_array[resultRelation];
2501 rte = root->simple_rte_array[resultRelation];
2502 fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
2503
2504 /*
2505 * It's unsafe to update a foreign table directly, if any expressions to
2506 * assign to the target columns are unsafe to evaluate remotely.
2507 */
2508 if (operation == CMD_UPDATE)
2509 {
2510 ListCell *lc,
2511 *lc2;
2512
2513 /*
2514 * The expressions of concern are the first N columns of the processed
2515 * targetlist, where N is the length of the rel's update_colnos.
2516 */
2517 get_translated_update_targetlist(root, resultRelation,
2518 &processed_tlist, &targetAttrs);
2519 forboth(lc, processed_tlist, lc2, targetAttrs)
2520 {
2522 AttrNumber attno = lfirst_int(lc2);
2523
2524 /* update's new-value expressions shouldn't be resjunk */
2525 Assert(!tle->resjunk);
2526
2527 if (attno <= InvalidAttrNumber) /* shouldn't happen */
2528 elog(ERROR, "system-column update is not supported");
2529
2530 if (!is_foreign_expr(root, foreignrel, (Expr *) tle->expr))
2531 return false;
2532 }
2533 }
2534
2535 /*
2536 * Ok, rewrite subplan so as to modify the foreign table directly.
2537 */
2538 initStringInfo(&sql);
2539
2540 /*
2541 * Core code already has some lock on each rel being planned, so we can
2542 * use NoLock here.
2543 */
2544 rel = table_open(rte->relid, NoLock);
2545
2546 /*
2547 * Recall the qual clauses that must be evaluated remotely. (These are
2548 * bare clauses not RestrictInfos, but deparse.c's appendConditions()
2549 * doesn't care.)
2550 */
2551 remote_exprs = fpinfo->final_remote_exprs;
2552
2553 /*
2554 * Extract the relevant RETURNING list if any.
2555 */
2556 if (plan->returningLists)
2557 {
2558 returningList = (List *) list_nth(plan->returningLists, subplan_index);
2559
2560 /*
2561 * When performing an UPDATE/DELETE .. RETURNING on a join directly,
2562 * we fetch from the foreign server any Vars specified in RETURNING
2563 * that refer not only to the target relation but to non-target
2564 * relations. So we'll deparse them into the RETURNING clause of the
2565 * remote query; use a targetlist consisting of them instead, which
2566 * will be adjusted to be new fdw_scan_tlist of the foreign-scan plan
2567 * node below.
2568 */
2569 if (fscan->scan.scanrelid == 0)
2570 returningList = build_remote_returning(resultRelation, rel,
2571 returningList);
2572 }
2573
2574 /*
2575 * Construct the SQL command string.
2576 */
2577 switch (operation)
2578 {
2579 case CMD_UPDATE:
2580 deparseDirectUpdateSql(&sql, root, resultRelation, rel,
2581 foreignrel,
2582 processed_tlist,
2584 remote_exprs, &params_list,
2585 returningList, &retrieved_attrs);
2586 break;
2587 case CMD_DELETE:
2588 deparseDirectDeleteSql(&sql, root, resultRelation, rel,
2589 foreignrel,
2590 remote_exprs, &params_list,
2591 returningList, &retrieved_attrs);
2592 break;
2593 default:
2594 elog(ERROR, "unexpected operation: %d", (int) operation);
2595 break;
2596 }
2597
2598 /*
2599 * Update the operation and target relation info.
2600 */
2601 fscan->operation = operation;
2602 fscan->resultRelation = resultRelation;
2603
2604 /*
2605 * Update the fdw_exprs list that will be available to the executor.
2606 */
2607 fscan->fdw_exprs = params_list;
2608
2609 /*
2610 * Update the fdw_private list that will be available to the executor.
2611 * Items in the list must match enum FdwDirectModifyPrivateIndex, above.
2612 */
2613 fscan->fdw_private = list_make4(makeString(sql.data),
2614 makeBoolean((retrieved_attrs != NIL)),
2615 retrieved_attrs,
2616 makeBoolean(plan->canSetTag));
2617
2618 /*
2619 * Update the foreign-join-related fields.
2620 */
2621 if (fscan->scan.scanrelid == 0)
2622 {
2623 /* No need for the outer subplan. */
2624 fscan->scan.plan.lefttree = NULL;
2625
2626 /* Build new fdw_scan_tlist if UPDATE/DELETE .. RETURNING. */
2627 if (returningList)
2628 rebuild_fdw_scan_tlist(fscan, returningList);
2629 }
2630
2631 /*
2632 * Finally, unset the async-capable flag if it is set, as we currently
2633 * don't support asynchronous execution of direct modifications.
2634 */
2635 if (fscan->scan.plan.async_capable)
2636 fscan->scan.plan.async_capable = false;
2637
2638 table_close(rel, NoLock);
2639 return true;
2640}
2641
2642/*
2643 * postgresBeginDirectModify
2644 * Prepare a direct foreign table modification
2645 */
2646static void
2648{
2649 ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
2650 EState *estate = node->ss.ps.state;
2652 Index rtindex;
2653 Oid userid;
2656 int numParams;
2657
2658 /*
2659 * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
2660 */
2661 if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
2662 return;
2663
2664 /*
2665 * We'll save private state in node->fdw_state.
2666 */
2668 node->fdw_state = dmstate;
2669
2670 /*
2671 * Identify which user to do the remote access as. This should match what
2672 * ExecCheckPermissions() does.
2673 */
2674 userid = OidIsValid(fsplan->checkAsUser) ? fsplan->checkAsUser : GetUserId();
2675
2676 /* Get info about foreign table. */
2677 rtindex = node->resultRelInfo->ri_RangeTableIndex;
2678 if (fsplan->scan.scanrelid == 0)
2679 dmstate->rel = ExecOpenScanRelation(estate, rtindex, eflags);
2680 else
2681 dmstate->rel = node->ss.ss_currentRelation;
2683 user = GetUserMapping(userid, table->serverid);
2684
2685 /*
2686 * Get connection to the foreign server. Connection manager will
2687 * establish new connection if necessary.
2688 */
2689 dmstate->conn = GetConnection(user, false, &dmstate->conn_state);
2690
2691 /* Update the foreign-join-related fields. */
2692 if (fsplan->scan.scanrelid == 0)
2693 {
2694 /* Save info about foreign table. */
2695 dmstate->resultRel = dmstate->rel;
2696
2697 /*
2698 * Set dmstate->rel to NULL to teach get_returning_data() and
2699 * make_tuple_from_result_row() that columns fetched from the remote
2700 * server are described by fdw_scan_tlist of the foreign-scan plan
2701 * node, not the tuple descriptor for the target relation.
2702 */
2703 dmstate->rel = NULL;
2704 }
2705
2706 /* Initialize state variable */
2707 dmstate->num_tuples = -1; /* -1 means not set yet */
2708
2709 /* Get private info created by planner functions. */
2710 dmstate->query = strVal(list_nth(fsplan->fdw_private,
2712 dmstate->has_returning = boolVal(list_nth(fsplan->fdw_private,
2714 dmstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
2716 dmstate->set_processed = boolVal(list_nth(fsplan->fdw_private,
2718
2719 /* Create context for per-tuple temp workspace. */
2720 dmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
2721 "postgres_fdw temporary data",
2723
2724 /* Prepare for input conversion of RETURNING results. */
2725 if (dmstate->has_returning)
2726 {
2727 TupleDesc tupdesc;
2728
2729 if (fsplan->scan.scanrelid == 0)
2730 tupdesc = get_tupdesc_for_join_scan_tuples(node);
2731 else
2732 tupdesc = RelationGetDescr(dmstate->rel);
2733
2734 dmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
2735
2736 /*
2737 * When performing an UPDATE/DELETE .. RETURNING on a join directly,
2738 * initialize a filter to extract an updated/deleted tuple from a scan
2739 * tuple.
2740 */
2741 if (fsplan->scan.scanrelid == 0)
2742 init_returning_filter(dmstate, fsplan->fdw_scan_tlist, rtindex);
2743 }
2744
2745 /*
2746 * Prepare for processing of parameters used in remote query, if any.
2747 */
2748 numParams = list_length(fsplan->fdw_exprs);
2749 dmstate->numParams = numParams;
2750 if (numParams > 0)
2752 fsplan->fdw_exprs,
2753 numParams,
2754 &dmstate->param_flinfo,
2755 &dmstate->param_exprs,
2756 &dmstate->param_values);
2757}
2758
2759/*
2760 * postgresIterateDirectModify
2761 * Execute a direct foreign table modification
2762 */
2763static TupleTableSlot *
2765{
2767 EState *estate = node->ss.ps.state;
2768 ResultRelInfo *resultRelInfo = node->resultRelInfo;
2769
2770 /*
2771 * If this is the first call after Begin, execute the statement.
2772 */
2773 if (dmstate->num_tuples == -1)
2774 execute_dml_stmt(node);
2775
2776 /*
2777 * If the local query doesn't specify RETURNING, just clear tuple slot.
2778 */
2779 if (!resultRelInfo->ri_projectReturning)
2780 {
2781 TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
2782 Instrumentation *instr = node->ss.ps.instrument;
2783
2784 Assert(!dmstate->has_returning);
2785
2786 /* Increment the command es_processed count if necessary. */
2787 if (dmstate->set_processed)
2788 estate->es_processed += dmstate->num_tuples;
2789
2790 /* Increment the tuple count for EXPLAIN ANALYZE if necessary. */
2791 if (instr)
2792 instr->tuplecount += dmstate->num_tuples;
2793
2794 return ExecClearTuple(slot);
2795 }
2796
2797 /*
2798 * Get the next RETURNING tuple.
2799 */
2800 return get_returning_data(node);
2801}
2802
2803/*
2804 * postgresEndDirectModify
2805 * Finish a direct foreign table modification
2806 */
2807static void
2809{
2811
2812 /* if dmstate is NULL, we are in EXPLAIN; nothing to do */
2813 if (dmstate == NULL)
2814 return;
2815
2816 /* Release PGresult */
2817 PQclear(dmstate->result);
2818
2819 /* Release remote connection */
2821 dmstate->conn = NULL;
2822
2823 /* MemoryContext will be deleted automatically. */
2824}
2825
2826/*
2827 * postgresExplainForeignScan
2828 * Produce extra output for EXPLAIN of a ForeignScan on a foreign table
2829 */
2830static void
2832{
2834 List *fdw_private = plan->fdw_private;
2835
2836 /*
2837 * Identify foreign scans that are really joins or upper relations. The
2838 * input looks something like "(1) LEFT JOIN (2)", and we must replace the
2839 * digit string(s), which are RT indexes, with the correct relation names.
2840 * We do that here, not when the plan is created, because we can't know
2841 * what aliases ruleutils.c will assign at plan creation time.
2842 */
2843 if (list_length(fdw_private) > FdwScanPrivateRelations)
2844 {
2845 StringInfoData relations;
2846 char *rawrelations;
2847 char *ptr;
2848 int minrti,
2849 rtoffset;
2850
2852
2853 /*
2854 * A difficulty with using a string representation of RT indexes is
2855 * that setrefs.c won't update the string when flattening the
2856 * rangetable. To find out what rtoffset was applied, identify the
2857 * minimum RT index appearing in the string and compare it to the
2858 * minimum member of plan->fs_base_relids. (We expect all the relids
2859 * in the join will have been offset by the same amount; the Asserts
2860 * below should catch it if that ever changes.)
2861 */
2862 minrti = INT_MAX;
2863 ptr = rawrelations;
2864 while (*ptr)
2865 {
2866 if (isdigit((unsigned char) *ptr))
2867 {
2868 int rti = strtol(ptr, &ptr, 10);
2869
2870 if (rti < minrti)
2871 minrti = rti;
2872 }
2873 else
2874 ptr++;
2875 }
2876 rtoffset = bms_next_member(plan->fs_base_relids, -1) - minrti;
2877
2878 /* Now we can translate the string */
2879 initStringInfo(&relations);
2880 ptr = rawrelations;
2881 while (*ptr)
2882 {
2883 if (isdigit((unsigned char) *ptr))
2884 {
2885 int rti = strtol(ptr, &ptr, 10);
2887 char *relname;
2888 char *refname;
2889
2890 rti += rtoffset;
2891 Assert(bms_is_member(rti, plan->fs_base_relids));
2892 rte = rt_fetch(rti, es->rtable);
2893 Assert(rte->rtekind == RTE_RELATION);
2894 /* This logic should agree with explain.c's ExplainTargetRel */
2895 relname = get_rel_name(rte->relid);
2896 if (es->verbose)
2897 {
2898 char *namespace;
2899
2901 appendStringInfo(&relations, "%s.%s",
2902 quote_identifier(namespace),
2904 }
2905 else
2906 appendStringInfoString(&relations,
2908 refname = (char *) list_nth(es->rtable_names, rti - 1);
2909 if (refname == NULL)
2910 refname = rte->eref->aliasname;
2911 if (strcmp(refname, relname) != 0)
2912 appendStringInfo(&relations, " %s",
2913 quote_identifier(refname));
2914 }
2915 else
2916 appendStringInfoChar(&relations, *ptr++);
2917 }
2918 ExplainPropertyText("Relations", relations.data, es);
2919 }
2920
2921 /*
2922 * Add remote query, when VERBOSE option is specified.
2923 */
2924 if (es->verbose)
2925 {
2926 char *sql;
2927
2928 sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
2929 ExplainPropertyText("Remote SQL", sql, es);
2930 }
2931}
2932
2933/*
2934 * postgresExplainForeignModify
2935 * Produce extra output for EXPLAIN of a ModifyTable on a foreign table
2936 */
2937static void
2939 ResultRelInfo *rinfo,
2940 List *fdw_private,
2941 int subplan_index,
2942 ExplainState *es)
2943{
2944 if (es->verbose)
2945 {
2946 char *sql = strVal(list_nth(fdw_private,
2948
2949 ExplainPropertyText("Remote SQL", sql, es);
2950
2951 /*
2952 * For INSERT we should always have batch size >= 1, but UPDATE and
2953 * DELETE don't support batching so don't show the property.
2954 */
2955 if (rinfo->ri_BatchSize > 0)
2956 ExplainPropertyInteger("Batch Size", NULL, rinfo->ri_BatchSize, es);
2957 }
2958}
2959
2960/*
2961 * postgresExplainDirectModify
2962 * Produce extra output for EXPLAIN of a ForeignScan that modifies a
2963 * foreign table directly
2964 */
2965static void
2967{
2968 List *fdw_private;
2969 char *sql;
2970
2971 if (es->verbose)
2972 {
2973 fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
2974 sql = strVal(list_nth(fdw_private, FdwDirectModifyPrivateUpdateSql));
2975 ExplainPropertyText("Remote SQL", sql, es);
2976 }
2977}
2978
2979/*
2980 * postgresExecForeignTruncate
2981 * Truncate one or more foreign tables
2982 */
2983static void
2985 DropBehavior behavior,
2986 bool restart_seqs)
2987{
2988 Oid serverid = InvalidOid;
2990 PGconn *conn = NULL;
2991 StringInfoData sql;
2992 ListCell *lc;
2993 bool server_truncatable = true;
2994
2995 /*
2996 * By default, all postgres_fdw foreign tables are assumed truncatable.
2997 * This can be overridden by a per-server setting, which in turn can be
2998 * overridden by a per-table setting.
2999 */
3000 foreach(lc, rels)
3001 {
3002 ForeignServer *server = NULL;
3003 Relation rel = lfirst(lc);
3005 ListCell *cell;
3006 bool truncatable;
3007
3008 /*
3009 * First time through, determine whether the foreign server allows
3010 * truncates. Since all specified foreign tables are assumed to belong
3011 * to the same foreign server, this result can be used for other
3012 * foreign tables.
3013 */
3014 if (!OidIsValid(serverid))
3015 {
3016 serverid = table->serverid;
3017 server = GetForeignServer(serverid);
3018
3019 foreach(cell, server->options)
3020 {
3021 DefElem *defel = (DefElem *) lfirst(cell);
3022
3023 if (strcmp(defel->defname, "truncatable") == 0)
3024 {
3026 break;
3027 }
3028 }
3029 }
3030
3031 /*
3032 * Confirm that all specified foreign tables belong to the same
3033 * foreign server.
3034 */
3035 Assert(table->serverid == serverid);
3036
3037 /* Determine whether this foreign table allows truncations */
3039 foreach(cell, table->options)
3040 {
3041 DefElem *defel = (DefElem *) lfirst(cell);
3042
3043 if (strcmp(defel->defname, "truncatable") == 0)
3044 {
3046 break;
3047 }
3048 }
3049
3050 if (!truncatable)
3051 ereport(ERROR,
3053 errmsg("foreign table \"%s\" does not allow truncates",
3055 }
3056 Assert(OidIsValid(serverid));
3057
3058 /*
3059 * Get connection to the foreign server. Connection manager will
3060 * establish new connection if necessary.
3061 */
3062 user = GetUserMapping(GetUserId(), serverid);
3063 conn = GetConnection(user, false, NULL);
3064
3065 /* Construct the TRUNCATE command string */
3066 initStringInfo(&sql);
3067 deparseTruncateSql(&sql, rels, behavior, restart_seqs);
3068
3069 /* Issue the TRUNCATE command to remote server */
3070 do_sql_command(conn, sql.data);
3071
3072 pfree(sql.data);
3073}
3074
3075/*
3076 * estimate_path_cost_size
3077 * Get cost and size estimates for a foreign scan on given foreign relation
3078 * either a base relation or a join between foreign relations or an upper
3079 * relation containing foreign relations.
3080 *
3081 * param_join_conds are the parameterization clauses with outer relations.
3082 * pathkeys specify the expected sort order if any for given path being costed.
3083 * fpextra specifies additional post-scan/join-processing steps such as the
3084 * final sort and the LIMIT restriction.
3085 *
3086 * The function returns the cost and size estimates in p_rows, p_width,
3087 * p_disabled_nodes, p_startup_cost and p_total_cost variables.
3088 */
3089static void
3091 RelOptInfo *foreignrel,
3093 List *pathkeys,
3095 double *p_rows, int *p_width,
3096 int *p_disabled_nodes,
3098{
3099 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
3100 double rows;
3101 double retrieved_rows;
3102 int width;
3103 int disabled_nodes = 0;
3104 Cost startup_cost;
3105 Cost total_cost;
3106
3107 /* Make sure the core code has set up the relation's reltarget */
3108 Assert(foreignrel->reltarget);
3109
3110 /*
3111 * If the table or the server is configured to use remote estimates,
3112 * connect to the foreign server and execute EXPLAIN to estimate the
3113 * number of rows selected by the restriction+join clauses. Otherwise,
3114 * estimate rows using whatever statistics we have locally, in a way
3115 * similar to ordinary tables.
3116 */
3117 if (fpinfo->use_remote_estimate)
3118 {
3121 StringInfoData sql;
3122 PGconn *conn;
3125 List *fdw_scan_tlist = NIL;
3126 List *remote_conds;
3127
3128 /* Required only to be passed to deparseSelectStmtForRel */
3129 List *retrieved_attrs;
3130
3131 /*
3132 * param_join_conds might contain both clauses that are safe to send
3133 * across, and clauses that aren't.
3134 */
3137
3138 /* Build the list of columns to be fetched from the foreign server. */
3139 if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
3140 fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
3141 else
3142 fdw_scan_tlist = NIL;
3143
3144 /*
3145 * The complete list of remote conditions includes everything from
3146 * baserestrictinfo plus any extra join_conds relevant to this
3147 * particular path.
3148 */
3149 remote_conds = list_concat(remote_param_join_conds,
3150 fpinfo->remote_conds);
3151
3152 /*
3153 * Construct EXPLAIN query including the desired SELECT, FROM, and
3154 * WHERE clauses. Params and other-relation Vars are replaced by dummy
3155 * values, so don't request params_list.
3156 */
3157 initStringInfo(&sql);
3158 appendStringInfoString(&sql, "EXPLAIN ");
3159 deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
3160 remote_conds, pathkeys,
3161 fpextra ? fpextra->has_final_sort : false,
3162 fpextra ? fpextra->has_limit : false,
3163 false, &retrieved_attrs, NULL);
3164
3165 /* Get the remote estimate */
3166 conn = GetConnection(fpinfo->user, false, NULL);
3167 get_remote_estimate(sql.data, conn, &rows, &width,
3168 &startup_cost, &total_cost);
3170
3171 retrieved_rows = rows;
3172
3173 /* Factor in the selectivity of the locally-checked quals */
3176 foreignrel->relid,
3177 JOIN_INNER,
3178 NULL);
3179 local_sel *= fpinfo->local_conds_sel;
3180
3181 rows = clamp_row_est(rows * local_sel);
3182
3183 /* Add in the eval cost of the locally-checked quals */
3184 startup_cost += fpinfo->local_conds_cost.startup;
3185 total_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
3187 startup_cost += local_cost.startup;
3188 total_cost += local_cost.per_tuple * retrieved_rows;
3189
3190 /*
3191 * Add in tlist eval cost for each output row. In case of an
3192 * aggregate, some of the tlist expressions such as grouping
3193 * expressions will be evaluated remotely, so adjust the costs.
3194 */
3195 startup_cost += foreignrel->reltarget->cost.startup;
3196 total_cost += foreignrel->reltarget->cost.startup;
3197 total_cost += foreignrel->reltarget->cost.per_tuple * rows;
3198 if (IS_UPPER_REL(foreignrel))
3199 {
3201
3202 cost_qual_eval(&tlist_cost, fdw_scan_tlist, root);
3203 startup_cost -= tlist_cost.startup;
3204 total_cost -= tlist_cost.startup;
3205 total_cost -= tlist_cost.per_tuple * rows;
3206 }
3207 }
3208 else
3209 {
3210 Cost run_cost = 0;
3211
3212 /*
3213 * We don't support join conditions in this mode (hence, no
3214 * parameterized paths can be made).
3215 */
3217
3218 /*
3219 * We will come here again and again with different set of pathkeys or
3220 * additional post-scan/join-processing steps that caller wants to
3221 * cost. We don't need to calculate the cost/size estimates for the
3222 * underlying scan, join, or grouping each time. Instead, use those
3223 * estimates if we have cached them already.
3224 */
3225 if (fpinfo->rel_startup_cost >= 0 && fpinfo->rel_total_cost >= 0)
3226 {
3227 Assert(fpinfo->retrieved_rows >= 0);
3228
3229 rows = fpinfo->rows;
3230 retrieved_rows = fpinfo->retrieved_rows;
3231 width = fpinfo->width;
3232 startup_cost = fpinfo->rel_startup_cost;
3233 run_cost = fpinfo->rel_total_cost - fpinfo->rel_startup_cost;
3234
3235 /*
3236 * If we estimate the costs of a foreign scan or a foreign join
3237 * with additional post-scan/join-processing steps, the scan or
3238 * join costs obtained from the cache wouldn't yet contain the
3239 * eval costs for the final scan/join target, which would've been
3240 * updated by apply_scanjoin_target_to_paths(); add the eval costs
3241 * now.
3242 */
3243 if (fpextra && !IS_UPPER_REL(foreignrel))
3244 {
3245 /* Shouldn't get here unless we have LIMIT */
3246 Assert(fpextra->has_limit);
3247 Assert(foreignrel->reloptkind == RELOPT_BASEREL ||
3248 foreignrel->reloptkind == RELOPT_JOINREL);
3249 startup_cost += foreignrel->reltarget->cost.startup;
3250 run_cost += foreignrel->reltarget->cost.per_tuple * rows;
3251 }
3252 }
3253 else if (IS_JOIN_REL(foreignrel))
3254 {
3259 double nrows;
3260
3261 /* Use rows/width estimates made by the core code. */
3262 rows = foreignrel->rows;
3263 width = foreignrel->reltarget->width;
3264
3265 /* For join we expect inner and outer relations set */
3266 Assert(fpinfo->innerrel && fpinfo->outerrel);
3267
3268 fpinfo_i = (PgFdwRelationInfo *) fpinfo->innerrel->fdw_private;
3269 fpinfo_o = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
3270
3271 /* Estimate of number of rows in cross product */
3272 nrows = fpinfo_i->rows * fpinfo_o->rows;
3273
3274 /*
3275 * Back into an estimate of the number of retrieved rows. Just in
3276 * case this is nuts, clamp to at most nrows.
3277 */
3278 retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
3279 retrieved_rows = Min(retrieved_rows, nrows);
3280
3281 /*
3282 * The cost of foreign join is estimated as cost of generating
3283 * rows for the joining relations + cost for applying quals on the
3284 * rows.
3285 */
3286
3287 /*
3288 * Calculate the cost of clauses pushed down to the foreign server
3289 */
3290 cost_qual_eval(&remote_conds_cost, fpinfo->remote_conds, root);
3291 /* Calculate the cost of applying join clauses */
3292 cost_qual_eval(&join_cost, fpinfo->joinclauses, root);
3293
3294 /*
3295 * Startup cost includes startup cost of joining relations and the
3296 * startup cost for join and other clauses. We do not include the
3297 * startup cost specific to join strategy (e.g. setting up hash
3298 * tables) since we do not know what strategy the foreign server
3299 * is going to use.
3300 */
3301 startup_cost = fpinfo_i->rel_startup_cost + fpinfo_o->rel_startup_cost;
3302 startup_cost += join_cost.startup;
3303 startup_cost += remote_conds_cost.startup;
3304 startup_cost += fpinfo->local_conds_cost.startup;
3305
3306 /*
3307 * Run time cost includes:
3308 *
3309 * 1. Run time cost (total_cost - startup_cost) of relations being
3310 * joined
3311 *
3312 * 2. Run time cost of applying join clauses on the cross product
3313 * of the joining relations.
3314 *
3315 * 3. Run time cost of applying pushed down other clauses on the
3316 * result of join
3317 *
3318 * 4. Run time cost of applying nonpushable other clauses locally
3319 * on the result fetched from the foreign server.
3320 */
3321 run_cost = fpinfo_i->rel_total_cost - fpinfo_i->rel_startup_cost;
3322 run_cost += fpinfo_o->rel_total_cost - fpinfo_o->rel_startup_cost;
3323 run_cost += nrows * join_cost.per_tuple;
3324 nrows = clamp_row_est(nrows * fpinfo->joinclause_sel);
3325 run_cost += nrows * remote_conds_cost.per_tuple;
3326 run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
3327
3328 /* Add in tlist eval cost for each output row */
3329 startup_cost += foreignrel->reltarget->cost.startup;
3330 run_cost += foreignrel->reltarget->cost.per_tuple * rows;
3331 }
3332 else if (IS_UPPER_REL(foreignrel))
3333 {
3334 RelOptInfo *outerrel = fpinfo->outerrel;
3337 double input_rows;
3338 int numGroupCols;
3339 double numGroups = 1;
3340
3341 /* The upper relation should have its outer relation set */
3342 Assert(outerrel);
3343 /* and that outer relation should have its reltarget set */
3344 Assert(outerrel->reltarget);
3345
3346 /*
3347 * This cost model is mixture of costing done for sorted and
3348 * hashed aggregates in cost_agg(). We are not sure which
3349 * strategy will be considered at remote side, thus for
3350 * simplicity, we put all startup related costs in startup_cost
3351 * and all finalization and run cost are added in total_cost.
3352 */
3353
3354 ofpinfo = (PgFdwRelationInfo *) outerrel->fdw_private;
3355
3356 /* Get rows from input rel */
3357 input_rows = ofpinfo->rows;
3358
3359 /* Collect statistics about aggregates for estimating costs. */
3360 if (root->parse->hasAggs)
3361 {
3363 }
3364
3365 /* Get number of grouping columns and possible number of groups */
3366 numGroupCols = list_length(root->processed_groupClause);
3367 numGroups = estimate_num_groups(root,
3368 get_sortgrouplist_exprs(root->processed_groupClause,
3369 fpinfo->grouped_tlist),
3370 input_rows, NULL, NULL);
3371
3372 /*
3373 * Get the retrieved_rows and rows estimates. If there are HAVING
3374 * quals, account for their selectivity.
3375 */
3376 if (root->hasHavingQual)
3377 {
3378 /* Factor in the selectivity of the remotely-checked quals */
3379 retrieved_rows =
3380 clamp_row_est(numGroups *
3382 fpinfo->remote_conds,
3383 0,
3384 JOIN_INNER,
3385 NULL));
3386 /* Factor in the selectivity of the locally-checked quals */
3387 rows = clamp_row_est(retrieved_rows * fpinfo->local_conds_sel);
3388 }
3389 else
3390 {
3391 rows = retrieved_rows = numGroups;
3392 }
3393
3394 /* Use width estimate made by the core code. */
3395 width = foreignrel->reltarget->width;
3396
3397 /*-----
3398 * Startup cost includes:
3399 * 1. Startup cost for underneath input relation, adjusted for
3400 * tlist replacement by apply_scanjoin_target_to_paths()
3401 * 2. Cost of performing aggregation, per cost_agg()
3402 *-----
3403 */
3404 startup_cost = ofpinfo->rel_startup_cost;
3405 startup_cost += outerrel->reltarget->cost.startup;
3406 startup_cost += aggcosts.transCost.startup;
3407 startup_cost += aggcosts.transCost.per_tuple * input_rows;
3408 startup_cost += aggcosts.finalCost.startup;
3409 startup_cost += (cpu_operator_cost * numGroupCols) * input_rows;
3410
3411 /*-----
3412 * Run time cost includes:
3413 * 1. Run time cost of underneath input relation, adjusted for
3414 * tlist replacement by apply_scanjoin_target_to_paths()
3415 * 2. Run time cost of performing aggregation, per cost_agg()
3416 *-----
3417 */
3418 run_cost = ofpinfo->rel_total_cost - ofpinfo->rel_startup_cost;
3419 run_cost += outerrel->reltarget->cost.per_tuple * input_rows;
3420 run_cost += aggcosts.finalCost.per_tuple * numGroups;
3421 run_cost += cpu_tuple_cost * numGroups;
3422
3423 /* Account for the eval cost of HAVING quals, if any */
3424 if (root->hasHavingQual)
3425 {
3427
3428 /* Add in the eval cost of the remotely-checked quals */
3429 cost_qual_eval(&remote_cost, fpinfo->remote_conds, root);
3430 startup_cost += remote_cost.startup;
3431 run_cost += remote_cost.per_tuple * numGroups;
3432 /* Add in the eval cost of the locally-checked quals */
3433 startup_cost += fpinfo->local_conds_cost.startup;
3434 run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
3435 }
3436
3437 /* Add in tlist eval cost for each output row */
3438 startup_cost += foreignrel->reltarget->cost.startup;
3439 run_cost += foreignrel->reltarget->cost.per_tuple * rows;
3440 }
3441 else
3442 {
3444
3445 /* Use rows/width estimates made by set_baserel_size_estimates. */
3446 rows = foreignrel->rows;
3447 width = foreignrel->reltarget->width;
3448
3449 /*
3450 * Back into an estimate of the number of retrieved rows. Just in
3451 * case this is nuts, clamp to at most foreignrel->tuples.
3452 */
3453 retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
3454 retrieved_rows = Min(retrieved_rows, foreignrel->tuples);
3455
3456 /*
3457 * Cost as though this were a seqscan, which is pessimistic. We
3458 * effectively imagine the local_conds are being evaluated
3459 * remotely, too.
3460 */
3461 startup_cost = 0;
3462 run_cost = 0;
3463 run_cost += seq_page_cost * foreignrel->pages;
3464
3465 startup_cost += foreignrel->baserestrictcost.startup;
3467 run_cost += cpu_per_tuple * foreignrel->tuples;
3468
3469 /* Add in tlist eval cost for each output row */
3470 startup_cost += foreignrel->reltarget->cost.startup;
3471 run_cost += foreignrel->reltarget->cost.per_tuple * rows;
3472 }
3473
3474 /*
3475 * Without remote estimates, we have no real way to estimate the cost
3476 * of generating sorted output. It could be free if the query plan
3477 * the remote side would have chosen generates properly-sorted output
3478 * anyway, but in most cases it will cost something. Estimate a value
3479 * high enough that we won't pick the sorted path when the ordering
3480 * isn't locally useful, but low enough that we'll err on the side of
3481 * pushing down the ORDER BY clause when it's useful to do so.
3482 */
3483 if (pathkeys != NIL)
3484 {
3485 if (IS_UPPER_REL(foreignrel))
3486 {
3487 Assert(foreignrel->reloptkind == RELOPT_UPPER_REL &&
3488 fpinfo->stage == UPPERREL_GROUP_AGG);
3489
3490 /*
3491 * We can only get here when this function is called from
3492 * add_foreign_ordered_paths() or add_foreign_final_paths();
3493 * in which cases, the passed-in fpextra should not be NULL.
3494 */
3495 Assert(fpextra);
3497 retrieved_rows, width,
3498 fpextra->limit_tuples,
3499 &disabled_nodes,
3500 &startup_cost, &run_cost);
3501 }
3502 else
3503 {
3504 startup_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
3505 run_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
3506 }
3507 }
3508
3509 total_cost = startup_cost + run_cost;
3510
3511 /* Adjust the cost estimates if we have LIMIT */
3512 if (fpextra && fpextra->has_limit)
3513 {
3514 adjust_limit_rows_costs(&rows, &startup_cost, &total_cost,
3515 fpextra->offset_est, fpextra->count_est);
3516 retrieved_rows = rows;
3517 }
3518 }
3519
3520 /*
3521 * If this includes the final sort step, the given target, which will be
3522 * applied to the resulting path, might have different expressions from
3523 * the foreignrel's reltarget (see make_sort_input_target()); adjust tlist
3524 * eval costs.
3525 */
3526 if (fpextra && fpextra->has_final_sort &&
3527 fpextra->target != foreignrel->reltarget)
3528 {
3529 QualCost oldcost = foreignrel->reltarget->cost;
3530 QualCost newcost = fpextra->target->cost;
3531
3532 startup_cost += newcost.startup - oldcost.startup;
3533 total_cost += newcost.startup - oldcost.startup;
3534 total_cost += (newcost.per_tuple - oldcost.per_tuple) * rows;
3535 }
3536
3537 /*
3538 * Cache the retrieved rows and cost estimates for scans, joins, or
3539 * groupings without any parameterization, pathkeys, or additional
3540 * post-scan/join-processing steps, before adding the costs for
3541 * transferring data from the foreign server. These estimates are useful
3542 * for costing remote joins involving this relation or costing other
3543 * remote operations on this relation such as remote sorts and remote
3544 * LIMIT restrictions, when the costs can not be obtained from the foreign
3545 * server. This function will be called at least once for every foreign
3546 * relation without any parameterization, pathkeys, or additional
3547 * post-scan/join-processing steps.
3548 */
3549 if (pathkeys == NIL && param_join_conds == NIL && fpextra == NULL)
3550 {
3551 fpinfo->retrieved_rows = retrieved_rows;
3552 fpinfo->rel_startup_cost = startup_cost;
3553 fpinfo->rel_total_cost = total_cost;
3554 }
3555
3556 /*
3557 * Add some additional cost factors to account for connection overhead
3558 * (fdw_startup_cost), transferring data across the network
3559 * (fdw_tuple_cost per retrieved row), and local manipulation of the data
3560 * (cpu_tuple_cost per retrieved row).
3561 */
3562 startup_cost += fpinfo->fdw_startup_cost;
3563 total_cost += fpinfo->fdw_startup_cost;
3564 total_cost += fpinfo->fdw_tuple_cost * retrieved_rows;
3565 total_cost += cpu_tuple_cost * retrieved_rows;
3566
3567 /*
3568 * If we have LIMIT, we should prefer performing the restriction remotely
3569 * rather than locally, as the former avoids extra row fetches from the
3570 * remote that the latter might cause. But since the core code doesn't
3571 * account for such fetches when estimating the costs of the local
3572 * restriction (see create_limit_path()), there would be no difference
3573 * between the costs of the local restriction and the costs of the remote
3574 * restriction estimated above if we don't use remote estimates (except
3575 * for the case where the foreignrel is a grouping relation, the given
3576 * pathkeys is not NIL, and the effects of a bounded sort for that rel is
3577 * accounted for in costing the remote restriction). Tweak the costs of
3578 * the remote restriction to ensure we'll prefer it if LIMIT is a useful
3579 * one.
3580 */
3581 if (!fpinfo->use_remote_estimate &&
3582 fpextra && fpextra->has_limit &&
3583 fpextra->limit_tuples > 0 &&
3584 fpextra->limit_tuples < fpinfo->rows)
3585 {
3586 Assert(fpinfo->rows > 0);
3587 total_cost -= (total_cost - startup_cost) * 0.05 *
3588 (fpinfo->rows - fpextra->limit_tuples) / fpinfo->rows;
3589 }
3590
3591 /* Return results. */
3592 *p_rows = rows;
3593 *p_width = width;
3594 *p_disabled_nodes = disabled_nodes;
3595 *p_startup_cost = startup_cost;
3596 *p_total_cost = total_cost;
3597}
3598
3599/*
3600 * Estimate costs of executing a SQL statement remotely.
3601 * The given "sql" must be an EXPLAIN command.
3602 */
3603static void
3605 double *rows, int *width,
3606 Cost *startup_cost, Cost *total_cost)
3607{
3608 PGresult *res;
3609 char *line;
3610 char *p;
3611 int n;
3612
3613 /*
3614 * Execute EXPLAIN remotely.
3615 */
3616 res = pgfdw_exec_query(conn, sql, NULL);
3617 if (PQresultStatus(res) != PGRES_TUPLES_OK)
3618 pgfdw_report_error(res, conn, sql);
3619
3620 /*
3621 * Extract cost numbers for topmost plan node. Note we search for a left
3622 * paren from the end of the line to avoid being confused by other uses of
3623 * parentheses.
3624 */
3625 line = PQgetvalue(res, 0, 0);
3626 p = strrchr(line, '(');
3627 if (p == NULL)
3628 elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
3629 n = sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)",
3630 startup_cost, total_cost, rows, width);
3631 if (n != 4)
3632 elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
3633 PQclear(res);
3634}
3635
3636/*
3637 * Adjust the cost estimates of a foreign grouping path to include the cost of
3638 * generating properly-sorted output.
3639 */
3640static void
3642 List *pathkeys,
3643 double retrieved_rows,
3644 double width,
3645 double limit_tuples,
3646 int *p_disabled_nodes,
3649{
3650 /*
3651 * If the GROUP BY clause isn't sort-able, the plan chosen by the remote
3652 * side is unlikely to generate properly-sorted output, so it would need
3653 * an explicit sort; adjust the given costs with cost_sort(). Likewise,
3654 * if the GROUP BY clause is sort-able but isn't a superset of the given
3655 * pathkeys, adjust the costs with that function. Otherwise, adjust the
3656 * costs by applying the same heuristic as for the scan or join case.
3657 */
3658 if (!grouping_is_sortable(root->processed_groupClause) ||
3659 !pathkeys_contained_in(pathkeys, root->group_pathkeys))
3660 {
3661 Path sort_path; /* dummy for result of cost_sort */
3662
3664 root,
3665 pathkeys,
3666 0,
3668 retrieved_rows,
3669 width,
3670 0.0,
3671 work_mem,
3672 limit_tuples);
3673
3674 *p_startup_cost = sort_path.startup_cost;
3675 *p_run_cost = sort_path.total_cost - sort_path.startup_cost;
3676 }
3677 else
3678 {
3679 /*
3680 * The default extra cost seems too large for foreign-grouping cases;
3681 * add 1/4th of that default.
3682 */
3684 - 1.0) * 0.25;
3685
3688 }
3689}
3690
3691/*
3692 * Detect whether we want to process an EquivalenceClass member.
3693 *
3694 * This is a callback for use by generate_implied_equalities_for_column.
3695 */
3696static bool
3699 void *arg)
3700{
3702 Expr *expr = em->em_expr;
3703
3704 /*
3705 * If we've identified what we're processing in the current scan, we only
3706 * want to match that expression.
3707 */
3708 if (state->current != NULL)
3709 return equal(expr, state->current);
3710
3711 /*
3712 * Otherwise, ignore anything we've already processed.
3713 */
3714 if (list_member(state->already_used, expr))
3715 return false;
3716
3717 /* This is the new target to process. */
3718 state->current = expr;
3719 return true;
3720}
3721
3722/*
3723 * Create cursor for node's query with current parameter values.
3724 */
3725static void
3727{
3728 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
3729 ExprContext *econtext = node->ss.ps.ps_ExprContext;
3730 int numParams = fsstate->numParams;
3731 const char **values = fsstate->param_values;
3732 PGconn *conn = fsstate->conn;
3734 PGresult *res;
3735
3736 /* First, process a pending asynchronous request, if any. */
3737 if (fsstate->conn_state->pendingAreq)
3739
3740 /*
3741 * Construct array of query parameter values in text format. We do the
3742 * conversions in the short-lived per-tuple context, so as not to cause a
3743 * memory leak over repeated scans.
3744 */
3745 if (numParams > 0)
3746 {
3747 MemoryContext oldcontext;
3748
3749 oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
3750
3751 process_query_params(econtext,
3752 fsstate->param_flinfo,
3753 fsstate->param_exprs,
3754 values);
3755
3756 MemoryContextSwitchTo(oldcontext);
3757 }
3758
3759 /* Construct the DECLARE CURSOR command */
3761 appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
3762 fsstate->cursor_number, fsstate->query);
3763
3764 /*
3765 * Notice that we pass NULL for paramTypes, thus forcing the remote server
3766 * to infer types for all parameters. Since we explicitly cast every
3767 * parameter (see deparse.c), the "inference" is trivial and will produce
3768 * the desired result. This allows us to avoid assuming that the remote
3769 * server has the same OIDs we do for the parameters' types.
3770 */
3771 if (!PQsendQueryParams(conn, buf.data, numParams,
3772 NULL, values, NULL, NULL, 0))
3774
3775 /*
3776 * Get the result, and check for success.
3777 */
3778 res = pgfdw_get_result(conn);
3779 if (PQresultStatus(res) != PGRES_COMMAND_OK)
3780 pgfdw_report_error(res, conn, fsstate->query);
3781 PQclear(res);
3782
3783 /* Mark the cursor as created, and show no tuples have been retrieved */
3784 fsstate->cursor_exists = true;
3785 fsstate->tuples = NULL;
3786 fsstate->num_tuples = 0;
3787 fsstate->next_tuple = 0;
3788 fsstate->fetch_ct_2 = 0;
3789 fsstate->eof_reached = false;
3790
3791 /* Clean up */
3792 pfree(buf.data);
3793}
3794
3795/*
3796 * Fetch some more rows from the node's cursor.
3797 */
3798static void
3800{
3801 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
3802 PGconn *conn = fsstate->conn;
3803 PGresult *res;
3804 int numrows;
3805 int i;
3806 MemoryContext oldcontext;
3807
3808 /*
3809 * We'll store the tuples in the batch_cxt. First, flush the previous
3810 * batch.
3811 */
3812 fsstate->tuples = NULL;
3813 MemoryContextReset(fsstate->batch_cxt);
3814 oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
3815
3816 if (fsstate->async_capable)
3817 {
3818 Assert(fsstate->conn_state->pendingAreq);
3819
3820 /*
3821 * The query was already sent by an earlier call to
3822 * fetch_more_data_begin. So now we just fetch the result.
3823 */
3824 res = pgfdw_get_result(conn);
3825 /* On error, report the original query, not the FETCH. */
3826 if (PQresultStatus(res) != PGRES_TUPLES_OK)
3827 pgfdw_report_error(res, conn, fsstate->query);
3828
3829 /* Reset per-connection state */
3830 fsstate->conn_state->pendingAreq = NULL;
3831 }
3832 else
3833 {
3834 char sql[64];
3835
3836 /* This is a regular synchronous fetch. */
3837 snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
3838 fsstate->fetch_size, fsstate->cursor_number);
3839
3840 res = pgfdw_exec_query(conn, sql, fsstate->conn_state);
3841 /* On error, report the original query, not the FETCH. */
3842 if (PQresultStatus(res) != PGRES_TUPLES_OK)
3843 pgfdw_report_error(res, conn, fsstate->query);
3844 }
3845
3846 /* Convert the data into HeapTuples */
3847 numrows = PQntuples(res);
3848 fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
3849 fsstate->num_tuples = numrows;
3850 fsstate->next_tuple = 0;
3851
3852 for (i = 0; i < numrows; i++)
3853 {
3854 Assert(IsA(node->ss.ps.plan, ForeignScan));
3855
3856 fsstate->tuples[i] =
3858 fsstate->rel,
3859 fsstate->attinmeta,
3860 fsstate->retrieved_attrs,
3861 node,
3862 fsstate->temp_cxt);
3863 }
3864
3865 /* Update fetch_ct_2 */
3866 if (fsstate->fetch_ct_2 < 2)
3867 fsstate->fetch_ct_2++;
3868
3869 /* Must be EOF if we didn't get as many tuples as we asked for. */
3870 fsstate->eof_reached = (numrows < fsstate->fetch_size);
3871
3872 PQclear(res);
3873
3874 MemoryContextSwitchTo(oldcontext);
3875}
3876
3877/*
3878 * Force assorted GUC parameters to settings that ensure that we'll output
3879 * data values in a form that is unambiguous to the remote server.
3880 *
3881 * This is rather expensive and annoying to do once per row, but there's
3882 * little choice if we want to be sure values are transmitted accurately;
3883 * we can't leave the settings in place between rows for fear of affecting
3884 * user-visible computations.
3885 *
3886 * We use the equivalent of a function SET option to allow the settings to
3887 * persist only until the caller calls reset_transmission_modes(). If an
3888 * error is thrown in between, guc.c will take care of undoing the settings.
3889 *
3890 * The return value is the nestlevel that must be passed to
3891 * reset_transmission_modes() to undo things.
3892 */
3893int
3895{
3896 int nestlevel = NewGUCNestLevel();
3897
3898 /*
3899 * The values set here should match what pg_dump does. See also
3900 * configure_remote_session in connection.c.
3901 */
3902 if (DateStyle != USE_ISO_DATES)
3903 (void) set_config_option("datestyle", "ISO",
3905 GUC_ACTION_SAVE, true, 0, false);
3907 (void) set_config_option("intervalstyle", "postgres",
3909 GUC_ACTION_SAVE, true, 0, false);
3910 if (extra_float_digits < 3)
3911 (void) set_config_option("extra_float_digits", "3",
3913 GUC_ACTION_SAVE, true, 0, false);
3914
3915 /*
3916 * In addition force restrictive search_path, in case there are any
3917 * regproc or similar constants to be printed.
3918 */
3919 (void) set_config_option("search_path", "pg_catalog",
3921 GUC_ACTION_SAVE, true, 0, false);
3922
3923 return nestlevel;
3924}
3925
3926/*
3927 * Undo the effects of set_transmission_modes().
3928 */
3929void
3934
3935/*
3936 * Utility routine to close a cursor.
3937 */
3938static void
3940 PgFdwConnState *conn_state)
3941{
3942 char sql[64];
3943 PGresult *res;
3944
3945 snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);
3946 res = pgfdw_exec_query(conn, sql, conn_state);
3947 if (PQresultStatus(res) != PGRES_COMMAND_OK)
3948 pgfdw_report_error(res, conn, sql);
3949 PQclear(res);
3950}
3951
3952/*
3953 * create_foreign_modify
3954 * Construct an execution state of a foreign insert/update/delete
3955 * operation
3956 */
3957static PgFdwModifyState *
3960 ResultRelInfo *resultRelInfo,
3961 CmdType operation,
3962 Plan *subplan,
3963 char *query,
3964 List *target_attrs,
3965 int values_end,
3966 bool has_returning,
3967 List *retrieved_attrs)
3968{
3970 Relation rel = resultRelInfo->ri_RelationDesc;
3971 TupleDesc tupdesc = RelationGetDescr(rel);
3972 Oid userid;
3976 Oid typefnoid;
3977 bool isvarlena;
3978 ListCell *lc;
3979
3980 /* Begin constructing PgFdwModifyState. */
3982 fmstate->rel = rel;
3983
3984 /* Identify which user to do the remote access as. */
3985 userid = ExecGetResultRelCheckAsUser(resultRelInfo, estate);
3986
3987 /* Get info about foreign table. */
3989 user = GetUserMapping(userid, table->serverid);
3990
3991 /* Open connection; report that we'll create a prepared statement. */
3992 fmstate->conn = GetConnection(user, true, &fmstate->conn_state);
3993 fmstate->p_name = NULL; /* prepared statement not made yet */
3994
3995 /* Set up remote query information. */
3996 fmstate->query = query;
3997 if (operation == CMD_INSERT)
3998 {
3999 fmstate->query = pstrdup(fmstate->query);
4000 fmstate->orig_query = pstrdup(fmstate->query);
4001 }
4002 fmstate->target_attrs = target_attrs;
4003 fmstate->values_end = values_end;
4004 fmstate->has_returning = has_returning;
4005 fmstate->retrieved_attrs = retrieved_attrs;
4006
4007 /* Create context for per-tuple temp workspace. */
4008 fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
4009 "postgres_fdw temporary data",
4011
4012 /* Prepare for input conversion of RETURNING results. */
4013 if (fmstate->has_returning)
4014 fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
4015
4016 /* Prepare for output conversion of parameters used in prepared stmt. */
4017 n_params = list_length(fmstate->target_attrs) + 1;
4018 fmstate->p_flinfo = palloc0_array(FmgrInfo, n_params);
4019 fmstate->p_nums = 0;
4020
4021 if (operation == CMD_UPDATE || operation == CMD_DELETE)
4022 {
4023 Assert(subplan != NULL);
4024
4025 /* Find the ctid resjunk column in the subplan's result */
4026 fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist,
4027 "ctid");
4028 if (!AttributeNumberIsValid(fmstate->ctidAttno))
4029 elog(ERROR, "could not find junk ctid column");
4030
4031 /* First transmittable parameter will be ctid */
4033 fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
4034 fmstate->p_nums++;
4035 }
4036
4037 if (operation == CMD_INSERT || operation == CMD_UPDATE)
4038 {
4039 /* Set up for remaining transmittable parameters */
4040 foreach(lc, fmstate->target_attrs)
4041 {
4042 int attnum = lfirst_int(lc);
4043 Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
4044
4045 Assert(!attr->attisdropped);
4046
4047 /* Ignore generated columns; they are set to DEFAULT */
4048 if (attr->attgenerated)
4049 continue;
4050 getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
4051 fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
4052 fmstate->p_nums++;
4053 }
4054 }
4055
4056 Assert(fmstate->p_nums <= n_params);
4057
4058 /* Set batch_size from foreign server/table options. */
4059 if (operation == CMD_INSERT)
4060 fmstate->batch_size = get_batch_size_option(rel);
4061
4062 fmstate->num_slots = 1;
4063
4064 /* Initialize auxiliary state */
4065 fmstate->aux_fmstate = NULL;
4066
4067 return fmstate;
4068}
4069
4070/*
4071 * execute_foreign_modify
4072 * Perform foreign-table modification as required, and fetch RETURNING
4073 * result if any. (This is the shared guts of postgresExecForeignInsert,
4074 * postgresExecForeignBatchInsert, postgresExecForeignUpdate, and
4075 * postgresExecForeignDelete.)
4076 */
4077static TupleTableSlot **
4079 ResultRelInfo *resultRelInfo,
4080 CmdType operation,
4081 TupleTableSlot **slots,
4083 int *numSlots)
4084{
4086 ItemPointer ctid = NULL;
4087 const char **p_values;
4088 PGresult *res;
4089 int n_rows;
4090 StringInfoData sql;
4091
4092 /* The operation should be INSERT, UPDATE, or DELETE */
4093 Assert(operation == CMD_INSERT ||
4094 operation == CMD_UPDATE ||
4095 operation == CMD_DELETE);
4096
4097 /* First, process a pending asynchronous request, if any. */
4098 if (fmstate->conn_state->pendingAreq)
4099 process_pending_request(fmstate->conn_state->pendingAreq);
4100
4101 /*
4102 * If the existing query was deparsed and prepared for a different number
4103 * of rows, rebuild it for the proper number.
4104 */
4105 if (operation == CMD_INSERT && fmstate->num_slots != *numSlots)
4106 {
4107 /* Destroy the prepared statement created previously */
4108 if (fmstate->p_name)
4110
4111 /* Build INSERT string with numSlots records in its VALUES clause. */
4112 initStringInfo(&sql);
4113 rebuildInsertSql(&sql, fmstate->rel,
4114 fmstate->orig_query, fmstate->target_attrs,
4115 fmstate->values_end, fmstate->p_nums,
4116 *numSlots - 1);
4117 pfree(fmstate->query);
4118 fmstate->query = sql.data;
4119 fmstate->num_slots = *numSlots;
4120 }
4121
4122 /* Set up the prepared statement on the remote server, if we didn't yet */
4123 if (!fmstate->p_name)
4125
4126 /*
4127 * For UPDATE/DELETE, get the ctid that was passed up as a resjunk column
4128 */
4129 if (operation == CMD_UPDATE || operation == CMD_DELETE)
4130 {
4131 Datum datum;
4132 bool isNull;
4133
4135 fmstate->ctidAttno,
4136 &isNull);
4137 /* shouldn't ever get a null result... */
4138 if (isNull)
4139 elog(ERROR, "ctid is NULL");
4140 ctid = (ItemPointer) DatumGetPointer(datum);
4141 }
4142
4143 /* Convert parameters needed by prepared statement to text form */
4145
4146 /*
4147 * Execute the prepared statement.
4148 */
4149 if (!PQsendQueryPrepared(fmstate->conn,
4150 fmstate->p_name,
4151 fmstate->p_nums * (*numSlots),
4152 p_values,
4153 NULL,
4154 NULL,
4155 0))
4156 pgfdw_report_error(NULL, fmstate->conn, fmstate->query);
4157
4158 /*
4159 * Get the result, and check for success.
4160 */
4161 res = pgfdw_get_result(fmstate->conn);
4162 if (PQresultStatus(res) !=
4163 (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
4164 pgfdw_report_error(res, fmstate->conn, fmstate->query);
4165
4166 /* Check number of rows affected, and fetch RETURNING tuple if any */
4167 if (fmstate->has_returning)
4168 {
4169 Assert(*numSlots == 1);
4170 n_rows = PQntuples(res);
4171 if (n_rows > 0)
4172 store_returning_result(fmstate, slots[0], res);
4173 }
4174 else
4175 n_rows = atoi(PQcmdTuples(res));
4176
4177 /* And clean up */
4178 PQclear(res);
4179
4180 MemoryContextReset(fmstate->temp_cxt);
4181
4182 *numSlots = n_rows;
4183
4184 /*
4185 * Return NULL if nothing was inserted/updated/deleted on the remote end
4186 */
4187 return (n_rows > 0) ? slots : NULL;
4188}
4189
4190/*
4191 * prepare_foreign_modify
4192 * Establish a prepared statement for execution of INSERT/UPDATE/DELETE
4193 */
4194static void
4196{
4197 char prep_name[NAMEDATALEN];
4198 char *p_name;
4199 PGresult *res;
4200
4201 /*
4202 * The caller would already have processed a pending asynchronous request
4203 * if any, so no need to do it here.
4204 */
4205
4206 /* Construct name we'll use for the prepared statement. */
4207 snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
4208 GetPrepStmtNumber(fmstate->conn));
4209 p_name = pstrdup(prep_name);
4210
4211 /*
4212 * We intentionally do not specify parameter types here, but leave the
4213 * remote server to derive them by default. This avoids possible problems
4214 * with the remote server using different type OIDs than we do. All of
4215 * the prepared statements we use in this module are simple enough that
4216 * the remote server will make the right choices.
4217 */
4218 if (!PQsendPrepare(fmstate->conn,
4219 p_name,
4220 fmstate->query,
4221 0,
4222 NULL))
4223 pgfdw_report_error(NULL, fmstate->conn, fmstate->query);
4224
4225 /*
4226 * Get the result, and check for success.
4227 */
4228 res = pgfdw_get_result(fmstate->conn);
4229 if (PQresultStatus(res) != PGRES_COMMAND_OK)
4230 pgfdw_report_error(res, fmstate->conn, fmstate->query);
4231 PQclear(res);
4232
4233 /* This action shows that the prepare has been done. */
4234 fmstate->p_name = p_name;
4235}
4236
4237/*
4238 * convert_prep_stmt_params
4239 * Create array of text strings representing parameter values
4240 *
4241 * tupleid is ctid to send, or NULL if none
4242 * slot is slot to get remaining parameters from, or NULL if none
4243 *
4244 * Data is constructed in temp_cxt; caller should reset that after use.
4245 */
4246static const char **
4249 TupleTableSlot **slots,
4250 int numSlots)
4251{
4252 const char **p_values;
4253 int i;
4254 int j;
4255 int pindex = 0;
4256 MemoryContext oldcontext;
4257
4258 oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
4259
4260 p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums * numSlots);
4261
4262 /* ctid is provided only for UPDATE/DELETE, which don't allow batching */
4263 Assert(!(tupleid != NULL && numSlots > 1));
4264
4265 /* 1st parameter should be ctid, if it's in use */
4266 if (tupleid != NULL)
4267 {
4268 Assert(numSlots == 1);
4269 /* don't need set_transmission_modes for TID output */
4272 pindex++;
4273 }
4274
4275 /* get following parameters from slots */
4276 if (slots != NULL && fmstate->target_attrs != NIL)
4277 {
4278 TupleDesc tupdesc = RelationGetDescr(fmstate->rel);
4279 int nestlevel;
4280 ListCell *lc;
4281
4283
4284 for (i = 0; i < numSlots; i++)
4285 {
4286 j = (tupleid != NULL) ? 1 : 0;
4287 foreach(lc, fmstate->target_attrs)
4288 {
4289 int attnum = lfirst_int(lc);
4290 CompactAttribute *attr = TupleDescCompactAttr(tupdesc, attnum - 1);
4291 Datum value;
4292 bool isnull;
4293
4294 /* Ignore generated columns; they are set to DEFAULT */
4295 if (attr->attgenerated)
4296 continue;
4297 value = slot_getattr(slots[i], attnum, &isnull);
4298 if (isnull)
4299 p_values[pindex] = NULL;
4300 else
4302 value);
4303 pindex++;
4304 j++;
4305 }
4306 }
4307
4309 }
4310
4311 Assert(pindex == fmstate->p_nums * numSlots);
4312
4313 MemoryContextSwitchTo(oldcontext);
4314
4315 return p_values;
4316}
4317
4318/*
4319 * store_returning_result
4320 * Store the result of a RETURNING clause
4321 */
4322static void
4324 TupleTableSlot *slot, PGresult *res)
4325{
4327
4329 fmstate->rel,
4330 fmstate->attinmeta,
4331 fmstate->retrieved_attrs,
4332 NULL,
4333 fmstate->temp_cxt);
4334
4335 /*
4336 * The returning slot will not necessarily be suitable to store heaptuples
4337 * directly, so allow for conversion.
4338 */
4339 ExecForceStoreHeapTuple(newtup, slot, true);
4340}
4341
4342/*
4343 * finish_foreign_modify
4344 * Release resources for a foreign insert/update/delete operation
4345 */
4346static void
4348{
4349 Assert(fmstate != NULL);
4350
4351 /* If we created a prepared statement, destroy it */
4353
4354 /* Release remote connection */
4356 fmstate->conn = NULL;
4357}
4358
4359/*
4360 * deallocate_query
4361 * Deallocate a prepared statement for a foreign insert/update/delete
4362 * operation
4363 */
4364static void
4366{
4367 char sql[64];
4368 PGresult *res;
4369
4370 /* do nothing if the query is not allocated */
4371 if (!fmstate->p_name)
4372 return;
4373
4374 snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
4375 res = pgfdw_exec_query(fmstate->conn, sql, fmstate->conn_state);
4376 if (PQresultStatus(res) != PGRES_COMMAND_OK)
4377 pgfdw_report_error(res, fmstate->conn, sql);
4378 PQclear(res);
4379 pfree(fmstate->p_name);
4380 fmstate->p_name = NULL;
4381}
4382
4383/*
4384 * build_remote_returning
4385 * Build a RETURNING targetlist of a remote query for performing an
4386 * UPDATE/DELETE .. RETURNING on a join directly
4387 */
4388static List *
4389build_remote_returning(Index rtindex, Relation rel, List *returningList)
4390{
4391 bool have_wholerow = false;
4392 List *tlist = NIL;
4393 List *vars;
4394 ListCell *lc;
4395
4396 Assert(returningList);
4397
4398 vars = pull_var_clause((Node *) returningList, PVC_INCLUDE_PLACEHOLDERS);
4399
4400 /*
4401 * If there's a whole-row reference to the target relation, then we'll
4402 * need all the columns of the relation.
4403 */
4404 foreach(lc, vars)
4405 {
4406 Var *var = (Var *) lfirst(lc);
4407
4408 if (IsA(var, Var) &&
4409 var->varno == rtindex &&
4411 {
4412 have_wholerow = true;
4413 break;
4414 }
4415 }
4416
4417 if (have_wholerow)
4418 {
4419 TupleDesc tupdesc = RelationGetDescr(rel);
4420 int i;
4421
4422 for (i = 1; i <= tupdesc->natts; i++)
4423 {
4424 Form_pg_attribute attr = TupleDescAttr(tupdesc, i - 1);
4425 Var *var;
4426
4427 /* Ignore dropped attributes. */
4428 if (attr->attisdropped)
4429 continue;
4430
4431 var = makeVar(rtindex,
4432 i,
4433 attr->atttypid,
4434 attr->atttypmod,
4435 attr->attcollation,
4436 0);
4437
4438 tlist = lappend(tlist,
4439 makeTargetEntry((Expr *) var,
4440 list_length(tlist) + 1,
4441 NULL,
4442 false));
4443 }
4444 }
4445
4446 /* Now add any remaining columns to tlist. */
4447 foreach(lc, vars)
4448 {
4449 Var *var = (Var *) lfirst(lc);
4450
4451 /*
4452 * No need for whole-row references to the target relation. We don't
4453 * need system columns other than ctid and oid either, since those are
4454 * set locally.
4455 */
4456 if (IsA(var, Var) &&
4457 var->varno == rtindex &&
4458 var->varattno <= InvalidAttrNumber &&
4460 continue; /* don't need it */
4461
4462 if (tlist_member((Expr *) var, tlist))
4463 continue; /* already got it */
4464
4465 tlist = lappend(tlist,
4466 makeTargetEntry((Expr *) var,
4467 list_length(tlist) + 1,
4468 NULL,
4469 false));
4470 }
4471
4472 list_free(vars);
4473
4474 return tlist;
4475}
4476
4477/*
4478 * rebuild_fdw_scan_tlist
4479 * Build new fdw_scan_tlist of given foreign-scan plan node from given
4480 * tlist
4481 *
4482 * There might be columns that the fdw_scan_tlist of the given foreign-scan
4483 * plan node contains that the given tlist doesn't. The fdw_scan_tlist would
4484 * have contained resjunk columns such as 'ctid' of the target relation and
4485 * 'wholerow' of non-target relations, but the tlist might not contain them,
4486 * for example. So, adjust the tlist so it contains all the columns specified
4487 * in the fdw_scan_tlist; else setrefs.c will get confused.
4488 */
4489static void
4491{
4492 List *new_tlist = tlist;
4493 List *old_tlist = fscan->fdw_scan_tlist;
4494 ListCell *lc;
4495
4496 foreach(lc, old_tlist)
4497 {
4499
4500 if (tlist_member(tle->expr, new_tlist))
4501 continue; /* already got it */
4502
4504 makeTargetEntry(tle->expr,
4506 NULL,
4507 false));
4508 }
4509 fscan->fdw_scan_tlist = new_tlist;
4510}
4511
4512/*
4513 * Execute a direct UPDATE/DELETE statement.
4514 */
4515static void
4517{
4519 ExprContext *econtext = node->ss.ps.ps_ExprContext;
4520 int numParams = dmstate->numParams;
4521 const char **values = dmstate->param_values;
4522
4523 /* First, process a pending asynchronous request, if any. */
4524 if (dmstate->conn_state->pendingAreq)
4525 process_pending_request(dmstate->conn_state->pendingAreq);
4526
4527 /*
4528 * Construct array of query parameter values in text format.
4529 */
4530 if (numParams > 0)
4531 process_query_params(econtext,
4532 dmstate->param_flinfo,
4533 dmstate->param_exprs,
4534 values);
4535
4536 /*
4537 * Notice that we pass NULL for paramTypes, thus forcing the remote server
4538 * to infer types for all parameters. Since we explicitly cast every
4539 * parameter (see deparse.c), the "inference" is trivial and will produce
4540 * the desired result. This allows us to avoid assuming that the remote
4541 * server has the same OIDs we do for the parameters' types.
4542 */
4543 if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
4544 NULL, values, NULL, NULL, 0))
4545 pgfdw_report_error(NULL, dmstate->conn, dmstate->query);
4546
4547 /*
4548 * Get the result, and check for success.
4549 */
4550 dmstate->result = pgfdw_get_result(dmstate->conn);
4551 if (PQresultStatus(dmstate->result) !=
4552 (dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
4553 pgfdw_report_error(dmstate->result, dmstate->conn,
4554 dmstate->query);
4555
4556 /*
4557 * The result potentially needs to survive across multiple executor row
4558 * cycles, so move it to the context where the dmstate is.
4559 */
4560 dmstate->result = libpqsrv_PGresultSetParent(dmstate->result,
4562
4563 /* Get the number of rows affected. */
4564 if (dmstate->has_returning)
4565 dmstate->num_tuples = PQntuples(dmstate->result);
4566 else
4567 dmstate->num_tuples = atoi(PQcmdTuples(dmstate->result));
4568}
4569
4570/*
4571 * Get the result of a RETURNING clause.
4572 */
4573static TupleTableSlot *
4575{
4577 EState *estate = node->ss.ps.state;
4578 ResultRelInfo *resultRelInfo = node->resultRelInfo;
4579 TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
4581
4582 Assert(resultRelInfo->ri_projectReturning);
4583
4584 /* If we didn't get any tuples, must be end of data. */
4585 if (dmstate->next_tuple >= dmstate->num_tuples)
4586 return ExecClearTuple(slot);
4587
4588 /* Increment the command es_processed count if necessary. */
4589 if (dmstate->set_processed)
4590 estate->es_processed += 1;
4591
4592 /*
4593 * Store a RETURNING tuple. If has_returning is false, just emit a dummy
4594 * tuple. (has_returning is false when the local query is of the form
4595 * "UPDATE/DELETE .. RETURNING 1" for example.)
4596 */
4597 if (!dmstate->has_returning)
4598 {
4600 resultSlot = slot;
4601 }
4602 else
4603 {
4605
4607 dmstate->next_tuple,
4608 dmstate->rel,
4609 dmstate->attinmeta,
4610 dmstate->retrieved_attrs,
4611 node,
4612 dmstate->temp_cxt);
4613 ExecStoreHeapTuple(newtup, slot, false);
4614 /* Get the updated/deleted tuple. */
4615 if (dmstate->rel)
4616 resultSlot = slot;
4617 else
4618 resultSlot = apply_returning_filter(dmstate, resultRelInfo, slot, estate);
4619 }
4620 dmstate->next_tuple++;
4621
4622 /* Make slot available for evaluation of the local query RETURNING list. */
4624 resultSlot;
4625
4626 return slot;
4627}
4628
4629/*
4630 * Initialize a filter to extract an updated/deleted tuple from a scan tuple.
4631 */
4632static void
4634 List *fdw_scan_tlist,
4635 Index rtindex)
4636{
4638 ListCell *lc;
4639 int i;
4640
4641 /*
4642 * Calculate the mapping between the fdw_scan_tlist's entries and the
4643 * result tuple's attributes.
4644 *
4645 * The "map" is an array of indexes of the result tuple's attributes in
4646 * fdw_scan_tlist, i.e., one entry for every attribute of the result
4647 * tuple. We store zero for any attributes that don't have the
4648 * corresponding entries in that list, marking that a NULL is needed in
4649 * the result tuple.
4650 *
4651 * Also get the indexes of the entries for ctid and oid if any.
4652 */
4653 dmstate->attnoMap = (AttrNumber *)
4654 palloc0(resultTupType->natts * sizeof(AttrNumber));
4655
4656 dmstate->ctidAttno = dmstate->oidAttno = 0;
4657
4658 i = 1;
4659 dmstate->hasSystemCols = false;
4660 foreach(lc, fdw_scan_tlist)
4661 {
4663 Var *var = (Var *) tle->expr;
4664
4665 Assert(IsA(var, Var));
4666
4667 /*
4668 * If the Var is a column of the target relation to be retrieved from
4669 * the foreign server, get the index of the entry.
4670 */
4671 if (var->varno == rtindex &&
4672 list_member_int(dmstate->retrieved_attrs, i))
4673 {
4674 int attrno = var->varattno;
4675
4676 if (attrno < 0)
4677 {
4678 /*
4679 * We don't retrieve system columns other than ctid and oid.
4680 */
4682 dmstate->ctidAttno = i;
4683 else
4684 Assert(false);
4685 dmstate->hasSystemCols = true;
4686 }
4687 else
4688 {
4689 /*
4690 * We don't retrieve whole-row references to the target
4691 * relation either.
4692 */
4693 Assert(attrno > 0);
4694
4695 dmstate->attnoMap[attrno - 1] = i;
4696 }
4697 }
4698 i++;
4699 }
4700}
4701
4702/*
4703 * Extract and return an updated/deleted tuple from a scan tuple.
4704 */
4705static TupleTableSlot *
4707 ResultRelInfo *resultRelInfo,
4708 TupleTableSlot *slot,
4709 EState *estate)
4710{
4713 Datum *values;
4714 bool *isnull;
4716 bool *old_isnull;
4717 int i;
4718
4719 /*
4720 * Use the return tuple slot as a place to store the result tuple.
4721 */
4722 resultSlot = ExecGetReturningSlot(estate, resultRelInfo);
4723
4724 /*
4725 * Extract all the values of the scan tuple.
4726 */
4727 slot_getallattrs(slot);
4728 old_values = slot->tts_values;
4729 old_isnull = slot->tts_isnull;
4730
4731 /*
4732 * Prepare to build the result tuple.
4733 */
4735 values = resultSlot->tts_values;
4736 isnull = resultSlot->tts_isnull;
4737
4738 /*
4739 * Transpose data into proper fields of the result tuple.
4740 */
4741 for (i = 0; i < resultTupType->natts; i++)
4742 {
4743 int j = dmstate->attnoMap[i];
4744
4745 if (j == 0)
4746 {
4747 values[i] = (Datum) 0;
4748 isnull[i] = true;
4749 }
4750 else
4751 {
4752 values[i] = old_values[j - 1];
4753 isnull[i] = old_isnull[j - 1];
4754 }
4755 }
4756
4757 /*
4758 * Build the virtual tuple.
4759 */
4761
4762 /*
4763 * If we have any system columns to return, materialize a heap tuple in
4764 * the slot from column values set above and install system columns in
4765 * that tuple.
4766 */
4767 if (dmstate->hasSystemCols)
4768 {
4770
4771 /* ctid */
4772 if (dmstate->ctidAttno)
4773 {
4774 ItemPointer ctid = NULL;
4775
4776 ctid = (ItemPointer) DatumGetPointer(old_values[dmstate->ctidAttno - 1]);
4777 resultTup->t_self = *ctid;
4778 }
4779
4780 /*
4781 * And remaining columns
4782 *
4783 * Note: since we currently don't allow the target relation to appear
4784 * on the nullable side of an outer join, any system columns wouldn't
4785 * go to NULL.
4786 *
4787 * Note: no need to care about tableoid here because it will be
4788 * initialized in ExecProcessReturning().
4789 */
4793 }
4794
4795 /*
4796 * And return the result tuple.
4797 */
4798 return resultSlot;
4799}
4800
4801/*
4802 * Prepare for processing of parameters used in remote query.
4803 */
4804static void
4806 List *fdw_exprs,
4807 int numParams,
4808 FmgrInfo **param_flinfo,
4809 List **param_exprs,
4810 const char ***param_values)
4811{
4812 int i;
4813 ListCell *lc;
4814
4815 Assert(numParams > 0);
4816
4817 /* Prepare for output conversion of parameters used in remote query. */
4818 *param_flinfo = palloc0_array(FmgrInfo, numParams);
4819
4820 i = 0;
4821 foreach(lc, fdw_exprs)
4822 {
4823 Node *param_expr = (Node *) lfirst(lc);
4824 Oid typefnoid;
4825 bool isvarlena;
4826
4828 fmgr_info(typefnoid, &(*param_flinfo)[i]);
4829 i++;
4830 }
4831
4832 /*
4833 * Prepare remote-parameter expressions for evaluation. (Note: in
4834 * practice, we expect that all these expressions will be just Params, so
4835 * we could possibly do something more efficient than using the full
4836 * expression-eval machinery for this. But probably there would be little
4837 * benefit, and it'd require postgres_fdw to know more than is desirable
4838 * about Param evaluation.)
4839 */
4840 *param_exprs = ExecInitExprList(fdw_exprs, node);
4841
4842 /* Allocate buffer for text form of query parameters. */
4843 *param_values = (const char **) palloc0(numParams * sizeof(char *));
4844}
4845
4846/*
4847 * Construct array of query parameter values in text format.
4848 */
4849static void
4851 FmgrInfo *param_flinfo,
4852 List *param_exprs,
4853 const char **param_values)
4854{
4855 int nestlevel;
4856 int i;
4857 ListCell *lc;
4858
4860
4861 i = 0;
4862 foreach(lc, param_exprs)
4863 {
4866 bool isNull;
4867
4868 /* Evaluate the parameter expression */
4869 expr_value = ExecEvalExpr(expr_state, econtext, &isNull);
4870
4871 /*
4872 * Get string representation of each parameter value by invoking
4873 * type-specific output function, unless the value is null.
4874 */
4875 if (isNull)
4876 param_values[i] = NULL;
4877 else
4878 param_values[i] = OutputFunctionCall(&param_flinfo[i], expr_value);
4879
4880 i++;
4881 }
4882
4884}
4885
4886/*
4887 * postgresAnalyzeForeignTable
4888 * Test whether analyzing this foreign table is supported
4889 */
4890static bool
4894{
4897 PGconn *conn;
4898 StringInfoData sql;
4899 PGresult *res;
4900
4901 /* Return the row-analysis function pointer */
4903
4904 /*
4905 * Now we have to get the number of pages. It's annoying that the ANALYZE
4906 * API requires us to return that now, because it forces some duplication
4907 * of effort between this routine and postgresAcquireSampleRowsFunc. But
4908 * it's probably not worth redefining that API at this point.
4909 */
4910
4911 /*
4912 * Get the connection to use. We do the remote access as the table's
4913 * owner, even if the ANALYZE was started by some other user.
4914 */
4916 user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
4917 conn = GetConnection(user, false, NULL);
4918
4919 /*
4920 * Construct command to get page count for relation.
4921 */
4922 initStringInfo(&sql);
4923 deparseAnalyzeSizeSql(&sql, relation);
4924
4925 res = pgfdw_exec_query(conn, sql.data, NULL);
4926 if (PQresultStatus(res) != PGRES_TUPLES_OK)
4927 pgfdw_report_error(res, conn, sql.data);
4928
4929 if (PQntuples(res) != 1 || PQnfields(res) != 1)
4930 elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query");
4931 *totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
4932 PQclear(res);
4933
4935
4936 return true;
4937}
4938
4939/*
4940 * postgresGetAnalyzeInfoForForeignTable
4941 * Count tuples in foreign table (just get pg_class.reltuples).
4942 *
4943 * can_tablesample determines if the remote relation supports acquiring the
4944 * sample using TABLESAMPLE.
4945 */
4946static double
4948{
4951 PGconn *conn;
4952 StringInfoData sql;
4953 PGresult *res;
4954 double reltuples;
4955 char relkind;
4956
4957 /* assume the remote relation does not support TABLESAMPLE */
4958 *can_tablesample = false;
4959
4960 /*
4961 * Get the connection to use. We do the remote access as the table's
4962 * owner, even if the ANALYZE was started by some other user.
4963 */
4965 user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
4966 conn = GetConnection(user, false, NULL);
4967
4968 /*
4969 * Construct command to get page count for relation.
4970 */
4971 initStringInfo(&sql);
4972 deparseAnalyzeInfoSql(&sql, relation);
4973
4974 res = pgfdw_exec_query(conn, sql.data, NULL);
4975 if (PQresultStatus(res) != PGRES_TUPLES_OK)
4976 pgfdw_report_error(res, conn, sql.data);
4977
4978 if (PQntuples(res) != 1 || PQnfields(res) != 2)
4979 elog(ERROR, "unexpected result from deparseAnalyzeInfoSql query");
4980 reltuples = strtod(PQgetvalue(res, 0, 0), NULL);
4981 relkind = *(PQgetvalue(res, 0, 1));
4982 PQclear(res);
4983
4985
4986 /* TABLESAMPLE is supported only for regular tables and matviews */
4987 *can_tablesample = (relkind == RELKIND_RELATION ||
4988 relkind == RELKIND_MATVIEW ||
4989 relkind == RELKIND_PARTITIONED_TABLE);
4990
4991 return reltuples;
4992}
4993
4994/*
4995 * Acquire a random sample of rows from foreign table managed by postgres_fdw.
4996 *
4997 * Selected rows are returned in the caller-allocated array rows[],
4998 * which must have at least targrows entries.
4999 * The actual number of rows selected is returned as the function result.
5000 * We also count the total number of rows in the table and return it into
5001 * *totalrows. Note that *totaldeadrows is always set to 0.
5002 *
5003 * Note that the returned list of rows is not always in order by physical
5004 * position in the table. Therefore, correlation estimates derived later
5005 * may be meaningless, but it's OK because we don't use the estimates
5006 * currently (the planner only pays attention to correlation for indexscans).
5007 */
5008static int
5010 HeapTuple *rows, int targrows,
5011 double *totalrows,
5012 double *totaldeadrows)
5013{
5014 PgFdwAnalyzeState astate;
5016 ForeignServer *server;
5018 PGconn *conn;
5020 PgFdwSamplingMethod method = ANALYZE_SAMPLE_AUTO; /* auto is default */
5021 double sample_frac = -1.0;
5022 double reltuples = -1.0;
5023 unsigned int cursor_number;
5024 StringInfoData sql;
5025 PGresult *res;
5026 char fetch_sql[64];
5027 int fetch_size;
5028 ListCell *lc;
5029
5030 /* Initialize workspace state */
5031 astate.rel = relation;
5033
5034 astate.rows = rows;
5035 astate.targrows = targrows;
5036 astate.numrows = 0;
5037 astate.samplerows = 0;
5038 astate.rowstoskip = -1; /* -1 means not set yet */
5039 reservoir_init_selection_state(&astate.rstate, targrows);
5040
5041 /* Remember ANALYZE context, and create a per-tuple temp context */
5044 "postgres_fdw temporary data",
5046
5047 /*
5048 * Get the connection to use. We do the remote access as the table's
5049 * owner, even if the ANALYZE was started by some other user.
5050 */
5052 server = GetForeignServer(table->serverid);
5053 user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
5054 conn = GetConnection(user, false, NULL);
5055
5056 /* We'll need server version, so fetch it now. */
5058
5059 /*
5060 * What sampling method should we use?
5061 */
5062 foreach(lc, server->options)
5063 {
5064 DefElem *def = (DefElem *) lfirst(lc);
5065
5066 if (strcmp(def->defname, "analyze_sampling") == 0)
5067 {
5068 char *value = defGetString(def);
5069
5070 if (strcmp(value, "off") == 0)
5071 method = ANALYZE_SAMPLE_OFF;
5072 else if (strcmp(value, "auto") == 0)
5073 method = ANALYZE_SAMPLE_AUTO;
5074 else if (strcmp(value, "random") == 0)
5075 method = ANALYZE_SAMPLE_RANDOM;
5076 else if (strcmp(value, "system") == 0)
5077 method = ANALYZE_SAMPLE_SYSTEM;
5078 else if (strcmp(value, "bernoulli") == 0)
5079 method = ANALYZE_SAMPLE_BERNOULLI;
5080
5081 break;
5082 }
5083 }
5084
5085 foreach(lc, table->options)
5086 {
5087 DefElem *def = (DefElem *) lfirst(lc);
5088
5089 if (strcmp(def->defname, "analyze_sampling") == 0)
5090 {
5091 char *value = defGetString(def);
5092
5093 if (strcmp(value, "off") == 0)
5094 method = ANALYZE_SAMPLE_OFF;
5095 else if (strcmp(value, "auto") == 0)
5096 method = ANALYZE_SAMPLE_AUTO;
5097 else if (strcmp(value, "random") == 0)
5098 method = ANALYZE_SAMPLE_RANDOM;
5099 else if (strcmp(value, "system") == 0)
5100 method = ANALYZE_SAMPLE_SYSTEM;
5101 else if (strcmp(value, "bernoulli") == 0)
5102 method = ANALYZE_SAMPLE_BERNOULLI;
5103
5104 break;
5105 }
5106 }
5107
5108 /*
5109 * Error-out if explicitly required one of the TABLESAMPLE methods, but
5110 * the server does not support it.
5111 */
5112 if ((server_version_num < 95000) &&
5113 (method == ANALYZE_SAMPLE_SYSTEM ||
5114 method == ANALYZE_SAMPLE_BERNOULLI))
5115 ereport(ERROR,
5117 errmsg("remote server does not support TABLESAMPLE feature")));
5118
5119 /*
5120 * If we've decided to do remote sampling, calculate the sampling rate. We
5121 * need to get the number of tuples from the remote server, but skip that
5122 * network round-trip if not needed.
5123 */
5124 if (method != ANALYZE_SAMPLE_OFF)
5125 {
5126 bool can_tablesample;
5127
5128 reltuples = postgresGetAnalyzeInfoForForeignTable(relation,
5130
5131 /*
5132 * Make sure we're not choosing TABLESAMPLE when the remote relation
5133 * does not support that. But only do this for "auto" - if the user
5134 * explicitly requested BERNOULLI/SYSTEM, it's better to fail.
5135 */
5136 if (!can_tablesample && (method == ANALYZE_SAMPLE_AUTO))
5137 method = ANALYZE_SAMPLE_RANDOM;
5138
5139 /*
5140 * Remote's reltuples could be 0 or -1 if the table has never been
5141 * vacuumed/analyzed. In that case, disable sampling after all.
5142 */
5143 if ((reltuples <= 0) || (targrows >= reltuples))
5144 method = ANALYZE_SAMPLE_OFF;
5145 else
5146 {
5147 /*
5148 * All supported sampling methods require sampling rate, not
5149 * target rows directly, so we calculate that using the remote
5150 * reltuples value. That's imperfect, because it might be off a
5151 * good deal, but that's not something we can (or should) address
5152 * here.
5153 *
5154 * If reltuples is too low (i.e. when table grew), we'll end up
5155 * sampling more rows - but then we'll apply the local sampling,
5156 * so we get the expected sample size. This is the same outcome as
5157 * without remote sampling.
5158 *
5159 * If reltuples is too high (e.g. after bulk DELETE), we will end
5160 * up sampling too few rows.
5161 *
5162 * We can't really do much better here - we could try sampling a
5163 * bit more rows, but we don't know how off the reltuples value is
5164 * so how much is "a bit more"?
5165 *
5166 * Furthermore, the targrows value for partitions is determined
5167 * based on table size (relpages), which can be off in different
5168 * ways too. Adjusting the sampling rate here might make the issue
5169 * worse.
5170 */
5171 sample_frac = targrows / reltuples;
5172
5173 /*
5174 * We should never get sampling rate outside the valid range
5175 * (between 0.0 and 1.0), because those cases should be covered by
5176 * the previous branch that sets ANALYZE_SAMPLE_OFF.
5177 */
5178 Assert(sample_frac >= 0.0 && sample_frac <= 1.0);
5179 }
5180 }
5181
5182 /*
5183 * For "auto" method, pick the one we believe is best. For servers with
5184 * TABLESAMPLE support we pick BERNOULLI, for old servers we fall-back to
5185 * random() to at least reduce network transfer.
5186 */
5187 if (method == ANALYZE_SAMPLE_AUTO)
5188 {
5189 if (server_version_num < 95000)
5190 method = ANALYZE_SAMPLE_RANDOM;
5191 else
5192 method = ANALYZE_SAMPLE_BERNOULLI;
5193 }
5194
5195 /*
5196 * Construct cursor that retrieves whole rows from remote.
5197 */
5199 initStringInfo(&sql);
5200 appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number);
5201
5202 deparseAnalyzeSql(&sql, relation, method, sample_frac, &astate.retrieved_attrs);
5203
5204 res = pgfdw_exec_query(conn, sql.data, NULL);
5205 if (PQresultStatus(res) != PGRES_COMMAND_OK)
5206 pgfdw_report_error(res, conn, sql.data);
5207 PQclear(res);
5208
5209 /*
5210 * Determine the fetch size. The default is arbitrary, but shouldn't be
5211 * enormous.
5212 */
5213 fetch_size = 100;
5214 foreach(lc, server->options)
5215 {
5216 DefElem *def = (DefElem *) lfirst(lc);
5217
5218 if (strcmp(def->defname, "fetch_size") == 0)
5219 {
5221 break;
5222 }
5223 }
5224 foreach(lc, table->options)
5225 {
5226 DefElem *def = (DefElem *) lfirst(lc);
5227
5228 if (strcmp(def->defname, "fetch_size") == 0)
5229 {
5231 break;
5232 }
5233 }
5234
5235 /* Construct command to fetch rows from remote. */
5236 snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
5238
5239 /* Retrieve and process rows a batch at a time. */
5240 for (;;)
5241 {
5242 int numrows;
5243 int i;
5244
5245 /* Allow users to cancel long query */
5247
5248 /*
5249 * XXX possible future improvement: if rowstoskip is large, we could
5250 * issue a MOVE rather than physically fetching the rows, then just
5251 * adjust rowstoskip and samplerows appropriately.
5252 */
5253
5254 /* Fetch some rows */
5256 /* On error, report the original query, not the FETCH. */
5257 if (PQresultStatus(res) != PGRES_TUPLES_OK)
5258 pgfdw_report_error(res, conn, sql.data);
5259
5260 /* Process whatever we got. */
5261 numrows = PQntuples(res);
5262 for (i = 0; i < numrows; i++)
5263 analyze_row_processor(res, i, &astate);
5264
5265 PQclear(res);
5266
5267 /* Must be EOF if we didn't get all the rows requested. */
5268 if (numrows < fetch_size)
5269 break;
5270 }
5271
5272 /* Close the cursor, just to be tidy. */
5274
5276
5277 /* We assume that we have no dead tuple. */
5278 *totaldeadrows = 0.0;
5279
5280 /*
5281 * Without sampling, we've retrieved all living tuples from foreign
5282 * server, so report that as totalrows. Otherwise use the reltuples
5283 * estimate we got from the remote side.
5284 */
5285 if (method == ANALYZE_SAMPLE_OFF)
5286 *totalrows = astate.samplerows;
5287 else
5288 *totalrows = reltuples;
5289
5290 /*
5291 * Emit some interesting relation info
5292 */
5293 ereport(elevel,
5294 (errmsg("\"%s\": table contains %.0f rows, %d rows in sample",
5295 RelationGetRelationName(relation),
5296 *totalrows, astate.numrows)));
5297
5298 return astate.numrows;
5299}
5300
5301/*
5302 * Collect sample rows from the result of query.
5303 * - Use all tuples in sample until target # of samples are collected.
5304 * - Subsequently, replace already-sampled tuples randomly.
5305 */
5306static void
5308{
5309 int targrows = astate->targrows;
5310 int pos; /* array index to store tuple in */
5311 MemoryContext oldcontext;
5312
5313 /* Always increment sample row counter. */
5314 astate->samplerows += 1;
5315
5316 /*
5317 * Determine the slot where this sample row should be stored. Set pos to
5318 * negative value to indicate the row should be skipped.
5319 */
5320 if (astate->numrows < targrows)
5321 {
5322 /* First targrows rows are always included into the sample */
5323 pos = astate->numrows++;
5324 }
5325 else
5326 {
5327 /*
5328 * Now we start replacing tuples in the sample until we reach the end
5329 * of the relation. Same algorithm as in acquire_sample_rows in
5330 * analyze.c; see Jeff Vitter's paper.
5331 */
5332 if (astate->rowstoskip < 0)
5333 astate->rowstoskip = reservoir_get_next_S(&astate->rstate, astate->samplerows, targrows);
5334
5335 if (astate->rowstoskip <= 0)
5336 {
5337 /* Choose a random reservoir element to replace. */
5338 pos = (int) (targrows * sampler_random_fract(&astate->rstate.randstate));
5339 Assert(pos >= 0 && pos < targrows);
5340 heap_freetuple(astate->rows[pos]);
5341 }
5342 else
5343 {
5344 /* Skip this tuple. */
5345 pos = -1;
5346 }
5347
5348 astate->rowstoskip -= 1;
5349 }
5350
5351 if (pos >= 0)
5352 {
5353 /*
5354 * Create sample tuple from current result row, and store it in the
5355 * position determined above. The tuple has to be created in anl_cxt.
5356 */
5357 oldcontext = MemoryContextSwitchTo(astate->anl_cxt);
5358
5359 astate->rows[pos] = make_tuple_from_result_row(res, row,
5360 astate->rel,
5361 astate->attinmeta,
5362 astate->retrieved_attrs,
5363 NULL,
5364 astate->temp_cxt);
5365
5366 MemoryContextSwitchTo(oldcontext);
5367 }
5368}
5369
5370/*
5371 * Import a foreign schema
5372 */
5373static List *
5375{
5376 List *commands = NIL;
5377 bool import_collate = true;
5378 bool import_default = false;
5379 bool import_generated = true;
5380 bool import_not_null = true;
5381 ForeignServer *server;
5383 PGconn *conn;
5385 PGresult *res;
5386 int numrows,
5387 i;
5388 ListCell *lc;
5389
5390 /* Parse statement options */
5391 foreach(lc, stmt->options)
5392 {
5393 DefElem *def = (DefElem *) lfirst(lc);
5394
5395 if (strcmp(def->defname, "import_collate") == 0)
5397 else if (strcmp(def->defname, "import_default") == 0)
5399 else if (strcmp(def->defname, "import_generated") == 0)
5401 else if (strcmp(def->defname, "import_not_null") == 0)
5403 else
5404 ereport(ERROR,
5406 errmsg("invalid option \"%s\"", def->defname)));
5407 }
5408
5409 /*
5410 * Get connection to the foreign server. Connection manager will
5411 * establish new connection if necessary.
5412 */
5413 server = GetForeignServer(serverOid);
5415 conn = GetConnection(mapping, false, NULL);
5416
5417 /* Don't attempt to import collation if remote server hasn't got it */
5418 if (PQserverVersion(conn) < 90100)
5419 import_collate = false;
5420
5421 /* Create workspace for strings */
5423
5424 /* Check that the schema really exists */
5425 appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
5426 deparseStringLiteral(&buf, stmt->remote_schema);
5427
5428 res = pgfdw_exec_query(conn, buf.data, NULL);
5429 if (PQresultStatus(res) != PGRES_TUPLES_OK)
5430 pgfdw_report_error(res, conn, buf.data);
5431
5432 if (PQntuples(res) != 1)
5433 ereport(ERROR,
5435 errmsg("schema \"%s\" is not present on foreign server \"%s\"",
5436 stmt->remote_schema, server->servername)));
5437
5438 PQclear(res);
5440
5441 /*
5442 * Fetch all table data from this schema, possibly restricted by EXCEPT or
5443 * LIMIT TO. (We don't actually need to pay any attention to EXCEPT/LIMIT
5444 * TO here, because the core code will filter the statements we return
5445 * according to those lists anyway. But it should save a few cycles to
5446 * not process excluded tables in the first place.)
5447 *
5448 * Import table data for partitions only when they are explicitly
5449 * specified in LIMIT TO clause. Otherwise ignore them and only include
5450 * the definitions of the root partitioned tables to allow access to the
5451 * complete remote data set locally in the schema imported.
5452 *
5453 * Note: because we run the connection with search_path restricted to
5454 * pg_catalog, the format_type() and pg_get_expr() outputs will always
5455 * include a schema name for types/functions in other schemas, which is
5456 * what we want.
5457 */
5459 "SELECT relname, "
5460 " attname, "
5461 " format_type(atttypid, atttypmod), "
5462 " attnotnull, "
5463 " pg_get_expr(adbin, adrelid), ");
5464
5465 /* Generated columns are supported since Postgres 12 */
5466 if (PQserverVersion(conn) >= 120000)
5468 " attgenerated, ");
5469 else
5471 " NULL, ");
5472
5473 if (import_collate)
5475 " collname, "
5476 " collnsp.nspname ");
5477 else
5479 " NULL, NULL ");
5480
5482 "FROM pg_class c "
5483 " JOIN pg_namespace n ON "
5484 " relnamespace = n.oid "
5485 " LEFT JOIN pg_attribute a ON "
5486 " attrelid = c.oid AND attnum > 0 "
5487 " AND NOT attisdropped "
5488 " LEFT JOIN pg_attrdef ad ON "
5489 " adrelid = c.oid AND adnum = attnum ");
5490
5491 if (import_collate)
5493 " LEFT JOIN pg_collation coll ON "
5494 " coll.oid = attcollation "
5495 " LEFT JOIN pg_namespace collnsp ON "
5496 " collnsp.oid = collnamespace ");
5497
5499 "WHERE c.relkind IN ("
5505 " AND n.nspname = ");
5506 deparseStringLiteral(&buf, stmt->remote_schema);
5507
5508 /* Partitions are supported since Postgres 10 */
5509 if (PQserverVersion(conn) >= 100000 &&
5510 stmt->list_type != FDW_IMPORT_SCHEMA_LIMIT_TO)
5511 appendStringInfoString(&buf, " AND NOT c.relispartition ");
5512
5513 /* Apply restrictions for LIMIT TO and EXCEPT */
5514 if (stmt->list_type == FDW_IMPORT_SCHEMA_LIMIT_TO ||
5515 stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
5516 {
5517 bool first_item = true;
5518
5519 appendStringInfoString(&buf, " AND c.relname ");
5520 if (stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
5521 appendStringInfoString(&buf, "NOT ");
5522 appendStringInfoString(&buf, "IN (");
5523
5524 /* Append list of table names within IN clause */
5525 foreach(lc, stmt->table_list)
5526 {
5527 RangeVar *rv = (RangeVar *) lfirst(lc);
5528
5529 if (first_item)
5530 first_item = false;
5531 else
5534 }
5536 }
5537
5538 /* Append ORDER BY at the end of query to ensure output ordering */
5539 appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");
5540
5541 /* Fetch the data */
5542 res = pgfdw_exec_query(conn, buf.data, NULL);
5543 if (PQresultStatus(res) != PGRES_TUPLES_OK)
5544 pgfdw_report_error(res, conn, buf.data);
5545
5546 /* Process results */
5547 numrows = PQntuples(res);
5548 /* note: incrementation of i happens in inner loop's while() test */
5549 for (i = 0; i < numrows;)
5550 {
5551 char *tablename = PQgetvalue(res, i, 0);
5552 bool first_item = true;
5553
5555 appendStringInfo(&buf, "CREATE FOREIGN TABLE %s (\n",
5556 quote_identifier(tablename));
5557
5558 /* Scan all rows for this table */
5559 do
5560 {
5561 char *attname;
5562 char *typename;
5563 char *attnotnull;
5564 char *attgenerated;
5565 char *attdefault;
5566 char *collname;
5567 char *collnamespace;
5568
5569 /* If table has no columns, we'll see nulls here */
5570 if (PQgetisnull(res, i, 1))
5571 continue;
5572
5573 attname = PQgetvalue(res, i, 1);
5574 typename = PQgetvalue(res, i, 2);
5575 attnotnull = PQgetvalue(res, i, 3);
5576 attdefault = PQgetisnull(res, i, 4) ? NULL :
5577 PQgetvalue(res, i, 4);
5578 attgenerated = PQgetisnull(res, i, 5) ? NULL :
5579 PQgetvalue(res, i, 5);
5580 collname = PQgetisnull(res, i, 6) ? NULL :
5581 PQgetvalue(res, i, 6);
5582 collnamespace = PQgetisnull(res, i, 7) ? NULL :
5583 PQgetvalue(res, i, 7);
5584
5585 if (first_item)
5586 first_item = false;
5587 else
5588 appendStringInfoString(&buf, ",\n");
5589
5590 /* Print column name and type */
5591 appendStringInfo(&buf, " %s %s",
5593 typename);
5594
5595 /*
5596 * Add column_name option so that renaming the foreign table's
5597 * column doesn't break the association to the underlying column.
5598 */
5599 appendStringInfoString(&buf, " OPTIONS (column_name ");
5602
5603 /* Add COLLATE if needed */
5604 if (import_collate && collname != NULL && collnamespace != NULL)
5605 appendStringInfo(&buf, " COLLATE %s.%s",
5607 quote_identifier(collname));
5608
5609 /* Add DEFAULT if needed */
5610 if (import_default && attdefault != NULL &&
5611 (!attgenerated || !attgenerated[0]))
5612 appendStringInfo(&buf, " DEFAULT %s", attdefault);
5613
5614 /* Add GENERATED if needed */
5615 if (import_generated && attgenerated != NULL &&
5616 attgenerated[0] == ATTRIBUTE_GENERATED_STORED)
5617 {
5620 " GENERATED ALWAYS AS (%s) STORED",
5621 attdefault);
5622 }
5623
5624 /* Add NOT NULL if needed */
5625 if (import_not_null && attnotnull[0] == 't')
5626 appendStringInfoString(&buf, " NOT NULL");
5627 }
5628 while (++i < numrows &&
5629 strcmp(PQgetvalue(res, i, 0), tablename) == 0);
5630
5631 /*
5632 * Add server name and table-level options. We specify remote schema
5633 * and table name as options (the latter to ensure that renaming the
5634 * foreign table doesn't break the association).
5635 */
5636 appendStringInfo(&buf, "\n) SERVER %s\nOPTIONS (",
5637 quote_identifier(server->servername));
5638
5639 appendStringInfoString(&buf, "schema_name ");
5640 deparseStringLiteral(&buf, stmt->remote_schema);
5641 appendStringInfoString(&buf, ", table_name ");
5642 deparseStringLiteral(&buf, tablename);
5643
5645
5646 commands = lappend(commands, pstrdup(buf.data));
5647 }
5648 PQclear(res);
5649
5651
5652 return commands;
5653}
5654
5655/*
5656 * Check if reltarget is safe enough to push down semi-join. Reltarget is not
5657 * safe, if it contains references to inner rel relids, which do not belong to
5658 * outer rel.
5659 */
5660static bool
5662{
5663 List *vars;
5664 ListCell *lc;
5665 bool ok = true;
5666
5667 Assert(joinrel->reltarget);
5668
5670
5671 foreach(lc, vars)
5672 {
5673 Var *var = (Var *) lfirst(lc);
5674
5675 if (!IsA(var, Var))
5676 continue;
5677
5678 if (bms_is_member(var->varno, innerrel->relids))
5679 {
5680 /*
5681 * The planner can create semi-join, which refers to inner rel
5682 * vars in its target list. However, we deparse semi-join as an
5683 * exists() subquery, so can't handle references to inner rel in
5684 * the target list.
5685 */
5686 Assert(!bms_is_member(var->varno, outerrel->relids));
5687 ok = false;
5688 break;
5689 }
5690 }
5691 return ok;
5692}
5693
5694/*
5695 * Assess whether the join between inner and outer relations can be pushed down
5696 * to the foreign server. As a side effect, save information we obtain in this
5697 * function to PgFdwRelationInfo passed in.
5698 */
5699static bool
5701 RelOptInfo *outerrel, RelOptInfo *innerrel,
5702 JoinPathExtraData *extra)
5703{
5707 ListCell *lc;
5708 List *joinclauses;
5709
5710 /*
5711 * We support pushing down INNER, LEFT, RIGHT, FULL OUTER and SEMI joins.
5712 * Constructing queries representing ANTI joins is hard, hence not
5713 * considered right now.
5714 */
5715 if (jointype != JOIN_INNER && jointype != JOIN_LEFT &&
5716 jointype != JOIN_RIGHT && jointype != JOIN_FULL &&
5717 jointype != JOIN_SEMI)
5718 return false;
5719
5720 /*
5721 * We can't push down semi-join if its reltarget is not safe
5722 */
5723 if ((jointype == JOIN_SEMI) && !semijoin_target_ok(root, joinrel, outerrel, innerrel))
5724 return false;
5725
5726 /*
5727 * If either of the joining relations is marked as unsafe to pushdown, the
5728 * join can not be pushed down.
5729 */
5730 fpinfo = (PgFdwRelationInfo *) joinrel->fdw_private;
5731 fpinfo_o = (PgFdwRelationInfo *) outerrel->fdw_private;
5732 fpinfo_i = (PgFdwRelationInfo *) innerrel->fdw_private;
5733 if (!fpinfo_o || !fpinfo_o->pushdown_safe ||
5734 !fpinfo_i || !fpinfo_i->pushdown_safe)
5735 return false;
5736
5737 /*
5738 * If joining relations have local conditions, those conditions are
5739 * required to be applied before joining the relations. Hence the join can
5740 * not be pushed down.
5741 */
5742 if (fpinfo_o->local_conds || fpinfo_i->local_conds)
5743 return false;
5744
5745 /*
5746 * Merge FDW options. We might be tempted to do this after we have deemed
5747 * the foreign join to be OK. But we must do this beforehand so that we
5748 * know which quals can be evaluated on the foreign server, which might
5749 * depend on shippable_extensions.
5750 */
5751 fpinfo->server = fpinfo_o->server;
5753
5754 /*
5755 * Separate restrict list into join quals and pushed-down (other) quals.
5756 *
5757 * Join quals belonging to an outer join must all be shippable, else we
5758 * cannot execute the join remotely. Add such quals to 'joinclauses'.
5759 *
5760 * Add other quals to fpinfo->remote_conds if they are shippable, else to
5761 * fpinfo->local_conds. In an inner join it's okay to execute conditions
5762 * either locally or remotely; the same is true for pushed-down conditions
5763 * at an outer join.
5764 *
5765 * Note we might return failure after having already scribbled on
5766 * fpinfo->remote_conds and fpinfo->local_conds. That's okay because we
5767 * won't consult those lists again if we deem the join unshippable.
5768 */
5769 joinclauses = NIL;
5770 foreach(lc, extra->restrictlist)
5771 {
5773 bool is_remote_clause = is_foreign_expr(root, joinrel,
5774 rinfo->clause);
5775
5776 if (IS_OUTER_JOIN(jointype) &&
5777 !RINFO_IS_PUSHED_DOWN(rinfo, joinrel->relids))
5778 {
5779 if (!is_remote_clause)
5780 return false;
5781 joinclauses = lappend(joinclauses, rinfo);
5782 }
5783 else
5784 {
5785 if (is_remote_clause)
5786 fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo);
5787 else
5788 fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo);
5789 }
5790 }
5791
5792 /*
5793 * deparseExplicitTargetList() isn't smart enough to handle anything other
5794 * than a Var. In particular, if there's some PlaceHolderVar that would
5795 * need to be evaluated within this join tree (because there's an upper
5796 * reference to a quantity that may go to NULL as a result of an outer
5797 * join), then we can't try to push the join down because we'll fail when
5798 * we get to deparseExplicitTargetList(). However, a PlaceHolderVar that
5799 * needs to be evaluated *at the top* of this join tree is OK, because we
5800 * can do that locally after fetching the results from the remote side.
5801 */
5802 foreach(lc, root->placeholder_list)
5803 {
5805 Relids relids;
5806
5807 /* PlaceHolderInfo refers to parent relids, not child relids. */
5808 relids = IS_OTHER_REL(joinrel) ?
5809 joinrel->top_parent_relids : joinrel->relids;
5810
5811 if (bms_is_subset(phinfo->ph_eval_at, relids) &&
5812 bms_nonempty_difference(relids, phinfo->ph_eval_at))
5813 return false;
5814 }
5815
5816 /* Save the join clauses, for later use. */
5817 fpinfo->joinclauses = joinclauses;
5818
5819 fpinfo->outerrel = outerrel;
5820 fpinfo->innerrel = innerrel;
5821 fpinfo->jointype = jointype;
5822
5823 /*
5824 * By default, both the input relations are not required to be deparsed as
5825 * subqueries, but there might be some relations covered by the input
5826 * relations that are required to be deparsed as subqueries, so save the
5827 * relids of those relations for later use by the deparser.
5828 */
5829 fpinfo->make_outerrel_subquery = false;
5830 fpinfo->make_innerrel_subquery = false;
5831 Assert(bms_is_subset(fpinfo_o->lower_subquery_rels, outerrel->relids));
5832 Assert(bms_is_subset(fpinfo_i->lower_subquery_rels, innerrel->relids));
5833 fpinfo->lower_subquery_rels = bms_union(fpinfo_o->lower_subquery_rels,
5834 fpinfo_i->lower_subquery_rels);
5835 fpinfo->hidden_subquery_rels = bms_union(fpinfo_o->hidden_subquery_rels,
5836 fpinfo_i->hidden_subquery_rels);
5837
5838 /*
5839 * Pull the other remote conditions from the joining relations into join
5840 * clauses or other remote clauses (remote_conds) of this relation
5841 * wherever possible. This avoids building subqueries at every join step.
5842 *
5843 * For an inner join, clauses from both the relations are added to the
5844 * other remote clauses. For LEFT and RIGHT OUTER join, the clauses from
5845 * the outer side are added to remote_conds since those can be evaluated
5846 * after the join is evaluated. The clauses from inner side are added to
5847 * the joinclauses, since they need to be evaluated while constructing the
5848 * join.
5849 *
5850 * For SEMI-JOIN clauses from inner relation can not be added to
5851 * remote_conds, but should be treated as join clauses (as they are
5852 * deparsed to EXISTS subquery, where inner relation can be referred). A
5853 * list of relation ids, which can't be referred to from higher levels, is
5854 * preserved as a hidden_subquery_rels list.
5855 *
5856 * For a FULL OUTER JOIN, the other clauses from either relation can not
5857 * be added to the joinclauses or remote_conds, since each relation acts
5858 * as an outer relation for the other.
5859 *
5860 * The joining sides can not have local conditions, thus no need to test
5861 * shippability of the clauses being pulled up.
5862 */
5863 switch (jointype)
5864 {
5865 case JOIN_INNER:
5866 fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
5867 fpinfo_i->remote_conds);
5868 fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
5869 fpinfo_o->remote_conds);
5870 break;
5871
5872 case JOIN_LEFT:
5873
5874 /*
5875 * When semi-join is involved in the inner or outer part of the
5876 * left join, it's deparsed as a subquery, and we can't refer to
5877 * its vars on the upper level.
5878 */
5879 if (bms_is_empty(fpinfo_i->hidden_subquery_rels))
5880 fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
5881 fpinfo_i->remote_conds);
5882 if (bms_is_empty(fpinfo_o->hidden_subquery_rels))
5883 fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
5884 fpinfo_o->remote_conds);
5885 break;
5886
5887 case JOIN_RIGHT:
5888
5889 /*
5890 * When semi-join is involved in the inner or outer part of the
5891 * right join, it's deparsed as a subquery, and we can't refer to
5892 * its vars on the upper level.
5893 */
5894 if (bms_is_empty(fpinfo_o->hidden_subquery_rels))
5895 fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
5896 fpinfo_o->remote_conds);
5897 if (bms_is_empty(fpinfo_i->hidden_subquery_rels))
5898 fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
5899 fpinfo_i->remote_conds);
5900 break;
5901
5902 case JOIN_SEMI:
5903 fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
5904 fpinfo_i->remote_conds);
5905 fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
5906 fpinfo->remote_conds);
5907 fpinfo->remote_conds = list_copy(fpinfo_o->remote_conds);
5908 fpinfo->hidden_subquery_rels = bms_union(fpinfo->hidden_subquery_rels,
5909 innerrel->relids);
5910 break;
5911
5912 case JOIN_FULL:
5913
5914 /*
5915 * In this case, if any of the input relations has conditions, we
5916 * need to deparse that relation as a subquery so that the
5917 * conditions can be evaluated before the join. Remember it in
5918 * the fpinfo of this relation so that the deparser can take
5919 * appropriate action. Also, save the relids of base relations
5920 * covered by that relation for later use by the deparser.
5921 */
5922 if (fpinfo_o->remote_conds)
5923 {
5924 fpinfo->make_outerrel_subquery = true;
5925 fpinfo->lower_subquery_rels =
5926 bms_add_members(fpinfo->lower_subquery_rels,
5927 outerrel->relids);
5928 }
5929 if (fpinfo_i->remote_conds)
5930 {
5931 fpinfo->make_innerrel_subquery = true;
5932 fpinfo->lower_subquery_rels =
5933 bms_add_members(fpinfo->lower_subquery_rels,
5934 innerrel->relids);
5935 }
5936 break;
5937
5938 default:
5939 /* Should not happen, we have just checked this above */
5940 elog(ERROR, "unsupported join type %d", jointype);
5941 }
5942
5943 /*
5944 * For an inner join, all restrictions can be treated alike. Treating the
5945 * pushed down conditions as join conditions allows a top level full outer
5946 * join to be deparsed without requiring subqueries.
5947 */
5948 if (jointype == JOIN_INNER)
5949 {
5950 Assert(!fpinfo->joinclauses);
5951 fpinfo->joinclauses = fpinfo->remote_conds;
5952 fpinfo->remote_conds = NIL;
5953 }
5954 else if (jointype == JOIN_LEFT || jointype == JOIN_RIGHT || jointype == JOIN_FULL)
5955 {
5956 /*
5957 * Conditions, generated from semi-joins, should be evaluated before
5958 * LEFT/RIGHT/FULL join.
5959 */
5960 if (!bms_is_empty(fpinfo_o->hidden_subquery_rels))
5961 {
5962 fpinfo->make_outerrel_subquery = true;
5963 fpinfo->lower_subquery_rels = bms_add_members(fpinfo->lower_subquery_rels, outerrel->relids);
5964 }
5965
5966 if (!bms_is_empty(fpinfo_i->hidden_subquery_rels))
5967 {
5968 fpinfo->make_innerrel_subquery = true;
5969 fpinfo->lower_subquery_rels = bms_add_members(fpinfo->lower_subquery_rels, innerrel->relids);
5970 }
5971 }
5972
5973 /* Mark that this join can be pushed down safely */
5974 fpinfo->pushdown_safe = true;
5975
5976 /* Get user mapping */
5977 if (fpinfo->use_remote_estimate)
5978 {
5979 if (fpinfo_o->use_remote_estimate)
5980 fpinfo->user = fpinfo_o->user;
5981 else
5982 fpinfo->user = fpinfo_i->user;
5983 }
5984 else
5985 fpinfo->user = NULL;
5986
5987 /*
5988 * Set # of retrieved rows and cached relation costs to some negative
5989 * value, so that we can detect when they are set to some sensible values,
5990 * during one (usually the first) of the calls to estimate_path_cost_size.
5991 */
5992 fpinfo->retrieved_rows = -1;
5993 fpinfo->rel_startup_cost = -1;
5994 fpinfo->rel_total_cost = -1;
5995
5996 /*
5997 * Set the string describing this join relation to be used in EXPLAIN
5998 * output of corresponding ForeignScan. Note that the decoration we add
5999 * to the base relation names mustn't include any digits, or it'll confuse
6000 * postgresExplainForeignScan.
6001 */
6002 fpinfo->relation_name = psprintf("(%s) %s JOIN (%s)",
6003 fpinfo_o->relation_name,
6004 get_jointype_name(fpinfo->jointype),
6005 fpinfo_i->relation_name);
6006
6007 /*
6008 * Set the relation index. This is defined as the position of this
6009 * joinrel in the join_rel_list list plus the length of the rtable list.
6010 * Note that since this joinrel is at the end of the join_rel_list list
6011 * when we are called, we can get the position by list_length.
6012 */
6013 Assert(fpinfo->relation_index == 0); /* shouldn't be set yet */
6014 fpinfo->relation_index =
6015 list_length(root->parse->rtable) + list_length(root->join_rel_list);
6016
6017 return true;
6018}
6019
6020static void
6022 Path *epq_path, List *restrictlist)
6023{
6024 List *useful_pathkeys_list = NIL; /* List of all pathkeys */
6025 ListCell *lc;
6026
6028
6029 /*
6030 * Before creating sorted paths, arrange for the passed-in EPQ path, if
6031 * any, to return columns needed by the parent ForeignScan node so that
6032 * they will propagate up through Sort nodes injected below, if necessary.
6033 */
6035 {
6036 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private;
6037 PathTarget *target = copy_pathtarget(epq_path->pathtarget);
6038
6039 /* Include columns required for evaluating PHVs in the tlist. */
6041 pull_var_clause((Node *) target->exprs,
6043
6044 /* Include columns required for evaluating the local conditions. */
6045 foreach(lc, fpinfo->local_conds)
6046 {
6048
6050 pull_var_clause((Node *) rinfo->clause,
6052 }
6053
6054 /*
6055 * If we have added any new columns, adjust the tlist of the EPQ path.
6056 *
6057 * Note: the plan created using this path will only be used to execute
6058 * EPQ checks, where accuracy of the plan cost and width estimates
6059 * would not be important, so we do not do set_pathtarget_cost_width()
6060 * for the new pathtarget here. See also postgresGetForeignPlan().
6061 */
6062 if (list_length(target->exprs) > list_length(epq_path->pathtarget->exprs))
6063 {
6064 /* The EPQ path is a join path, so it is projection-capable. */
6066
6067 /*
6068 * Use create_projection_path() here, so as to avoid modifying it
6069 * in place.
6070 */
6072 rel,
6073 epq_path,
6074 target);
6075 }
6076 }
6077
6078 /* Create one path for each set of pathkeys we found above. */
6079 foreach(lc, useful_pathkeys_list)
6080 {
6081 double rows;
6082 int width;
6083 int disabled_nodes;
6084 Cost startup_cost;
6085 Cost total_cost;
6088
6090 &rows, &width, &disabled_nodes,
6091 &startup_cost, &total_cost);
6092
6093 /*
6094 * The EPQ path must be at least as well sorted as the path itself, in
6095 * case it gets used as input to a mergejoin.
6096 */
6098 if (sorted_epq_path != NULL &&
6100 sorted_epq_path->pathkeys))
6101 sorted_epq_path = (Path *)
6103 rel,
6106 -1.0);
6107
6108 if (IS_SIMPLE_REL(rel))
6109 add_path(rel, (Path *)
6111 NULL,
6112 rows,
6113 disabled_nodes,
6114 startup_cost,
6115 total_cost,
6117 rel->lateral_relids,
6119 NIL, /* no fdw_restrictinfo
6120 * list */
6121 NIL));
6122 else
6123 add_path(rel, (Path *)
6125 NULL,
6126 rows,
6127 disabled_nodes,
6128 startup_cost,
6129 total_cost,
6131 rel->lateral_relids,
6133 restrictlist,
6134 NIL));
6135 }
6136}
6137
6138/*
6139 * Parse options from foreign server and apply them to fpinfo.
6140 *
6141 * New options might also require tweaking merge_fdw_options().
6142 */
6143static void
6145{
6146 ListCell *lc;
6147
6148 foreach(lc, fpinfo->server->options)
6149 {
6150 DefElem *def = (DefElem *) lfirst(lc);
6151
6152 if (strcmp(def->defname, "use_remote_estimate") == 0)
6153 fpinfo->use_remote_estimate = defGetBoolean(def);
6154 else if (strcmp(def->defname, "fdw_startup_cost") == 0)
6155 (void) parse_real(defGetString(def), &fpinfo->fdw_startup_cost, 0,
6156 NULL);
6157 else if (strcmp(def->defname, "fdw_tuple_cost") == 0)
6158 (void) parse_real(defGetString(def), &fpinfo->fdw_tuple_cost, 0,
6159 NULL);
6160 else if (strcmp(def->defname, "extensions") == 0)
6161 fpinfo->shippable_extensions =
6163 else if (strcmp(def->defname, "fetch_size") == 0)
6164 (void) parse_int(defGetString(def), &fpinfo->fetch_size, 0, NULL);
6165 else if (strcmp(def->defname, "async_capable") == 0)
6166 fpinfo->async_capable = defGetBoolean(def);
6167 }
6168}
6169
6170/*
6171 * Parse options from foreign table and apply them to fpinfo.
6172 *
6173 * New options might also require tweaking merge_fdw_options().
6174 */
6175static void
6177{
6178 ListCell *lc;
6179
6180 foreach(lc, fpinfo->table->options)
6181 {
6182 DefElem *def = (DefElem *) lfirst(lc);
6183
6184 if (strcmp(def->defname, "use_remote_estimate") == 0)
6185 fpinfo->use_remote_estimate = defGetBoolean(def);
6186 else if (strcmp(def->defname, "fetch_size") == 0)
6187 (void) parse_int(defGetString(def), &fpinfo->fetch_size, 0, NULL);
6188 else if (strcmp(def->defname, "async_capable") == 0)
6189 fpinfo->async_capable = defGetBoolean(def);
6190 }
6191}
6192
6193/*
6194 * Merge FDW options from input relations into a new set of options for a join
6195 * or an upper rel.
6196 *
6197 * For a join relation, FDW-specific information about the inner and outer
6198 * relations is provided using fpinfo_i and fpinfo_o. For an upper relation,
6199 * fpinfo_o provides the information for the input relation; fpinfo_i is
6200 * expected to NULL.
6201 */
6202static void
6206{
6207 /* We must always have fpinfo_o. */
6209
6210 /* fpinfo_i may be NULL, but if present the servers must both match. */
6211 Assert(!fpinfo_i ||
6212 fpinfo_i->server->serverid == fpinfo_o->server->serverid);
6213
6214 /*
6215 * Copy the server specific FDW options. (For a join, both relations come
6216 * from the same server, so the server options should have the same value
6217 * for both relations.)
6218 */
6219 fpinfo->fdw_startup_cost = fpinfo_o->fdw_startup_cost;
6220 fpinfo->fdw_tuple_cost = fpinfo_o->fdw_tuple_cost;
6221 fpinfo->shippable_extensions = fpinfo_o->shippable_extensions;
6222 fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate;
6223 fpinfo->fetch_size = fpinfo_o->fetch_size;
6224 fpinfo->async_capable = fpinfo_o->async_capable;
6225
6226 /* Merge the table level options from either side of the join. */
6227 if (fpinfo_i)
6228 {
6229 /*
6230 * We'll prefer to use remote estimates for this join if any table
6231 * from either side of the join is using remote estimates. This is
6232 * most likely going to be preferred since they're already willing to
6233 * pay the price of a round trip to get the remote EXPLAIN. In any
6234 * case it's not entirely clear how we might otherwise handle this
6235 * best.
6236 */
6237 fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate ||
6238 fpinfo_i->use_remote_estimate;
6239
6240 /*
6241 * Set fetch size to maximum of the joining sides, since we are
6242 * expecting the rows returned by the join to be proportional to the
6243 * relation sizes.
6244 */
6245 fpinfo->fetch_size = Max(fpinfo_o->fetch_size, fpinfo_i->fetch_size);
6246
6247 /*
6248 * We'll prefer to consider this join async-capable if any table from
6249 * either side of the join is considered async-capable. This would be
6250 * reasonable because in that case the foreign server would have its
6251 * own resources to scan that table asynchronously, and the join could
6252 * also be computed asynchronously using the resources.
6253 */
6254 fpinfo->async_capable = fpinfo_o->async_capable ||
6255 fpinfo_i->async_capable;
6256 }
6257}
6258
6259/*
6260 * postgresGetForeignJoinPaths
6261 * Add possible ForeignPath to joinrel, if join is safe to push down.
6262 */
6263static void
6265 RelOptInfo *joinrel,
6266 RelOptInfo *outerrel,
6267 RelOptInfo *innerrel,
6268 JoinType jointype,
6269 JoinPathExtraData *extra)
6270{
6273 double rows;
6274 int width;
6275 int disabled_nodes;
6276 Cost startup_cost;
6277 Cost total_cost;
6278 Path *epq_path; /* Path to create plan to be executed when
6279 * EvalPlanQual gets triggered. */
6280
6281 /*
6282 * Skip if this join combination has been considered already.
6283 */
6284 if (joinrel->fdw_private)
6285 return;
6286
6287 /*
6288 * This code does not work for joins with lateral references, since those
6289 * must have parameterized paths, which we don't generate yet.
6290 */
6291 if (!bms_is_empty(joinrel->lateral_relids))
6292 return;
6293
6294 /*
6295 * Create unfinished PgFdwRelationInfo entry which is used to indicate
6296 * that the join relation is already considered, so that we won't waste
6297 * time in judging safety of join pushdown and adding the same paths again
6298 * if found safe. Once we know that this join can be pushed down, we fill
6299 * the entry.
6300 */
6302 fpinfo->pushdown_safe = false;
6303 joinrel->fdw_private = fpinfo;
6304 /* attrs_used is only for base relations. */
6305 fpinfo->attrs_used = NULL;
6306
6307 /*
6308 * If there is a possibility that EvalPlanQual will be executed, we need
6309 * to be able to reconstruct the row using scans of the base relations.
6310 * GetExistingLocalJoinPath will find a suitable path for this purpose in
6311 * the path list of the joinrel, if one exists. We must be careful to
6312 * call it before adding any ForeignPath, since the ForeignPath might
6313 * dominate the only suitable local path available. We also do it before
6314 * calling foreign_join_ok(), since that function updates fpinfo and marks
6315 * it as pushable if the join is found to be pushable.
6316 */
6317 if (root->parse->commandType == CMD_DELETE ||
6318 root->parse->commandType == CMD_UPDATE ||
6319 root->rowMarks)
6320 {
6322 if (!epq_path)
6323 {
6324 elog(DEBUG3, "could not push down foreign join because a local path suitable for EPQ checks was not found");
6325 return;
6326 }
6327 }
6328 else
6329 epq_path = NULL;
6330
6331 if (!foreign_join_ok(root, joinrel, jointype, outerrel, innerrel, extra))
6332 {
6333 /* Free path required for EPQ if we copied one; we don't need it now */
6334 if (epq_path)
6335 pfree(epq_path);
6336 return;
6337 }
6338
6339 /*
6340 * Compute the selectivity and cost of the local_conds, so we don't have
6341 * to do it over again for each path. The best we can do for these
6342 * conditions is to estimate selectivity on the basis of local statistics.
6343 * The local conditions are applied after the join has been computed on
6344 * the remote side like quals in WHERE clause, so pass jointype as
6345 * JOIN_INNER.
6346 */
6347 fpinfo->local_conds_sel = clauselist_selectivity(root,
6348 fpinfo->local_conds,
6349 0,
6350 JOIN_INNER,
6351 NULL);
6352 cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
6353
6354 /*
6355 * If we are going to estimate costs locally, estimate the join clause
6356 * selectivity here while we have special join info.
6357 */
6358 if (!fpinfo->use_remote_estimate)
6359 fpinfo->joinclause_sel = clauselist_selectivity(root, fpinfo->joinclauses,
6360 0, fpinfo->jointype,
6361 extra->sjinfo);
6362
6363 /* Estimate costs for bare join relation */
6365 &rows, &width, &disabled_nodes,
6366 &startup_cost, &total_cost);
6367 /* Now update this information in the joinrel */
6368 joinrel->rows = rows;
6369 joinrel->reltarget->width = width;
6370 fpinfo->rows = rows;
6371 fpinfo->width = width;
6372 fpinfo->disabled_nodes = disabled_nodes;
6373 fpinfo->startup_cost = startup_cost;
6374 fpinfo->total_cost = total_cost;
6375
6376 /*
6377 * Create a new join path and add it to the joinrel which represents a
6378 * join between foreign tables.
6379 */
6381 joinrel,
6382 NULL, /* default pathtarget */
6383 rows,
6384 disabled_nodes,
6385 startup_cost,
6386 total_cost,
6387 NIL, /* no pathkeys */
6388 joinrel->lateral_relids,
6389 epq_path,
6390 extra->restrictlist,
6391 NIL); /* no fdw_private */
6392
6393 /* Add generated path into joinrel by add_path(). */
6394 add_path(joinrel, (Path *) joinpath);
6395
6396 /* Consider pathkeys for the join relation */
6398 extra->restrictlist);
6399
6400 /* XXX Consider parameterized paths for the join relation */
6401}
6402
6403/*
6404 * Assess whether the aggregation, grouping and having operations can be pushed
6405 * down to the foreign server. As a side effect, save information we obtain in
6406 * this function to PgFdwRelationInfo of the input relation.
6407 */
6408static bool
6410 Node *havingQual)
6411{
6412 Query *query = root->parse;
6413 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) grouped_rel->fdw_private;
6414 PathTarget *grouping_target = grouped_rel->reltarget;
6416 ListCell *lc;
6417 int i;
6418 List *tlist = NIL;
6419
6420 /* We currently don't support pushing Grouping Sets. */
6421 if (query->groupingSets)
6422 return false;
6423
6424 /* Get the fpinfo of the underlying scan relation. */
6425 ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
6426
6427 /*
6428 * If underlying scan relation has any local conditions, those conditions
6429 * are required to be applied before performing aggregation. Hence the
6430 * aggregate cannot be pushed down.
6431 */
6432 if (ofpinfo->local_conds)
6433 return false;
6434
6435 /*
6436 * Examine grouping expressions, as well as other expressions we'd need to
6437 * compute, and check whether they are safe to push down to the foreign
6438 * server. All GROUP BY expressions will be part of the grouping target
6439 * and thus there is no need to search for them separately. Add grouping
6440 * expressions into target list which will be passed to foreign server.
6441 *
6442 * A tricky fine point is that we must not put any expression into the
6443 * target list that is just a foreign param (that is, something that
6444 * deparse.c would conclude has to be sent to the foreign server). If we
6445 * do, the expression will also appear in the fdw_exprs list of the plan
6446 * node, and setrefs.c will get confused and decide that the fdw_exprs
6447 * entry is actually a reference to the fdw_scan_tlist entry, resulting in
6448 * a broken plan. Somewhat oddly, it's OK if the expression contains such
6449 * a node, as long as it's not at top level; then no match is possible.
6450 */
6451 i = 0;
6452 foreach(lc, grouping_target->exprs)
6453 {
6454 Expr *expr = (Expr *) lfirst(lc);
6456 ListCell *l;
6457
6458 /*
6459 * Check whether this expression is part of GROUP BY clause. Note we
6460 * check the whole GROUP BY clause not just processed_groupClause,
6461 * because we will ship all of it, cf. appendGroupByClause.
6462 */
6464 {
6466
6467 /*
6468 * If any GROUP BY expression is not shippable, then we cannot
6469 * push down aggregation to the foreign server.
6470 */
6471 if (!is_foreign_expr(root, grouped_rel, expr))
6472 return false;
6473
6474 /*
6475 * If it would be a foreign param, we can't put it into the tlist,
6476 * so we have to fail.
6477 */
6478 if (is_foreign_param(root, grouped_rel, expr))
6479 return false;
6480
6481 /*
6482 * Pushable, so add to tlist. We need to create a TLE for this
6483 * expression and apply the sortgroupref to it. We cannot use
6484 * add_to_flat_tlist() here because that avoids making duplicate
6485 * entries in the tlist. If there are duplicate entries with
6486 * distinct sortgrouprefs, we have to duplicate that situation in
6487 * the output tlist.
6488 */
6489 tle = makeTargetEntry(expr, list_length(tlist) + 1, NULL, false);
6490 tle->ressortgroupref = sgref;
6491 tlist = lappend(tlist, tle);
6492 }
6493 else
6494 {
6495 /*
6496 * Non-grouping expression we need to compute. Can we ship it
6497 * as-is to the foreign server?
6498 */
6499 if (is_foreign_expr(root, grouped_rel, expr) &&
6500 !is_foreign_param(root, grouped_rel, expr))
6501 {
6502 /* Yes, so add to tlist as-is; OK to suppress duplicates */
6503 tlist = add_to_flat_tlist(tlist, list_make1(expr));
6504 }
6505 else
6506 {
6507 /* Not pushable as a whole; extract its Vars and aggregates */
6508 List *aggvars;
6509
6510 aggvars = pull_var_clause((Node *) expr,
6512
6513 /*
6514 * If any aggregate expression is not shippable, then we
6515 * cannot push down aggregation to the foreign server. (We
6516 * don't have to check is_foreign_param, since that certainly
6517 * won't return true for any such expression.)
6518 */
6519 if (!is_foreign_expr(root, grouped_rel, (Expr *) aggvars))
6520 return false;
6521
6522 /*
6523 * Add aggregates, if any, into the targetlist. Plain Vars
6524 * outside an aggregate can be ignored, because they should be
6525 * either same as some GROUP BY column or part of some GROUP
6526 * BY expression. In either case, they are already part of
6527 * the targetlist and thus no need to add them again. In fact
6528 * including plain Vars in the tlist when they do not match a
6529 * GROUP BY column would cause the foreign server to complain
6530 * that the shipped query is invalid.
6531 */
6532 foreach(l, aggvars)
6533 {
6534 Expr *aggref = (Expr *) lfirst(l);
6535
6536 if (IsA(aggref, Aggref))
6537 tlist = add_to_flat_tlist(tlist, list_make1(aggref));
6538 }
6539 }
6540 }
6541
6542 i++;
6543 }
6544
6545 /*
6546 * Classify the pushable and non-pushable HAVING clauses and save them in
6547 * remote_conds and local_conds of the grouped rel's fpinfo.
6548 */
6549 if (havingQual)
6550 {
6551 foreach(lc, (List *) havingQual)
6552 {
6553 Expr *expr = (Expr *) lfirst(lc);
6554 RestrictInfo *rinfo;
6555
6556 /*
6557 * Currently, the core code doesn't wrap havingQuals in
6558 * RestrictInfos, so we must make our own.
6559 */
6560 Assert(!IsA(expr, RestrictInfo));
6561 rinfo = make_restrictinfo(root,
6562 expr,
6563 true,
6564 false,
6565 false,
6566 false,
6567 root->qual_security_level,
6568 grouped_rel->relids,
6569 NULL,
6570 NULL);
6571 if (is_foreign_expr(root, grouped_rel, expr))
6572 fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo);
6573 else
6574 fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo);
6575 }
6576 }
6577
6578 /*
6579 * If there are any local conditions, pull Vars and aggregates from it and
6580 * check whether they are safe to pushdown or not.
6581 */
6582 if (fpinfo->local_conds)
6583 {
6584 List *aggvars = NIL;
6585
6586 foreach(lc, fpinfo->local_conds)
6587 {
6589
6591 pull_var_clause((Node *) rinfo->clause,
6593 }
6594
6595 foreach(lc, aggvars)
6596 {
6597 Expr *expr = (Expr *) lfirst(lc);
6598
6599 /*
6600 * If aggregates within local conditions are not safe to push
6601 * down, then we cannot push down the query. Vars are already
6602 * part of GROUP BY clause which are checked above, so no need to
6603 * access them again here. Again, we need not check
6604 * is_foreign_param for a foreign aggregate.
6605 */
6606 if (IsA(expr, Aggref))
6607 {
6608 if (!is_foreign_expr(root, grouped_rel, expr))
6609 return false;
6610
6611 tlist = add_to_flat_tlist(tlist, list_make1(expr));
6612 }
6613 }
6614 }
6615
6616 /* Store generated targetlist */
6617 fpinfo->grouped_tlist = tlist;
6618
6619 /* Safe to pushdown */
6620 fpinfo->pushdown_safe = true;
6621
6622 /*
6623 * Set # of retrieved rows and cached relation costs to some negative
6624 * value, so that we can detect when they are set to some sensible values,
6625 * during one (usually the first) of the calls to estimate_path_cost_size.
6626 */
6627 fpinfo->retrieved_rows = -1;
6628 fpinfo->rel_startup_cost = -1;
6629 fpinfo->rel_total_cost = -1;
6630
6631 /*
6632 * Set the string describing this grouped relation to be used in EXPLAIN
6633 * output of corresponding ForeignScan. Note that the decoration we add
6634 * to the base relation name mustn't include any digits, or it'll confuse
6635 * postgresExplainForeignScan.
6636 */
6637 fpinfo->relation_name = psprintf("Aggregate on (%s)",
6638 ofpinfo->relation_name);
6639
6640 return true;
6641}
6642
6643/*
6644 * postgresGetForeignUpperPaths
6645 * Add paths for post-join operations like aggregation, grouping etc. if
6646 * corresponding operations are safe to push down.
6647 */
6648static void
6651 void *extra)
6652{
6654
6655 /*
6656 * If input rel is not safe to pushdown, then simply return as we cannot
6657 * perform any post-join operations on the foreign server.
6658 */
6659 if (!input_rel->fdw_private ||
6660 !((PgFdwRelationInfo *) input_rel->fdw_private)->pushdown_safe)
6661 return;
6662
6663 /* Ignore stages we don't support; and skip any duplicate calls. */
6664 if ((stage != UPPERREL_GROUP_AGG &&
6665 stage != UPPERREL_ORDERED &&
6666 stage != UPPERREL_FINAL) ||
6667 output_rel->fdw_private)
6668 return;
6669
6671 fpinfo->pushdown_safe = false;
6672 fpinfo->stage = stage;
6673 output_rel->fdw_private = fpinfo;
6674
6675 switch (stage)
6676 {
6677 case UPPERREL_GROUP_AGG:
6679 (GroupPathExtraData *) extra);
6680 break;
6681 case UPPERREL_ORDERED:
6683 break;
6684 case UPPERREL_FINAL:
6686 (FinalPathExtraData *) extra);
6687 break;
6688 default:
6689 elog(ERROR, "unexpected upper relation: %d", (int) stage);
6690 break;
6691 }
6692}
6693
6694/*
6695 * add_foreign_grouping_paths
6696 * Add foreign path for grouping and/or aggregation.
6697 *
6698 * Given input_rel represents the underlying scan. The paths are added to the
6699 * given grouped_rel.
6700 */
6701static void
6703 RelOptInfo *grouped_rel,
6704 GroupPathExtraData *extra)
6705{
6706 Query *parse = root->parse;
6707 PgFdwRelationInfo *ifpinfo = input_rel->fdw_private;
6708 PgFdwRelationInfo *fpinfo = grouped_rel->fdw_private;
6710 double rows;
6711 int width;
6712 int disabled_nodes;
6713 Cost startup_cost;
6714 Cost total_cost;
6715
6716 /* Nothing to be done, if there is no grouping or aggregation required. */
6717 if (!parse->groupClause && !parse->groupingSets && !parse->hasAggs &&
6718 !root->hasHavingQual)
6719 return;
6720
6723
6724 /* save the input_rel as outerrel in fpinfo */
6725 fpinfo->outerrel = input_rel;
6726
6727 /*
6728 * Copy foreign table, foreign server, user mapping, FDW options etc.
6729 * details from the input relation's fpinfo.
6730 */
6731 fpinfo->table = ifpinfo->table;
6732 fpinfo->server = ifpinfo->server;
6733 fpinfo->user = ifpinfo->user;
6735
6736 /*
6737 * Assess if it is safe to push down aggregation and grouping.
6738 *
6739 * Use HAVING qual from extra. In case of child partition, it will have
6740 * translated Vars.
6741 */
6742 if (!foreign_grouping_ok(root, grouped_rel, extra->havingQual))
6743 return;
6744
6745 /*
6746 * Compute the selectivity and cost of the local_conds, so we don't have
6747 * to do it over again for each path. (Currently we create just a single
6748 * path here, but in future it would be possible that we build more paths
6749 * such as pre-sorted paths as in postgresGetForeignPaths and
6750 * postgresGetForeignJoinPaths.) The best we can do for these conditions
6751 * is to estimate selectivity on the basis of local statistics.
6752 */
6753 fpinfo->local_conds_sel = clauselist_selectivity(root,
6754 fpinfo->local_conds,
6755 0,
6756 JOIN_INNER,
6757 NULL);
6758
6759 cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
6760
6761 /* Estimate the cost of push down */
6762 estimate_path_cost_size(root, grouped_rel, NIL, NIL, NULL,
6763 &rows, &width, &disabled_nodes,
6764 &startup_cost, &total_cost);
6765
6766 /* Now update this information in the fpinfo */
6767 fpinfo->rows = rows;
6768 fpinfo->width = width;
6769 fpinfo->disabled_nodes = disabled_nodes;
6770 fpinfo->startup_cost = startup_cost;
6771 fpinfo->total_cost = total_cost;
6772
6773 /* Create and add foreign path to the grouping relation. */
6775 grouped_rel,
6776 grouped_rel->reltarget,
6777 rows,
6778 disabled_nodes,
6779 startup_cost,
6780 total_cost,
6781 NIL, /* no pathkeys */
6782 NULL,
6783 NIL, /* no fdw_restrictinfo list */
6784 NIL); /* no fdw_private */
6785
6786 /* Add generated path into grouped_rel by add_path(). */
6787 add_path(grouped_rel, (Path *) grouppath);
6788}
6789
6790/*
6791 * add_foreign_ordered_paths
6792 * Add foreign paths for performing the final sort remotely.
6793 *
6794 * Given input_rel contains the source-data Paths. The paths are added to the
6795 * given ordered_rel.
6796 */
6797static void
6800{
6801 Query *parse = root->parse;
6802 PgFdwRelationInfo *ifpinfo = input_rel->fdw_private;
6803 PgFdwRelationInfo *fpinfo = ordered_rel->fdw_private;
6805 double rows;
6806 int width;
6807 int disabled_nodes;
6808 Cost startup_cost;
6809 Cost total_cost;
6810 List *fdw_private;
6812 ListCell *lc;
6813
6814 /* Shouldn't get here unless the query has ORDER BY */
6815 Assert(parse->sortClause);
6816
6817 /* We don't support cases where there are any SRFs in the targetlist */
6818 if (parse->hasTargetSRFs)
6819 return;
6820
6821 /* Save the input_rel as outerrel in fpinfo */
6822 fpinfo->outerrel = input_rel;
6823
6824 /*
6825 * Copy foreign table, foreign server, user mapping, FDW options etc.
6826 * details from the input relation's fpinfo.
6827 */
6828 fpinfo->table = ifpinfo->table;
6829 fpinfo->server = ifpinfo->server;
6830 fpinfo->user = ifpinfo->user;
6832
6833 /*
6834 * If the input_rel is a base or join relation, we would already have
6835 * considered pushing down the final sort to the remote server when
6836 * creating pre-sorted foreign paths for that relation, because the
6837 * query_pathkeys is set to the root->sort_pathkeys in that case (see
6838 * standard_qp_callback()).
6839 */
6840 if (input_rel->reloptkind == RELOPT_BASEREL ||
6841 input_rel->reloptkind == RELOPT_JOINREL)
6842 {
6843 Assert(root->query_pathkeys == root->sort_pathkeys);
6844
6845 /* Safe to push down if the query_pathkeys is safe to push down */
6846 fpinfo->pushdown_safe = ifpinfo->qp_is_pushdown_safe;
6847
6848 return;
6849 }
6850
6851 /* The input_rel should be a grouping relation */
6852 Assert(input_rel->reloptkind == RELOPT_UPPER_REL &&
6853 ifpinfo->stage == UPPERREL_GROUP_AGG);
6854
6855 /*
6856 * We try to create a path below by extending a simple foreign path for
6857 * the underlying grouping relation to perform the final sort remotely,
6858 * which is stored into the fdw_private list of the resulting path.
6859 */
6860
6861 /* Assess if it is safe to push down the final sort */
6862 foreach(lc, root->sort_pathkeys)
6863 {
6865 EquivalenceClass *pathkey_ec = pathkey->pk_eclass;
6866
6867 /*
6868 * is_foreign_expr would detect volatile expressions as well, but
6869 * checking ec_has_volatile here saves some cycles.
6870 */
6871 if (pathkey_ec->ec_has_volatile)
6872 return;
6873
6874 /*
6875 * Can't push down the sort if pathkey's opfamily is not shippable.
6876 */
6878 fpinfo))
6879 return;
6880
6881 /*
6882 * The EC must contain a shippable EM that is computed in input_rel's
6883 * reltarget, else we can't push down the sort.
6884 */
6886 pathkey_ec,
6887 input_rel) == NULL)
6888 return;
6889 }
6890
6891 /* Safe to push down */
6892 fpinfo->pushdown_safe = true;
6893
6894 /* Construct PgFdwPathExtraData */
6896 fpextra->target = root->upper_targets[UPPERREL_ORDERED];
6897 fpextra->has_final_sort = true;
6898
6899 /* Estimate the costs of performing the final sort remotely */
6901 &rows, &width, &disabled_nodes,
6902 &startup_cost, &total_cost);
6903
6904 /*
6905 * Build the fdw_private list that will be used by postgresGetForeignPlan.
6906 * Items in the list must match order in enum FdwPathPrivateIndex.
6907 */
6908 fdw_private = list_make2(makeBoolean(true), makeBoolean(false));
6909
6910 /* Create foreign ordering path */
6912 input_rel,
6913 root->upper_targets[UPPERREL_ORDERED],
6914 rows,
6915 disabled_nodes,
6916 startup_cost,
6917 total_cost,
6918 root->sort_pathkeys,
6919 NULL, /* no extra plan */
6920 NIL, /* no fdw_restrictinfo
6921 * list */
6922 fdw_private);
6923
6924 /* and add it to the ordered_rel */
6926}
6927
6928/*
6929 * add_foreign_final_paths
6930 * Add foreign paths for performing the final processing remotely.
6931 *
6932 * Given input_rel contains the source-data Paths. The paths are added to the
6933 * given final_rel.
6934 */
6935static void
6938 FinalPathExtraData *extra)
6939{
6940 Query *parse = root->parse;
6943 bool has_final_sort = false;
6944 List *pathkeys = NIL;
6946 bool save_use_remote_estimate = false;
6947 double rows;
6948 int width;
6949 int disabled_nodes;
6950 Cost startup_cost;
6951 Cost total_cost;
6952 List *fdw_private;
6954
6955 /*
6956 * Currently, we only support this for SELECT commands
6957 */
6958 if (parse->commandType != CMD_SELECT)
6959 return;
6960
6961 /*
6962 * No work if there is no FOR UPDATE/SHARE clause and if there is no need
6963 * to add a LIMIT node
6964 */
6965 if (!parse->rowMarks && !extra->limit_needed)
6966 return;
6967
6968 /* We don't support cases where there are any SRFs in the targetlist */
6969 if (parse->hasTargetSRFs)
6970 return;
6971
6972 /* Save the input_rel as outerrel in fpinfo */
6973 fpinfo->outerrel = input_rel;
6974
6975 /*
6976 * Copy foreign table, foreign server, user mapping, FDW options etc.
6977 * details from the input relation's fpinfo.
6978 */
6979 fpinfo->table = ifpinfo->table;
6980 fpinfo->server = ifpinfo->server;
6981 fpinfo->user = ifpinfo->user;
6983
6984 /*
6985 * If there is no need to add a LIMIT node, there might be a ForeignPath
6986 * in the input_rel's pathlist that implements all behavior of the query.
6987 * Note: we would already have accounted for the query's FOR UPDATE/SHARE
6988 * (if any) before we get here.
6989 */
6990 if (!extra->limit_needed)
6991 {
6992 ListCell *lc;
6993
6994 Assert(parse->rowMarks);
6995
6996 /*
6997 * Grouping and aggregation are not supported with FOR UPDATE/SHARE,
6998 * so the input_rel should be a base, join, or ordered relation; and
6999 * if it's an ordered relation, its input relation should be a base or
7000 * join relation.
7001 */
7002 Assert(input_rel->reloptkind == RELOPT_BASEREL ||
7003 input_rel->reloptkind == RELOPT_JOINREL ||
7004 (input_rel->reloptkind == RELOPT_UPPER_REL &&
7005 ifpinfo->stage == UPPERREL_ORDERED &&
7006 (ifpinfo->outerrel->reloptkind == RELOPT_BASEREL ||
7007 ifpinfo->outerrel->reloptkind == RELOPT_JOINREL)));
7008
7009 foreach(lc, input_rel->pathlist)
7010 {
7011 Path *path = (Path *) lfirst(lc);
7012
7013 /*
7014 * apply_scanjoin_target_to_paths() uses create_projection_path()
7015 * to adjust each of its input paths if needed, whereas
7016 * create_ordered_paths() uses apply_projection_to_path() to do
7017 * that. So the former might have put a ProjectionPath on top of
7018 * the ForeignPath; look through ProjectionPath and see if the
7019 * path underneath it is ForeignPath.
7020 */
7021 if (IsA(path, ForeignPath) ||
7022 (IsA(path, ProjectionPath) &&
7023 IsA(((ProjectionPath *) path)->subpath, ForeignPath)))
7024 {
7025 /*
7026 * Create foreign final path; this gets rid of a
7027 * no-longer-needed outer plan (if any), which makes the
7028 * EXPLAIN output look cleaner
7029 */
7031 path->parent,
7032 path->pathtarget,
7033 path->rows,
7034 path->disabled_nodes,
7035 path->startup_cost,
7036 path->total_cost,
7037 path->pathkeys,
7038 NULL, /* no extra plan */
7039 NIL, /* no fdw_restrictinfo
7040 * list */
7041 NIL); /* no fdw_private */
7042
7043 /* and add it to the final_rel */
7045
7046 /* Safe to push down */
7047 fpinfo->pushdown_safe = true;
7048
7049 return;
7050 }
7051 }
7052
7053 /*
7054 * If we get here it means no ForeignPaths; since we would already
7055 * have considered pushing down all operations for the query to the
7056 * remote server, give up on it.
7057 */
7058 return;
7059 }
7060
7061 Assert(extra->limit_needed);
7062
7063 /*
7064 * If the input_rel is an ordered relation, replace the input_rel with its
7065 * input relation
7066 */
7067 if (input_rel->reloptkind == RELOPT_UPPER_REL &&
7068 ifpinfo->stage == UPPERREL_ORDERED)
7069 {
7070 input_rel = ifpinfo->outerrel;
7071 ifpinfo = (PgFdwRelationInfo *) input_rel->fdw_private;
7072 has_final_sort = true;
7073 pathkeys = root->sort_pathkeys;
7074 }
7075
7076 /* The input_rel should be a base, join, or grouping relation */
7077 Assert(input_rel->reloptkind == RELOPT_BASEREL ||
7078 input_rel->reloptkind == RELOPT_JOINREL ||
7079 (input_rel->reloptkind == RELOPT_UPPER_REL &&
7080 ifpinfo->stage == UPPERREL_GROUP_AGG));
7081
7082 /*
7083 * We try to create a path below by extending a simple foreign path for
7084 * the underlying base, join, or grouping relation to perform the final
7085 * sort (if has_final_sort) and the LIMIT restriction remotely, which is
7086 * stored into the fdw_private list of the resulting path. (We
7087 * re-estimate the costs of sorting the underlying relation, if
7088 * has_final_sort.)
7089 */
7090
7091 /*
7092 * Assess if it is safe to push down the LIMIT and OFFSET to the remote
7093 * server
7094 */
7095
7096 /*
7097 * If the underlying relation has any local conditions, the LIMIT/OFFSET
7098 * cannot be pushed down.
7099 */
7100 if (ifpinfo->local_conds)
7101 return;
7102
7103 /*
7104 * If the query has FETCH FIRST .. WITH TIES, 1) it must have ORDER BY as
7105 * well, which is used to determine which additional rows tie for the last
7106 * place in the result set, and 2) ORDER BY must already have been
7107 * determined to be safe to push down before we get here. So in that case
7108 * the FETCH clause is safe to push down with ORDER BY if the remote
7109 * server is v13 or later, but if not, the remote query will fail entirely
7110 * for lack of support for it. Since we do not currently have a way to do
7111 * a remote-version check (without accessing the remote server), disable
7112 * pushing the FETCH clause for now.
7113 */
7114 if (parse->limitOption == LIMIT_OPTION_WITH_TIES)
7115 return;
7116
7117 /*
7118 * Also, the LIMIT/OFFSET cannot be pushed down, if their expressions are
7119 * not safe to remote.
7120 */
7121 if (!is_foreign_expr(root, input_rel, (Expr *) parse->limitOffset) ||
7122 !is_foreign_expr(root, input_rel, (Expr *) parse->limitCount))
7123 return;
7124
7125 /* Safe to push down */
7126 fpinfo->pushdown_safe = true;
7127
7128 /* Construct PgFdwPathExtraData */
7130 fpextra->target = root->upper_targets[UPPERREL_FINAL];
7131 fpextra->has_final_sort = has_final_sort;
7132 fpextra->has_limit = extra->limit_needed;
7133 fpextra->limit_tuples = extra->limit_tuples;
7134 fpextra->count_est = extra->count_est;
7135 fpextra->offset_est = extra->offset_est;
7136
7137 /*
7138 * Estimate the costs of performing the final sort and the LIMIT
7139 * restriction remotely. If has_final_sort is false, we wouldn't need to
7140 * execute EXPLAIN anymore if use_remote_estimate, since the costs can be
7141 * roughly estimated using the costs we already have for the underlying
7142 * relation, in the same way as when use_remote_estimate is false. Since
7143 * it's pretty expensive to execute EXPLAIN, force use_remote_estimate to
7144 * false in that case.
7145 */
7146 if (!fpextra->has_final_sort)
7147 {
7148 save_use_remote_estimate = ifpinfo->use_remote_estimate;
7149 ifpinfo->use_remote_estimate = false;
7150 }
7152 &rows, &width, &disabled_nodes,
7153 &startup_cost, &total_cost);
7154 if (!fpextra->has_final_sort)
7155 ifpinfo->use_remote_estimate = save_use_remote_estimate;
7156
7157 /*
7158 * Build the fdw_private list that will be used by postgresGetForeignPlan.
7159 * Items in the list must match order in enum FdwPathPrivateIndex.
7160 */
7161 fdw_private = list_make2(makeBoolean(has_final_sort),
7162 makeBoolean(extra->limit_needed));
7163
7164 /*
7165 * Create foreign final path; this gets rid of a no-longer-needed outer
7166 * plan (if any), which makes the EXPLAIN output look cleaner
7167 */
7169 input_rel,
7170 root->upper_targets[UPPERREL_FINAL],
7171 rows,
7172 disabled_nodes,
7173 startup_cost,
7174 total_cost,
7175 pathkeys,
7176 NULL, /* no extra plan */
7177 NIL, /* no fdw_restrictinfo list */
7178 fdw_private);
7179
7180 /* and add it to the final_rel */
7182}
7183
7184/*
7185 * postgresIsForeignPathAsyncCapable
7186 * Check whether a given ForeignPath node is async-capable.
7187 */
7188static bool
7190{
7191 RelOptInfo *rel = ((Path *) path)->parent;
7192 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private;
7193
7194 return fpinfo->async_capable;
7195}
7196
7197/*
7198 * postgresForeignAsyncRequest
7199 * Asynchronously request next tuple from a foreign PostgreSQL table.
7200 */
7201static void
7206
7207/*
7208 * postgresForeignAsyncConfigureWait
7209 * Configure a file descriptor event for which we wish to wait.
7210 */
7211static void
7213{
7214 ForeignScanState *node = (ForeignScanState *) areq->requestee;
7215 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
7216 AsyncRequest *pendingAreq = fsstate->conn_state->pendingAreq;
7217 AppendState *requestor = (AppendState *) areq->requestor;
7218 WaitEventSet *set = requestor->as_eventset;
7219
7220 /* This should not be called unless callback_pending */
7221 Assert(areq->callback_pending);
7222
7223 /*
7224 * If process_pending_request() has been invoked on the given request
7225 * before we get here, we might have some tuples already; in which case
7226 * complete the request
7227 */
7228 if (fsstate->next_tuple < fsstate->num_tuples)
7229 {
7231 if (areq->request_complete)
7232 return;
7233 Assert(areq->callback_pending);
7234 }
7235
7236 /* We must have run out of tuples */
7237 Assert(fsstate->next_tuple >= fsstate->num_tuples);
7238
7239 /* The core code would have registered postmaster death event */
7241
7242 /* Begin an asynchronous data fetch if not already done */
7243 if (!pendingAreq)
7245 else if (pendingAreq->requestor != areq->requestor)
7246 {
7247 /*
7248 * This is the case when the in-process request was made by another
7249 * Append. Note that it might be useless to process the request made
7250 * by that Append, because the query might not need tuples from that
7251 * Append anymore; so we avoid processing it to begin a fetch for the
7252 * given request if possible. If there are any child subplans of the
7253 * same parent that are ready for new requests, skip the given
7254 * request. Likewise, if there are any configured events other than
7255 * the postmaster death event, skip it. Otherwise, process the
7256 * in-process request, then begin a fetch to configure the event
7257 * below, because we might otherwise end up with no configured events
7258 * other than the postmaster death event.
7259 */
7260 if (!bms_is_empty(requestor->as_needrequest))
7261 return;
7262 if (GetNumRegisteredWaitEvents(set) > 1)
7263 return;
7264 process_pending_request(pendingAreq);
7266 }
7267 else if (pendingAreq->requestee != areq->requestee)
7268 {
7269 /*
7270 * This is the case when the in-process request was made by the same
7271 * parent but for a different child. Since we configure only the
7272 * event for the request made for that child, skip the given request.
7273 */
7274 return;
7275 }
7276 else
7277 Assert(pendingAreq == areq);
7278
7280 NULL, areq);
7281}
7282
7283/*
7284 * postgresForeignAsyncNotify
7285 * Fetch some more tuples from a file descriptor that becomes ready,
7286 * requesting next tuple.
7287 */
7288static void
7290{
7291 ForeignScanState *node = (ForeignScanState *) areq->requestee;
7292 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
7293
7294 /* The core code would have initialized the callback_pending flag */
7295 Assert(!areq->callback_pending);
7296
7297 /*
7298 * If process_pending_request() has been invoked on the given request
7299 * before we get here, we might have some tuples already; in which case
7300 * produce the next tuple
7301 */
7302 if (fsstate->next_tuple < fsstate->num_tuples)
7303 {
7305 return;
7306 }
7307
7308 /* We must have run out of tuples */
7309 Assert(fsstate->next_tuple >= fsstate->num_tuples);
7310
7311 /* The request should be currently in-process */
7312 Assert(fsstate->conn_state->pendingAreq == areq);
7313
7314 /* On error, report the original query, not the FETCH. */
7315 if (!PQconsumeInput(fsstate->conn))
7316 pgfdw_report_error(NULL, fsstate->conn, fsstate->query);
7317
7318 fetch_more_data(node);
7319
7321}
7322
7323/*
7324 * Asynchronously produce next tuple from a foreign PostgreSQL table.
7325 */
7326static void
7328{
7329 ForeignScanState *node = (ForeignScanState *) areq->requestee;
7330 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
7331 AsyncRequest *pendingAreq = fsstate->conn_state->pendingAreq;
7332 TupleTableSlot *result;
7333
7334 /* This should not be called if the request is currently in-process */
7335 Assert(areq != pendingAreq);
7336
7337 /* Fetch some more tuples, if we've run out */
7338 if (fsstate->next_tuple >= fsstate->num_tuples)
7339 {
7340 /* No point in another fetch if we already detected EOF, though */
7341 if (!fsstate->eof_reached)
7342 {
7343 /* Mark the request as pending for a callback */
7345 /* Begin another fetch if requested and if no pending request */
7346 if (fetch && !pendingAreq)
7348 }
7349 else
7350 {
7351 /* There's nothing more to do; just return a NULL pointer */
7352 result = NULL;
7353 /* Mark the request as complete */
7354 ExecAsyncRequestDone(areq, result);
7355 }
7356 return;
7357 }
7358
7359 /* Get a tuple from the ForeignScan node */
7360 result = areq->requestee->ExecProcNodeReal(areq->requestee);
7361 if (!TupIsNull(result))
7362 {
7363 /* Mark the request as complete */
7364 ExecAsyncRequestDone(areq, result);
7365 return;
7366 }
7367
7368 /* We must have run out of tuples */
7369 Assert(fsstate->next_tuple >= fsstate->num_tuples);
7370
7371 /* Fetch some more tuples, if we've not detected EOF yet */
7372 if (!fsstate->eof_reached)
7373 {
7374 /* Mark the request as pending for a callback */
7376 /* Begin another fetch if requested and if no pending request */
7377 if (fetch && !pendingAreq)
7379 }
7380 else
7381 {
7382 /* There's nothing more to do; just return a NULL pointer */
7383 result = NULL;
7384 /* Mark the request as complete */
7385 ExecAsyncRequestDone(areq, result);
7386 }
7387}
7388
7389/*
7390 * Begin an asynchronous data fetch.
7391 *
7392 * Note: this function assumes there is no currently-in-progress asynchronous
7393 * data fetch.
7394 *
7395 * Note: fetch_more_data must be called to fetch the result.
7396 */
7397static void
7399{
7400 ForeignScanState *node = (ForeignScanState *) areq->requestee;
7401 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
7402 char sql[64];
7403
7404 Assert(!fsstate->conn_state->pendingAreq);
7405
7406 /* Create the cursor synchronously. */
7407 if (!fsstate->cursor_exists)
7408 create_cursor(node);
7409
7410 /* We will send this query, but not wait for the response. */
7411 snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
7412 fsstate->fetch_size, fsstate->cursor_number);
7413
7414 if (!PQsendQuery(fsstate->conn, sql))
7415 pgfdw_report_error(NULL, fsstate->conn, fsstate->query);
7416
7417 /* Remember that the request is in process */
7418 fsstate->conn_state->pendingAreq = areq;
7419}
7420
7421/*
7422 * Process a pending asynchronous request.
7423 */
7424void
7426{
7427 ForeignScanState *node = (ForeignScanState *) areq->requestee;
7428 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
7429
7430 /* The request would have been pending for a callback */
7431 Assert(areq->callback_pending);
7432
7433 /* The request should be currently in-process */
7434 Assert(fsstate->conn_state->pendingAreq == areq);
7435
7436 fetch_more_data(node);
7437
7438 /*
7439 * If we didn't get any tuples, must be end of data; complete the request
7440 * now. Otherwise, we postpone completing the request until we are called
7441 * from postgresForeignAsyncConfigureWait()/postgresForeignAsyncNotify().
7442 */
7443 if (fsstate->next_tuple >= fsstate->num_tuples)
7444 {
7445 /* Unlike AsyncNotify, we unset callback_pending ourselves */
7446 areq->callback_pending = false;
7447 /* Mark the request as complete */
7449 /* Unlike AsyncNotify, we call ExecAsyncResponse ourselves */
7451 }
7452}
7453
7454/*
7455 * Complete a pending asynchronous request.
7456 */
7457static void
7459{
7460 /* The request would have been pending for a callback */
7461 Assert(areq->callback_pending);
7462
7463 /* Unlike AsyncNotify, we unset callback_pending ourselves */
7464 areq->callback_pending = false;
7465
7466 /* We begin a fetch afterwards if necessary; don't fetch */
7468
7469 /* Unlike AsyncNotify, we call ExecAsyncResponse ourselves */
7471
7472 /* Also, we do instrumentation ourselves, if required */
7473 if (areq->requestee->instrument)
7474 InstrUpdateTupleCount(areq->requestee->instrument,
7475 TupIsNull(areq->result) ? 0.0 : 1.0);
7476}
7477
7478/*
7479 * Create a tuple from the specified row of the PGresult.
7480 *
7481 * rel is the local representation of the foreign table, attinmeta is
7482 * conversion data for the rel's tupdesc, and retrieved_attrs is an
7483 * integer list of the table column numbers present in the PGresult.
7484 * fsstate is the ForeignScan plan node's execution state.
7485 * temp_context is a working context that can be reset after each tuple.
7486 *
7487 * Note: either rel or fsstate, but not both, can be NULL. rel is NULL
7488 * if we're processing a remote join, while fsstate is NULL in a non-query
7489 * context such as ANALYZE, or if we're processing a non-scan query node.
7490 */
7491static HeapTuple
7493 int row,
7494 Relation rel,
7495 AttInMetadata *attinmeta,
7496 List *retrieved_attrs,
7497 ForeignScanState *fsstate,
7499{
7500 HeapTuple tuple;
7501 TupleDesc tupdesc;
7502 Datum *values;
7503 bool *nulls;
7504 ItemPointer ctid = NULL;
7506 ErrorContextCallback errcallback;
7507 MemoryContext oldcontext;
7508 ListCell *lc;
7509 int j;
7510
7511 Assert(row < PQntuples(res));
7512
7513 /*
7514 * Do the following work in a temp context that we reset after each tuple.
7515 * This cleans up not only the data we have direct access to, but any
7516 * cruft the I/O functions might leak.
7517 */
7518 oldcontext = MemoryContextSwitchTo(temp_context);
7519
7520 /*
7521 * Get the tuple descriptor for the row. Use the rel's tupdesc if rel is
7522 * provided, otherwise look to the scan node's ScanTupleSlot.
7523 */
7524 if (rel)
7525 tupdesc = RelationGetDescr(rel);
7526 else
7527 {
7528 Assert(fsstate);
7529 tupdesc = fsstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
7530 }
7531
7532 values = (Datum *) palloc0(tupdesc->natts * sizeof(Datum));
7533 nulls = (bool *) palloc(tupdesc->natts * sizeof(bool));
7534 /* Initialize to nulls for any columns not present in result */
7535 memset(nulls, true, tupdesc->natts * sizeof(bool));
7536
7537 /*
7538 * Set up and install callback to report where conversion error occurs.
7539 */
7540 errpos.cur_attno = 0;
7541 errpos.rel = rel;
7542 errpos.fsstate = fsstate;
7543 errcallback.callback = conversion_error_callback;
7544 errcallback.arg = &errpos;
7545 errcallback.previous = error_context_stack;
7546 error_context_stack = &errcallback;
7547
7548 /*
7549 * i indexes columns in the relation, j indexes columns in the PGresult.
7550 */
7551 j = 0;
7552 foreach(lc, retrieved_attrs)
7553 {
7554 int i = lfirst_int(lc);
7555 char *valstr;
7556
7557 /* fetch next column's textual value */
7558 if (PQgetisnull(res, row, j))
7559 valstr = NULL;
7560 else
7561 valstr = PQgetvalue(res, row, j);
7562
7563 /*
7564 * convert value to internal representation
7565 *
7566 * Note: we ignore system columns other than ctid and oid in result
7567 */
7568 errpos.cur_attno = i;
7569 if (i > 0)
7570 {
7571 /* ordinary column */
7572 Assert(i <= tupdesc->natts);
7573 nulls[i - 1] = (valstr == NULL);
7574 /* Apply the input function even to nulls, to support domains */
7575 values[i - 1] = InputFunctionCall(&attinmeta->attinfuncs[i - 1],
7576 valstr,
7577 attinmeta->attioparams[i - 1],
7578 attinmeta->atttypmods[i - 1]);
7579 }
7581 {
7582 /* ctid */
7583 if (valstr != NULL)
7584 {
7585 Datum datum;
7586
7588 ctid = (ItemPointer) DatumGetPointer(datum);
7589 }
7590 }
7591 errpos.cur_attno = 0;
7592
7593 j++;
7594 }
7595
7596 /* Uninstall error context callback. */
7597 error_context_stack = errcallback.previous;
7598
7599 /*
7600 * Check we got the expected number of columns. Note: j == 0 and
7601 * PQnfields == 1 is expected, since deparse emits a NULL if no columns.
7602 */
7603 if (j > 0 && j != PQnfields(res))
7604 elog(ERROR, "remote query result does not match the foreign table");
7605
7606 /*
7607 * Build the result tuple in caller's memory context.
7608 */
7609 MemoryContextSwitchTo(oldcontext);
7610
7611 tuple = heap_form_tuple(tupdesc, values, nulls);
7612
7613 /*
7614 * If we have a CTID to return, install it in both t_self and t_ctid.
7615 * t_self is the normal place, but if the tuple is converted to a
7616 * composite Datum, t_self will be lost; setting t_ctid allows CTID to be
7617 * preserved during EvalPlanQual re-evaluations (see ROW_MARK_COPY code).
7618 */
7619 if (ctid)
7620 tuple->t_self = tuple->t_data->t_ctid = *ctid;
7621
7622 /*
7623 * Stomp on the xmin, xmax, and cmin fields from the tuple created by
7624 * heap_form_tuple. heap_form_tuple actually creates the tuple with
7625 * DatumTupleFields, not HeapTupleFields, but the executor expects
7626 * HeapTupleFields and will happily extract system columns on that
7627 * assumption. If we don't do this then, for example, the tuple length
7628 * ends up in the xmin field, which isn't what we want.
7629 */
7633
7634 /* Clean up */
7636
7637 return tuple;
7638}
7639
7640/*
7641 * Callback function which is called when error occurs during column value
7642 * conversion. Print names of column and relation.
7643 *
7644 * Note that this function mustn't do any catalog lookups, since we are in
7645 * an already-failed transaction. Fortunately, we can get the needed info
7646 * from the relation or the query's rangetable instead.
7647 */
7648static void
7650{
7652 Relation rel = errpos->rel;
7653 ForeignScanState *fsstate = errpos->fsstate;
7654 const char *attname = NULL;
7655 const char *relname = NULL;
7656 bool is_wholerow = false;
7657
7658 /*
7659 * If we're in a scan node, always use aliases from the rangetable, for
7660 * consistency between the simple-relation and remote-join cases. Look at
7661 * the relation's tupdesc only if we're not in a scan node.
7662 */
7663 if (fsstate)
7664 {
7665 /* ForeignScan case */
7667 int varno = 0;
7668 AttrNumber colno = 0;
7669
7670 if (fsplan->scan.scanrelid > 0)
7671 {
7672 /* error occurred in a scan against a foreign table */
7673 varno = fsplan->scan.scanrelid;
7674 colno = errpos->cur_attno;
7675 }
7676 else
7677 {
7678 /* error occurred in a scan against a foreign join */
7680
7681 tle = list_nth_node(TargetEntry, fsplan->fdw_scan_tlist,
7682 errpos->cur_attno - 1);
7683
7684 /*
7685 * Target list can have Vars and expressions. For Vars, we can
7686 * get some information, however for expressions we can't. Thus
7687 * for expressions, just show generic context message.
7688 */
7689 if (IsA(tle->expr, Var))
7690 {
7691 Var *var = (Var *) tle->expr;
7692
7693 varno = var->varno;
7694 colno = var->varattno;
7695 }
7696 }
7697
7698 if (varno > 0)
7699 {
7700 EState *estate = fsstate->ss.ps.state;
7701 RangeTblEntry *rte = exec_rt_fetch(varno, estate);
7702
7703 relname = rte->eref->aliasname;
7704
7705 if (colno == 0)
7706 is_wholerow = true;
7707 else if (colno > 0 && colno <= list_length(rte->eref->colnames))
7708 attname = strVal(list_nth(rte->eref->colnames, colno - 1));
7709 else if (colno == SelfItemPointerAttributeNumber)
7710 attname = "ctid";
7711 }
7712 }
7713 else if (rel)
7714 {
7715 /* Non-ForeignScan case (we should always have a rel here) */
7716 TupleDesc tupdesc = RelationGetDescr(rel);
7717
7719 if (errpos->cur_attno > 0 && errpos->cur_attno <= tupdesc->natts)
7720 {
7721 Form_pg_attribute attr = TupleDescAttr(tupdesc,
7722 errpos->cur_attno - 1);
7723
7724 attname = NameStr(attr->attname);
7725 }
7726 else if (errpos->cur_attno == SelfItemPointerAttributeNumber)
7727 attname = "ctid";
7728 }
7729
7730 if (relname && is_wholerow)
7731 errcontext("whole-row reference to foreign table \"%s\"", relname);
7732 else if (relname && attname)
7733 errcontext("column \"%s\" of foreign table \"%s\"", attname, relname);
7734 else
7735 errcontext("processing expression at position %d in select list",
7736 errpos->cur_attno);
7737}
7738
7739/*
7740 * Given an EquivalenceClass and a foreign relation, find an EC member
7741 * that can be used to sort the relation remotely according to a pathkey
7742 * using this EC.
7743 *
7744 * If there is more than one suitable candidate, return an arbitrary
7745 * one of them. If there is none, return NULL.
7746 *
7747 * This checks that the EC member expression uses only Vars from the given
7748 * rel and is shippable. Caller must separately verify that the pathkey's
7749 * ordering operator is shippable.
7750 */
7753{
7754 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private;
7757
7759 while ((em = eclass_member_iterator_next(&it)) != NULL)
7760 {
7761 /*
7762 * Note we require !bms_is_empty, else we'd accept constant
7763 * expressions which are not suitable for the purpose.
7764 */
7765 if (bms_is_subset(em->em_relids, rel->relids) &&
7766 !bms_is_empty(em->em_relids) &&
7767 bms_is_empty(bms_intersect(em->em_relids, fpinfo->hidden_subquery_rels)) &&
7768 is_foreign_expr(root, rel, em->em_expr))
7769 return em;
7770 }
7771
7772 return NULL;
7773}
7774
7775/*
7776 * Find an EquivalenceClass member that is to be computed as a sort column
7777 * in the given rel's reltarget, and is shippable.
7778 *
7779 * If there is more than one suitable candidate, return an arbitrary
7780 * one of them. If there is none, return NULL.
7781 *
7782 * This checks that the EC member expression uses only Vars from the given
7783 * rel and is shippable. Caller must separately verify that the pathkey's
7784 * ordering operator is shippable.
7785 */
7788 RelOptInfo *rel)
7789{
7790 PathTarget *target = rel->reltarget;
7791 ListCell *lc1;
7792 int i;
7793
7794 i = 0;
7795 foreach(lc1, target->exprs)
7796 {
7797 Expr *expr = (Expr *) lfirst(lc1);
7799 ListCell *lc2;
7800
7801 /* Ignore non-sort expressions */
7802 if (sgref == 0 ||
7804 root->parse->sortClause) == NULL)
7805 {
7806 i++;
7807 continue;
7808 }
7809
7810 /* We ignore binary-compatible relabeling on both ends */
7811 while (expr && IsA(expr, RelabelType))
7812 expr = ((RelabelType *) expr)->arg;
7813
7814 /*
7815 * Locate an EquivalenceClass member matching this expr, if any.
7816 * Ignore child members.
7817 */
7818 foreach(lc2, ec->ec_members)
7819 {
7821 Expr *em_expr;
7822
7823 /* Don't match constants */
7824 if (em->em_is_const)
7825 continue;
7826
7827 /* Child members should not exist in ec_members */
7828 Assert(!em->em_is_child);
7829
7830 /* Match if same expression (after stripping relabel) */
7831 em_expr = em->em_expr;
7832 while (em_expr && IsA(em_expr, RelabelType))
7833 em_expr = ((RelabelType *) em_expr)->arg;
7834
7835 if (!equal(em_expr, expr))
7836 continue;
7837
7838 /* Check that expression (including relabels!) is shippable */
7839 if (is_foreign_expr(root, rel, em->em_expr))
7840 return em;
7841 }
7842
7843 i++;
7844 }
7845
7846 return NULL;
7847}
7848
7849/*
7850 * Determine batch size for a given foreign table. The option specified for
7851 * a table has precedence.
7852 */
7853static int
7855{
7858 ForeignServer *server;
7859 List *options;
7860 ListCell *lc;
7861
7862 /* we use 1 by default, which means "no batching" */
7863 int batch_size = 1;
7864
7865 /*
7866 * Load options for table and server. We append server options after table
7867 * options, because table options take precedence.
7868 */
7870 server = GetForeignServer(table->serverid);
7871
7872 options = NIL;
7873 options = list_concat(options, table->options);
7874 options = list_concat(options, server->options);
7875
7876 /* See if either table or server specifies batch_size. */
7877 foreach(lc, options)
7878 {
7879 DefElem *def = (DefElem *) lfirst(lc);
7880
7881 if (strcmp(def->defname, "batch_size") == 0)
7882 {
7883 (void) parse_int(defGetString(def), &batch_size, 0, NULL);
7884 break;
7885 }
7886 }
7887
7888 return batch_size;
7889}
void get_translated_update_targetlist(PlannerInfo *root, Index relid, List **processed_tlist, List **update_colnos)
Definition appendinfo.c:766
void add_row_identity_var(PlannerInfo *root, Var *orig_var, Index rtindex, const char *rowid_name)
Definition appendinfo.c:864
int16 AttrNumber
Definition attnum.h:21
#define AttributeNumberIsValid(attributeNumber)
Definition attnum.h:34
#define InvalidAttrNumber
Definition attnum.h:23
Bitmapset * bms_intersect(const Bitmapset *a, const Bitmapset *b)
Definition bitmapset.c:292
int bms_next_member(const Bitmapset *a, int prevbit)
Definition bitmapset.c:1290
Bitmapset * bms_del_member(Bitmapset *a, int x)
Definition bitmapset.c:852
bool bms_is_subset(const Bitmapset *a, const Bitmapset *b)
Definition bitmapset.c:412
bool bms_is_member(int x, const Bitmapset *a)
Definition bitmapset.c:510
Bitmapset * bms_add_members(Bitmapset *a, const Bitmapset *b)
Definition bitmapset.c:901
Bitmapset * bms_union(const Bitmapset *a, const Bitmapset *b)
Definition bitmapset.c:251
bool bms_overlap(const Bitmapset *a, const Bitmapset *b)
Definition bitmapset.c:575
bool bms_nonempty_difference(const Bitmapset *a, const Bitmapset *b)
Definition bitmapset.c:634
#define bms_is_empty(a)
Definition bitmapset.h:118
uint32 BlockNumber
Definition block.h:31
static Datum values[MAXATTR]
Definition bootstrap.c:188
#define NameStr(name)
Definition c.h:837
#define Min(x, y)
Definition c.h:1093
#define MAXALIGN(LEN)
Definition c.h:898
#define Max(x, y)
Definition c.h:1087
#define Assert(condition)
Definition c.h:945
int64_t int64
Definition c.h:615
#define CppAsString2(x)
Definition c.h:500
unsigned int Index
Definition c.h:700
#define OidIsValid(objectId)
Definition c.h:860
Selectivity clauselist_selectivity(PlannerInfo *root, List *clauses, int varRelid, JoinType jointype, SpecialJoinInfo *sjinfo)
Definition clausesel.c:100
@ COMPARE_LT
Definition cmptype.h:34
unsigned int GetCursorNumber(PGconn *conn)
Definition connection.c:940
void do_sql_command(PGconn *conn, const char *sql)
Definition connection.c:833
PGresult * pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
Definition connection.c:969
void ReleaseConnection(PGconn *conn)
Definition connection.c:919
PGresult * pgfdw_get_result(PGconn *conn)
Definition connection.c:986
void pgfdw_report_error(PGresult *res, PGconn *conn, const char *sql)
static unsigned int cursor_number
Definition connection.c:83
unsigned int GetPrepStmtNumber(PGconn *conn)
Definition connection.c:954
List * ExtractExtensionList(const char *extensionsString, bool warnOnMissing)
Definition option.c:445
double cpu_operator_cost
Definition costsize.c:135
void set_baserel_size_estimates(PlannerInfo *root, RelOptInfo *rel)
Definition costsize.c:5492
void cost_sort(Path *path, PlannerInfo *root, List *pathkeys, int input_disabled_nodes, Cost input_cost, double tuples, int width, Cost comparison_cost, int sort_mem, double limit_tuples)
Definition costsize.c:2201
double cpu_tuple_cost
Definition costsize.c:133
void cost_qual_eval(QualCost *cost, List *quals, PlannerInfo *root)
Definition costsize.c:4899
double seq_page_cost
Definition costsize.c:131
double clamp_row_est(double nrows)
Definition costsize.c:214
bool is_projection_capable_path(Path *path)
ForeignScan * make_foreignscan(List *qptlist, List *qpqual, Index scanrelid, List *fdw_exprs, List *fdw_private, List *fdw_scan_tlist, List *fdw_recheck_quals, Plan *outer_plan)
Plan * change_plan_targetlist(Plan *subplan, List *tlist, bool tlist_parallel_safe)
char * defGetString(DefElem *def)
Definition define.c:34
bool defGetBoolean(DefElem *def)
Definition define.c:93
void deparseAnalyzeSizeSql(StringInfo buf, Relation rel)
Definition deparse.c:2530
const char * get_jointype_name(JoinType jointype)
Definition deparse.c:1672
void deparseAnalyzeInfoSql(StringInfo buf, Relation rel)
Definition deparse.c:2552
void deparseDirectDeleteSql(StringInfo buf, PlannerInfo *root, Index rtindex, Relation rel, RelOptInfo *foreignrel, List *remote_conds, List **params_list, List *returningList, List **retrieved_attrs)
Definition deparse.c:2422
void deparseDirectUpdateSql(StringInfo buf, PlannerInfo *root, Index rtindex, Relation rel, RelOptInfo *foreignrel, List *targetlist, List *targetAttrs, List *remote_conds, List **params_list, List *returningList, List **retrieved_attrs)
Definition deparse.c:2307
bool is_foreign_param(PlannerInfo *root, RelOptInfo *baserel, Expr *expr)
Definition deparse.c:1115
void deparseSelectStmtForRel(StringInfo buf, PlannerInfo *root, RelOptInfo *rel, List *tlist, List *remote_conds, List *pathkeys, bool has_final_sort, bool has_limit, bool is_subquery, List **retrieved_attrs, List **params_list)
Definition deparse.c:1266
void deparseStringLiteral(StringInfo buf, const char *val)
Definition deparse.c:2880
void rebuildInsertSql(StringInfo buf, Relation rel, char *orig_query, List *target_attrs, int values_end_len, int num_params, int num_rows)
Definition deparse.c:2187
void deparseInsertSql(StringInfo buf, RangeTblEntry *rte, Index rtindex, Relation rel, List *targetAttrs, bool doNothing, List *withCheckOptionList, List *returningList, List **retrieved_attrs, int *values_end_len)
Definition deparse.c:2114
void deparseUpdateSql(StringInfo buf, RangeTblEntry *rte, Index rtindex, Relation rel, List *targetAttrs, List *withCheckOptionList, List *returningList, List **retrieved_attrs)
Definition deparse.c:2247
void deparseDeleteSql(StringInfo buf, RangeTblEntry *rte, Index rtindex, Relation rel, List *returningList, List **retrieved_attrs)
Definition deparse.c:2393
void deparseAnalyzeSql(StringInfo buf, Relation rel, PgFdwSamplingMethod sample_method, double sample_frac, List **retrieved_attrs)
Definition deparse.c:2592
bool is_foreign_expr(PlannerInfo *root, RelOptInfo *baserel, Expr *expr)
Definition deparse.c:244
void classifyConditions(PlannerInfo *root, RelOptInfo *baserel, List *input_conds, List **remote_conds, List **local_conds)
Definition deparse.c:218
void deparseTruncateSql(StringInfo buf, List *rels, DropBehavior behavior, bool restart_seqs)
Definition deparse.c:2677
bool is_foreign_pathkey(PlannerInfo *root, RelOptInfo *baserel, PathKey *pathkey)
Definition deparse.c:1156
List * build_tlist_to_deparse(RelOptInfo *foreignrel)
Definition deparse.c:1209
Datum arg
Definition elog.c:1322
ErrorContextCallback * error_context_stack
Definition elog.c:99
int errcode(int sqlerrcode)
Definition elog.c:874
#define errcontext
Definition elog.h:198
#define DEBUG3
Definition elog.h:28
#define ERROR
Definition elog.h:39
#define elog(elevel,...)
Definition elog.h:226
#define ereport(elevel,...)
Definition elog.h:150
bool equal(const void *a, const void *b)
Definition equalfuncs.c:223
void setup_eclass_member_iterator(EquivalenceMemberIterator *it, EquivalenceClass *ec, Relids child_relids)
List * generate_implied_equalities_for_column(PlannerInfo *root, RelOptInfo *rel, ec_matches_callback_type callback, void *callback_arg, Relids prohibited_rels)
EquivalenceMember * eclass_member_iterator_next(EquivalenceMemberIterator *it)
bool eclass_useful_for_merging(PlannerInfo *root, EquivalenceClass *eclass, RelOptInfo *rel)
void ExecAsyncResponse(AsyncRequest *areq)
Definition execAsync.c:118
void ExecAsyncRequestPending(AsyncRequest *areq)
Definition execAsync.c:150
void ExecAsyncRequestDone(AsyncRequest *areq, TupleTableSlot *result)
Definition execAsync.c:138
List * ExecInitExprList(List *nodes, PlanState *parent)
Definition execExpr.c:356
AttrNumber ExecFindJunkAttributeInTlist(List *targetlist, const char *attrName)
Definition execJunk.c:222
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
HeapTuple ExecFetchSlotHeapTuple(TupleTableSlot *slot, bool materialize, bool *shouldFree)
TupleTableSlot * ExecStoreAllNullTuple(TupleTableSlot *slot)
AttInMetadata * TupleDescGetAttInMetadata(TupleDesc tupdesc)
TupleTableSlot * ExecStoreHeapTuple(HeapTuple tuple, TupleTableSlot *slot, bool shouldFree)
void ExecForceStoreHeapTuple(HeapTuple tuple, TupleTableSlot *slot, bool shouldFree)
TupleTableSlot * ExecGetReturningSlot(EState *estate, ResultRelInfo *relInfo)
Definition execUtils.c:1253
Relation ExecOpenScanRelation(EState *estate, Index scanrelid, int eflags)
Definition execUtils.c:747
Oid ExecGetResultRelCheckAsUser(ResultRelInfo *relInfo, EState *estate)
Definition execUtils.c:1494
#define outerPlanState(node)
Definition execnodes.h:1273
static RangeTblEntry * exec_rt_fetch(Index rti, EState *estate)
Definition executor.h:701
static TupleTableSlot * ExecProcNode(PlanState *node)
Definition executor.h:315
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition executor.h:396
#define EXEC_FLAG_EXPLAIN_ONLY
Definition executor.h:67
static Datum ExecGetJunkAttribute(TupleTableSlot *slot, AttrNumber attno, bool *isNull)
Definition executor.h:226
void ExplainPropertyText(const char *qlabel, const char *value, ExplainState *es)
void ExplainPropertyInteger(const char *qlabel, const char *unit, int64 value, ExplainState *es)
int(* AcquireSampleRowsFunc)(Relation relation, int elevel, HeapTuple *rows, int targrows, double *totalrows, double *totaldeadrows)
Definition fdwapi.h:151
int PQserverVersion(const PGconn *conn)
int PQsocket(const PGconn *conn)
int PQsendQueryParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition fe-exec.c:1509
int PQconsumeInput(PGconn *conn)
Definition fe-exec.c:2001
int PQsendPrepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes)
Definition fe-exec.c:1553
int PQsendQuery(PGconn *conn, const char *query)
Definition fe-exec.c:1433
int PQsendQueryPrepared(PGconn *conn, const char *stmtName, int nParams, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition fe-exec.c:1650
#define palloc0_array(type, count)
Definition fe_memutils.h:77
#define palloc0_object(type)
Definition fe_memutils.h:75
int extra_float_digits
Definition float.c:57
Datum InputFunctionCall(FmgrInfo *flinfo, char *str, Oid typioparam, int32 typmod)
Definition fmgr.c:1532
void fmgr_info(Oid functionId, FmgrInfo *finfo)
Definition fmgr.c:129
char * OutputFunctionCall(FmgrInfo *flinfo, Datum val)
Definition fmgr.c:1684
#define PG_MODULE_MAGIC_EXT(...)
Definition fmgr.h:540
#define DirectFunctionCall1(func, arg1)
Definition fmgr.h:684
#define PG_FUNCTION_INFO_V1(funcname)
Definition fmgr.h:417
#define PG_RETURN_POINTER(x)
Definition fmgr.h:363
#define PG_FUNCTION_ARGS
Definition fmgr.h:193
ForeignTable * GetForeignTable(Oid relid)
Definition foreign.c:343
Path * GetExistingLocalJoinPath(RelOptInfo *joinrel)
Definition foreign.c:830
UserMapping * GetUserMapping(Oid userid, Oid serverid)
Definition foreign.c:289
ForeignServer * GetForeignServer(Oid serverid)
Definition foreign.c:114
int DateStyle
Definition globals.c:125
int IntervalStyle
Definition globals.c:127
int work_mem
Definition globals.c:131
bool parse_int(const char *value, int *result, int flags, const char **hintmsg)
Definition guc.c:2775
int NewGUCNestLevel(void)
Definition guc.c:2142
bool parse_real(const char *value, double *result, int flags, const char **hintmsg)
Definition guc.c:2865
void AtEOXact_GUC(bool isCommit, int nestLevel)
Definition guc.c:2169
int set_config_option(const char *name, const char *value, GucContext context, GucSource source, GucAction action, bool changeVal, int elevel, bool is_reload)
Definition guc.c:3248
@ GUC_ACTION_SAVE
Definition guc.h:205
@ PGC_S_SESSION
Definition guc.h:126
@ PGC_USERSET
Definition guc.h:79
static int server_version_num
Definition guc_tables.c:605
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition heaptuple.c:1037
void heap_freetuple(HeapTuple htup)
Definition heaptuple.c:1384
#define SizeofHeapTupleHeader
static void HeapTupleHeaderSetCmin(HeapTupleHeaderData *tup, CommandId cid)
static void HeapTupleHeaderSetXmin(HeapTupleHeaderData *tup, TransactionId xid)
static void HeapTupleHeaderSetXmax(HeapTupleHeaderData *tup, TransactionId xid)
void parse(int)
Definition parse.c:49
#define stmt
static struct @174 value
Bitmapset * get_rel_all_updated_cols(PlannerInfo *root, RelOptInfo *rel)
Definition inherit.c:652
void InstrUpdateTupleCount(Instrumentation *instr, double nTuples)
Definition instrument.c:136
int j
Definition isn.c:78
int i
Definition isn.c:77
ItemPointerData * ItemPointer
Definition itemptr.h:49
#define PQgetvalue
#define PQclear
static libpqsrv_PGresult * libpqsrv_PGresultSetParent(libpqsrv_PGresult *bres, MemoryContext ctx)
#define PQcmdTuples
#define PQnfields
#define PQresultStatus
#define PQgetisnull
#define PQntuples
@ PGRES_COMMAND_OK
Definition libpq-fe.h:131
@ PGRES_TUPLES_OK
Definition libpq-fe.h:134
#define PQ_QUERY_PARAM_MAX_LIMIT
Definition libpq-fe.h:517
List * lappend(List *list, void *datum)
Definition list.c:339
List * list_delete(List *list, void *datum)
Definition list.c:853
List * list_concat(List *list1, const List *list2)
Definition list.c:561
List * list_copy(const List *oldlist)
Definition list.c:1573
List * lappend_int(List *list, int datum)
Definition list.c:357
bool list_member_ptr(const List *list, const void *datum)
Definition list.c:682
void list_free(List *list)
Definition list.c:1546
bool list_member_int(const List *list, int datum)
Definition list.c:702
bool list_member(const List *list, const void *datum)
Definition list.c:661
List * list_append_unique_ptr(List *list, void *datum)
Definition list.c:1356
#define NoLock
Definition lockdefs.h:34
char * get_rel_name(Oid relid)
Definition lsyscache.c:2148
void getTypeOutputInfo(Oid type, Oid *typOutput, bool *typIsVarlena)
Definition lsyscache.c:3129
Oid get_rel_namespace(Oid relid)
Definition lsyscache.c:2172
Oid get_rel_type_id(Oid relid)
Definition lsyscache.c:2199
char * get_namespace_name_or_temp(Oid nspid)
Definition lsyscache.c:3612
Datum subpath(PG_FUNCTION_ARGS)
Definition ltree_op.c:311
Var * makeVar(int varno, AttrNumber varattno, Oid vartype, int32 vartypmod, Oid varcollid, Index varlevelsup)
Definition makefuncs.c:66
TargetEntry * makeTargetEntry(Expr *expr, AttrNumber resno, char *resname, bool resjunk)
Definition makefuncs.c:289
void MemoryContextReset(MemoryContext context)
Definition mcxt.c:403
char * pstrdup(const char *in)
Definition mcxt.c:1781
void pfree(void *pointer)
Definition mcxt.c:1616
void * palloc0(Size size)
Definition mcxt.c:1417
void * palloc(Size size)
Definition mcxt.c:1387
MemoryContext CurrentMemoryContext
Definition mcxt.c:160
MemoryContext GetMemoryChunkContext(void *pointer)
Definition mcxt.c:756
#define AllocSetContextCreate
Definition memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition memutils.h:160
#define ALLOCSET_SMALL_SIZES
Definition memutils.h:170
#define USE_ISO_DATES
Definition miscadmin.h:237
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
#define INTSTYLE_POSTGRES
Definition miscadmin.h:257
Oid GetUserId(void)
Definition miscinit.c:470
Oid exprType(const Node *expr)
Definition nodeFuncs.c:42
#define IsA(nodeptr, _type_)
Definition nodes.h:164
#define copyObject(obj)
Definition nodes.h:232
double Cost
Definition nodes.h:261
#define IS_OUTER_JOIN(jointype)
Definition nodes.h:348
OnConflictAction
Definition nodes.h:427
@ ONCONFLICT_NONE
Definition nodes.h:428
@ ONCONFLICT_NOTHING
Definition nodes.h:429
CmdType
Definition nodes.h:273
@ CMD_INSERT
Definition nodes.h:277
@ CMD_DELETE
Definition nodes.h:278
@ CMD_UPDATE
Definition nodes.h:276
@ CMD_SELECT
Definition nodes.h:275
double Selectivity
Definition nodes.h:260
@ AGGSPLIT_SIMPLE
Definition nodes.h:387
@ LIMIT_OPTION_WITH_TIES
Definition nodes.h:443
#define makeNode(_type_)
Definition nodes.h:161
#define castNode(_type_, nodeptr)
Definition nodes.h:182
JoinType
Definition nodes.h:298
@ JOIN_SEMI
Definition nodes.h:317
@ JOIN_FULL
Definition nodes.h:305
@ JOIN_INNER
Definition nodes.h:303
@ JOIN_RIGHT
Definition nodes.h:306
@ JOIN_LEFT
Definition nodes.h:304
static char * errmsg
#define PVC_RECURSE_PLACEHOLDERS
Definition optimizer.h:202
#define PVC_INCLUDE_PLACEHOLDERS
Definition optimizer.h:201
#define PVC_INCLUDE_AGGREGATES
Definition optimizer.h:197
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:124
@ FDW_IMPORT_SCHEMA_LIMIT_TO
@ FDW_IMPORT_SCHEMA_EXCEPT
@ RTE_RELATION
DropBehavior
#define rt_fetch(rangetable_index, rangetable)
Definition parsetree.h:31
PathKey * make_canonical_pathkey(PlannerInfo *root, EquivalenceClass *eclass, Oid opfamily, CompareType cmptype, bool nulls_first)
Definition pathkeys.c:56
void update_mergeclause_eclasses(PlannerInfo *root, RestrictInfo *restrictinfo)
Definition pathkeys.c:1510
bool pathkeys_contained_in(List *keys1, List *keys2)
Definition pathkeys.c:343
ForeignPath * create_foreign_upper_path(PlannerInfo *root, RelOptInfo *rel, PathTarget *target, double rows, int disabled_nodes, Cost startup_cost, Cost total_cost, List *pathkeys, Path *fdw_outerpath, List *fdw_restrictinfo, List *fdw_private)
Definition pathnode.c:2230
ProjectionPath * create_projection_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, PathTarget *target)
Definition pathnode.c:2587
SortPath * create_sort_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, List *pathkeys, double limit_tuples)
Definition pathnode.c:2904
ForeignPath * create_foreignscan_path(PlannerInfo *root, RelOptInfo *rel, PathTarget *target, double rows, int disabled_nodes, Cost startup_cost, Cost total_cost, List *pathkeys, Relids required_outer, Path *fdw_outerpath, List *fdw_restrictinfo, List *fdw_private)
Definition pathnode.c:2128
ForeignPath * create_foreign_join_path(PlannerInfo *root, RelOptInfo *rel, PathTarget *target, double rows, int disabled_nodes, Cost startup_cost, Cost total_cost, List *pathkeys, Relids required_outer, Path *fdw_outerpath, List *fdw_restrictinfo, List *fdw_private)
Definition pathnode.c:2176
void add_path(RelOptInfo *parent_rel, Path *new_path)
Definition pathnode.c:459
void adjust_limit_rows_costs(double *rows, Cost *startup_cost, Cost *total_cost, int64 offset_est, int64 count_est)
Definition pathnode.c:3848
#define RINFO_IS_PUSHED_DOWN(rinfo, joinrelids)
Definition pathnodes.h:3045
@ PARTITIONWISE_AGGREGATE_FULL
Definition pathnodes.h:3635
@ PARTITIONWISE_AGGREGATE_NONE
Definition pathnodes.h:3634
#define IS_SIMPLE_REL(rel)
Definition pathnodes.h:977
#define IS_JOIN_REL(rel)
Definition pathnodes.h:982
#define get_pathtarget_sortgroupref(target, colno)
Definition pathnodes.h:1882
#define planner_rt_fetch(rti, root)
Definition pathnodes.h:692
UpperRelationKind
Definition pathnodes.h:143
@ UPPERREL_GROUP_AGG
Definition pathnodes.h:147
@ UPPERREL_FINAL
Definition pathnodes.h:152
@ UPPERREL_ORDERED
Definition pathnodes.h:151
@ RELOPT_BASEREL
Definition pathnodes.h:965
@ RELOPT_UPPER_REL
Definition pathnodes.h:969
@ RELOPT_JOINREL
Definition pathnodes.h:966
#define IS_OTHER_REL(rel)
Definition pathnodes.h:992
#define IS_UPPER_REL(rel)
Definition pathnodes.h:987
NameData attname
int16 attnum
FormData_pg_attribute * Form_pg_attribute
bool attnotnull
NameData relname
Definition pg_class.h:40
#define NAMEDATALEN
#define lfirst(lc)
Definition pg_list.h:172
#define lfirst_node(type, lc)
Definition pg_list.h:176
static int list_length(const List *l)
Definition pg_list.h:152
#define NIL
Definition pg_list.h:68
#define forboth(cell1, list1, cell2, list2)
Definition pg_list.h:518
#define lfirst_int(lc)
Definition pg_list.h:173
#define list_make5(x1, x2, x3, x4, x5)
Definition pg_list.h:222
#define list_make1(x1)
Definition pg_list.h:212
static void * list_nth(const List *list, int n)
Definition pg_list.h:299
#define linitial(l)
Definition pg_list.h:178
#define list_make3(x1, x2, x3)
Definition pg_list.h:216
#define list_nth_node(type, list, n)
Definition pg_list.h:327
#define linitial_oid(l)
Definition pg_list.h:180
#define list_make2(x1, x2)
Definition pg_list.h:214
#define list_make4(x1, x2, x3, x4)
Definition pg_list.h:219
static const struct lconv_member_info table[]
#define plan(x)
Definition pg_regress.c:161
static char * user
Definition pg_regress.c:119
uint64 fetch_size
Definition pg_rewind.c:85
static char buf[DEFAULT_XLOG_SEG_SIZE]
#define outerPlan(node)
Definition plannodes.h:265
#define snprintf
Definition port.h:260
static Datum PointerGetDatum(const void *X)
Definition postgres.h:342
uint64_t Datum
Definition postgres.h:70
static Pointer DatumGetPointer(Datum X)
Definition postgres.h:332
static Datum CStringGetDatum(const char *X)
Definition postgres.h:370
#define InvalidOid
unsigned int Oid
static int postgresGetForeignModifyBatchSize(ResultRelInfo *resultRelInfo)
#define DEFAULT_FDW_SORT_MULTIPLIER
static const char ** convert_prep_stmt_params(PgFdwModifyState *fmstate, ItemPointer tupleid, TupleTableSlot **slots, int numSlots)
static TupleTableSlot * apply_returning_filter(PgFdwDirectModifyState *dmstate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate)
static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel, JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel, JoinPathExtraData *extra)
static void postgresBeginForeignScan(ForeignScanState *node, int eflags)
static bool postgresIsForeignPathAsyncCapable(ForeignPath *path)
static void store_returning_result(PgFdwModifyState *fmstate, TupleTableSlot *slot, PGresult *res)
static void create_cursor(ForeignScanState *node)
static void postgresExecForeignTruncate(List *rels, DropBehavior behavior, bool restart_seqs)
static void postgresExplainForeignModify(ModifyTableState *mtstate, ResultRelInfo *rinfo, List *fdw_private, int subplan_index, ExplainState *es)
static void analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate)
static TupleTableSlot ** postgresExecForeignBatchInsert(EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot **slots, TupleTableSlot **planSlots, int *numSlots)
static void deallocate_query(PgFdwModifyState *fmstate)
static void postgresGetForeignJoinPaths(PlannerInfo *root, RelOptInfo *joinrel, RelOptInfo *outerrel, RelOptInfo *innerrel, JoinType jointype, JoinPathExtraData *extra)
static void postgresExplainForeignScan(ForeignScanState *node, ExplainState *es)
static TupleDesc get_tupdesc_for_join_scan_tuples(ForeignScanState *node)
static TupleTableSlot * postgresExecForeignUpdate(EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, TupleTableSlot *planSlot)
static bool postgresPlanDirectModify(PlannerInfo *root, ModifyTable *plan, Index resultRelation, int subplan_index)
static void conversion_error_callback(void *arg)
static void postgresReScanForeignScan(ForeignScanState *node)
static void prepare_foreign_modify(PgFdwModifyState *fmstate)
static void postgresForeignAsyncRequest(AsyncRequest *areq)
static int postgresIsForeignRelUpdatable(Relation rel)
void reset_transmission_modes(int nestlevel)
int set_transmission_modes(void)
static double postgresGetAnalyzeInfoForForeignTable(Relation relation, bool *can_tablesample)
FdwDirectModifyPrivateIndex
@ FdwDirectModifyPrivateSetProcessed
@ FdwDirectModifyPrivateHasReturning
@ FdwDirectModifyPrivateRetrievedAttrs
@ FdwDirectModifyPrivateUpdateSql
static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist)
static ForeignScan * postgresGetForeignPlan(PlannerInfo *root, RelOptInfo *foreignrel, Oid foreigntableid, ForeignPath *best_path, List *tlist, List *scan_clauses, Plan *outer_plan)
static void postgresEndForeignScan(ForeignScanState *node)
static void postgresGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid)
static void add_foreign_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel, RelOptInfo *grouped_rel, GroupPathExtraData *extra)
static List * postgresPlanForeignModify(PlannerInfo *root, ModifyTable *plan, Index resultRelation, int subplan_index)
static void estimate_path_cost_size(PlannerInfo *root, RelOptInfo *foreignrel, List *param_join_conds, List *pathkeys, PgFdwPathExtraData *fpextra, double *p_rows, int *p_width, int *p_disabled_nodes, Cost *p_startup_cost, Cost *p_total_cost)
static void fetch_more_data(ForeignScanState *node)
static void prepare_query_params(PlanState *node, List *fdw_exprs, int numParams, FmgrInfo **param_flinfo, List **param_exprs, const char ***param_values)
static void postgresEndForeignModify(EState *estate, ResultRelInfo *resultRelInfo)
static void produce_tuple_asynchronously(AsyncRequest *areq, bool fetch)
static ForeignScan * find_modifytable_subplan(PlannerInfo *root, ModifyTable *plan, Index rtindex, int subplan_index)
static void add_foreign_ordered_paths(PlannerInfo *root, RelOptInfo *input_rel, RelOptInfo *ordered_rel)
static bool semijoin_target_ok(PlannerInfo *root, RelOptInfo *joinrel, RelOptInfo *outerrel, RelOptInfo *innerrel)
static void merge_fdw_options(PgFdwRelationInfo *fpinfo, const PgFdwRelationInfo *fpinfo_o, const PgFdwRelationInfo *fpinfo_i)
static void postgresAddForeignUpdateTargets(PlannerInfo *root, Index rtindex, RangeTblEntry *target_rte, Relation target_relation)
static void postgresGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid)
static HeapTuple make_tuple_from_result_row(PGresult *res, int row, Relation rel, AttInMetadata *attinmeta, List *retrieved_attrs, ForeignScanState *fsstate, MemoryContext temp_context)
static void postgresEndDirectModify(ForeignScanState *node)
static void get_remote_estimate(const char *sql, PGconn *conn, double *rows, int *width, Cost *startup_cost, Cost *total_cost)
static void postgresForeignAsyncConfigureWait(AsyncRequest *areq)
EquivalenceMember * find_em_for_rel_target(PlannerInfo *root, EquivalenceClass *ec, RelOptInfo *rel)
static void postgresGetForeignUpperPaths(PlannerInfo *root, UpperRelationKind stage, RelOptInfo *input_rel, RelOptInfo *output_rel, void *extra)
static void apply_server_options(PgFdwRelationInfo *fpinfo)
static void close_cursor(PGconn *conn, unsigned int cursor_number, PgFdwConnState *conn_state)
FdwPathPrivateIndex
@ FdwPathPrivateHasLimit
@ FdwPathPrivateHasFinalSort
static int get_batch_size_option(Relation rel)
static void postgresForeignAsyncNotify(AsyncRequest *areq)
static void adjust_foreign_grouping_path_cost(PlannerInfo *root, List *pathkeys, double retrieved_rows, double width, double limit_tuples, int *p_disabled_nodes, Cost *p_startup_cost, Cost *p_run_cost)
static void add_foreign_final_paths(PlannerInfo *root, RelOptInfo *input_rel, RelOptInfo *final_rel, FinalPathExtraData *extra)
static void fetch_more_data_begin(AsyncRequest *areq)
static void execute_dml_stmt(ForeignScanState *node)
static TupleTableSlot * postgresIterateForeignScan(ForeignScanState *node)
static TupleTableSlot * postgresExecForeignDelete(EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, TupleTableSlot *planSlot)
static bool foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel, Node *havingQual)
FdwScanPrivateIndex
@ FdwScanPrivateRetrievedAttrs
@ FdwScanPrivateSelectSql
@ FdwScanPrivateFetchSize
@ FdwScanPrivateRelations
static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel, EquivalenceClass *ec, EquivalenceMember *em, void *arg)
FdwModifyPrivateIndex
@ FdwModifyPrivateLen
@ FdwModifyPrivateUpdateSql
@ FdwModifyPrivateTargetAttnums
@ FdwModifyPrivateRetrievedAttrs
@ FdwModifyPrivateHasReturning
static TupleTableSlot ** execute_foreign_modify(EState *estate, ResultRelInfo *resultRelInfo, CmdType operation, TupleTableSlot **slots, TupleTableSlot **planSlots, int *numSlots)
static PgFdwModifyState * create_foreign_modify(EState *estate, RangeTblEntry *rte, ResultRelInfo *resultRelInfo, CmdType operation, Plan *subplan, char *query, List *target_attrs, int values_end, bool has_returning, List *retrieved_attrs)
static void add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel, Path *epq_path, List *restrictlist)
static bool postgresRecheckForeignScan(ForeignScanState *node, TupleTableSlot *slot)
static List * postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
static void complete_pending_request(AsyncRequest *areq)
static bool postgresAnalyzeForeignTable(Relation relation, AcquireSampleRowsFunc *func, BlockNumber *totalpages)
static List * build_remote_returning(Index rtindex, Relation rel, List *returningList)
static TupleTableSlot * get_returning_data(ForeignScanState *node)
void process_pending_request(AsyncRequest *areq)
#define DEFAULT_FDW_TUPLE_COST
static void postgresBeginForeignModify(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, List *fdw_private, int subplan_index, int eflags)
static void process_query_params(ExprContext *econtext, FmgrInfo *param_flinfo, List *param_exprs, const char **param_values)
static void postgresBeginForeignInsert(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo)
static void init_returning_filter(PgFdwDirectModifyState *dmstate, List *fdw_scan_tlist, Index rtindex)
static TupleTableSlot * postgresIterateDirectModify(ForeignScanState *node)
static void postgresBeginDirectModify(ForeignScanState *node, int eflags)
static List * get_useful_pathkeys_for_relation(PlannerInfo *root, RelOptInfo *rel)
Datum postgres_fdw_handler(PG_FUNCTION_ARGS)
static TupleTableSlot * postgresExecForeignInsert(EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, TupleTableSlot *planSlot)
static void apply_table_options(PgFdwRelationInfo *fpinfo)
static int postgresAcquireSampleRowsFunc(Relation relation, int elevel, HeapTuple *rows, int targrows, double *totalrows, double *totaldeadrows)
static void postgresEndForeignInsert(EState *estate, ResultRelInfo *resultRelInfo)
EquivalenceMember * find_em_for_rel(PlannerInfo *root, EquivalenceClass *ec, RelOptInfo *rel)
static void postgresExplainDirectModify(ForeignScanState *node, ExplainState *es)
#define DEFAULT_FDW_STARTUP_COST
static List * get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel)
static void finish_foreign_modify(PgFdwModifyState *fmstate)
bool is_shippable(Oid objectId, Oid classId, PgFdwRelationInfo *fpinfo)
Definition shippable.c:163
PgFdwSamplingMethod
@ ANALYZE_SAMPLE_AUTO
@ ANALYZE_SAMPLE_OFF
@ ANALYZE_SAMPLE_BERNOULLI
@ ANALYZE_SAMPLE_SYSTEM
@ ANALYZE_SAMPLE_RANDOM
void get_agg_clause_costs(PlannerInfo *root, AggSplit aggsplit, AggClauseCosts *costs)
Definition prepagg.c:559
static int fb(int x)
char * psprintf(const char *fmt,...)
Definition psprintf.c:43
tree ctl root
Definition radixtree.h:1857
#define RelationGetRelid(relation)
Definition rel.h:514
#define RelationGetDescr(relation)
Definition rel.h:540
#define RelationGetRelationName(relation)
Definition rel.h:548
RelOptInfo * find_base_rel(PlannerInfo *root, int relid)
Definition relnode.c:544
RelOptInfo * find_join_rel(PlannerInfo *root, Relids relids)
Definition relnode.c:657
ParamPathInfo * get_baserel_parampathinfo(PlannerInfo *root, RelOptInfo *baserel, Relids required_outer)
Definition relnode.c:1704
List * extract_actual_clauses(List *restrictinfo_list, bool pseudoconstant)
bool join_clause_is_movable_to(RestrictInfo *rinfo, RelOptInfo *baserel)
RestrictInfo * make_restrictinfo(PlannerInfo *root, Expr *clause, bool is_pushed_down, bool has_clone, bool is_clone, bool pseudoconstant, Index security_level, Relids required_relids, Relids incompatible_relids, Relids outer_relids)
const char * quote_identifier(const char *ident)
void reservoir_init_selection_state(ReservoirState rs, int n)
Definition sampling.c:133
double sampler_random_fract(pg_prng_state *randstate)
Definition sampling.c:241
double reservoir_get_next_S(ReservoirState rs, double t, int n)
Definition sampling.c:147
double estimate_num_groups(PlannerInfo *root, List *groupExprs, double input_rows, List **pgset, EstimationInfo *estinfo)
Definition selfuncs.c:3788
PGconn * GetConnection(void)
Definition streamutil.c:60
PGconn * conn
Definition streamutil.c:52
void resetStringInfo(StringInfo str)
Definition stringinfo.c:126
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition stringinfo.c:145
void appendStringInfoString(StringInfo str, const char *s)
Definition stringinfo.c:230
void appendStringInfoChar(StringInfo str, char ch)
Definition stringinfo.c:242
void initStringInfo(StringInfo str)
Definition stringinfo.c:97
Bitmapset * as_needrequest
Definition execnodes.h:1521
struct WaitEventSet * as_eventset
Definition execnodes.h:1522
PlanState * requestor
Definition execnodes.h:651
PlanState * requestee
Definition execnodes.h:652
FmgrInfo * attinfuncs
Definition funcapi.h:41
Oid * attioparams
Definition funcapi.h:44
int32 * atttypmods
Definition funcapi.h:47
bool attgenerated
Definition tupdesc.h:79
bool attisdropped
Definition tupdesc.h:78
ForeignScanState * fsstate
char * defname
Definition parsenodes.h:857
uint64 es_processed
Definition execnodes.h:726
List * es_range_table
Definition execnodes.h:674
MemoryContext es_query_cxt
Definition execnodes.h:722
struct ErrorContextCallback * previous
Definition elog.h:297
void(* callback)(void *arg)
Definition elog.h:298
List * rtable_names
MemoryContext ecxt_per_tuple_memory
Definition execnodes.h:292
TupleTableSlot * ecxt_scantuple
Definition execnodes.h:284
EndForeignInsert_function EndForeignInsert
Definition fdwapi.h:239
ReScanForeignScan_function ReScanForeignScan
Definition fdwapi.h:214
BeginForeignInsert_function BeginForeignInsert
Definition fdwapi.h:238
RecheckForeignScan_function RecheckForeignScan
Definition fdwapi.h:249
AddForeignUpdateTargets_function AddForeignUpdateTargets
Definition fdwapi.h:229
BeginForeignModify_function BeginForeignModify
Definition fdwapi.h:231
EndForeignModify_function EndForeignModify
Definition fdwapi.h:237
BeginDirectModify_function BeginDirectModify
Definition fdwapi.h:242
PlanForeignModify_function PlanForeignModify
Definition fdwapi.h:230
PlanDirectModify_function PlanDirectModify
Definition fdwapi.h:241
ExecForeignInsert_function ExecForeignInsert
Definition fdwapi.h:232
BeginForeignScan_function BeginForeignScan
Definition fdwapi.h:212
ForeignAsyncRequest_function ForeignAsyncRequest
Definition fdwapi.h:278
IterateDirectModify_function IterateDirectModify
Definition fdwapi.h:243
ExecForeignUpdate_function ExecForeignUpdate
Definition fdwapi.h:235
GetForeignJoinPaths_function GetForeignJoinPaths
Definition fdwapi.h:223
ExecForeignBatchInsert_function ExecForeignBatchInsert
Definition fdwapi.h:233
GetForeignPaths_function GetForeignPaths
Definition fdwapi.h:210
GetForeignModifyBatchSize_function GetForeignModifyBatchSize
Definition fdwapi.h:234
GetForeignRelSize_function GetForeignRelSize
Definition fdwapi.h:209
ExplainForeignScan_function ExplainForeignScan
Definition fdwapi.h:252
EndForeignScan_function EndForeignScan
Definition fdwapi.h:215
AnalyzeForeignTable_function AnalyzeForeignTable
Definition fdwapi.h:257
EndDirectModify_function EndDirectModify
Definition fdwapi.h:244
ExplainForeignModify_function ExplainForeignModify
Definition fdwapi.h:253
IsForeignPathAsyncCapable_function IsForeignPathAsyncCapable
Definition fdwapi.h:277
IterateForeignScan_function IterateForeignScan
Definition fdwapi.h:213
ForeignAsyncNotify_function ForeignAsyncNotify
Definition fdwapi.h:280
ImportForeignSchema_function ImportForeignSchema
Definition fdwapi.h:260
GetForeignPlan_function GetForeignPlan
Definition fdwapi.h:211
ExecForeignDelete_function ExecForeignDelete
Definition fdwapi.h:236
ExecForeignTruncate_function ExecForeignTruncate
Definition fdwapi.h:263
ExplainDirectModify_function ExplainDirectModify
Definition fdwapi.h:254
IsForeignRelUpdatable_function IsForeignRelUpdatable
Definition fdwapi.h:240
GetForeignUpperPaths_function GetForeignUpperPaths
Definition fdwapi.h:226
ForeignAsyncConfigureWait_function ForeignAsyncConfigureWait
Definition fdwapi.h:279
Cardinality limit_tuples
Definition pathnodes.h:3680
ResultRelInfo * resultRelInfo
Definition execnodes.h:2070
List * options
Definition foreign.h:43
char * servername
Definition foreign.h:40
PartitionwiseAggregateType patype
Definition pathnodes.h:3664
ItemPointerData t_self
Definition htup.h:65
HeapTupleHeader t_data
Definition htup.h:68
ItemPointerData t_ctid
double tuplecount
Definition instrument.h:82
SpecialJoinInfo * sjinfo
Definition pathnodes.h:3596
Definition pg_list.h:54
ResultRelInfo * resultRelInfo
Definition execnodes.h:1420
Definition nodes.h:135
List * exprs
Definition pathnodes.h:1866
QualCost cost
Definition pathnodes.h:1872
List * pathkeys
Definition pathnodes.h:1999
Cardinality rows
Definition pathnodes.h:1993
Cost startup_cost
Definition pathnodes.h:1995
int disabled_nodes
Definition pathnodes.h:1994
Cost total_cost
Definition pathnodes.h:1996
ReservoirStateData rstate
AttInMetadata * attinmeta
MemoryContext anl_cxt
MemoryContext temp_cxt
AsyncRequest * pendingAreq
PgFdwConnState * conn_state
const char ** param_values
AttInMetadata * attinmeta
MemoryContext temp_cxt
AttInMetadata * attinmeta
FmgrInfo * p_flinfo
PgFdwConnState * conn_state
AttrNumber ctidAttno
struct PgFdwModifyState * aux_fmstate
PathTarget * target
List * retrieved_attrs
FmgrInfo * param_flinfo
const char ** param_values
AttInMetadata * attinmeta
MemoryContext batch_cxt
unsigned int cursor_number
MemoryContext temp_cxt
TupleDesc tupdesc
PgFdwConnState * conn_state
HeapTuple * tuples
Instrumentation * instrument
Definition execnodes.h:1187
Plan * plan
Definition execnodes.h:1177
EState * state
Definition execnodes.h:1179
Bitmapset * chgParam
Definition execnodes.h:1209
ExprContext * ps_ExprContext
Definition execnodes.h:1216
bool async_capable
Definition execnodes.h:1219
List * qual
Definition plannodes.h:235
List * targetlist
Definition plannodes.h:233
ExprContext * pi_exprContext
Definition execnodes.h:399
Cost per_tuple
Definition pathnodes.h:121
Cost startup
Definition pathnodes.h:120
List * groupClause
Definition parsenodes.h:216
List * groupingSets
Definition parsenodes.h:220
char * relname
Definition primnodes.h:84
List * joininfo
Definition pathnodes.h:1136
Relids relids
Definition pathnodes.h:1009
struct PathTarget * reltarget
Definition pathnodes.h:1033
Index relid
Definition pathnodes.h:1057
Cardinality tuples
Definition pathnodes.h:1084
Relids top_parent_relids
Definition pathnodes.h:1162
BlockNumber pages
Definition pathnodes.h:1083
Relids lateral_relids
Definition pathnodes.h:1052
RelOptKind reloptkind
Definition pathnodes.h:1003
QualCost baserestrictcost
Definition pathnodes.h:1132
bool has_eclass_joins
Definition pathnodes.h:1138
Cardinality rows
Definition pathnodes.h:1015
TriggerDesc * trigdesc
Definition rel.h:117
Form_pg_class rd_rel
Definition rel.h:111
pg_prng_state randstate
Definition sampling.h:49
Expr * clause
Definition pathnodes.h:2888
struct ResultRelInfo * ri_RootResultRelInfo
Definition execnodes.h:630
Relation ri_RelationDesc
Definition execnodes.h:492
List * ri_WithCheckOptions
Definition execnodes.h:561
TriggerDesc * ri_TrigDesc
Definition execnodes.h:527
Index ri_RangeTableIndex
Definition execnodes.h:489
void * ri_FdwState
Definition execnodes.h:548
ProjectionInfo * ri_projectReturning
Definition execnodes.h:589
List * ri_returningList
Definition execnodes.h:586
bool ri_usesFdwDirectModify
Definition execnodes.h:551
Relation ss_currentRelation
Definition execnodes.h:1634
TupleTableSlot * ss_ScanTupleSlot
Definition execnodes.h:1636
PlanState ps
Definition execnodes.h:1633
bool trig_insert_after_row
Definition reltrigger.h:57
bool trig_update_before_row
Definition reltrigger.h:61
bool trig_insert_before_row
Definition reltrigger.h:56
TupleDesc tts_tupleDescriptor
Definition tuptable.h:129
bool * tts_isnull
Definition tuptable.h:133
Datum * tts_values
Definition tuptable.h:131
AttrNumber varattno
Definition primnodes.h:275
int varno
Definition primnodes.h:270
#define FirstLowInvalidHeapAttributeNumber
Definition sysattr.h:27
#define SelfItemPointerAttributeNumber
Definition sysattr.h:21
void table_close(Relation relation, LOCKMODE lockmode)
Definition table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:40
Datum tidin(PG_FUNCTION_ARGS)
Definition tid.c:51
TargetEntry * tlist_member(Expr *node, List *targetlist)
Definition tlist.c:88
SortGroupClause * get_sortgroupref_clause_noerr(Index sortref, List *clauses)
Definition tlist.c:452
bool grouping_is_sortable(List *groupClause)
Definition tlist.c:549
PathTarget * copy_pathtarget(PathTarget *src)
Definition tlist.c:666
void add_new_columns_to_pathtarget(PathTarget *target, List *exprs)
Definition tlist.c:761
List * get_sortgrouplist_exprs(List *sgClauses, List *targetList)
Definition tlist.c:401
List * add_to_flat_tlist(List *tlist, List *exprs)
Definition tlist.c:141
#define InvalidTransactionId
Definition transam.h:31
TupleDesc CreateTupleDescCopy(TupleDesc tupdesc)
Definition tupdesc.c:242
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
Definition tupdesc.h:178
static CompactAttribute * TupleDescCompactAttr(TupleDesc tupdesc, int i)
Definition tupdesc.h:193
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
Definition tuptable.h:417
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition tuptable.h:476
#define TupIsNull(slot)
Definition tuptable.h:325
static void slot_getallattrs(TupleTableSlot *slot)
Definition tuptable.h:390
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition tuptable.h:543
Integer * makeInteger(int i)
Definition value.c:23
String * makeString(char *str)
Definition value.c:63
Boolean * makeBoolean(bool val)
Definition value.c:49
#define boolVal(v)
Definition value.h:81
#define intVal(v)
Definition value.h:79
#define strVal(v)
Definition value.h:82
List * pull_var_clause(Node *node, int flags)
Definition var.c:653
void pull_varattnos(Node *node, Index varno, Bitmapset **varattnos)
Definition var.c:296
const char * name
int GetNumRegisteredWaitEvents(WaitEventSet *set)
int AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch, void *user_data)
#define WL_SOCKET_READABLE