PostgreSQL Source Code  git master
postgres_fdw.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * postgres_fdw.c
4  * Foreign-data wrapper for remote PostgreSQL servers
5  *
6  * Portions Copyright (c) 2012-2019, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * contrib/postgres_fdw/postgres_fdw.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "postgres_fdw.h"
16 
17 #include "access/htup_details.h"
18 #include "access/sysattr.h"
19 #include "access/table.h"
20 #include "catalog/pg_class.h"
21 #include "commands/defrem.h"
22 #include "commands/explain.h"
23 #include "commands/vacuum.h"
24 #include "foreign/fdwapi.h"
25 #include "funcapi.h"
26 #include "miscadmin.h"
27 #include "nodes/makefuncs.h"
28 #include "nodes/nodeFuncs.h"
29 #include "optimizer/clauses.h"
30 #include "optimizer/cost.h"
31 #include "optimizer/optimizer.h"
32 #include "optimizer/pathnode.h"
33 #include "optimizer/paths.h"
34 #include "optimizer/planmain.h"
35 #include "optimizer/restrictinfo.h"
36 #include "optimizer/tlist.h"
37 #include "parser/parsetree.h"
38 #include "utils/builtins.h"
39 #include "utils/float.h"
40 #include "utils/guc.h"
41 #include "utils/lsyscache.h"
42 #include "utils/memutils.h"
43 #include "utils/rel.h"
44 #include "utils/sampling.h"
45 #include "utils/selfuncs.h"
46 
48 
49 /* Default CPU cost to start up a foreign query. */
50 #define DEFAULT_FDW_STARTUP_COST 100.0
51 
52 /* Default CPU cost to process 1 row (above and beyond cpu_tuple_cost). */
53 #define DEFAULT_FDW_TUPLE_COST 0.01
54 
55 /* If no remote estimates are available, assume a sort adds 20% to the cost */
56 #define DEFAULT_FDW_SORT_MULTIPLIER 1.2
57 
58 /*
59  * Indexes of FDW-private information stored in fdw_private lists.
60  *
61  * These items are indexed with the enum FdwScanPrivateIndex, so an item
62  * can be fetched with list_nth(). For example, to get the SELECT statement:
63  * sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
64  */
66 {
67  /* SQL statement to execute remotely (as a String node) */
69  /* Integer list of attribute numbers retrieved by the SELECT */
71  /* Integer representing the desired fetch_size */
73 
74  /*
75  * String describing the join, i.e. names of the relations being joined
76  * and the types of join; added only when the scan is a join
77  */
79 };
80 
81 /*
82  * Similarly, this enum describes what's kept in the fdw_private list for
83  * a ModifyTable node referencing a postgres_fdw foreign table. We store:
84  *
85  * 1) INSERT/UPDATE/DELETE statement text to be sent to the remote server
86  * 2) Integer list of target attribute numbers for INSERT/UPDATE
87  * (NIL for a DELETE)
88  * 3) Boolean flag showing if the remote query has a RETURNING clause
89  * 4) Integer list of attribute numbers retrieved by RETURNING, if any
90  */
92 {
93  /* SQL statement to execute remotely (as a String node) */
95  /* Integer list of target attribute numbers for INSERT/UPDATE (NIL for DELETE) */
97  /* has-returning flag (as an integer Value node) */
99  /* Integer list of attribute numbers retrieved by RETURNING */
101 };
102 
103 /*
104  * Similarly, this enum describes what's kept in the fdw_private list for
105  * a ForeignScan node that modifies a foreign table directly. We store:
106  *
107  * 1) UPDATE/DELETE statement text to be sent to the remote server
108  * 2) Boolean flag showing if the remote query has a RETURNING clause
109  * 3) Integer list of attribute numbers retrieved by RETURNING, if any
110  * 4) Boolean flag showing whether we set the command's es_processed
111  */
113 {
114  /* SQL statement to execute remotely (as a String node) */
116  /* has-returning flag (as an integer Value node) */
118  /* Integer list of attribute numbers retrieved by RETURNING */
120  /* set-processed flag (as an integer Value node) */
122 };
123 
124 /*
125  * Execution state of a foreign scan using postgres_fdw.
 *
 * Result tuples are fetched from a remote cursor in batches (see
 * fetch_size) and handed out one at a time from the "tuples" array,
 * with next_tuple tracking the next one to return.
126  */
127 typedef struct PgFdwScanState
128 {
129  Relation rel; /* relcache entry for the foreign table. NULL
130  * for a foreign join scan. */
131  TupleDesc tupdesc; /* tuple descriptor of scan */
132  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
133 
134  /* extracted fdw_private data */
135  char *query; /* text of SELECT command */
136  List *retrieved_attrs; /* list of retrieved attribute numbers */
137 
138  /* for remote query execution */
139  PGconn *conn; /* connection for the scan */
140  unsigned int cursor_number; /* quasi-unique ID for my cursor */
141  bool cursor_exists; /* have we created the cursor? */
142  int numParams; /* number of parameters passed to query */
143  FmgrInfo *param_flinfo; /* output conversion functions for them */
144  List *param_exprs; /* executable expressions for param values */
145  const char **param_values; /* textual values of query parameters */
146 
147  /* for storing result tuples */
148  HeapTuple *tuples; /* array of currently-retrieved tuples */
149  int num_tuples; /* # of tuples in array */
150  int next_tuple; /* index of next one to return */
151 
152  /* batch-level state, for optimizing rewinds and avoiding useless fetches */
153  int fetch_ct_2; /* Min(# of fetches done, 2) */
154  bool eof_reached; /* true if last fetch reached EOF */
155 
156  /* working memory contexts */
157  MemoryContext batch_cxt; /* context holding current batch of tuples */
158  MemoryContext temp_cxt; /* context for per-tuple temporary data */
159 
160  int fetch_size; /* number of tuples per fetch */
163 /*
164  * Execution state of a foreign insert/update/delete operation.
165  */
166 typedef struct PgFdwModifyState
167 {
168  Relation rel; /* relcache entry for the foreign table */
169  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
170 
171  /* for remote query execution */
172  PGconn *conn; /* connection for the remote modify operation */
173  char *p_name; /* name of prepared statement, if created */
174 
175  /* extracted fdw_private data */
176  char *query; /* text of INSERT/UPDATE/DELETE command */
177  List *target_attrs; /* list of target attribute numbers */
178  bool has_returning; /* is there a RETURNING clause? */
179  List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
180 
181  /* info about parameters for prepared statement */
182  AttrNumber ctidAttno; /* attnum of input resjunk ctid column */
183  int p_nums; /* number of parameters to transmit */
184  FmgrInfo *p_flinfo; /* output conversion functions for them */
185 
186  /* working memory context */
187  MemoryContext temp_cxt; /* context for per-tuple temporary data */
188 
189  /* for UPDATE row movement when this is a subplan result rel */
190  struct PgFdwModifyState *aux_fmstate; /* foreign-insert state, if
191  * created */
194 /*
195  * Execution state of a foreign scan that modifies a foreign table directly.
196  */
198 {
199  Relation rel; /* relcache entry for the foreign table */
200  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
201 
202  /* extracted fdw_private data */
203  char *query; /* text of UPDATE/DELETE command */
204  bool has_returning; /* is there a RETURNING clause? */
205  List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
206  bool set_processed; /* do we set the command es_processed? */
207 
208  /* for remote query execution */
209  PGconn *conn; /* connection for the UPDATE/DELETE */
210  int numParams; /* number of parameters passed to query */
211  FmgrInfo *param_flinfo; /* output conversion functions for them */
212  List *param_exprs; /* executable expressions for param values */
213  const char **param_values; /* textual values of query parameters */
214 
215  /* for storing result tuples */
216  PGresult *result; /* result for query */
217  int num_tuples; /* # of result tuples */
218  int next_tuple; /* index of next one to return */
219  Relation resultRel; /* relcache entry for the target relation */
220  AttrNumber *attnoMap; /* array of attnums of input user columns */
221  AttrNumber ctidAttno; /* attnum of input ctid column */
222  AttrNumber oidAttno; /* attnum of input oid column */
223  bool hasSystemCols; /* are there system columns of resultRel? */
224 
225  /* working memory context */
226  MemoryContext temp_cxt; /* context for per-tuple temporary data */
229 /*
230  * Workspace for analyzing a foreign table.
 *
 * Up to targrows sample rows are collected into "rows", using reservoir
 * sampling (state kept in rstate) to choose which fetched rows to keep.
231  */
232 typedef struct PgFdwAnalyzeState
233 {
234  Relation rel; /* relcache entry for the foreign table */
235  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
236  List *retrieved_attrs; /* attr numbers retrieved by query */
237 
238  /* collected sample rows */
239  HeapTuple *rows; /* array of size targrows */
240  int targrows; /* target # of sample rows */
241  int numrows; /* # of sample rows collected */
242 
243  /* for random sampling */
244  double samplerows; /* # of rows fetched */
245  double rowstoskip; /* # of rows to skip before next sample */
246  ReservoirStateData rstate; /* state for reservoir sampling */
247 
248  /* working memory contexts */
249  MemoryContext anl_cxt; /* context for per-analyze lifespan data */
250  MemoryContext temp_cxt; /* context for per-tuple temporary data */
253 /*
254  * This enum describes what's kept in the fdw_private list for a ForeignPath.
255  * We store:
256  *
257  * 1) Boolean flag showing if the remote query has the final sort
258  * 2) Boolean flag showing if the remote query has a LIMIT clause
259  */
261 {
262  /* has-final-sort flag (as an integer Value node) */
264  /* has-limit flag (as an integer Value node) */
266 };
267 
268 /* Struct for extra information passed to estimate_path_cost_size() */
269 typedef struct
270 {
273  bool has_limit;
274  double limit_tuples;
 /* NOTE(review): count_est/offset_est presumably hold the LIMIT count and
  * OFFSET estimates for the query -- confirm against their users. */
275  int64 count_est;
276  int64 offset_est;
279 /*
280  * Identify the attribute where data conversion fails, so that the
 * relation and attribute names can be reported with the error.
281  */
282 typedef struct ConversionLocation
283 {
284  Relation rel; /* foreign table's relcache entry. */
285  AttrNumber cur_attno; /* attribute number being processed, or 0 */
286 
287  /*
288  * In case of foreign join push down, fdw_scan_tlist is used to identify
289  * the Var node corresponding to the error location and
290  * fsstate->ss.ps.state gives access to the RTEs of corresponding relation
291  * to get the relation name and attribute name.
292  */
295 
296 /* Callback argument for ec_member_matches_foreign */
 /* Tracks the expression found so far and the expressions already handled. */
297 typedef struct
298 {
299  Expr *current; /* current expr, or NULL if not yet found */
300  List *already_used; /* expressions already dealt with */
302 
303 /*
304  * SQL functions
305  */
307 
308 /*
309  * FDW callback routines
 *
 * Forward declarations of the routines whose pointers the handler function
 * returns in its FdwRoutine struct.
310  */
311 static void postgresGetForeignRelSize(PlannerInfo *root,
312  RelOptInfo *baserel,
313  Oid foreigntableid);
314 static void postgresGetForeignPaths(PlannerInfo *root,
315  RelOptInfo *baserel,
316  Oid foreigntableid);
318  RelOptInfo *foreignrel,
319  Oid foreigntableid,
320  ForeignPath *best_path,
321  List *tlist,
322  List *scan_clauses,
323  Plan *outer_plan);
324 static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
327 static void postgresEndForeignScan(ForeignScanState *node);
328 static void postgresAddForeignUpdateTargets(Query *parsetree,
329  RangeTblEntry *target_rte,
330  Relation target_relation);
332  ModifyTable *plan,
333  Index resultRelation,
334  int subplan_index);
335 static void postgresBeginForeignModify(ModifyTableState *mtstate,
336  ResultRelInfo *resultRelInfo,
337  List *fdw_private,
338  int subplan_index,
339  int eflags);
341  ResultRelInfo *resultRelInfo,
342  TupleTableSlot *slot,
343  TupleTableSlot *planSlot);
345  ResultRelInfo *resultRelInfo,
346  TupleTableSlot *slot,
347  TupleTableSlot *planSlot);
349  ResultRelInfo *resultRelInfo,
350  TupleTableSlot *slot,
351  TupleTableSlot *planSlot);
352 static void postgresEndForeignModify(EState *estate,
353  ResultRelInfo *resultRelInfo);
354 static void postgresBeginForeignInsert(ModifyTableState *mtstate,
355  ResultRelInfo *resultRelInfo);
356 static void postgresEndForeignInsert(EState *estate,
357  ResultRelInfo *resultRelInfo);
359 static bool postgresPlanDirectModify(PlannerInfo *root,
360  ModifyTable *plan,
361  Index resultRelation,
362  int subplan_index);
363 static void postgresBeginDirectModify(ForeignScanState *node, int eflags);
365 static void postgresEndDirectModify(ForeignScanState *node);
367  ExplainState *es);
369  ResultRelInfo *rinfo,
370  List *fdw_private,
371  int subplan_index,
372  ExplainState *es);
374  ExplainState *es);
375 static bool postgresAnalyzeForeignTable(Relation relation,
376  AcquireSampleRowsFunc *func,
377  BlockNumber *totalpages);
379  Oid serverOid);
380 static void postgresGetForeignJoinPaths(PlannerInfo *root,
381  RelOptInfo *joinrel,
382  RelOptInfo *outerrel,
383  RelOptInfo *innerrel,
384  JoinType jointype,
385  JoinPathExtraData *extra);
387  TupleTableSlot *slot);
388 static void postgresGetForeignUpperPaths(PlannerInfo *root,
389  UpperRelationKind stage,
390  RelOptInfo *input_rel,
391  RelOptInfo *output_rel,
392  void *extra);
393 
394 /*
395  * Helper functions
 *
 * Forward declarations of file-local (static) helper routines.
396  */
397 static void estimate_path_cost_size(PlannerInfo *root,
398  RelOptInfo *foreignrel,
399  List *param_join_conds,
400  List *pathkeys,
401  PgFdwPathExtraData *fpextra,
402  double *p_rows, int *p_width,
403  Cost *p_startup_cost, Cost *p_total_cost);
404 static void get_remote_estimate(const char *sql,
405  PGconn *conn,
406  double *rows,
407  int *width,
408  Cost *startup_cost,
409  Cost *total_cost);
411  List *pathkeys,
412  double retrieved_rows,
413  double width,
414  double limit_tuples,
415  Cost *p_startup_cost,
416  Cost *p_run_cost);
419  void *arg);
420 static void create_cursor(ForeignScanState *node);
421 static void fetch_more_data(ForeignScanState *node);
422 static void close_cursor(PGconn *conn, unsigned int cursor_number);
424  RangeTblEntry *rte,
425  ResultRelInfo *resultRelInfo,
426  CmdType operation,
427  Plan *subplan,
428  char *query,
429  List *target_attrs,
430  bool has_returning,
433  ResultRelInfo *resultRelInfo,
434  CmdType operation,
435  TupleTableSlot *slot,
436  TupleTableSlot *planSlot);
437 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
438 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
439  ItemPointer tupleid,
440  TupleTableSlot *slot);
441 static void store_returning_result(PgFdwModifyState *fmstate,
442  TupleTableSlot *slot, PGresult *res);
443 static void finish_foreign_modify(PgFdwModifyState *fmstate);
444 static List *build_remote_returning(Index rtindex, Relation rel,
445  List *returningList);
446 static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist);
447 static void execute_dml_stmt(ForeignScanState *node);
449 static void init_returning_filter(PgFdwDirectModifyState *dmstate,
450  List *fdw_scan_tlist,
451  Index rtindex);
453  TupleTableSlot *slot,
454  EState *estate);
455 static void prepare_query_params(PlanState *node,
456  List *fdw_exprs,
457  int numParams,
459  List **param_exprs,
460  const char ***param_values);
461 static void process_query_params(ExprContext *econtext,
463  List *param_exprs,
464  const char **param_values);
465 static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
466  HeapTuple *rows, int targrows,
467  double *totalrows,
468  double *totaldeadrows);
469 static void analyze_row_processor(PGresult *res, int row,
470  PgFdwAnalyzeState *astate);
472  int row,
473  Relation rel,
476  ForeignScanState *fsstate,
477  MemoryContext temp_context);
478 static void conversion_error_callback(void *arg);
479 static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel,
480  JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel,
481  JoinPathExtraData *extra);
482 static bool foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
483  Node *havingQual);
485  RelOptInfo *rel);
488  Path *epq_path);
489 static void add_foreign_grouping_paths(PlannerInfo *root,
490  RelOptInfo *input_rel,
491  RelOptInfo *grouped_rel,
492  GroupPathExtraData *extra);
493 static void add_foreign_ordered_paths(PlannerInfo *root,
494  RelOptInfo *input_rel,
495  RelOptInfo *ordered_rel);
496 static void add_foreign_final_paths(PlannerInfo *root,
497  RelOptInfo *input_rel,
498  RelOptInfo *final_rel,
499  FinalPathExtraData *extra);
500 static void apply_server_options(PgFdwRelationInfo *fpinfo);
501 static void apply_table_options(PgFdwRelationInfo *fpinfo);
502 static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
503  const PgFdwRelationInfo *fpinfo_o,
504  const PgFdwRelationInfo *fpinfo_i);
505 
506 
507 /*
508  * Foreign-data wrapper handler function: build an FdwRoutine node holding
509  * pointers to my callback routines and return it.
510  */
511 Datum
513 {
514  FdwRoutine *routine = makeNode(FdwRoutine);
515 
516  /* Functions for scanning foreign tables */
524 
525  /* Functions for updating foreign tables */
540 
541  /* Function for EvalPlanQual rechecks */
543  /* Support functions for EXPLAIN */
547 
548  /* Support functions for ANALYZE */
550 
551  /* Support functions for IMPORT FOREIGN SCHEMA */
553 
554  /* Support functions for join push-down */
556 
557  /* Support functions for upper relation push-down */
559 
560  PG_RETURN_POINTER(routine);
561 }
562 
563 /*
564  * postgresGetForeignRelSize
565  * Estimate # of rows and width of the result of the scan
566  *
567  * We should consider the effect of all baserestrictinfo clauses here, but
568  * not any join clauses.
 *
 * Side effect: allocates the PgFdwRelationInfo stored in
 * baserel->fdw_private, which carries the information gathered here
 * (options, classified conditions, cached costs, relation name) to
 * subsequent functions.
569  */
570 static void
572  RelOptInfo *baserel,
573  Oid foreigntableid)
574 {
575  PgFdwRelationInfo *fpinfo;
576  ListCell *lc;
577  RangeTblEntry *rte = planner_rt_fetch(baserel->relid, root);
578  const char *namespace;
579  const char *relname;
580  const char *refname;
581 
582  /*
583  * We use PgFdwRelationInfo to pass various information to subsequent
584  * functions.
585  */
586  fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
587  baserel->fdw_private = (void *) fpinfo;
588 
589  /* Base foreign tables need to be pushed down always. */
590  fpinfo->pushdown_safe = true;
591 
592  /* Look up foreign-table catalog info. */
593  fpinfo->table = GetForeignTable(foreigntableid);
594  fpinfo->server = GetForeignServer(fpinfo->table->serverid);
595 
596  /*
597  * Extract user-settable option values. Note that per-table setting of
598  * use_remote_estimate overrides per-server setting.
599  */
600  fpinfo->use_remote_estimate = false;
603  fpinfo->shippable_extensions = NIL;
604  fpinfo->fetch_size = 100;
605 
 /* Server-level options are applied first, then table-level ones. */
606  apply_server_options(fpinfo);
607  apply_table_options(fpinfo);
608 
609  /*
610  * If the table or the server is configured to use remote estimates,
611  * identify which user to do remote access as during planning. This
612  * should match what ExecCheckRTEPerms() does. If we fail due to lack of
613  * permissions, the query would have failed at runtime anyway.
614  */
615  if (fpinfo->use_remote_estimate)
616  {
617  Oid userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
618 
619  fpinfo->user = GetUserMapping(userid, fpinfo->server->serverid);
620  }
621  else
622  fpinfo->user = NULL;
623 
624  /*
625  * Identify which baserestrictinfo clauses can be sent to the remote
626  * server and which can't.
627  */
628  classifyConditions(root, baserel, baserel->baserestrictinfo,
629  &fpinfo->remote_conds, &fpinfo->local_conds);
630 
631  /*
632  * Identify which attributes will need to be retrieved from the remote
633  * server. These include all attrs needed for joins or final output, plus
634  * all attrs used in the local_conds. (Note: if we end up using a
635  * parameterized scan, it's possible that some of the join clauses will be
636  * sent to the remote and thus we wouldn't really need to retrieve the
637  * columns used in them. Doesn't seem worth detecting that case though.)
638  */
639  fpinfo->attrs_used = NULL;
640  pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid,
641  &fpinfo->attrs_used);
642  foreach(lc, fpinfo->local_conds)
643  {
644  RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
645 
646  pull_varattnos((Node *) rinfo->clause, baserel->relid,
647  &fpinfo->attrs_used);
648  }
649 
650  /*
651  * Compute the selectivity and cost of the local_conds, so we don't have
652  * to do it over again for each path. The best we can do for these
653  * conditions is to estimate selectivity on the basis of local statistics.
654  */
656  fpinfo->local_conds,
657  baserel->relid,
658  JOIN_INNER,
659  NULL);
660 
661  cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
662 
663  /*
664  * Set # of retrieved rows and cached relation costs to some negative
665  * value, so that we can detect when they are set to some sensible values,
666  * during one (usually the first) of the calls to estimate_path_cost_size.
667  */
668  fpinfo->retrieved_rows = -1;
669  fpinfo->rel_startup_cost = -1;
670  fpinfo->rel_total_cost = -1;
671 
672  /*
673  * If the table or the server is configured to use remote estimates,
674  * connect to the foreign server and execute EXPLAIN to estimate the
675  * number of rows selected by the restriction clauses, as well as the
676  * average row width. Otherwise, estimate using whatever statistics we
677  * have locally, in a way similar to ordinary tables.
678  */
679  if (fpinfo->use_remote_estimate)
680  {
681  /*
682  * Get cost/size estimates with help of remote server. Save the
683  * values in fpinfo so we don't need to do it again to generate the
684  * basic foreign path.
685  */
686  estimate_path_cost_size(root, baserel, NIL, NIL, NULL,
687  &fpinfo->rows, &fpinfo->width,
688  &fpinfo->startup_cost, &fpinfo->total_cost);
689 
690  /* Report estimated baserel size to planner. */
691  baserel->rows = fpinfo->rows;
692  baserel->reltarget->width = fpinfo->width;
693  }
694  else
695  {
696  /*
697  * If the foreign table has never been ANALYZEd, it will have relpages
698  * and reltuples equal to zero, which most likely has nothing to do
699  * with reality. We can't do a whole lot about that if we're not
700  * allowed to consult the remote server, but we can use a hack similar
701  * to plancat.c's treatment of empty relations: use a minimum size
702  * estimate of 10 pages, and divide by the column-datatype-based width
703  * estimate to get the corresponding number of tuples.
704  */
705  if (baserel->pages == 0 && baserel->tuples == 0)
706  {
707  baserel->pages = 10;
708  baserel->tuples =
709  (10 * BLCKSZ) / (baserel->reltarget->width +
711  }
712 
713  /* Estimate baserel size as best we can with local statistics. */
714  set_baserel_size_estimates(root, baserel);
715 
716  /* Fill in basically-bogus cost estimates for use later. */
717  estimate_path_cost_size(root, baserel, NIL, NIL, NULL,
718  &fpinfo->rows, &fpinfo->width,
719  &fpinfo->startup_cost, &fpinfo->total_cost);
720  }
721 
722  /*
723  * Set the name of relation in fpinfo, while we are constructing it here.
724  * It will be used to build the string describing the join relation in
725  * EXPLAIN output. We can't know whether VERBOSE option is specified or
726  * not, so always schema-qualify the foreign table name.
727  */
728  fpinfo->relation_name = makeStringInfo();
729  namespace = get_namespace_name(get_rel_namespace(foreigntableid));
730  relname = get_rel_name(foreigntableid);
731  refname = rte->eref->aliasname;
732  appendStringInfo(fpinfo->relation_name, "%s.%s",
733  quote_identifier(namespace),
734  quote_identifier(relname));
 /* Append the alias only when one is present and differs from relname. */
735  if (*refname && strcmp(refname, relname) != 0)
736  appendStringInfo(fpinfo->relation_name, " %s",
738 
739  /* No outer and inner relations. */
740  fpinfo->make_outerrel_subquery = false;
741  fpinfo->make_innerrel_subquery = false;
742  fpinfo->lower_subquery_rels = NULL;
743  /* Set the relation index. */
744  fpinfo->relation_index = baserel->relid;
745 }
746 
747 /*
748  * get_useful_ecs_for_relation
749  * Determine which EquivalenceClasses might be involved in useful
750  * orderings of this relation.
751  *
752  * This function is in some respects a mirror image of the core function
753  * pathkeys_useful_for_merging: for a regular table, we know what indexes
754  * we have and want to test whether any of them are useful. For a foreign
755  * table, we don't know what indexes are present on the remote side but
756  * want to speculate about which ones we'd like to use if they existed.
757  *
758  * This function returns a list of potentially-useful equivalence classes,
759  * but it does not guarantee that an EquivalenceMember exists which contains
760  * Vars only from the given relation. For example, given ft1 JOIN t1 ON
761  * ft1.x + t1.x = 0, this function will say that the equivalence class
762  * containing ft1.x + t1.x is potentially useful. Supposing ft1 is remote and
763  * t1 is local (or on a different server), it will turn out that no useful
764  * ORDER BY clause can be generated. It's not our job to figure that out
765  * here; we're only interested in identifying relevant ECs.
 *
 * Returns a List of EquivalenceClass pointers, or NIL if none is found.
 * ECs taken from join clauses are added with list_append_unique_ptr, so an
 * EC is not repeated by that second pass.
766  */
767 static List *
769 {
770  List *useful_eclass_list = NIL;
771  ListCell *lc;
772  Relids relids;
773 
774  /*
775  * First, consider whether any active EC is potentially useful for a merge
776  * join against this relation.
777  */
778  if (rel->has_eclass_joins)
779  {
780  foreach(lc, root->eq_classes)
781  {
782  EquivalenceClass *cur_ec = (EquivalenceClass *) lfirst(lc);
783 
784  if (eclass_useful_for_merging(root, cur_ec, rel))
785  useful_eclass_list = lappend(useful_eclass_list, cur_ec);
786  }
787  }
788 
789  /*
790  * Next, consider whether there are any non-EC derivable join clauses that
791  * are merge-joinable. If the joininfo list is empty, we can exit
792  * quickly.
793  */
794  if (rel->joininfo == NIL)
795  return useful_eclass_list;
796 
797  /* If this is a child rel, we must use the topmost parent rel to search. */
798  if (IS_OTHER_REL(rel))
799  {
801  relids = rel->top_parent_relids;
802  }
803  else
804  relids = rel->relids;
805 
806  /* Check each join clause in turn. */
807  foreach(lc, rel->joininfo)
808  {
809  RestrictInfo *restrictinfo = (RestrictInfo *) lfirst(lc);
810 
811  /* Consider only mergejoinable clauses */
812  if (restrictinfo->mergeopfamilies == NIL)
813  continue;
814 
815  /* Make sure we've got canonical ECs. */
816  update_mergeclause_eclasses(root, restrictinfo);
817 
818  /*
819  * restrictinfo->mergeopfamilies != NIL is sufficient to guarantee
820  * that left_ec and right_ec will be initialized, per comments in
821  * distribute_qual_to_rels.
822  *
823  * We want to identify which side of this merge-joinable clause
824  * contains columns from the relation produced by this RelOptInfo. We
825  * test for overlap, not containment, because there could be extra
826  * relations on either side. For example, suppose we've got something
827  * like ((A JOIN B ON A.x = B.x) JOIN C ON A.y = C.y) LEFT JOIN D ON
828  * A.y = D.y. The input rel might be the joinrel between A and B, and
829  * we'll consider the join clause A.y = D.y. relids contains a
830  * relation not involved in the join class (B) and the equivalence
831  * class for the left-hand side of the clause contains a relation not
832  * involved in the input rel (C). Despite the fact that we have only
833  * overlap and not containment in either direction, A.y is potentially
834  * useful as a sort column.
835  *
836  * Note that it's even possible that relids overlaps neither side of
837  * the join clause. For example, consider A LEFT JOIN B ON A.x = B.x
838  * AND A.x = 1. The clause A.x = 1 will appear in B's joininfo list,
839  * but overlaps neither side of B. In that case, we just skip this
840  * join clause, since it doesn't suggest a useful sort order for this
841  * relation.
 *
 * If relids overlaps both sides, only right_ec is recorded, because the
 * left side is tested only in the else branch.
842  */
843  if (bms_overlap(relids, restrictinfo->right_ec->ec_relids))
844  useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
845  restrictinfo->right_ec);
846  else if (bms_overlap(relids, restrictinfo->left_ec->ec_relids))
847  useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
848  restrictinfo->left_ec);
849  }
850 
851  return useful_eclass_list;
852 }
853 
854 /*
855  * get_useful_pathkeys_for_relation
856  * Determine which orderings of a relation might be useful.
857  *
858  * Getting data in sorted order can be useful either because the requested
859  * order matches the final output ordering for the overall query we're
860  * planning, or because it enables an efficient merge join. Here, we try
861  * to figure out which pathkeys to consider.
 *
 * Returns a List whose elements are pathkey Lists (NIL if there are none).
 * Side effect: sets fpinfo->qp_is_pushdown_safe to true only when every
 * entry of root->query_pathkeys passed the pushdown checks below.
862  */
863 static List *
865 {
866  List *useful_pathkeys_list = NIL;
867  List *useful_eclass_list;
869  EquivalenceClass *query_ec = NULL;
870  ListCell *lc;
871 
872  /*
873  * Pushing the query_pathkeys to the remote server is always worth
874  * considering, because it might let us avoid a local sort.
875  */
876  fpinfo->qp_is_pushdown_safe = false;
877  if (root->query_pathkeys)
878  {
879  bool query_pathkeys_ok = true;
880 
881  foreach(lc, root->query_pathkeys)
882  {
883  PathKey *pathkey = (PathKey *) lfirst(lc);
884  EquivalenceClass *pathkey_ec = pathkey->pk_eclass;
885  Expr *em_expr;
886 
887  /*
888  * The planner and executor don't have any clever strategy for
889  * taking data sorted by a prefix of the query's pathkeys and
890  * getting it to be sorted by all of those pathkeys. We'll just
891  * end up resorting the entire data set. So, unless we can push
892  * down all of the query pathkeys, forget it.
893  *
894  * is_foreign_expr would detect volatile expressions as well, but
895  * checking ec_has_volatile here saves some cycles.
896  */
897  if (pathkey_ec->ec_has_volatile ||
898  !(em_expr = find_em_expr_for_rel(pathkey_ec, rel)) ||
899  !is_foreign_expr(root, rel, em_expr))
900  {
901  query_pathkeys_ok = false;
902  break;
903  }
904  }
905 
906  if (query_pathkeys_ok)
907  {
908  useful_pathkeys_list = list_make1(list_copy(root->query_pathkeys));
909  fpinfo->qp_is_pushdown_safe = true;
910  }
911  }
912 
913  /*
914  * Even if we're not using remote estimates, having the remote side do the
915  * sort generally won't be any worse than doing it locally, and it might
916  * be much better if the remote side can generate data in the right order
917  * without needing a sort at all. However, what we're going to do next is
918  * try to generate pathkeys that seem promising for possible merge joins,
919  * and that's more speculative. A wrong choice might hurt quite a bit, so
920  * bail out if we can't use remote estimates.
921  */
922  if (!fpinfo->use_remote_estimate)
923  return useful_pathkeys_list;
924 
925  /* Get the list of interesting EquivalenceClasses. */
926  useful_eclass_list = get_useful_ecs_for_relation(root, rel);
927 
928  /* Extract unique EC for query, if any, so we don't consider it again. */
929  if (list_length(root->query_pathkeys) == 1)
930  {
931  PathKey *query_pathkey = linitial(root->query_pathkeys);
932 
933  query_ec = query_pathkey->pk_eclass;
934  }
935 
936  /*
937  * As a heuristic, the only pathkeys we consider here are those of length
938  * one. It's surely possible to consider more, but since each one we
939  * choose to consider will generate a round-trip to the remote side, we
940  * need to be a bit cautious here. It would sure be nice to have a local
941  * cache of information about remote index definitions...
942  */
943  foreach(lc, useful_eclass_list)
944  {
945  EquivalenceClass *cur_ec = lfirst(lc);
946  Expr *em_expr;
947  PathKey *pathkey;
948 
949  /* If redundant with what we did above, skip it. */
950  if (cur_ec == query_ec)
951  continue;
952 
953  /* If no pushable expression for this rel, skip it. */
954  em_expr = find_em_expr_for_rel(cur_ec, rel);
955  if (em_expr == NULL || !is_foreign_expr(root, rel, em_expr))
956  continue;
957 
958  /* Looks like we can generate a pathkey, so let's do it. */
959  pathkey = make_canonical_pathkey(root, cur_ec,
960  linitial_oid(cur_ec->ec_opfamilies),
962  false);
963  useful_pathkeys_list = lappend(useful_pathkeys_list,
964  list_make1(pathkey));
965  }
966 
967  return useful_pathkeys_list;
968 }
969 
970 /*
971  * postgresGetForeignPaths
972  * Create possible scan paths for a scan on the foreign table
 *
 * First adds the unparameterized ForeignScan path, using the rows and
 * costs already stored in fpinfo, and calls
 * add_paths_with_pathkeys_for_rel() for sorted variants.  If
 * fpinfo->use_remote_estimate is false it stops there.  Otherwise it
 * collects one ParamPathInfo per outer relation that supplies a movable,
 * remote-safe join clause, drawn from both baserel->joininfo and (when
 * has_eclass_joins) EquivalenceClass-derived clauses.  It then adds a
 * parameterized path for each, costed via estimate_path_cost_size().
 *
 * foreigntableid is not referenced in the body shown here.
 *
 * NOTE(review): this listing skips source lines 975, 1084, 1093 and 1095.
 * Presumably these are the function-name line, the declaration of "arg",
 * and the opening and callback-argument lines of the call that assigns
 * "clauses".  Confirm against the real source; the text below is not
 * compilable as-is.
973  */
974 static void
976  RelOptInfo *baserel,
977  Oid foreigntableid)
978 {
979  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
980  ForeignPath *path;
981  List *ppi_list;
982  ListCell *lc;
983 
984  /*
985  * Create simplest ForeignScan path node and add it to baserel. This path
986  * corresponds to SeqScan path of regular tables (though depending on what
987  * baserestrict conditions we were able to send to remote, there might
988  * actually be an indexscan happening there). We already did all the work
989  * to estimate cost and size of this path.
990  *
991  * Although this path uses no join clauses, it could still have required
992  * parameterization due to LATERAL refs in its tlist.
993  */
994  path = create_foreignscan_path(root, baserel,
995  NULL, /* default pathtarget */
996  fpinfo->rows,
997  fpinfo->startup_cost,
998  fpinfo->total_cost,
999  NIL, /* no pathkeys */
1000  baserel->lateral_relids,
1001  NULL, /* no extra plan */
1002  NIL); /* no fdw_private list */
1003  add_path(baserel, (Path *) path);
1004 
1005  /* Add paths with pathkeys */
1006  add_paths_with_pathkeys_for_rel(root, baserel, NULL);
1007 
1008  /*
1009  * If we're not using remote estimates, stop here. We have no way to
1010  * estimate whether any join clauses would be worth sending across, so
1011  * don't bother building parameterized paths.
1012  */
1013  if (!fpinfo->use_remote_estimate)
1014  return;
1015 
1016  /*
1017  * Thumb through all join clauses for the rel to identify which outer
1018  * relations could supply one or more safe-to-send-to-remote join clauses.
1019  * We'll build a parameterized path for each such outer relation.
1020  *
1021  * It's convenient to manage this by representing each candidate outer
1022  * relation by the ParamPathInfo node for it. We can then use the
1023  * ppi_clauses list in the ParamPathInfo node directly as a list of the
1024  * interesting join clauses for that rel. This takes care of the
1025  * possibility that there are multiple safe join clauses for such a rel,
1026  * and also ensures that we account for unsafe join clauses that we'll
1027  * still have to enforce locally (since the parameterized-path machinery
1028  * insists that we handle all movable clauses).
1029  */
1030  ppi_list = NIL;
1031  foreach(lc, baserel->joininfo)
1032  {
1033  RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
1034  Relids required_outer;
1035  ParamPathInfo *param_info;
1036 
1037  /* Check if clause can be moved to this rel */
1038  if (!join_clause_is_movable_to(rinfo, baserel))
1039  continue;
1040 
1041  /* See if it is safe to send to remote */
1042  if (!is_foreign_expr(root, baserel, rinfo->clause))
1043  continue;
1044 
1045  /* Calculate required outer rels for the resulting path */
1046  required_outer = bms_union(rinfo->clause_relids,
1047  baserel->lateral_relids);
1048  /* We do not want the foreign rel itself listed in required_outer */
1049  required_outer = bms_del_member(required_outer, baserel->relid);
1050 
1051  /*
1052  * required_outer probably can't be empty here, but if it were, we
1053  * couldn't make a parameterized path.
1054  */
1055  if (bms_is_empty(required_outer))
1056  continue;
1057 
1058  /* Get the ParamPathInfo */
1059  param_info = get_baserel_parampathinfo(root, baserel,
1060  required_outer);
1061  Assert(param_info != NULL);
1062 
1063  /*
1064  * Add it to list unless we already have it. Testing pointer equality
1065  * is OK since get_baserel_parampathinfo won't make duplicates.
1066  */
1067  ppi_list = list_append_unique_ptr(ppi_list, param_info);
1068  }
1069 
1070  /*
1071  * The above scan examined only "generic" join clauses, not those that
1072  * were absorbed into EquivalenceClauses. See if we can make anything out
1073  * of EquivalenceClauses.
1074  */
1075  if (baserel->has_eclass_joins)
1076  {
1077  /*
1078  * We repeatedly scan the eclass list looking for column references
1079  * (or expressions) belonging to the foreign rel. Each time we find
1080  * one, we generate a list of equivalence joinclauses for it, and then
1081  * see if any are safe to send to the remote. Repeat till there are
1082  * no more candidate EC members.
1083  */
1085 
1086  arg.already_used = NIL;
1087  for (;;)
1088  {
1089  List *clauses;
1090 
1091  /* Make clauses, skipping any that join to lateral_referencers */
1092  arg.current = NULL;
1094  baserel,
1096  (void *) &arg,
1097  baserel->lateral_referencers);
1098 
1099  /* Done if there are no more expressions in the foreign rel */
1100  if (arg.current == NULL)
1101  {
1102  Assert(clauses == NIL);
1103  break;
1104  }
1105 
1106  /* Scan the extracted join clauses */
1107  foreach(lc, clauses)
1108  {
1109  RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
1110  Relids required_outer;
1111  ParamPathInfo *param_info;
1112 
1113  /* Check if clause can be moved to this rel */
1114  if (!join_clause_is_movable_to(rinfo, baserel))
1115  continue;
1116 
1117  /* See if it is safe to send to remote */
1118  if (!is_foreign_expr(root, baserel, rinfo->clause))
1119  continue;
1120 
1121  /* Calculate required outer rels for the resulting path */
1122  required_outer = bms_union(rinfo->clause_relids,
1123  baserel->lateral_relids);
1124  required_outer = bms_del_member(required_outer, baserel->relid);
1125  if (bms_is_empty(required_outer))
1126  continue;
1127 
1128  /* Get the ParamPathInfo */
1129  param_info = get_baserel_parampathinfo(root, baserel,
1130  required_outer);
1131  Assert(param_info != NULL);
1132 
1133  /* Add it to list unless we already have it */
1134  ppi_list = list_append_unique_ptr(ppi_list, param_info);
1135  }
1136 
1137  /* Try again, now ignoring the expression we found this time */
1138  arg.already_used = lappend(arg.already_used, arg.current);
1139  }
1140  }
1141 
1142  /*
1143  * Now build a path for each useful outer relation.
1144  */
1145  foreach(lc, ppi_list)
1146  {
1147  ParamPathInfo *param_info = (ParamPathInfo *) lfirst(lc);
1148  double rows;
1149  int width;
1150  Cost startup_cost;
1151  Cost total_cost;
1152 
1153  /* Get a cost estimate from the remote */
1154  estimate_path_cost_size(root, baserel,
1155  param_info->ppi_clauses, NIL, NULL,
1156  &rows, &width,
1157  &startup_cost, &total_cost);
1158 
1159  /*
1160  * ppi_rows currently won't get looked at by anything, but still we
1161  * may as well ensure that it matches our idea of the rowcount.
1162  */
1163  param_info->ppi_rows = rows;
1164 
1165  /* Make the path */
1166  path = create_foreignscan_path(root, baserel,
1167  NULL, /* default pathtarget */
1168  rows,
1169  startup_cost,
1170  total_cost,
1171  NIL, /* no pathkeys */
1172  param_info->ppi_req_outer,
1173  NULL,
1174  NIL); /* no fdw_private list */
1175  add_path(baserel, (Path *) path);
1176  }
1177 }
1178 
1179 /*
1180  * postgresGetForeignPlan
1181  * Create ForeignScan plan node which implements selected best path
 *
 * For a base relation, scan_clauses (minus pseudoconstants) are split
 * into remote_exprs and local_exprs.  The split uses fpinfo->remote_conds
 * and fpinfo->local_conds where possible and is_foreign_expr() otherwise.
 * The remote ones also become fdw_recheck_quals.  For a join or upper
 * rel, scan_relid is 0 and the conditions come from fpinfo.
 * fdw_scan_tlist is built, and any outer_plan has the local quals removed
 * and its tlist adjusted.  The remote SELECT is then deparsed, and the
 * SQL text is packed into fdw_private together with fpinfo->fetch_size
 * (plus the relation name for join/upper rels).  remote_exprs are saved
 * in fpinfo->final_remote_exprs.
 *
 * NOTE(review): this listing skips source lines 1184, 1200, 1212, 1214 and
 * 1376.  Presumably these are the function-name line, the declaration of
 * retrieved_attrs, the index arguments of the two list_nth() calls, and
 * the middle argument of list_make3().  Confirm against the real source.
 * Separately, the ListCell "lc" declared inside the outer_plan branch
 * shadows the function-level "lc"; harmless, but it trips -Wshadow.
1182  */
1183 static ForeignScan *
1185  RelOptInfo *foreignrel,
1186  Oid foreigntableid,
1187  ForeignPath *best_path,
1188  List *tlist,
1189  List *scan_clauses,
1190  Plan *outer_plan)
1191 {
1192  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
1193  Index scan_relid;
1194  List *fdw_private;
1195  List *remote_exprs = NIL;
1196  List *local_exprs = NIL;
1197  List *params_list = NIL;
1198  List *fdw_scan_tlist = NIL;
1199  List *fdw_recheck_quals = NIL;
1201  StringInfoData sql;
1202  bool has_final_sort = false;
1203  bool has_limit = false;
1204  ListCell *lc;
1205 
1206  /*
1207  * Get FDW private data created by postgresGetForeignUpperPaths(), if any.
1208  */
1209  if (best_path->fdw_private)
1210  {
1211  has_final_sort = intVal(list_nth(best_path->fdw_private,
1213  has_limit = intVal(list_nth(best_path->fdw_private,
1215  }
1216 
1217  if (IS_SIMPLE_REL(foreignrel))
1218  {
1219  /*
1220  * For base relations, set scan_relid as the relid of the relation.
1221  */
1222  scan_relid = foreignrel->relid;
1223 
1224  /*
1225  * In a base-relation scan, we must apply the given scan_clauses.
1226  *
1227  * Separate the scan_clauses into those that can be executed remotely
1228  * and those that can't. baserestrictinfo clauses that were
1229  * previously determined to be safe or unsafe by classifyConditions
1230  * are found in fpinfo->remote_conds and fpinfo->local_conds. Anything
1231  * else in the scan_clauses list will be a join clause, which we have
1232  * to check for remote-safety.
1233  *
1234  * Note: the join clauses we see here should be the exact same ones
1235  * previously examined by postgresGetForeignPaths. Possibly it'd be
1236  * worth passing forward the classification work done then, rather
1237  * than repeating it here.
1238  *
1239  * This code must match "extract_actual_clauses(scan_clauses, false)"
1240  * except for the additional decision about remote versus local
1241  * execution.
1242  */
1243  foreach(lc, scan_clauses)
1244  {
1245  RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
1246 
1247  /* Ignore any pseudoconstants, they're dealt with elsewhere */
1248  if (rinfo->pseudoconstant)
1249  continue;
1250 
1251  if (list_member_ptr(fpinfo->remote_conds, rinfo))
1252  remote_exprs = lappend(remote_exprs, rinfo->clause);
1253  else if (list_member_ptr(fpinfo->local_conds, rinfo))
1254  local_exprs = lappend(local_exprs, rinfo->clause);
1255  else if (is_foreign_expr(root, foreignrel, rinfo->clause))
1256  remote_exprs = lappend(remote_exprs, rinfo->clause);
1257  else
1258  local_exprs = lappend(local_exprs, rinfo->clause);
1259  }
1260 
1261  /*
1262  * For a base-relation scan, we have to support EPQ recheck, which
1263  * should recheck all the remote quals.
1264  */
1265  fdw_recheck_quals = remote_exprs;
1266  }
1267  else
1268  {
1269  /*
1270  * Join relation or upper relation - set scan_relid to 0.
1271  */
1272  scan_relid = 0;
1273 
1274  /*
1275  * For a join rel, baserestrictinfo is NIL and we are not considering
1276  * parameterization right now, so there should be no scan_clauses for
1277  * a joinrel or an upper rel either.
1278  */
1279  Assert(!scan_clauses);
1280 
1281  /*
1282  * Instead we get the conditions to apply from the fdw_private
1283  * structure.
1284  */
1285  remote_exprs = extract_actual_clauses(fpinfo->remote_conds, false);
1286  local_exprs = extract_actual_clauses(fpinfo->local_conds, false);
1287 
1288  /*
1289  * We leave fdw_recheck_quals empty in this case, since we never need
1290  * to apply EPQ recheck clauses. In the case of a joinrel, EPQ
1291  * recheck is handled elsewhere --- see postgresGetForeignJoinPaths().
1292  * If we're planning an upperrel (ie, remote grouping or aggregation)
1293  * then there's no EPQ to do because SELECT FOR UPDATE wouldn't be
1294  * allowed, and indeed we *can't* put the remote clauses into
1295  * fdw_recheck_quals because the unaggregated Vars won't be available
1296  * locally.
1297  */
1298 
1299  /* Build the list of columns to be fetched from the foreign server. */
1300  fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
1301 
1302  /*
1303  * Ensure that the outer plan produces a tuple whose descriptor
1304  * matches our scan tuple slot. Also, remove the local conditions
1305  * from outer plan's quals, lest they be evaluated twice, once by the
1306  * local plan and once by the scan.
1307  */
1308  if (outer_plan)
1309  {
1310  ListCell *lc;
1311 
1312  /*
1313  * Right now, we only consider grouping and aggregation beyond
1314  * joins. Queries involving aggregates or grouping do not require
1315  * EPQ mechanism, hence should not have an outer plan here.
1316  */
1317  Assert(!IS_UPPER_REL(foreignrel));
1318 
1319  /*
1320  * First, update the plan's qual list if possible. In some cases
1321  * the quals might be enforced below the topmost plan level, in
1322  * which case we'll fail to remove them; it's not worth working
1323  * harder than this.
1324  */
1325  foreach(lc, local_exprs)
1326  {
1327  Node *qual = lfirst(lc);
1328 
1329  outer_plan->qual = list_delete(outer_plan->qual, qual);
1330 
1331  /*
1332  * For an inner join the local conditions of foreign scan plan
1333  * can be part of the joinquals as well. (They might also be
1334  * in the mergequals or hashquals, but we can't touch those
1335  * without breaking the plan.)
1336  */
1337  if (IsA(outer_plan, NestLoop) ||
1338  IsA(outer_plan, MergeJoin) ||
1339  IsA(outer_plan, HashJoin))
1340  {
1341  Join *join_plan = (Join *) outer_plan;
1342 
1343  if (join_plan->jointype == JOIN_INNER)
1344  join_plan->joinqual = list_delete(join_plan->joinqual,
1345  qual);
1346  }
1347  }
1348 
1349  /*
1350  * Now fix the subplan's tlist --- this might result in inserting
1351  * a Result node atop the plan tree.
1352  */
1353  outer_plan = change_plan_targetlist(outer_plan, fdw_scan_tlist,
1354  best_path->path.parallel_safe);
1355  }
1356  }
1357 
1358  /*
1359  * Build the query string to be sent for execution, and identify
1360  * expressions to be sent as parameters.
1361  */
1362  initStringInfo(&sql);
1363  deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
1364  remote_exprs, best_path->path.pathkeys,
1365  has_final_sort, has_limit, false,
1366  &retrieved_attrs, &params_list);
1367 
1368  /* Remember remote_exprs for possible use by postgresPlanDirectModify */
1369  fpinfo->final_remote_exprs = remote_exprs;
1370 
1371  /*
1372  * Build the fdw_private list that will be available to the executor.
1373  * Items in the list must match order in enum FdwScanPrivateIndex.
1374  */
1375  fdw_private = list_make3(makeString(sql.data),
1377  makeInteger(fpinfo->fetch_size));
1378  if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
1379  fdw_private = lappend(fdw_private,
1380  makeString(fpinfo->relation_name->data));
1381 
1382  /*
1383  * Create the ForeignScan node for the given relation.
1384  *
1385  * Note that the remote parameter expressions are stored in the fdw_exprs
1386  * field of the finished plan node; we can't keep them in private state
1387  * because then they wouldn't be subject to later planner processing.
1388  */
1389  return make_foreignscan(tlist,
1390  local_exprs,
1391  scan_relid,
1392  params_list,
1393  fdw_private,
1394  fdw_scan_tlist,
1395  fdw_recheck_quals,
1396  outer_plan);
1397 }
1398 
1399 /*
1400  * postgresBeginForeignScan
1401  * Initiate an executor scan of a foreign PostgreSQL table.
 *
 * Returns immediately under EXEC_FLAG_EXPLAIN_ONLY, leaving fdw_state NULL.
 * Otherwise it:
 *  - allocates a zeroed PgFdwScanState and stores it in node->fdw_state;
 *  - picks the user (rte->checkAsUser, else GetUserId()) and looks up the
 *    user mapping for the table's server;
 *  - obtains a connection and a cursor number, marking cursor_exists
 *    false (the cursor is created on the first iterate call);
 *  - reads the query, retrieved attrs and fetch size from fdw_private;
 *  - creates the batch and temporary memory contexts under es_query_cxt;
 *  - sets up tuple-conversion metadata and, if fdw_exprs is non-empty,
 *    remote query parameters.
 *
 * NOTE(review): this listing skips source lines 1404, 1457, 1459, 1461,
 * 1466, 1469 and 1494.  Presumably these are the function-name line, the
 * list_nth() index arguments, the memory-context size arguments, and the
 * first line of the parameter-preparation call.  Confirm against the real
 * source.
1402  */
1403 static void
1405 {
1406  ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
1407  EState *estate = node->ss.ps.state;
1408  PgFdwScanState *fsstate;
1409  RangeTblEntry *rte;
1410  Oid userid;
1411  ForeignTable *table;
1412  UserMapping *user;
1413  int rtindex;
1414  int numParams;
1415 
1416  /*
1417  * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
1418  */
1419  if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1420  return;
1421 
1422  /*
1423  * We'll save private state in node->fdw_state.
1424  */
1425  fsstate = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState));
1426  node->fdw_state = (void *) fsstate;
1427 
1428  /*
1429  * Identify which user to do the remote access as. This should match what
1430  * ExecCheckRTEPerms() does. In case of a join or aggregate, use the
1431  * lowest-numbered member RTE as a representative; we would get the same
1432  * result from any.
1433  */
1434  if (fsplan->scan.scanrelid > 0)
1435  rtindex = fsplan->scan.scanrelid;
1436  else
1437  rtindex = bms_next_member(fsplan->fs_relids, -1);
1438  rte = exec_rt_fetch(rtindex, estate);
1439  userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
1440 
1441  /* Get info about foreign table. */
1442  table = GetForeignTable(rte->relid);
1443  user = GetUserMapping(userid, table->serverid);
1444 
1445  /*
1446  * Get connection to the foreign server. Connection manager will
1447  * establish new connection if necessary.
1448  */
1449  fsstate->conn = GetConnection(user, false);
1450 
1451  /* Assign a unique ID for my cursor */
1452  fsstate->cursor_number = GetCursorNumber(fsstate->conn);
1453  fsstate->cursor_exists = false;
1454 
1455  /* Get private info created by planner functions. */
1456  fsstate->query = strVal(list_nth(fsplan->fdw_private,
1458  fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
1460  fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private,
1462 
1463  /* Create contexts for batches of tuples and per-tuple temp workspace. */
1464  fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
1465  "postgres_fdw tuple data",
1467  fsstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
1468  "postgres_fdw temporary data",
1470 
1471  /*
1472  * Get info we'll need for converting data fetched from the foreign server
1473  * into local representation and error reporting during that process.
1474  */
1475  if (fsplan->scan.scanrelid > 0)
1476  {
1477  fsstate->rel = node->ss.ss_currentRelation;
1478  fsstate->tupdesc = RelationGetDescr(fsstate->rel);
1479  }
1480  else
1481  {
1482  fsstate->rel = NULL;
1483  fsstate->tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
1484  }
1485 
1486  fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc);
1487 
1488  /*
1489  * Prepare for processing of parameters used in remote query, if any.
1490  */
1491  numParams = list_length(fsplan->fdw_exprs);
1492  fsstate->numParams = numParams;
1493  if (numParams > 0)
1495  fsplan->fdw_exprs,
1496  numParams,
1497  &fsstate->param_flinfo,
1498  &fsstate->param_exprs,
1499  &fsstate->param_values);
1500 }
1501 
1502 /*
1503  * postgresIterateForeignScan
1504  * Retrieve next row from the result set, or clear tuple slot to indicate
1505  * EOF.
 *
 * On the first call after Begin or ReScan (cursor_exists is false) the
 * remote cursor is created.  When the buffered tuples are exhausted,
 * fetch_more_data() is called unless eof_reached is already set.  If the
 * buffer is still empty after that, the scan slot is cleared to signal the
 * end of data.  Otherwise the next buffered tuple is stored into the scan
 * slot, passing false as ExecStoreHeapTuple's last argument.  Presumably
 * that is because the tuple's memory is owned by the FDW's batch context
 * rather than the slot; confirm.
 *
 * NOTE(review): source line 1508 (presumably the function-name line) is
 * missing from this listing.
1506  */
1507 static TupleTableSlot *
1509 {
1510  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1511  TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
1512 
1513  /*
1514  * If this is the first call after Begin or ReScan, we need to create the
1515  * cursor on the remote side.
1516  */
1517  if (!fsstate->cursor_exists)
1518  create_cursor(node);
1519 
1520  /*
1521  * Get some more tuples, if we've run out.
1522  */
1523  if (fsstate->next_tuple >= fsstate->num_tuples)
1524  {
1525  /* No point in another fetch if we already detected EOF, though. */
1526  if (!fsstate->eof_reached)
1527  fetch_more_data(node);
1528  /* If we didn't get any tuples, must be end of data. */
1529  if (fsstate->next_tuple >= fsstate->num_tuples)
1530  return ExecClearTuple(slot);
1531  }
1532 
1533  /*
1534  * Return the next tuple.
1535  */
1536  ExecStoreHeapTuple(fsstate->tuples[fsstate->next_tuple++],
1537  slot,
1538  false);
1539 
1540  return slot;
1541 }
1542 
1543 /*
1544  * postgresReScanForeignScan
1545  * Restart the scan.
 *
 * Does nothing if the remote cursor was never created.  Otherwise there
 * are three cases:
 *  - ss.ps.chgParam is set: the cursor is closed, and cursor_exists is
 *    reset so the next iterate call recreates it.
 *  - More than one batch was fetched (fetch_ct_2 > 1): the cursor is
 *    rewound with MOVE BACKWARD ALL.
 *  - Otherwise: the tuples already buffered in memory are replayed by
 *    resetting next_tuple.
 * In the first two cases the remote command must return PGRES_COMMAND_OK
 * (else an error is reported), and all buffer/fetch bookkeeping is reset
 * to force a fresh FETCH.  The fixed 64-byte sql buffer only ever holds
 * one of the two short commands plus an unsigned cursor number.
 *
 * NOTE(review): source line 1548 (presumably the function-name line) is
 * missing from this listing.
1546  */
1547 static void
1549 {
1550  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1551  char sql[64];
1552  PGresult *res;
1553 
1554  /* If we haven't created the cursor yet, nothing to do. */
1555  if (!fsstate->cursor_exists)
1556  return;
1557 
1558  /*
1559  * If any internal parameters affecting this node have changed, we'd
1560  * better destroy and recreate the cursor. Otherwise, rewinding it should
1561  * be good enough. If we've only fetched zero or one batch, we needn't
1562  * even rewind the cursor, just rescan what we have.
1563  */
1564  if (node->ss.ps.chgParam != NULL)
1565  {
1566  fsstate->cursor_exists = false;
1567  snprintf(sql, sizeof(sql), "CLOSE c%u",
1568  fsstate->cursor_number);
1569  }
1570  else if (fsstate->fetch_ct_2 > 1)
1571  {
1572  snprintf(sql, sizeof(sql), "MOVE BACKWARD ALL IN c%u",
1573  fsstate->cursor_number);
1574  }
1575  else
1576  {
1577  /* Easy: just rescan what we already have in memory, if anything */
1578  fsstate->next_tuple = 0;
1579  return;
1580  }
1581 
1582  /*
1583  * We don't use a PG_TRY block here, so be careful not to throw error
1584  * without releasing the PGresult.
1585  */
1586  res = pgfdw_exec_query(fsstate->conn, sql);
1587  if (PQresultStatus(res) != PGRES_COMMAND_OK)
1588  pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
1589  PQclear(res);
1590 
1591  /* Now force a fresh FETCH. */
1592  fsstate->tuples = NULL;
1593  fsstate->num_tuples = 0;
1594  fsstate->next_tuple = 0;
1595  fsstate->fetch_ct_2 = 0;
1596  fsstate->eof_reached = false;
1597 }
1598 
1599 /*
1600  * postgresEndForeignScan
1601  * Finish scanning foreign table and dispose objects used for this scan
 *
 * Returns immediately when fdw_state is NULL (EXPLAIN without ANALYZE).
 * Otherwise it closes the remote cursor if one was opened, releases the
 * connection and clears fsstate->conn.  The memory contexts created in
 * postgresBeginForeignScan are not deleted here.
 *
 * NOTE(review): source line 1604 (presumably the function-name line) is
 * missing from this listing.
1602  */
1603 static void
1605 {
1606  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1607 
1608  /* if fsstate is NULL, we are in EXPLAIN; nothing to do */
1609  if (fsstate == NULL)
1610  return;
1611 
1612  /* Close the cursor if open, to prevent accumulation of cursors */
1613  if (fsstate->cursor_exists)
1614  close_cursor(fsstate->conn, fsstate->cursor_number);
1615 
1616  /* Release remote connection */
1617  ReleaseConnection(fsstate->conn);
1618  fsstate->conn = NULL;
1619 
1620  /* MemoryContexts will be deleted automatically. */
1621 }
1622 
1623 /*
1624  * postgresAddForeignUpdateTargets
1625  * Add resjunk column(s) needed for update/delete on a foreign table
 *
 * Builds a Var of type TIDOID on parsetree->resultRelation.  It wraps the
 * Var in a resjunk TargetEntry named "ctid", numbered one past the current
 * target list length, and appends it to parsetree->targetList.
 * target_rte and target_relation are not referenced in the body shown.
 *
 * NOTE(review): source lines 1628 and 1642 are missing from this listing.
 * Presumably these are the function-name line and makeVar()'s
 * attribute-number argument (the ctid system column).  Confirm against
 * the real source.
1626  */
1627 static void
1629  RangeTblEntry *target_rte,
1630  Relation target_relation)
1631 {
1632  Var *var;
1633  const char *attrname;
1634  TargetEntry *tle;
1635 
1636  /*
1637  * In postgres_fdw, what we need is the ctid, same as for a regular table.
1638  */
1639 
1640  /* Make a Var representing the desired value */
1641  var = makeVar(parsetree->resultRelation,
1643  TIDOID,
1644  -1,
1645  InvalidOid,
1646  0);
1647 
1648  /* Wrap it in a resjunk TLE with the right name ... */
1649  attrname = "ctid";
1650 
1651  tle = makeTargetEntry((Expr *) var,
1652  list_length(parsetree->targetList) + 1,
1653  pstrdup(attrname),
1654  true);
1655 
1656  /* ... and add it to the query's targetlist */
1657  parsetree->targetList = lappend(parsetree->targetList, tle);
1658 }
1659 
1660 /*
1661  * postgresPlanForeignModify
1662  * Plan an insert/update/delete operation on a foreign table
 *
 * Opens the result relation with NoLock and chooses the columns to send:
 *  - every non-dropped column for INSERT, and for UPDATE when the
 *    trigger condition described below holds;
 *  - otherwise, for UPDATE, the union of rte->updatedCols and
 *    rte->extraUpdatedCols.
 * It then picks this subplan's WITH CHECK OPTION and RETURNING lists,
 * accepts only ON CONFLICT DO NOTHING (any other action raises an error),
 * and deparses the INSERT/UPDATE/DELETE text.  It returns a four-element
 * list: the SQL string, the target attnums, an integer has-returning
 * flag, and the retrieved attrs.
 *
 * NOTE(review): source lines 1665, 1677, 1702, 1704 and 1724 are missing
 * from this listing.  Presumably these are the function-name line, the
 * declarations of retrieved_attrs, tupdesc and attno, and the trigger test
 * that completes the if-condition.  Confirm against the real source.
1663  */
1664 static List *
1666  ModifyTable *plan,
1667  Index resultRelation,
1668  int subplan_index)
1669 {
1670  CmdType operation = plan->operation;
1671  RangeTblEntry *rte = planner_rt_fetch(resultRelation, root);
1672  Relation rel;
1673  StringInfoData sql;
1674  List *targetAttrs = NIL;
1675  List *withCheckOptionList = NIL;
1676  List *returningList = NIL;
1678  bool doNothing = false;
1679 
1680  initStringInfo(&sql);
1681 
1682  /*
1683  * Core code already has some lock on each rel being planned, so we can
1684  * use NoLock here.
1685  */
1686  rel = table_open(rte->relid, NoLock);
1687 
1688  /*
1689  * In an INSERT, we transmit all columns that are defined in the foreign
1690  * table. In an UPDATE, if there are BEFORE ROW UPDATE triggers on the
1691  * foreign table, we transmit all columns like INSERT; else we transmit
1692  * only columns that were explicitly targets of the UPDATE, so as to avoid
1693  * unnecessary data transmission. (We can't do that for INSERT since we
1694  * would miss sending default values for columns not listed in the source
1695  * statement, and for UPDATE if there are BEFORE ROW UPDATE triggers since
1696  * those triggers might change values for non-target columns, in which
1697  * case we would miss sending changed values for those columns.)
1698  */
1699  if (operation == CMD_INSERT ||
1700  (operation == CMD_UPDATE &&
1701  rel->trigdesc &&
1703  {
1705  int attnum;
1706 
1707  for (attnum = 1; attnum <= tupdesc->natts; attnum++)
1708  {
1709  Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
1710 
1711  if (!attr->attisdropped)
1712  targetAttrs = lappend_int(targetAttrs, attnum);
1713  }
1714  }
1715  else if (operation == CMD_UPDATE)
1716  {
1717  int col;
1718  Bitmapset *allUpdatedCols = bms_union(rte->updatedCols, rte->extraUpdatedCols);
1719 
1720  col = -1;
1721  while ((col = bms_next_member(allUpdatedCols, col)) >= 0)
1722  {
1723  /* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
1725 
1726  if (attno <= InvalidAttrNumber) /* shouldn't happen */
1727  elog(ERROR, "system-column update is not supported");
1728  targetAttrs = lappend_int(targetAttrs, attno);
1729  }
1730  }
1731 
1732  /*
1733  * Extract the relevant WITH CHECK OPTION list if any.
1734  */
1735  if (plan->withCheckOptionLists)
1736  withCheckOptionList = (List *) list_nth(plan->withCheckOptionLists,
1737  subplan_index);
1738 
1739  /*
1740  * Extract the relevant RETURNING list if any.
1741  */
1742  if (plan->returningLists)
1743  returningList = (List *) list_nth(plan->returningLists, subplan_index);
1744 
1745  /*
1746  * ON CONFLICT DO UPDATE and DO NOTHING case with inference specification
1747  * should have already been rejected in the optimizer, as presently there
1748  * is no way to recognize an arbiter index on a foreign table. Only DO
1749  * NOTHING is supported without an inference specification.
1750  */
1751  if (plan->onConflictAction == ONCONFLICT_NOTHING)
1752  doNothing = true;
1753  else if (plan->onConflictAction != ONCONFLICT_NONE)
1754  elog(ERROR, "unexpected ON CONFLICT specification: %d",
1755  (int) plan->onConflictAction);
1756 
1757  /*
1758  * Construct the SQL command string.
1759  */
1760  switch (operation)
1761  {
1762  case CMD_INSERT:
1763  deparseInsertSql(&sql, rte, resultRelation, rel,
1764  targetAttrs, doNothing,
1765  withCheckOptionList, returningList,
1766  &retrieved_attrs);
1767  break;
1768  case CMD_UPDATE:
1769  deparseUpdateSql(&sql, rte, resultRelation, rel,
1770  targetAttrs,
1771  withCheckOptionList, returningList,
1772  &retrieved_attrs);
1773  break;
1774  case CMD_DELETE:
1775  deparseDeleteSql(&sql, rte, resultRelation, rel,
1776  returningList,
1777  &retrieved_attrs);
1778  break;
1779  default:
1780  elog(ERROR, "unexpected operation: %d", (int) operation);
1781  break;
1782  }
1783 
1784  table_close(rel, NoLock);
1785 
1786  /*
1787  * Build the fdw_private list that will be available to the executor.
1788  * Items in the list must match enum FdwModifyPrivateIndex, above.
1789  */
1790  return list_make4(makeString(sql.data),
1791  targetAttrs,
1792  makeInteger((retrieved_attrs != NIL)),
1793  retrieved_attrs);
1794 }
1795 
1796 /*
1797  * postgresBeginForeignModify
1798  * Begin an insert/update/delete operation on a foreign table
 *
 * Does nothing under EXEC_FLAG_EXPLAIN_ONLY, so ri_FdwState stays NULL.
 * Otherwise it unpacks fdw_private into the query text, target attnums,
 * has-returning flag and retrieved attrs.  Presumably fdw_private is the
 * list returned by the PlanForeignModify callback; confirm.  It then
 * fetches the result relation's RTE and stores the state built by
 * create_foreign_modify() in resultRelInfo->ri_FdwState.
 *
 * NOTE(review): source lines 1801, 1811, 1823, 1825, 1827 and 1829 are
 * missing from this listing.  Presumably these are the function-name
 * line, the declaration of retrieved_attrs, and the list_nth() index
 * arguments.  Confirm against the real source.
1799  */
1800 static void
1802  ResultRelInfo *resultRelInfo,
1803  List *fdw_private,
1804  int subplan_index,
1805  int eflags)
1806 {
1807  PgFdwModifyState *fmstate;
1808  char *query;
1809  List *target_attrs;
1810  bool has_returning;
1812  RangeTblEntry *rte;
1813 
1814  /*
1815  * Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState
1816  * stays NULL.
1817  */
1818  if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1819  return;
1820 
1821  /* Deconstruct fdw_private data. */
1822  query = strVal(list_nth(fdw_private,
1824  target_attrs = (List *) list_nth(fdw_private,
1826  has_returning = intVal(list_nth(fdw_private,
1828  retrieved_attrs = (List *) list_nth(fdw_private,
1830 
1831  /* Find RTE. */
1832  rte = exec_rt_fetch(resultRelInfo->ri_RangeTableIndex,
1833  mtstate->ps.state);
1834 
1835  /* Construct an execution state. */
1836  fmstate = create_foreign_modify(mtstate->ps.state,
1837  rte,
1838  resultRelInfo,
1839  mtstate->operation,
1840  mtstate->mt_plans[subplan_index]->plan,
1841  query,
1842  target_attrs,
1843  has_returning,
1844  retrieved_attrs);
1845 
1846  resultRelInfo->ri_FdwState = fmstate;
1847 }
1848 
1849 /*
1850  * postgresExecForeignInsert
1851  * Insert one row into a foreign table
 *
 * If fmstate->aux_fmstate is set, it is temporarily installed as
 * resultRelInfo->ri_FdwState for the execute_foreign_modify() call, and
 * the original state is put back afterwards.  Returns whatever
 * execute_foreign_modify() returns.
 *
 * NOTE(review): source line 1854 (presumably the function-name line) is
 * missing from this listing.  The swap is not undone if
 * execute_foreign_modify() raises an error.  Presumably that is acceptable
 * because the error aborts the statement; confirm.
1852  */
1853 static TupleTableSlot *
1855  ResultRelInfo *resultRelInfo,
1856  TupleTableSlot *slot,
1857  TupleTableSlot *planSlot)
1858 {
1859  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1860  TupleTableSlot *rslot;
1861 
1862  /*
1863  * If the fmstate has aux_fmstate set, use the aux_fmstate (see
1864  * postgresBeginForeignInsert())
1865  */
1866  if (fmstate->aux_fmstate)
1867  resultRelInfo->ri_FdwState = fmstate->aux_fmstate;
1868  rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT,
1869  slot, planSlot);
1870  /* Revert that change */
1871  if (fmstate->aux_fmstate)
1872  resultRelInfo->ri_FdwState = fmstate;
1873 
1874  return rslot;
1875 }
1876 
1877 /*
1878  * postgresExecForeignUpdate
1879  * Update one row in a foreign table
 *
 * Thin wrapper: forwards to execute_foreign_modify() with CMD_UPDATE and
 * returns its result.
 *
 * NOTE(review): source line 1882 (presumably the function-name line) is
 * missing from this listing.
1880  */
1881 static TupleTableSlot *
1883  ResultRelInfo *resultRelInfo,
1884  TupleTableSlot *slot,
1885  TupleTableSlot *planSlot)
1886 {
1887  return execute_foreign_modify(estate, resultRelInfo, CMD_UPDATE,
1888  slot, planSlot);
1889 }
1890 
1891 /*
1892  * postgresExecForeignDelete
1893  * Delete one row from a foreign table
 *
 * Thin wrapper: forwards to execute_foreign_modify() with CMD_DELETE and
 * returns its result.
 *
 * NOTE(review): source line 1896 (presumably the function-name line) is
 * missing from this listing.
1894  */
1895 static TupleTableSlot *
1897  ResultRelInfo *resultRelInfo,
1898  TupleTableSlot *slot,
1899  TupleTableSlot *planSlot)
1900 {
1901  return execute_foreign_modify(estate, resultRelInfo, CMD_DELETE,
1902  slot, planSlot);
1903 }
1904 
1905 /*
1906  * postgresEndForeignModify
1907  * Finish an insert/update/delete operation on a foreign table
 *
 * Returns immediately when ri_FdwState is NULL (EXPLAIN without ANALYZE).
 * Otherwise passes the modify state to finish_foreign_modify().
 *
 * NOTE(review): source line 1910 (presumably the function-name line) is
 * missing from this listing.
1908  */
1909 static void
1911  ResultRelInfo *resultRelInfo)
1912 {
1913  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1914 
1915  /* If fmstate is NULL, we are in EXPLAIN; nothing to do */
1916  if (fmstate == NULL)
1917  return;
1918 
1919  /* Destroy the execution state */
1920  finish_foreign_modify(fmstate);
1921 }
1922 
1923 /*
1924  * postgresBeginForeignInsert
1925  * Begin an insert operation on a foreign table
1926  */
1927 static void
1929  ResultRelInfo *resultRelInfo)
1930 {
1931  PgFdwModifyState *fmstate;
1932  ModifyTable *plan = castNode(ModifyTable, mtstate->ps.plan);
1933  EState *estate = mtstate->ps.state;
1934  Index resultRelation = resultRelInfo->ri_RangeTableIndex;
1935  Relation rel = resultRelInfo->ri_RelationDesc;
1936  RangeTblEntry *rte;
1938  int attnum;
1939  StringInfoData sql;
1940  List *targetAttrs = NIL;
1942  bool doNothing = false;
1943 
1944  /*
1945  * If the foreign table we are about to insert routed rows into is also an
1946  * UPDATE subplan result rel that will be updated later, proceeding with
1947  * the INSERT will result in the later UPDATE incorrectly modifying those
1948  * routed rows, so prevent the INSERT --- it would be nice if we could
1949  * handle this case; but for now, throw an error for safety.
1950  */
1951  if (plan && plan->operation == CMD_UPDATE &&
1952  (resultRelInfo->ri_usesFdwDirectModify ||
1953  resultRelInfo->ri_FdwState) &&
1954  resultRelInfo > mtstate->resultRelInfo + mtstate->mt_whichplan)
1955  ereport(ERROR,
1956  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1957  errmsg("cannot route tuples into foreign table to be updated \"%s\"",
1958  RelationGetRelationName(rel))));
1959 
1960  initStringInfo(&sql);
1961 
1962  /* We transmit all columns that are defined in the foreign table. */
1963  for (attnum = 1; attnum <= tupdesc->natts; attnum++)
1964  {
1965  Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
1966 
1967  if (!attr->attisdropped)
1968  targetAttrs = lappend_int(targetAttrs, attnum);
1969  }
1970 
1971  /* Check if we add the ON CONFLICT clause to the remote query. */
1972  if (plan)
1973  {
1974  OnConflictAction onConflictAction = plan->onConflictAction;
1975 
1976  /* We only support DO NOTHING without an inference specification. */
1977  if (onConflictAction == ONCONFLICT_NOTHING)
1978  doNothing = true;
1979  else if (onConflictAction != ONCONFLICT_NONE)
1980  elog(ERROR, "unexpected ON CONFLICT specification: %d",
1981  (int) onConflictAction);
1982  }
1983 
1984  /*
1985  * If the foreign table is a partition, we need to create a new RTE
1986  * describing the foreign table for use by deparseInsertSql and
1987  * create_foreign_modify() below, after first copying the parent's RTE and
1988  * modifying some fields to describe the foreign partition to work on.
1989  * However, if this is invoked by UPDATE, the existing RTE may already
1990  * correspond to this partition if it is one of the UPDATE subplan target
1991  * rels; in that case, we can just use the existing RTE as-is.
1992  */
1993  rte = exec_rt_fetch(resultRelation, estate);
1994  if (rte->relid != RelationGetRelid(rel))
1995  {
1996  rte = copyObject(rte);
1997  rte->relid = RelationGetRelid(rel);
1998  rte->relkind = RELKIND_FOREIGN_TABLE;
1999 
2000  /*
2001  * For UPDATE, we must use the RT index of the first subplan target
2002  * rel's RTE, because the core code would have built expressions for
2003  * the partition, such as RETURNING, using that RT index as varno of
2004  * Vars contained in those expressions.
2005  */
2006  if (plan && plan->operation == CMD_UPDATE &&
2007  resultRelation == plan->rootRelation)
2008  resultRelation = mtstate->resultRelInfo[0].ri_RangeTableIndex;
2009  }
2010 
2011  /* Construct the SQL command string. */
2012  deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing,
2013  resultRelInfo->ri_WithCheckOptions,
2014  resultRelInfo->ri_returningList,
2015  &retrieved_attrs);
2016 
2017  /* Construct an execution state. */
2018  fmstate = create_foreign_modify(mtstate->ps.state,
2019  rte,
2020  resultRelInfo,
2021  CMD_INSERT,
2022  NULL,
2023  sql.data,
2024  targetAttrs,
2025  retrieved_attrs != NIL,
2026  retrieved_attrs);
2027 
2028  /*
2029  * If the given resultRelInfo already has PgFdwModifyState set, it means
2030  * the foreign table is an UPDATE subplan result rel; in which case, store
2031  * the resulting state into the aux_fmstate of the PgFdwModifyState.
2032  */
2033  if (resultRelInfo->ri_FdwState)
2034  {
2035  Assert(plan && plan->operation == CMD_UPDATE);
2036  Assert(resultRelInfo->ri_usesFdwDirectModify == false);
2037  ((PgFdwModifyState *) resultRelInfo->ri_FdwState)->aux_fmstate = fmstate;
2038  }
2039  else
2040  resultRelInfo->ri_FdwState = fmstate;
2041 }
2042 
2043 /*
2044  * postgresEndForeignInsert
2045  * Finish an insert operation on a foreign table
 *
 * Finishes the PgFdwModifyState that postgresBeginForeignInsert() stored in
 * resultRelInfo->ri_FdwState.  When that routine found a state already
 * present (the foreign table is also an UPDATE subplan result rel), it
 * attached the INSERT state as aux_fmstate instead, so only that auxiliary
 * state is finished here.  NOTE(review): the outer UPDATE state is presumably
 * finished by the regular foreign-modify end routine -- confirm.
2046  */
2047 static void
2049  ResultRelInfo *resultRelInfo)
2050 {
2051  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
2052 
2053  Assert(fmstate != NULL);
2054 
2055  /*
2056  * If the fmstate has aux_fmstate set, get the aux_fmstate (see
2057  * postgresBeginForeignInsert())
2058  */
2059  if (fmstate->aux_fmstate)
2060  fmstate = fmstate->aux_fmstate;
2061 
2062  /* Destroy the execution state */
2063  finish_foreign_modify(fmstate);
2064 }
2065 
2066 /*
2067  * postgresIsForeignRelUpdatable
2068  * Determine whether a foreign table supports INSERT, UPDATE and/or
2069  * DELETE.
 *
 * Returns a bitmask with (1 << CMD_INSERT), (1 << CMD_UPDATE) and
 * (1 << CMD_DELETE) all set when the table is updatable, or 0 otherwise.
 * The "updatable" option is read from the server's options first and then
 * from the table's options.  A table-level setting therefore overrides the
 * server-level one, and within each option list the last matching entry
 * wins, because the loops do not stop at the first match.
2070  */
2071 static int
2073 {
2074  bool updatable;
2075  ForeignTable *table;
2076  ForeignServer *server;
2077  ListCell *lc;
2078 
2079  /*
2080  * By default, all postgres_fdw foreign tables are assumed updatable. This
2081  * can be overridden by a per-server setting, which in turn can be
2082  * overridden by a per-table setting.
2083  */
2084  updatable = true;
2085 
2086  table = GetForeignTable(RelationGetRelid(rel));
2087  server = GetForeignServer(table->serverid);
2088 
2089  foreach(lc, server->options)
2090  {
2091  DefElem *def = (DefElem *) lfirst(lc);
2092 
2093  if (strcmp(def->defname, "updatable") == 0)
2094  updatable = defGetBoolean(def);
2095  }
2096  foreach(lc, table->options)
2097  {
2098  DefElem *def = (DefElem *) lfirst(lc);
2099 
2100  if (strcmp(def->defname, "updatable") == 0)
2101  updatable = defGetBoolean(def);
2102  }
2103 
2104  /*
2105  * Currently "updatable" means support for INSERT, UPDATE and DELETE.
2106  */
2107  return updatable ?
2108  (1 << CMD_INSERT) | (1 << CMD_UPDATE) | (1 << CMD_DELETE) : 0;
2109 }
2110 
2111 /*
2112  * postgresRecheckForeignScan
2113  * Execute a local join execution plan for a foreign join
 *
 * For a base-relation scan (scanrelid > 0) this just returns true; rechecking
 * is handled through fdw_recheck_quals.  For a pushed-down join
 * (scanrelid == 0) the outer child plan, which is the local join plan, is run
 * once.  If it yields a tuple, that tuple is copied into "slot" and true is
 * returned; if it yields nothing, false is returned.
2114  */
2115 static bool
2117 {
2118  Index scanrelid = ((Scan *) node->ss.ps.plan)->scanrelid;
2120  TupleTableSlot *result;
2121 
2122  /* For base foreign relations, it suffices to set fdw_recheck_quals */
2123  if (scanrelid > 0)
2124  return true;
2125 
2126  Assert(outerPlan != NULL);
2127 
2128  /* Execute a local join execution plan */
2129  result = ExecProcNode(outerPlan);
2130  if (TupIsNull(result))
2131  return false;
2132 
2133  /* Store result in the given slot */
2134  ExecCopySlot(slot, result);
2135 
2136  return true;
2137 }
2138 
2139 /*
2140  * postgresPlanDirectModify
2141  * Consider a direct foreign table modification
2142  *
2143  * Decide whether it is safe to modify a foreign table directly, and if so,
2144  * rewrite subplan accordingly.
 *
 * Returns false, leaving the plan untouched, in any of these cases:
 *   - the operation is not UPDATE or DELETE;
 *   - the subplan is not a ForeignScan;
 *   - the subplan has quals that must be checked locally;
 *   - for UPDATE, some assigned expression is not safe to evaluate remotely.
 *
 * Otherwise the ForeignScan is rewritten in place and true is returned.
 * Its operation, fdw_exprs (parameters of the remote statement) and
 * fdw_private are replaced.  For a pushed-down join (scanrelid == 0), the
 * outer subplan is also dropped, and fdw_scan_tlist is rebuilt when there
 * is a RETURNING list.
 *
 * Only UPDATE target columns listed in rte->updatedCols are sent to the
 * remote server.  A system column in that set, or a column missing from the
 * subplan targetlist, raises an ERROR rather than a false return.
2145  */
2146 static bool
2148  ModifyTable *plan,
2149  Index resultRelation,
2150  int subplan_index)
2151 {
2152  CmdType operation = plan->operation;
2153  Plan *subplan;
2154  RelOptInfo *foreignrel;
2155  RangeTblEntry *rte;
2156  PgFdwRelationInfo *fpinfo;
2157  Relation rel;
2158  StringInfoData sql;
2159  ForeignScan *fscan;
2160  List *targetAttrs = NIL;
2161  List *remote_exprs;
2162  List *params_list = NIL;
2163  List *returningList = NIL;
2165 
2166  /*
2167  * Decide whether it is safe to modify a foreign table directly.
2168  */
2169 
2170  /*
2171  * The table modification must be an UPDATE or DELETE.
2172  */
2173  if (operation != CMD_UPDATE && operation != CMD_DELETE)
2174  return false;
2175 
2176  /*
2177  * It's unsafe to modify a foreign table directly if there are any local
2178  * joins needed.
2179  */
2180  subplan = (Plan *) list_nth(plan->plans, subplan_index);
2181  if (!IsA(subplan, ForeignScan))
2182  return false;
2183  fscan = (ForeignScan *) subplan;
2184 
2185  /*
2186  * It's unsafe to modify a foreign table directly if there are any quals
2187  * that should be evaluated locally.
2188  */
2189  if (subplan->qual != NIL)
2190  return false;
2191 
2192  /* Safe to fetch data about the target foreign rel */
2193  if (fscan->scan.scanrelid == 0)
2194  {
2195  foreignrel = find_join_rel(root, fscan->fs_relids);
2196  /* We should have a rel for this foreign join. */
2197  Assert(foreignrel);
2198  }
2199  else
2200  foreignrel = root->simple_rel_array[resultRelation];
2201  rte = root->simple_rte_array[resultRelation];
2202  fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
2203 
2204  /*
2205  * It's unsafe to update a foreign table directly, if any expressions to
2206  * assign to the target columns are unsafe to evaluate remotely.
2207  */
2208  if (operation == CMD_UPDATE)
2209  {
2210  int col;
2211 
2212  /*
2213  * We transmit only columns that were explicitly targets of the
2214  * UPDATE, so as to avoid unnecessary data transmission.
2215  */
2216  col = -1;
2217  while ((col = bms_next_member(rte->updatedCols, col)) >= 0)
2218  {
2219  /* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
2221  TargetEntry *tle;
2222 
2223  if (attno <= InvalidAttrNumber) /* shouldn't happen */
2224  elog(ERROR, "system-column update is not supported");
2225 
2226  tle = get_tle_by_resno(subplan->targetlist, attno);
2227 
2228  if (!tle)
2229  elog(ERROR, "attribute number %d not found in subplan targetlist",
2230  attno);
2231 
2232  if (!is_foreign_expr(root, foreignrel, (Expr *) tle->expr))
2233  return false;
2234 
2235  targetAttrs = lappend_int(targetAttrs, attno);
2236  }
2237  }
2238 
2239  /*
2240  * Ok, rewrite subplan so as to modify the foreign table directly.
2241  */
2242  initStringInfo(&sql);
2243 
2244  /*
2245  * Core code already has some lock on each rel being planned, so we can
2246  * use NoLock here.
2247  */
2248  rel = table_open(rte->relid, NoLock);
2249 
2250  /*
2251  * Recall the qual clauses that must be evaluated remotely. (These are
2252  * bare clauses not RestrictInfos, but deparse.c's appendConditions()
2253  * doesn't care.)
2254  */
2255  remote_exprs = fpinfo->final_remote_exprs;
2256 
2257  /*
2258  * Extract the relevant RETURNING list if any.
2259  */
2260  if (plan->returningLists)
2261  {
2262  returningList = (List *) list_nth(plan->returningLists, subplan_index);
2263 
2264  /*
2265  * When performing an UPDATE/DELETE .. RETURNING on a join directly,
2266  * we fetch from the foreign server any Vars specified in RETURNING
2267  * that refer not only to the target relation but to non-target
2268  * relations. So we'll deparse them into the RETURNING clause of the
2269  * remote query; use a targetlist consisting of them instead, which
2270  * will be adjusted to be new fdw_scan_tlist of the foreign-scan plan
2271  * node below.
2272  */
2273  if (fscan->scan.scanrelid == 0)
2274  returningList = build_remote_returning(resultRelation, rel,
2275  returningList);
2276  }
2277 
2278  /*
2279  * Construct the SQL command string.
2280  */
2281  switch (operation)
2282  {
2283  case CMD_UPDATE:
2284  deparseDirectUpdateSql(&sql, root, resultRelation, rel,
2285  foreignrel,
2286  ((Plan *) fscan)->targetlist,
2287  targetAttrs,
2288  remote_exprs, &params_list,
2289  returningList, &retrieved_attrs);
2290  break;
2291  case CMD_DELETE:
2292  deparseDirectDeleteSql(&sql, root, resultRelation, rel,
2293  foreignrel,
2294  remote_exprs, &params_list,
2295  returningList, &retrieved_attrs);
2296  break;
2297  default:
2298  elog(ERROR, "unexpected operation: %d", (int) operation);
2299  break;
2300  }
2301 
2302  /*
2303  * Update the operation info.
2304  */
2305  fscan->operation = operation;
2306 
2307  /*
2308  * Update the fdw_exprs list that will be available to the executor.
2309  */
2310  fscan->fdw_exprs = params_list;
2311 
2312  /*
2313  * Update the fdw_private list that will be available to the executor.
2314  * Items in the list must match enum FdwDirectModifyPrivateIndex, above.
 * The order is: SQL text, has-RETURNING flag (true when retrieved_attrs is
 * non-empty), retrieved_attrs, and plan->canSetTag.
 * postgresBeginDirectModify() unpacks them in the same order.
2315  */
2316  fscan->fdw_private = list_make4(makeString(sql.data),
2317  makeInteger((retrieved_attrs != NIL)),
2318  retrieved_attrs,
2319  makeInteger(plan->canSetTag));
2320 
2321  /*
2322  * Update the foreign-join-related fields.
2323  */
2324  if (fscan->scan.scanrelid == 0)
2325  {
2326  /* No need for the outer subplan. */
2327  fscan->scan.plan.lefttree = NULL;
2328 
2329  /* Build new fdw_scan_tlist if UPDATE/DELETE .. RETURNING. */
2330  if (returningList)
2331  rebuild_fdw_scan_tlist(fscan, returningList);
2332  }
2333 
2334  table_close(rel, NoLock);
2335  return true;
2336 }
2337 
2338 /*
2339  * postgresBeginDirectModify
2340  * Prepare a direct foreign table modification
 *
 * Under EXPLAIN without ANALYZE (EXEC_FLAG_EXPLAIN_ONLY) nothing is set up,
 * and node->fdw_state stays NULL.  Otherwise this function:
 *   - allocates a PgFdwDirectModifyState;
 *   - picks the remote user (rte->checkAsUser if set, else the current
 *     user) and obtains a connection;
 *   - unpacks the items postgresPlanDirectModify() stored in fdw_private;
 *   - creates a temporary memory context;
 *   - prepares RETURNING input conversion and remote query parameters.
 *
 * For a pushed-down join, the target relation is kept in resultRel and rel
 * is set to NULL.  num_tuples is set to -1 so that the first call to
 * postgresIterateDirectModify() knows it must execute the statement.
2341  */
2342 static void
2344 {
2345  ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
2346  EState *estate = node->ss.ps.state;
2347  PgFdwDirectModifyState *dmstate;
2348  Index rtindex;
2349  RangeTblEntry *rte;
2350  Oid userid;
2351  ForeignTable *table;
2352  UserMapping *user;
2353  int numParams;
2354 
2355  /*
2356  * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
2357  */
2358  if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
2359  return;
2360 
2361  /*
2362  * We'll save private state in node->fdw_state.
2363  */
2364  dmstate = (PgFdwDirectModifyState *) palloc0(sizeof(PgFdwDirectModifyState));
2365  node->fdw_state = (void *) dmstate;
2366 
2367  /*
2368  * Identify which user to do the remote access as. This should match what
2369  * ExecCheckRTEPerms() does.
2370  */
2371  rtindex = estate->es_result_relation_info->ri_RangeTableIndex;
2372  rte = exec_rt_fetch(rtindex, estate);
2373  userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
2374 
2375  /* Get info about foreign table. */
2376  if (fsplan->scan.scanrelid == 0)
2377  dmstate->rel = ExecOpenScanRelation(estate, rtindex, eflags);
2378  else
2379  dmstate->rel = node->ss.ss_currentRelation;
2380  table = GetForeignTable(RelationGetRelid(dmstate->rel));
2381  user = GetUserMapping(userid, table->serverid);
2382 
2383  /*
2384  * Get connection to the foreign server. Connection manager will
2385  * establish new connection if necessary.
2386  */
2387  dmstate->conn = GetConnection(user, false);
2388 
2389  /* Update the foreign-join-related fields. */
2390  if (fsplan->scan.scanrelid == 0)
2391  {
2392  /* Save info about foreign table. */
2393  dmstate->resultRel = dmstate->rel;
2394 
2395  /*
2396  * Set dmstate->rel to NULL to teach get_returning_data() and
2397  * make_tuple_from_result_row() that columns fetched from the remote
2398  * server are described by fdw_scan_tlist of the foreign-scan plan
2399  * node, not the tuple descriptor for the target relation.
2400  */
2401  dmstate->rel = NULL;
2402  }
2403 
2404  /* Initialize state variable */
2405  dmstate->num_tuples = -1; /* -1 means not set yet */
2406 
2407  /* Get private info created by planner functions. */
2408  dmstate->query = strVal(list_nth(fsplan->fdw_private,
2410  dmstate->has_returning = intVal(list_nth(fsplan->fdw_private,
2412  dmstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
2414  dmstate->set_processed = intVal(list_nth(fsplan->fdw_private,
2416 
2417  /* Create context for per-tuple temp workspace. */
2418  dmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
2419  "postgres_fdw temporary data",
2421 
2422  /* Prepare for input conversion of RETURNING results. */
2423  if (dmstate->has_returning)
2424  {
2426 
2427  if (fsplan->scan.scanrelid == 0)
2428  tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
2429  else
2430  tupdesc = RelationGetDescr(dmstate->rel);
2431 
2432  dmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
2433 
2434  /*
2435  * When performing an UPDATE/DELETE .. RETURNING on a join directly,
2436  * initialize a filter to extract an updated/deleted tuple from a scan
2437  * tuple.
2438  */
2439  if (fsplan->scan.scanrelid == 0)
2440  init_returning_filter(dmstate, fsplan->fdw_scan_tlist, rtindex);
2441  }
2442 
2443  /*
2444  * Prepare for processing of parameters used in remote query, if any.
2445  */
2446  numParams = list_length(fsplan->fdw_exprs);
2447  dmstate->numParams = numParams;
2448  if (numParams > 0)
2450  fsplan->fdw_exprs,
2451  numParams,
2452  &dmstate->param_flinfo,
2453  &dmstate->param_exprs,
2454  &dmstate->param_values);
2455 }
2456 
2457 /*
2458  * postgresIterateDirectModify
2459  * Execute a direct foreign table modification
 *
 * The remote statement is executed on the first call, detected by
 * num_tuples still being -1 as set in postgresBeginDirectModify().
 *
 * When the local query has no RETURNING projection, dmstate->num_tuples is
 * added to estate->es_processed (only if set_processed is true) and to the
 * EXPLAIN ANALYZE tuple count, and an empty slot is returned.  Otherwise the
 * next RETURNING tuple from get_returning_data() is returned.
2460  */
2461 static TupleTableSlot *
2463 {
2465  EState *estate = node->ss.ps.state;
2466  ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
2467 
2468  /*
2469  * If this is the first call after Begin, execute the statement.
2470  */
2471  if (dmstate->num_tuples == -1)
2472  execute_dml_stmt(node);
2473 
2474  /*
2475  * If the local query doesn't specify RETURNING, just clear tuple slot.
 *
 * NOTE(review): this branch adds num_tuples on every call.  It presumably
 * relies on the caller stopping once it receives the empty slot; confirm.
2476  */
2477  if (!resultRelInfo->ri_projectReturning)
2478  {
2479  TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
2480  Instrumentation *instr = node->ss.ps.instrument;
2481 
2482  Assert(!dmstate->has_returning);
2483 
2484  /* Increment the command es_processed count if necessary. */
2485  if (dmstate->set_processed)
2486  estate->es_processed += dmstate->num_tuples;
2487 
2488  /* Increment the tuple count for EXPLAIN ANALYZE if necessary. */
2489  if (instr)
2490  instr->tuplecount += dmstate->num_tuples;
2491 
2492  return ExecClearTuple(slot);
2493  }
2494 
2495  /*
2496  * Get the next RETURNING tuple.
2497  */
2498  return get_returning_data(node);
2499 }
2500 
2501 /*
2502  * postgresEndDirectModify
2503  * Finish a direct foreign table modification
 *
 * Clears any PGresult still held in dmstate->result and releases the
 * connection obtained by postgresBeginDirectModify().  dmstate is NULL when
 * that routine returned early for EXPLAIN without ANALYZE; in that case there
 * is nothing to do.
2504  */
2505 static void
2507 {
2509 
2510  /* if dmstate is NULL, we are in EXPLAIN; nothing to do */
2511  if (dmstate == NULL)
2512  return;
2513 
2514  /* Release PGresult */
2515  if (dmstate->result)
2516  PQclear(dmstate->result);
2517 
2518  /* Release remote connection */
2519  ReleaseConnection(dmstate->conn);
2520  dmstate->conn = NULL;
2521 
2522  /* MemoryContext will be deleted automatically. */
2523 }
2524 
2525 /*
2526  * postgresExplainForeignScan
2527  * Produce extra output for EXPLAIN of a ForeignScan on a foreign table
 *
 * Emits a "Relations" property whenever fdw_private holds an element at
 * index FdwScanPrivateRelations; that element is present only for
 * non-base-relation scans.  The remote SELECT text is emitted as
 * "Remote SQL" only when VERBOSE is requested.
2528  */
2529 static void
2531 {
2532  List *fdw_private;
2533  char *sql;
2534  char *relations;
2535 
2536  fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
2537 
2538  /*
2539  * Add names of relation handled by the foreign scan when the scan is a
2540  * join
2541  */
2542  if (list_length(fdw_private) > FdwScanPrivateRelations)
2543  {
2544  relations = strVal(list_nth(fdw_private, FdwScanPrivateRelations));
2545  ExplainPropertyText("Relations", relations, es);
2546  }
2547 
2548  /*
2549  * Add remote query, when VERBOSE option is specified.
2550  */
2551  if (es->verbose)
2552  {
2553  sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
2554  ExplainPropertyText("Remote SQL", sql, es);
2555  }
2556 }
2557 
2558 /*
2559  * postgresExplainForeignModify
2560  * Produce extra output for EXPLAIN of a ModifyTable on a foreign table
 *
 * Produces output only under VERBOSE.  It then shows the remote SQL text
 * stored in fdw_private as the "Remote SQL" property.
2561  */
2562 static void
2564  ResultRelInfo *rinfo,
2565  List *fdw_private,
2566  int subplan_index,
2567  ExplainState *es)
2568 {
2569  if (es->verbose)
2570  {
2571  char *sql = strVal(list_nth(fdw_private,
2573 
2574  ExplainPropertyText("Remote SQL", sql, es);
2575  }
2576 }
2577 
2578 /*
2579  * postgresExplainDirectModify
2580  * Produce extra output for EXPLAIN of a ForeignScan that modifies a
2581  * foreign table directly
 *
 * Produces output only under VERBOSE.  It then shows, as "Remote SQL", the
 * UPDATE/DELETE text stored at FdwDirectModifyPrivateUpdateSql in the
 * plan's fdw_private.
2582  */
2583 static void
2585 {
2586  List *fdw_private;
2587  char *sql;
2588 
2589  if (es->verbose)
2590  {
2591  fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
2592  sql = strVal(list_nth(fdw_private, FdwDirectModifyPrivateUpdateSql));
2593  ExplainPropertyText("Remote SQL", sql, es);
2594  }
2595 }
2596 
2597 
2598 /*
2599  * estimate_path_cost_size
2600  * Get cost and size estimates for a foreign scan on given foreign relation
2601  * either a base relation or a join between foreign relations or an upper
2602  * relation containing foreign relations.
2603  *
2604  * param_join_conds are the parameterization clauses with outer relations.
2605  * pathkeys specify the expected sort order if any for given path being costed.
2606  * fpextra specifies additional post-scan/join-processing steps such as the
2607  * final sort and the LIMIT restriction.
2608  *
2609  * The function returns the cost and size estimates in p_rows, p_width,
2610  * p_startup_cost and p_total_cost variables.
2611  */
2612 static void
2614  RelOptInfo *foreignrel,
2615  List *param_join_conds,
2616  List *pathkeys,
2617  PgFdwPathExtraData *fpextra,
2618  double *p_rows, int *p_width,
2619  Cost *p_startup_cost, Cost *p_total_cost)
2620 {
2621  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
2622  double rows;
2623  double retrieved_rows;
2624  int width;
2625  Cost startup_cost;
2626  Cost total_cost;
2627 
2628  /* Make sure the core code has set up the relation's reltarget */
2629  Assert(foreignrel->reltarget);
2630 
2631  /*
2632  * If the table or the server is configured to use remote estimates,
2633  * connect to the foreign server and execute EXPLAIN to estimate the
2634  * number of rows selected by the restriction+join clauses. Otherwise,
2635  * estimate rows using whatever statistics we have locally, in a way
2636  * similar to ordinary tables.
2637  */
2638  if (fpinfo->use_remote_estimate)
2639  {
2640  List *remote_param_join_conds;
2641  List *local_param_join_conds;
2642  StringInfoData sql;
2643  PGconn *conn;
2644  Selectivity local_sel;
2645  QualCost local_cost;
2646  List *fdw_scan_tlist = NIL;
2647  List *remote_conds;
2648 
2649  /* Required only to be passed to deparseSelectStmtForRel */
2651 
2652  /*
2653  * param_join_conds might contain both clauses that are safe to send
2654  * across, and clauses that aren't.
2655  */
2656  classifyConditions(root, foreignrel, param_join_conds,
2657  &remote_param_join_conds, &local_param_join_conds);
2658 
2659  /* Build the list of columns to be fetched from the foreign server. */
2660  if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
2661  fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
2662  else
2663  fdw_scan_tlist = NIL;
2664 
2665  /*
2666  * The complete list of remote conditions includes everything from
2667  * baserestrictinfo plus any extra join_conds relevant to this
2668  * particular path.
2669  */
2670  remote_conds = list_concat(remote_param_join_conds,
2671  fpinfo->remote_conds);
2672 
2673  /*
2674  * Construct EXPLAIN query including the desired SELECT, FROM, and
2675  * WHERE clauses. Params and other-relation Vars are replaced by dummy
2676  * values, so don't request params_list.
2677  */
2678  initStringInfo(&sql);
2679  appendStringInfoString(&sql, "EXPLAIN ");
2680  deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
2681  remote_conds, pathkeys,
2682  fpextra ? fpextra->has_final_sort : false,
2683  fpextra ? fpextra->has_limit : false,
2684  false, &retrieved_attrs, NULL);
2685 
2686  /* Get the remote estimate */
2687  conn = GetConnection(fpinfo->user, false);
2688  get_remote_estimate(sql.data, conn, &rows, &width,
2689  &startup_cost, &total_cost);
2690  ReleaseConnection(conn);
2691 
2692  retrieved_rows = rows;
2693 
2694  /* Factor in the selectivity of the locally-checked quals */
2695  local_sel = clauselist_selectivity(root,
2696  local_param_join_conds,
2697  foreignrel->relid,
2698  JOIN_INNER,
2699  NULL);
2700  local_sel *= fpinfo->local_conds_sel;
2701 
2702  rows = clamp_row_est(rows * local_sel);
2703 
2704  /* Add in the eval cost of the locally-checked quals */
2705  startup_cost += fpinfo->local_conds_cost.startup;
2706  total_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
2707  cost_qual_eval(&local_cost, local_param_join_conds, root);
2708  startup_cost += local_cost.startup;
2709  total_cost += local_cost.per_tuple * retrieved_rows;
2710 
2711  /*
2712  * Add in tlist eval cost for each output row. In case of an
2713  * aggregate, some of the tlist expressions such as grouping
2714  * expressions will be evaluated remotely, so adjust the costs.
2715  */
2716  startup_cost += foreignrel->reltarget->cost.startup;
2717  total_cost += foreignrel->reltarget->cost.startup;
2718  total_cost += foreignrel->reltarget->cost.per_tuple * rows;
2719  if (IS_UPPER_REL(foreignrel))
2720  {
2721  QualCost tlist_cost;
2722 
2723  cost_qual_eval(&tlist_cost, fdw_scan_tlist, root);
2724  startup_cost -= tlist_cost.startup;
2725  total_cost -= tlist_cost.startup;
2726  total_cost -= tlist_cost.per_tuple * rows;
2727  }
2728  }
2729  else
2730  {
2731  Cost run_cost = 0;
2732 
2733  /*
2734  * We don't support join conditions in this mode (hence, no
2735  * parameterized paths can be made).
2736  */
2737  Assert(param_join_conds == NIL);
2738 
2739  /*
2740  * We will come here again and again with different set of pathkeys or
2741  * additional post-scan/join-processing steps that caller wants to
2742  * cost. We don't need to calculate the cost/size estimates for the
2743  * underlying scan, join, or grouping each time. Instead, use those
2744  * estimates if we have cached them already.
2745  */
2746  if (fpinfo->rel_startup_cost >= 0 && fpinfo->rel_total_cost >= 0)
2747  {
2748  Assert(fpinfo->retrieved_rows >= 1);
2749 
2750  rows = fpinfo->rows;
2751  retrieved_rows = fpinfo->retrieved_rows;
2752  width = fpinfo->width;
2753  startup_cost = fpinfo->rel_startup_cost;
2754  run_cost = fpinfo->rel_total_cost - fpinfo->rel_startup_cost;
2755 
2756  /*
2757  * If we estimate the costs of a foreign scan or a foreign join
2758  * with additional post-scan/join-processing steps, the scan or
2759  * join costs obtained from the cache wouldn't yet contain the
2760  * eval costs for the final scan/join target, which would've been
2761  * updated by apply_scanjoin_target_to_paths(); add the eval costs
2762  * now.
2763  */
2764  if (fpextra && !IS_UPPER_REL(foreignrel))
2765  {
2766  /* Shouldn't get here unless we have LIMIT */
2767  Assert(fpextra->has_limit);
2768  Assert(foreignrel->reloptkind == RELOPT_BASEREL ||
2769  foreignrel->reloptkind == RELOPT_JOINREL);
2770  startup_cost += foreignrel->reltarget->cost.startup;
2771  run_cost += foreignrel->reltarget->cost.per_tuple * rows;
2772  }
2773  }
2774  else if (IS_JOIN_REL(foreignrel))
2775  {
2776  PgFdwRelationInfo *fpinfo_i;
2777  PgFdwRelationInfo *fpinfo_o;
2778  QualCost join_cost;
2779  QualCost remote_conds_cost;
2780  double nrows;
2781 
2782  /* Use rows/width estimates made by the core code. */
2783  rows = foreignrel->rows;
2784  width = foreignrel->reltarget->width;
2785 
2786  /* For join we expect inner and outer relations set */
2787  Assert(fpinfo->innerrel && fpinfo->outerrel);
2788 
2789  fpinfo_i = (PgFdwRelationInfo *) fpinfo->innerrel->fdw_private;
2790  fpinfo_o = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
2791 
2792  /* Estimate of number of rows in cross product */
2793  nrows = fpinfo_i->rows * fpinfo_o->rows;
2794 
2795  /*
2796  * Back into an estimate of the number of retrieved rows. Just in
2797  * case this is nuts, clamp to at most nrows.
2798  */
2799  retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
2800  retrieved_rows = Min(retrieved_rows, nrows);
2801 
2802  /*
2803  * The cost of foreign join is estimated as cost of generating
2804  * rows for the joining relations + cost for applying quals on the
2805  * rows.
2806  */
2807 
2808  /*
2809  * Calculate the cost of clauses pushed down to the foreign server
2810  */
2811  cost_qual_eval(&remote_conds_cost, fpinfo->remote_conds, root);
2812  /* Calculate the cost of applying join clauses */
2813  cost_qual_eval(&join_cost, fpinfo->joinclauses, root);
2814 
2815  /*
2816  * Startup cost includes startup cost of joining relations and the
2817  * startup cost for join and other clauses. We do not include the
2818  * startup cost specific to join strategy (e.g. setting up hash
2819  * tables) since we do not know what strategy the foreign server
2820  * is going to use.
2821  */
2822  startup_cost = fpinfo_i->rel_startup_cost + fpinfo_o->rel_startup_cost;
2823  startup_cost += join_cost.startup;
2824  startup_cost += remote_conds_cost.startup;
2825  startup_cost += fpinfo->local_conds_cost.startup;
2826 
2827  /*
2828  * Run time cost includes:
2829  *
2830  * 1. Run time cost (total_cost - startup_cost) of relations being
2831  * joined
2832  *
2833  * 2. Run time cost of applying join clauses on the cross product
2834  * of the joining relations.
2835  *
2836  * 3. Run time cost of applying pushed down other clauses on the
2837  * result of join
2838  *
2839  * 4. Run time cost of applying nonpushable other clauses locally
2840  * on the result fetched from the foreign server.
2841  */
2842  run_cost = fpinfo_i->rel_total_cost - fpinfo_i->rel_startup_cost;
2843  run_cost += fpinfo_o->rel_total_cost - fpinfo_o->rel_startup_cost;
2844  run_cost += nrows * join_cost.per_tuple;
2845  nrows = clamp_row_est(nrows * fpinfo->joinclause_sel);
2846  run_cost += nrows * remote_conds_cost.per_tuple;
2847  run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
2848 
2849  /* Add in tlist eval cost for each output row */
2850  startup_cost += foreignrel->reltarget->cost.startup;
2851  run_cost += foreignrel->reltarget->cost.per_tuple * rows;
2852  }
2853  else if (IS_UPPER_REL(foreignrel))
2854  {
2855  RelOptInfo *outerrel = fpinfo->outerrel;
2856  PgFdwRelationInfo *ofpinfo;
2857  AggClauseCosts aggcosts;
2858  double input_rows;
2859  int numGroupCols;
2860  double numGroups = 1;
2861 
2862  /* The upper relation should have its outer relation set */
2863  Assert(outerrel);
2864  /* and that outer relation should have its reltarget set */
2865  Assert(outerrel->reltarget);
2866 
2867  /*
2868  * This cost model is mixture of costing done for sorted and
2869  * hashed aggregates in cost_agg(). We are not sure which
2870  * strategy will be considered at remote side, thus for
2871  * simplicity, we put all startup related costs in startup_cost
2872  * and all finalization and run cost are added in total_cost.
2873  */
2874 
2875  ofpinfo = (PgFdwRelationInfo *) outerrel->fdw_private;
2876 
2877  /* Get rows from input rel */
2878  input_rows = ofpinfo->rows;
2879 
2880  /* Collect statistics about aggregates for estimating costs. */
2881  MemSet(&aggcosts, 0, sizeof(AggClauseCosts));
2882  if (root->parse->hasAggs)
2883  {
2884  get_agg_clause_costs(root, (Node *) fpinfo->grouped_tlist,
2885  AGGSPLIT_SIMPLE, &aggcosts);
2886 
2887  /*
2888  * The cost of aggregates in the HAVING qual will be the same
2889  * for each child as it is for the parent, so there's no need
2890  * to use a translated version of havingQual.
2891  */
2892  get_agg_clause_costs(root, (Node *) root->parse->havingQual,
2893  AGGSPLIT_SIMPLE, &aggcosts);
2894  }
2895 
2896  /* Get number of grouping columns and possible number of groups */
2897  numGroupCols = list_length(root->parse->groupClause);
2898  numGroups = estimate_num_groups(root,
2900  fpinfo->grouped_tlist),
2901  input_rows, NULL);
2902 
2903  /*
2904  * Get the retrieved_rows and rows estimates. If there are HAVING
2905  * quals, account for their selectivity.
2906  */
2907  if (root->parse->havingQual)
2908  {
2909  /* Factor in the selectivity of the remotely-checked quals */
2910  retrieved_rows =
2911  clamp_row_est(numGroups *
2913  fpinfo->remote_conds,
2914  0,
2915  JOIN_INNER,
2916  NULL));
2917  /* Factor in the selectivity of the locally-checked quals */
2918  rows = clamp_row_est(retrieved_rows * fpinfo->local_conds_sel);
2919  }
2920  else
2921  {
2922  rows = retrieved_rows = numGroups;
2923  }
2924 
2925  /* Use width estimate made by the core code. */
2926  width = foreignrel->reltarget->width;
2927 
2928  /*-----
2929  * Startup cost includes:
2930  * 1. Startup cost for underneath input relation, adjusted for
2931  * tlist replacement by apply_scanjoin_target_to_paths()
2932  * 2. Cost of performing aggregation, per cost_agg()
2933  *-----
2934  */
2935  startup_cost = ofpinfo->rel_startup_cost;
2936  startup_cost += outerrel->reltarget->cost.startup;
2937  startup_cost += aggcosts.transCost.startup;
2938  startup_cost += aggcosts.transCost.per_tuple * input_rows;
2939  startup_cost += aggcosts.finalCost.startup;
2940  startup_cost += (cpu_operator_cost * numGroupCols) * input_rows;
2941 
2942  /*-----
2943  * Run time cost includes:
2944  * 1. Run time cost of underneath input relation, adjusted for
2945  * tlist replacement by apply_scanjoin_target_to_paths()
2946  * 2. Run time cost of performing aggregation, per cost_agg()
2947  *-----
2948  */
2949  run_cost = ofpinfo->rel_total_cost - ofpinfo->rel_startup_cost;
2950  run_cost += outerrel->reltarget->cost.per_tuple * input_rows;
2951  run_cost += aggcosts.finalCost.per_tuple * numGroups;
2952  run_cost += cpu_tuple_cost * numGroups;
2953 
2954  /* Account for the eval cost of HAVING quals, if any */
2955  if (root->parse->havingQual)
2956  {
2957  QualCost remote_cost;
2958 
2959  /* Add in the eval cost of the remotely-checked quals */
2960  cost_qual_eval(&remote_cost, fpinfo->remote_conds, root);
2961  startup_cost += remote_cost.startup;
2962  run_cost += remote_cost.per_tuple * numGroups;
2963  /* Add in the eval cost of the locally-checked quals */
2964  startup_cost += fpinfo->local_conds_cost.startup;
2965  run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
2966  }
2967 
2968  /* Add in tlist eval cost for each output row */
2969  startup_cost += foreignrel->reltarget->cost.startup;
2970  run_cost += foreignrel->reltarget->cost.per_tuple * rows;
2971  }
2972  else
2973  {
2974  Cost cpu_per_tuple;
2975 
2976  /* Use rows/width estimates made by set_baserel_size_estimates. */
2977  rows = foreignrel->rows;
2978  width = foreignrel->reltarget->width;
2979 
2980  /*
2981  * Back into an estimate of the number of retrieved rows. Just in
2982  * case this is nuts, clamp to at most foreignrel->tuples.
2983  */
2984  retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
2985  retrieved_rows = Min(retrieved_rows, foreignrel->tuples);
2986 
2987  /*
2988  * Cost as though this were a seqscan, which is pessimistic. We
2989  * effectively imagine the local_conds are being evaluated
2990  * remotely, too.
2991  */
2992  startup_cost = 0;
2993  run_cost = 0;
2994  run_cost += seq_page_cost * foreignrel->pages;
2995 
2996  startup_cost += foreignrel->baserestrictcost.startup;
2997  cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple;
2998  run_cost += cpu_per_tuple * foreignrel->tuples;
2999 
3000  /* Add in tlist eval cost for each output row */
3001  startup_cost += foreignrel->reltarget->cost.startup;
3002  run_cost += foreignrel->reltarget->cost.per_tuple * rows;
3003  }
3004 
3005  /*
3006  * Without remote estimates, we have no real way to estimate the cost
3007  * of generating sorted output. It could be free if the query plan
3008  * the remote side would have chosen generates properly-sorted output
3009  * anyway, but in most cases it will cost something. Estimate a value
3010  * high enough that we won't pick the sorted path when the ordering
3011  * isn't locally useful, but low enough that we'll err on the side of
3012  * pushing down the ORDER BY clause when it's useful to do so.
3013  */
3014  if (pathkeys != NIL)
3015  {
3016  if (IS_UPPER_REL(foreignrel))
3017  {
3018  Assert(foreignrel->reloptkind == RELOPT_UPPER_REL &&
3019  fpinfo->stage == UPPERREL_GROUP_AGG);
3020  adjust_foreign_grouping_path_cost(root, pathkeys,
3021  retrieved_rows, width,
3022  fpextra->limit_tuples,
3023  &startup_cost, &run_cost);
3024  }
3025  else
3026  {
3027  startup_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
3028  run_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
3029  }
3030  }
3031 
3032  total_cost = startup_cost + run_cost;
3033 
3034  /* Adjust the cost estimates if we have LIMIT */
3035  if (fpextra && fpextra->has_limit)
3036  {
3037  adjust_limit_rows_costs(&rows, &startup_cost, &total_cost,
3038  fpextra->offset_est, fpextra->count_est);
3039  retrieved_rows = rows;
3040  }
3041  }
3042 
3043  /*
3044  * If this includes the final sort step, the given target, which will be
3045  * applied to the resulting path, might have different expressions from
3046  * the foreignrel's reltarget (see make_sort_input_target()); adjust tlist
3047  * eval costs.
3048  */
3049  if (fpextra && fpextra->has_final_sort &&
3050  fpextra->target != foreignrel->reltarget)
3051  {
3052  QualCost oldcost = foreignrel->reltarget->cost;
3053  QualCost newcost = fpextra->target->cost;
3054 
3055  startup_cost += newcost.startup - oldcost.startup;
3056  total_cost += newcost.startup - oldcost.startup;
3057  total_cost += (newcost.per_tuple - oldcost.per_tuple) * rows;
3058  }
3059 
3060  /*
3061  * Cache the retrieved rows and cost estimates for scans, joins, or
3062  * groupings without any parameterization, pathkeys, or additional
3063  * post-scan/join-processing steps, before adding the costs for
3064  * transferring data from the foreign server. These estimates are useful
3065  * for costing remote joins involving this relation or costing other
3066  * remote operations on this relation such as remote sorts and remote
3067  * LIMIT restrictions, when the costs can not be obtained from the foreign
3068  * server. This function will be called at least once for every foreign
3069  * relation without any parameterization, pathkeys, or additional
3070  * post-scan/join-processing steps.
3071  */
3072  if (pathkeys == NIL && param_join_conds == NIL && fpextra == NULL)
3073  {
3074  fpinfo->retrieved_rows = retrieved_rows;
3075  fpinfo->rel_startup_cost = startup_cost;
3076  fpinfo->rel_total_cost = total_cost;
3077  }
3078 
3079  /*
3080  * Add some additional cost factors to account for connection overhead
3081  * (fdw_startup_cost), transferring data across the network
3082  * (fdw_tuple_cost per retrieved row), and local manipulation of the data
3083  * (cpu_tuple_cost per retrieved row).
3084  */
3085  startup_cost += fpinfo->fdw_startup_cost;
3086  total_cost += fpinfo->fdw_startup_cost;
3087  total_cost += fpinfo->fdw_tuple_cost * retrieved_rows;
3088  total_cost += cpu_tuple_cost * retrieved_rows;
3089 
3090  /*
3091  * If we have LIMIT, we should prefer performing the restriction remotely
3092  * rather than locally, as the former avoids extra row fetches from the
3093  * remote that the latter might cause. But since the core code doesn't
3094  * account for such fetches when estimating the costs of the local
3095  * restriction (see create_limit_path()), there would be no difference
3096  * between the costs of the local restriction and the costs of the remote
3097  * restriction estimated above if we don't use remote estimates (except
3098  * for the case where the foreignrel is a grouping relation, the given
3099  * pathkeys is not NIL, and the effects of a bounded sort for that rel is
3100  * accounted for in costing the remote restriction). Tweak the costs of
3101  * the remote restriction to ensure we'll prefer it if LIMIT is a useful
3102  * one.
3103  */
3104  if (!fpinfo->use_remote_estimate &&
3105  fpextra && fpextra->has_limit &&
3106  fpextra->limit_tuples > 0 &&
3107  fpextra->limit_tuples < fpinfo->rows)
3108  {
3109  Assert(fpinfo->rows > 0);
3110  total_cost -= (total_cost - startup_cost) * 0.05 *
3111  (fpinfo->rows - fpextra->limit_tuples) / fpinfo->rows;
3112  }
3113 
3114  /* Return results. */
3115  *p_rows = rows;
3116  *p_width = width;
3117  *p_startup_cost = startup_cost;
3118  *p_total_cost = total_cost;
3119 }
3120 
3121 /*
3122  * Estimate costs of executing a SQL statement remotely.
3123  * The given "sql" must be an EXPLAIN command.
3124  */
3125 static void
3126 get_remote_estimate(const char *sql, PGconn *conn,
3127  double *rows, int *width,
3128  Cost *startup_cost, Cost *total_cost)
3129 {
3130  PGresult *volatile res = NULL;
3131 
3132  /* PGresult must be released before leaving this function. */
3133  PG_TRY();
3134  {
3135  char *line;
3136  char *p;
3137  int n;
3138 
3139  /*
3140  * Execute EXPLAIN remotely.
3141  */
3142  res = pgfdw_exec_query(conn, sql);
3143  if (PQresultStatus(res) != PGRES_TUPLES_OK)
3144  pgfdw_report_error(ERROR, res, conn, false, sql);
3145 
3146  /*
3147  * Extract cost numbers for topmost plan node. Note we search for a
3148  * left paren from the end of the line to avoid being confused by
3149  * other uses of parentheses.
3150  */
3151  line = PQgetvalue(res, 0, 0);
3152  p = strrchr(line, '(');
3153  if (p == NULL)
3154  elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
3155  n = sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)",
3156  startup_cost, total_cost, rows, width);
3157  if (n != 4)
3158  elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
3159 
3160  PQclear(res);
3161  res = NULL;
3162  }
3163  PG_CATCH();
3164  {
3165  if (res)
3166  PQclear(res);
3167  PG_RE_THROW();
3168  }
3169  PG_END_TRY();
3170 }
3171 
3172 /*
3173  * Adjust the cost estimates of a foreign grouping path to include the cost of
3174  * generating properly-sorted output.
 *
 * On entry, *p_startup_cost and *p_run_cost hold the estimates for unsorted
 * output.  On return they have been overwritten with estimates that include
 * producing output ordered by "pathkeys".  retrieved_rows, width and
 * limit_tuples matter only when an explicit sort is costed with cost_sort(),
 * to which they are passed through.
3175  */
3176 static void
3178  List *pathkeys,
3179  double retrieved_rows,
3180  double width,
3181  double limit_tuples,
3182  Cost *p_startup_cost,
3183  Cost *p_run_cost)
3184 {
3185  /*
3186  * If the GROUP BY clause isn't sort-able, the plan chosen by the remote
3187  * side is unlikely to generate properly-sorted output, so it would need
3188  * an explicit sort; adjust the given costs with cost_sort(). Likewise,
3189  * if the GROUP BY clause is sort-able but isn't a superset of the given
3190  * pathkeys, adjust the costs with that function. Otherwise, adjust the
3191  * costs by applying the same heuristic as for the scan or join case.
3192  */
3193  if (!grouping_is_sortable(root->parse->groupClause) ||
3194  !pathkeys_contained_in(pathkeys, root->group_pathkeys))
3195  {
3196  Path sort_path; /* dummy for result of cost_sort */
3197 
 /*
  * The whole unsorted cost (startup + run) becomes the sort's input
  * cost.  The resulting path cost is then split back into startup and
  * run components.
  */
3198  cost_sort(&sort_path,
3199  root,
3200  pathkeys,
3201  *p_startup_cost + *p_run_cost,
3202  retrieved_rows,
3203  width,
3204  0.0,
3205  work_mem,
3206  limit_tuples);
3207 
3208  *p_startup_cost = sort_path.startup_cost;
3209  *p_run_cost = sort_path.total_cost - sort_path.startup_cost;
3210  }
3211  else
3212  {
3213  /*
3214  * The default extra cost seems too large for foreign-grouping cases;
3215  * add 1/4th of that default.
  * (With DEFAULT_FDW_SORT_MULTIPLIER at 1.2, the factor works out to 1.05.)
3216  */
3217  double sort_multiplier = 1.0 + (DEFAULT_FDW_SORT_MULTIPLIER
3218  - 1.0) * 0.25;
3219 
3220  *p_startup_cost *= sort_multiplier;
3221  *p_run_cost *= sort_multiplier;
3222  }
3223 }
3224 
3225 /*
3226  * Detect whether we want to process an EquivalenceClass member.
3227  *
3228  * This is a callback for use by generate_implied_equalities_for_column.
 *
 * The callback state tracks progress across calls.  state->current is the
 * expression chosen for the current scan (NULL if none has been chosen yet),
 * and state->already_used lists expressions handled in earlier scans.  When
 * no expression has been chosen, the first member not already used is
 * adopted as state->current.  This callback therefore modifies its state.
3229  */
3230 static bool
3233  void *arg)
3234 {
3236  Expr *expr = em->em_expr;
3237 
3238  /*
3239  * If we've identified what we're processing in the current scan, we only
3240  * want to match that expression.
3241  */
3242  if (state->current != NULL)
3243  return equal(expr, state->current);
3244 
3245  /*
3246  * Otherwise, ignore anything we've already processed.
3247  */
3248  if (list_member(state->already_used, expr))
3249  return false;
3250 
3251  /* This is the new target to process. */
3252  state->current = expr;
3253  return true;
3254 }
3255 
3256 /*
3257  * Create cursor for node's query with current parameter values.
 *
 * Sends "DECLARE c<cursor_number> CURSOR FOR <query>" to the remote server,
 * passing the parameter values in text form.  On success, marks the cursor
 * as existing and resets the scan's tuple bookkeeping (tuples, num_tuples,
 * next_tuple, fetch_ct_2, eof_reached).  Failures are reported with ERROR
 * via pgfdw_report_error().
3258  */
3259 static void
3261 {
3262  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
3263  ExprContext *econtext = node->ss.ps.ps_ExprContext;
3264  int numParams = fsstate->numParams;
3265  const char **values = fsstate->param_values;
3266  PGconn *conn = fsstate->conn;
3268  PGresult *res;
3269 
3270  /*
3271  * Construct array of query parameter values in text format. We do the
3272  * conversions in the short-lived per-tuple context, so as not to cause a
3273  * memory leak over repeated scans.
3274  */
3275  if (numParams > 0)
3276  {
3277  MemoryContext oldcontext;
3278 
3279  oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
3280 
3281  process_query_params(econtext,
3282  fsstate->param_flinfo,
3283  fsstate->param_exprs,
3284  values);
3285 
3286  MemoryContextSwitchTo(oldcontext);
3287  }
3288 
3289  /* Construct the DECLARE CURSOR command */
3290  initStringInfo(&buf);
3291  appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
3292  fsstate->cursor_number, fsstate->query);
3293 
3294  /*
3295  * Notice that we pass NULL for paramTypes, thus forcing the remote server
3296  * to infer types for all parameters. Since we explicitly cast every
3297  * parameter (see deparse.c), the "inference" is trivial and will produce
3298  * the desired result. This allows us to avoid assuming that the remote
3299  * server has the same OIDs we do for the parameters' types.
3300  */
3301  if (!PQsendQueryParams(conn, buf.data, numParams,
3302  NULL, values, NULL, NULL, 0))
3303  pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
3304 
3305  /*
3306  * Get the result, and check for success.
3307  *
3308  * We don't use a PG_TRY block here, so be careful not to throw error
3309  * without releasing the PGresult.
3310  */
 /* A failed DECLARE is reported with the underlying query, not buf.data. */
3311  res = pgfdw_get_result(conn, buf.data);
3312  if (PQresultStatus(res) != PGRES_COMMAND_OK)
3313  pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
3314  PQclear(res);
3315 
3316  /* Mark the cursor as created, and show no tuples have been retrieved */
3317  fsstate->cursor_exists = true;
3318  fsstate->tuples = NULL;
3319  fsstate->num_tuples = 0;
3320  fsstate->next_tuple = 0;
3321  fsstate->fetch_ct_2 = 0;
3322  fsstate->eof_reached = false;
3323 
3324  /* Clean up */
 /* (buf.data is pfree'd only on this success path.) */
3325  pfree(buf.data);
3326 }
3327 
3328 /*
3329  * Fetch some more rows from the node's cursor.
 *
 * Issues "FETCH <fetch_size> FROM c<cursor_number>" and converts the rows
 * into HeapTuples allocated in fsstate->batch_cxt, replacing the previous
 * batch.  Sets eof_reached when fewer than fetch_size rows come back.
3330  */
3331 static void
3333 {
3334  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
3335  PGresult *volatile res = NULL;
3336  MemoryContext oldcontext;
3337 
3338  /*
3339  * We'll store the tuples in the batch_cxt. First, flush the previous
3340  * batch.
3341  */
3342  fsstate->tuples = NULL;
3343  MemoryContextReset(fsstate->batch_cxt);
3344  oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
3345 
3346  /* PGresult must be released before leaving this function. */
3347  PG_TRY();
3348  {
3349  PGconn *conn = fsstate->conn;
3350  char sql[64];
3351  int numrows;
3352  int i;
3353 
3354  snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
3355  fsstate->fetch_size, fsstate->cursor_number);
3356 
3357  res = pgfdw_exec_query(conn, sql);
3358  /* On error, report the original query, not the FETCH. */
3359  if (PQresultStatus(res) != PGRES_TUPLES_OK)
3360  pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
3361 
3362  /* Convert the data into HeapTuples */
3363  numrows = PQntuples(res);
3364  fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
3365  fsstate->num_tuples = numrows;
3366  fsstate->next_tuple = 0;
3367 
3368  for (i = 0; i < numrows; i++)
3369  {
3370  Assert(IsA(node->ss.ps.plan, ForeignScan));
3371 
3372  fsstate->tuples[i] =
3374  fsstate->rel,
3375  fsstate->attinmeta,
3376  fsstate->retrieved_attrs,
3377  node,
3378  fsstate->temp_cxt);
3379  }
3380 
3381  /* Update fetch_ct_2 */
 /* It saturates at 2: only 0, 1 and "two or more" fetches are told apart. */
3382  if (fsstate->fetch_ct_2 < 2)
3383  fsstate->fetch_ct_2++;
3384 
3385  /* Must be EOF if we didn't get as many tuples as we asked for. */
3386  fsstate->eof_reached = (numrows < fsstate->fetch_size);
3387 
3388  PQclear(res);
3389  res = NULL;
3390  }
3391  PG_CATCH();
3392  {
 /*
  * NOTE(review): this path re-throws without switching back to
  * oldcontext.  Presumably error recovery resets CurrentMemoryContext;
  * confirm.
  */
3393  if (res)
3394  PQclear(res);
3395  PG_RE_THROW();
3396  }
3397  PG_END_TRY();
3398 
3399  MemoryContextSwitchTo(oldcontext);
3400 }
3401 
3402 /*
3403  * Force assorted GUC parameters to settings that ensure that we'll output
3404  * data values in a form that is unambiguous to the remote server.
3405  *
3406  * This is rather expensive and annoying to do once per row, but there's
3407  * little choice if we want to be sure values are transmitted accurately;
3408  * we can't leave the settings in place between rows for fear of affecting
3409  * user-visible computations.
3410  *
3411  * We use the equivalent of a function SET option to allow the settings to
3412  * persist only until the caller calls reset_transmission_modes(). If an
3413  * error is thrown in between, guc.c will take care of undoing the settings.
3414  *
3415  * The return value is the nestlevel that must be passed to
3416  * reset_transmission_modes() to undo things.
 *
 * Settings are applied with GUC_ACTION_SAVE under a fresh GUC nest level.
 * datestyle is changed only when it is not already ISO, and
 * extra_float_digits only when it is below 3, so already-suitable values
 * are left alone.
3417  */
3418 int
3420 {
3421  int nestlevel = NewGUCNestLevel();
3422 
3423  /*
3424  * The values set here should match what pg_dump does. See also
3425  * configure_remote_session in connection.c.
3426  */
3427  if (DateStyle != USE_ISO_DATES)
3428  (void) set_config_option("datestyle", "ISO",
3430  GUC_ACTION_SAVE, true, 0, false);
3432  (void) set_config_option("intervalstyle", "postgres",
3434  GUC_ACTION_SAVE, true, 0, false);
3435  if (extra_float_digits < 3)
3436  (void) set_config_option("extra_float_digits", "3",
3438  GUC_ACTION_SAVE, true, 0, false);
3439 
3440  return nestlevel;
3441 }
3442 
3443 /*
3444  * Undo the effects of set_transmission_modes().
 *
 * "nestlevel" must be the value returned by the matching
 * set_transmission_modes() call.  Settings saved at that GUC nest level are
 * rolled back via AtEOXact_GUC().
3445  */
3446 void
3448 {
3449  AtEOXact_GUC(true, nestlevel);
3450 }
3451 
3452 /*
3453  * Utility routine to close a cursor.
 *
 * Sends "CLOSE c<cursor_number>" on the given connection.  Any failure is
 * reported with ERROR via pgfdw_report_error().  Presumably the "true"
 * argument asks that routine to release the PGresult; confirm in
 * connection.c.
3454  */
3455 static void
3457 {
3458  char sql[64];
3459  PGresult *res;
3460 
3461  snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);
3462 
3463  /*
3464  * We don't use a PG_TRY block here, so be careful not to throw error
3465  * without releasing the PGresult.
3466  */
3467  res = pgfdw_exec_query(conn, sql);
3468  if (PQresultStatus(res) != PGRES_COMMAND_OK)
3469  pgfdw_report_error(ERROR, res, conn, true, sql);
3470  PQclear(res);
3471 }
3472 
3473 /*
3474  * create_foreign_modify
3475  * Construct an execution state of a foreign insert/update/delete
3476  * operation
 *
 * The remote access is done as rte->checkAsUser, or the current user when
 * that is not set.  "query" is the remote SQL to be prepared later.
 * target_attrs lists the attnums sent as statement parameters.
 * has_returning and retrieved_attrs describe the RETURNING columns.  For
 * UPDATE/DELETE, "subplan" must be supplied so that its "ctid" junk column
 * can be located.
 *
 * The returned state holds an open connection but no prepared statement
 * yet: p_name is NULL until prepare_foreign_modify() runs.
3477  */
3478 static PgFdwModifyState *
3480  RangeTblEntry *rte,
3481  ResultRelInfo *resultRelInfo,
3482  CmdType operation,
3483  Plan *subplan,
3484  char *query,
3485  List *target_attrs,
3486  bool has_returning,
3488 {
3489  PgFdwModifyState *fmstate;
3490  Relation rel = resultRelInfo->ri_RelationDesc;
3492  Oid userid;
3493  ForeignTable *table;
3494  UserMapping *user;
3495  AttrNumber n_params;
3496  Oid typefnoid;
3497  bool isvarlena;
3498  ListCell *lc;
3499 
3500  /* Begin constructing PgFdwModifyState. */
3501  fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
3502  fmstate->rel = rel;
3503 
3504  /*
3505  * Identify which user to do the remote access as. This should match what
3506  * ExecCheckRTEPerms() does.
3507  */
3508  userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
3509 
3510  /* Get info about foreign table. */
3511  table = GetForeignTable(RelationGetRelid(rel));
3512  user = GetUserMapping(userid, table->serverid);
3513 
3514  /* Open connection; report that we'll create a prepared statement. */
3515  fmstate->conn = GetConnection(user, true);
3516  fmstate->p_name = NULL; /* prepared statement not made yet */
3517 
3518  /* Set up remote query information. */
3519  fmstate->query = query;
3520  fmstate->target_attrs = target_attrs;
3521  fmstate->has_returning = has_returning;
3522  fmstate->retrieved_attrs = retrieved_attrs;
3523 
3524  /* Create context for per-tuple temp workspace. */
3525  fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
3526  "postgres_fdw temporary data",
3528 
3529  /* Prepare for input conversion of RETURNING results. */
3530  if (fmstate->has_returning)
3531  fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
3532 
3533  /* Prepare for output conversion of parameters used in prepared stmt. */
 /* Upper bound: one slot for ctid plus one per target attribute. */
3534  n_params = list_length(fmstate->target_attrs) + 1;
3535  fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
3536  fmstate->p_nums = 0;
3537 
3538  if (operation == CMD_UPDATE || operation == CMD_DELETE)
3539  {
3540  Assert(subplan != NULL);
3541 
3542  /* Find the ctid resjunk column in the subplan's result */
3544  "ctid");
3545  if (!AttributeNumberIsValid(fmstate->ctidAttno))
3546  elog(ERROR, "could not find junk ctid column");
3547 
3548  /* First transmittable parameter will be ctid */
3549  getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
3550  fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
3551  fmstate->p_nums++;
3552  }
3553 
3554  if (operation == CMD_INSERT || operation == CMD_UPDATE)
3555  {
3556  /* Set up for remaining transmittable parameters */
3557  foreach(lc, fmstate->target_attrs)
3558  {
3559  int attnum = lfirst_int(lc);
3560  Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
3561 
3562  Assert(!attr->attisdropped);
3563 
3564  getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
3565  fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
3566  fmstate->p_nums++;
3567  }
3568  }
3569 
3570  Assert(fmstate->p_nums <= n_params);
3571 
3572  /* Initialize auxiliary state */
3573  fmstate->aux_fmstate = NULL;
3574 
3575  return fmstate;
3576 }
3577 
3578 /*
3579  * execute_foreign_modify
3580  * Perform foreign-table modification as required, and fetch RETURNING
3581  * result if any. (This is the shared guts of postgresExecForeignInsert,
3582  * postgresExecForeignUpdate, and postgresExecForeignDelete.)
 *
 * The remote statement is prepared on first use.  Returns "slot" (holding
 * the RETURNING row, when there is one) if the remote side reports at
 * least one affected row, otherwise NULL.  fmstate->temp_cxt is reset
 * before returning.
3583  */
3584 static TupleTableSlot *
3586  ResultRelInfo *resultRelInfo,
3587  CmdType operation,
3588  TupleTableSlot *slot,
3589  TupleTableSlot *planSlot)
3590 {
3591  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
3592  ItemPointer ctid = NULL;
3593  const char **p_values;
3594  PGresult *res;
3595  int n_rows;
3596 
3597  /* The operation should be INSERT, UPDATE, or DELETE */
3598  Assert(operation == CMD_INSERT ||
3599  operation == CMD_UPDATE ||
3600  operation == CMD_DELETE);
3601 
3602  /* Set up the prepared statement on the remote server, if we didn't yet */
3603  if (!fmstate->p_name)
3604  prepare_foreign_modify(fmstate);
3605 
3606  /*
3607  * For UPDATE/DELETE, get the ctid that was passed up as a resjunk column
3608  */
3609  if (operation == CMD_UPDATE || operation == CMD_DELETE)
3610  {
3611  Datum datum;
3612  bool isNull;
3613 
3614  datum = ExecGetJunkAttribute(planSlot,
3615  fmstate->ctidAttno,
3616  &isNull);
3617  /* shouldn't ever get a null result... */
3618  if (isNull)
3619  elog(ERROR, "ctid is NULL");
3620  ctid = (ItemPointer) DatumGetPointer(datum);
3621  }
3622 
3623  /* Convert parameters needed by prepared statement to text form */
3624  p_values = convert_prep_stmt_params(fmstate, ctid, slot);
3625 
3626  /*
3627  * Execute the prepared statement.
3628  */
3629  if (!PQsendQueryPrepared(fmstate->conn,
3630  fmstate->p_name,
3631  fmstate->p_nums,
3632  p_values,
3633  NULL,
3634  NULL,
3635  0))
3636  pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
3637 
3638  /*
3639  * Get the result, and check for success.
3640  *
3641  * We don't use a PG_TRY block here, so be careful not to throw error
3642  * without releasing the PGresult.
3643  */
3644  res = pgfdw_get_result(fmstate->conn, fmstate->query);
3645  if (PQresultStatus(res) !=
3647  pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
3648 
3649  /* Check number of rows affected, and fetch RETURNING tuple if any */
 /* store_returning_result() clears res itself if it throws. */
3650  if (fmstate->has_returning)
3651  {
3652  n_rows = PQntuples(res);
3653  if (n_rows > 0)
3654  store_returning_result(fmstate, slot, res);
3655  }
3656  else
3657  n_rows = atoi(PQcmdTuples(res));
3658 
3659  /* And clean up */
3660  PQclear(res);
3661 
 /* Releases the parameter strings built by convert_prep_stmt_params(). */
3662  MemoryContextReset(fmstate->temp_cxt);
3663 
3664  /*
3665  * Return NULL if nothing was inserted/updated/deleted on the remote end
3666  */
3667  return (n_rows > 0) ? slot : NULL;
3668 }
3669 
3670 /*
3671  * prepare_foreign_modify
3672  * Establish a prepared statement for execution of INSERT/UPDATE/DELETE
 *
 * The statement name is derived from GetPrepStmtNumber() for the connection.
 * fmstate->p_name is assigned only after the PREPARE has succeeded, so if an
 * error is thrown, p_name stays NULL.
3673  */
3674 static void
3676 {
3677  char prep_name[NAMEDATALEN];
3678  char *p_name;
3679  PGresult *res;
3680 
3681  /* Construct name we'll use for the prepared statement. */
3682  snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
3683  GetPrepStmtNumber(fmstate->conn));
3684  p_name = pstrdup(prep_name);
3685 
3686  /*
3687  * We intentionally do not specify parameter types here, but leave the
3688  * remote server to derive them by default. This avoids possible problems
3689  * with the remote server using different type OIDs than we do. All of
3690  * the prepared statements we use in this module are simple enough that
3691  * the remote server will make the right choices.
3692  */
3693  if (!PQsendPrepare(fmstate->conn,
3694  p_name,
3695  fmstate->query,
3696  0,
3697  NULL))
3698  pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
3699 
3700  /*
3701  * Get the result, and check for success.
3702  *
3703  * We don't use a PG_TRY block here, so be careful not to throw error
3704  * without releasing the PGresult.
3705  */
3706  res = pgfdw_get_result(fmstate->conn, fmstate->query);
3707  if (PQresultStatus(res) != PGRES_COMMAND_OK)
3708  pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
3709  PQclear(res);
3710 
3711  /* This action shows that the prepare has been done. */
3712  fmstate->p_name = p_name;
3713 }
3714 
3715 /*
3716  * convert_prep_stmt_params
3717  * Create array of text strings representing parameter values
3718  *
3719  * tupleid is ctid to send, or NULL if none
3720  * slot is slot to get remaining parameters from, or NULL if none
3721  *
3722  * Data is constructed in temp_cxt; caller should reset that after use.
 *
 * Returns an array of fmstate->p_nums entries: the ctid first (when given),
 * followed by one entry per target attribute.  SQL NULL values are
 * represented by NULL pointers.
3723  */
3724 static const char **
3726  ItemPointer tupleid,
3727  TupleTableSlot *slot)
3728 {
3729  const char **p_values;
3730  int pindex = 0;
3731  MemoryContext oldcontext;
3732 
3733  oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
3734 
3735  p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums);
3736 
3737  /* 1st parameter should be ctid, if it's in use */
3738  if (tupleid != NULL)
3739  {
3740  /* don't need set_transmission_modes for TID output */
3741  p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
3742  PointerGetDatum(tupleid));
3743  pindex++;
3744  }
3745 
3746  /* get following parameters from slot */
3747  if (slot != NULL && fmstate->target_attrs != NIL)
3748  {
3749  int nestlevel;
3750  ListCell *lc;
3751 
 /* GUC changes stay in effect until the matching reset below. */
3752  nestlevel = set_transmission_modes();
3753 
3754  foreach(lc, fmstate->target_attrs)
3755  {
3756  int attnum = lfirst_int(lc);
3757  Datum value;
3758  bool isnull;
3759 
3760  value = slot_getattr(slot, attnum, &isnull);
 /* A NULL pointer entry transmits SQL NULL. */
3761  if (isnull)
3762  p_values[pindex] = NULL;
3763  else
3764  p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
3765  value);
3766  pindex++;
3767  }
3768 
3769  reset_transmission_modes(nestlevel);
3770  }
3771 
3772  Assert(pindex == fmstate->p_nums);
3773 
3774  MemoryContextSwitchTo(oldcontext);
3775 
3776  return p_values;
3777 }
3778 
3779 /*
3780  * store_returning_result
3781  * Store the result of a RETURNING clause
3782  *
3783  * On error, be sure to release the PGresult on the way out. Callers do not
3784  * have PG_TRY blocks to ensure this happens.
 *
 * Only row 0 of "res" is converted and stored into "slot".  On success,
 * "res" is not cleared here; the caller remains responsible for it.
3785  */
3786 static void
3788  TupleTableSlot *slot, PGresult *res)
3789 {
3790  PG_TRY();
3791  {
3792  HeapTuple newtup;
3793 
3794  newtup = make_tuple_from_result_row(res, 0,
3795  fmstate->rel,
3796  fmstate->attinmeta,
3797  fmstate->retrieved_attrs,
3798  NULL,
3799  fmstate->temp_cxt);
3800 
3801  /*
3802  * The returning slot will not necessarily be suitable to store
3803  * heaptuples directly, so allow for conversion.
3804  */
3805  ExecForceStoreHeapTuple(newtup, slot, true);
3806  }
3807  PG_CATCH();
3808  {
3809  if (res)
3810  PQclear(res);
3811  PG_RE_THROW();
3812  }
3813  PG_END_TRY();
3814 }
3815 
3816 /*
3817  * finish_foreign_modify
3818  * Release resources for a foreign insert/update/delete operation
 *
 * Sends DEALLOCATE for the remote prepared statement if one was created,
 * then releases the connection.  fmstate->p_name and fmstate->conn are
 * reset to NULL.
3819  */
3820 static void
3822 {
3823  Assert(fmstate != NULL);
3824 
3825  /* If we created a prepared statement, destroy it */
3826  if (fmstate->p_name)
3827  {
3828  char sql[64];
3829  PGresult *res;
3830 
3831  snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
3832 
3833  /*
3834  * We don't use a PG_TRY block here, so be careful not to throw error
3835  * without releasing the PGresult.
3836  */
3837  res = pgfdw_exec_query(fmstate->conn, sql);
3838  if (PQresultStatus(res) != PGRES_COMMAND_OK)
3839  pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
3840  PQclear(res);
3841  fmstate->p_name = NULL;
3842  }
3843 
3844  /* Release remote connection */
3845  ReleaseConnection(fmstate->conn);
3846  fmstate->conn = NULL;
3847 }
3848 
3849 /*
3850  * build_remote_returning
3851  * Build a RETURNING targetlist of a remote query for performing an
3852  * UPDATE/DELETE .. RETURNING on a join directly
 *
 * The Vars (and PlaceHolderVars) referenced by returningList are collected.
 * If one of them is a whole-row reference to relation "rtindex", every
 * non-dropped column of "rel" is emitted first.  The remaining Vars are
 * then appended, skipping duplicates and the target's whole-row and
 * unneeded system-column references.  Entries are numbered consecutively
 * from 1.
3853  */
3854 static List *
3855 build_remote_returning(Index rtindex, Relation rel, List *returningList)
3856 {
3857  bool have_wholerow = false;
3858  List *tlist = NIL;
3859  List *vars;
3860  ListCell *lc;
3861 
3862  Assert(returningList);
3863 
3864  vars = pull_var_clause((Node *) returningList, PVC_INCLUDE_PLACEHOLDERS);
3865 
3866  /*
3867  * If there's a whole-row reference to the target relation, then we'll
3868  * need all the columns of the relation.
3869  */
3870  foreach(lc, vars)
3871  {
3872  Var *var = (Var *) lfirst(lc);
3873 
3874  if (IsA(var, Var) &&
3875  var->varno == rtindex &&
3876  var->varattno == InvalidAttrNumber)
3877  {
3878  have_wholerow = true;
3879  break;
3880  }
3881  }
3882 
3883  if (have_wholerow)
3884  {
3886  int i;
3887 
3888  for (i = 1; i <= tupdesc->natts; i++)
3889  {
3890  Form_pg_attribute attr = TupleDescAttr(tupdesc, i - 1);
3891  Var *var;
3892 
3893  /* Ignore dropped attributes. */
3894  if (attr->attisdropped)
3895  continue;
3896 
3897  var = makeVar(rtindex,
3898  i,
3899  attr->atttypid,
3900  attr->atttypmod,
3901  attr->attcollation,
3902  0);
3903 
3904  tlist = lappend(tlist,
3905  makeTargetEntry((Expr *) var,
3906  list_length(tlist) + 1,
3907  NULL,
3908  false));
3909  }
3910  }
3911 
3912  /* Now add any remaining columns to tlist. */
3913  foreach(lc, vars)
3914  {
3915  Var *var = (Var *) lfirst(lc);
3916 
3917  /*
3918  * No need for whole-row references to the target relation. We don't
3919  * need system columns other than ctid and oid either, since those are
3920  * set locally.
 *
 * NOTE(review): the mention of "oid" looks stale, since user tables no
 * longer carry an oid system column in this version.  Confirm against
 * the condition below and reword the comment accordingly.
3921  */
3922  if (IsA(var, Var) &&
3923  var->varno == rtindex &&
3924  var->varattno <= InvalidAttrNumber &&
3926  continue; /* don't need it */
3927 
3928  if (tlist_member((Expr *) var, tlist))
3929  continue; /* already got it */
3930 
3931  tlist = lappend(tlist,
3932  makeTargetEntry((Expr *) var,
3933  list_length(tlist) + 1,
3934  NULL,
3935  false));
3936  }
3937 
3938  list_free(vars);
3939 
3940  return tlist;
3941 }
3942 
3943 /*
3944  * rebuild_fdw_scan_tlist
3945  * Build new fdw_scan_tlist of given foreign-scan plan node from given
3946  * tlist
3947  *
3948  * There might be columns that the fdw_scan_tlist of the given foreign-scan
3949  * plan node contains that the given tlist doesn't. The fdw_scan_tlist would
3950  * have contained resjunk columns such as 'ctid' of the target relation and
3951  * 'wholerow' of non-target relations, but the tlist might not contain them,
3952  * for example. So, adjust the tlist so it contains all the columns specified
3953  * in the fdw_scan_tlist; else setrefs.c will get confused.
 *
 * NOTE(review): new_tlist starts as the caller's "tlist" and is extended
 * with lappend(), so the caller's list may be modified in place.  The
 * result replaces fscan->fdw_scan_tlist.
3954  */
3955 static void
3957 {
3958  List *new_tlist = tlist;
3959  List *old_tlist = fscan->fdw_scan_tlist;
3960  ListCell *lc;
3961 
3962  foreach(lc, old_tlist)
3963  {
3964  TargetEntry *tle = (TargetEntry *) lfirst(lc);
3965 
3966  if (tlist_member(tle->expr, new_tlist))
3967  continue; /* already got it */
3968 
3969  new_tlist = lappend(new_tlist,
3970  makeTargetEntry(tle->expr,
3971  list_length(new_tlist) + 1,
3972  NULL,
3973  false));
3974  }
3975  fscan->fdw_scan_tlist = new_tlist;
3976 }
3977 
3978 /*
3979  * Execute a direct UPDATE/DELETE statement.
 *
 * On success, the PGresult is kept in dmstate->result for later RETURNING
 * processing.  dmstate->num_tuples is set to the number of returned rows
 * (with RETURNING) or to the affected-row count reported by the server.
3980  */
3981 static void
3983 {
3985  ExprContext *econtext = node->ss.ps.ps_ExprContext;
3986  int numParams = dmstate->numParams;
3987  const char **values = dmstate->param_values;
3988 
3989  /*
3990  * Construct array of query parameter values in text format.
 * NOTE(review): unlike create_cursor(), the conversion is done in the
 * current memory context rather than the per-tuple context.
3991  */
3992  if (numParams > 0)
3993  process_query_params(econtext,
3994  dmstate->param_flinfo,
3995  dmstate->param_exprs,
3996  values);
3997 
3998  /*
3999  * Notice that we pass NULL for paramTypes, thus forcing the remote server
4000  * to infer types for all parameters. Since we explicitly cast every
4001  * parameter (see deparse.c), the "inference" is trivial and will produce
4002  * the desired result. This allows us to avoid assuming that the remote
4003  * server has the same OIDs we do for the parameters' types.
4004  */
4005  if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
4006  NULL, values, NULL, NULL, 0))
4007  pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
4008 
4009  /*
4010  * Get the result, and check for success.
4011  *
4012  * We don't use a PG_TRY block here, so be careful not to throw error
4013  * without releasing the PGresult.
 *
 * NOTE(review): on the error path below, dmstate->result still points at
 * the PGresult handed to pgfdw_report_error().
4014  */
4015  dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
4016  if (PQresultStatus(dmstate->result) !=
4018  pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
4019  dmstate->query);
4020 
4021  /* Get the number of rows affected. */
4022  if (dmstate->has_returning)
4023  dmstate->num_tuples = PQntuples(dmstate->result);
4024  else
4025  dmstate->num_tuples = atoi(PQcmdTuples(dmstate->result));
4026 }
4027 
4028 /*
4029  * Get the result of a RETURNING clause.
 *
 * Emits the next row of dmstate->result (or an all-NULL dummy row when
 * has_returning is false) in the node's scan slot, and advances
 * dmstate->next_tuple.  Returns an empty slot once all rows are consumed.
 * The tuple used for evaluating the local RETURNING list (resultSlot) is
 * installed as the RETURNING projection's scan tuple.  It may differ from
 * the returned slot when dmstate->rel is NULL.
4030  */
4031 static TupleTableSlot *
4033 {
4035  EState *estate = node->ss.ps.state;
4036  ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
4037  TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
4038  TupleTableSlot *resultSlot;
4039 
4040  Assert(resultRelInfo->ri_projectReturning);
4041 
4042  /* If we didn't get any tuples, must be end of data. */
4043  if (dmstate->next_tuple >= dmstate->num_tuples)
4044  return ExecClearTuple(slot);
4045 
4046  /* Increment the command es_processed count if necessary. */
4047  if (dmstate->set_processed)
4048  estate->es_processed += 1;
4049 
4050  /*
4051  * Store a RETURNING tuple. If has_returning is false, just emit a dummy
4052  * tuple. (has_returning is false when the local query is of the form
4053  * "UPDATE/DELETE .. RETURNING 1" for example.)
4054  */
4055  if (!dmstate->has_returning)
4056  {
4057  ExecStoreAllNullTuple(slot);
4058  resultSlot = slot;
4059  }
4060  else
4061  {
4062  /*
4063  * On error, be sure to release the PGresult on the way out. Callers
4064  * do not have PG_TRY blocks to ensure this happens.
4065  */
4066  PG_TRY();
4067  {
4068  HeapTuple newtup;
4069 
4070  newtup = make_tuple_from_result_row(dmstate->result,
4071  dmstate->next_tuple,
4072  dmstate->rel,
4073  dmstate->attinmeta,
4074  dmstate->retrieved_attrs,
4075  node,
4076  dmstate->temp_cxt);
4077  ExecStoreHeapTuple(newtup, slot, false);
4078  }
4079  PG_CATCH();
4080  {
 /*
  * NOTE(review): dmstate->result is cleared but not reset to NULL
  * here.  Confirm that no later cleanup path clears it again.
  */
4081  if (dmstate->result)
4082  PQclear(dmstate->result);
4083  PG_RE_THROW();
4084  }
4085  PG_END_TRY();
4086 
4087  /* Get the updated/deleted tuple. */
4088  if (dmstate->rel)
4089  resultSlot = slot;
4090  else
4091  resultSlot = apply_returning_filter(dmstate, slot, estate);
4092  }
4093  dmstate->next_tuple++;
4094 
4095  /* Make slot available for evaluation of the local query RETURNING list. */
4096  resultRelInfo->ri_projectReturning->pi_exprContext->ecxt_scantuple =
4097  resultSlot;
4098 
4099  return slot;
4100 }
4101 
4102 /*
4103  * Initialize a filter to extract an updated/deleted tuple from a scan tuple.
 *
 * Fills dmstate->attnoMap, dmstate->ctidAttno and dmstate->hasSystemCols,
 * which apply_returning_filter() later uses.
 *
 * fdw_scan_tlist: target list of the scan; every entry's expression is
 *                 expected to be a Var (asserted below).
 * rtindex:        range-table index of the target relation; only Vars with
 *                 this varno are mapped.
 *
 * An entry is mapped only if its 1-based position in fdw_scan_tlist
 * appears in dmstate->retrieved_attrs.  attnoMap is palloc0'd in the
 * current memory context, with one entry per attribute of
 * dmstate->resultRel.
 *
 * NOTE(review): the line carrying the function name and the dmstate
 * parameter is not visible in this rendering.
4104  */
4105 static void
4107  List *fdw_scan_tlist,
4108  Index rtindex)
4109 {
4110  TupleDesc resultTupType = RelationGetDescr(dmstate->resultRel);
4111  ListCell *lc;
4112  int i;
4113 
4114  /*
4115  * Calculate the mapping between the fdw_scan_tlist's entries and the
4116  * result tuple's attributes.
4117  *
4118  * The "map" is an array of indexes of the result tuple's attributes in
4119  * fdw_scan_tlist, i.e., one entry for every attribute of the result
4120  * tuple. We store zero for any attributes that don't have the
4121  * corresponding entries in that list, marking that a NULL is needed in
4122  * the result tuple.
4123  *
4124  * Also get the indexes of the entries for ctid and oid if any.
 *
 * NOTE(review): despite the sentence above, only ctid is located here;
 * oidAttno is zero-initialized below and never assigned in this function.
4125  */
4126  dmstate->attnoMap = (AttrNumber *)
4127  palloc0(resultTupType->natts * sizeof(AttrNumber));
4128 
4129  dmstate->ctidAttno = dmstate->oidAttno = 0;
4130 
 /* i is the 1-based position of the current entry in fdw_scan_tlist. */
4131  i = 1;
4132  dmstate->hasSystemCols = false;
4133  foreach(lc, fdw_scan_tlist)
4134  {
4135  TargetEntry *tle = (TargetEntry *) lfirst(lc);
4136  Var *var = (Var *) tle->expr;
4137 
4138  Assert(IsA(var, Var));
4139 
4140  /*
4141  * If the Var is a column of the target relation to be retrieved from
4142  * the foreign server, get the index of the entry.
4143  */
4144  if (var->varno == rtindex &&
4145  list_member_int(dmstate->retrieved_attrs, i))
4146  {
4147  int attrno = var->varattno;
4148 
4149  if (attrno < 0)
4150  {
4151  /*
4152  * We don't retrieve system columns other than ctid and oid.
 *
 * NOTE(review): only ctid is actually handled below. Any other
 * system column trips the Assert. In builds where Assert is a
 * no-op (presumably non-assert builds), such a column is left
 * unmapped but still sets hasSystemCols.
4153  */
4154  if (attrno == SelfItemPointerAttributeNumber)
4155  dmstate->ctidAttno = i;
4156  else
4157  Assert(false);
4158  dmstate->hasSystemCols = true;
4159  }
4160  else
4161  {
4162  /*
4163  * We don't retrieve whole-row references to the target
4164  * relation either.
4165  */
4166  Assert(attrno > 0);
4167 
4168  dmstate->attnoMap[attrno - 1] = i;
4169  }
4170  }
4171  i++;
4172  }
4173 }
4174 
4175 /*
4176  * Extract and return an updated/deleted tuple from a scan tuple.
 *
 * Builds the result tuple in the result relation's RETURNING slot,
 * obtained from ExecGetReturningSlot(). Columns are copied from the scan
 * slot according to dmstate->attnoMap; attributes mapped to 0 become
 * NULL. If dmstate->hasSystemCols is set, the slot is materialized as a
 * heap tuple and its t_self is set from the retrieved ctid column.
 * Returns the RETURNING slot.
 *
 * NOTE(review): column values are copied as Datum values, not deep-copied.
 * For by-reference types the result presumably depends on the scan slot's
 * storage staying valid -- confirm before reusing the scan slot early.
 * The line with the function name and dmstate parameter is not visible in
 * this rendering.
4177  */
4178 static TupleTableSlot *
4180  TupleTableSlot *slot,
4181  EState *estate)
4182 {
4183  ResultRelInfo *relInfo = estate->es_result_relation_info;
4184  TupleDesc resultTupType = RelationGetDescr(dmstate->resultRel);
4185  TupleTableSlot *resultSlot;
4186  Datum *values;
4187  bool *isnull;
4188  Datum *old_values;
4189  bool *old_isnull;
4190  int i;
4191 
4192  /*
4193  * Use the return tuple slot as a place to store the result tuple.
4194  */
4195  resultSlot = ExecGetReturningSlot(estate, relInfo);
4196 
4197  /*
4198  * Extract all the values of the scan tuple.
4199  */
4200  slot_getallattrs(slot);
4201  old_values = slot->tts_values;
4202  old_isnull = slot->tts_isnull;
4203 
4204  /*
4205  * Prepare to build the result tuple.
4206  */
4207  ExecClearTuple(resultSlot);
4208  values = resultSlot->tts_values;
4209  isnull = resultSlot->tts_isnull;
4210 
4211  /*
4212  * Transpose data into proper fields of the result tuple.
4213  */
4214  for (i = 0; i < resultTupType->natts; i++)
4215  {
 /* attnoMap entries are 1-based scan-tlist positions; 0 means "NULL". */
4216  int j = dmstate->attnoMap[i];
4217 
4218  if (j == 0)
4219  {
4220  values[i] = (Datum) 0;
4221  isnull[i] = true;
4222  }
4223  else
4224  {
4225  values[i] = old_values[j - 1];
4226  isnull[i] = old_isnull[j - 1];
4227  }
4228  }
4229 
4230  /*
4231  * Build the virtual tuple.
4232  */
4233  ExecStoreVirtualTuple(resultSlot);
4234 
4235  /*
4236  * If we have any system columns to return, materialize a heap tuple in
4237  * the slot from column values set above and install system columns in
4238  * that tuple.
4239  */
4240  if (dmstate->hasSystemCols)
4241  {
4242  HeapTuple resultTup = ExecFetchSlotHeapTuple(resultSlot, true, NULL);
4243 
4244  /* ctid */
4245  if (dmstate->ctidAttno)
4246  {
4247  ItemPointer ctid = NULL;
4248 
 /*
  * The ctid Datum is dereferenced without a NULL check; see the note
  * below on why system columns can't be NULL here.
  */
4249  ctid = (ItemPointer) DatumGetPointer(old_values[dmstate->ctidAttno - 1]);
4250  resultTup->t_self = *ctid;
4251  }
4252 
4253  /*
4254  * And remaining columns
4255  *
4256  * Note: since we currently don't allow the target relation to appear
4257  * on the nullable side of an outer join, any system columns wouldn't
4258  * go to NULL.
4259  *
4260  * Note: no need to care about tableoid here because it will be
4261  * initialized in ExecProcessReturning().
 *
 * NOTE(review): the statements that follow this comment in the full
 * source (original lines 4263-4265) are missing from this rendering --
 * confirm against the full source before editing here.
4262  */
4266  }
4267 
4268  /*
4269  * And return the result tuple.
4270  */
4271  return resultSlot;
4272 }
4273 
4274 /*
4275  * Prepare for processing of parameters used in remote query.
 *
 * Outputs (all palloc'd in the current memory context):
 *   *param_flinfo - array of numParams FmgrInfo entries. Each holds the
 *                   type output function for the corresponding expression
 *                   in fdw_exprs, looked up via exprType().
 *   *param_exprs  - fdw_exprs prepared with ExecInitExprList() for node.
 *   *param_values - zeroed array of numParams string pointers, sized for
 *                   the text values that process_query_params() writes.
 *
 * numParams must be > 0 (asserted).
 *
 * NOTE(review): the loop writes one FmgrInfo per element of fdw_exprs into
 * an array sized by numParams, so callers presumably guarantee
 * numParams == list_length(fdw_exprs) -- confirm.  The lines declaring the
 * function name, node and param_flinfo are not visible in this rendering.
4276  */
4277 static void
4279  List *fdw_exprs,
4280  int numParams,
4282  List **param_exprs,
4283  const char ***param_values)
4284 {
4285  int i;
4286  ListCell *lc;
4287 
4288  Assert(numParams > 0);
4289 
4290  /* Prepare for output conversion of parameters used in remote query. */
4291  *param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * numParams);
4292 
4293  i = 0;
4294  foreach(lc, fdw_exprs)
4295  {
4296  Node *param_expr = (Node *) lfirst(lc);
4297  Oid typefnoid;
4298  bool isvarlena;
4299 
4300  getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena);
4301  fmgr_info(typefnoid, &(*param_flinfo)[i]);
4302  i++;
4303  }
4304 
4305  /*
4306  * Prepare remote-parameter expressions for evaluation. (Note: in
4307  * practice, we expect that all these expressions will be just Params, so
4308  * we could possibly do something more efficient than using the full
4309  * expression-eval machinery for this. But probably there would be little
4310  * benefit, and it'd require postgres_fdw to know more than is desirable
4311  * about Param evaluation.)
4312  */
4313  *param_exprs = ExecInitExprList(fdw_exprs, node);
4314 
4315  /* Allocate buffer for text form of query parameters. */
4316  *param_values = (const char **) palloc0(numParams * sizeof(char *));
4317 }
4318 
4319 /*
4320  * Construct array of query parameter values in text format.
 *
 * Evaluates each ExprState in param_exprs in econtext. The text form of
 * each result, produced by the matching output function in
 * param_flinfo[i], is stored in param_values[i]. A NULL result is stored
 * as a NULL pointer. param_values must have room for one entry per
 * element of param_exprs.
 *
 * The evaluation runs between set_transmission_modes() and
 * reset_transmission_modes(). Presumably this makes output functions emit
 * text the remote server parses unambiguously; see
 * set_transmission_modes() for the settings involved.
 *
 * NOTE(review): the lines with the function name and the econtext and
 * param_flinfo parameters are not visible in this rendering.
4321  */
4322 static void
4325  List *param_exprs,
4326  const char **param_values)
4327 {
4328  int nestlevel;
4329  int i;
4330  ListCell *lc;
4331 
4332  nestlevel = set_transmission_modes();
4333 
4334  i = 0;
4335  foreach(lc, param_exprs)
4336  {
4337  ExprState *expr_state = (ExprState *) lfirst(lc);
4338  Datum expr_value;
4339  bool isNull;
4340 
4341  /* Evaluate the parameter expression */
4342  expr_value = ExecEvalExpr(expr_state, econtext, &isNull);
4343 
4344  /*
4345  * Get string representation of each parameter value by invoking
4346  * type-specific output function, unless the value is null.
4347  */
4348  if (isNull)
4349  param_values[i] = NULL;
4350  else
4351  param_values[i] = OutputFunctionCall(&param_flinfo[i], expr_value);
4352 
4353  i++;
4354  }
4355 
4356  reset_transmission_modes(nestlevel);
4357 }
4358 
4359 /*
4360  * postgresAnalyzeForeignTable
4361  * Test whether analyzing this foreign table is supported
 *
 * Always returns true. Per the comment at the top of the body, it returns
 * the row-sampling function through *func (the assignment line is not
 * visible in this rendering). It also runs the query built by
 * deparseAnalyzeSizeSql() on the remote server, connecting through the
 * table owner's user mapping, and stores the result in *totalpages.
 * An ERROR is raised if the remote query fails or does not return exactly
 * one row with one column.
 *
 * NOTE(review): the function-name/first-parameter line is not visible in
 * this rendering.
4362  */
4363 static bool
4365  AcquireSampleRowsFunc *func,
4366  BlockNumber *totalpages)
4367 {
4368  ForeignTable *table;
4369  UserMapping *user;
4370  PGconn *conn;
4371  StringInfoData sql;
 /* volatile: assigned inside PG_TRY and read in PG_CATCH (longjmp). */
4372  PGresult *volatile res = NULL;
4373 
4374  /* Return the row-analysis function pointer */
4376 
4377  /*
4378  * Now we have to get the number of pages. It's annoying that the ANALYZE
4379  * API requires us to return that now, because it forces some duplication
4380  * of effort between this routine and postgresAcquireSampleRowsFunc. But
4381  * it's probably not worth redefining that API at this point.
4382  */
4383 
4384  /*
4385  * Get the connection to use. We do the remote access as the table's
4386  * owner, even if the ANALYZE was started by some other user.
4387  */
4388  table = GetForeignTable(RelationGetRelid(relation));
4389  user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
4390  conn = GetConnection(user, false);
4391 
4392  /*
4393  * Construct command to get page count for relation.
4394  */
4395  initStringInfo(&sql);
4396  deparseAnalyzeSizeSql(&sql, relation);
4397 
4398  /* In what follows, do not risk leaking any PGresults. */
4399  PG_TRY();
4400  {
4401  res = pgfdw_exec_query(conn, sql.data);
4402  if (PQresultStatus(res) != PGRES_TUPLES_OK)
4403  pgfdw_report_error(ERROR, res, conn, false, sql.data);
4404 
4405  if (PQntuples(res) != 1 || PQnfields(res) != 1)
4406  elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query");
 /*
  * The remote text is parsed with no error checking; a value with no
  * leading digits yields 0 pages.
  */
4407  *totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
4408 
4409  PQclear(res);
4410  res = NULL;
4411  }
4412  PG_CATCH();
4413  {
4414  if (res)
4415  PQclear(res);
4416  PG_RE_THROW();
4417  }
4418  PG_END_TRY();
4419 
4420  ReleaseConnection(conn);
4421 
4422  return true;
4423 }
4424 
4425 /*
4426  * Acquire a random sample of rows from foreign table managed by postgres_fdw.
4427  *
4428  * We fetch the whole table from the remote side and pick out some sample rows.
4429  *
4430  * Selected rows are returned in the caller-allocated array rows[],
4431  * which must have at least targrows entries.
4432  * The actual number of rows selected is returned as the function result.
4433  * We also count the total number of rows in the table and return it into
4434  * *totalrows. Note that *totaldeadrows is always set to 0.
4435  *
4436  * Note that the returned list of rows is not always in order by physical
4437  * position in the table. Therefore, correlation estimates derived later
4438  * may be meaningless, but it's OK because we don't use the estimates
4439  * currently (the planner only pays attention to correlation for indexscans).
 *
 * The rows are read through a remote cursor, in batches of fetch_size
 * rows. fetch_size defaults to 100 and can be overridden by the server's
 * or the table's "fetch_size" option; the table option takes precedence.
 * Sampling itself is done row by row in analyze_row_processor().
 *
 * NOTE(review): the line with the function name and its first parameters
 * (presumably including relation and the elevel used in the final
 * ereport) is not visible in this rendering.
4440  */
4441 static int
4443  HeapTuple *rows, int targrows,
4444  double *totalrows,
4445  double *totaldeadrows)
4446 {
4447  PgFdwAnalyzeState astate;
4448  ForeignTable *table;
4449  ForeignServer *server;
4450  UserMapping *user;
4451  PGconn *conn;
4452  unsigned int cursor_number;
4453  StringInfoData sql;
 /* volatile: assigned inside PG_TRY and read in PG_CATCH (longjmp). */
4454  PGresult *volatile res = NULL;
4455 
4456  /* Initialize workspace state */
4457  astate.rel = relation;
 /*
  * NOTE(review): a line missing from this rendering presumably initializes
  * astate.attinmeta, which analyze_row_processor() reads -- confirm.
  */
4459 
4460  astate.rows = rows;
4461  astate.targrows = targrows;
4462  astate.numrows = 0;
4463  astate.samplerows = 0;
4464  astate.rowstoskip = -1; /* -1 means not set yet */
4465  reservoir_init_selection_state(&astate.rstate, targrows);
4466 
4467  /* Remember ANALYZE context, and create a per-tuple temp context */
4468  astate.anl_cxt = CurrentMemoryContext;
 /*
  * NOTE(review): the start and end of the call that creates
  * astate.temp_cxt (named "postgres_fdw temporary data") are on lines
  * missing from this rendering -- confirm against the full source.
  */
4470  "postgres_fdw temporary data",
4472 
4473  /*
4474  * Get the connection to use. We do the remote access as the table's
4475  * owner, even if the ANALYZE was started by some other user.
4476  */
4477  table = GetForeignTable(RelationGetRelid(relation));
4478  server = GetForeignServer(table->serverid);
4479  user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
4480  conn = GetConnection(user, false);
4481 
4482  /*
4483  * Construct cursor that retrieves whole rows from remote.
4484  */
4485  cursor_number = GetCursorNumber(conn);
4486  initStringInfo(&sql);
4487  appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number);
4488  deparseAnalyzeSql(&sql, relation, &astate.retrieved_attrs);
4489 
4490  /* In what follows, do not risk leaking any PGresults. */
4491  PG_TRY();
4492  {
4493  char fetch_sql[64];
4494  int fetch_size;
4495  ListCell *lc;
4496 
4497  res = pgfdw_exec_query(conn, sql.data);
4498  if (PQresultStatus(res) != PGRES_COMMAND_OK)
4499  pgfdw_report_error(ERROR, res, conn, false, sql.data);
4500  PQclear(res);
4501  res = NULL;
4502 
4503  /*
4504  * Determine the fetch size. The default is arbitrary, but shouldn't
4505  * be enormous.
4506  */
4507  fetch_size = 100;
4508  foreach(lc, server->options)
4509  {
4510  DefElem *def = (DefElem *) lfirst(lc);
4511 
4512  if (strcmp(def->defname, "fetch_size") == 0)
4513  {
4514  fetch_size = strtol(defGetString(def), NULL, 10);
4515  break;
4516  }
4517  }
 /* Table options are scanned second, so a table-level value wins. */
4518  foreach(lc, table->options)
4519  {
4520  DefElem *def = (DefElem *) lfirst(lc);
4521 
4522  if (strcmp(def->defname, "fetch_size") == 0)
4523  {
4524  fetch_size = strtol(defGetString(def), NULL, 10);
4525  break;
4526  }
4527  }
4528 
4529  /* Construct command to fetch rows from remote. */
4530  snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
4531  fetch_size, cursor_number);
4532 
4533  /* Retrieve and process rows a batch at a time. */
 /*
  * NOTE(review): the EOF test at the bottom of this loop relies on
  * fetch_size being positive; with fetch_size <= 0, numrows < fetch_size
  * is never true. Positivity is presumably enforced by the option
  * validator -- confirm.
  */
4534  for (;;)
4535  {
4536  int numrows;
4537  int i;
4538 
4539  /* Allow users to cancel long query */
 /* NOTE(review): the interrupt-check statement is missing from this rendering. */
4541 
4542  /*
4543  * XXX possible future improvement: if rowstoskip is large, we
4544  * could issue a MOVE rather than physically fetching the rows,
4545  * then just adjust rowstoskip and samplerows appropriately.
4546  */
4547 
4548  /* Fetch some rows */
4549  res = pgfdw_exec_query(conn, fetch_sql);
4550  /* On error, report the original query, not the FETCH. */
4551  if (PQresultStatus(res) != PGRES_TUPLES_OK)
4552  pgfdw_report_error(ERROR, res, conn, false, sql.data);
4553 
4554  /* Process whatever we got. */
4555  numrows = PQntuples(res);
4556  for (i = 0; i < numrows; i++)
4557  analyze_row_processor(res, i, &astate);
4558 
4559  PQclear(res);
4560  res = NULL;
4561 
4562  /* Must be EOF if we didn't get all the rows requested. */
4563  if (numrows < fetch_size)
4564  break;
4565  }
4566 
4567  /* Close the cursor, just to be tidy. */
4568  close_cursor(conn, cursor_number);
4569  }
4570  PG_CATCH();
4571  {
4572  if (res)
4573  PQclear(res);
4574  PG_RE_THROW();
4575  }
4576  PG_END_TRY();
4577 
4578  ReleaseConnection(conn);
4579 
4580  /* We assume that we have no dead tuple. */
4581  *totaldeadrows = 0.0;
4582 
4583  /* We've retrieved all living tuples from foreign server. */
4584  *totalrows = astate.samplerows;
4585 
4586  /*
4587  * Emit some interesting relation info
4588  */
4589  ereport(elevel,
4590  (errmsg("\"%s\": table contains %.0f rows, %d rows in sample",
4591  RelationGetRelationName(relation),
4592  astate.samplerows, astate.numrows)));
4593 
4594  return astate.numrows;
4595 }
4596 
4597 /*
4598  * Collect sample rows from the result of query.
4599  * - Use all tuples in sample until target # of samples are collected.
4600  * - Subsequently, replace already-sampled tuples randomly.
 *
 * Called once for row number `row` of `res`. It always increments
 * astate->samplerows. When the row is selected, it builds a tuple from the
 * row in astate->anl_cxt and stores it in astate->rows[pos]; any tuple
 * previously at that position is first freed with heap_freetuple().
 *
 * astate->rowstoskip counts the rows still to skip before the next
 * replacement. A negative value means "not computed yet", so the value is
 * drawn from reservoir_get_next_S(). A row is replaced when the count
 * reaches 0 or below.
 *
 * NOTE(review): the function-name/parameter line is not visible in this
 * rendering.
4601  */
4602 static void
4604 {
4605  int targrows = astate->targrows;
4606  int pos; /* array index to store tuple in */
4607  MemoryContext oldcontext;
4608 
4609  /* Always increment sample row counter. */
4610  astate->samplerows += 1;
4611 
4612  /*
4613  * Determine the slot where this sample row should be stored. Set pos to
4614  * negative value to indicate the row should be skipped.
4615  */
4616  if (astate->numrows < targrows)
4617  {
4618  /* First targrows rows are always included into the sample */
4619  pos = astate->numrows++;
4620  }
4621  else
4622  {
4623  /*
4624  * Now we start replacing tuples in the sample until we reach the end
4625  * of the relation. Same algorithm as in acquire_sample_rows in
4626  * analyze.c; see Jeff Vitter's paper.
4627  */
4628  if (astate->rowstoskip < 0)
4629  astate->rowstoskip = reservoir_get_next_S(&astate->rstate, astate->samplerows, targrows);
4630 
4631  if (astate->rowstoskip <= 0)
4632  {
4633  /* Choose a random reservoir element to replace. */
4634  pos = (int) (targrows * sampler_random_fract(astate->rstate.randstate));
4635  Assert(pos >= 0 && pos < targrows);
4636  heap_freetuple(astate->rows[pos]);
4637  }
4638  else
4639  {
4640  /* Skip this tuple. */
4641  pos = -1;
4642  }
4643 
 /*
  * After a replacement this drops below 0, so the next call draws a
  * fresh skip count.
  */
4644  astate->rowstoskip -= 1;
4645  }
4646 
4647  if (pos >= 0)
4648  {
4649  /*
4650  * Create sample tuple from current result row, and store it in the
4651  * position determined above. The tuple has to be created in anl_cxt.
4652  */
4653  oldcontext = MemoryContextSwitchTo(astate->anl_cxt);
4654 
4655  astate->rows[pos] = make_tuple_from_result_row(res, row,
4656  astate->rel,
4657  astate->attinmeta,
4658  astate->retrieved_attrs,
4659  NULL,
4660  astate->temp_cxt);
4661 
4662  MemoryContextSwitchTo(oldcontext);
4663  }
4664 }
4665 
4666 /*
4667  * Import a foreign schema
 *
 * Handles IMPORT FOREIGN SCHEMA. It first checks that stmt->remote_schema
 * exists on the server. It then reads the column metadata of the schema's
 * tables, views, foreign tables, materialized views and partitioned
 * tables; partitions are excluded when the remote server is version 10 or
 * later. It returns a List of pstrdup'd CREATE FOREIGN TABLE command
 * strings, one per remote relation.
 *
 * Statement options:
 *   import_collate  (default true; forced off when the remote server is
 *                    older than 9.1) - add COLLATE clauses
 *   import_default  (default false) - add DEFAULT expressions
 *   import_not_null (default true)  - add NOT NULL constraints
 * Any other option raises ERROR.
 *
 * The remote connection uses the current user's user mapping for the
 * server.
 *
 * NOTE(review): this rendering omits the function-name/parameter line, the
 * declaration of buf, and the first line of several append calls and of
 * the EXCEPT condition -- confirm against the full source.
4668  */
4669 static List *
4671 {
4672  List *commands = NIL;
4673  bool import_collate = true;
4674  bool import_default = false;
4675  bool import_not_null = true;
4676  ForeignServer *server;
4677  UserMapping *mapping;
4678  PGconn *conn;
 /* volatile: assigned inside PG_TRY and read in PG_CATCH (longjmp). */
4680  PGresult *volatile res = NULL;
4681  int numrows,
4682  i;
4683  ListCell *lc;
4684 
4685  /* Parse statement options */
4686  foreach(lc, stmt->options)
4687  {
4688  DefElem *def = (DefElem *) lfirst(lc);
4689 
4690  if (strcmp(def->defname, "import_collate") == 0)
4691  import_collate = defGetBoolean(def);
4692  else if (strcmp(def->defname, "import_default") == 0)
4693  import_default = defGetBoolean(def);
4694  else if (strcmp(def->defname, "import_not_null") == 0)
4695  import_not_null = defGetBoolean(def);
4696  else
4697  ereport(ERROR,
4698  (errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
4699  errmsg("invalid option \"%s\"", def->defname)));
4700  }
4701 
4702  /*
4703  * Get connection to the foreign server. Connection manager will
4704  * establish new connection if necessary.
4705  */
4706  server = GetForeignServer(serverOid);
4707  mapping = GetUserMapping(GetUserId(), server->serverid);
4708  conn = GetConnection(mapping, false);
4709 
4710  /* Don't attempt to import collation if remote server hasn't got it */
4711  if (PQserverVersion(conn) < 90100)
4712  import_collate = false;
4713 
4714  /* Create workspace for strings */
4715  initStringInfo(&buf);
4716 
4717  /* In what follows, do not risk leaking any PGresults. */
4718  PG_TRY();
4719  {
4720  /* Check that the schema really exists */
4721  appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
4722  deparseStringLiteral(&buf, stmt->remote_schema);
4723 
4724  res = pgfdw_exec_query(conn, buf.data);
4725  if (PQresultStatus(res) != PGRES_TUPLES_OK)
4726  pgfdw_report_error(ERROR, res, conn, false, buf.data);
4727 
4728  if (PQntuples(res) != 1)
4729  ereport(ERROR,
4730  (errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND),
4731  errmsg("schema \"%s\" is not present on foreign server \"%s\"",
4732  stmt->remote_schema, server->servername)));
4733 
4734  PQclear(res);
4735  res = NULL;
4736  resetStringInfo(&buf);
4737 
4738  /*
4739  * Fetch all table data from this schema, possibly restricted by
4740  * EXCEPT or LIMIT TO. (We don't actually need to pay any attention
4741  * to EXCEPT/LIMIT TO here, because the core code will filter the
4742  * statements we return according to those lists anyway. But it
4743  * should save a few cycles to not process excluded tables in the
4744  * first place.)
4745  *
4746  * Ignore table data for partitions and only include the definitions
4747  * of the root partitioned tables to allow access to the complete
4748  * remote data set locally in the schema imported.
4749  *
4750  * Note: because we run the connection with search_path restricted to
4751  * pg_catalog, the format_type() and pg_get_expr() outputs will always
4752  * include a schema name for types/functions in other schemas, which
4753  * is what we want.
 *
 * Both query variants return the same seven columns: relname,
 * attname, type, attnotnull, default expression, collation name,
 * collation schema. Without import_collate the last two are NULL.
4754  */
4755  if (import_collate)
4757  "SELECT relname, "
4758  " attname, "
4759  " format_type(atttypid, atttypmod), "
4760  " attnotnull, "
4761  " pg_get_expr(adbin, adrelid), "
4762  " collname, "
4763  " collnsp.nspname "
4764  "FROM pg_class c "
4765  " JOIN pg_namespace n ON "
4766  " relnamespace = n.oid "
4767  " LEFT JOIN pg_attribute a ON "
4768  " attrelid = c.oid AND attnum > 0 "
4769  " AND NOT attisdropped "
4770  " LEFT JOIN pg_attrdef ad ON "
4771  " adrelid = c.oid AND adnum = attnum "
4772  " LEFT JOIN pg_collation coll ON "
4773  " coll.oid = attcollation "
4774  " LEFT JOIN pg_namespace collnsp ON "
4775  " collnsp.oid = collnamespace ");
4776  else
4778  "SELECT relname, "
4779  " attname, "
4780  " format_type(atttypid, atttypmod), "
4781  " attnotnull, "
4782  " pg_get_expr(adbin, adrelid), "
4783  " NULL, NULL "
4784  "FROM pg_class c "
4785  " JOIN pg_namespace n ON "
4786  " relnamespace = n.oid "
4787  " LEFT JOIN pg_attribute a ON "
4788  " attrelid = c.oid AND attnum > 0 "
4789  " AND NOT attisdropped "
4790  " LEFT JOIN pg_attrdef ad ON "
4791  " adrelid = c.oid AND adnum = attnum ");
4792 
4794  "WHERE c.relkind IN ("
4795  CppAsString2(RELKIND_RELATION) ","
4796  CppAsString2(RELKIND_VIEW) ","
4797  CppAsString2(RELKIND_FOREIGN_TABLE) ","
4798  CppAsString2(RELKIND_MATVIEW) ","
4799  CppAsString2(RELKIND_PARTITIONED_TABLE) ") "
4800  " AND n.nspname = ");
4801  deparseStringLiteral(&buf, stmt->remote_schema);
4802 
4803  /* Partitions are supported since Postgres 10 */
4804  if (PQserverVersion(conn) >= 100000)
4805  appendStringInfoString(&buf, " AND NOT c.relispartition ");
4806 
4807  /* Apply restrictions for LIMIT TO and EXCEPT */
 /*
  * NOTE(review): the second operand of the condition below (presumably
  * the FDW_IMPORT_SCHEMA_EXCEPT test) is missing from this rendering.
  */
4808  if (stmt->list_type == FDW_IMPORT_SCHEMA_LIMIT_TO ||
4810  {
4811  bool first_item = true;
4812 
4813  appendStringInfoString(&buf, " AND c.relname ");
4814  if (stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
4815  appendStringInfoString(&buf, "NOT ");
4816  appendStringInfoString(&buf, "IN (");
4817 
4818  /* Append list of table names within IN clause */
4819  foreach(lc, stmt->table_list)
4820  {
4821  RangeVar *rv = (RangeVar *) lfirst(lc);
4822 
4823  if (first_item)
4824  first_item = false;
4825  else
4826  appendStringInfoString(&buf, ", ");
4827  deparseStringLiteral(&buf, rv->relname);
4828  }
4829  appendStringInfoChar(&buf, ')');
4830  }
4831 
4832  /* Append ORDER BY at the end of query to ensure output ordering */
4833  appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");
4834 
4835  /* Fetch the data */
4836  res = pgfdw_exec_query(conn, buf.data);
4837  if (PQresultStatus(res) != PGRES_TUPLES_OK)
4838  pgfdw_report_error(ERROR, res, conn, false, buf.data);
4839 
4840  /* Process results */
4841  numrows = PQntuples(res);
 /*
  * Because of ORDER BY c.relname, all rows of one table are adjacent.
  * Each outer iteration emits one CREATE FOREIGN TABLE, and its inner
  * do/while consumes that table's rows in attnum order.
  */
4842  /* note: incrementation of i happens in inner loop's while() test */
4843  for (i = 0; i < numrows;)
4844  {
4845  char *tablename = PQgetvalue(res, i, 0);
4846  bool first_item = true;
4847 
4848  resetStringInfo(&buf);
4849  appendStringInfo(&buf, "CREATE FOREIGN TABLE %s (\n",
4850  quote_identifier(tablename));
4851 
4852  /* Scan all rows for this table */
4853  do
4854  {
4855  char *attname;
4856  char *typename;
4857  char *attnotnull;
4858  char *attdefault;
4859  char *collname;
4860  char *collnamespace;
4861 
4862  /* If table has no columns, we'll see nulls here */
 /*
  * In a do/while, continue jumps to the while() test, so i is still
  * advanced.
  */
4863  if (PQgetisnull(res, i, 1))
4864  continue;
4865 
4866  attname = PQgetvalue(res, i, 1);
4867  typename = PQgetvalue(res, i, 2);
4868  attnotnull = PQgetvalue(res, i, 3);
4869  attdefault = PQgetisnull(res, i, 4) ? (char *) NULL :
4870  PQgetvalue(res, i, 4);
4871  collname = PQgetisnull(res, i, 5) ? (char *) NULL :
4872  PQgetvalue(res, i, 5);
4873  collnamespace = PQgetisnull(res, i, 6) ? (char *) NULL :
4874  PQgetvalue(res, i, 6);
4875 
4876  if (first_item)
4877  first_item = false;
4878  else
4879  appendStringInfoString(&buf, ",\n");
4880 
4881  /* Print column name and type */
4882  appendStringInfo(&buf, " %s %s",
4883  quote_identifier(attname),
4884  typename);
4885 
4886  /*
4887  * Add column_name option so that renaming the foreign table's
4888  * column doesn't break the association to the underlying
4889  * column.
4890  */
4891  appendStringInfoString(&buf, " OPTIONS (column_name ");
4892  deparseStringLiteral(&buf, attname);
4893  appendStringInfoChar(&buf, ')');
4894 
4895  /* Add COLLATE if needed */
4896  if (import_collate && collname != NULL && collnamespace != NULL)
4897  appendStringInfo(&buf, " COLLATE %s.%s",
4898  quote_identifier(collnamespace),
4899  quote_identifier(collname));
4900 
4901  /* Add DEFAULT if needed */
4902  if (import_default && attdefault != NULL)
4903  appendStringInfo(&buf, " DEFAULT %s", attdefault);
4904 
4905  /* Add NOT NULL if needed */
4906  if (import_not_null && attnotnull[0] == 't')
4907  appendStringInfoString(&buf, " NOT NULL");
4908  }
4909  while (++i < numrows &&
4910  strcmp(PQgetvalue(res, i, 0), tablename) == 0);
4911 
4912  /*
4913  * Add server name and table-level options. We specify remote
4914  * schema and table name as options (the latter to ensure that
4915  * renaming the foreign table doesn't break the association).
4916  */
4917  appendStringInfo(&buf, "\n) SERVER %s\nOPTIONS (",
4918  quote_identifier(server->servername));
4919 
4920  appendStringInfoString(&buf, "schema_name ");
4921  deparseStringLiteral(&buf, stmt->remote_schema);
4922  appendStringInfoString(&buf, ", table_name ");
4923  deparseStringLiteral(&buf, tablename);
4924 
4925  appendStringInfoString(&buf, ");");
4926 
4927  commands = lappend(commands, pstrdup(buf.data));
4928  }
4929 
4930  /* Clean up */
4931  PQclear(res);
4932  res = NULL;
4933  }
4934  PG_CATCH();
4935  {
4936  if (res)
4937  PQclear(res);
4938  PG_RE_THROW();
4939  }
4940  PG_END_TRY();
4941 
4942  ReleaseConnection(conn);
4943 
4944  return commands;
4945 }
4946 
4947 /*
4948  * Assess whether the join between inner and outer relations can be pushed down
4949  * to the foreign server. As a side effect, save information we obtain in this
4950  * function to PgFdwRelationInfo passed in.
4951  */
4952 static bool
4954  RelOptInfo *outerrel, RelOptInfo *innerrel,
4955  JoinPathExtraData *extra)
4956 {
4957  PgFdwRelationInfo *fpinfo;
4958  PgFdwRelationInfo *fpinfo_o;
4959  PgFdwRelationInfo *fpinfo_i;
4960  ListCell *lc;
4961  List *joinclauses;
4962 
4963  /*
4964  * We support pushing down INNER, LEFT, RIGHT and FULL OUTER joins.
4965  * Constructing queries representing SEMI and ANTI joins is hard, hence
4966  * not considered right now.
4967  */
4968  if (jointype != JOIN_INNER && jointype != JOIN_LEFT &&
4969  jointype != JOIN_RIGHT && jointype != JOIN_FULL)
4970  return false;
4971 
4972  /*
4973  * If either of the joining relations is marked as unsafe to pushdown, the
4974  * join can not be pushed down.
4975  */
4976  fpinfo = (PgFdwRelationInfo *) joinrel->fdw_private;
4977  fpinfo_o = (PgFdwRelationInfo *) outerrel->fdw_private;
4978  fpinfo_i = (PgFdwRelationInfo *) innerrel->fdw_private;
4979  if (!fpinfo_o || !fpinfo_o->pushdown_safe ||
4980  !fpinfo_i || !fpinfo_i->pushdown_safe)
4981  return false;
4982 
4983  /*
4984  * If joining relations have local conditions, those conditions are
4985  * required to be applied before joining the relations. Hence the join can
4986  * not be pushed down.
4987  */
4988  if (fpinfo_o->local_conds || fpinfo_i->local_conds)
4989  return false;
4990 
4991  /*
4992  * Merge FDW options. We might be tempted to do this after we have deemed
4993  * the foreign join to be OK. But we must do this beforehand so that we
4994  * know which quals can be evaluated on the foreign server, which might
4995  * depend on shippable_extensions.
4996  */
4997  fpinfo->server = fpinfo_o->server;
4998  merge_fdw_options(fpinfo, fpinfo_o, fpinfo_i);
4999 
5000  /*
5001  * Separate restrict list into join quals and pushed-down (other) quals.
5002  *
5003  * Join quals belonging to an outer join must all be shippable, else we
5004  * cannot execute the join remotely. Add such quals to 'joinclauses'.
5005  *
5006  * Add other quals to fpinfo->remote_conds if they are shippable, else to
5007  * fpinfo->local_conds. In an inner join it's okay to execute conditions
5008  * either locally or remotely; the same is true for pushed-down conditions
5009  * at an outer join.
5010  *
5011  * Note we might return failure after having already scribbled on
5012  * fpinfo->remote_conds and fpinfo->local_conds. That's okay because we
5013  * won't consult those lists again if we deem the join unshippable.
5014  */
5015  joinclauses = NIL;
5016  foreach(lc, extra->restrictlist)
5017  {
5018  RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
5019  bool is_remote_clause = is_foreign_expr(root, joinrel,
5020  rinfo->clause);
5021 
5022  if (IS_OUTER_JOIN(jointype) &&
5023  !RINFO_IS_PUSHED_DOWN(rinfo, joinrel->relids))
5024  {
5025  if (!is_remote_clause)
5026  return false;
5027  joinclauses = lappend(joinclauses, rinfo);
5028  }
5029  else
5030  {
5031  if (is_remote_clause)
5032  fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo);
5033  else
5034  fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo);
5035  }
5036  }
5037 
5038  /*
5039  * deparseExplicitTargetList() isn't smart enough to handle anything other
5040  * than a Var. In particular, if there's some PlaceHolderVar that would
5041  * need to be evaluated within this join tree (because there's an upper
5042  * reference to a quantity that may go to NULL as a result of an outer
5043  * join), then we can't try to push the join down because we'll fail when
5044  * we get to deparseExplicitTargetList(). However, a PlaceHolderVar that
5045  * needs to be evaluated *at the top* of this join tree is OK, because we
5046  * can do that locally after fetching the results from the remote side.
5047  */
5048  foreach(lc, root->placeholder_list)
5049  {
5050  PlaceHolderInfo *phinfo = lfirst(lc);
5051  Relids relids;
5052 
5053  /* PlaceHolderInfo refers to parent relids, not child relids. */
5054  relids = IS_OTHER_REL(joinrel) ?
5055  joinrel->top_parent_relids : joinrel->relids;
5056 
5057  if (bms_is_subset(phinfo->ph_eval_at, relids) &&
5058  bms_nonempty_difference(relids, phinfo->ph_eval_at))
5059  return false;
5060  }
5061 
5062  /* Save the join clauses, for later use. */
5063  fpinfo->joinclauses = joinclauses;
5064 
5065  fpinfo->outerrel = outerrel;
5066  fpinfo->innerrel = innerrel;
5067  fpinfo->jointype = jointype;
5068 
5069  /*
5070  * By default, both the input relations are not required to be deparsed as
5071  * subqueries, but there might be some relations covered by the input
5072  * relations that are required to be deparsed as subqueries, so save the
5073  * relids of those relations for later use by the deparser.
5074  */
5075  fpinfo->make_outerrel_subquery = false;
5076  fpinfo->make_innerrel_subquery = false;
5077  Assert(bms_is_subset(fpinfo_o->lower_subquery_rels, outerrel->relids));
5078  Assert(bms_is_subset(fpinfo_i->lower_subquery_rels, innerrel->relids));
5080  fpinfo_i->lower_subquery_rels);
5081 
5082  /*
5083  * Pull the other remote conditions from the joining relations into join
5084  * clauses or other remote clauses (remote_conds) of this relation
5085  * wherever possible. This avoids building subqueries at every join step.
5086  *
5087  * For an inner join, clauses from both the relations are added to the
5088  * other remote clauses. For LEFT and RIGHT OUTER join, the clauses from
5089  * the outer side are added to remote_conds since those can be evaluated
5090  * after the join is evaluated. The clauses from inner side are added to
5091  * the joinclauses, since they need to be evaluated while constructing the
5092  * join.
5093  *
5094  * For a FULL OUTER JOIN, the other clauses from either relation can not
5095  * be added to the joinclauses or remote_conds, since each relation acts
5096  * as an outer relation for the other.
5097  *
5098  * The joining sides can not have local conditions, thus no need to test
5099  * shippability of the clauses being pulled up.
5100  */
5101  switch (jointype)
5102  {
5103  case JOIN_INNER:
5104  fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
5105  fpinfo_i->remote_conds);
5106  fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
5107  fpinfo_o->remote_conds);
5108  break;
5109 
5110  case JOIN_LEFT:
5111  fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
5112  fpinfo_i->remote_conds);
5113  fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
5114  fpinfo_o->remote_conds);
5115  break;
5116 
5117  case JOIN_RIGHT:
5118  fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
5119  fpinfo_o->remote_conds);
5120  fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
5121  fpinfo_i->remote_conds);
5122  break;
5123 
5124  case JOIN_FULL:
5125 
5126  /*
5127  * In this case, if any of the input relations has conditions, we
5128  * need to deparse that relation as a subquery so that the
5129  * conditions can be evaluated before the join. Remember it in
5130  * the fpinfo of this relation so that the deparser can take
5131  * appropriate action. Also, save the relids of base relations
5132  * covered by that relation for later use by the deparser.
5133  */
5134  if (fpinfo_o->remote_conds)
5135  {
5136  fpinfo->make_outerrel_subquery = true;
5137  fpinfo->lower_subquery_rels =
5139  outerrel->relids);
5140  }
5141  if (fpinfo_i->remote_conds)
5142  {
5143  fpinfo->make_innerrel_subquery = true;
5144  fpinfo->lower_subquery_rels =
5146  innerrel->relids);
5147  }
5148  break;
5149 
5150  default:
5151  /* Should not happen, we have just checked this above */
5152  elog(ERROR, "unsupported join type %d", jointype);
5153  }
5154 
5155  /*
5156  * For an inner join, all restrictions can be treated alike. Treating the
5157  * pushed down conditions as join conditions allows a top level full outer
5158  * join to be deparsed without requiring subqueries.
5159  */
5160  if (jointype == JOIN_INNER)
5161  {
5162  Assert(!fpinfo->joinclauses);
5163  fpinfo->joinclauses = fpinfo->remote_conds;
5164  fpinfo->remote_conds = NIL;
5165  }
5166 
5167  /* Mark that this join can be pushed down safely */
5168  fpinfo->pushdown_safe = true;
5169 
5170  /* Get user mapping */
5171  if (fpinfo->use_remote_estimate)
5172  {
5173  if (fpinfo_o->use_remote_estimate)
5174  fpinfo->user = fpinfo_o->user;
5175  else
5176  fpinfo->user = fpinfo_i->user;
5177  }
5178  else
5179  fpinfo->user = NULL;
5180 
5181  /*
5182  * Set # of retrieved rows and cached relation costs to some negative
5183  * value, so that we can detect when they are set to some sensible values,
5184  * during one (usually the first) of the calls to estimate_path_cost_size.
5185  */
5186  fpinfo->retrieved_rows = -1;
5187  fpinfo->rel_startup_cost = -1;
5188  fpinfo->rel_total_cost = -1;
5189 
5190  /*
5191  * Set the string describing this join relation to be used in EXPLAIN
5192  * output of corresponding ForeignScan.
5193  */
5194  fpinfo->relation_name = makeStringInfo();
5195  appendStringInfo(fpinfo->relation_name, "(%s) %s JOIN (%s)",
5196  fpinfo_o->relation_name->data,
5197  get_jointype_name(fpinfo->jointype),
5198  fpinfo_i->relation_name->data);
5199 
5200  /*
5201  * Set the relation index. This is defined as the position of this
5202  * joinrel in the join_rel_list list plus the length of the rtable list.
5203  * Note that since this joinrel is at the end of the join_rel_list list
5204  * when we are called, we can get the position by list_length.
5205  */
5206  Assert(fpinfo->relation_index == 0); /* shouldn't be set yet */
5207  fpinfo->relation_index =
5209 
5210  return true;
5211 }
5212 
/*
 * add_paths_with_pathkeys_for_rel
 *		Add one sorted ForeignPath to "rel" for each set of pathkeys returned
 *		by get_useful_pathkeys_for_relation(root, rel).
 *
 * Each path is costed by estimate_path_cost_size() for its own pathkeys.  For
 * a simple (base) relation the path is built with create_foreignscan_path();
 * otherwise (e.g. a join relation) with create_foreign_join_path().  Every
 * path uses the default pathtarget (NULL), rel->lateral_relids as its
 * required outer rels, and no fdw_private.
 *
 * epq_path, if not NULL, is the local path used for EvalPlanQual rechecks.
 * When it is not already sorted by a path's pathkeys, a Sort path built on
 * top of it is attached to that ForeignPath instead.
 */
5213 static void
5215  Path *epq_path)
5216 {
5217  List *useful_pathkeys_list = NIL; /* List of all pathkeys */
5218  ListCell *lc;
5219 
5220  useful_pathkeys_list = get_useful_pathkeys_for_relation(root, rel);
5221 
5222  /* Create one path for each set of pathkeys we found above. */
5223  foreach(lc, useful_pathkeys_list)
5224  {
5225  double rows;
5226  int width;
5227  Cost startup_cost;
5228  Cost total_cost;
5229  List *useful_pathkeys = lfirst(lc);
5230  Path *sorted_epq_path;
5231 
	/* Cost a remote scan that returns rows in this particular ordering. */
5232  estimate_path_cost_size(root, rel, NIL, useful_pathkeys, NULL,
5233  &rows, &width, &startup_cost, &total_cost);
5234 
5235  /*
5236  * The EPQ path must be at least as well sorted as the path itself, in
5237  * case it gets used as input to a mergejoin.
5238  */
5239  sorted_epq_path = epq_path;
5240  if (sorted_epq_path != NULL &&
5241  !pathkeys_contained_in(useful_pathkeys,
5242  sorted_epq_path->pathkeys))
5243  sorted_epq_path = (Path *)
5244  create_sort_path(root,
5245  rel,
5246  sorted_epq_path,
5247  useful_pathkeys,
5248  -1.0);
5249 
	/*
	 * Base relations get a plain foreign scan path; any other relation
	 * (e.g. a join) gets a foreign join path.
	 */
5250  if (IS_SIMPLE_REL(rel))
5251  add_path(rel, (Path *)
5252  create_foreignscan_path(root, rel,
5253  NULL,
5254  rows,
5255  startup_cost,
5256  total_cost,
5257  useful_pathkeys,
5258  rel->lateral_relids,
5259  sorted_epq_path,
5260  NIL));
5261  else
5262  add_path(rel, (Path *)
5263  create_foreign_join_path(root, rel,
5264  NULL,
5265  rows,
5266  startup_cost,
5267  total_cost,
5268  useful_pathkeys,
5269  rel->lateral_relids,
5270  sorted_epq_path,
5271  NIL));
5272  }
5273 }
5274 
5275 /*
5276  * Parse options from foreign server and apply them to fpinfo.
 *
 * The options are read from fpinfo->server->options.  Recognized option
 * names and the fpinfo fields they set:
 *	use_remote_estimate	-> use_remote_estimate (via defGetBoolean)
 *	fdw_startup_cost	-> fdw_startup_cost (via strtod)
 *	fdw_tuple_cost		-> fdw_tuple_cost (via strtod)
 *	extensions			-> shippable_extensions (via ExtractExtensionList)
 *	fetch_size			-> fetch_size (via strtol, base 10)
 * Any other option name is ignored.  Fields whose option is absent are left
 * unchanged, and if a name were repeated the last occurrence would win.
 *
 * NOTE(review): the numeric conversions pass a NULL end pointer, so no parse
 * or range errors are detected here; presumably the strings were already
 * validated when the option was set -- confirm.
5277  *
5278  * New options might also require tweaking merge_fdw_options().
5279  */
5280 static void
5282 {
5283  ListCell *lc;
5284 
5285  foreach(lc, fpinfo->server->options)
5286  {
5287  DefElem *def = (DefElem *) lfirst(lc);
5288 
5289  if (strcmp(def->defname, "use_remote_estimate") == 0)
5290  fpinfo->use_remote_estimate = defGetBoolean(def);
5291  else if (strcmp(def->defname, "fdw_startup_cost") == 0)
5292  fpinfo->fdw_startup_cost = strtod(defGetString(def), NULL);
5293  else if (strcmp(def->defname, "fdw_tuple_cost") == 0)
5294  fpinfo->fdw_tuple_cost = strtod(defGetString(def), NULL);
5295  else if (strcmp(def->defname, "extensions") == 0)
5296  fpinfo->shippable_extensions =
5297  ExtractExtensionList(defGetString(def), false);
5298  else if (strcmp(def->defname, "fetch_size") == 0)
5299  fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
5300  }
5301 }
5302 
5303 /*
5304  * Parse options from foreign table and apply them to fpinfo.
 *
 * Only use_remote_estimate and fetch_size are recognized at table level.
 * They are read from fpinfo->table->options, and any other option name is
 * ignored.  If this runs after apply_server_options(), these values
 * override the server-level ones.  That is presumably the intended call
 * order -- confirm at the caller.
5305  *
5306  * New options might also require tweaking merge_fdw_options().
5307  */
5308 static void
5310 {
5311  ListCell *lc;
5312 
5313  foreach(lc, fpinfo->table->options)
5314  {
5315  DefElem *def = (DefElem *) lfirst(lc);
5316 
5317  if (strcmp(def->defname, "use_remote_estimate") == 0)
5318  fpinfo->use_remote_estimate = defGetBoolean(def);
5319  else if (strcmp(def->defname, "fetch_size") == 0)
5320  fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
5321  }
5322 }
5323 
5324 /*
5325  * Merge FDW options from input relations into a new set of options for a join
5326  * or an upper rel.
5327  *
5328  * For a join relation, FDW-specific information about the inner and outer
5329  * relations is provided using fpinfo_i and fpinfo_o. For an upper relation,
5330  * fpinfo_o provides the information for the input relation; fpinfo_i is
5331  * expected to be NULL.
 *
 * The merged values are written into fpinfo:
 *	- fdw_startup_cost, fdw_tuple_cost and shippable_extensions are always
 *	  copied from fpinfo_o.
 *	- use_remote_estimate is true if either input uses remote estimates.
 *	- fetch_size is the larger of the two inputs' values.
 * When fpinfo_i is NULL, use_remote_estimate and fetch_size are simply
 * copied from fpinfo_o.
5332  */
5333 static void
5335  const PgFdwRelationInfo *fpinfo_o,
5336  const PgFdwRelationInfo *fpinfo_i)
5337 {
5338  /* We must always have fpinfo_o. */
5339  Assert(fpinfo_o);
5340 
5341  /* fpinfo_i may be NULL, but if present the servers must both match. */
5342  Assert(!fpinfo_i ||
5343  fpinfo_i->server->serverid == fpinfo_o->server->serverid);
5344 
5345  /*
5346  * Copy the server specific FDW options. (For a join, both relations come
5347  * from the same server, so the server options should have the same value
5348  * for both relations.)
5349  */
5350  fpinfo->fdw_startup_cost = fpinfo_o->fdw_startup_cost;
5351  fpinfo->fdw_tuple_cost = fpinfo_o->fdw_tuple_cost;
5352  fpinfo->shippable_extensions = fpinfo_o->shippable_extensions;
5353  fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate;
5354  fpinfo->fetch_size = fpinfo_o->fetch_size;
5355 
5356  /* Merge the table level options from either side of the join. */
5357  if (fpinfo_i)
5358  {
5359  /*
5360  * We'll prefer to use remote estimates for this join if any table
5361  * from either side of the join is using remote estimates. This is
5362  * most likely going to be preferred since they're already willing to
5363  * pay the price of a round trip to get the remote EXPLAIN. In any
5364  * case it's not entirely clear how we might otherwise handle this
5365  * best.
5366  */
5367  fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate ||
5368  fpinfo_i->use_remote_estimate;
5369 
5370  /*
5371  * Set fetch size to maximum of the joining sides, since we are
5372  * expecting the rows returned by the join to be proportional to the
5373  * relation sizes.
5374  */
5375  fpinfo->fetch_size = Max(fpinfo_o->fetch_size, fpinfo_i->fetch_size);
5376  }
5377 }
5378 
5379 /*
5380  * postgresGetForeignJoinPaths
5381  * Add possible ForeignPath to joinrel, if join is safe to push down.
 *
 * joinrel is the join of outerrel and innerrel with the given jointype.
 * extra carries planner data for this join; its sjinfo is used below to
 * estimate join-clause selectivity when costs are estimated locally.
 *
 * The function returns without adding any path when:
 *	- joinrel->fdw_private is already set (this joinrel was examined before);
 *	- joinrel has lateral references (fdw_private is left unset in this case);
 *	- an EvalPlanQual path is needed but GetExistingLocalJoinPath() finds none;
 *	- foreign_join_ok() rejects the join.
 * Once past the lateral-reference check, joinrel->fdw_private points to a new
 * PgFdwRelationInfo even when pushdown is later rejected, so later calls for
 * the same joinrel return immediately.
 *
 * Otherwise, one unsorted ForeignPath is added, followed by sorted variants
 * from add_paths_with_pathkeys_for_rel().
5382  */
5383 static void
5385  RelOptInfo *joinrel,
5386  RelOptInfo *outerrel,
5387  RelOptInfo *innerrel,
5388  JoinType jointype,
5389  JoinPathExtraData *extra)
5390 {
5391  PgFdwRelationInfo *fpinfo;
5392  ForeignPath *joinpath;
5393  double rows;
5394  int width;
5395  Cost startup_cost;
5396  Cost total_cost;
5397  Path *epq_path; /* Path to create plan to be executed when
5398  * EvalPlanQual gets triggered. */
5399 
5400  /*
5401  * Skip if this join combination has been considered already.
5402  */
5403  if (joinrel->fdw_private)
5404  return;
5405 
5406  /*
5407  * This code does not work for joins with lateral references, since those
5408  * must have parameterized paths, which we don't generate yet.
5409  */
5410  if (!bms_is_empty(joinrel->lateral_relids))
5411  return;
5412 
5413  /*
5414  * Create unfinished PgFdwRelationInfo entry which is used to indicate
5415  * that the join relation is already considered, so that we won't waste
5416  * time in judging safety of join pushdown and adding the same paths again
5417  * if found safe. Once we know that this join can be pushed down, we fill
5418  * the entry.
5419  */
5420  fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
5421  fpinfo->pushdown_safe = false;
5422  joinrel->fdw_private = fpinfo;
5423  /* attrs_used is only for base relations. */
5424  fpinfo->attrs_used = NULL;
5425 
5426  /*
5427  * If there is a possibility that EvalPlanQual will be executed, we need
5428  * to be able to reconstruct the row using scans of the base relations.
5429  * GetExistingLocalJoinPath will find a suitable path for this purpose in
5430  * the path list of the joinrel, if one exists. We must be careful to
5431  * call it before adding any ForeignPath, since the ForeignPath might
5432  * dominate the only suitable local path available. We also do it before
5433  * calling foreign_join_ok(), since that function updates fpinfo and marks
5434  * it as pushable if the join is found to be pushable.
5435  */
5436  if (root->parse->commandType == CMD_DELETE ||
5437  root->parse->commandType == CMD_UPDATE ||
5438  root->rowMarks)
5439  {
5440  epq_path = GetExistingLocalJoinPath(joinrel);
5441  if (!epq_path)
5442  {
5443  elog(DEBUG3, "could not push down foreign join because a local path suitable for EPQ checks was not found");
5444  return;
5445  }
5446  }
5447  else
5448  epq_path = NULL;
5449 
5450  if (!foreign_join_ok(root, joinrel, jointype, outerrel, innerrel, extra))
5451  {
5452  /* Free path required for EPQ if we copied one; we don't need it now */
		/*
		 * epq_path is NULL when no EPQ path was needed.  The NULL test is
		 * kept because pfree() is presumably not NULL-safe.
		 */
5453  if (epq_path)
5454  pfree(epq_path);
5455  return;
5456  }
5457 
5458  /*
5459  * Compute the selectivity and cost of the local_conds, so we don't have
5460  * to do it over again for each path. The best we can do for these
5461  * conditions is to estimate selectivity on the basis of local statistics.
5462  * The local conditions are applied after the join has been computed on
5463  * the remote side like quals in WHERE clause, so pass jointype as
5464  * JOIN_INNER.
5465  */
5466  fpinfo->local_conds_sel = clauselist_selectivity(root,
5467  fpinfo->local_conds,
5468  0,
5469  JOIN_INNER,
5470  NULL);
5471  cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
5472 
5473  /*
5474  * If we are going to estimate costs locally, estimate the join clause
5475  * selectivity here while we have special join info.
5476  */
5477  if (!fpinfo->use_remote_estimate)
5478  fpinfo->joinclause_sel = clauselist_selectivity(root, fpinfo->joinclauses,
5479  0, fpinfo->jointype,
5480  extra->sjinfo);
5481 
5482  /* Estimate costs for bare join relation */
5483  estimate_path_cost_size(root, joinrel, NIL, NIL, NULL,
5484  &rows, &width, &startup_cost, &total_cost);
5485  /* Now update this information in the joinrel */
5486  joinrel->rows = rows;
5487  joinrel->reltarget->width = width;
5488  fpinfo->rows = rows;
5489  fpinfo->width = width;
5490  fpinfo->startup_cost = startup_cost;
5491  fpinfo->total_cost = total_cost;
5492 
5493  /*
5494  * Create a new join path and add it to the joinrel which represents a
5495  * join between foreign tables.
5496  */
5497  joinpath = create_foreign_join_path(root,
5498  joinrel,
5499  NULL, /* default pathtarget */
5500  rows,
5501  startup_cost,
5502  total_cost,
5503  NIL, /* no pathkeys */
5504  joinrel->lateral_relids,
5505  epq_path,
5506  NIL); /* no fdw_private */
5507 
5508  /* Add generated path into joinrel by add_path(). */
5509  add_path(joinrel, (Path *) joinpath);
5510 
5511  /* Consider pathkeys for the join relation */
5512  add_paths_with_pathkeys_for_rel(root, joinrel, epq_path);
5513 
5514  /* XXX Consider parameterized paths for the join relation */
5515 }
5516 
5517 /*
5518  * Assess whether the aggregation, grouping and having operations can be pushed
5519  * down to the foreign server. As a side effect, save information we obtain in
5520  * this function to PgFdwRelationInfo of the input relation.
5521  */
5522 static bool
5524  Node *havingQual)
5525 {
5526  Query *query = root->parse;
5527  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) grouped_rel->fdw_private;
5528  PathTarget *grouping_target = grouped_rel->reltarget;
5529  PgFdwRelationInfo *ofpinfo;
5530  ListCell *lc;
5531  int i;
5532  List *tlist = NIL;
5533 
5534  /* We currently don't support pushing Grouping Sets. */
5535  if (query->groupingSets)
5536  return false;
5537 
5538  /* Get the fpinfo of the underlying scan relation. */
5539  ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
5540 
5541  /*
5542  * If underlying scan relation has any local conditions, those conditions
5543  * are required to be applied before performing aggregation. Hence the
5544  * aggregate cannot be pushed down.
5545  */
5546  if (ofpinfo->local_conds)
5547  return false;
5548 
5549  /*
5550  * Examine grouping expressions, as well as other expressions we'd need to
5551  * compute, and check whether they are safe to push down to the foreign
5552  * server. All GROUP BY expressions will be part of the grouping target
5553  * and thus there is no need to search for them separately. Add grouping
5554  * expressions into target list which will be passed to foreign server.
5555  *
5556  * A tricky fine point is that we must not put any expression into the
5557  * target list that is just a foreign param (that is, something that
5558  * deparse.c would conclude has to be sent to the foreign server). If we
5559  * do, the expression will also appear in the fdw_exprs list of the plan
5560  * node, and setrefs.c will get confused and decide that the fdw_exprs
5561  * entry is actually a reference to the fdw_scan_tlist entry, resulting in
5562  * a broken plan. Somewhat oddly, it's OK if the expression contains such
5563  * a node, as long as it's not at top level; then no match is possible.
5564  */
5565  i = 0;
5566  foreach(lc, grouping_target->exprs)
5567  {
5568  Expr *expr = (Expr *) lfirst(lc);
5569  Index sgref = get_pathtarget_sortgroupref(grouping_target, i);
5570  ListCell *l;
5571 
5572  /* Check whether this expression is part of GROUP BY clause */
5573  if (sgref && get_sortgroupref_clause_noerr(sgref, query->groupClause))
5574  {
5575  TargetEntry *tle;
5576 
5577  /*
5578  * If any GROUP BY expression is not shippable, then we cannot
5579  * push down aggregation to the foreign server.
5580  */
5581  if (!is_foreign_expr(root, grouped_rel, expr))
5582  return false;
5583 
5584  /*
5585  * If it would be a foreign param, we can't put it into the tlist,
5586  * so we have to fail.
5587  */
5588  if (is_foreign_param(root, grouped_rel, expr))
5589  return false;
5590 
5591  /*
5592  * Pushable, so add to tlist. We need to create a TLE for this
5593  * expression and apply the sortgroupref to it. We cannot use
5594  * add_to_flat_tlist() here because that avoids making duplicate
5595  * entries in the tlist. If there are duplicate entries with
5596  * distinct sortgrouprefs, we have to duplicate that situation in
5597  * the output tlist.
5598  */
5599  tle = makeTargetEntry(expr, list_length(tlist) + 1, NULL, false);
5600  tle->ressortgroupref = sgref;
5601  tlist = lappend(tlist, tle);
5602  }
5603  else
5604  {
5605  /*
5606  * Non-grouping expression we need to compute. Can we ship it
5607  * as-is to the foreign server?
5608  */
5609  if (is_foreign_expr(root, grouped_rel, expr) &&
5610  !is_foreign_param(root, grouped_rel, expr))
5611  {
5612  /* Yes, so add to tlist as-is; OK to suppress duplicates */
5613  tlist = add_to_flat_tlist(tlist, list_make1(expr));
5614  }
5615  else
5616  {
5617  /* Not pushable as a whole; extract its Vars and aggregates */
5618  List *aggvars;
5619 
5620  aggvars = pull_var_clause((Node *) expr,
5622 
5623  /*
5624  * If any aggregate expression is not shippable, then we
5625  * cannot push down aggregation to the foreign server. (We
5626  * don't have to check is_foreign_param, since that certainly
5627  * won't return true for any such expression.)
5628  */
5629  if (!is_foreign_expr(root, grouped_rel, (Expr *) aggvars))
5630  return false;
5631 
5632  /*
5633  * Add aggregates, if any, into the targetlist. Plain Vars
5634  * outside an aggregate can be ignored, because they should be
5635  * either same as some GROUP BY column or part of some GROUP
5636  * BY expression. In either case, they are already part of
5637  * the targetlist and thus no need to add them again. In fact
5638  * including plain Vars in the tlist when they do not match a
5639  * GROUP BY column would cause the foreign server to complain
5640  * that the shipped query is invalid.
5641  */
5642  foreach(l, aggvars)
5643  {