PostgreSQL Source Code  git master
postgres_fdw.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * postgres_fdw.c
4  * Foreign-data wrapper for remote PostgreSQL servers
5  *
6  * Portions Copyright (c) 2012-2019, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * contrib/postgres_fdw/postgres_fdw.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include <limits.h>
16 
17 #include "access/htup_details.h"
18 #include "access/sysattr.h"
19 #include "access/table.h"
20 #include "catalog/pg_class.h"
21 #include "commands/defrem.h"
22 #include "commands/explain.h"
23 #include "commands/vacuum.h"
24 #include "foreign/fdwapi.h"
25 #include "funcapi.h"
26 #include "miscadmin.h"
27 #include "nodes/makefuncs.h"
28 #include "nodes/nodeFuncs.h"
29 #include "optimizer/clauses.h"
30 #include "optimizer/cost.h"
31 #include "optimizer/optimizer.h"
32 #include "optimizer/pathnode.h"
33 #include "optimizer/paths.h"
34 #include "optimizer/planmain.h"
35 #include "optimizer/restrictinfo.h"
36 #include "optimizer/tlist.h"
37 #include "parser/parsetree.h"
38 #include "postgres_fdw.h"
39 #include "utils/builtins.h"
40 #include "utils/float.h"
41 #include "utils/guc.h"
42 #include "utils/lsyscache.h"
43 #include "utils/memutils.h"
44 #include "utils/rel.h"
45 #include "utils/sampling.h"
46 #include "utils/selfuncs.h"
47 
49 
50 /* Default CPU cost to start up a foreign query. */
51 #define DEFAULT_FDW_STARTUP_COST 100.0
52 
53 /* Default CPU cost to process 1 row (above and beyond cpu_tuple_cost). */
54 #define DEFAULT_FDW_TUPLE_COST 0.01
55 
56 /*
 * If no remote estimates are available, assume a sort costs 20% extra
 * (hence the multiplier of 1.2).
 */
57 #define DEFAULT_FDW_SORT_MULTIPLIER 1.2
58 
59 /*
60  * Indexes of FDW-private information stored in fdw_private lists.
61  *
62  * These items are indexed with the enum FdwScanPrivateIndex, so an item
63  * can be fetched with list_nth(). For example, to get the SELECT statement:
64  * sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
65  */
67 {
68  /* SQL statement to execute remotely (as a String node) */
70  /* Integer list of attribute numbers retrieved by the SELECT */
72  /* Integer representing the desired fetch_size */
74 
75  /*
76  * String describing the join, i.e. names of relations being joined and types
77  * of join; added when the scan is a join
78  */
80 };
81 
82 /*
83  * Similarly, this enum describes what's kept in the fdw_private list for
84  * a ModifyTable node referencing a postgres_fdw foreign table. We store:
85  *
86  * 1) INSERT/UPDATE/DELETE statement text to be sent to the remote server
87  * 2) Integer list of target attribute numbers for INSERT/UPDATE
88  * (NIL for a DELETE)
89  * 3) Boolean flag showing if the remote query has a RETURNING clause
90  * 4) Integer list of attribute numbers retrieved by RETURNING, if any
91  */
93 {
94  /* SQL statement to execute remotely (as a String node) */
96  /* Integer list of target attribute numbers for INSERT/UPDATE */
98  /* has-returning flag (as an integer Value node) */
100  /* Integer list of attribute numbers retrieved by RETURNING */
102 };
103 
104 /*
105  * Similarly, this enum describes what's kept in the fdw_private list for
106  * a ForeignScan node that modifies a foreign table directly. We store:
107  *
108  * 1) UPDATE/DELETE statement text to be sent to the remote server
109  * 2) Boolean flag showing if the remote query has a RETURNING clause
110  * 3) Integer list of attribute numbers retrieved by RETURNING, if any
111  * 4) Boolean flag showing if we set the command es_processed
112  */
114 {
115  /* SQL statement to execute remotely (as a String node) */
117  /* has-returning flag (as an integer Value node) */
119  /* Integer list of attribute numbers retrieved by RETURNING */
121  /* set-processed flag (as an integer Value node) */
123 };
124 
125 /*
126  * Execution state of a foreign scan using postgres_fdw.
 *
 * The fields are grouped as: relation/tuple metadata, values extracted from
 * fdw_private, remote-execution state (connection, cursor, query
 * parameters), the array of currently-retrieved tuples, batch-level
 * bookkeeping, and working memory contexts.
 *
 * NOTE(review): the struct's terminating line (listing line 162, presumably
 * "} PgFdwScanState;") is absent from this listing.
127  */
128 typedef struct PgFdwScanState
129 {
130  Relation rel; /* relcache entry for the foreign table. NULL
131  * for a foreign join scan. */
132  TupleDesc tupdesc; /* tuple descriptor of scan */
133  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
134 
135  /* extracted fdw_private data */
136  char *query; /* text of SELECT command */
137  List *retrieved_attrs; /* list of retrieved attribute numbers */
138 
139  /* for remote query execution */
140  PGconn *conn; /* connection for the scan */
141  unsigned int cursor_number; /* quasi-unique ID for my cursor */
142  bool cursor_exists; /* have we created the cursor? */
143  int numParams; /* number of parameters passed to query */
144  FmgrInfo *param_flinfo; /* output conversion functions for them */
145  List *param_exprs; /* executable expressions for param values */
146  const char **param_values; /* textual values of query parameters */
147 
148  /* for storing result tuples */
149  HeapTuple *tuples; /* array of currently-retrieved tuples */
150  int num_tuples; /* # of tuples in array */
151  int next_tuple; /* index of next one to return */
152 
153  /* batch-level state, for optimizing rewinds and avoiding useless fetch */
154  int fetch_ct_2; /* Min(# of fetches done, 2) */
155  bool eof_reached; /* true if last fetch reached EOF */
156 
157  /* working memory contexts */
158  MemoryContext batch_cxt; /* context holding current batch of tuples */
159  MemoryContext temp_cxt; /* context for per-tuple temporary data */
160 
 /* batch size for each fetch */
161  int fetch_size; /* number of tuples per fetch */
163 
164 /*
165  * Execution state of a foreign insert/update/delete operation.
 *
 * Holds the remote connection and prepared-statement name, the values
 * extracted from fdw_private (statement text, target attributes, RETURNING
 * information), the parameter-conversion data for the prepared statement,
 * and a per-tuple scratch memory context.
 *
 * NOTE(review): the struct's terminating line (listing line 193, presumably
 * "} PgFdwModifyState;") is absent from this listing.
166  */
167 typedef struct PgFdwModifyState
168 {
169  Relation rel; /* relcache entry for the foreign table */
170  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
171 
172  /* for remote query execution */
173  PGconn *conn; /* connection for the scan */
174  char *p_name; /* name of prepared statement, if created */
175 
176  /* extracted fdw_private data */
177  char *query; /* text of INSERT/UPDATE/DELETE command */
178  List *target_attrs; /* list of target attribute numbers */
179  bool has_returning; /* is there a RETURNING clause? */
180  List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
181 
182  /* info about parameters for prepared statement */
183  AttrNumber ctidAttno; /* attnum of input resjunk ctid column */
184  int p_nums; /* number of parameters to transmit */
185  FmgrInfo *p_flinfo; /* output conversion functions for them */
186 
187  /* working memory context */
188  MemoryContext temp_cxt; /* context for per-tuple temporary data */
189 
190  /* for update row movement if subplan result rel */
191  struct PgFdwModifyState *aux_fmstate; /* foreign-insert state, if
192  * created */
194 
195 /*
196  * Execution state of a foreign scan that modifies a foreign table directly.
 *
 * Holds the values extracted from fdw_private (statement text and
 * RETURNING/set-processed flags), the remote connection and query
 * parameters, the PGresult of the statement together with the cursor into
 * its rows, and the attribute-mapping data for the target relation.
 *
 * NOTE(review): the "typedef struct ..." header (listing line 198) and the
 * terminating line (228) are absent from this listing; the type is presumably
 * the PgFdwDirectModifyState referenced by the helper prototypes -- confirm.
197  */
199 {
200  Relation rel; /* relcache entry for the foreign table */
201  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
202 
203  /* extracted fdw_private data */
204  char *query; /* text of UPDATE/DELETE command */
205  bool has_returning; /* is there a RETURNING clause? */
206  List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
207  bool set_processed; /* do we set the command es_processed? */
208 
209  /* for remote query execution */
210  PGconn *conn; /* connection for the update */
211  int numParams; /* number of parameters passed to query */
212  FmgrInfo *param_flinfo; /* output conversion functions for them */
213  List *param_exprs; /* executable expressions for param values */
214  const char **param_values; /* textual values of query parameters */
215 
216  /* for storing result tuples */
217  PGresult *result; /* result for query */
218  int num_tuples; /* # of result tuples */
219  int next_tuple; /* index of next one to return */
220  Relation resultRel; /* relcache entry for the target relation */
221  AttrNumber *attnoMap; /* array of attnums of input user columns */
222  AttrNumber ctidAttno; /* attnum of input ctid column */
223  AttrNumber oidAttno; /* attnum of input oid column */
224  bool hasSystemCols; /* are there system columns of resultRel? */
225 
226  /* working memory context */
227  MemoryContext temp_cxt; /* context for per-tuple temporary data */
229 
230 /*
231  * Workspace for analyzing a foreign table.
 *
 * Collects up to targrows sample rows into the rows array while tracking
 * how many rows have been fetched and how many to skip before the next
 * sample; rstate carries the reservoir-sampling state.
 *
 * NOTE(review): the struct's terminating line (listing line 252, presumably
 * "} PgFdwAnalyzeState;") is absent from this listing.
232  */
233 typedef struct PgFdwAnalyzeState
234 {
235  Relation rel; /* relcache entry for the foreign table */
236  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
237  List *retrieved_attrs; /* attr numbers retrieved by query */
238 
239  /* collected sample rows */
240  HeapTuple *rows; /* array of size targrows */
241  int targrows; /* target # of sample rows */
242  int numrows; /* # of sample rows collected */
243 
244  /* for random sampling */
245  double samplerows; /* # of rows fetched */
246  double rowstoskip; /* # of rows to skip before next sample */
247  ReservoirStateData rstate; /* state for reservoir sampling */
248 
249  /* working memory contexts */
250  MemoryContext anl_cxt; /* context for per-analyze lifespan data */
251  MemoryContext temp_cxt; /* context for per-tuple temporary data */
253 
254 /*
255  * This enum describes what's kept in the fdw_private list for a ForeignPath.
256  * We store:
257  *
258  * 1) Boolean flag showing if the remote query has the final sort
259  * 2) Boolean flag showing if the remote query has the LIMIT clause
260  */
262 {
263  /* has-final-sort flag (as an integer Value node) */
265  /* has-limit flag (as an integer Value node) */
267 };
268 
269 /*
 * Struct for extra information passed to estimate_path_cost_size()
 *
 * NOTE(review): listing lines 272-273 (the leading members) and 278 (the
 * terminating line) are absent from this listing. The type is presumably
 * PgFdwPathExtraData, which is the type of estimate_path_cost_size()'s
 * fpextra parameter -- confirm.
 */
270 typedef struct
271 {
274  bool has_limit; /* NOTE(review): presumably "query has a LIMIT" -- confirm */
275  double limit_tuples;
276  int64 count_est;
277  int64 offset_est;
279 
280 /*
281  * Identify the attribute where data conversion fails.
 *
 * NOTE(review): listing lines 294-295 are absent from this listing. They
 * presumably hold the member described by the trailing comment below (it
 * mentions "fsstate") and the terminating "} ConversionLocation;" -- confirm.
282  */
283 typedef struct ConversionLocation
284 {
285  Relation rel; /* foreign table's relcache entry. */
286  AttrNumber cur_attno; /* attribute number being processed, or 0 */
287 
288  /*
289  * In case of foreign join push down, fdw_scan_tlist is used to identify
290  * the Var node corresponding to the error location and
291  * fsstate->ss.ps.state gives access to the RTEs of corresponding relation
292  * to get the relation name and attribute name.
293  */
296 
297 /*
 * Callback argument for ec_member_matches_foreign
 *
 * NOTE(review): the terminating line (listing line 302), which carries the
 * typedef name, is absent from this listing.
 */
298 typedef struct
299 {
300  Expr *current; /* current expr, or NULL if not yet found */
301  List *already_used; /* expressions already dealt with */
303 
304 /*
305  * SQL functions
306  */
308 
309 /*
310  * FDW callback routines
311  */
312 static void postgresGetForeignRelSize(PlannerInfo *root,
313  RelOptInfo *baserel,
314  Oid foreigntableid);
315 static void postgresGetForeignPaths(PlannerInfo *root,
316  RelOptInfo *baserel,
317  Oid foreigntableid);
319  RelOptInfo *foreignrel,
320  Oid foreigntableid,
321  ForeignPath *best_path,
322  List *tlist,
323  List *scan_clauses,
324  Plan *outer_plan);
325 static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
328 static void postgresEndForeignScan(ForeignScanState *node);
329 static void postgresAddForeignUpdateTargets(Query *parsetree,
330  RangeTblEntry *target_rte,
331  Relation target_relation);
333  ModifyTable *plan,
334  Index resultRelation,
335  int subplan_index);
336 static void postgresBeginForeignModify(ModifyTableState *mtstate,
337  ResultRelInfo *resultRelInfo,
338  List *fdw_private,
339  int subplan_index,
340  int eflags);
342  ResultRelInfo *resultRelInfo,
343  TupleTableSlot *slot,
344  TupleTableSlot *planSlot);
346  ResultRelInfo *resultRelInfo,
347  TupleTableSlot *slot,
348  TupleTableSlot *planSlot);
350  ResultRelInfo *resultRelInfo,
351  TupleTableSlot *slot,
352  TupleTableSlot *planSlot);
353 static void postgresEndForeignModify(EState *estate,
354  ResultRelInfo *resultRelInfo);
355 static void postgresBeginForeignInsert(ModifyTableState *mtstate,
356  ResultRelInfo *resultRelInfo);
357 static void postgresEndForeignInsert(EState *estate,
358  ResultRelInfo *resultRelInfo);
360 static bool postgresPlanDirectModify(PlannerInfo *root,
361  ModifyTable *plan,
362  Index resultRelation,
363  int subplan_index);
364 static void postgresBeginDirectModify(ForeignScanState *node, int eflags);
366 static void postgresEndDirectModify(ForeignScanState *node);
368  ExplainState *es);
370  ResultRelInfo *rinfo,
371  List *fdw_private,
372  int subplan_index,
373  ExplainState *es);
375  ExplainState *es);
376 static bool postgresAnalyzeForeignTable(Relation relation,
377  AcquireSampleRowsFunc *func,
378  BlockNumber *totalpages);
380  Oid serverOid);
381 static void postgresGetForeignJoinPaths(PlannerInfo *root,
382  RelOptInfo *joinrel,
383  RelOptInfo *outerrel,
384  RelOptInfo *innerrel,
385  JoinType jointype,
386  JoinPathExtraData *extra);
388  TupleTableSlot *slot);
389 static void postgresGetForeignUpperPaths(PlannerInfo *root,
390  UpperRelationKind stage,
391  RelOptInfo *input_rel,
392  RelOptInfo *output_rel,
393  void *extra);
394 
395 /*
396  * Helper functions
397  */
398 static void estimate_path_cost_size(PlannerInfo *root,
399  RelOptInfo *foreignrel,
400  List *param_join_conds,
401  List *pathkeys,
402  PgFdwPathExtraData *fpextra,
403  double *p_rows, int *p_width,
404  Cost *p_startup_cost, Cost *p_total_cost);
405 static void get_remote_estimate(const char *sql,
406  PGconn *conn,
407  double *rows,
408  int *width,
409  Cost *startup_cost,
410  Cost *total_cost);
412  List *pathkeys,
413  double retrieved_rows,
414  double width,
415  double limit_tuples,
416  Cost *p_startup_cost,
417  Cost *p_run_cost);
420  void *arg);
421 static void create_cursor(ForeignScanState *node);
422 static void fetch_more_data(ForeignScanState *node);
423 static void close_cursor(PGconn *conn, unsigned int cursor_number);
425  RangeTblEntry *rte,
426  ResultRelInfo *resultRelInfo,
427  CmdType operation,
428  Plan *subplan,
429  char *query,
430  List *target_attrs,
431  bool has_returning,
434  ResultRelInfo *resultRelInfo,
435  CmdType operation,
436  TupleTableSlot *slot,
437  TupleTableSlot *planSlot);
438 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
439 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
440  ItemPointer tupleid,
441  TupleTableSlot *slot);
442 static void store_returning_result(PgFdwModifyState *fmstate,
443  TupleTableSlot *slot, PGresult *res);
444 static void finish_foreign_modify(PgFdwModifyState *fmstate);
445 static List *build_remote_returning(Index rtindex, Relation rel,
446  List *returningList);
447 static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist);
448 static void execute_dml_stmt(ForeignScanState *node);
450 static void init_returning_filter(PgFdwDirectModifyState *dmstate,
451  List *fdw_scan_tlist,
452  Index rtindex);
454  TupleTableSlot *slot,
455  EState *estate);
456 static void prepare_query_params(PlanState *node,
457  List *fdw_exprs,
458  int numParams,
460  List **param_exprs,
461  const char ***param_values);
462 static void process_query_params(ExprContext *econtext,
464  List *param_exprs,
465  const char **param_values);
466 static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
467  HeapTuple *rows, int targrows,
468  double *totalrows,
469  double *totaldeadrows);
470 static void analyze_row_processor(PGresult *res, int row,
471  PgFdwAnalyzeState *astate);
473  int row,
474  Relation rel,
477  ForeignScanState *fsstate,
478  MemoryContext temp_context);
479 static void conversion_error_callback(void *arg);
480 static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel,
481  JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel,
482  JoinPathExtraData *extra);
483 static bool foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
484  Node *havingQual);
486  RelOptInfo *rel);
489  Path *epq_path);
490 static void add_foreign_grouping_paths(PlannerInfo *root,
491  RelOptInfo *input_rel,
492  RelOptInfo *grouped_rel,
493  GroupPathExtraData *extra);
494 static void add_foreign_ordered_paths(PlannerInfo *root,
495  RelOptInfo *input_rel,
496  RelOptInfo *ordered_rel);
497 static void add_foreign_final_paths(PlannerInfo *root,
498  RelOptInfo *input_rel,
499  RelOptInfo *final_rel,
500  FinalPathExtraData *extra);
501 static void apply_server_options(PgFdwRelationInfo *fpinfo);
502 static void apply_table_options(PgFdwRelationInfo *fpinfo);
503 static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
504  const PgFdwRelationInfo *fpinfo_o,
505  const PgFdwRelationInfo *fpinfo_i);
506 
507 
508 /*
509  * Foreign-data wrapper handler function: return a struct with pointers
510  * to my callback routines.
 *
 * Allocates an FdwRoutine node with makeNode() and hands it back through
 * PG_RETURN_POINTER().
 *
 * NOTE(review): the function-name line (listing line 513) and every
 * "routine->... = ..." assignment are absent from this listing; only the
 * section comments remain below. The callbacks assigned are presumably the
 * static functions declared under "FDW callback routines" -- confirm against
 * the upstream file.
511  */
512 Datum
514 {
515  FdwRoutine *routine = makeNode(FdwRoutine);
516 
517  /* Functions for scanning foreign tables */
525 
526  /* Functions for updating foreign tables */
541 
542  /* Function for EvalPlanQual rechecks */
544  /* Support functions for EXPLAIN */
548 
549  /* Support functions for ANALYZE */
551 
552  /* Support functions for IMPORT FOREIGN SCHEMA */
554 
555  /* Support functions for join push-down */
557 
558  /* Support functions for upper relation push-down */
560 
561  PG_RETURN_POINTER(routine);
562 }
563 
564 /*
565  * postgresGetForeignRelSize
566  * Estimate # of rows and width of the result of the scan
567  *
568  * We should consider the effect of all baserestrictinfo clauses here, but
569  * not any join clauses.
 *
 * root: planner state; used to fetch the range-table entry for baserel and
 * passed through to the classification and estimation helpers.
 * baserel: the foreign base relation. Its fdw_private is pointed at a newly
 * palloc0'd PgFdwRelationInfo; its rows and reltarget->width are set when
 * remote estimates are used, otherwise its pages/tuples may be defaulted
 * and set_baserel_size_estimates() is called on it.
 * foreigntableid: OID handed to GetForeignTable() to look up catalog info.
 *
 * NOTE(review): the line carrying the function name and first parameter
 * (listing line 572) is absent from this listing.
570  */
571 static void
573  RelOptInfo *baserel,
574  Oid foreigntableid)
575 {
576  PgFdwRelationInfo *fpinfo;
577  ListCell *lc;
578  RangeTblEntry *rte = planner_rt_fetch(baserel->relid, root);
579 
580  /*
581  * We use PgFdwRelationInfo to pass various information to subsequent
582  * functions.
583  */
584  fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
585  baserel->fdw_private = (void *) fpinfo;
586 
587  /* Base foreign tables need to be pushed down always. */
588  fpinfo->pushdown_safe = true;
589 
590  /* Look up foreign-table catalog info. */
591  fpinfo->table = GetForeignTable(foreigntableid);
592  fpinfo->server = GetForeignServer(fpinfo->table->serverid);
593 
594  /*
595  * Extract user-settable option values. Note that per-table setting of
596  * use_remote_estimate overrides per-server setting.
597  */
598  fpinfo->use_remote_estimate = false;
 /*
  * NOTE(review): listing lines 599-600 are absent here; presumably two
  * more defaults are assigned before the options are applied -- confirm.
  */
601  fpinfo->shippable_extensions = NIL;
602  fpinfo->fetch_size = 100;
603 
 /* Server options are applied first, then table options on top of them. */
604  apply_server_options(fpinfo);
605  apply_table_options(fpinfo);
606 
607  /*
608  * If the table or the server is configured to use remote estimates,
609  * identify which user to do remote access as during planning. This
610  * should match what ExecCheckRTEPerms() does. If we fail due to lack of
611  * permissions, the query would have failed at runtime anyway.
612  */
613  if (fpinfo->use_remote_estimate)
614  {
615  Oid userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
616 
617  fpinfo->user = GetUserMapping(userid, fpinfo->server->serverid);
618  }
619  else
620  fpinfo->user = NULL;
621 
622  /*
623  * Identify which baserestrictinfo clauses can be sent to the remote
624  * server and which can't.
625  */
626  classifyConditions(root, baserel, baserel->baserestrictinfo,
627  &fpinfo->remote_conds, &fpinfo->local_conds);
628 
629  /*
630  * Identify which attributes will need to be retrieved from the remote
631  * server. These include all attrs needed for joins or final output, plus
632  * all attrs used in the local_conds. (Note: if we end up using a
633  * parameterized scan, it's possible that some of the join clauses will be
634  * sent to the remote and thus we wouldn't really need to retrieve the
635  * columns used in them. Doesn't seem worth detecting that case though.)
636  */
637  fpinfo->attrs_used = NULL;
638  pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid,
639  &fpinfo->attrs_used);
640  foreach(lc, fpinfo->local_conds)
641  {
642  RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
643 
644  pull_varattnos((Node *) rinfo->clause, baserel->relid,
645  &fpinfo->attrs_used);
646  }
647 
648  /*
649  * Compute the selectivity and cost of the local_conds, so we don't have
650  * to do it over again for each path. The best we can do for these
651  * conditions is to estimate selectivity on the basis of local statistics.
652  */
 /*
  * NOTE(review): listing line 653 is absent; the four arguments below
  * presumably belong to the selectivity-estimation call whose opening
  * line was dropped -- confirm against the upstream file.
  */
654  fpinfo->local_conds,
655  baserel->relid,
656  JOIN_INNER,
657  NULL);
658 
659  cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
660 
661  /*
662  * Set # of retrieved rows and cached relation costs to some negative
663  * value, so that we can detect when they are set to some sensible values,
664  * during one (usually the first) of the calls to estimate_path_cost_size.
665  */
666  fpinfo->retrieved_rows = -1;
667  fpinfo->rel_startup_cost = -1;
668  fpinfo->rel_total_cost = -1;
669 
670  /*
671  * If the table or the server is configured to use remote estimates,
672  * connect to the foreign server and execute EXPLAIN to estimate the
673  * number of rows selected by the restriction clauses, as well as the
674  * average row width. Otherwise, estimate using whatever statistics we
675  * have locally, in a way similar to ordinary tables.
676  */
677  if (fpinfo->use_remote_estimate)
678  {
679  /*
680  * Get cost/size estimates with help of remote server. Save the
681  * values in fpinfo so we don't need to do it again to generate the
682  * basic foreign path.
683  */
684  estimate_path_cost_size(root, baserel, NIL, NIL, NULL,
685  &fpinfo->rows, &fpinfo->width,
686  &fpinfo->startup_cost, &fpinfo->total_cost);
687 
688  /* Report estimated baserel size to planner. */
689  baserel->rows = fpinfo->rows;
690  baserel->reltarget->width = fpinfo->width;
691  }
692  else
693  {
694  /*
695  * If the foreign table has never been ANALYZEd, it will have relpages
696  * and reltuples equal to zero, which most likely has nothing to do
697  * with reality. We can't do a whole lot about that if we're not
698  * allowed to consult the remote server, but we can use a hack similar
699  * to plancat.c's treatment of empty relations: use a minimum size
700  * estimate of 10 pages, and divide by the column-datatype-based width
701  * estimate to get the corresponding number of tuples.
702  */
703  if (baserel->pages == 0 && baserel->tuples == 0)
704  {
705  baserel->pages = 10;
706  baserel->tuples =
707  (10 * BLCKSZ) / (baserel->reltarget->width +
 /*
  * NOTE(review): listing line 708 (the remainder of the divisor and the
  * end of this statement) is absent from this listing.
  */
709  }
710 
711  /* Estimate baserel size as best we can with local statistics. */
712  set_baserel_size_estimates(root, baserel);
713 
714  /* Fill in basically-bogus cost estimates for use later. */
715  estimate_path_cost_size(root, baserel, NIL, NIL, NULL,
716  &fpinfo->rows, &fpinfo->width,
717  &fpinfo->startup_cost, &fpinfo->total_cost);
718  }
719 
720  /*
721  * fpinfo->relation_name gets the numeric rangetable index of the foreign
722  * table RTE. (If this query gets EXPLAIN'd, we'll convert that to a
723  * human-readable string at that time.)
724  */
725  fpinfo->relation_name = psprintf("%u", baserel->relid);
726 
727  /* No outer and inner relations. */
728  fpinfo->make_outerrel_subquery = false;
729  fpinfo->make_innerrel_subquery = false;
730  fpinfo->lower_subquery_rels = NULL;
731  /* Set the relation index. */
732  fpinfo->relation_index = baserel->relid;
733 }
734 
735 /*
736  * get_useful_ecs_for_relation
737  * Determine which EquivalenceClasses might be involved in useful
738  * orderings of this relation.
739  *
740  * This function is in some respects a mirror image of the core function
741  * pathkeys_useful_for_merging: for a regular table, we know what indexes
742  * we have and want to test whether any of them are useful. For a foreign
743  * table, we don't know what indexes are present on the remote side but
744  * want to speculate about which ones we'd like to use if they existed.
745  *
746  * This function returns a list of potentially-useful equivalence classes,
747  * but it does not guarantee that an EquivalenceMember exists which contains
748  * Vars only from the given relation. For example, given ft1 JOIN t1 ON
749  * ft1.x + t1.x = 0, this function will say that the equivalence class
750  * containing ft1.x + t1.x is potentially useful. Supposing ft1 is remote and
751  * t1 is local (or on a different server), it will turn out that no useful
752  * ORDER BY clause can be generated. It's not our job to figure that out
753  * here; we're only interested in identifying relevant ECs.
 *
 * The result is built in two passes: first every EC in root->eq_classes
 * that eclass_useful_for_merging() accepts for rel (only when
 * rel->has_eclass_joins is set), then at most one EC per mergejoinable
 * clause in rel->joininfo. The second pass adds with
 * list_append_unique_ptr(), so an EC already in the list is not repeated.
 *
 * NOTE(review): the line carrying the function name and parameters (listing
 * line 756) is absent from this listing; the body uses "root" and "rel".
754  */
755 static List *
757 {
758  List *useful_eclass_list = NIL;
759  ListCell *lc;
760  Relids relids;
761 
762  /*
763  * First, consider whether any active EC is potentially useful for a merge
764  * join against this relation.
765  */
766  if (rel->has_eclass_joins)
767  {
768  foreach(lc, root->eq_classes)
769  {
770  EquivalenceClass *cur_ec = (EquivalenceClass *) lfirst(lc);
771 
772  if (eclass_useful_for_merging(root, cur_ec, rel))
773  useful_eclass_list = lappend(useful_eclass_list, cur_ec);
774  }
775  }
776 
777  /*
778  * Next, consider whether there are any non-EC derivable join clauses that
779  * are merge-joinable. If the joininfo list is empty, we can exit
780  * quickly.
781  */
782  if (rel->joininfo == NIL)
783  return useful_eclass_list;
784 
785  /* If this is a child rel, we must use the topmost parent rel to search. */
786  if (IS_OTHER_REL(rel))
787  {
 /* NOTE(review): listing line 788 is absent from this listing. */
789  relids = rel->top_parent_relids;
790  }
791  else
792  relids = rel->relids;
793 
794  /* Check each join clause in turn. */
795  foreach(lc, rel->joininfo)
796  {
797  RestrictInfo *restrictinfo = (RestrictInfo *) lfirst(lc);
798 
799  /* Consider only mergejoinable clauses */
800  if (restrictinfo->mergeopfamilies == NIL)
801  continue;
802 
803  /* Make sure we've got canonical ECs. */
804  update_mergeclause_eclasses(root, restrictinfo);
805 
806  /*
807  * restrictinfo->mergeopfamilies != NIL is sufficient to guarantee
808  * that left_ec and right_ec will be initialized, per comments in
809  * distribute_qual_to_rels.
810  *
811  * We want to identify which side of this merge-joinable clause
812  * contains columns from the relation produced by this RelOptInfo. We
813  * test for overlap, not containment, because there could be extra
814  * relations on either side. For example, suppose we've got something
815  * like ((A JOIN B ON A.x = B.x) JOIN C ON A.y = C.y) LEFT JOIN D ON
816  * A.y = D.y. The input rel might be the joinrel between A and B, and
817  * we'll consider the join clause A.y = D.y. relids contains a
818  * relation not involved in the join class (B) and the equivalence
819  * class for the left-hand side of the clause contains a relation not
820  * involved in the input rel (C). Despite the fact that we have only
821  * overlap and not containment in either direction, A.y is potentially
822  * useful as a sort column.
823  *
824  * Note that it's even possible that relids overlaps neither side of
825  * the join clause. For example, consider A LEFT JOIN B ON A.x = B.x
826  * AND A.x = 1. The clause A.x = 1 will appear in B's joininfo list,
827  * but overlaps neither side of B. In that case, we just skip this
828  * join clause, since it doesn't suggest a useful sort order for this
829  * relation.
 *
 * The right-hand EC is tested first; because of the else-if, at most
 * one EC is added per clause.
830  */
831  if (bms_overlap(relids, restrictinfo->right_ec->ec_relids))
832  useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
833  restrictinfo->right_ec);
834  else if (bms_overlap(relids, restrictinfo->left_ec->ec_relids))
835  useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
836  restrictinfo->left_ec);
837  }
838 
839  return useful_eclass_list;
840 }
841 
842 /*
843  * get_useful_pathkeys_for_relation
844  * Determine which orderings of a relation might be useful.
845  *
846  * Getting data in sorted order can be useful either because the requested
847  * order matches the final output ordering for the overall query we're
848  * planning, or because it enables an efficient merge join. Here, we try
849  * to figure out which pathkeys to consider.
 *
 * Returns a list whose elements are themselves pathkey lists: optionally a
 * copy of root->query_pathkeys (when every key can be pushed down), followed
 * by one single-key list per useful EquivalenceClass. The per-EC entries are
 * only generated when fpinfo->use_remote_estimate is set.
 *
 * Side effect: fpinfo->qp_is_pushdown_safe is reset to false and then set to
 * true only if the full set of query pathkeys was accepted.
 *
 * NOTE(review): the line carrying the function name and parameters (listing
 * line 852) is absent from this listing; the body uses "root" and "rel".
850  */
851 static List *
853 {
854  List *useful_pathkeys_list = NIL;
855  List *useful_eclass_list;
 /*
  * NOTE(review): listing line 856 is absent; presumably it declares the
  * "fpinfo" variable used throughout this function -- confirm.
  */
857  EquivalenceClass *query_ec = NULL;
858  ListCell *lc;
859 
860  /*
861  * Pushing the query_pathkeys to the remote server is always worth
862  * considering, because it might let us avoid a local sort.
863  */
864  fpinfo->qp_is_pushdown_safe = false;
865  if (root->query_pathkeys)
866  {
867  bool query_pathkeys_ok = true;
868 
869  foreach(lc, root->query_pathkeys)
870  {
871  PathKey *pathkey = (PathKey *) lfirst(lc);
872  EquivalenceClass *pathkey_ec = pathkey->pk_eclass;
873  Expr *em_expr;
874 
875  /*
876  * The planner and executor don't have any clever strategy for
877  * taking data sorted by a prefix of the query's pathkeys and
878  * getting it to be sorted by all of those pathkeys. We'll just
879  * end up resorting the entire data set. So, unless we can push
880  * down all of the query pathkeys, forget it.
881  *
882  * is_foreign_expr would detect volatile expressions as well, but
883  * checking ec_has_volatile here saves some cycles.
884  */
885  if (pathkey_ec->ec_has_volatile ||
886  !(em_expr = find_em_expr_for_rel(pathkey_ec, rel)) ||
887  !is_foreign_expr(root, rel, em_expr))
888  {
889  query_pathkeys_ok = false;
890  break;
891  }
892  }
893 
894  if (query_pathkeys_ok)
895  {
896  useful_pathkeys_list = list_make1(list_copy(root->query_pathkeys));
897  fpinfo->qp_is_pushdown_safe = true;
898  }
899  }
900 
901  /*
902  * Even if we're not using remote estimates, having the remote side do the
903  * sort generally won't be any worse than doing it locally, and it might
904  * be much better if the remote side can generate data in the right order
905  * without needing a sort at all. However, what we're going to do next is
906  * try to generate pathkeys that seem promising for possible merge joins,
907  * and that's more speculative. A wrong choice might hurt quite a bit, so
908  * bail out if we can't use remote estimates.
909  */
910  if (!fpinfo->use_remote_estimate)
911  return useful_pathkeys_list;
912 
913  /* Get the list of interesting EquivalenceClasses. */
914  useful_eclass_list = get_useful_ecs_for_relation(root, rel);
915 
916  /* Extract unique EC for query, if any, so we don't consider it again. */
917  if (list_length(root->query_pathkeys) == 1)
918  {
919  PathKey *query_pathkey = linitial(root->query_pathkeys);
920 
921  query_ec = query_pathkey->pk_eclass;
922  }
923 
924  /*
925  * As a heuristic, the only pathkeys we consider here are those of length
926  * one. It's surely possible to consider more, but since each one we
927  * choose to consider will generate a round-trip to the remote side, we
928  * need to be a bit cautious here. It would sure be nice to have a local
929  * cache of information about remote index definitions...
930  */
931  foreach(lc, useful_eclass_list)
932  {
933  EquivalenceClass *cur_ec = lfirst(lc);
934  Expr *em_expr;
935  PathKey *pathkey;
936 
937  /* If redundant with what we did above, skip it. */
938  if (cur_ec == query_ec)
939  continue;
940 
941  /* If no pushable expression for this rel, skip it. */
942  em_expr = find_em_expr_for_rel(cur_ec, rel);
943  if (em_expr == NULL || !is_foreign_expr(root, rel, em_expr))
944  continue;
945 
946  /* Looks like we can generate a pathkey, so let's do it. */
947  pathkey = make_canonical_pathkey(root, cur_ec,
948  linitial_oid(cur_ec->ec_opfamilies),
 /*
  * NOTE(review): listing line 949 (one argument of this call) is absent
  * from this listing.
  */
950  false);
951  useful_pathkeys_list = lappend(useful_pathkeys_list,
952  list_make1(pathkey));
953  }
954 
955  return useful_pathkeys_list;
956 }
957 
958 /*
959  * postgresGetForeignPaths
960  * Create possible scan paths for a scan on the foreign table
961  */
962 static void
964  RelOptInfo *baserel,
965  Oid foreigntableid)
966 {
967  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
968  ForeignPath *path;
969  List *ppi_list;
970  ListCell *lc;
971 
972  /*
973  * Create simplest ForeignScan path node and add it to baserel. This path
974  * corresponds to SeqScan path of regular tables (though depending on what
975  * baserestrict conditions we were able to send to remote, there might
976  * actually be an indexscan happening there). We already did all the work
977  * to estimate cost and size of this path.
978  *
979  * Although this path uses no join clauses, it could still have required
980  * parameterization due to LATERAL refs in its tlist.
981  */
982  path = create_foreignscan_path(root, baserel,
983  NULL, /* default pathtarget */
984  fpinfo->rows,
985  fpinfo->startup_cost,
986  fpinfo->total_cost,
987  NIL, /* no pathkeys */
988  baserel->lateral_relids,
989  NULL, /* no extra plan */
990  NIL); /* no fdw_private list */
991  add_path(baserel, (Path *) path);
992 
993  /* Add paths with pathkeys */
994  add_paths_with_pathkeys_for_rel(root, baserel, NULL);
995 
996  /*
997  * If we're not using remote estimates, stop here. We have no way to
998  * estimate whether any join clauses would be worth sending across, so
999  * don't bother building parameterized paths.
1000  */
1001  if (!fpinfo->use_remote_estimate)
1002  return;
1003 
1004  /*
1005  * Thumb through all join clauses for the rel to identify which outer
1006  * relations could supply one or more safe-to-send-to-remote join clauses.
1007  * We'll build a parameterized path for each such outer relation.
1008  *
1009  * It's convenient to manage this by representing each candidate outer
1010  * relation by the ParamPathInfo node for it. We can then use the
1011  * ppi_clauses list in the ParamPathInfo node directly as a list of the
1012  * interesting join clauses for that rel. This takes care of the
1013  * possibility that there are multiple safe join clauses for such a rel,
1014  * and also ensures that we account for unsafe join clauses that we'll
1015  * still have to enforce locally (since the parameterized-path machinery
1016  * insists that we handle all movable clauses).
1017  */
1018  ppi_list = NIL;
1019  foreach(lc, baserel->joininfo)
1020  {
1021  RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
1022  Relids required_outer;
1023  ParamPathInfo *param_info;
1024 
1025  /* Check if clause can be moved to this rel */
1026  if (!join_clause_is_movable_to(rinfo, baserel))
1027  continue;
1028 
1029  /* See if it is safe to send to remote */
1030  if (!is_foreign_expr(root, baserel, rinfo->clause))
1031  continue;
1032 
1033  /* Calculate required outer rels for the resulting path */
1034  required_outer = bms_union(rinfo->clause_relids,
1035  baserel->lateral_relids);
1036  /* We do not want the foreign rel itself listed in required_outer */
1037  required_outer = bms_del_member(required_outer, baserel->relid);
1038 
1039  /*
1040  * required_outer probably can't be empty here, but if it were, we
1041  * couldn't make a parameterized path.
1042  */
1043  if (bms_is_empty(required_outer))
1044  continue;
1045 
1046  /* Get the ParamPathInfo */
1047  param_info = get_baserel_parampathinfo(root, baserel,
1048  required_outer);
1049  Assert(param_info != NULL);
1050 
1051  /*
1052  * Add it to list unless we already have it. Testing pointer equality
1053  * is OK since get_baserel_parampathinfo won't make duplicates.
1054  */
1055  ppi_list = list_append_unique_ptr(ppi_list, param_info);
1056  }
1057 
1058  /*
1059  * The above scan examined only "generic" join clauses, not those that
1060  * were absorbed into EquivalenceClauses. See if we can make anything out
1061  * of EquivalenceClauses.
1062  */
1063  if (baserel->has_eclass_joins)
1064  {
1065  /*
1066  * We repeatedly scan the eclass list looking for column references
1067  * (or expressions) belonging to the foreign rel. Each time we find
1068  * one, we generate a list of equivalence joinclauses for it, and then
1069  * see if any are safe to send to the remote. Repeat till there are
1070  * no more candidate EC members.
1071  */
1073 
1074  arg.already_used = NIL;
1075  for (;;)
1076  {
1077  List *clauses;
1078 
1079  /* Make clauses, skipping any that join to lateral_referencers */
1080  arg.current = NULL;
1082  baserel,
1084  (void *) &arg,
1085  baserel->lateral_referencers);
1086 
1087  /* Done if there are no more expressions in the foreign rel */
1088  if (arg.current == NULL)
1089  {
1090  Assert(clauses == NIL);
1091  break;
1092  }
1093 
1094  /* Scan the extracted join clauses */
1095  foreach(lc, clauses)
1096  {
1097  RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
1098  Relids required_outer;
1099  ParamPathInfo *param_info;
1100 
1101  /* Check if clause can be moved to this rel */
1102  if (!join_clause_is_movable_to(rinfo, baserel))
1103  continue;
1104 
1105  /* See if it is safe to send to remote */
1106  if (!is_foreign_expr(root, baserel, rinfo->clause))
1107  continue;
1108 
1109  /* Calculate required outer rels for the resulting path */
1110  required_outer = bms_union(rinfo->clause_relids,
1111  baserel->lateral_relids);
1112  required_outer = bms_del_member(required_outer, baserel->relid);
1113  if (bms_is_empty(required_outer))
1114  continue;
1115 
1116  /* Get the ParamPathInfo */
1117  param_info = get_baserel_parampathinfo(root, baserel,
1118  required_outer);
1119  Assert(param_info != NULL);
1120 
1121  /* Add it to list unless we already have it */
1122  ppi_list = list_append_unique_ptr(ppi_list, param_info);
1123  }
1124 
1125  /* Try again, now ignoring the expression we found this time */
1126  arg.already_used = lappend(arg.already_used, arg.current);
1127  }
1128  }
1129 
1130  /*
1131  * Now build a path for each useful outer relation.
1132  */
1133  foreach(lc, ppi_list)
1134  {
1135  ParamPathInfo *param_info = (ParamPathInfo *) lfirst(lc);
1136  double rows;
1137  int width;
1138  Cost startup_cost;
1139  Cost total_cost;
1140 
1141  /* Get a cost estimate from the remote */
1142  estimate_path_cost_size(root, baserel,
1143  param_info->ppi_clauses, NIL, NULL,
1144  &rows, &width,
1145  &startup_cost, &total_cost);
1146 
1147  /*
1148  * ppi_rows currently won't get looked at by anything, but still we
1149  * may as well ensure that it matches our idea of the rowcount.
1150  */
1151  param_info->ppi_rows = rows;
1152 
1153  /* Make the path */
1154  path = create_foreignscan_path(root, baserel,
1155  NULL, /* default pathtarget */
1156  rows,
1157  startup_cost,
1158  total_cost,
1159  NIL, /* no pathkeys */
1160  param_info->ppi_req_outer,
1161  NULL,
1162  NIL); /* no fdw_private list */
1163  add_path(baserel, (Path *) path);
1164  }
1165 }
1166 
1167 /*
1168  * postgresGetForeignPlan
1169  * Create ForeignScan plan node which implements selected best path
1170  */
1171 static ForeignScan *
1173  RelOptInfo *foreignrel,
1174  Oid foreigntableid,
1175  ForeignPath *best_path,
1176  List *tlist,
1177  List *scan_clauses,
1178  Plan *outer_plan)
1179 {
1180  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
1181  Index scan_relid;
1182  List *fdw_private;
1183  List *remote_exprs = NIL;
1184  List *local_exprs = NIL;
1185  List *params_list = NIL;
1186  List *fdw_scan_tlist = NIL;
1187  List *fdw_recheck_quals = NIL;
1189  StringInfoData sql;
1190  bool has_final_sort = false;
1191  bool has_limit = false;
1192  ListCell *lc;
1193 
1194  /*
1195  * Get FDW private data created by postgresGetForeignUpperPaths(), if any.
1196  */
1197  if (best_path->fdw_private)
1198  {
1199  has_final_sort = intVal(list_nth(best_path->fdw_private,
1201  has_limit = intVal(list_nth(best_path->fdw_private,
1203  }
1204 
1205  if (IS_SIMPLE_REL(foreignrel))
1206  {
1207  /*
1208  * For base relations, set scan_relid as the relid of the relation.
1209  */
1210  scan_relid = foreignrel->relid;
1211 
1212  /*
1213  * In a base-relation scan, we must apply the given scan_clauses.
1214  *
1215  * Separate the scan_clauses into those that can be executed remotely
1216  * and those that can't. baserestrictinfo clauses that were
1217  * previously determined to be safe or unsafe by classifyConditions
1218  * are found in fpinfo->remote_conds and fpinfo->local_conds. Anything
1219  * else in the scan_clauses list will be a join clause, which we have
1220  * to check for remote-safety.
1221  *
1222  * Note: the join clauses we see here should be the exact same ones
1223  * previously examined by postgresGetForeignPaths. Possibly it'd be
1224  * worth passing forward the classification work done then, rather
1225  * than repeating it here.
1226  *
1227  * This code must match "extract_actual_clauses(scan_clauses, false)"
1228  * except for the additional decision about remote versus local
1229  * execution.
1230  */
1231  foreach(lc, scan_clauses)
1232  {
1233  RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
1234 
1235  /* Ignore any pseudoconstants, they're dealt with elsewhere */
1236  if (rinfo->pseudoconstant)
1237  continue;
1238 
1239  if (list_member_ptr(fpinfo->remote_conds, rinfo))
1240  remote_exprs = lappend(remote_exprs, rinfo->clause);
1241  else if (list_member_ptr(fpinfo->local_conds, rinfo))
1242  local_exprs = lappend(local_exprs, rinfo->clause);
1243  else if (is_foreign_expr(root, foreignrel, rinfo->clause))
1244  remote_exprs = lappend(remote_exprs, rinfo->clause);
1245  else
1246  local_exprs = lappend(local_exprs, rinfo->clause);
1247  }
1248 
1249  /*
1250  * For a base-relation scan, we have to support EPQ recheck, which
1251  * should recheck all the remote quals.
1252  */
1253  fdw_recheck_quals = remote_exprs;
1254  }
1255  else
1256  {
1257  /*
1258  * Join relation or upper relation - set scan_relid to 0.
1259  */
1260  scan_relid = 0;
1261 
1262  /*
1263  * For a join rel, baserestrictinfo is NIL and we are not considering
1264  * parameterization right now, so there should be no scan_clauses for
1265  * a joinrel or an upper rel either.
1266  */
1267  Assert(!scan_clauses);
1268 
1269  /*
1270  * Instead we get the conditions to apply from the fdw_private
1271  * structure.
1272  */
1273  remote_exprs = extract_actual_clauses(fpinfo->remote_conds, false);
1274  local_exprs = extract_actual_clauses(fpinfo->local_conds, false);
1275 
1276  /*
1277  * We leave fdw_recheck_quals empty in this case, since we never need
1278  * to apply EPQ recheck clauses. In the case of a joinrel, EPQ
1279  * recheck is handled elsewhere --- see postgresGetForeignJoinPaths().
1280  * If we're planning an upperrel (ie, remote grouping or aggregation)
1281  * then there's no EPQ to do because SELECT FOR UPDATE wouldn't be
1282  * allowed, and indeed we *can't* put the remote clauses into
1283  * fdw_recheck_quals because the unaggregated Vars won't be available
1284  * locally.
1285  */
1286 
1287  /* Build the list of columns to be fetched from the foreign server. */
1288  fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
1289 
1290  /*
1291  * Ensure that the outer plan produces a tuple whose descriptor
1292  * matches our scan tuple slot. Also, remove the local conditions
1293  * from outer plan's quals, lest they be evaluated twice, once by the
1294  * local plan and once by the scan.
1295  */
1296  if (outer_plan)
1297  {
1298  ListCell *lc;
1299 
1300  /*
1301  * Right now, we only consider grouping and aggregation beyond
1302  * joins. Queries involving aggregates or grouping do not require
1303  * EPQ mechanism, hence should not have an outer plan here.
1304  */
1305  Assert(!IS_UPPER_REL(foreignrel));
1306 
1307  /*
1308  * First, update the plan's qual list if possible. In some cases
1309  * the quals might be enforced below the topmost plan level, in
1310  * which case we'll fail to remove them; it's not worth working
1311  * harder than this.
1312  */
1313  foreach(lc, local_exprs)
1314  {
1315  Node *qual = lfirst(lc);
1316 
1317  outer_plan->qual = list_delete(outer_plan->qual, qual);
1318 
1319  /*
1320  * For an inner join the local conditions of foreign scan plan
1321  * can be part of the joinquals as well. (They might also be
1322  * in the mergequals or hashquals, but we can't touch those
1323  * without breaking the plan.)
1324  */
1325  if (IsA(outer_plan, NestLoop) ||
1326  IsA(outer_plan, MergeJoin) ||
1327  IsA(outer_plan, HashJoin))
1328  {
1329  Join *join_plan = (Join *) outer_plan;
1330 
1331  if (join_plan->jointype == JOIN_INNER)
1332  join_plan->joinqual = list_delete(join_plan->joinqual,
1333  qual);
1334  }
1335  }
1336 
1337  /*
1338  * Now fix the subplan's tlist --- this might result in inserting
1339  * a Result node atop the plan tree.
1340  */
1341  outer_plan = change_plan_targetlist(outer_plan, fdw_scan_tlist,
1342  best_path->path.parallel_safe);
1343  }
1344  }
1345 
1346  /*
1347  * Build the query string to be sent for execution, and identify
1348  * expressions to be sent as parameters.
1349  */
1350  initStringInfo(&sql);
1351  deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
1352  remote_exprs, best_path->path.pathkeys,
1353  has_final_sort, has_limit, false,
1354  &retrieved_attrs, &params_list);
1355 
1356  /* Remember remote_exprs for possible use by postgresPlanDirectModify */
1357  fpinfo->final_remote_exprs = remote_exprs;
1358 
1359  /*
1360  * Build the fdw_private list that will be available to the executor.
1361  * Items in the list must match order in enum FdwScanPrivateIndex.
1362  */
1363  fdw_private = list_make3(makeString(sql.data),
1365  makeInteger(fpinfo->fetch_size));
1366  if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
1367  fdw_private = lappend(fdw_private,
1368  makeString(fpinfo->relation_name));
1369 
1370  /*
1371  * Create the ForeignScan node for the given relation.
1372  *
1373  * Note that the remote parameter expressions are stored in the fdw_exprs
1374  * field of the finished plan node; we can't keep them in private state
1375  * because then they wouldn't be subject to later planner processing.
1376  */
1377  return make_foreignscan(tlist,
1378  local_exprs,
1379  scan_relid,
1380  params_list,
1381  fdw_private,
1382  fdw_scan_tlist,
1383  fdw_recheck_quals,
1384  outer_plan);
1385 }
1386 
1387 /*
1388  * postgresBeginForeignScan
1389  * Initiate an executor scan of a foreign PostgreSQL table.
1390  */
1391 static void
1393 {
1394  ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
1395  EState *estate = node->ss.ps.state;
1396  PgFdwScanState *fsstate;
1397  RangeTblEntry *rte;
1398  Oid userid;
1399  ForeignTable *table;
1400  UserMapping *user;
1401  int rtindex;
1402  int numParams;
1403 
1404  /*
1405  * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
1406  */
1407  if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1408  return;
1409 
1410  /*
1411  * We'll save private state in node->fdw_state.
1412  */
1413  fsstate = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState));
1414  node->fdw_state = (void *) fsstate;
1415 
1416  /*
1417  * Identify which user to do the remote access as. This should match what
1418  * ExecCheckRTEPerms() does. In case of a join or aggregate, use the
1419  * lowest-numbered member RTE as a representative; we would get the same
1420  * result from any.
1421  */
1422  if (fsplan->scan.scanrelid > 0)
1423  rtindex = fsplan->scan.scanrelid;
1424  else
1425  rtindex = bms_next_member(fsplan->fs_relids, -1);
1426  rte = exec_rt_fetch(rtindex, estate);
1427  userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
1428 
1429  /* Get info about foreign table. */
1430  table = GetForeignTable(rte->relid);
1431  user = GetUserMapping(userid, table->serverid);
1432 
1433  /*
1434  * Get connection to the foreign server. Connection manager will
1435  * establish new connection if necessary.
1436  */
1437  fsstate->conn = GetConnection(user, false);
1438 
1439  /* Assign a unique ID for my cursor */
1440  fsstate->cursor_number = GetCursorNumber(fsstate->conn);
1441  fsstate->cursor_exists = false;
1442 
1443  /* Get private info created by planner functions. */
1444  fsstate->query = strVal(list_nth(fsplan->fdw_private,
1446  fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
1448  fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private,
1450 
1451  /* Create contexts for batches of tuples and per-tuple temp workspace. */
1452  fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
1453  "postgres_fdw tuple data",
1455  fsstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
1456  "postgres_fdw temporary data",
1458 
1459  /*
1460  * Get info we'll need for converting data fetched from the foreign server
1461  * into local representation and error reporting during that process.
1462  */
1463  if (fsplan->scan.scanrelid > 0)
1464  {
1465  fsstate->rel = node->ss.ss_currentRelation;
1466  fsstate->tupdesc = RelationGetDescr(fsstate->rel);
1467  }
1468  else
1469  {
1470  fsstate->rel = NULL;
1471  fsstate->tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
1472  }
1473 
1474  fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc);
1475 
1476  /*
1477  * Prepare for processing of parameters used in remote query, if any.
1478  */
1479  numParams = list_length(fsplan->fdw_exprs);
1480  fsstate->numParams = numParams;
1481  if (numParams > 0)
1483  fsplan->fdw_exprs,
1484  numParams,
1485  &fsstate->param_flinfo,
1486  &fsstate->param_exprs,
1487  &fsstate->param_values);
1488 }
1489 
1490 /*
1491  * postgresIterateForeignScan
1492  * Retrieve next row from the result set, or clear tuple slot to indicate
1493  * EOF.
1494  */
1495 static TupleTableSlot *
1497 {
1498  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1499  TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
1500 
1501  /*
1502  * If this is the first call after Begin or ReScan, we need to create the
1503  * cursor on the remote side.
1504  */
1505  if (!fsstate->cursor_exists)
1506  create_cursor(node);
1507 
1508  /*
1509  * Get some more tuples, if we've run out.
1510  */
1511  if (fsstate->next_tuple >= fsstate->num_tuples)
1512  {
1513  /* No point in another fetch if we already detected EOF, though. */
1514  if (!fsstate->eof_reached)
1515  fetch_more_data(node);
1516  /* If we didn't get any tuples, must be end of data. */
1517  if (fsstate->next_tuple >= fsstate->num_tuples)
1518  return ExecClearTuple(slot);
1519  }
1520 
1521  /*
1522  * Return the next tuple.
1523  */
1524  ExecStoreHeapTuple(fsstate->tuples[fsstate->next_tuple++],
1525  slot,
1526  false);
1527 
1528  return slot;
1529 }
1530 
1531 /*
1532  * postgresReScanForeignScan
1533  * Restart the scan.
1534  */
1535 static void
1537 {
1538  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1539  char sql[64];
1540  PGresult *res;
1541 
1542  /* If we haven't created the cursor yet, nothing to do. */
1543  if (!fsstate->cursor_exists)
1544  return;
1545 
1546  /*
1547  * If any internal parameters affecting this node have changed, we'd
1548  * better destroy and recreate the cursor. Otherwise, rewinding it should
1549  * be good enough. If we've only fetched zero or one batch, we needn't
1550  * even rewind the cursor, just rescan what we have.
1551  */
1552  if (node->ss.ps.chgParam != NULL)
1553  {
1554  fsstate->cursor_exists = false;
1555  snprintf(sql, sizeof(sql), "CLOSE c%u",
1556  fsstate->cursor_number);
1557  }
1558  else if (fsstate->fetch_ct_2 > 1)
1559  {
1560  snprintf(sql, sizeof(sql), "MOVE BACKWARD ALL IN c%u",
1561  fsstate->cursor_number);
1562  }
1563  else
1564  {
1565  /* Easy: just rescan what we already have in memory, if anything */
1566  fsstate->next_tuple = 0;
1567  return;
1568  }
1569 
1570  /*
1571  * We don't use a PG_TRY block here, so be careful not to throw error
1572  * without releasing the PGresult.
1573  */
1574  res = pgfdw_exec_query(fsstate->conn, sql);
1575  if (PQresultStatus(res) != PGRES_COMMAND_OK)
1576  pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
1577  PQclear(res);
1578 
1579  /* Now force a fresh FETCH. */
1580  fsstate->tuples = NULL;
1581  fsstate->num_tuples = 0;
1582  fsstate->next_tuple = 0;
1583  fsstate->fetch_ct_2 = 0;
1584  fsstate->eof_reached = false;
1585 }
1586 
1587 /*
1588  * postgresEndForeignScan
1589  * Finish scanning foreign table and dispose objects used for this scan
1590  */
1591 static void
1593 {
1594  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1595 
1596  /* if fsstate is NULL, we are in EXPLAIN; nothing to do */
1597  if (fsstate == NULL)
1598  return;
1599 
1600  /* Close the cursor if open, to prevent accumulation of cursors */
1601  if (fsstate->cursor_exists)
1602  close_cursor(fsstate->conn, fsstate->cursor_number);
1603 
1604  /* Release remote connection */
1605  ReleaseConnection(fsstate->conn);
1606  fsstate->conn = NULL;
1607 
1608  /* MemoryContexts will be deleted automatically. */
1609 }
1610 
1611 /*
1612  * postgresAddForeignUpdateTargets
1613  * Add resjunk column(s) needed for update/delete on a foreign table
1614  */
1615 static void
1617  RangeTblEntry *target_rte,
1618  Relation target_relation)
1619 {
1620  Var *var;
1621  const char *attrname;
1622  TargetEntry *tle;
1623 
1624  /*
1625  * In postgres_fdw, what we need is the ctid, same as for a regular table.
1626  */
1627 
1628  /* Make a Var representing the desired value */
1629  var = makeVar(parsetree->resultRelation,
1631  TIDOID,
1632  -1,
1633  InvalidOid,
1634  0);
1635 
1636  /* Wrap it in a resjunk TLE with the right name ... */
1637  attrname = "ctid";
1638 
1639  tle = makeTargetEntry((Expr *) var,
1640  list_length(parsetree->targetList) + 1,
1641  pstrdup(attrname),
1642  true);
1643 
1644  /* ... and add it to the query's targetlist */
1645  parsetree->targetList = lappend(parsetree->targetList, tle);
1646 }
1647 
1648 /*
1649  * postgresPlanForeignModify
1650  * Plan an insert/update/delete operation on a foreign table
1651  */
1652 static List *
1654  ModifyTable *plan,
1655  Index resultRelation,
1656  int subplan_index)
1657 {
1658  CmdType operation = plan->operation;
1659  RangeTblEntry *rte = planner_rt_fetch(resultRelation, root);
1660  Relation rel;
1661  StringInfoData sql;
1662  List *targetAttrs = NIL;
1663  List *withCheckOptionList = NIL;
1664  List *returningList = NIL;
1666  bool doNothing = false;
1667 
1668  initStringInfo(&sql);
1669 
1670  /*
1671  * Core code already has some lock on each rel being planned, so we can
1672  * use NoLock here.
1673  */
1674  rel = table_open(rte->relid, NoLock);
1675 
1676  /*
1677  * In an INSERT, we transmit all columns that are defined in the foreign
1678  * table. In an UPDATE, if there are BEFORE ROW UPDATE triggers on the
1679  * foreign table, we transmit all columns like INSERT; else we transmit
1680  * only columns that were explicitly targets of the UPDATE, so as to avoid
1681  * unnecessary data transmission. (We can't do that for INSERT since we
1682  * would miss sending default values for columns not listed in the source
1683  * statement, and for UPDATE if there are BEFORE ROW UPDATE triggers since
1684  * those triggers might change values for non-target columns, in which
1685  * case we would miss sending changed values for those columns.)
1686  */
1687  if (operation == CMD_INSERT ||
1688  (operation == CMD_UPDATE &&
1689  rel->trigdesc &&
1691  {
1693  int attnum;
1694 
1695  for (attnum = 1; attnum <= tupdesc->natts; attnum++)
1696  {
1697  Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
1698 
1699  if (!attr->attisdropped)
1700  targetAttrs = lappend_int(targetAttrs, attnum);
1701  }
1702  }
1703  else if (operation == CMD_UPDATE)
1704  {
1705  int col;
1706  Bitmapset *allUpdatedCols = bms_union(rte->updatedCols, rte->extraUpdatedCols);
1707 
1708  col = -1;
1709  while ((col = bms_next_member(allUpdatedCols, col)) >= 0)
1710  {
1711  /* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
1713 
1714  if (attno <= InvalidAttrNumber) /* shouldn't happen */
1715  elog(ERROR, "system-column update is not supported");
1716  targetAttrs = lappend_int(targetAttrs, attno);
1717  }
1718  }
1719 
1720  /*
1721  * Extract the relevant WITH CHECK OPTION list if any.
1722  */
1723  if (plan->withCheckOptionLists)
1724  withCheckOptionList = (List *) list_nth(plan->withCheckOptionLists,
1725  subplan_index);
1726 
1727  /*
1728  * Extract the relevant RETURNING list if any.
1729  */
1730  if (plan->returningLists)
1731  returningList = (List *) list_nth(plan->returningLists, subplan_index);
1732 
1733  /*
1734  * ON CONFLICT DO UPDATE and DO NOTHING case with inference specification
1735  * should have already been rejected in the optimizer, as presently there
1736  * is no way to recognize an arbiter index on a foreign table. Only DO
1737  * NOTHING is supported without an inference specification.
1738  */
1739  if (plan->onConflictAction == ONCONFLICT_NOTHING)
1740  doNothing = true;
1741  else if (plan->onConflictAction != ONCONFLICT_NONE)
1742  elog(ERROR, "unexpected ON CONFLICT specification: %d",
1743  (int) plan->onConflictAction);
1744 
1745  /*
1746  * Construct the SQL command string.
1747  */
1748  switch (operation)
1749  {
1750  case CMD_INSERT:
1751  deparseInsertSql(&sql, rte, resultRelation, rel,
1752  targetAttrs, doNothing,
1753  withCheckOptionList, returningList,
1754  &retrieved_attrs);
1755  break;
1756  case CMD_UPDATE:
1757  deparseUpdateSql(&sql, rte, resultRelation, rel,
1758  targetAttrs,
1759  withCheckOptionList, returningList,
1760  &retrieved_attrs);
1761  break;
1762  case CMD_DELETE:
1763  deparseDeleteSql(&sql, rte, resultRelation, rel,
1764  returningList,
1765  &retrieved_attrs);
1766  break;
1767  default:
1768  elog(ERROR, "unexpected operation: %d", (int) operation);
1769  break;
1770  }
1771 
1772  table_close(rel, NoLock);
1773 
1774  /*
1775  * Build the fdw_private list that will be available to the executor.
1776  * Items in the list must match enum FdwModifyPrivateIndex, above.
1777  */
1778  return list_make4(makeString(sql.data),
1779  targetAttrs,
1780  makeInteger((retrieved_attrs != NIL)),
1781  retrieved_attrs);
1782 }
1783 
1784 /*
1785  * postgresBeginForeignModify
1786  * Begin an insert/update/delete operation on a foreign table
1787  */
1788 static void
1790  ResultRelInfo *resultRelInfo,
1791  List *fdw_private,
1792  int subplan_index,
1793  int eflags)
1794 {
1795  PgFdwModifyState *fmstate;
1796  char *query;
1797  List *target_attrs;
1798  bool has_returning;
1800  RangeTblEntry *rte;
1801 
1802  /*
1803  * Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState
1804  * stays NULL.
1805  */
1806  if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1807  return;
1808 
1809  /* Deconstruct fdw_private data. */
1810  query = strVal(list_nth(fdw_private,
1812  target_attrs = (List *) list_nth(fdw_private,
1814  has_returning = intVal(list_nth(fdw_private,
1816  retrieved_attrs = (List *) list_nth(fdw_private,
1818 
1819  /* Find RTE. */
1820  rte = exec_rt_fetch(resultRelInfo->ri_RangeTableIndex,
1821  mtstate->ps.state);
1822 
1823  /* Construct an execution state. */
1824  fmstate = create_foreign_modify(mtstate->ps.state,
1825  rte,
1826  resultRelInfo,
1827  mtstate->operation,
1828  mtstate->mt_plans[subplan_index]->plan,
1829  query,
1830  target_attrs,
1831  has_returning,
1832  retrieved_attrs);
1833 
1834  resultRelInfo->ri_FdwState = fmstate;
1835 }
1836 
1837 /*
1838  * postgresExecForeignInsert
1839  * Insert one row into a foreign table
1840  */
1841 static TupleTableSlot *
1843  ResultRelInfo *resultRelInfo,
1844  TupleTableSlot *slot,
1845  TupleTableSlot *planSlot)
1846 {
1847  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1848  TupleTableSlot *rslot;
1849 
1850  /*
1851  * If the fmstate has aux_fmstate set, use the aux_fmstate (see
1852  * postgresBeginForeignInsert())
1853  */
1854  if (fmstate->aux_fmstate)
1855  resultRelInfo->ri_FdwState = fmstate->aux_fmstate;
1856  rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT,
1857  slot, planSlot);
1858  /* Revert that change */
1859  if (fmstate->aux_fmstate)
1860  resultRelInfo->ri_FdwState = fmstate;
1861 
1862  return rslot;
1863 }
1864 
1865 /*
1866  * postgresExecForeignUpdate
1867  * Update one row in a foreign table
1868  */
1869 static TupleTableSlot *
1871  ResultRelInfo *resultRelInfo,
1872  TupleTableSlot *slot,
1873  TupleTableSlot *planSlot)
1874 {
1875  return execute_foreign_modify(estate, resultRelInfo, CMD_UPDATE,
1876  slot, planSlot);
1877 }
1878 
1879 /*
1880  * postgresExecForeignDelete
1881  * Delete one row from a foreign table
1882  */
1883 static TupleTableSlot *
1885  ResultRelInfo *resultRelInfo,
1886  TupleTableSlot *slot,
1887  TupleTableSlot *planSlot)
1888 {
1889  return execute_foreign_modify(estate, resultRelInfo, CMD_DELETE,
1890  slot, planSlot);
1891 }
1892 
1893 /*
1894  * postgresEndForeignModify
1895  * Finish an insert/update/delete operation on a foreign table
1896  */
1897 static void
1899  ResultRelInfo *resultRelInfo)
1900 {
1901  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1902 
1903  /* If fmstate is NULL, we are in EXPLAIN; nothing to do */
1904  if (fmstate == NULL)
1905  return;
1906 
1907  /* Destroy the execution state */
1908  finish_foreign_modify(fmstate);
1909 }
1910 
1911 /*
1912  * postgresBeginForeignInsert
1913  * Begin an insert operation on a foreign table
1914  */
1915 static void
1917  ResultRelInfo *resultRelInfo)
1918 {
1919  PgFdwModifyState *fmstate;
1920  ModifyTable *plan = castNode(ModifyTable, mtstate->ps.plan);
1921  EState *estate = mtstate->ps.state;
1922  Index resultRelation = resultRelInfo->ri_RangeTableIndex;
1923  Relation rel = resultRelInfo->ri_RelationDesc;
1924  RangeTblEntry *rte;
1926  int attnum;
1927  StringInfoData sql;
1928  List *targetAttrs = NIL;
1930  bool doNothing = false;
1931 
1932  /*
1933  * If the foreign table we are about to insert routed rows into is also an
1934  * UPDATE subplan result rel that will be updated later, proceeding with
1935  * the INSERT will result in the later UPDATE incorrectly modifying those
1936  * routed rows, so prevent the INSERT --- it would be nice if we could
1937  * handle this case; but for now, throw an error for safety.
1938  */
1939  if (plan && plan->operation == CMD_UPDATE &&
1940  (resultRelInfo->ri_usesFdwDirectModify ||
1941  resultRelInfo->ri_FdwState) &&
1942  resultRelInfo > mtstate->resultRelInfo + mtstate->mt_whichplan)
1943  ereport(ERROR,
1944  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1945  errmsg("cannot route tuples into foreign table to be updated \"%s\"",
1946  RelationGetRelationName(rel))));
1947 
1948  initStringInfo(&sql);
1949 
1950  /* We transmit all columns that are defined in the foreign table. */
1951  for (attnum = 1; attnum <= tupdesc->natts; attnum++)
1952  {
1953  Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
1954 
1955  if (!attr->attisdropped)
1956  targetAttrs = lappend_int(targetAttrs, attnum);
1957  }
1958 
1959  /* Check if we add the ON CONFLICT clause to the remote query. */
1960  if (plan)
1961  {
1962  OnConflictAction onConflictAction = plan->onConflictAction;
1963 
1964  /* We only support DO NOTHING without an inference specification. */
1965  if (onConflictAction == ONCONFLICT_NOTHING)
1966  doNothing = true;
1967  else if (onConflictAction != ONCONFLICT_NONE)
1968  elog(ERROR, "unexpected ON CONFLICT specification: %d",
1969  (int) onConflictAction);
1970  }
1971 
1972  /*
1973  * If the foreign table is a partition, we need to create a new RTE
1974  * describing the foreign table for use by deparseInsertSql and
1975  * create_foreign_modify() below, after first copying the parent's RTE and
1976  * modifying some fields to describe the foreign partition to work on.
1977  * However, if this is invoked by UPDATE, the existing RTE may already
1978  * correspond to this partition if it is one of the UPDATE subplan target
1979  * rels; in that case, we can just use the existing RTE as-is.
1980  */
1981  rte = exec_rt_fetch(resultRelation, estate);
1982  if (rte->relid != RelationGetRelid(rel))
1983  {
1984  rte = copyObject(rte);
1985  rte->relid = RelationGetRelid(rel);
1986  rte->relkind = RELKIND_FOREIGN_TABLE;
1987 
1988  /*
1989  * For UPDATE, we must use the RT index of the first subplan target
1990  * rel's RTE, because the core code would have built expressions for
1991  * the partition, such as RETURNING, using that RT index as varno of
1992  * Vars contained in those expressions.
1993  */
1994  if (plan && plan->operation == CMD_UPDATE &&
1995  resultRelation == plan->rootRelation)
1996  resultRelation = mtstate->resultRelInfo[0].ri_RangeTableIndex;
1997  }
1998 
1999  /* Construct the SQL command string. */
2000  deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing,
2001  resultRelInfo->ri_WithCheckOptions,
2002  resultRelInfo->ri_returningList,
2003  &retrieved_attrs);
2004 
2005  /* Construct an execution state. */
2006  fmstate = create_foreign_modify(mtstate->ps.state,
2007  rte,
2008  resultRelInfo,
2009  CMD_INSERT,
2010  NULL,
2011  sql.data,
2012  targetAttrs,
2013  retrieved_attrs != NIL,
2014  retrieved_attrs);
2015 
2016  /*
2017  * If the given resultRelInfo already has PgFdwModifyState set, it means
2018  * the foreign table is an UPDATE subplan result rel; in which case, store
2019  * the resulting state into the aux_fmstate of the PgFdwModifyState.
2020  */
2021  if (resultRelInfo->ri_FdwState)
2022  {
2023  Assert(plan && plan->operation == CMD_UPDATE);
2024  Assert(resultRelInfo->ri_usesFdwDirectModify == false);
2025  ((PgFdwModifyState *) resultRelInfo->ri_FdwState)->aux_fmstate = fmstate;
2026  }
2027  else
2028  resultRelInfo->ri_FdwState = fmstate;
2029 }
2030 
2031 /*
2032  * postgresEndForeignInsert
2033  * Finish an insert operation on a foreign table
2034  */
2035 static void
2037  ResultRelInfo *resultRelInfo)
2038 {
2039  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
2040 
2041  Assert(fmstate != NULL);
2042 
2043  /*
2044  * If the fmstate has aux_fmstate set, get the aux_fmstate (see
2045  * postgresBeginForeignInsert())
2046  */
2047  if (fmstate->aux_fmstate)
2048  fmstate = fmstate->aux_fmstate;
2049 
2050  /* Destroy the execution state */
2051  finish_foreign_modify(fmstate);
2052 }
2053 
2054 /*
2055  * postgresIsForeignRelUpdatable
2056  * Determine whether a foreign table supports INSERT, UPDATE and/or
2057  * DELETE.
2058  */
2059 static int
2061 {
2062  bool updatable;
2063  ForeignTable *table;
2064  ForeignServer *server;
2065  ListCell *lc;
2066 
2067  /*
2068  * By default, all postgres_fdw foreign tables are assumed updatable. This
2069  * can be overridden by a per-server setting, which in turn can be
2070  * overridden by a per-table setting.
2071  */
2072  updatable = true;
2073 
2074  table = GetForeignTable(RelationGetRelid(rel));
2075  server = GetForeignServer(table->serverid);
2076 
2077  foreach(lc, server->options)
2078  {
2079  DefElem *def = (DefElem *) lfirst(lc);
2080 
2081  if (strcmp(def->defname, "updatable") == 0)
2082  updatable = defGetBoolean(def);
2083  }
2084  foreach(lc, table->options)
2085  {
2086  DefElem *def = (DefElem *) lfirst(lc);
2087 
2088  if (strcmp(def->defname, "updatable") == 0)
2089  updatable = defGetBoolean(def);
2090  }
2091 
2092  /*
2093  * Currently "updatable" means support for INSERT, UPDATE and DELETE.
2094  */
2095  return updatable ?
2096  (1 << CMD_INSERT) | (1 << CMD_UPDATE) | (1 << CMD_DELETE) : 0;
2097 }
2098 
2099 /*
2100  * postgresRecheckForeignScan
2101  * Execute a local join execution plan for a foreign join
2102  */
2103 static bool
2105 {
2106  Index scanrelid = ((Scan *) node->ss.ps.plan)->scanrelid;
2108  TupleTableSlot *result;
2109 
2110  /* For base foreign relations, it suffices to set fdw_recheck_quals */
2111  if (scanrelid > 0)
2112  return true;
2113 
2114  Assert(outerPlan != NULL);
2115 
2116  /* Execute a local join execution plan */
2117  result = ExecProcNode(outerPlan);
2118  if (TupIsNull(result))
2119  return false;
2120 
2121  /* Store result in the given slot */
2122  ExecCopySlot(slot, result);
2123 
2124  return true;
2125 }
2126 
2127 /*
2128  * postgresPlanDirectModify
2129  * Consider a direct foreign table modification
2130  *
2131  * Decide whether it is safe to modify a foreign table directly, and if so,
2132  * rewrite subplan accordingly.
2133  */
2134 static bool
2136  ModifyTable *plan,
2137  Index resultRelation,
2138  int subplan_index)
2139 {
2140  CmdType operation = plan->operation;
2141  Plan *subplan;
2142  RelOptInfo *foreignrel;
2143  RangeTblEntry *rte;
2144  PgFdwRelationInfo *fpinfo;
2145  Relation rel;
2146  StringInfoData sql;
2147  ForeignScan *fscan;
2148  List *targetAttrs = NIL;
2149  List *remote_exprs;
2150  List *params_list = NIL;
2151  List *returningList = NIL;
2153 
2154  /*
2155  * Decide whether it is safe to modify a foreign table directly.
2156  */
2157 
2158  /*
2159  * The table modification must be an UPDATE or DELETE.
2160  */
2161  if (operation != CMD_UPDATE && operation != CMD_DELETE)
2162  return false;
2163 
2164  /*
2165  * It's unsafe to modify a foreign table directly if there are any local
2166  * joins needed.
2167  */
2168  subplan = (Plan *) list_nth(plan->plans, subplan_index);
2169  if (!IsA(subplan, ForeignScan))
2170  return false;
2171  fscan = (ForeignScan *) subplan;
2172 
2173  /*
2174  * It's unsafe to modify a foreign table directly if there are any quals
2175  * that should be evaluated locally.
2176  */
2177  if (subplan->qual != NIL)
2178  return false;
2179 
2180  /* Safe to fetch data about the target foreign rel */
2181  if (fscan->scan.scanrelid == 0)
2182  {
2183  foreignrel = find_join_rel(root, fscan->fs_relids);
2184  /* We should have a rel for this foreign join. */
2185  Assert(foreignrel);
2186  }
2187  else
2188  foreignrel = root->simple_rel_array[resultRelation];
2189  rte = root->simple_rte_array[resultRelation];
2190  fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
2191 
2192  /*
2193  * It's unsafe to update a foreign table directly, if any expressions to
2194  * assign to the target columns are unsafe to evaluate remotely.
2195  */
2196  if (operation == CMD_UPDATE)
2197  {
2198  int col;
2199 
2200  /*
2201  * We transmit only columns that were explicitly targets of the
2202  * UPDATE, so as to avoid unnecessary data transmission.
2203  */
2204  col = -1;
2205  while ((col = bms_next_member(rte->updatedCols, col)) >= 0)
2206  {
2207  /* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
2209  TargetEntry *tle;
2210 
2211  if (attno <= InvalidAttrNumber) /* shouldn't happen */
2212  elog(ERROR, "system-column update is not supported");
2213 
2214  tle = get_tle_by_resno(subplan->targetlist, attno);
2215 
2216  if (!tle)
2217  elog(ERROR, "attribute number %d not found in subplan targetlist",
2218  attno);
2219 
2220  if (!is_foreign_expr(root, foreignrel, (Expr *) tle->expr))
2221  return false;
2222 
2223  targetAttrs = lappend_int(targetAttrs, attno);
2224  }
2225  }
2226 
2227  /*
2228  * Ok, rewrite subplan so as to modify the foreign table directly.
2229  */
2230  initStringInfo(&sql);
2231 
2232  /*
2233  * Core code already has some lock on each rel being planned, so we can
2234  * use NoLock here.
2235  */
2236  rel = table_open(rte->relid, NoLock);
2237 
2238  /*
2239  * Recall the qual clauses that must be evaluated remotely. (These are
2240  * bare clauses not RestrictInfos, but deparse.c's appendConditions()
2241  * doesn't care.)
2242  */
2243  remote_exprs = fpinfo->final_remote_exprs;
2244 
2245  /*
2246  * Extract the relevant RETURNING list if any.
2247  */
2248  if (plan->returningLists)
2249  {
2250  returningList = (List *) list_nth(plan->returningLists, subplan_index);
2251 
2252  /*
2253  * When performing an UPDATE/DELETE .. RETURNING on a join directly,
2254  * we fetch from the foreign server any Vars specified in RETURNING
2255  * that refer not only to the target relation but to non-target
2256  * relations. So we'll deparse them into the RETURNING clause of the
2257  * remote query; use a targetlist consisting of them instead, which
2258  * will be adjusted to be new fdw_scan_tlist of the foreign-scan plan
2259  * node below.
2260  */
2261  if (fscan->scan.scanrelid == 0)
2262  returningList = build_remote_returning(resultRelation, rel,
2263  returningList);
2264  }
2265 
2266  /*
2267  * Construct the SQL command string.
2268  */
2269  switch (operation)
2270  {
2271  case CMD_UPDATE:
2272  deparseDirectUpdateSql(&sql, root, resultRelation, rel,
2273  foreignrel,
2274  ((Plan *) fscan)->targetlist,
2275  targetAttrs,
2276  remote_exprs, &params_list,
2277  returningList, &retrieved_attrs);
2278  break;
2279  case CMD_DELETE:
2280  deparseDirectDeleteSql(&sql, root, resultRelation, rel,
2281  foreignrel,
2282  remote_exprs, &params_list,
2283  returningList, &retrieved_attrs);
2284  break;
2285  default:
2286  elog(ERROR, "unexpected operation: %d", (int) operation);
2287  break;
2288  }
2289 
2290  /*
2291  * Update the operation info.
2292  */
2293  fscan->operation = operation;
2294 
2295  /*
2296  * Update the fdw_exprs list that will be available to the executor.
2297  */
2298  fscan->fdw_exprs = params_list;
2299 
2300  /*
2301  * Update the fdw_private list that will be available to the executor.
2302  * Items in the list must match enum FdwDirectModifyPrivateIndex, above.
2303  */
2304  fscan->fdw_private = list_make4(makeString(sql.data),
2305  makeInteger((retrieved_attrs != NIL)),
2306  retrieved_attrs,
2307  makeInteger(plan->canSetTag));
2308 
2309  /*
2310  * Update the foreign-join-related fields.
2311  */
2312  if (fscan->scan.scanrelid == 0)
2313  {
2314  /* No need for the outer subplan. */
2315  fscan->scan.plan.lefttree = NULL;
2316 
2317  /* Build new fdw_scan_tlist if UPDATE/DELETE .. RETURNING. */
2318  if (returningList)
2319  rebuild_fdw_scan_tlist(fscan, returningList);
2320  }
2321 
2322  table_close(rel, NoLock);
2323  return true;
2324 }
2325 
2326 /*
2327  * postgresBeginDirectModify
2328  * Prepare a direct foreign table modification
2329  */
2330 static void
2332 {
2333  ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
2334  EState *estate = node->ss.ps.state;
2335  PgFdwDirectModifyState *dmstate;
2336  Index rtindex;
2337  RangeTblEntry *rte;
2338  Oid userid;
2339  ForeignTable *table;
2340  UserMapping *user;
2341  int numParams;
2342 
2343  /*
2344  * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
2345  */
2346  if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
2347  return;
2348 
2349  /*
2350  * We'll save private state in node->fdw_state.
2351  */
2352  dmstate = (PgFdwDirectModifyState *) palloc0(sizeof(PgFdwDirectModifyState));
2353  node->fdw_state = (void *) dmstate;
2354 
2355  /*
2356  * Identify which user to do the remote access as. This should match what
2357  * ExecCheckRTEPerms() does.
2358  */
2359  rtindex = estate->es_result_relation_info->ri_RangeTableIndex;
2360  rte = exec_rt_fetch(rtindex, estate);
2361  userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
2362 
2363  /* Get info about foreign table. */
2364  if (fsplan->scan.scanrelid == 0)
2365  dmstate->rel = ExecOpenScanRelation(estate, rtindex, eflags);
2366  else
2367  dmstate->rel = node->ss.ss_currentRelation;
2368  table = GetForeignTable(RelationGetRelid(dmstate->rel));
2369  user = GetUserMapping(userid, table->serverid);
2370 
2371  /*
2372  * Get connection to the foreign server. Connection manager will
2373  * establish new connection if necessary.
2374  */
2375  dmstate->conn = GetConnection(user, false);
2376 
2377  /* Update the foreign-join-related fields. */
2378  if (fsplan->scan.scanrelid == 0)
2379  {
2380  /* Save info about foreign table. */
2381  dmstate->resultRel = dmstate->rel;
2382 
2383  /*
2384  * Set dmstate->rel to NULL to teach get_returning_data() and
2385  * make_tuple_from_result_row() that columns fetched from the remote
2386  * server are described by fdw_scan_tlist of the foreign-scan plan
2387  * node, not the tuple descriptor for the target relation.
2388  */
2389  dmstate->rel = NULL;
2390  }
2391 
2392  /* Initialize state variable */
2393  dmstate->num_tuples = -1; /* -1 means not set yet */
2394 
2395  /* Get private info created by planner functions. */
2396  dmstate->query = strVal(list_nth(fsplan->fdw_private,
2398  dmstate->has_returning = intVal(list_nth(fsplan->fdw_private,
2400  dmstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
2402  dmstate->set_processed = intVal(list_nth(fsplan->fdw_private,
2404 
2405  /* Create context for per-tuple temp workspace. */
2406  dmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
2407  "postgres_fdw temporary data",
2409 
2410  /* Prepare for input conversion of RETURNING results. */
2411  if (dmstate->has_returning)
2412  {
2414 
2415  if (fsplan->scan.scanrelid == 0)
2416  tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
2417  else
2418  tupdesc = RelationGetDescr(dmstate->rel);
2419 
2420  dmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
2421 
2422  /*
2423  * When performing an UPDATE/DELETE .. RETURNING on a join directly,
2424  * initialize a filter to extract an updated/deleted tuple from a scan
2425  * tuple.
2426  */
2427  if (fsplan->scan.scanrelid == 0)
2428  init_returning_filter(dmstate, fsplan->fdw_scan_tlist, rtindex);
2429  }
2430 
2431  /*
2432  * Prepare for processing of parameters used in remote query, if any.
2433  */
2434  numParams = list_length(fsplan->fdw_exprs);
2435  dmstate->numParams = numParams;
2436  if (numParams > 0)
2438  fsplan->fdw_exprs,
2439  numParams,
2440  &dmstate->param_flinfo,
2441  &dmstate->param_exprs,
2442  &dmstate->param_values);
2443 }
2444 
2445 /*
2446  * postgresIterateDirectModify
2447  * Execute a direct foreign table modification
2448  */
2449 static TupleTableSlot *
2451 {
2453  EState *estate = node->ss.ps.state;
2454  ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
2455 
2456  /*
2457  * If this is the first call after Begin, execute the statement.
2458  */
2459  if (dmstate->num_tuples == -1)
2460  execute_dml_stmt(node);
2461 
2462  /*
2463  * If the local query doesn't specify RETURNING, just clear tuple slot.
2464  */
2465  if (!resultRelInfo->ri_projectReturning)
2466  {
2467  TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
2468  Instrumentation *instr = node->ss.ps.instrument;
2469 
2470  Assert(!dmstate->has_returning);
2471 
2472  /* Increment the command es_processed count if necessary. */
2473  if (dmstate->set_processed)
2474  estate->es_processed += dmstate->num_tuples;
2475 
2476  /* Increment the tuple count for EXPLAIN ANALYZE if necessary. */
2477  if (instr)
2478  instr->tuplecount += dmstate->num_tuples;
2479 
2480  return ExecClearTuple(slot);
2481  }
2482 
2483  /*
2484  * Get the next RETURNING tuple.
2485  */
2486  return get_returning_data(node);
2487 }
2488 
2489 /*
2490  * postgresEndDirectModify
2491  * Finish a direct foreign table modification
2492  */
2493 static void
2495 {
2497 
2498  /* if dmstate is NULL, we are in EXPLAIN; nothing to do */
2499  if (dmstate == NULL)
2500  return;
2501 
2502  /* Release PGresult */
2503  if (dmstate->result)
2504  PQclear(dmstate->result);
2505 
2506  /* Release remote connection */
2507  ReleaseConnection(dmstate->conn);
2508  dmstate->conn = NULL;
2509 
2510  /* MemoryContext will be deleted automatically. */
2511 }
2512 
2513 /*
2514  * postgresExplainForeignScan
2515  * Produce extra output for EXPLAIN of a ForeignScan on a foreign table
2516  */
2517 static void
2519 {
2520  ForeignScan *plan = castNode(ForeignScan, node->ss.ps.plan);
2521  List *fdw_private = plan->fdw_private;
2522 
2523  /*
2524  * Identify foreign scans that are really joins or upper relations. The
2525  * input looks something like "(1) LEFT JOIN (2)", and we must replace the
2526  * digit string(s), which are RT indexes, with the correct relation names.
2527  * We do that here, not when the plan is created, because we can't know
2528  * what aliases ruleutils.c will assign at plan creation time.
2529  */
2530  if (list_length(fdw_private) > FdwScanPrivateRelations)
2531  {
2532  StringInfo relations;
2533  char *rawrelations;
2534  char *ptr;
2535  int minrti,
2536  rtoffset;
2537 
2538  rawrelations = strVal(list_nth(fdw_private, FdwScanPrivateRelations));
2539 
2540  /*
2541  * A difficulty with using a string representation of RT indexes is
2542  * that setrefs.c won't update the string when flattening the
2543  * rangetable. To find out what rtoffset was applied, identify the
2544  * minimum RT index appearing in the string and compare it to the
2545  * minimum member of plan->fs_relids. (We expect all the relids in
2546  * the join will have been offset by the same amount; the Asserts
2547  * below should catch it if that ever changes.)
2548  */
2549  minrti = INT_MAX;
2550  ptr = rawrelations;
2551  while (*ptr)
2552  {
2553  if (isdigit((unsigned char) *ptr))
2554  {
2555  int rti = strtol(ptr, &ptr, 10);
2556 
2557  if (rti < minrti)
2558  minrti = rti;
2559  }
2560  else
2561  ptr++;
2562  }
2563  rtoffset = bms_next_member(plan->fs_relids, -1) - minrti;
2564 
2565  /* Now we can translate the string */
2566  relations = makeStringInfo();
2567  ptr = rawrelations;
2568  while (*ptr)
2569  {
2570  if (isdigit((unsigned char) *ptr))
2571  {
2572  int rti = strtol(ptr, &ptr, 10);
2573  RangeTblEntry *rte;
2574  char *relname;
2575  char *refname;
2576 
2577  rti += rtoffset;
2578  Assert(bms_is_member(rti, plan->fs_relids));
2579  rte = rt_fetch(rti, es->rtable);
2580  Assert(rte->rtekind == RTE_RELATION);
2581  /* This logic should agree with explain.c's ExplainTargetRel */
2582  relname = get_rel_name(rte->relid);
2583  if (es->verbose)
2584  {
2585  char *namespace;
2586 
2587  namespace = get_namespace_name(get_rel_namespace(rte->relid));
2588  appendStringInfo(relations, "%s.%s",
2589  quote_identifier(namespace),
2590  quote_identifier(relname));
2591  }
2592  else
2593  appendStringInfo(relations, "%s",
2594  quote_identifier(relname));
2595  refname = (char *) list_nth(es->rtable_names, rti - 1);
2596  if (refname == NULL)
2597  refname = rte->eref->aliasname;
2598  if (strcmp(refname, relname) != 0)
2599  appendStringInfo(relations, " %s",
2600  quote_identifier(refname));
2601  }
2602  else
2603  appendStringInfoChar(relations, *ptr++);
2604  }
2605  ExplainPropertyText("Relations", relations->data, es);
2606  }
2607 
2608  /*
2609  * Add remote query, when VERBOSE option is specified.
2610  */
2611  if (es->verbose)
2612  {
2613  char *sql;
2614 
2615  sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
2616  ExplainPropertyText("Remote SQL", sql, es);
2617  }
2618 }
2619 
2620 /*
2621  * postgresExplainForeignModify
2622  * Produce extra output for EXPLAIN of a ModifyTable on a foreign table
2623  */
2624 static void
2626  ResultRelInfo *rinfo,
2627  List *fdw_private,
2628  int subplan_index,
2629  ExplainState *es)
2630 {
2631  if (es->verbose)
2632  {
2633  char *sql = strVal(list_nth(fdw_private,
2635 
2636  ExplainPropertyText("Remote SQL", sql, es);
2637  }
2638 }
2639 
2640 /*
2641  * postgresExplainDirectModify
2642  * Produce extra output for EXPLAIN of a ForeignScan that modifies a
2643  * foreign table directly
2644  */
2645 static void
2647 {
2648  List *fdw_private;
2649  char *sql;
2650 
2651  if (es->verbose)
2652  {
2653  fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
2654  sql = strVal(list_nth(fdw_private, FdwDirectModifyPrivateUpdateSql));
2655  ExplainPropertyText("Remote SQL", sql, es);
2656  }
2657 }
2658 
2659 
2660 /*
2661  * estimate_path_cost_size
2662  * Get cost and size estimates for a foreign scan on given foreign relation
2663  * either a base relation or a join between foreign relations or an upper
2664  * relation containing foreign relations.
2665  *
2666  * param_join_conds are the parameterization clauses with outer relations.
2667  * pathkeys specify the expected sort order if any for given path being costed.
2668  * fpextra specifies additional post-scan/join-processing steps such as the
2669  * final sort and the LIMIT restriction.
2670  *
2671  * The function returns the cost and size estimates in p_rows, p_width,
2672  * p_startup_cost and p_total_cost variables.
2673  */
2674 static void
2676  RelOptInfo *foreignrel,
2677  List *param_join_conds,
2678  List *pathkeys,
2679  PgFdwPathExtraData *fpextra,
2680  double *p_rows, int *p_width,
2681  Cost *p_startup_cost, Cost *p_total_cost)
2682 {
2683  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
2684  double rows;
2685  double retrieved_rows;
2686  int width;
2687  Cost startup_cost;
2688  Cost total_cost;
2689 
2690  /* Make sure the core code has set up the relation's reltarget */
2691  Assert(foreignrel->reltarget);
2692 
2693  /*
2694  * If the table or the server is configured to use remote estimates,
2695  * connect to the foreign server and execute EXPLAIN to estimate the
2696  * number of rows selected by the restriction+join clauses. Otherwise,
2697  * estimate rows using whatever statistics we have locally, in a way
2698  * similar to ordinary tables.
2699  */
2700  if (fpinfo->use_remote_estimate)
2701  {
2702  List *remote_param_join_conds;
2703  List *local_param_join_conds;
2704  StringInfoData sql;
2705  PGconn *conn;
2706  Selectivity local_sel;
2707  QualCost local_cost;
2708  List *fdw_scan_tlist = NIL;
2709  List *remote_conds;
2710 
2711  /* Required only to be passed to deparseSelectStmtForRel */
2713 
2714  /*
2715  * param_join_conds might contain both clauses that are safe to send
2716  * across, and clauses that aren't.
2717  */
2718  classifyConditions(root, foreignrel, param_join_conds,
2719  &remote_param_join_conds, &local_param_join_conds);
2720 
2721  /* Build the list of columns to be fetched from the foreign server. */
2722  if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
2723  fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
2724  else
2725  fdw_scan_tlist = NIL;
2726 
2727  /*
2728  * The complete list of remote conditions includes everything from
2729  * baserestrictinfo plus any extra join_conds relevant to this
2730  * particular path.
2731  */
2732  remote_conds = list_concat(remote_param_join_conds,
2733  fpinfo->remote_conds);
2734 
2735  /*
2736  * Construct EXPLAIN query including the desired SELECT, FROM, and
2737  * WHERE clauses. Params and other-relation Vars are replaced by dummy
2738  * values, so don't request params_list.
2739  */
2740  initStringInfo(&sql);
2741  appendStringInfoString(&sql, "EXPLAIN ");
2742  deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
2743  remote_conds, pathkeys,
2744  fpextra ? fpextra->has_final_sort : false,
2745  fpextra ? fpextra->has_limit : false,
2746  false, &retrieved_attrs, NULL);
2747 
2748  /* Get the remote estimate */
2749  conn = GetConnection(fpinfo->user, false);
2750  get_remote_estimate(sql.data, conn, &rows, &width,
2751  &startup_cost, &total_cost);
2752  ReleaseConnection(conn);
2753 
2754  retrieved_rows = rows;
2755 
2756  /* Factor in the selectivity of the locally-checked quals */
2757  local_sel = clauselist_selectivity(root,
2758  local_param_join_conds,
2759  foreignrel->relid,
2760  JOIN_INNER,
2761  NULL);
2762  local_sel *= fpinfo->local_conds_sel;
2763 
2764  rows = clamp_row_est(rows * local_sel);
2765 
2766  /* Add in the eval cost of the locally-checked quals */
2767  startup_cost += fpinfo->local_conds_cost.startup;
2768  total_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
2769  cost_qual_eval(&local_cost, local_param_join_conds, root);
2770  startup_cost += local_cost.startup;
2771  total_cost += local_cost.per_tuple * retrieved_rows;
2772 
2773  /*
2774  * Add in tlist eval cost for each output row. In case of an
2775  * aggregate, some of the tlist expressions such as grouping
2776  * expressions will be evaluated remotely, so adjust the costs.
2777  */
2778  startup_cost += foreignrel->reltarget->cost.startup;
2779  total_cost += foreignrel->reltarget->cost.startup;
2780  total_cost += foreignrel->reltarget->cost.per_tuple * rows;
2781  if (IS_UPPER_REL(foreignrel))
2782  {
2783  QualCost tlist_cost;
2784 
2785  cost_qual_eval(&tlist_cost, fdw_scan_tlist, root);
2786  startup_cost -= tlist_cost.startup;
2787  total_cost -= tlist_cost.startup;
2788  total_cost -= tlist_cost.per_tuple * rows;
2789  }
2790  }
2791  else
2792  {
2793  Cost run_cost = 0;
2794 
2795  /*
2796  * We don't support join conditions in this mode (hence, no
2797  * parameterized paths can be made).
2798  */
2799  Assert(param_join_conds == NIL);
2800 
2801  /*
2802  * We will come here again and again with different set of pathkeys or
2803  * additional post-scan/join-processing steps that caller wants to
2804  * cost. We don't need to calculate the cost/size estimates for the
2805  * underlying scan, join, or grouping each time. Instead, use those
2806  * estimates if we have cached them already.
2807  */
2808  if (fpinfo->rel_startup_cost >= 0 && fpinfo->rel_total_cost >= 0)
2809  {
2810  Assert(fpinfo->retrieved_rows >= 1);
2811 
2812  rows = fpinfo->rows;
2813  retrieved_rows = fpinfo->retrieved_rows;
2814  width = fpinfo->width;
2815  startup_cost = fpinfo->rel_startup_cost;
2816  run_cost = fpinfo->rel_total_cost - fpinfo->rel_startup_cost;
2817 
2818  /*
2819  * If we estimate the costs of a foreign scan or a foreign join
2820  * with additional post-scan/join-processing steps, the scan or
2821  * join costs obtained from the cache wouldn't yet contain the
2822  * eval costs for the final scan/join target, which would've been
2823  * updated by apply_scanjoin_target_to_paths(); add the eval costs
2824  * now.
2825  */
2826  if (fpextra && !IS_UPPER_REL(foreignrel))
2827  {
2828  /* Shouldn't get here unless we have LIMIT */
2829  Assert(fpextra->has_limit);
2830  Assert(foreignrel->reloptkind == RELOPT_BASEREL ||
2831  foreignrel->reloptkind == RELOPT_JOINREL);
2832  startup_cost += foreignrel->reltarget->cost.startup;
2833  run_cost += foreignrel->reltarget->cost.per_tuple * rows;
2834  }
2835  }
2836  else if (IS_JOIN_REL(foreignrel))
2837  {
2838  PgFdwRelationInfo *fpinfo_i;
2839  PgFdwRelationInfo *fpinfo_o;
2840  QualCost join_cost;
2841  QualCost remote_conds_cost;
2842  double nrows;
2843 
2844  /* Use rows/width estimates made by the core code. */
2845  rows = foreignrel->rows;
2846  width = foreignrel->reltarget->width;
2847 
2848  /* For join we expect inner and outer relations set */
2849  Assert(fpinfo->innerrel && fpinfo->outerrel);
2850 
2851  fpinfo_i = (PgFdwRelationInfo *) fpinfo->innerrel->fdw_private;
2852  fpinfo_o = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
2853 
2854  /* Estimate of number of rows in cross product */
2855  nrows = fpinfo_i->rows * fpinfo_o->rows;
2856 
2857  /*
2858  * Back into an estimate of the number of retrieved rows. Just in
2859  * case this is nuts, clamp to at most nrows.
2860  */
2861  retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
2862  retrieved_rows = Min(retrieved_rows, nrows);
2863 
2864  /*
2865  * The cost of foreign join is estimated as cost of generating
2866  * rows for the joining relations + cost for applying quals on the
2867  * rows.
2868  */
2869 
2870  /*
2871  * Calculate the cost of clauses pushed down to the foreign server
2872  */
2873  cost_qual_eval(&remote_conds_cost, fpinfo->remote_conds, root);
2874  /* Calculate the cost of applying join clauses */
2875  cost_qual_eval(&join_cost, fpinfo->joinclauses, root);
2876 
2877  /*
2878  * Startup cost includes startup cost of joining relations and the
2879  * startup cost for join and other clauses. We do not include the
2880  * startup cost specific to join strategy (e.g. setting up hash
2881  * tables) since we do not know what strategy the foreign server
2882  * is going to use.
2883  */
2884  startup_cost = fpinfo_i->rel_startup_cost + fpinfo_o->rel_startup_cost;
2885  startup_cost += join_cost.startup;
2886  startup_cost += remote_conds_cost.startup;
2887  startup_cost += fpinfo->local_conds_cost.startup;
2888 
2889  /*
2890  * Run time cost includes:
2891  *
2892  * 1. Run time cost (total_cost - startup_cost) of relations being
2893  * joined
2894  *
2895  * 2. Run time cost of applying join clauses on the cross product
2896  * of the joining relations.
2897  *
2898  * 3. Run time cost of applying pushed down other clauses on the
2899  * result of join
2900  *
2901  * 4. Run time cost of applying nonpushable other clauses locally
2902  * on the result fetched from the foreign server.
2903  */
2904  run_cost = fpinfo_i->rel_total_cost - fpinfo_i->rel_startup_cost;
2905  run_cost += fpinfo_o->rel_total_cost - fpinfo_o->rel_startup_cost;
2906  run_cost += nrows * join_cost.per_tuple;
2907  nrows = clamp_row_est(nrows * fpinfo->joinclause_sel);
2908  run_cost += nrows * remote_conds_cost.per_tuple;
2909  run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
2910 
2911  /* Add in tlist eval cost for each output row */
2912  startup_cost += foreignrel->reltarget->cost.startup;
2913  run_cost += foreignrel->reltarget->cost.per_tuple * rows;
2914  }
2915  else if (IS_UPPER_REL(foreignrel))
2916  {
2917  RelOptInfo *outerrel = fpinfo->outerrel;
2918  PgFdwRelationInfo *ofpinfo;
2919  AggClauseCosts aggcosts;
2920  double input_rows;
2921  int numGroupCols;
2922  double numGroups = 1;
2923 
2924  /* The upper relation should have its outer relation set */
2925  Assert(outerrel);
2926  /* and that outer relation should have its reltarget set */
2927  Assert(outerrel->reltarget);
2928 
2929  /*
2930  * This cost model is mixture of costing done for sorted and
2931  * hashed aggregates in cost_agg(). We are not sure which
2932  * strategy will be considered at remote side, thus for
2933  * simplicity, we put all startup related costs in startup_cost
2934  * and all finalization and run cost are added in total_cost.
2935  */
2936 
2937  ofpinfo = (PgFdwRelationInfo *) outerrel->fdw_private;
2938 
2939  /* Get rows from input rel */
2940  input_rows = ofpinfo->rows;
2941 
2942  /* Collect statistics about aggregates for estimating costs. */
2943  MemSet(&aggcosts, 0, sizeof(AggClauseCosts));
2944  if (root->parse->hasAggs)
2945  {
2946  get_agg_clause_costs(root, (Node *) fpinfo->grouped_tlist,
2947  AGGSPLIT_SIMPLE, &aggcosts);
2948 
2949  /*
2950  * The cost of aggregates in the HAVING qual will be the same
2951  * for each child as it is for the parent, so there's no need
2952  * to use a translated version of havingQual.
2953  */
2954  get_agg_clause_costs(root, (Node *) root->parse->havingQual,
2955  AGGSPLIT_SIMPLE, &aggcosts);
2956  }
2957 
2958  /* Get number of grouping columns and possible number of groups */
2959  numGroupCols = list_length(root->parse->groupClause);
2960  numGroups = estimate_num_groups(root,
2962  fpinfo->grouped_tlist),
2963  input_rows, NULL);
2964 
2965  /*
2966  * Get the retrieved_rows and rows estimates. If there are HAVING
2967  * quals, account for their selectivity.
2968  */
2969  if (root->parse->havingQual)
2970  {
2971  /* Factor in the selectivity of the remotely-checked quals */
2972  retrieved_rows =
2973  clamp_row_est(numGroups *
2975  fpinfo->remote_conds,
2976  0,
2977  JOIN_INNER,
2978  NULL));
2979  /* Factor in the selectivity of the locally-checked quals */
2980  rows = clamp_row_est(retrieved_rows * fpinfo->local_conds_sel);
2981  }
2982  else
2983  {
2984  rows = retrieved_rows = numGroups;
2985  }
2986 
2987  /* Use width estimate made by the core code. */
2988  width = foreignrel->reltarget->width;
2989 
2990  /*-----
2991  * Startup cost includes:
2992  * 1. Startup cost for underneath input relation, adjusted for
2993  * tlist replacement by apply_scanjoin_target_to_paths()
2994  * 2. Cost of performing aggregation, per cost_agg()
2995  *-----
2996  */
2997  startup_cost = ofpinfo->rel_startup_cost;
2998  startup_cost += outerrel->reltarget->cost.startup;
2999  startup_cost += aggcosts.transCost.startup;
3000  startup_cost += aggcosts.transCost.per_tuple * input_rows;
3001  startup_cost += aggcosts.finalCost.startup;
3002  startup_cost += (cpu_operator_cost * numGroupCols) * input_rows;
3003 
3004  /*-----
3005  * Run time cost includes:
3006  * 1. Run time cost of underneath input relation, adjusted for
3007  * tlist replacement by apply_scanjoin_target_to_paths()
3008  * 2. Run time cost of performing aggregation, per cost_agg()
3009  *-----
3010  */
3011  run_cost = ofpinfo->rel_total_cost - ofpinfo->rel_startup_cost;
3012  run_cost += outerrel->reltarget->cost.per_tuple * input_rows;
3013  run_cost += aggcosts.finalCost.per_tuple * numGroups;
3014  run_cost += cpu_tuple_cost * numGroups;
3015 
3016  /* Account for the eval cost of HAVING quals, if any */
3017  if (root->parse->havingQual)
3018  {
3019  QualCost remote_cost;
3020 
3021  /* Add in the eval cost of the remotely-checked quals */
3022  cost_qual_eval(&remote_cost, fpinfo->remote_conds, root);
3023  startup_cost += remote_cost.startup;
3024  run_cost += remote_cost.per_tuple * numGroups;
3025  /* Add in the eval cost of the locally-checked quals */
3026  startup_cost += fpinfo->local_conds_cost.startup;
3027  run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
3028  }
3029 
3030  /* Add in tlist eval cost for each output row */
3031  startup_cost += foreignrel->reltarget->cost.startup;
3032  run_cost += foreignrel->reltarget->cost.per_tuple * rows;
3033  }
3034  else
3035  {
3036  Cost cpu_per_tuple;
3037 
3038  /* Use rows/width estimates made by set_baserel_size_estimates. */
3039  rows = foreignrel->rows;
3040  width = foreignrel->reltarget->width;
3041 
3042  /*
3043  * Back into an estimate of the number of retrieved rows. Just in
3044  * case this is nuts, clamp to at most foreignrel->tuples.
3045  */
3046  retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
3047  retrieved_rows = Min(retrieved_rows, foreignrel->tuples);
3048 
3049  /*
3050  * Cost as though this were a seqscan, which is pessimistic. We
3051  * effectively imagine the local_conds are being evaluated
3052  * remotely, too.
3053  */
3054  startup_cost = 0;
3055  run_cost = 0;
3056  run_cost += seq_page_cost * foreignrel->pages;
3057 
3058  startup_cost += foreignrel->baserestrictcost.startup;
3059  cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple;
3060  run_cost += cpu_per_tuple * foreignrel->tuples;
3061 
3062  /* Add in tlist eval cost for each output row */
3063  startup_cost += foreignrel->reltarget->cost.startup;
3064  run_cost += foreignrel->reltarget->cost.per_tuple * rows;
3065  }
3066 
3067  /*
3068  * Without remote estimates, we have no real way to estimate the cost
3069  * of generating sorted output. It could be free if the query plan
3070  * the remote side would have chosen generates properly-sorted output
3071  * anyway, but in most cases it will cost something. Estimate a value
3072  * high enough that we won't pick the sorted path when the ordering
3073  * isn't locally useful, but low enough that we'll err on the side of
3074  * pushing down the ORDER BY clause when it's useful to do so.
3075  */
3076  if (pathkeys != NIL)
3077  {
3078  if (IS_UPPER_REL(foreignrel))
3079  {
3080  Assert(foreignrel->reloptkind == RELOPT_UPPER_REL &&
3081  fpinfo->stage == UPPERREL_GROUP_AGG);
3082  adjust_foreign_grouping_path_cost(root, pathkeys,
3083  retrieved_rows, width,
3084  fpextra->limit_tuples,
3085  &startup_cost, &run_cost);
3086  }
3087  else
3088  {
3089  startup_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
3090  run_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
3091  }
3092  }
3093 
3094  total_cost = startup_cost + run_cost;
3095 
3096  /* Adjust the cost estimates if we have LIMIT */
3097  if (fpextra && fpextra->has_limit)
3098  {
3099  adjust_limit_rows_costs(&rows, &startup_cost, &total_cost,
3100  fpextra->offset_est, fpextra->count_est);
3101  retrieved_rows = rows;
3102  }
3103  }
3104 
3105  /*
3106  * If this includes the final sort step, the given target, which will be
3107  * applied to the resulting path, might have different expressions from
3108  * the foreignrel's reltarget (see make_sort_input_target()); adjust tlist
3109  * eval costs.
3110  */
3111  if (fpextra && fpextra->has_final_sort &&
3112  fpextra->target != foreignrel->reltarget)
3113  {
3114  QualCost oldcost = foreignrel->reltarget->cost;
3115  QualCost newcost = fpextra->target->cost;
3116 
3117  startup_cost += newcost.startup - oldcost.startup;
3118  total_cost += newcost.startup - oldcost.startup;
3119  total_cost += (newcost.per_tuple - oldcost.per_tuple) * rows;
3120  }
3121 
3122  /*
3123  * Cache the retrieved rows and cost estimates for scans, joins, or
3124  * groupings without any parameterization, pathkeys, or additional
3125  * post-scan/join-processing steps, before adding the costs for
3126  * transferring data from the foreign server. These estimates are useful
3127  * for costing remote joins involving this relation or costing other
3128  * remote operations on this relation such as remote sorts and remote
3129  * LIMIT restrictions, when the costs can not be obtained from the foreign
3130  * server. This function will be called at least once for every foreign
3131  * relation without any parameterization, pathkeys, or additional
3132  * post-scan/join-processing steps.
3133  */
3134  if (pathkeys == NIL && param_join_conds == NIL && fpextra == NULL)
3135  {
3136  fpinfo->retrieved_rows = retrieved_rows;
3137  fpinfo->rel_startup_cost = startup_cost;
3138  fpinfo->rel_total_cost = total_cost;
3139  }
3140 
3141  /*
3142  * Add some additional cost factors to account for connection overhead
3143  * (fdw_startup_cost), transferring data across the network
3144  * (fdw_tuple_cost per retrieved row), and local manipulation of the data
3145  * (cpu_tuple_cost per retrieved row).
3146  */
3147  startup_cost += fpinfo->fdw_startup_cost;
3148  total_cost += fpinfo->fdw_startup_cost;
3149  total_cost += fpinfo->fdw_tuple_cost * retrieved_rows;
3150  total_cost += cpu_tuple_cost * retrieved_rows;
3151 
3152  /*
3153  * If we have LIMIT, we should prefer performing the restriction remotely
3154  * rather than locally, as the former avoids extra row fetches from the
3155  * remote that the latter might cause. But since the core code doesn't
3156  * account for such fetches when estimating the costs of the local
3157  * restriction (see create_limit_path()), there would be no difference
3158  * between the costs of the local restriction and the costs of the remote
3159  * restriction estimated above if we don't use remote estimates (except
3160  * for the case where the foreignrel is a grouping relation, the given
3161  * pathkeys is not NIL, and the effects of a bounded sort for that rel is
3162  * accounted for in costing the remote restriction). Tweak the costs of
3163  * the remote restriction to ensure we'll prefer it if LIMIT is a useful
3164  * one.
3165  */
3166  if (!fpinfo->use_remote_estimate &&
3167  fpextra && fpextra->has_limit &&
3168  fpextra->limit_tuples > 0 &&
3169  fpextra->limit_tuples < fpinfo->rows)
3170  {
3171  Assert(fpinfo->rows > 0);
3172  total_cost -= (total_cost - startup_cost) * 0.05 *
3173  (fpinfo->rows - fpextra->limit_tuples) / fpinfo->rows;
3174  }
3175 
3176  /* Return results. */
3177  *p_rows = rows;
3178  *p_width = width;
3179  *p_startup_cost = startup_cost;
3180  *p_total_cost = total_cost;
3181 }
3182 
3183 /*
3184  * Estimate costs of executing a SQL statement remotely.
3185  * The given "sql" must be an EXPLAIN command.
3186  */
3187 static void
3188 get_remote_estimate(const char *sql, PGconn *conn,
3189  double *rows, int *width,
3190  Cost *startup_cost, Cost *total_cost)
3191 {
3192  PGresult *volatile res = NULL;
3193 
3194  /* PGresult must be released before leaving this function. */
3195  PG_TRY();
3196  {
3197  char *line;
3198  char *p;
3199  int n;
3200 
3201  /*
3202  * Execute EXPLAIN remotely.
3203  */
3204  res = pgfdw_exec_query(conn, sql);
3205  if (PQresultStatus(res) != PGRES_TUPLES_OK)
3206  pgfdw_report_error(ERROR, res, conn, false, sql);
3207 
3208  /*
3209  * Extract cost numbers for topmost plan node. Note we search for a
3210  * left paren from the end of the line to avoid being confused by
3211  * other uses of parentheses.
3212  */
3213  line = PQgetvalue(res, 0, 0);
3214  p = strrchr(line, '(');
3215  if (p == NULL)
3216  elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
3217  n = sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)",
3218  startup_cost, total_cost, rows, width);
3219  if (n != 4)
3220  elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
3221  }
3222  PG_FINALLY();
3223  {
3224  if (res)
3225  PQclear(res);
3226  }
3227  PG_END_TRY();
3228 }
3229 
3230 /*
3231  * Adjust the cost estimates of a foreign grouping path to include the cost of
3232  * generating properly-sorted output.
3233  */
3234 static void
3236  List *pathkeys,
3237  double retrieved_rows,
3238  double width,
3239  double limit_tuples,
3240  Cost *p_startup_cost,
3241  Cost *p_run_cost)
3242 {
3243  /*
3244  * If the GROUP BY clause isn't sort-able, the plan chosen by the remote
3245  * side is unlikely to generate properly-sorted output, so it would need
3246  * an explicit sort; adjust the given costs with cost_sort(). Likewise,
3247  * if the GROUP BY clause is sort-able but isn't a superset of the given
3248  * pathkeys, adjust the costs with that function. Otherwise, adjust the
3249  * costs by applying the same heuristic as for the scan or join case.
3250  */
3251  if (!grouping_is_sortable(root->parse->groupClause) ||
3252  !pathkeys_contained_in(pathkeys, root->group_pathkeys))
3253  {
3254  Path sort_path; /* dummy for result of cost_sort */
3255 
3256  cost_sort(&sort_path,
3257  root,
3258  pathkeys,
3259  *p_startup_cost + *p_run_cost,
3260  retrieved_rows,
3261  width,
3262  0.0,
3263  work_mem,
3264  limit_tuples);
3265 
3266  *p_startup_cost = sort_path.startup_cost;
3267  *p_run_cost = sort_path.total_cost - sort_path.startup_cost;
3268  }
3269  else
3270  {
3271  /*
3272  * The default extra cost seems too large for foreign-grouping cases;
3273  * add 1/4th of that default.
3274  */
3275  double sort_multiplier = 1.0 + (DEFAULT_FDW_SORT_MULTIPLIER
3276  - 1.0) * 0.25;
3277 
3278  *p_startup_cost *= sort_multiplier;
3279  *p_run_cost *= sort_multiplier;
3280  }
3281 }
3282 
3283 /*
3284  * Detect whether we want to process an EquivalenceClass member.
3285  *
3286  * This is a callback for use by generate_implied_equalities_for_column.
3287  */
3288 static bool
3291  void *arg)
3292 {
3294  Expr *expr = em->em_expr;
3295 
3296  /*
3297  * If we've identified what we're processing in the current scan, we only
3298  * want to match that expression.
3299  */
3300  if (state->current != NULL)
3301  return equal(expr, state->current);
3302 
3303  /*
3304  * Otherwise, ignore anything we've already processed.
3305  */
3306  if (list_member(state->already_used, expr))
3307  return false;
3308 
3309  /* This is the new target to process. */
3310  state->current = expr;
3311  return true;
3312 }
3313 
3314 /*
3315  * Create cursor for node's query with current parameter values.
3316  */
3317 static void
3319 {
3320  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
3321  ExprContext *econtext = node->ss.ps.ps_ExprContext;
3322  int numParams = fsstate->numParams;
3323  const char **values = fsstate->param_values;
3324  PGconn *conn = fsstate->conn;
3326  PGresult *res;
3327 
3328  /*
3329  * Construct array of query parameter values in text format. We do the
3330  * conversions in the short-lived per-tuple context, so as not to cause a
3331  * memory leak over repeated scans.
3332  */
3333  if (numParams > 0)
3334  {
3335  MemoryContext oldcontext;
3336 
3337  oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
3338 
3339  process_query_params(econtext,
3340  fsstate->param_flinfo,
3341  fsstate->param_exprs,
3342  values);
3343 
3344  MemoryContextSwitchTo(oldcontext);
3345  }
3346 
3347  /* Construct the DECLARE CURSOR command */
3348  initStringInfo(&buf);
3349  appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
3350  fsstate->cursor_number, fsstate->query);
3351 
3352  /*
3353  * Notice that we pass NULL for paramTypes, thus forcing the remote server
3354  * to infer types for all parameters. Since we explicitly cast every
3355  * parameter (see deparse.c), the "inference" is trivial and will produce
3356  * the desired result. This allows us to avoid assuming that the remote
3357  * server has the same OIDs we do for the parameters' types.
3358  */
3359  if (!PQsendQueryParams(conn, buf.data, numParams,
3360  NULL, values, NULL, NULL, 0))
3361  pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
3362 
3363  /*
3364  * Get the result, and check for success.
3365  *
3366  * We don't use a PG_TRY block here, so be careful not to throw error
3367  * without releasing the PGresult.
3368  */
3369  res = pgfdw_get_result(conn, buf.data);
3370  if (PQresultStatus(res) != PGRES_COMMAND_OK)
3371  pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
3372  PQclear(res);
3373 
3374  /* Mark the cursor as created, and show no tuples have been retrieved */
3375  fsstate->cursor_exists = true;
3376  fsstate->tuples = NULL;
3377  fsstate->num_tuples = 0;
3378  fsstate->next_tuple = 0;
3379  fsstate->fetch_ct_2 = 0;
3380  fsstate->eof_reached = false;
3381 
3382  /* Clean up */
3383  pfree(buf.data);
3384 }
3385 
3386 /*
3387  * Fetch some more rows from the node's cursor.
3388  */
3389 static void
3391 {
3392  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
3393  PGresult *volatile res = NULL;
3394  MemoryContext oldcontext;
3395 
3396  /*
3397  * We'll store the tuples in the batch_cxt. First, flush the previous
3398  * batch.
3399  */
3400  fsstate->tuples = NULL;
3401  MemoryContextReset(fsstate->batch_cxt);
3402  oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
3403 
3404  /* PGresult must be released before leaving this function. */
3405  PG_TRY();
3406  {
3407  PGconn *conn = fsstate->conn;
3408  char sql[64];
3409  int numrows;
3410  int i;
3411 
3412  snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
3413  fsstate->fetch_size, fsstate->cursor_number);
3414 
3415  res = pgfdw_exec_query(conn, sql);
3416  /* On error, report the original query, not the FETCH. */
3417  if (PQresultStatus(res) != PGRES_TUPLES_OK)
3418  pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
3419 
3420  /* Convert the data into HeapTuples */
3421  numrows = PQntuples(res);
3422  fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
3423  fsstate->num_tuples = numrows;
3424  fsstate->next_tuple = 0;
3425 
3426  for (i = 0; i < numrows; i++)
3427  {
3428  Assert(IsA(node->ss.ps.plan, ForeignScan));
3429 
3430  fsstate->tuples[i] =
3432  fsstate->rel,
3433  fsstate->attinmeta,
3434  fsstate->retrieved_attrs,
3435  node,
3436  fsstate->temp_cxt);
3437  }
3438 
3439  /* Update fetch_ct_2 */
3440  if (fsstate->fetch_ct_2 < 2)
3441  fsstate->fetch_ct_2++;
3442 
3443  /* Must be EOF if we didn't get as many tuples as we asked for. */
3444  fsstate->eof_reached = (numrows < fsstate->fetch_size);
3445  }
3446  PG_FINALLY();
3447  {
3448  if (res)
3449  PQclear(res);
3450  }
3451  PG_END_TRY();
3452 
3453  MemoryContextSwitchTo(oldcontext);
3454 }
3455 
3456 /*
3457  * Force assorted GUC parameters to settings that ensure that we'll output
3458  * data values in a form that is unambiguous to the remote server.
3459  *
3460  * This is rather expensive and annoying to do once per row, but there's
3461  * little choice if we want to be sure values are transmitted accurately;
3462  * we can't leave the settings in place between rows for fear of affecting
3463  * user-visible computations.
3464  *
3465  * We use the equivalent of a function SET option to allow the settings to
3466  * persist only until the caller calls reset_transmission_modes(). If an
3467  * error is thrown in between, guc.c will take care of undoing the settings.
3468  *
3469  * The return value is the nestlevel that must be passed to
3470  * reset_transmission_modes() to undo things.
3471  */
3472 int
3474 {
3475  int nestlevel = NewGUCNestLevel();
3476 
3477  /*
3478  * The values set here should match what pg_dump does. See also
3479  * configure_remote_session in connection.c.
3480  */
3481  if (DateStyle != USE_ISO_DATES)
3482  (void) set_config_option("datestyle", "ISO",
3484  GUC_ACTION_SAVE, true, 0, false);
3486  (void) set_config_option("intervalstyle", "postgres",
3488  GUC_ACTION_SAVE, true, 0, false);
3489  if (extra_float_digits < 3)
3490  (void) set_config_option("extra_float_digits", "3",
3492  GUC_ACTION_SAVE, true, 0, false);
3493 
3494  return nestlevel;
3495 }
3496 
3497 /*
3498  * Undo the effects of set_transmission_modes().
3499  */
3500 void
3502 {
3503  AtEOXact_GUC(true, nestlevel);
3504 }
3505 
3506 /*
3507  * Utility routine to close a cursor.
3508  */
3509 static void
3511 {
3512  char sql[64];
3513  PGresult *res;
3514 
3515  snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);
3516 
3517  /*
3518  * We don't use a PG_TRY block here, so be careful not to throw error
3519  * without releasing the PGresult.
3520  */
3521  res = pgfdw_exec_query(conn, sql);
3522  if (PQresultStatus(res) != PGRES_COMMAND_OK)
3523  pgfdw_report_error(ERROR, res, conn, true, sql);
3524  PQclear(res);
3525 }
3526 
3527 /*
3528  * create_foreign_modify
3529  * Construct an execution state of a foreign insert/update/delete
3530  * operation
3531  */
3532 static PgFdwModifyState *
3534  RangeTblEntry *rte,
3535  ResultRelInfo *resultRelInfo,
3536  CmdType operation,
3537  Plan *subplan,
3538  char *query,
3539  List *target_attrs,
3540  bool has_returning,
3542 {
3543  PgFdwModifyState *fmstate;
3544  Relation rel = resultRelInfo->ri_RelationDesc;
3546  Oid userid;
3547  ForeignTable *table;
3548  UserMapping *user;
3549  AttrNumber n_params;
3550  Oid typefnoid;
3551  bool isvarlena;
3552  ListCell *lc;
3553 
3554  /* Begin constructing PgFdwModifyState. */
3555  fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
3556  fmstate->rel = rel;
3557 
3558  /*
3559  * Identify which user to do the remote access as. This should match what
3560  * ExecCheckRTEPerms() does.
3561  */
3562  userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
3563 
3564  /* Get info about foreign table. */
3565  table = GetForeignTable(RelationGetRelid(rel));
3566  user = GetUserMapping(userid, table->serverid);
3567 
3568  /* Open connection; report that we'll create a prepared statement. */
3569  fmstate->conn = GetConnection(user, true);
3570  fmstate->p_name = NULL; /* prepared statement not made yet */
3571 
3572  /* Set up remote query information. */
3573  fmstate->query = query;
3574  fmstate->target_attrs = target_attrs;
3575  fmstate->has_returning = has_returning;
3576  fmstate->retrieved_attrs = retrieved_attrs;
3577 
3578  /* Create context for per-tuple temp workspace. */
3579  fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
3580  "postgres_fdw temporary data",
3582 
3583  /* Prepare for input conversion of RETURNING results. */
3584  if (fmstate->has_returning)
3585  fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
3586 
3587  /* Prepare for output conversion of parameters used in prepared stmt. */
3588  n_params = list_length(fmstate->target_attrs) + 1;
3589  fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
3590  fmstate->p_nums = 0;
3591 
3592  if (operation == CMD_UPDATE || operation == CMD_DELETE)
3593  {
3594  Assert(subplan != NULL);
3595 
3596  /* Find the ctid resjunk column in the subplan's result */
3598  "ctid");
3599  if (!AttributeNumberIsValid(fmstate->ctidAttno))
3600  elog(ERROR, "could not find junk ctid column");
3601 
3602  /* First transmittable parameter will be ctid */
3603  getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
3604  fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
3605  fmstate->p_nums++;
3606  }
3607 
3608  if (operation == CMD_INSERT || operation == CMD_UPDATE)
3609  {
3610  /* Set up for remaining transmittable parameters */
3611  foreach(lc, fmstate->target_attrs)
3612  {
3613  int attnum = lfirst_int(lc);
3614  Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
3615 
3616  Assert(!attr->attisdropped);
3617 
3618  getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
3619  fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
3620  fmstate->p_nums++;
3621  }
3622  }
3623 
3624  Assert(fmstate->p_nums <= n_params);
3625 
3626  /* Initialize auxiliary state */
3627  fmstate->aux_fmstate = NULL;
3628 
3629  return fmstate;
3630 }
3631 
3632 /*
3633  * execute_foreign_modify
3634  * Perform foreign-table modification as required, and fetch RETURNING
3635  * result if any. (This is the shared guts of postgresExecForeignInsert,
3636  * postgresExecForeignUpdate, and postgresExecForeignDelete.)
3637  */
3638 static TupleTableSlot *
3640  ResultRelInfo *resultRelInfo,
3641  CmdType operation,
3642  TupleTableSlot *slot,
3643  TupleTableSlot *planSlot)
3644 {
3645  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
3646  ItemPointer ctid = NULL;
3647  const char **p_values;
3648  PGresult *res;
3649  int n_rows;
3650 
3651  /* The operation should be INSERT, UPDATE, or DELETE */
3652  Assert(operation == CMD_INSERT ||
3653  operation == CMD_UPDATE ||
3654  operation == CMD_DELETE);
3655 
3656  /* Set up the prepared statement on the remote server, if we didn't yet */
3657  if (!fmstate->p_name)
3658  prepare_foreign_modify(fmstate);
3659 
3660  /*
3661  * For UPDATE/DELETE, get the ctid that was passed up as a resjunk column
3662  */
3663  if (operation == CMD_UPDATE || operation == CMD_DELETE)
3664  {
3665  Datum datum;
3666  bool isNull;
3667 
3668  datum = ExecGetJunkAttribute(planSlot,
3669  fmstate->ctidAttno,
3670  &isNull);
3671  /* shouldn't ever get a null result... */
3672  if (isNull)
3673  elog(ERROR, "ctid is NULL");
3674  ctid = (ItemPointer) DatumGetPointer(datum);
3675  }
3676 
3677  /* Convert parameters needed by prepared statement to text form */
3678  p_values = convert_prep_stmt_params(fmstate, ctid, slot);
3679 
3680  /*
3681  * Execute the prepared statement.
3682  */
3683  if (!PQsendQueryPrepared(fmstate->conn,
3684  fmstate->p_name,
3685  fmstate->p_nums,
3686  p_values,
3687  NULL,
3688  NULL,
3689  0))
3690  pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
3691 
3692  /*
3693  * Get the result, and check for success.
3694  *
3695  * We don't use a PG_TRY block here, so be careful not to throw error
3696  * without releasing the PGresult.
3697  */
3698  res = pgfdw_get_result(fmstate->conn, fmstate->query);
3699  if (PQresultStatus(res) !=
3701  pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
3702 
3703  /* Check number of rows affected, and fetch RETURNING tuple if any */
3704  if (fmstate->has_returning)
3705  {
3706  n_rows = PQntuples(res);
3707  if (n_rows > 0)
3708  store_returning_result(fmstate, slot, res);
3709  }
3710  else
3711  n_rows = atoi(PQcmdTuples(res));
3712 
3713  /* And clean up */
3714  PQclear(res);
3715 
3716  MemoryContextReset(fmstate->temp_cxt);
3717 
3718  /*
3719  * Return NULL if nothing was inserted/updated/deleted on the remote end
3720  */
3721  return (n_rows > 0) ? slot : NULL;
3722 }
3723 
3724 /*
3725  * prepare_foreign_modify
3726  * Establish a prepared statement for execution of INSERT/UPDATE/DELETE
3727  */
3728 static void
3730 {
3731  char prep_name[NAMEDATALEN];
3732  char *p_name;
3733  PGresult *res;
3734 
3735  /* Construct name we'll use for the prepared statement. */
3736  snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
3737  GetPrepStmtNumber(fmstate->conn));
3738  p_name = pstrdup(prep_name);
3739 
3740  /*
3741  * We intentionally do not specify parameter types here, but leave the
3742  * remote server to derive them by default. This avoids possible problems
3743  * with the remote server using different type OIDs than we do. All of
3744  * the prepared statements we use in this module are simple enough that
3745  * the remote server will make the right choices.
3746  */
3747  if (!PQsendPrepare(fmstate->conn,
3748  p_name,
3749  fmstate->query,
3750  0,
3751  NULL))
3752  pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
3753 
3754  /*
3755  * Get the result, and check for success.
3756  *
3757  * We don't use a PG_TRY block here, so be careful not to throw error
3758  * without releasing the PGresult.
3759  */
3760  res = pgfdw_get_result(fmstate->conn, fmstate->query);
3761  if (PQresultStatus(res) != PGRES_COMMAND_OK)
3762  pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
3763  PQclear(res);
3764 
3765  /* This action shows that the prepare has been done. */
3766  fmstate->p_name = p_name;
3767 }
3768 
3769 /*
3770  * convert_prep_stmt_params
3771  * Create array of text strings representing parameter values
3772  *
3773  * tupleid is ctid to send, or NULL if none
3774  * slot is slot to get remaining parameters from, or NULL if none
3775  *
3776  * Data is constructed in temp_cxt; caller should reset that after use.
3777  */
3778 static const char **
3780  ItemPointer tupleid,
3781  TupleTableSlot *slot)
3782 {
3783  const char **p_values;
3784  int pindex = 0;
3785  MemoryContext oldcontext;
3786 
3787  oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
3788 
3789  p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums);
3790 
3791  /* 1st parameter should be ctid, if it's in use */
3792  if (tupleid != NULL)
3793  {
3794  /* don't need set_transmission_modes for TID output */
3795  p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
3796  PointerGetDatum(tupleid));
3797  pindex++;
3798  }
3799 
3800  /* get following parameters from slot */
3801  if (slot != NULL && fmstate->target_attrs != NIL)
3802  {
3803  int nestlevel;
3804  ListCell *lc;
3805 
3806  nestlevel = set_transmission_modes();
3807 
3808  foreach(lc, fmstate->target_attrs)
3809  {
3810  int attnum = lfirst_int(lc);
3811  Datum value;
3812  bool isnull;
3813 
3814  value = slot_getattr(slot, attnum, &isnull);
3815  if (isnull)
3816  p_values[pindex] = NULL;
3817  else
3818  p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
3819  value);
3820  pindex++;
3821  }
3822 
3823  reset_transmission_modes(nestlevel);
3824  }
3825 
3826  Assert(pindex == fmstate->p_nums);
3827 
3828  MemoryContextSwitchTo(oldcontext);
3829 
3830  return p_values;
3831 }
3832 
3833 /*
3834  * store_returning_result
3835  * Store the result of a RETURNING clause
3836  *
3837  * On error, be sure to release the PGresult on the way out. Callers do not
3838  * have PG_TRY blocks to ensure this happens.
3839  */
3840 static void
3842  TupleTableSlot *slot, PGresult *res)
3843 {
3844  PG_TRY();
3845  {
3846  HeapTuple newtup;
3847 
3848  newtup = make_tuple_from_result_row(res, 0,
3849  fmstate->rel,
3850  fmstate->attinmeta,
3851  fmstate->retrieved_attrs,
3852  NULL,
3853  fmstate->temp_cxt);
3854 
3855  /*
3856  * The returning slot will not necessarily be suitable to store
3857  * heaptuples directly, so allow for conversion.
3858  */
3859  ExecForceStoreHeapTuple(newtup, slot, true);
3860  }
3861  PG_CATCH();
3862  {
3863  if (res)
3864  PQclear(res);
3865  PG_RE_THROW();
3866  }
3867  PG_END_TRY();
3868 }
3869 
3870 /*
3871  * finish_foreign_modify
3872  * Release resources for a foreign insert/update/delete operation
3873  */
3874 static void
3876 {
3877  Assert(fmstate != NULL);
3878 
3879  /* If we created a prepared statement, destroy it */
3880  if (fmstate->p_name)
3881  {
3882  char sql[64];
3883  PGresult *res;
3884 
3885  snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
3886 
3887  /*
3888  * We don't use a PG_TRY block here, so be careful not to throw error
3889  * without releasing the PGresult.
3890  */
3891  res = pgfdw_exec_query(fmstate->conn, sql);
3892  if (PQresultStatus(res) != PGRES_COMMAND_OK)
3893  pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
3894  PQclear(res);
3895  fmstate->p_name = NULL;
3896  }
3897 
3898  /* Release remote connection */
3899  ReleaseConnection(fmstate->conn);
3900  fmstate->conn = NULL;
3901 }
3902 
3903 /*
3904  * build_remote_returning
3905  * Build a RETURNING targetlist of a remote query for performing an
3906  * UPDATE/DELETE .. RETURNING on a join directly
3907  */
3908 static List *
3909 build_remote_returning(Index rtindex, Relation rel, List *returningList)
3910 {
3911  bool have_wholerow = false;
3912  List *tlist = NIL;
3913  List *vars;
3914  ListCell *lc;
3915 
3916  Assert(returningList);
3917 
3918  vars = pull_var_clause((Node *) returningList, PVC_INCLUDE_PLACEHOLDERS);
3919 
3920  /*
3921  * If there's a whole-row reference to the target relation, then we'll
3922  * need all the columns of the relation.
3923  */
3924  foreach(lc, vars)
3925  {
3926  Var *var = (Var *) lfirst(lc);
3927 
3928  if (IsA(var, Var) &&
3929  var->varno == rtindex &&
3930  var->varattno == InvalidAttrNumber)
3931  {
3932  have_wholerow = true;
3933  break;
3934  }
3935  }
3936 
3937  if (have_wholerow)
3938  {
3940  int i;
3941 
3942  for (i = 1; i <= tupdesc->natts; i++)
3943  {
3944  Form_pg_attribute attr = TupleDescAttr(tupdesc, i - 1);
3945  Var *var;
3946 
3947  /* Ignore dropped attributes. */
3948  if (attr->attisdropped)
3949  continue;
3950 
3951  var = makeVar(rtindex,
3952  i,
3953  attr->atttypid,
3954  attr->atttypmod,
3955  attr->attcollation,
3956  0);
3957 
3958  tlist = lappend(tlist,
3959  makeTargetEntry((Expr *) var,
3960  list_length(tlist) + 1,
3961  NULL,
3962  false));
3963  }
3964  }
3965 
3966  /* Now add any remaining columns to tlist. */
3967  foreach(lc, vars)
3968  {
3969  Var *var = (Var *) lfirst(lc);
3970 
3971  /*
3972  * No need for whole-row references to the target relation. We don't
3973  * need system columns other than ctid and oid either, since those are
3974  * set locally.
3975  */
3976  if (IsA(var, Var) &&
3977  var->varno == rtindex &&
3978  var->varattno <= InvalidAttrNumber &&
3980  continue; /* don't need it */
3981 
3982  if (tlist_member((Expr *) var, tlist))
3983  continue; /* already got it */
3984 
3985  tlist = lappend(tlist,
3986  makeTargetEntry((Expr *) var,
3987  list_length(tlist) + 1,
3988  NULL,
3989  false));
3990  }
3991 
3992  list_free(vars);
3993 
3994  return tlist;
3995 }
3996 
3997 /*
3998  * rebuild_fdw_scan_tlist
3999  * Build new fdw_scan_tlist of given foreign-scan plan node from given
4000  * tlist
4001  *
4002  * There might be columns that the fdw_scan_tlist of the given foreign-scan
4003  * plan node contains that the given tlist doesn't. The fdw_scan_tlist would
4004  * have contained resjunk columns such as 'ctid' of the target relation and
4005  * 'wholerow' of non-target relations, but the tlist might not contain them,
4006  * for example. So, adjust the tlist so it contains all the columns specified
4007  * in the fdw_scan_tlist; else setrefs.c will get confused.
 *
 * The new list starts out as the passed-in tlist itself (no copy is made
 * here), and every expression of the old fdw_scan_tlist that is not already
 * a member is appended to it with lappend(). Nothing is returned; the
 * outcome is stored back into fscan->fdw_scan_tlist.
 *
 * NOTE(review): the line carrying the function name and parameter list is
 * absent from this listing (numbering jumps from 4009 to 4011). The body
 * reads its inputs as "fscan" and "tlist".
4008  */
4009 static void
4011 {
4012  List *new_tlist = tlist;
4013  List *old_tlist = fscan->fdw_scan_tlist;
4014  ListCell *lc;
4015 
 /* Walk the old scan tlist and carry over whatever the new one lacks. */
4016  foreach(lc, old_tlist)
4017  {
4018  TargetEntry *tle = (TargetEntry *) lfirst(lc);
4019 
4020  if (tlist_member(tle->expr, new_tlist))
4021  continue; /* already got it */
4022 
 /*
  * Append a fresh entry wrapping the same expression, numbered one
  * past the current list length. The old entry's name is not carried
  * over (NULL is passed), and the final argument is false.
  */
4023  new_tlist = lappend(new_tlist,
4024  makeTargetEntry(tle->expr,
4025  list_length(new_tlist) + 1,
4026  NULL,
4027  false));
4028  }
 /* Install the merged list on the plan node. */
4029  fscan->fdw_scan_tlist = new_tlist;
4030 }
4031 
4032 /*
4033  * Execute a direct UPDATE/DELETE statement.
 *
 * Steps, as performed below:
 *   1. If there are query parameters, convert their current values to text
 *      via process_query_params().
 *   2. Send dmstate->query on dmstate->conn with PQsendQueryParams().
 *   3. Wait for the result, keep it in dmstate->result, and raise an error
 *      if its status is not the expected one.
 *   4. Record the affected-row count in dmstate->num_tuples.
 *
 * NOTE(review): two lines are absent from this listing: the function
 * name/parameter line (4036) and the declaration of "dmstate" (4038).
4034  */
4035 static void
4037 {
4039  ExprContext *econtext = node->ss.ps.ps_ExprContext;
4040  int numParams = dmstate->numParams;
4041  const char **values = dmstate->param_values;
4042 
4043  /*
4044  * Construct array of query parameter values in text format.
4045  */
4046  if (numParams > 0)
4047  process_query_params(econtext,
4048  dmstate->param_flinfo,
4049  dmstate->param_exprs,
4050  values);
4051 
4052  /*
4053  * Notice that we pass NULL for paramTypes, thus forcing the remote server
4054  * to infer types for all parameters. Since we explicitly cast every
4055  * parameter (see deparse.c), the "inference" is trivial and will produce
4056  * the desired result. This allows us to avoid assuming that the remote
4057  * server has the same OIDs we do for the parameters' types.
4058  */
4059  if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
4060  NULL, values, NULL, NULL, 0))
4061  pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
4062 
4063  /*
4064  * Get the result, and check for success.
4065  *
4066  * We don't use a PG_TRY block here, so be careful not to throw error
4067  * without releasing the PGresult.
 *
 * NOTE(review): the right-hand side of the status comparison below
 * (listing line 4071) is absent from this listing. The "true" passed
 * to pgfdw_report_error presumably asks it to release the PGresult
 * before raising the error; confirm against that function.
4068  */
4069  dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
4070  if (PQresultStatus(dmstate->result) !=
4072  pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
4073  dmstate->query);
4074 
4075  /* Get the number of rows affected. */
 /*
  * With a RETURNING clause the count is the number of result rows;
  * otherwise it is parsed from the command-status string. atoi() does
  * no error checking on that string.
  */
4076  if (dmstate->has_returning)
4077  dmstate->num_tuples = PQntuples(dmstate->result);
4078  else
4079  dmstate->num_tuples = atoi(PQcmdTuples(dmstate->result));
4080 }
4081 
4082 /*
4083  * Get the result of a RETURNING clause.
 *
 * Each call consumes one row of dmstate->result, indexed by
 * dmstate->next_tuple. Once next_tuple reaches num_tuples, the scan slot is
 * cleared and returned to signal end of data.
 *
 * The function always returns the node's scan slot. The slot that the local
 * RETURNING projection should read (which may be a different slot produced
 * by apply_returning_filter) is installed into the projection's expression
 * context as a side effect.
 *
 * NOTE(review): two lines are absent from this listing: the function
 * name/parameter line (4086) and the declaration of "dmstate" (4088).
4084  */
4085 static TupleTableSlot *
4087 {
4089  EState *estate = node->ss.ps.state;
4090  ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
4091  TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
4092  TupleTableSlot *resultSlot;
4093 
4094  Assert(resultRelInfo->ri_projectReturning);
4095 
4096  /* If we didn't get any tuples, must be end of data. */
4097  if (dmstate->next_tuple >= dmstate->num_tuples)
4098  return ExecClearTuple(slot);
4099 
4100  /* Increment the command es_processed count if necessary. */
4101  if (dmstate->set_processed)
4102  estate->es_processed += 1;
4103 
4104  /*
4105  * Store a RETURNING tuple. If has_returning is false, just emit a dummy
4106  * tuple. (has_returning is false when the local query is of the form
4107  * "UPDATE/DELETE .. RETURNING 1" for example.)
4108  */
4109  if (!dmstate->has_returning)
4110  {
4111  ExecStoreAllNullTuple(slot);
4112  resultSlot = slot;
4113  }
4114  else
4115  {
4116  /*
4117  * On error, be sure to release the PGresult on the way out. Callers
4118  * do not have PG_TRY blocks to ensure this happens.
4119  */
4120  PG_TRY();
4121  {
4122  HeapTuple newtup;
4123 
 /* Build a local tuple from row next_tuple of the remote result. */
4124  newtup = make_tuple_from_result_row(dmstate->result,
4125  dmstate->next_tuple,
4126  dmstate->rel,
4127  dmstate->attinmeta,
4128  dmstate->retrieved_attrs,
4129  node,
4130  dmstate->temp_cxt);
4131  ExecStoreHeapTuple(newtup, slot, false);
4132  }
4133  PG_CATCH();
4134  {
 /*
  * NOTE(review): dmstate->result is released here but the pointer
  * is not reset to NULL, so it dangles while the error propagates.
  * Confirm that no later cleanup path inspects it.
  */
4135  if (dmstate->result)
4136  PQclear(dmstate->result);
4137  PG_RE_THROW();
4138  }
4139  PG_END_TRY();
4140 
4141  /* Get the updated/deleted tuple. */
 /*
  * When dmstate->rel is set the scan slot is used as-is; otherwise the
  * scan tuple is run through the returning filter to obtain a tuple
  * laid out like the result relation.
  */
4142  if (dmstate->rel)
4143  resultSlot = slot;
4144  else
4145  resultSlot = apply_returning_filter(dmstate, slot, estate);
4146  }
 /* Advance to the next remote row for the following call. */
4147  dmstate->next_tuple++;
4148 
4149  /* Make slot available for evaluation of the local query RETURNING list. */
4150  resultRelInfo->ri_projectReturning->pi_exprContext->ecxt_scantuple =
4151  resultSlot;
4152 
 /* Note: the scan slot is returned, not resultSlot. */
4153  return slot;
4154 }
4155 
4156 /*
4157  * Initialize a filter to extract an updated/deleted tuple from a scan tuple.
 *
 * fdw_scan_tlist is the target list whose entries describe the columns of
 * the scan tuple; rtindex identifies the target relation among the Vars in
 * that list. The results are stored in dmstate: attnoMap, ctidAttno,
 * oidAttno and hasSystemCols.
 *
 * NOTE(review): the line carrying the function name and the first parameter
 * (4160) is absent from this listing; the body uses it as "dmstate".
4158  */
4159 static void
4161  List *fdw_scan_tlist,
4162  Index rtindex)
4163 {
4164  TupleDesc resultTupType = RelationGetDescr(dmstate->resultRel);
4165  ListCell *lc;
4166  int i;
4167 
4168  /*
4169  * Calculate the mapping between the fdw_scan_tlist's entries and the
4170  * result tuple's attributes.
4171  *
4172  * The "map" is an array of indexes of the result tuple's attributes in
4173  * fdw_scan_tlist, i.e., one entry for every attribute of the result
4174  * tuple. We store zero for any attributes that don't have the
4175  * corresponding entries in that list, marking that a NULL is needed in
4176  * the result tuple.
4177  *
4178  * Also get the index of the entry for ctid, if any. (oidAttno is only
 * reset to zero below; no branch in this function assigns it.)
4179  */
4180  dmstate->attnoMap = (AttrNumber *)
4181  palloc0(resultTupType->natts * sizeof(AttrNumber));
4182 
4183  dmstate->ctidAttno = dmstate->oidAttno = 0;
4184 
 /* i is the 1-based position of the current entry in fdw_scan_tlist. */
4185  i = 1;
4186  dmstate->hasSystemCols = false;
4187  foreach(lc, fdw_scan_tlist)
4188  {
4189  TargetEntry *tle = (TargetEntry *) lfirst(lc);
4190  Var *var = (Var *) tle->expr;
4191 
4192  Assert(IsA(var, Var));
4193 
4194  /*
4195  * If the Var is a column of the target relation to be retrieved from
4196  * the foreign server, get the index of the entry.
4197  */
4198  if (var->varno == rtindex &&
4199  list_member_int(dmstate->retrieved_attrs, i))
4200  {
4201  int attrno = var->varattno;
4202 
4203  if (attrno < 0)
4204  {
4205  /*
4206  * The only system column handled here is ctid; any other
 * negative attribute number trips the assertion below.
4207  */
4208  if (attrno == SelfItemPointerAttributeNumber)
4209  dmstate->ctidAttno = i;
4210  else
4211  Assert(false);
4212  dmstate->hasSystemCols = true;
4213  }
4214  else
4215  {
4216  /*
4217  * We don't retrieve whole-row references to the target
4218  * relation either.
4219  */
4220  Assert(attrno > 0);
4221 
 /* Map result attribute attrno (1-based) to tlist position i. */
4222  dmstate->attnoMap[attrno - 1] = i;
4223  }
4224  }
4225  i++;
4226  }
4227 }
4228 
4229 /*
4230  * Extract and return an updated/deleted tuple from a scan tuple.
 *
 * "slot" holds the scan tuple. The returned slot is the result relation's
 * RETURNING slot, filled according to dmstate->attnoMap: result attribute i
 * takes the value of scan column attnoMap[i] (1-based), or NULL when the
 * map entry is zero.
 *
 * NOTE(review): the line carrying the function name and the first parameter
 * (4233) is absent from this listing; the body uses it as "dmstate".
4231  */
4232 static TupleTableSlot *
4234  TupleTableSlot *slot,
4235  EState *estate)
4236 {
4237  ResultRelInfo *relInfo = estate->es_result_relation_info;
4238  TupleDesc resultTupType = RelationGetDescr(dmstate->resultRel);
4239  TupleTableSlot *resultSlot;
4240  Datum *values;
4241  bool *isnull;
4242  Datum *old_values;
4243  bool *old_isnull;
4244  int i;
4245 
4246  /*
4247  * Use the return tuple slot as a place to store the result tuple.
4248  */
4249  resultSlot = ExecGetReturningSlot(estate, relInfo);
4250 
4251  /*
4252  * Extract all the values of the scan tuple.
4253  */
4254  slot_getallattrs(slot);
4255  old_values = slot->tts_values;
4256  old_isnull = slot->tts_isnull;
4257 
4258  /*
4259  * Prepare to build the result tuple.
4260  */
4261  ExecClearTuple(resultSlot);
4262  values = resultSlot->tts_values;
4263  isnull = resultSlot->tts_isnull;
4264 
4265  /*
4266  * Transpose data into proper fields of the result tuple.
4267  */
4268  for (i = 0; i < resultTupType->natts; i++)
4269  {
 /* j is a 1-based scan-tuple column number; zero means "no source". */
4270  int j = dmstate->attnoMap[i];
4271 
4272  if (j == 0)
4273  {
4274  values[i] = (Datum) 0;
4275  isnull[i] = true;
4276  }
4277  else
4278  {
4279  values[i] = old_values[j - 1];
4280  isnull[i] = old_isnull[j - 1];
4281  }
4282  }
4283 
4284  /*
4285  * Build the virtual tuple.
4286  */
4287  ExecStoreVirtualTuple(resultSlot);
4288 
4289  /*
4290  * If we have any system columns to return, materialize a heap tuple in
4291  * the slot from column values set above and install system columns in
4292  * that tuple.
4293  */
4294  if (dmstate->hasSystemCols)
4295  {
4296  HeapTuple resultTup = ExecFetchSlotHeapTuple(resultSlot, true, NULL);
4297 
4298  /* ctid */
4299  if (dmstate->ctidAttno)
4300  {
4301  ItemPointer ctid = NULL;
4302 
 /* ctidAttno is a 1-based scan-tuple column number. */
4303  ctid = (ItemPointer) DatumGetPointer(old_values[dmstate->ctidAttno - 1]);
4304  resultTup->t_self = *ctid;
4305  }
4306 
4307  /*
4308  * And remaining columns
4309  *
4310  * Note: since we currently don't allow the target relation to appear
4311  * on the nullable side of an outer join, any system columns wouldn't
4312  * go to NULL.
4313  *
4314  * Note: no need to care about tableoid here because it will be
4315  * initialized in ExecProcessReturning().
 *
 * NOTE(review): the statements this comment introduces (listing
 * lines 4317-4319) are absent from this listing.
4316  */
4320  }
4321 
4322  /*
4323  * And return the result tuple.
4324  */
4325  return resultSlot;
4326 }
4327 
4328 /*
4329  * Prepare for processing of parameters used in remote query.
 *
 * fdw_exprs is the list of parameter expressions and numParams the number
 * of parameters (asserted to be positive). Three outputs are produced
 * through the pointer arguments:
 *   *param_flinfo  - array of numParams FmgrInfo entries, one output
 *                    function per expression in fdw_exprs
 *   *param_exprs   - executable expression states built from fdw_exprs
 *   *param_values  - zero-filled array of numParams string pointers, to be
 *                    filled in later with the text form of each value
 *
 * NOTE(review): two parameter lines (4332 and 4335) are absent from this
 * listing; the body uses them as "node" and "param_flinfo".
4330  */
4331 static void
4333  List *fdw_exprs,
4334  int numParams,
4336  List **param_exprs,
4337  const char ***param_values)
4338 {
4339  int i;
4340  ListCell *lc;
4341 
4342  Assert(numParams > 0);
4343 
4344  /* Prepare for output conversion of parameters used in remote query. */
4345  *param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * numParams);
4346 
 /*
  * Look up the type output function of each expression. The loop is
  * driven by fdw_exprs, not numParams, so it assumes the list has no
  * more than numParams entries - TODO confirm at the call sites.
  */
4347  i = 0;
4348  foreach(lc, fdw_exprs)
4349  {
4350  Node *param_expr = (Node *) lfirst(lc);
4351  Oid typefnoid;
4352  bool isvarlena;
4353 
4354  getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena);
4355  fmgr_info(typefnoid, &(*param_flinfo)[i]);
4356  i++;
4357  }
4358 
4359  /*
4360  * Prepare remote-parameter expressions for evaluation. (Note: in
4361  * practice, we expect that all these expressions will be just Params, so
4362  * we could possibly do something more efficient than using the full
4363  * expression-eval machinery for this. But probably there would be little
4364  * benefit, and it'd require postgres_fdw to know more than is desirable
4365  * about Param evaluation.)
4366  */
4367  *param_exprs = ExecInitExprList(fdw_exprs, node);
4368 
4369  /* Allocate buffer for text form of query parameters. */
4370  *param_values = (const char **) palloc0(numParams * sizeof(char *));
4371 }
4372 
4373 /*
4374  * Construct array of query parameter values in text format.
4375  */
4376 static void
4379  List *param_exprs,
4380  const char **param_values)
4381 {
4382  int nestlevel;
4383  int i;
4384  ListCell *lc;
4385 
4386  nestlevel = set_transmission_modes();
4387 
4388  i = 0;
4389  foreach(lc, param_exprs)
4390  {
4391  ExprState *expr_state = (ExprState *) lfirst(lc);
4392  Datum expr_value;
4393  bool isNull;
4394 
4395  /* Evaluate the parameter expression */
4396  expr_value = ExecEvalExpr(expr_state, econtext, &isNull);
4397 
4398  /*
4399  * Get string representation of each parameter value by invoking
4400  * type-specific output function, unless the value is null.
4401  */
4402  if (isNull)
4403  param_values[i] = NULL;
4404  else
4405  param_values[i] = OutputFunctionCall(&param_flinfo[i], expr_value);
4406 
4407  i++;
4408  }
4409 
4410  reset_transmission_modes(nestlevel);
4411 }
4412 
4413 /*
4414  * postgresAnalyzeForeignTable
4415  * Test whether analyzing this foreign table is supported
 *
 * Besides returning true (the only return value in this function), it runs
 * a size query on the remote server and stores the page count it reports in
 * *totalpages. Any failure along the way is raised as an error rather than
 * reported through the return value.
 *
 * NOTE(review): the line carrying the function name and the first parameter
 * (4418) is absent from this listing; the body uses it as "relation".
4416  */
4417 static bool
4419  AcquireSampleRowsFunc *func,
4420  BlockNumber *totalpages)
4421 {
4422  ForeignTable *table;
4423  UserMapping *user;
4424  PGconn *conn;
4425  StringInfoData sql;
4426  PGresult *volatile res = NULL;
4427 
4428  /* Return the row-analysis function pointer */
 /*
  * NOTE(review): the statement that assigns *func (listing line 4429)
  * is absent from this listing.
  */
4430 
4431  /*
4432  * Now we have to get the number of pages. It's annoying that the ANALYZE
4433  * API requires us to return that now, because it forces some duplication
4434  * of effort between this routine and postgresAcquireSampleRowsFunc. But
4435  * it's probably not worth redefining that API at this point.
4436  */
4437 
4438  /*
4439  * Get the connection to use. We do the remote access as the table's
4440  * owner, even if the ANALYZE was started by some other user.
4441  */
4442  table = GetForeignTable(RelationGetRelid(relation));
4443  user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
4444  conn = GetConnection(user, false);
4445 
4446  /*
4447  * Construct command to get page count for relation.
4448  */
4449  initStringInfo(&sql);
4450  deparseAnalyzeSizeSql(&sql, relation);
4451 
4452  /* In what follows, do not risk leaking any PGresults. */
4453  PG_TRY();
4454  {
4455  res = pgfdw_exec_query(conn, sql.data);
4456  if (PQresultStatus(res) != PGRES_TUPLES_OK)
4457  pgfdw_report_error(ERROR, res, conn, false, sql.data);
4458 
 /* Exactly one row with one column is expected. */
4459  if (PQntuples(res) != 1 || PQnfields(res) != 1)
4460  elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query");
 /* Parse the single value as a base-10 unsigned integer. */
4461  *totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
4462  }
4463  PG_FINALLY();
4464  {
 /* Runs on both the normal and the error path. */
4465  if (res)
4466  PQclear(res);
4467  }
4468  PG_END_TRY();
4469 
4470  ReleaseConnection(conn);
4471 
4472  return true;
4473 }
4474 
4475 /*
4476  * Acquire a random sample of rows from foreign table managed by postgres_fdw.
4477  *
4478  * We fetch the whole table from the remote side and pick out some sample rows.
4479  *
4480  * Selected rows are returned in the caller-allocated array rows[],
4481  * which must have at least targrows entries.
4482  * The actual number of rows selected is returned as the function result.
4483  * We also count the total number of rows in the table and return it into
4484  * *totalrows. Note that *totaldeadrows is always set to 0.
4485  *
4486  * Note that the returned list of rows is not always in order by physical
4487  * position in the table. Therefore, correlation estimates derived later
4488  * may be meaningless, but it's OK because we don't use the estimates
4489  * currently (the planner only pays attention to correlation for indexscans).
 *
 * The rows are read through a remote cursor in batches of fetch_size rows.
 * Each row is handed to analyze_row_processor(), which decides whether it
 * enters the sample. A summary message is emitted at level "elevel".
 *
 * NOTE(review): the line carrying the function name and the leading
 * parameters (4492) is absent from this listing; the body uses them as
 * "relation" and "elevel".
4490  */
4491 static int
4493  HeapTuple *rows, int targrows,
4494  double *totalrows,
4495  double *totaldeadrows)
4496 {
4497  PgFdwAnalyzeState astate;
4498  ForeignTable *table;
4499  ForeignServer *server;
4500  UserMapping *user;
4501  PGconn *conn;
4502  unsigned int cursor_number;
4503  StringInfoData sql;
4504  PGresult *volatile res = NULL;
4505 
4506  /* Initialize workspace state */
 /*
  * NOTE(review): one initialization statement (listing line 4508) is
  * absent from this listing between the next two assignments.
  */
4507  astate.rel = relation;
4509 
4510  astate.rows = rows;
4511  astate.targrows = targrows;
4512  astate.numrows = 0;
4513  astate.samplerows = 0;
4514  astate.rowstoskip = -1; /* -1 means not set yet */
4515  reservoir_init_selection_state(&astate.rstate, targrows);
4516 
4517  /* Remember ANALYZE context, and create a per-tuple temp context */
 /*
  * NOTE(review): the start and end of the context-creation statement
  * (listing lines 4519 and 4521) are absent from this listing; only its
  * name argument is visible below.
  */
4518  astate.anl_cxt = CurrentMemoryContext;
4520  "postgres_fdw temporary data",
4522 
4523  /*
4524  * Get the connection to use. We do the remote access as the table's
4525  * owner, even if the ANALYZE was started by some other user.
4526  */
4527  table = GetForeignTable(RelationGetRelid(relation));
4528  server = GetForeignServer(table->serverid);
4529  user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
4530  conn = GetConnection(user, false);
4531 
4532  /*
4533  * Construct cursor that retrieves whole rows from remote.
4534  */
4535  cursor_number = GetCursorNumber(conn);
4536  initStringInfo(&sql);
4537  appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number);
4538  deparseAnalyzeSql(&sql, relation, &astate.retrieved_attrs);
4539 
4540  /* In what follows, do not risk leaking any PGresults. */
4541  PG_TRY();
4542  {
4543  char fetch_sql[64];
4544  int fetch_size;
4545  ListCell *lc;
4546 
 /* Declare the cursor; its result carries no data, so drop it at once. */
4547  res = pgfdw_exec_query(conn, sql.data);
4548  if (PQresultStatus(res) != PGRES_COMMAND_OK)
4549  pgfdw_report_error(ERROR, res, conn, false, sql.data);
4550  PQclear(res);
4551  res = NULL;
4552 
4553  /*
4554  * Determine the fetch size. The default is arbitrary, but shouldn't
4555  * be enormous.
 *
 * The server-level option is examined first and the table-level
 * option second, so a table-level setting overrides the server's.
 * strtol() is used without error checking; the option value is
 * presumably validated when it is set - TODO confirm.
4556  */
4557  fetch_size = 100;
4558  foreach(lc, server->options)
4559  {
4560  DefElem *def = (DefElem *) lfirst(lc);
4561 
4562  if (strcmp(def->defname, "fetch_size") == 0)
4563  {
4564  fetch_size = strtol(defGetString(def), NULL, 10);
4565  break;
4566  }
4567  }
4568  foreach(lc, table->options)
4569  {
4570  DefElem *def = (DefElem *) lfirst(lc);
4571 
4572  if (strcmp(def->defname, "fetch_size") == 0)
4573  {
4574  fetch_size = strtol(defGetString(def), NULL, 10);
4575  break;
4576  }
4577  }
4578 
4579  /* Construct command to fetch rows from remote. */
4580  snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
4581  fetch_size, cursor_number);
4582 
4583  /* Retrieve and process rows a batch at a time. */
4584  for (;;)
4585  {
4586  int numrows;
4587  int i;
4588 
4589  /* Allow users to cancel long query */
 /*
  * NOTE(review): the statement this comment introduces (listing
  * line 4590) is absent from this listing.
  */
4591 
4592  /*
4593  * XXX possible future improvement: if rowstoskip is large, we
4594  * could issue a MOVE rather than physically fetching the rows,
4595  * then just adjust rowstoskip and samplerows appropriately.
4596  */
4597 
4598  /* Fetch some rows */
4599  res = pgfdw_exec_query(conn, fetch_sql);
4600  /* On error, report the original query, not the FETCH. */
4601  if (PQresultStatus(res) != PGRES_TUPLES_OK)
4602  pgfdw_report_error(ERROR, res, conn, false, sql.data);
4603 
4604  /* Process whatever we got. */
4605  numrows = PQntuples(res);
4606  for (i = 0; i < numrows; i++)
4607  analyze_row_processor(res, i, &astate);
4608 
 /* Reset res to NULL so the catch block won't clear it again. */
4609  PQclear(res);
4610  res = NULL;
4611 
4612  /* Must be EOF if we didn't get all the rows requested. */
4613  if (numrows < fetch_size)
4614  break;
4615  }
4616 
4617  /* Close the cursor, just to be tidy. */
4618  close_cursor(conn, cursor_number);
4619  }
4620  PG_CATCH();
4621  {
4622  if (res)
4623  PQclear(res);
4624  PG_RE_THROW();
4625  }
4626  PG_END_TRY();
4627 
4628  ReleaseConnection(conn);
4629 
4630  /* We assume that we have no dead tuple. */
4631  *totaldeadrows = 0.0;
4632 
4633  /* We've retrieved all living tuples from foreign server. */
4634  *totalrows = astate.samplerows;
4635 
4636  /*
4637  * Emit some interesting relation info
4638  */
4639  ereport(elevel,
4640  (errmsg("\"%s\": table contains %.0f rows, %d rows in sample",
4641  RelationGetRelationName(relation),
4642  astate.samplerows, astate.numrows)));
4643 
4644  return astate.numrows;
4645 }
4646 
4647 /*
4648  * Collect sample rows from the result of query.
4649  * - Use all tuples in sample until target # of samples are collected.
4650  * - Subsequently, replace already-sampled tuples randomly.
4651  */
4652 static void
4654 {
4655  int targrows = astate->targrows;
4656  int pos; /* array index to store tuple in */
4657  MemoryContext oldcontext;
4658 
4659  /* Always increment sample row counter. */
4660  astate->samplerows += 1;
4661 
4662  /*
4663  * Determine the slot where this sample row should be stored. Set pos to
4664  * negative value to indicate the row should be skipped.
4665  */
4666  if (astate->numrows < targrows)
4667  {
4668  /* First targrows rows are always included into the sample */
4669  pos = astate->numrows++;
4670  }
4671  else
4672  {
4673  /*
4674  * Now we start replacing tuples in the sample until we reach the end
4675  * of the relation. Same algorithm as in acquire_sample_rows in
4676  * analyze.c; see Jeff Vitter's paper.
4677  */
4678  if (astate->rowstoskip < 0)
4679  astate->rowstoskip = reservoir_get_next_S(&astate->rstate, astate->samplerows, targrows);
4680 
4681  if (astate->rowstoskip <= 0)
4682  {
4683  /* Choose a random reservoir element to replace. */
4684  pos = (int) (targrows * sampler_random_fract(astate->rstate.randstate));
4685  Assert(pos >= 0 && pos < targrows);
4686  heap_freetuple(astate->rows[pos]);
4687  }
4688  else
4689  {
4690  /* Skip this tuple. */
4691  pos = -1;
4692  }
4693 
4694  astate->rowstoskip -= 1;
4695  }
4696 
4697  if (pos >= 0)
4698  {
4699  /*
4700  * Create sample tuple from current result row, and store it in the
4701  * position determined above. The tuple has to be created in anl_cxt.
4702  */
4703  oldcontext = MemoryContextSwitchTo(astate->anl_cxt);
4704 
4705  astate->rows[pos] = make_tuple_from_result_row(res, row,
4706  astate->rel,
4707  astate->attinmeta,
4708  astate->retrieved_attrs,
4709  NULL,
4710  astate->temp_cxt);
4711 
4712  MemoryContextSwitchTo(oldcontext);
4713  }
4714 }
4715 
4716 /*
4717  * Import a foreign schema
 *
 * Returns a list of CREATE FOREIGN TABLE command strings, one per relation
 * found in stmt->remote_schema on the remote server.
 *
 * Recognized statement options (all boolean): import_collate (default
 * true), import_default (default false), import_not_null (default true).
 * Any other option name raises an error.
 *
 * NOTE(review): the line carrying the function name and parameter list
 * (4720) is absent from this listing; the body uses the parameters as
 * "stmt" and "serverOid".
4718  */
4719 static List *
4721 {
4722  List *commands = NIL;
4723  bool import_collate = true;
4724  bool import_default = false;
4725  bool import_not_null = true;
4726  ForeignServer *server;
4727  UserMapping *mapping;
4728  PGconn *conn;
 /*
  * NOTE(review): the declaration of the string buffer "buf" used
  * throughout (listing line 4729) is absent from this listing.
  */
4730  PGresult *volatile res = NULL;
4731  int numrows,
4732  i;
4733  ListCell *lc;
4734 
4735  /* Parse statement options */
4736  foreach(lc, stmt->options)
4737  {
4738  DefElem *def = (DefElem *) lfirst(lc);
4739 
4740  if (strcmp(def->defname, "import_collate") == 0)
4741  import_collate = defGetBoolean(def);
4742  else if (strcmp(def->defname, "import_default") == 0)
4743  import_default = defGetBoolean(def);
4744  else if (strcmp(def->defname, "import_not_null") == 0)
4745  import_not_null = defGetBoolean(def);
4746  else
4747  ereport(ERROR,
4748  (errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
4749  errmsg("invalid option \"%s\"", def->defname)));
4750  }
4751 
4752  /*
4753  * Get connection to the foreign server. Connection manager will
4754  * establish new connection if necessary.
4755  */
4756  server = GetForeignServer(serverOid);
4757  mapping = GetUserMapping(GetUserId(), server->serverid);
4758  conn = GetConnection(mapping, false);
4759 
4760  /* Don't attempt to import collation if remote server hasn't got it */
4761  if (PQserverVersion(conn) < 90100)
4762  import_collate = false;
4763 
4764  /* Create workspace for strings */
4765  initStringInfo(&buf);
4766 
4767  /* In what follows, do not risk leaking any PGresults. */
4768  PG_TRY();
4769  {
4770  /* Check that the schema really exists */
4771  appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
4772  deparseStringLiteral(&buf, stmt->remote_schema);
4773 
4774  res = pgfdw_exec_query(conn, buf.data);
4775  if (PQresultStatus(res) != PGRES_TUPLES_OK)
4776  pgfdw_report_error(ERROR, res, conn, false, buf.data);
4777 
4778  if (PQntuples(res) != 1)
4779  ereport(ERROR,
4780  (errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND),
4781  errmsg("schema \"%s\" is not present on foreign server \"%s\"",
4782  stmt->remote_schema, server->servername)));
4783 
 /* Done with the existence check; reuse res and buf for the main query. */
4784  PQclear(res);
4785  res = NULL;
4786  resetStringInfo(&buf);
4787 
4788  /*
4789  * Fetch all table data from this schema, possibly restricted by
4790  * EXCEPT or LIMIT TO. (We don't actually need to pay any attention
4791  * to EXCEPT/LIMIT TO here, because the core code will filter the
4792  * statements we return according to those lists anyway. But it
4793  * should save a few cycles to not process excluded tables in the
4794  * first place.)
4795  *
4796  * Ignore table data for partitions and only include the definitions
4797  * of the root partitioned tables to allow access to the complete
4798  * remote data set locally in the schema imported.
4799  *
4800  * Note: because we run the connection with search_path restricted to
4801  * pg_catalog, the format_type() and pg_get_expr() outputs will always
4802  * include a schema name for types/functions in other schemas, which
4803  * is what we want.
 *
 * Both query variants return seven columns in the same order:
 * relname, attname, type, attnotnull, default expression, collation
 * name, collation schema. The variant without collation support
 * returns NULL for the last two.
 *
 * NOTE(review): the call-opening lines preceding each of the three
 * string literals below (listing lines 4806, 4827 and 4843) are
 * absent from this listing.
4804  */
4805  if (import_collate)
4807  "SELECT relname, "
4808  " attname, "
4809  " format_type(atttypid, atttypmod), "
4810  " attnotnull, "
4811  " pg_get_expr(adbin, adrelid), "
4812  " collname, "
4813  " collnsp.nspname "
4814  "FROM pg_class c "
4815  " JOIN pg_namespace n ON "
4816  " relnamespace = n.oid "
4817  " LEFT JOIN pg_attribute a ON "
4818  " attrelid = c.oid AND attnum > 0 "
4819  " AND NOT attisdropped "
4820  " LEFT JOIN pg_attrdef ad ON "
4821  " adrelid = c.oid AND adnum = attnum "
4822  " LEFT JOIN pg_collation coll ON "
4823  " coll.oid = attcollation "
4824  " LEFT JOIN pg_namespace collnsp ON "
4825  " collnsp.oid = collnamespace ");
4826  else
4828  "SELECT relname, "
4829  " attname, "
4830  " format_type(atttypid, atttypmod), "
4831  " attnotnull, "
4832  " pg_get_expr(adbin, adrelid), "
4833  " NULL, NULL "
4834  "FROM pg_class c "
4835  " JOIN pg_namespace n ON "
4836  " relnamespace = n.oid "
4837  " LEFT JOIN pg_attribute a ON "
4838  " attrelid = c.oid AND attnum > 0 "
4839  " AND NOT attisdropped "
4840  " LEFT JOIN pg_attrdef ad ON "
4841  " adrelid = c.oid AND adnum = attnum ");
4842 
4844  "WHERE c.relkind IN ("
4845  CppAsString2(RELKIND_RELATION) ","
4846  CppAsString2(RELKIND_VIEW) ","
4847  CppAsString2(RELKIND_FOREIGN_TABLE) ","
4848  CppAsString2(RELKIND_MATVIEW) ","
4849  CppAsString2(RELKIND_PARTITIONED_TABLE) ") "
4850  " AND n.nspname = ");
4851  deparseStringLiteral(&buf, stmt->remote_schema);
4852 
4853  /* Partitions are supported since Postgres 10 */
4854  if (PQserverVersion(conn) >= 100000)
4855  appendStringInfoString(&buf, " AND NOT c.relispartition ");
4856 
4857  /* Apply restrictions for LIMIT TO and EXCEPT */
 /*
  * NOTE(review): the second half of this condition (listing line 4859)
  * is absent from this listing.
  */
4858  if (stmt->list_type == FDW_IMPORT_SCHEMA_LIMIT_TO ||
4860  {
4861  bool first_item = true;
4862 
4863  appendStringInfoString(&buf, " AND c.relname ");
4864  if (stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
4865  appendStringInfoString(&buf, "NOT ");
4866  appendStringInfoString(&buf, "IN (");
4867 
4868  /* Append list of table names within IN clause */
4869  foreach(lc, stmt->table_list)
4870  {
4871  RangeVar *rv = (RangeVar *) lfirst(lc);
4872 
4873  if (first_item)
4874  first_item = false;
4875  else
4876  appendStringInfoString(&buf, ", ");
4877  deparseStringLiteral(&buf, rv->relname);
4878  }
4879  appendStringInfoChar(&buf, ')');
4880  }
4881 
4882  /* Append ORDER BY at the end of query to ensure output ordering */
4883  appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");
4884 
4885  /* Fetch the data */
4886  res = pgfdw_exec_query(conn, buf.data);
4887  if (PQresultStatus(res) != PGRES_TUPLES_OK)
4888  pgfdw_report_error(ERROR, res, conn, false, buf.data);
4889 
4890  /* Process results */
 /*
  * Rows arrive grouped by table name because of the ORDER BY above.
  * Each outer iteration consumes one group and emits one command.
  */
4891  numrows = PQntuples(res);
4892  /* note: incrementation of i happens in inner loop's while() test */
4893  for (i = 0; i < numrows;)
4894  {
4895  char *tablename = PQgetvalue(res, i, 0);
4896  bool first_item = true;
4897 
 /* buf is reused from here on to assemble this table's command. */
4898  resetStringInfo(&buf);
4899  appendStringInfo(&buf, "CREATE FOREIGN TABLE %s (\n",
4900  quote_identifier(tablename));
4901 
4902  /* Scan all rows for this table */
4903  do
4904  {
4905  char *attname;
4906  char *typename;
4907  char *attnotnull;
4908  char *attdefault;
4909  char *collname;
4910  char *collnamespace;
4911 
4912  /* If table has no columns, we'll see nulls here */
 /*
  * "continue" in a do/while jumps to the loop test at the bottom,
  * so i is still advanced.
  */
4913  if (PQgetisnull(res, i, 1))
4914  continue;
4915 
4916  attname = PQgetvalue(res, i, 1);
4917  typename = PQgetvalue(res, i, 2);
4918  attnotnull = PQgetvalue(res, i, 3);
4919  attdefault = PQgetisnull(res, i, 4) ? (char *) NULL :
4920  PQgetvalue(res, i, 4);
4921  collname = PQgetisnull(res, i, 5) ? (char *) NULL :
4922  PQgetvalue(res, i, 5);
4923  collnamespace = PQgetisnull(res, i, 6) ? (char *) NULL :
4924  PQgetvalue(res, i, 6);
4925 
4926  if (first_item)
4927  first_item = false;
4928  else
4929  appendStringInfoString(&buf, ",\n");
4930 
4931  /* Print column name and type */
4932  appendStringInfo(&buf, " %s %s",
4933  quote_identifier(attname),
4934  typename);
4935 
4936  /*
4937  * Add column_name option so that renaming the foreign table's
4938  * column doesn't break the association to the underlying
4939  * column.
4940  */
4941  appendStringInfoString(&buf, " OPTIONS (column_name ");
4942  deparseStringLiteral(&buf, attname);
4943  appendStringInfoChar(&buf, ')');
4944 
4945  /* Add COLLATE if needed */
4946  if (import_collate && collname != NULL && collnamespace != NULL)
4947  appendStringInfo(&buf, " COLLATE %s.%s",
4948  quote_identifier(collnamespace),
4949  quote_identifier(collname));
4950 
4951  /* Add DEFAULT if needed */
4952  if (import_default && attdefault != NULL)
4953  appendStringInfo(&buf, " DEFAULT %s", attdefault);
4954 
4955  /* Add NOT NULL if needed */
 /* attnotnull is the text form of a boolean; 't' means true. */
4956  if (import_not_null && attnotnull[0] == 't')
4957  appendStringInfoString(&buf, " NOT NULL");
4958  }
4959  while (++i < numrows &&
4960  strcmp(PQgetvalue(res, i, 0), tablename) == 0);
4961 
4962  /*
4963  * Add server name and table-level options. We specify remote
4964  * schema and table name as options (the latter to ensure that
4965  * renaming the foreign table doesn't break the association).
4966  */
4967  appendStringInfo(&buf, "\n) SERVER %s\nOPTIONS (",
4968  quote_identifier(server->servername));
4969 
4970  appendStringInfoString(&buf, "schema_name ");
4971  deparseStringLiteral(&buf, stmt->remote_schema);
4972  appendStringInfoString(&buf, ", table_name ");
4973  deparseStringLiteral(&buf, tablename);
4974 
4975  appendStringInfoString(&buf, ");");
4976 
 /* Keep a private copy, since buf is reset on the next iteration. */
4977  commands = lappend(commands, pstrdup(buf.data));
4978  }
4979  }
4980  PG_FINALLY();
4981  {
 /* Runs on both the normal and the error path. */
4982  if (res)
4983  PQclear(res);
4984  }
4985  PG_END_TRY();
4986 
4987  ReleaseConnection(conn);
4988 
4989  return commands;
4990 }
4991 
4992 /*
4993  * Assess whether the join between inner and outer relations can be pushed down
4994  * to the foreign server. As a side effect, save information we obtain in this
4995  * function to PgFdwRelationInfo passed in.
4996  */
4997 static bool
4999  RelOptInfo *outerrel, RelOptInfo *innerrel,
5000  JoinPathExtraData *extra)
5001 {
5002  PgFdwRelationInfo *fpinfo;
5003  PgFdwRelationInfo *fpinfo_o;
5004  PgFdwRelationInfo *fpinfo_i;
5005  ListCell *lc;
5006  List *joinclauses;
5007 
5008  /*
5009  * We support pushing down INNER, LEFT, RIGHT and FULL OUTER joins.
5010  * Constructing queries representing SEMI and ANTI joins is hard, hence
5011  * not considered right now.
5012  */
5013  if (jointype != JOIN_INNER && jointype != JOIN_LEFT &&
5014  jointype != JOIN_RIGHT && jointype != JOIN_FULL)
5015  return false;
5016 
5017  /*
5018  * If either of the joining relations is marked as unsafe to pushdown, the
5019  * join can not be pushed down.
5020  */
5021  fpinfo = (PgFdwRelationInfo *) joinrel->fdw_private;
5022  fpinfo_o = (PgFdwRelationInfo *) outerrel->fdw_private;
5023  fpinfo_i = (PgFdwRelationInfo *) innerrel->fdw_private;
5024  if (!fpinfo_o || !fpinfo_o->pushdown_safe ||
5025  !fpinfo_i || !fpinfo_i->pushdown_safe)
5026  return false;
5027 
5028  /*
5029  * If joining relations have local conditions, those conditions are
5030  * required to be applied before joining the relations. Hence the join can
5031  * not be pushed down.
5032  */
5033  if (fpinfo_o->local_conds || fpinfo_i->local_conds)
5034  return false;
5035 
5036  /*
5037  * Merge FDW options. We might be tempted to do this after we have deemed
5038  * the foreign join to be OK. But we must do this beforehand so that we
5039  * know which quals can be evaluated on the foreign server, which might
5040  * depend on shippable_extensions.
5041  */
5042  fpinfo->server = fpinfo_o->server;
5043  merge_fdw_options(fpinfo, fpinfo_o, fpinfo_i);
5044 
5045  /*
5046  * Separate restrict list into join quals and pushed-down (other) quals.
5047  *
5048  * Join quals belonging to an outer join must all be shippable, else we
5049  * cannot execute the join remotely. Add such quals to 'joinclauses'.
5050  *
5051  * Add other quals to fpinfo->remote_conds if they are shippable, else to
5052  * fpinfo->local_conds. In an inner join it's okay to execute conditions
5053  * either locally or remotely; the same is true for pushed-down conditions
5054  * at an outer join.
5055  *
5056  * Note we might return failure after having already scribbled on
5057  * fpinfo->remote_conds and fpinfo->local_conds. That's okay because we
5058  * won't consult those lists again if we deem the join unshippable.
5059  */
5060  joinclauses = NIL;
5061  foreach(lc, extra->restrictlist)
5062  {
5063  RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
5064  bool is_remote_clause = is_foreign_expr(root, joinrel,
5065  rinfo->clause);
5066 
5067  if (IS_OUTER_JOIN(jointype) &&
5068  !RINFO_IS_PUSHED_DOWN(rinfo, joinrel->relids))
5069  {
5070  if (!is_remote_clause)
5071  return false;
5072  joinclauses = lappend(joinclauses, rinfo);
5073  }
5074  else
5075  {
5076  if (is_remote_clause)
5077  fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo);
5078  else
5079  fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo);
5080  }
5081  }
5082 
5083  /*
5084  * deparseExplicitTargetList() isn't smart enough to handle anything other
5085  * than a Var. In particular, if there's some PlaceHolderVar that would
5086  * need to be evaluated within this join tree (because there's an upper
5087  * reference to a quantity that may go to NULL as a result of an outer
5088  * join), then we can't try to push the join down because we'll fail when
5089  * we get to deparseExplicitTargetList(). However, a PlaceHolderVar that
5090  * needs to be evaluated *at the top* of this join tree is OK, because we
5091  * can do that locally after fetching the results from the remote side.
5092  */
5093  foreach(lc, root->placeholder_list)
5094  {
5095  PlaceHolderInfo *phinfo = lfirst(lc);
5096  Relids relids;
5097 
5098  /* PlaceHolderInfo refers to parent relids, not child relids. */
5099  relids = IS_OTHER_REL(joinrel) ?
5100  joinrel->top_parent_relids : joinrel->relids;
5101 
5102  if (bms_is_subset(phinfo->ph_eval_at, relids) &&
5103  bms_nonempty_difference(relids, phinfo->ph_eval_at))
5104  return false;
5105  }
5106 
5107  /* Save the join clauses, for later use. */
5108  fpinfo->joinclauses = joinclauses;
5109 
5110  fpinfo->outerrel = outerrel;
5111  fpinfo->innerrel = innerrel;
5112  fpinfo->jointype = jointype;
5113 
5114  /*
5115  * By default, both the input relations are not required to be deparsed as
5116  * subqueries, but there might be some relations covered by the input
5117  * relations that are required to be deparsed as subqueries, so save the
5118  * relids of those relations for later use by the deparser.
5119  */
5120  fpinfo->make_outerrel_subquery = false;
5121  fpinfo->make_innerrel_subquery = false;
5122  Assert(bms_is_subset(fpinfo_o->lower_subquery_rels, outerrel->relids));
5123  Assert(bms_is_subset(fpinfo_i->lower_subquery_rels, innerrel->relids));
5125  fpinfo_i->lower_subquery_rels);
5126 
5127  /*
5128  * Pull the other remote conditions from the joining relations into join
5129  * clauses or other remote clauses (remote_conds) of this relation
5130  * wherever possible. This avoids building subqueries at every join step.
5131  *
5132  * For an inner join, clauses from both the relations are added to the
5133  * other remote clauses. For LEFT and RIGHT OUTER join, the clauses from
5134  * the outer side are added to remote_conds since those can be evaluated
5135  * after the join is evaluated. The clauses from inner side are added to
5136  * the joinclauses, since they need to be evaluated while constructing the
5137  * join.
5138  *
5139  * For a FULL OUTER JOIN, the other clauses from either relation can not
5140  * be added to the joinclauses or remote_conds, since each relation acts
5141  * as an outer relation for the other.
5142  *
5143  * The joining sides can not have local conditions, thus no need to test
5144  * shippability of the clauses being pulled up.
5145  */
5146  switch (jointype)
5147  {
5148  case JOIN_INNER:
5149  fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
5150  fpinfo_i->remote_conds);
5151  fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
5152  fpinfo_o->remote_conds);
5153  break;
5154 
5155  case JOIN_LEFT:
5156  fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
5157  fpinfo_i->remote_conds);
5158  fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
5159  fpinfo_o->remote_conds);
5160  break;
5161 
5162  case JOIN_RIGHT:
5163  fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
5164  fpinfo_o->remote_conds);
5165  fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
5166  fpinfo_i->remote_conds);
5167  break;
5168 
5169  case JOIN_FULL:
5170 
5171  /*
5172  * In this case, if any of the input relations has conditions, we
5173  * need to deparse that relation as a subquery so that the
5174  * conditions can be evaluated before the join. Remember it in
5175  * the fpinfo of this relation so that the deparser can take
5176  * appropriate action. Also, save the relids of base relations
5177  * covered by that relation for later use by the deparser.
5178  */
5179  if (fpinfo_o->remote_conds)
5180  {
5181  fpinfo->make_outerrel_subquery = true;
5182  fpinfo->lower_subquery_rels =
5184  outerrel->relids);
5185  }
5186  if (fpinfo_i->remote_conds)
5187  {
5188  fpinfo->make_innerrel_subquery = true;
5189  fpinfo->lower_subquery_rels =
5191  innerrel->relids);
5192  }
5193  break;
5194 
5195  default:
5196  /* Should not happen, we have just checked this above */
5197  elog(ERROR, "unsupported join type %d", jointype);
5198  }
5199 
5200  /*
5201  * For an inner join, all restrictions can be treated alike. Treating the
5202  * pushed down conditions as join conditions allows a top level full outer
5203  * join to be deparsed without requiring subqueries.
5204  */
5205  if (jointype == JOIN_INNER)
5206  {
5207  Assert(!fpinfo->joinclauses);
5208  fpinfo->joinclauses = fpinfo->remote_conds;
5209  fpinfo->remote_conds = NIL;
5210  }
5211 
5212  /* Mark that this join can be pushed down safely */
5213  fpinfo->pushdown_safe = true;
5214 
5215  /* Get user mapping */
5216  if (fpinfo->use_remote_estimate)
5217  {
5218  if (fpinfo_o->use_remote_estimate)
5219  fpinfo->user = fpinfo_o->user;
5220  else
5221  fpinfo->user = fpinfo_i->user;
5222  }
5223  else
5224  fpinfo->user = NULL;
5225 
5226  /*
5227  * Set # of retrieved rows and cached relation costs to some negative
5228  * value, so that we can detect when they are set to some sensible values,
5229  * during one (usually the first) of the calls to estimate_path_cost_size.
5230  */
5231  fpinfo->retrieved_rows = -1;
5232  fpinfo->rel_startup_cost = -1;
5233  fpinfo->rel_total_cost = -1;
5234 
5235  /*
5236  * Set the string describing this join relation to be used in EXPLAIN
5237  * output of corresponding ForeignScan. Note that the decoration we add
5238  * to the base relation names mustn't include any digits, or it'll confuse
5239  * postgresExplainForeignScan.
5240  */
5241  fpinfo->relation_name = psprintf("(%s) %s JOIN (%s)",
5242  fpinfo_o->relation_name,
5243  get_jointype_name(fpinfo->jointype),
5244  fpinfo_i->relation_name);
5245 
5246  /*
5247  * Set the relation index. This is defined as the position of this
5248  * joinrel in the join_rel_list list plus the length of the rtable list.
5249  * Note that since this joinrel is at the end of the join_rel_list list
5250  * when we are called, we can get the position by list_length.
5251  */
5252  Assert(fpinfo->relation_index == 0); /* shouldn't be set yet */
5253  fpinfo->relation_index =
5255 
5256  return true;
5257 }
5258 
5259 static void
5261  Path *epq_path)
5262 {
5263  List *useful_pathkeys_list = NIL; /* List of all pathkeys */
5264  ListCell *lc;
5265 
5266  useful_pathkeys_list = get_useful_pathkeys_for_relation(root, rel);
5267 
5268  /* Create one path for each set of pathkeys we found above. */
5269  foreach(lc, useful_pathkeys_list)
5270  {
5271  double rows;
5272  int width;
5273  Cost startup_cost;
5274  Cost total_cost;
5275  List *useful_pathkeys = lfirst(lc);
5276  Path *sorted_epq_path;
5277 
5278  estimate_path_cost_size(root, rel, NIL, useful_pathkeys, NULL,
5279  &rows, &width, &startup_cost, &total_cost);
5280 
5281  /*
5282  * The EPQ path must be at least as well sorted as the path itself, in
5283  * case it gets used as input to a mergejoin.
5284  */
5285  sorted_epq_path = epq_path;
5286  if (sorted_epq_path != NULL &&
5287  !pathkeys_contained_in(useful_pathkeys,
5288  sorted_epq_path->pathkeys))
5289  sorted_epq_path = (Path *)
5290  create_sort_path(root,
5291  rel,
5292  sorted_epq_path,
5293  useful_pathkeys,
5294  -1.0);
5295 
5296  if (IS_SIMPLE_REL(rel))
5297  add_path(rel, (Path *)
5298  create_foreignscan_path(root, rel,
5299  NULL,
5300  rows,
5301  startup_cost,
5302  total_cost,
5303  useful_pathkeys,
5304  rel->lateral_relids,
5305  sorted_epq_path,
5306  NIL));
5307  else
5308  add_path(rel, (Path *)
5309  create_foreign_join_path(root, rel,
5310  NULL,
5311  rows,
5312  startup_cost,
5313  total_cost,
5314  useful_pathkeys,
5315  rel->lateral_relids,
5316  sorted_epq_path,
5317  NIL));
5318  }
5319 }
5320 
5321 /*
5322  * Parse options from foreign server and apply them to fpinfo.
5323  *
5324  * New options might also require tweaking merge_fdw_options().
5325  */
5326 static void
5328 {
5329  ListCell *lc;
5330 
5331  foreach(lc, fpinfo->server->options)
5332  {
5333  DefElem *def = (DefElem *) lfirst(lc);
5334 
5335  if (strcmp(def->defname, "use_remote_estimate") == 0)
5336  fpinfo->use_remote_estimate = defGetBoolean(def);
5337  else if (strcmp(def->defname, "fdw_startup_cost") == 0)
5338  fpinfo->fdw_startup_cost = strtod(defGetString(def), NULL);
5339  else if (strcmp(def->defname, "fdw_tuple_cost") == 0)
5340  fpinfo->fdw_tuple_cost = strtod(defGetString(def), NULL);
5341  else if (strcmp(def->defname, "extensions") == 0)
5342  fpinfo->shippable_extensions =
5343  ExtractExtensionList(defGetString(def), false);
5344  else if (strcmp(def->defname, "fetch_size") == 0)
5345  fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
5346  }
5347 }
5348 
5349 /*
5350  * Parse options from foreign table and apply them to fpinfo.
5351  *
5352  * New options might also require tweaking merge_fdw_options().
5353  */
5354 static void
5356 {
5357  ListCell *lc;
5358 
5359  foreach(lc, fpinfo->table->options)
5360  {
5361  DefElem *def = (DefElem *) lfirst(lc);
5362 
5363  if (strcmp(def->defname, "use_remote_estimate") == 0)
5364  fpinfo->use_remote_estimate = defGetBoolean(def);
5365  else if (strcmp(def->defname, "fetch_size") == 0)
5366  fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
5367  }
5368 }
5369 
5370 /*
5371  * Merge FDW options from input relations into a new set of options for a join
5372  * or an upper rel.
5373  *
5374  * For a join relation, FDW-specific information about the inner and outer
5375  * relations is provided using fpinfo_i and fpinfo_o. For an upper relation,
5376  * fpinfo_o provides the information for the input relation; fpinfo_i is
5377  * expected to NULL.
5378  */
5379 static void
5381  const PgFdwRelationInfo *fpinfo_o,
5382  const PgFdwRelationInfo *fpinfo_i)
5383 {
5384  /* We must always have fpinfo_o. */
5385  Assert(fpinfo_o);
5386 
5387  /* fpinfo_i may be NULL, but if present the servers must both match. */
5388  Assert(!fpinfo_i ||
5389  fpinfo_i->server->serverid == fpinfo_o->server->serverid);
5390 
5391  /*
5392  * Copy the server specific FDW options. (For a join, both relations come
5393  * from the same server, so the server options should have the same value
5394  * for both relations.)
5395  */
5396  fpinfo->fdw_startup_cost = fpinfo_o->fdw_startup_cost;
5397  fpinfo->fdw_tuple_cost = fpinfo_o->fdw_tuple_cost;
5398  fpinfo->shippable_extensions = fpinfo_o->shippable_extensions;
5399  fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate;
5400  fpinfo->fetch_size = fpinfo_o->fetch_size;
5401 
5402  /* Merge the table level options from either side of the join. */
5403  if (fpinfo_i)
5404  {
5405  /*
5406  * We'll prefer to use remote estimates for this join if any table
5407  * from either side of the join is using remote estimates. This is
5408  * most likely going to be preferred since they're already willing to
5409  * pay the price of a round trip to get the remote EXPLAIN. In any
5410  * case it's not entirely clear how we might otherwise handle this
5411  * best.
5412  */
5413  fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate ||
5414  fpinfo_i->use_remote_estimate;
5415 
5416  /*
5417  * Set fetch size to maximum of the joining sides, since we are
5418  * expecting the rows returned by the join to be proportional to the
5419  * relation sizes.
5420  */
5421  fpinfo->fetch_size = Max(fpinfo_o->fetch_size, fpinfo_i->fetch_size);
5422  }
5423 }
5424 
5425 /*
5426  * postgresGetForeignJoinPaths
5427  * Add possible ForeignPath to joinrel, if join is safe to push down.
5428  */
5429 static void
5431  RelOptInfo *joinrel,
5432  RelOptInfo *outerrel,
5433  RelOptInfo *innerrel,
5434  JoinType jointype,
5435  JoinPathExtraData *extra)
5436 {
5437  PgFdwRelationInfo *fpinfo;
5438  ForeignPath *joinpath;
5439  double rows;
5440  int width;
5441  Cost startup_cost;
5442  Cost total_cost;
5443  Path *epq_path; /* Path to create plan to be executed when
5444  * EvalPlanQual gets triggered. */
5445 
5446  /*
5447  * Skip if this join combination has been considered already.
5448  */
5449  if (joinrel->fdw_private)
5450  return;
5451 
5452  /*
5453  * This code does not work for joins with lateral references, since those
5454  * must have parameterized paths, which we don't generate yet.
5455  */
5456  if (!bms_is_empty(joinrel->lateral_relids))
5457  return;
5458 
5459  /*
5460  * Create unfinished PgFdwRelationInfo entry which is used to indicate
5461  * that the join relation is already considered, so that we won't waste
5462  * time in judging safety of join pushdown and adding the same paths again
5463  * if found safe. Once we know that this join can be pushed down, we fill
5464  * the entry.
5465  */
5466  fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
5467  fpinfo->pushdown_safe = false;
5468  joinrel->fdw_private = fpinfo;
5469  /* attrs_used is only for base relations. */
5470  fpinfo->attrs_used = NULL;
5471 
5472  /*
5473  * If there is a possibility that EvalPlanQual will be executed, we need
5474  * to be able to reconstruct the row using scans of the base relations.
5475  * GetExistingLocalJoinPath will find a suitable path for this purpose in
5476  * the path list of the joinrel, if one exists. We must be careful to
5477  * call it before adding any ForeignPath, since the ForeignPath might
5478  * dominate the only suitable local path available. We also do it before
5479  * calling foreign_join_ok(), since that function updates fpinfo and marks
5480  * it as pushable if the join is found to be pushable.
5481  */
5482  if (root->parse->commandType == CMD_DELETE ||
5483  root->parse->commandType == CMD_UPDATE ||
5484  root->rowMarks)
5485  {
5486  epq_path = GetExistingLocalJoinPath(joinrel);
5487  if (!epq_path)
5488  {
5489  elog(DEBUG3, "could not push down foreign join because a local path suitable for EPQ checks was not found");
5490  return;
5491  }
5492  }
5493  else
5494  epq_path = NULL;
5495 
5496  if (!foreign_join_ok(root, joinrel, jointype, outerrel, innerrel, extra))
5497  {
5498  /* Free path required for EPQ if we copied one; we don't need it now */
5499  if (epq_path)
5500  pfree(epq_path);
5501  return;
5502  }
5503 
5504  /*
5505  * Compute the selectivity and cost of the local_conds, so we don't have
5506  * to do it over again for each path. The best we can do for these
5507  * conditions is to estimate selectivity on the basis of local statistics.
5508  * The local conditions are applied after the join has been computed on
5509  * the remote side like quals in WHERE clause, so pass jointype as
5510  * JOIN_INNER.
5511  */
5512  fpinfo->local_conds_sel = clauselist_selectivity(root,
5513  fpinfo->local_conds,
5514  0,
5515  JOIN_INNER,
5516  NULL);
5517  cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
5518 
5519  /*
5520  * If we are going to estimate costs locally, estimate the join clause
5521  * selectivity here while we have special join info.
5522  */
5523  if (!fpinfo->use_remote_estimate)
5524  fpinfo->joinclause_sel = clauselist_selectivity(root, fpinfo->joinclauses,
5525  0, fpinfo->jointype,
5526  extra->sjinfo);
5527 
5528  /* Estimate costs for bare join relation */
5529  estimate_path_cost_size(root, joinrel, NIL, NIL, NULL,
5530  &rows, &width, &startup_cost, &total_cost);
5531  /* Now update this information in the joinrel */
5532  joinrel->rows = rows;
5533  joinrel->reltarget->width = width;
5534  fpinfo->rows = rows;
5535  fpinfo->width = width;
5536  fpinfo->startup_cost = startup_cost;
5537  fpinfo->total_cost = total_cost;
5538 
5539  /*
5540  * Create a new join path and add it to the joinrel which represents a
5541  * join between foreign tables.
5542  */
5543  joinpath = create_foreign_join_path(root,
5544  joinrel,
5545  NULL, /* default pathtarget */
5546  rows,
5547  startup_cost,
5548  total_cost,
5549  NIL, /* no pathkeys */
5550  joinrel->lateral_relids,
5551  epq_path,
5552  NIL); /* no fdw_private */
5553 
5554  /* Add generated path into joinrel by add_path(). */
5555  add_path(joinrel, (Path *) joinpath);
5556 
5557  /* Consider pathkeys for the join relation */
5558  add_paths_with_pathkeys_for_rel(root, joinrel, epq_path);
5559 
5560  /* XXX Consider parameterized paths for the join relation */
5561 }
5562 
5563 /*
5564  * Assess whether the aggregation, grouping and having operations can be pushed
5565  * down to the foreign server. As a side effect, save information we obtain in
5566  * this function to PgFdwRelationInfo of the input relation.
5567  */
5568 static bool
5570  Node *havingQual)
5571 {
5572  Query *query = root->parse;
5573  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) grouped_rel->fdw_private;
5574  PathTarget *grouping_target = grouped_rel->reltarget;
5575  PgFdwRelationInfo *ofpinfo;
5576  ListCell *lc;
5577  int i;
5578  List *tlist = NIL;
5579 
5580  /* We currently don't support pushing Grouping Sets. */
5581  if (query->groupingSets)
5582  return false;
5583 
5584  /* Get the fpinfo of the underlying scan relation. */
5585  ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
5586 
5587  /*
5588  * If underlying scan relation has any local conditions, those conditions
5589  * are required to be applied before performing aggregation. Hence the
5590  * aggregate cannot be pushed down.
5591  */
5592  if (ofpinfo->local_conds)
5593  return false;
5594 
5595  /*
5596  * Examine grouping expressions, as well as other expressions we'd need to
5597  * compute, and check whether they are safe to push down to the foreign
5598  * server. All GROUP BY expressions will be part of the grouping target
5599  * and thus there is no need to search for them separately. Add grouping
5600  * expressions into target list which will be passed to foreign server.
5601  *
5602  * A tricky fine point is that we must not put any expression into the
5603  * target list that is just a foreign param (that is, something that
5604  * deparse.c would conclude has to be sent to the foreign server). If we
5605  * do, the expression will also appear in the fdw_exprs list of the plan
5606  * node, and setrefs.c will get confused and decide that the fdw_exprs
5607  * entry is actually a reference to the fdw_scan_tlist entry, resulting in
5608  * a broken plan. Somewhat oddly, it's OK if the expression contains such
5609  * a node, as long as it's not at top level; then no match is possible.
5610  */
5611  i = 0;
5612  foreach(lc, grouping_target->exprs)
5613  {
5614  Expr *expr = (Expr *) lfirst(lc);
5615  Index sgref = get_pathtarget_sortgroupref(grouping_target, i);
5616  ListCell *l;
5617 
5618  /* Check whether this expression is part of GROUP BY clause */
5619  if (sgref && get_sortgroupref_clause_noerr(sgref, query->groupClause))
5620  {
5621  TargetEntry *tle;
5622 
5623  /*
5624  * If any GROUP BY expression is not shippable, then we cannot
5625  * push down aggregation to the foreign server.
5626  */
5627  if (!is_foreign_expr(root, grouped_rel, expr))
5628  return false;
5629 
5630  /*
5631  * If it would be a foreign param, we can't put it into the tlist,
5632  * so we have to fail.
5633  */
5634  if (is_foreign_param(root, grouped_rel, expr))
5635  return false;
5636 
5637  /*
5638  * Pushable, so add to tlist. We need to create a TLE for this
5639  * expression and apply the sortgroupref to it. We cannot use
5640  * add_to_flat_tlist() here because that avoids making duplicate
5641  * entries in the tlist. If there are duplicate entries with
5642  * distinct sortgrouprefs, we have to duplicate that situation in
5643  * the output tlist.