PostgreSQL Source Code  git master
postgres_fdw.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * postgres_fdw.c
4  * Foreign-data wrapper for remote PostgreSQL servers
5  *
6  * Portions Copyright (c) 2012-2018, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * contrib/postgres_fdw/postgres_fdw.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "postgres_fdw.h"
16 
17 #include "access/htup_details.h"
18 #include "access/sysattr.h"
19 #include "catalog/pg_class.h"
20 #include "commands/defrem.h"
21 #include "commands/explain.h"
22 #include "commands/vacuum.h"
23 #include "foreign/fdwapi.h"
24 #include "funcapi.h"
25 #include "miscadmin.h"
26 #include "nodes/makefuncs.h"
27 #include "nodes/nodeFuncs.h"
28 #include "optimizer/cost.h"
29 #include "optimizer/clauses.h"
30 #include "optimizer/pathnode.h"
31 #include "optimizer/paths.h"
32 #include "optimizer/planmain.h"
33 #include "optimizer/restrictinfo.h"
34 #include "optimizer/var.h"
35 #include "optimizer/tlist.h"
36 #include "parser/parsetree.h"
37 #include "utils/builtins.h"
38 #include "utils/guc.h"
39 #include "utils/lsyscache.h"
40 #include "utils/memutils.h"
41 #include "utils/rel.h"
42 #include "utils/sampling.h"
43 #include "utils/selfuncs.h"
44 
46 
47 /* Default CPU cost to start up a foreign query. */
48 #define DEFAULT_FDW_STARTUP_COST 100.0
49 
50 /* Default CPU cost to process 1 row (above and beyond cpu_tuple_cost). */
51 #define DEFAULT_FDW_TUPLE_COST 0.01
52 
53 /* If no remote estimates, assume a sort costs 20% extra */
54 #define DEFAULT_FDW_SORT_MULTIPLIER 1.2
55 
56 /*
57  * Indexes of FDW-private information stored in fdw_private lists.
58  *
59  * These items are indexed with the enum FdwScanPrivateIndex, so an item
60  * can be fetched with list_nth(). For example, to get the SELECT statement:
61  * sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
62  */
64 {
65  /* SQL statement to execute remotely (as a String node) */
67  /* Integer list of attribute numbers retrieved by the SELECT */
69  /* Integer representing the desired fetch_size */
71 
72  /*
73  * String describing the join, i.e. the names of the relations being
74  * joined and the join types; added when the scan is a join
75  */
77 };
78 
79 /*
80  * Similarly, this enum describes what's kept in the fdw_private list for
81  * a ModifyTable node referencing a postgres_fdw foreign table. We store:
82  *
83  * 1) INSERT/UPDATE/DELETE statement text to be sent to the remote server
84  * 2) Integer list of target attribute numbers for INSERT/UPDATE
85  * (NIL for a DELETE)
86  * 3) Boolean flag showing if the remote query has a RETURNING clause
87  * 4) Integer list of attribute numbers retrieved by RETURNING, if any
88  */
90 {
91  /* INSERT/UPDATE/DELETE statement to execute remotely (as a String node) */
93  /* Integer list of target attribute numbers for INSERT/UPDATE (NIL for a DELETE) */
95  /* has-returning flag (as an integer Value node) */
97  /* Integer list of attribute numbers retrieved by RETURNING, if any */
99 };
100 
101 /*
102  * Similarly, this enum describes what's kept in the fdw_private list for
103  * a ForeignScan node that modifies a foreign table directly. We store:
104  *
105  * 1) UPDATE/DELETE statement text to be sent to the remote server
106  * 2) Boolean flag showing if the remote query has a RETURNING clause
107  * 3) Integer list of attribute numbers retrieved by RETURNING, if any
108  * 4) Boolean flag showing if we set the command es_processed
109  */
111 {
112  /* UPDATE/DELETE statement to execute remotely (as a String node) */
114  /* has-returning flag (as an integer Value node) */
116  /* Integer list of attribute numbers retrieved by RETURNING, if any */
118  /* set-processed flag: do we set the command es_processed? (as an integer Value node) */
120 };
121 
122 /*
123  * Execution state of a foreign scan using postgres_fdw.
124  */
125 typedef struct PgFdwScanState
126 {
127  Relation rel; /* relcache entry for the foreign table; NULL
128  * for a foreign join scan */
129  TupleDesc tupdesc; /* tuple descriptor of scan */
130  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
131 
132  /* extracted fdw_private data */
133  char *query; /* text of SELECT command */
134  List *retrieved_attrs; /* list of retrieved attribute numbers */
135 
136  /* for remote query execution */
137  PGconn *conn; /* connection for the scan */
138  unsigned int cursor_number; /* quasi-unique ID for my cursor */
139  bool cursor_exists; /* have we created the cursor? */
140  int numParams; /* number of parameters passed to query */
141  FmgrInfo *param_flinfo; /* output conversion functions for the parameters */
142  List *param_exprs; /* executable expressions for param values */
143  const char **param_values; /* textual values of query parameters */
144 
145  /* for storing result tuples */
146  HeapTuple *tuples; /* array of currently-retrieved tuples */
147  int num_tuples; /* # of tuples in array */
148  int next_tuple; /* index of next one to return */
149 
150  /* batch-level state, for optimizing rewinds and avoiding useless fetch */
151  int fetch_ct_2; /* Min(# of fetches done, 2) */
152  bool eof_reached; /* true if last fetch reached EOF */
153 
154  /* working memory contexts */
155  MemoryContext batch_cxt; /* context holding current batch of tuples */
156  MemoryContext temp_cxt; /* context for per-tuple temporary data */
157 
158  int fetch_size; /* number of tuples per fetch */
160 
161 /*
162  * Execution state of a foreign insert/update/delete operation.
163  */
164 typedef struct PgFdwModifyState
165 {
166  Relation rel; /* relcache entry for the foreign table */
167  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
168 
169  /* for remote query execution */
170  PGconn *conn; /* connection for the insert/update/delete */
171  char *p_name; /* name of prepared statement, if created */
172 
173  /* extracted fdw_private data */
174  char *query; /* text of INSERT/UPDATE/DELETE command */
175  List *target_attrs; /* list of target attribute numbers */
176  bool has_returning; /* is there a RETURNING clause? */
177  List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
178 
179  /* info about parameters for prepared statement */
180  AttrNumber ctidAttno; /* attnum of input resjunk ctid column */
181  int p_nums; /* number of parameters to transmit */
182  FmgrInfo *p_flinfo; /* output conversion functions for the parameters */
183 
184  /* working memory context */
185  MemoryContext temp_cxt; /* context for per-tuple temporary data */
187 
188 /*
189  * Execution state of a foreign scan that modifies a foreign table directly.
190  */
192 {
193  Relation rel; /* relcache entry for the foreign table */
194  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
195 
196  /* extracted fdw_private data */
197  char *query; /* text of UPDATE/DELETE command */
198  bool has_returning; /* is there a RETURNING clause? */
199  List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
200  bool set_processed; /* do we set the command es_processed? */
201 
202  /* for remote query execution */
203  PGconn *conn; /* connection for the update */
204  int numParams; /* number of parameters passed to query */
205  FmgrInfo *param_flinfo; /* output conversion functions for the parameters */
206  List *param_exprs; /* executable expressions for param values */
207  const char **param_values; /* textual values of query parameters */
208 
209  /* for storing result tuples */
210  PGresult *result; /* result for query */
211  int num_tuples; /* # of result tuples */
212  int next_tuple; /* index of next one to return */
213  Relation resultRel; /* relcache entry for the target relation */
214  AttrNumber *attnoMap; /* array of attnums of input user columns */
215  AttrNumber ctidAttno; /* attnum of input ctid column */
216  AttrNumber oidAttno; /* attnum of input oid column */
217  bool hasSystemCols; /* does resultRel have system columns? */
218 
219  /* working memory context */
220  MemoryContext temp_cxt; /* context for per-tuple temporary data */
222 
223 /*
224  * Workspace for analyzing a foreign table: the relation and its conversion
 * metadata, the array of sample rows being collected, the random-sampling
 * state, and the working memory contexts.
225  */
226 typedef struct PgFdwAnalyzeState
227 {
228  Relation rel; /* relcache entry for the foreign table */
229  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
230  List *retrieved_attrs; /* attr numbers retrieved by query */
231 
232  /* collected sample rows */
233  HeapTuple *rows; /* array of size targrows */
234  int targrows; /* target # of sample rows */
235  int numrows; /* # of sample rows collected */
236 
237  /* for random sampling */
238  double samplerows; /* # of rows fetched */
239  double rowstoskip; /* # of rows to skip before next sample */
240  ReservoirStateData rstate; /* state for reservoir sampling */
241 
242  /* working memory contexts */
243  MemoryContext anl_cxt; /* context for per-analyze lifespan data */
244  MemoryContext temp_cxt; /* context for per-tuple temporary data */
246 
247 /*
248  * Identify the attribute where data conversion fails.
249  */
250 typedef struct ConversionLocation
251 {
252  Relation rel; /* foreign table's relcache entry */
253  AttrNumber cur_attno; /* attribute number being processed, or 0 */
254 
255  /*
256  * In the case of a foreign join push-down, fdw_scan_tlist is used to
257  * identify the Var node corresponding to the error location, and
258  * fsstate->ss.ps.state gives access to the RTEs of the corresponding
259  * relation to get the relation name and attribute name.
260  */
263 
264 /* Callback argument for ec_member_matches_foreign */
265 typedef struct
266 {
267  Expr *current; /* current expr, or NULL if not yet found */
268  List *already_used; /* expressions already dealt with */
270 
271 /*
272  * SQL functions
273  */
275 
276 /*
277  * FDW callback routines
278  */
279 static void postgresGetForeignRelSize(PlannerInfo *root,
280  RelOptInfo *baserel,
281  Oid foreigntableid);
282 static void postgresGetForeignPaths(PlannerInfo *root,
283  RelOptInfo *baserel,
284  Oid foreigntableid);
286  RelOptInfo *foreignrel,
287  Oid foreigntableid,
288  ForeignPath *best_path,
289  List *tlist,
290  List *scan_clauses,
291  Plan *outer_plan);
292 static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
295 static void postgresEndForeignScan(ForeignScanState *node);
296 static void postgresAddForeignUpdateTargets(Query *parsetree,
297  RangeTblEntry *target_rte,
298  Relation target_relation);
300  ModifyTable *plan,
301  Index resultRelation,
302  int subplan_index);
303 static void postgresBeginForeignModify(ModifyTableState *mtstate,
304  ResultRelInfo *resultRelInfo,
305  List *fdw_private,
306  int subplan_index,
307  int eflags);
309  ResultRelInfo *resultRelInfo,
310  TupleTableSlot *slot,
311  TupleTableSlot *planSlot);
313  ResultRelInfo *resultRelInfo,
314  TupleTableSlot *slot,
315  TupleTableSlot *planSlot);
317  ResultRelInfo *resultRelInfo,
318  TupleTableSlot *slot,
319  TupleTableSlot *planSlot);
320 static void postgresEndForeignModify(EState *estate,
321  ResultRelInfo *resultRelInfo);
323 static bool postgresPlanDirectModify(PlannerInfo *root,
324  ModifyTable *plan,
325  Index resultRelation,
326  int subplan_index);
327 static void postgresBeginDirectModify(ForeignScanState *node, int eflags);
329 static void postgresEndDirectModify(ForeignScanState *node);
331  ExplainState *es);
333  ResultRelInfo *rinfo,
334  List *fdw_private,
335  int subplan_index,
336  ExplainState *es);
338  ExplainState *es);
339 static bool postgresAnalyzeForeignTable(Relation relation,
340  AcquireSampleRowsFunc *func,
341  BlockNumber *totalpages);
343  Oid serverOid);
344 static void postgresGetForeignJoinPaths(PlannerInfo *root,
345  RelOptInfo *joinrel,
346  RelOptInfo *outerrel,
347  RelOptInfo *innerrel,
348  JoinType jointype,
349  JoinPathExtraData *extra);
351  TupleTableSlot *slot);
352 static void postgresGetForeignUpperPaths(PlannerInfo *root,
353  UpperRelationKind stage,
354  RelOptInfo *input_rel,
355  RelOptInfo *output_rel);
356 
357 /*
358  * Helper functions
359  */
360 static void estimate_path_cost_size(PlannerInfo *root,
361  RelOptInfo *foreignrel,
362  List *param_join_conds,
363  List *pathkeys,
364  double *p_rows, int *p_width,
365  Cost *p_startup_cost, Cost *p_total_cost);
366 static void get_remote_estimate(const char *sql,
367  PGconn *conn,
368  double *rows,
369  int *width,
370  Cost *startup_cost,
371  Cost *total_cost);
374  void *arg);
375 static void create_cursor(ForeignScanState *node);
376 static void fetch_more_data(ForeignScanState *node);
377 static void close_cursor(PGconn *conn, unsigned int cursor_number);
378 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
379 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
380  ItemPointer tupleid,
381  TupleTableSlot *slot);
382 static void store_returning_result(PgFdwModifyState *fmstate,
383  TupleTableSlot *slot, PGresult *res);
384 static List *build_remote_returning(Index rtindex, Relation rel,
385  List *returningList);
386 static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist);
387 static void execute_dml_stmt(ForeignScanState *node);
389 static void init_returning_filter(PgFdwDirectModifyState *dmstate,
390  List *fdw_scan_tlist,
391  Index rtindex);
393  TupleTableSlot *slot,
394  EState *estate);
395 static void prepare_query_params(PlanState *node,
396  List *fdw_exprs,
397  int numParams,
399  List **param_exprs,
400  const char ***param_values);
401 static void process_query_params(ExprContext *econtext,
403  List *param_exprs,
404  const char **param_values);
405 static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
406  HeapTuple *rows, int targrows,
407  double *totalrows,
408  double *totaldeadrows);
409 static void analyze_row_processor(PGresult *res, int row,
410  PgFdwAnalyzeState *astate);
412  int row,
413  Relation rel,
416  ForeignScanState *fsstate,
417  MemoryContext temp_context);
418 static void conversion_error_callback(void *arg);
419 static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel,
420  JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel,
421  JoinPathExtraData *extra);
422 static bool foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel);
424  RelOptInfo *rel);
427  Path *epq_path);
428 static void add_foreign_grouping_paths(PlannerInfo *root,
429  RelOptInfo *input_rel,
430  RelOptInfo *grouped_rel);
431 static void apply_server_options(PgFdwRelationInfo *fpinfo);
432 static void apply_table_options(PgFdwRelationInfo *fpinfo);
433 static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
434  const PgFdwRelationInfo *fpinfo_o,
435  const PgFdwRelationInfo *fpinfo_i);
436 
437 
438 /*
439  * Foreign-data wrapper handler function: return a struct with pointers
440  * to my callback routines.
441  */
442 Datum
444 {
 /* Node that will carry the pointers to this wrapper's callback routines */
445  FdwRoutine *routine = makeNode(FdwRoutine);
446 
447  /* Functions for scanning foreign tables */
455 
456  /* Functions for updating foreign tables */
469 
470  /* Function for EvalPlanQual rechecks */
472  /* Support functions for EXPLAIN */
476 
477  /* Support functions for ANALYZE */
479 
480  /* Support functions for IMPORT FOREIGN SCHEMA */
482 
483  /* Support functions for join push-down */
485 
486  /* Support functions for upper relation push-down */
488 
 /* Hand the FdwRoutine node back to the caller */
489  PG_RETURN_POINTER(routine);
490 }
491 
492 /*
493  * postgresGetForeignRelSize
494  * Estimate # of rows and width of the result of the scan
495  *
496  * We should consider the effect of all baserestrictinfo clauses here, but
497  * not any join clauses.
498  */
499 static void
501  RelOptInfo *baserel,
502  Oid foreigntableid)
503 {
504  PgFdwRelationInfo *fpinfo;
505  ListCell *lc;
506  RangeTblEntry *rte = planner_rt_fetch(baserel->relid, root);
507  const char *namespace;
508  const char *relname;
509  const char *refname;
510 
511  /*
512  * Allocate a zeroed PgFdwRelationInfo and attach it to the baserel; we
513  * use it to pass various information to subsequent functions.
514  */
515  fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
516  baserel->fdw_private = (void *) fpinfo;
517 
518  /* Base foreign tables always need to be pushed down. */
519  fpinfo->pushdown_safe = true;
520 
521  /* Look up foreign-table catalog info. */
522  fpinfo->table = GetForeignTable(foreigntableid);
523  fpinfo->server = GetForeignServer(fpinfo->table->serverid);
524 
525  /*
526  * Extract user-settable option values: set the defaults first, then apply
527  * the server options followed by the table options. Note that a per-table
 * setting of use_remote_estimate overrides the per-server setting.
528  */
529  fpinfo->use_remote_estimate = false;
532  fpinfo->shippable_extensions = NIL;
533  fpinfo->fetch_size = 100;
534 
535  apply_server_options(fpinfo);
536  apply_table_options(fpinfo);
537 
538  /*
539  * If the table or the server is configured to use remote estimates,
540  * identify which user to do remote access as during planning. This
541  * should match what ExecCheckRTEPerms() does. If we fail due to lack of
542  * permissions, the query would have failed at runtime anyway.
543  */
544  if (fpinfo->use_remote_estimate)
545  {
546  Oid userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
547 
548  fpinfo->user = GetUserMapping(userid, fpinfo->server->serverid);
549  }
550  else
551  fpinfo->user = NULL;
552 
553  /*
554  * Identify which baserestrictinfo clauses can be sent to the remote
555  * server and which can't.
556  */
557  classifyConditions(root, baserel, baserel->baserestrictinfo,
558  &fpinfo->remote_conds, &fpinfo->local_conds);
559 
560  /*
561  * Identify which attributes will need to be retrieved from the remote
562  * server. These include all attrs needed for joins or final output, plus
563  * all attrs used in the local_conds. (Note: if we end up using a
564  * parameterized scan, it's possible that some of the join clauses will be
565  * sent to the remote and thus we wouldn't really need to retrieve the
566  * columns used in them. Doesn't seem worth detecting that case though.)
567  */
568  fpinfo->attrs_used = NULL;
569  pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid,
570  &fpinfo->attrs_used);
571  foreach(lc, fpinfo->local_conds)
572  {
573  RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
574 
575  pull_varattnos((Node *) rinfo->clause, baserel->relid,
576  &fpinfo->attrs_used);
577  }
578 
579  /*
580  * Compute the selectivity and cost of the local_conds, so we don't have
581  * to do it over again for each path. The best we can do for these
582  * conditions is to estimate selectivity on the basis of local statistics.
583  */
585  fpinfo->local_conds,
586  baserel->relid,
587  JOIN_INNER,
588  NULL);
589 
590  cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
591 
592  /*
593  * Set cached relation costs to some negative value, so that we can detect
594  * when they are set to some sensible costs during one (usually the first)
595  * of the calls to estimate_path_cost_size().
596  */
597  fpinfo->rel_startup_cost = -1;
598  fpinfo->rel_total_cost = -1;
599 
600  /*
601  * If the table or the server is configured to use remote estimates,
602  * connect to the foreign server and execute EXPLAIN to estimate the
603  * number of rows selected by the restriction clauses, as well as the
604  * average row width. Otherwise, estimate using whatever statistics we
605  * have locally, in a way similar to ordinary tables.
606  */
607  if (fpinfo->use_remote_estimate)
608  {
609  /*
610  * Get cost/size estimates with help of remote server. Save the
611  * values in fpinfo so we don't need to do it again to generate the
612  * basic foreign path.
613  */
614  estimate_path_cost_size(root, baserel, NIL, NIL,
615  &fpinfo->rows, &fpinfo->width,
616  &fpinfo->startup_cost, &fpinfo->total_cost);
617 
618  /* Report estimated baserel size to planner. */
619  baserel->rows = fpinfo->rows;
620  baserel->reltarget->width = fpinfo->width;
621  }
622  else
623  {
624  /*
625  * If the foreign table has never been ANALYZEd, it will have relpages
626  * and reltuples equal to zero, which most likely has nothing to do
627  * with reality. We can't do a whole lot about that if we're not
628  * allowed to consult the remote server, but we can use a hack similar
629  * to plancat.c's treatment of empty relations: use a minimum size
630  * estimate of 10 pages, and divide by the column-datatype-based width
631  * estimate to get the corresponding number of tuples.
632  */
633  if (baserel->pages == 0 && baserel->tuples == 0)
634  {
635  baserel->pages = 10;
636  baserel->tuples =
637  (10 * BLCKSZ) / (baserel->reltarget->width +
639  }
640 
641  /* Estimate baserel size as best we can with local statistics. */
642  set_baserel_size_estimates(root, baserel);
643 
644  /* Fill in basically-bogus cost estimates for use later. */
645  estimate_path_cost_size(root, baserel, NIL, NIL,
646  &fpinfo->rows, &fpinfo->width,
647  &fpinfo->startup_cost, &fpinfo->total_cost);
648  }
649 
650  /*
651  * Set the name of relation in fpinfo, while we are constructing it here.
652  * It will be used to build the string describing the join relation in
653  * EXPLAIN output. We can't know whether VERBOSE option is specified or
654  * not, so always schema-qualify the foreign table name.
655  */
656  fpinfo->relation_name = makeStringInfo();
657  namespace = get_namespace_name(get_rel_namespace(foreigntableid));
658  relname = get_rel_name(foreigntableid);
659  refname = rte->eref->aliasname;
660  appendStringInfo(fpinfo->relation_name, "%s.%s",
661  quote_identifier(namespace),
662  quote_identifier(relname));
663  if (*refname && strcmp(refname, relname) != 0)
664  appendStringInfo(fpinfo->relation_name, " %s",
666 
667  /* A base relation has no outer and inner relations. */
668  fpinfo->make_outerrel_subquery = false;
669  fpinfo->make_innerrel_subquery = false;
670  fpinfo->lower_subquery_rels = NULL;
671  /* Set the relation index. */
672  fpinfo->relation_index = baserel->relid;
673 }
674 
675 /*
676  * get_useful_ecs_for_relation
677  * Determine which EquivalenceClasses might be involved in useful
678  * orderings of this relation.
679  *
680  * This function is in some respects a mirror image of the core function
681  * pathkeys_useful_for_merging: for a regular table, we know what indexes
682  * we have and want to test whether any of them are useful. For a foreign
683  * table, we don't know what indexes are present on the remote side but
684  * want to speculate about which ones we'd like to use if they existed.
685  *
686  * This function returns a list of potentially-useful equivalence classes,
687  * but it does not guarantee that an EquivalenceMember exists which contains
688  * Vars only from the given relation. For example, given ft1 JOIN t1 ON
689  * ft1.x + t1.x = 0, this function will say that the equivalence class
690  * containing ft1.x + t1.x is potentially useful. Supposing ft1 is remote and
691  * t1 is local (or on a different server), it will turn out that no useful
692  * ORDER BY clause can be generated. It's not our job to figure that out
693  * here; we're only interested in identifying relevant ECs.
694  */
695 static List *
697 {
698  List *useful_eclass_list = NIL;
699  ListCell *lc;
700  Relids relids;
701 
702  /*
703  * First, consider whether any active EC is potentially useful for a merge
704  * join against this relation.
705  */
706  if (rel->has_eclass_joins)
707  {
708  foreach(lc, root->eq_classes)
709  {
710  EquivalenceClass *cur_ec = (EquivalenceClass *) lfirst(lc);
711 
712  if (eclass_useful_for_merging(root, cur_ec, rel))
713  useful_eclass_list = lappend(useful_eclass_list, cur_ec);
714  }
715  }
716 
717  /*
718  * Next, consider whether there are any non-EC derivable join clauses that
719  * are merge-joinable. If the joininfo list is empty, we can exit
720  * quickly.
721  */
722  if (rel->joininfo == NIL)
723  return useful_eclass_list;
724 
725  /* If this is a child rel, we must use the topmost parent rel to search. */
726  if (IS_OTHER_REL(rel))
727  {
729  relids = rel->top_parent_relids;
730  }
731  else
732  relids = rel->relids;
733 
734  /* Check each join clause in turn. */
735  foreach(lc, rel->joininfo)
736  {
737  RestrictInfo *restrictinfo = (RestrictInfo *) lfirst(lc);
738 
739  /* Consider only mergejoinable clauses */
740  if (restrictinfo->mergeopfamilies == NIL)
741  continue;
742 
743  /* Make sure we've got canonical ECs. */
744  update_mergeclause_eclasses(root, restrictinfo);
745 
746  /*
747  * restrictinfo->mergeopfamilies != NIL is sufficient to guarantee
748  * that left_ec and right_ec will be initialized, per comments in
749  * distribute_qual_to_rels.
750  *
751  * We want to identify which side of this merge-joinable clause
752  * contains columns from the relation produced by this RelOptInfo. We
753  * test for overlap, not containment, because there could be extra
754  * relations on either side. For example, suppose we've got something
755  * like ((A JOIN B ON A.x = B.x) JOIN C ON A.y = C.y) LEFT JOIN D ON
756  * A.y = D.y. The input rel might be the joinrel between A and B, and
757  * we'll consider the join clause A.y = D.y. relids contains a
758  * relation not involved in the join clause (B) and the equivalence
759  * class for the left-hand side of the clause contains a relation not
760  * involved in the input rel (C). Despite the fact that we have only
761  * overlap and not containment in either direction, A.y is potentially
762  * useful as a sort column.
763  *
764  * Note that it's even possible that relids overlaps neither side of
765  * the join clause. For example, consider A LEFT JOIN B ON A.x = B.x
766  * AND A.x = 1. The clause A.x = 1 will appear in B's joininfo list,
767  * but B overlaps neither side of it. In that case, we just skip this
768  * join clause, since it doesn't suggest a useful sort order for this
769  * relation.
770  */
771  if (bms_overlap(relids, restrictinfo->right_ec->ec_relids))
772  useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
773  restrictinfo->right_ec);
774  else if (bms_overlap(relids, restrictinfo->left_ec->ec_relids))
775  useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
776  restrictinfo->left_ec);
777  }
778 
779  return useful_eclass_list;
780 }
781 
782 /*
783  * get_useful_pathkeys_for_relation
784  * Determine which orderings of a relation might be useful.
785  *
786  * Getting data in sorted order can be useful either because the requested
787  * order matches the final output ordering for the overall query we're
788  * planning, or because it enables an efficient merge join. Here, we try
789  * to figure out which pathkeys to consider.
790  */
791 static List *
793 {
794  List *useful_pathkeys_list = NIL;
795  List *useful_eclass_list;
797  EquivalenceClass *query_ec = NULL;
798  ListCell *lc;
799 
800  /*
801  * Pushing the query_pathkeys to the remote server is always worth
802  * considering, because it might let us avoid a local sort.
803  */
804  if (root->query_pathkeys)
805  {
806  bool query_pathkeys_ok = true;
807 
808  foreach(lc, root->query_pathkeys)
809  {
810  PathKey *pathkey = (PathKey *) lfirst(lc);
811  EquivalenceClass *pathkey_ec = pathkey->pk_eclass;
812  Expr *em_expr;
813 
814  /*
815  * The planner and executor don't have any clever strategy for
816  * taking data sorted by a prefix of the query's pathkeys and
817  * getting it to be sorted by all of those pathkeys. We'll just
818  * end up re-sorting the entire data set. So, unless we can push
819  * down all of the query pathkeys, forget it.
820  *
821  * is_foreign_expr would detect volatile expressions as well, but
822  * checking ec_has_volatile here saves some cycles.
823  */
824  if (pathkey_ec->ec_has_volatile ||
825  !(em_expr = find_em_expr_for_rel(pathkey_ec, rel)) ||
826  !is_foreign_expr(root, rel, em_expr))
827  {
828  query_pathkeys_ok = false;
829  break;
830  }
831  }
832 
833  if (query_pathkeys_ok)
834  useful_pathkeys_list = list_make1(list_copy(root->query_pathkeys));
835  }
836 
837  /*
838  * Even if we're not using remote estimates, having the remote side do the
839  * sort generally won't be any worse than doing it locally, and it might
840  * be much better if the remote side can generate data in the right order
841  * without needing a sort at all. However, what we're going to do next is
842  * try to generate pathkeys that seem promising for possible merge joins,
843  * and that's more speculative. A wrong choice might hurt quite a bit, so
844  * bail out if we can't use remote estimates.
845  */
846  if (!fpinfo->use_remote_estimate)
847  return useful_pathkeys_list;
848 
849  /* Get the list of interesting EquivalenceClasses. */
850  useful_eclass_list = get_useful_ecs_for_relation(root, rel);
851 
852  /* Extract unique EC for query, if any, so we don't consider it again. */
853  if (list_length(root->query_pathkeys) == 1)
854  {
855  PathKey *query_pathkey = linitial(root->query_pathkeys);
856 
857  query_ec = query_pathkey->pk_eclass;
858  }
859 
860  /*
861  * As a heuristic, the only pathkeys we consider here are those of length
862  * one. It's surely possible to consider more, but since each one we
863  * choose to consider will generate a round-trip to the remote side, we
864  * need to be a bit cautious here. It would sure be nice to have a local
865  * cache of information about remote index definitions...
866  */
867  foreach(lc, useful_eclass_list)
868  {
869  EquivalenceClass *cur_ec = lfirst(lc);
870  Expr *em_expr;
871  PathKey *pathkey;
872 
873  /* If redundant with what we did above, skip it. */
874  if (cur_ec == query_ec)
875  continue;
876 
877  /* If no pushable expression for this rel, skip it. */
878  em_expr = find_em_expr_for_rel(cur_ec, rel);
879  if (em_expr == NULL || !is_foreign_expr(root, rel, em_expr))
880  continue;
881 
882  /* Looks like we can generate a pathkey, so let's do it. */
883  pathkey = make_canonical_pathkey(root, cur_ec,
884  linitial_oid(cur_ec->ec_opfamilies),
886  false);
887  useful_pathkeys_list = lappend(useful_pathkeys_list,
888  list_make1(pathkey));
889  }
890 
891  return useful_pathkeys_list;
892 }
893 
894 /*
895  * postgresGetForeignPaths
896  * Create possible scan paths for a scan on the foreign table
897  */
898 static void
900  RelOptInfo *baserel,
901  Oid foreigntableid)
902 {
903  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
904  ForeignPath *path;
905  List *ppi_list;
906  ListCell *lc;
907 
908  /*
909  * Create simplest ForeignScan path node and add it to baserel. This path
910  * corresponds to SeqScan path of regular tables (though depending on what
911  * baserestrict conditions we were able to send to remote, there might
912  * actually be an indexscan happening there). We already did all the work
913  * to estimate cost and size of this path.
914  */
915  path = create_foreignscan_path(root, baserel,
916  NULL, /* default pathtarget */
917  fpinfo->rows,
918  fpinfo->startup_cost,
919  fpinfo->total_cost,
920  NIL, /* no pathkeys */
921  NULL, /* no outer rel either */
922  NULL, /* no extra plan */
923  NIL); /* no fdw_private list */
924  add_path(baserel, (Path *) path);
925 
926  /* Add paths with pathkeys */
927  add_paths_with_pathkeys_for_rel(root, baserel, NULL);
928 
929  /*
930  * If we're not using remote estimates, stop here. We have no way to
931  * estimate whether any join clauses would be worth sending across, so
932  * don't bother building parameterized paths.
933  */
934  if (!fpinfo->use_remote_estimate)
935  return;
936 
937  /*
938  * Thumb through all join clauses for the rel to identify which outer
939  * relations could supply one or more safe-to-send-to-remote join clauses.
940  * We'll build a parameterized path for each such outer relation.
941  *
942  * It's convenient to manage this by representing each candidate outer
943  * relation by the ParamPathInfo node for it. We can then use the
944  * ppi_clauses list in the ParamPathInfo node directly as a list of the
945  * interesting join clauses for that rel. This takes care of the
946  * possibility that there are multiple safe join clauses for such a rel,
947  * and also ensures that we account for unsafe join clauses that we'll
948  * still have to enforce locally (since the parameterized-path machinery
949  * insists that we handle all movable clauses).
950  */
951  ppi_list = NIL;
952  foreach(lc, baserel->joininfo)
953  {
954  RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
955  Relids required_outer;
956  ParamPathInfo *param_info;
957 
958  /* Check if clause can be moved to this rel */
959  if (!join_clause_is_movable_to(rinfo, baserel))
960  continue;
961 
962  /* See if it is safe to send to remote */
963  if (!is_foreign_expr(root, baserel, rinfo->clause))
964  continue;
965 
966  /* Calculate required outer rels for the resulting path */
967  required_outer = bms_union(rinfo->clause_relids,
968  baserel->lateral_relids);
969  /* We do not want the foreign rel itself listed in required_outer */
970  required_outer = bms_del_member(required_outer, baserel->relid);
971 
972  /*
973  * required_outer probably can't be empty here, but if it were, we
974  * couldn't make a parameterized path.
975  */
976  if (bms_is_empty(required_outer))
977  continue;
978 
979  /* Get the ParamPathInfo */
980  param_info = get_baserel_parampathinfo(root, baserel,
981  required_outer);
982  Assert(param_info != NULL);
983 
984  /*
985  * Add it to list unless we already have it. Testing pointer equality
986  * is OK since get_baserel_parampathinfo won't make duplicates.
987  */
988  ppi_list = list_append_unique_ptr(ppi_list, param_info);
989  }
990 
991  /*
992  * The above scan examined only "generic" join clauses, not those that
993  * were absorbed into EquivalenceClauses. See if we can make anything out
994  * of EquivalenceClauses.
995  */
996  if (baserel->has_eclass_joins)
997  {
998  /*
999  * We repeatedly scan the eclass list looking for column references
1000  * (or expressions) belonging to the foreign rel. Each time we find
1001  * one, we generate a list of equivalence joinclauses for it, and then
1002  * see if any are safe to send to the remote. Repeat till there are
1003  * no more candidate EC members.
1004  */
1006 
1007  arg.already_used = NIL;
1008  for (;;)
1009  {
1010  List *clauses;
1011 
1012  /* Make clauses, skipping any that join to lateral_referencers */
1013  arg.current = NULL;
1015  baserel,
1017  (void *) &arg,
1018  baserel->lateral_referencers);
1019 
1020  /* Done if there are no more expressions in the foreign rel */
1021  if (arg.current == NULL)
1022  {
1023  Assert(clauses == NIL);
1024  break;
1025  }
1026 
1027  /* Scan the extracted join clauses */
1028  foreach(lc, clauses)
1029  {
1030  RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
1031  Relids required_outer;
1032  ParamPathInfo *param_info;
1033 
1034  /* Check if clause can be moved to this rel */
1035  if (!join_clause_is_movable_to(rinfo, baserel))
1036  continue;
1037 
1038  /* See if it is safe to send to remote */
1039  if (!is_foreign_expr(root, baserel, rinfo->clause))
1040  continue;
1041 
1042  /* Calculate required outer rels for the resulting path */
1043  required_outer = bms_union(rinfo->clause_relids,
1044  baserel->lateral_relids);
1045  required_outer = bms_del_member(required_outer, baserel->relid);
1046  if (bms_is_empty(required_outer))
1047  continue;
1048 
1049  /* Get the ParamPathInfo */
1050  param_info = get_baserel_parampathinfo(root, baserel,
1051  required_outer);
1052  Assert(param_info != NULL);
1053 
1054  /* Add it to list unless we already have it */
1055  ppi_list = list_append_unique_ptr(ppi_list, param_info);
1056  }
1057 
1058  /* Try again, now ignoring the expression we found this time */
1059  arg.already_used = lappend(arg.already_used, arg.current);
1060  }
1061  }
1062 
1063  /*
1064  * Now build a path for each useful outer relation.
1065  */
1066  foreach(lc, ppi_list)
1067  {
1068  ParamPathInfo *param_info = (ParamPathInfo *) lfirst(lc);
1069  double rows;
1070  int width;
1071  Cost startup_cost;
1072  Cost total_cost;
1073 
1074  /* Get a cost estimate from the remote */
1075  estimate_path_cost_size(root, baserel,
1076  param_info->ppi_clauses, NIL,
1077  &rows, &width,
1078  &startup_cost, &total_cost);
1079 
1080  /*
1081  * ppi_rows currently won't get looked at by anything, but still we
1082  * may as well ensure that it matches our idea of the rowcount.
1083  */
1084  param_info->ppi_rows = rows;
1085 
1086  /* Make the path */
1087  path = create_foreignscan_path(root, baserel,
1088  NULL, /* default pathtarget */
1089  rows,
1090  startup_cost,
1091  total_cost,
1092  NIL, /* no pathkeys */
1093  param_info->ppi_req_outer,
1094  NULL,
1095  NIL); /* no fdw_private list */
1096  add_path(baserel, (Path *) path);
1097  }
1098 }
1099 
1100 /*
1101  * postgresGetForeignPlan
1102  * Create ForeignScan plan node which implements selected best path
1103  */
1104 static ForeignScan *
1106  RelOptInfo *foreignrel,
1107  Oid foreigntableid,
1108  ForeignPath *best_path,
1109  List *tlist,
1110  List *scan_clauses,
1111  Plan *outer_plan)
1112 {
1113  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
1114  Index scan_relid;
1115  List *fdw_private;
1116  List *remote_exprs = NIL;
1117  List *local_exprs = NIL;
1118  List *params_list = NIL;
1119  List *fdw_scan_tlist = NIL;
1120  List *fdw_recheck_quals = NIL;
1122  StringInfoData sql;
1123  ListCell *lc;
1124 
1125  if (IS_SIMPLE_REL(foreignrel))
1126  {
1127  /*
1128  * For base relations, set scan_relid as the relid of the relation.
1129  */
1130  scan_relid = foreignrel->relid;
1131 
1132  /*
1133  * In a base-relation scan, we must apply the given scan_clauses.
1134  *
1135  * Separate the scan_clauses into those that can be executed remotely
1136  * and those that can't. baserestrictinfo clauses that were
1137  * previously determined to be safe or unsafe by classifyConditions
1138  * are found in fpinfo->remote_conds and fpinfo->local_conds. Anything
1139  * else in the scan_clauses list will be a join clause, which we have
1140  * to check for remote-safety.
1141  *
1142  * Note: the join clauses we see here should be the exact same ones
1143  * previously examined by postgresGetForeignPaths. Possibly it'd be
1144  * worth passing forward the classification work done then, rather
1145  * than repeating it here.
1146  *
1147  * This code must match "extract_actual_clauses(scan_clauses, false)"
1148  * except for the additional decision about remote versus local
1149  * execution.
1150  */
1151  foreach(lc, scan_clauses)
1152  {
1153  RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
1154 
1155  /* Ignore any pseudoconstants, they're dealt with elsewhere */
1156  if (rinfo->pseudoconstant)
1157  continue;
1158 
1159  if (list_member_ptr(fpinfo->remote_conds, rinfo))
1160  remote_exprs = lappend(remote_exprs, rinfo->clause);
1161  else if (list_member_ptr(fpinfo->local_conds, rinfo))
1162  local_exprs = lappend(local_exprs, rinfo->clause);
1163  else if (is_foreign_expr(root, foreignrel, rinfo->clause))
1164  remote_exprs = lappend(remote_exprs, rinfo->clause);
1165  else
1166  local_exprs = lappend(local_exprs, rinfo->clause);
1167  }
1168 
1169  /*
1170  * For a base-relation scan, we have to support EPQ recheck, which
1171  * should recheck all the remote quals.
1172  */
1173  fdw_recheck_quals = remote_exprs;
1174  }
1175  else
1176  {
1177  /*
1178  * Join relation or upper relation - set scan_relid to 0.
1179  */
1180  scan_relid = 0;
1181 
1182  /*
1183  * For a join rel, baserestrictinfo is NIL and we are not considering
1184  * parameterization right now, so there should be no scan_clauses for
1185  * a joinrel or an upper rel either.
1186  */
1187  Assert(!scan_clauses);
1188 
1189  /*
1190  * Instead we get the conditions to apply from the fdw_private
1191  * structure.
1192  */
1193  remote_exprs = extract_actual_clauses(fpinfo->remote_conds, false);
1194  local_exprs = extract_actual_clauses(fpinfo->local_conds, false);
1195 
1196  /*
1197  * We leave fdw_recheck_quals empty in this case, since we never need
1198  * to apply EPQ recheck clauses. In the case of a joinrel, EPQ
1199  * recheck is handled elsewhere --- see postgresGetForeignJoinPaths().
1200  * If we're planning an upperrel (ie, remote grouping or aggregation)
1201  * then there's no EPQ to do because SELECT FOR UPDATE wouldn't be
1202  * allowed, and indeed we *can't* put the remote clauses into
1203  * fdw_recheck_quals because the unaggregated Vars won't be available
1204  * locally.
1205  */
1206 
1207  /* Build the list of columns to be fetched from the foreign server. */
1208  fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
1209 
1210  /*
1211  * Ensure that the outer plan produces a tuple whose descriptor
1212  * matches our scan tuple slot. This is safe because all scans and
1213  * joins support projection, so we never need to insert a Result node.
1214  * Also, remove the local conditions from outer plan's quals, lest
1215  * they will be evaluated twice, once by the local plan and once by
1216  * the scan.
1217  */
1218  if (outer_plan)
1219  {
1220  ListCell *lc;
1221 
1222  /*
1223  * Right now, we only consider grouping and aggregation beyond
1224  * joins. Queries involving aggregates or grouping do not require
1225  * EPQ mechanism, hence should not have an outer plan here.
1226  */
1227  Assert(!IS_UPPER_REL(foreignrel));
1228 
1229  outer_plan->targetlist = fdw_scan_tlist;
1230 
1231  foreach(lc, local_exprs)
1232  {
1233  Join *join_plan = (Join *) outer_plan;
1234  Node *qual = lfirst(lc);
1235 
1236  outer_plan->qual = list_delete(outer_plan->qual, qual);
1237 
1238  /*
1239  * For an inner join the local conditions of foreign scan plan
1240  * can be part of the joinquals as well.
1241  */
1242  if (join_plan->jointype == JOIN_INNER)
1243  join_plan->joinqual = list_delete(join_plan->joinqual,
1244  qual);
1245  }
1246  }
1247  }
1248 
1249  /*
1250  * Build the query string to be sent for execution, and identify
1251  * expressions to be sent as parameters.
1252  */
1253  initStringInfo(&sql);
1254  deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
1255  remote_exprs, best_path->path.pathkeys,
1256  false, &retrieved_attrs, &params_list);
1257 
1258  /* Remember remote_exprs for possible use by postgresPlanDirectModify */
1259  fpinfo->final_remote_exprs = remote_exprs;
1260 
1261  /*
1262  * Build the fdw_private list that will be available to the executor.
1263  * Items in the list must match order in enum FdwScanPrivateIndex.
1264  */
1265  fdw_private = list_make3(makeString(sql.data),
1267  makeInteger(fpinfo->fetch_size));
1268  if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
1269  fdw_private = lappend(fdw_private,
1270  makeString(fpinfo->relation_name->data));
1271 
1272  /*
1273  * Create the ForeignScan node for the given relation.
1274  *
1275  * Note that the remote parameter expressions are stored in the fdw_exprs
1276  * field of the finished plan node; we can't keep them in private state
1277  * because then they wouldn't be subject to later planner processing.
1278  */
1279  return make_foreignscan(tlist,
1280  local_exprs,
1281  scan_relid,
1282  params_list,
1283  fdw_private,
1284  fdw_scan_tlist,
1285  fdw_recheck_quals,
1286  outer_plan);
1287 }
1288 
1289 /*
1290  * postgresBeginForeignScan
1291  * Initiate an executor scan of a foreign PostgreSQL table.
1292  */
1293 static void
1295 {
1296  ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
1297  EState *estate = node->ss.ps.state;
1298  PgFdwScanState *fsstate;
1299  RangeTblEntry *rte;
1300  Oid userid;
1301  ForeignTable *table;
1302  UserMapping *user;
1303  int rtindex;
1304  int numParams;
1305 
1306  /*
1307  * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
1308  */
1309  if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1310  return;
1311 
1312  /*
1313  * We'll save private state in node->fdw_state.
1314  */
1315  fsstate = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState));
1316  node->fdw_state = (void *) fsstate;
1317 
1318  /*
1319  * Identify which user to do the remote access as. This should match what
1320  * ExecCheckRTEPerms() does. In case of a join or aggregate, use the
1321  * lowest-numbered member RTE as a representative; we would get the same
1322  * result from any.
1323  */
1324  if (fsplan->scan.scanrelid > 0)
1325  rtindex = fsplan->scan.scanrelid;
1326  else
1327  rtindex = bms_next_member(fsplan->fs_relids, -1);
1328  rte = rt_fetch(rtindex, estate->es_range_table);
1329  userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
1330 
1331  /* Get info about foreign table. */
1332  table = GetForeignTable(rte->relid);
1333  user = GetUserMapping(userid, table->serverid);
1334 
1335  /*
1336  * Get connection to the foreign server. Connection manager will
1337  * establish new connection if necessary.
1338  */
1339  fsstate->conn = GetConnection(user, false);
1340 
1341  /* Assign a unique ID for my cursor */
1342  fsstate->cursor_number = GetCursorNumber(fsstate->conn);
1343  fsstate->cursor_exists = false;
1344 
1345  /* Get private info created by planner functions. */
1346  fsstate->query = strVal(list_nth(fsplan->fdw_private,
1348  fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
1350  fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private,
1352 
1353  /* Create contexts for batches of tuples and per-tuple temp workspace. */
1354  fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
1355  "postgres_fdw tuple data",
1357  fsstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
1358  "postgres_fdw temporary data",
1360 
1361  /*
1362  * Get info we'll need for converting data fetched from the foreign server
1363  * into local representation and error reporting during that process.
1364  */
1365  if (fsplan->scan.scanrelid > 0)
1366  {
1367  fsstate->rel = node->ss.ss_currentRelation;
1368  fsstate->tupdesc = RelationGetDescr(fsstate->rel);
1369  }
1370  else
1371  {
1372  fsstate->rel = NULL;
1373  fsstate->tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
1374  }
1375 
1376  fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc);
1377 
1378  /*
1379  * Prepare for processing of parameters used in remote query, if any.
1380  */
1381  numParams = list_length(fsplan->fdw_exprs);
1382  fsstate->numParams = numParams;
1383  if (numParams > 0)
1385  fsplan->fdw_exprs,
1386  numParams,
1387  &fsstate->param_flinfo,
1388  &fsstate->param_exprs,
1389  &fsstate->param_values);
1390 }
1391 
1392 /*
1393  * postgresIterateForeignScan
1394  * Retrieve next row from the result set, or clear tuple slot to indicate
1395  * EOF.
1396  */
1397 static TupleTableSlot *
1399 {
1400  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1401  TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
1402 
1403  /*
1404  * If this is the first call after Begin or ReScan, we need to create the
1405  * cursor on the remote side.
1406  */
1407  if (!fsstate->cursor_exists)
1408  create_cursor(node);
1409 
1410  /*
1411  * Get some more tuples, if we've run out.
1412  */
1413  if (fsstate->next_tuple >= fsstate->num_tuples)
1414  {
1415  /* No point in another fetch if we already detected EOF, though. */
1416  if (!fsstate->eof_reached)
1417  fetch_more_data(node);
1418  /* If we didn't get any tuples, must be end of data. */
1419  if (fsstate->next_tuple >= fsstate->num_tuples)
1420  return ExecClearTuple(slot);
1421  }
1422 
1423  /*
1424  * Return the next tuple.
1425  */
1426  ExecStoreTuple(fsstate->tuples[fsstate->next_tuple++],
1427  slot,
1428  InvalidBuffer,
1429  false);
1430 
1431  return slot;
1432 }
1433 
1434 /*
1435  * postgresReScanForeignScan
1436  * Restart the scan.
1437  */
1438 static void
1440 {
1441  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1442  char sql[64];
1443  PGresult *res;
1444 
1445  /* If we haven't created the cursor yet, nothing to do. */
1446  if (!fsstate->cursor_exists)
1447  return;
1448 
1449  /*
1450  * If any internal parameters affecting this node have changed, we'd
1451  * better destroy and recreate the cursor. Otherwise, rewinding it should
1452  * be good enough. If we've only fetched zero or one batch, we needn't
1453  * even rewind the cursor, just rescan what we have.
1454  */
1455  if (node->ss.ps.chgParam != NULL)
1456  {
1457  fsstate->cursor_exists = false;
1458  snprintf(sql, sizeof(sql), "CLOSE c%u",
1459  fsstate->cursor_number);
1460  }
1461  else if (fsstate->fetch_ct_2 > 1)
1462  {
1463  snprintf(sql, sizeof(sql), "MOVE BACKWARD ALL IN c%u",
1464  fsstate->cursor_number);
1465  }
1466  else
1467  {
1468  /* Easy: just rescan what we already have in memory, if anything */
1469  fsstate->next_tuple = 0;
1470  return;
1471  }
1472 
1473  /*
1474  * We don't use a PG_TRY block here, so be careful not to throw error
1475  * without releasing the PGresult.
1476  */
1477  res = pgfdw_exec_query(fsstate->conn, sql);
1478  if (PQresultStatus(res) != PGRES_COMMAND_OK)
1479  pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
1480  PQclear(res);
1481 
1482  /* Now force a fresh FETCH. */
1483  fsstate->tuples = NULL;
1484  fsstate->num_tuples = 0;
1485  fsstate->next_tuple = 0;
1486  fsstate->fetch_ct_2 = 0;
1487  fsstate->eof_reached = false;
1488 }
1489 
1490 /*
1491  * postgresEndForeignScan
1492  * Finish scanning foreign table and dispose objects used for this scan
1493  */
1494 static void
1496 {
1497  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1498 
1499  /* if fsstate is NULL, we are in EXPLAIN; nothing to do */
1500  if (fsstate == NULL)
1501  return;
1502 
1503  /* Close the cursor if open, to prevent accumulation of cursors */
1504  if (fsstate->cursor_exists)
1505  close_cursor(fsstate->conn, fsstate->cursor_number);
1506 
1507  /* Release remote connection */
1508  ReleaseConnection(fsstate->conn);
1509  fsstate->conn = NULL;
1510 
1511  /* MemoryContexts will be deleted automatically. */
1512 }
1513 
1514 /*
1515  * postgresAddForeignUpdateTargets
1516  * Add resjunk column(s) needed for update/delete on a foreign table
1517  */
1518 static void
1520  RangeTblEntry *target_rte,
1521  Relation target_relation)
1522 {
1523  Var *var;
1524  const char *attrname;
1525  TargetEntry *tle;
1526 
1527  /*
1528  * In postgres_fdw, what we need is the ctid, same as for a regular table.
1529  */
1530 
1531  /* Make a Var representing the desired value */
1532  var = makeVar(parsetree->resultRelation,
1534  TIDOID,
1535  -1,
1536  InvalidOid,
1537  0);
1538 
1539  /* Wrap it in a resjunk TLE with the right name ... */
1540  attrname = "ctid";
1541 
1542  tle = makeTargetEntry((Expr *) var,
1543  list_length(parsetree->targetList) + 1,
1544  pstrdup(attrname),
1545  true);
1546 
1547  /* ... and add it to the query's targetlist */
1548  parsetree->targetList = lappend(parsetree->targetList, tle);
1549 }
1550 
1551 /*
1552  * postgresPlanForeignModify
1553  * Plan an insert/update/delete operation on a foreign table
1554  */
1555 static List *
1557  ModifyTable *plan,
1558  Index resultRelation,
1559  int subplan_index)
1560 {
1561  CmdType operation = plan->operation;
1562  RangeTblEntry *rte = planner_rt_fetch(resultRelation, root);
1563  Relation rel;
1564  StringInfoData sql;
1565  List *targetAttrs = NIL;
1566  List *returningList = NIL;
1568  bool doNothing = false;
1569 
1570  initStringInfo(&sql);
1571 
1572  /*
1573  * Core code already has some lock on each rel being planned, so we can
1574  * use NoLock here.
1575  */
1576  rel = heap_open(rte->relid, NoLock);
1577 
1578  /*
1579  * In an INSERT, we transmit all columns that are defined in the foreign
1580  * table. In an UPDATE, we transmit only columns that were explicitly
1581  * targets of the UPDATE, so as to avoid unnecessary data transmission.
1582  * (We can't do that for INSERT since we would miss sending default values
1583  * for columns not listed in the source statement.)
1584  */
1585  if (operation == CMD_INSERT)
1586  {
1588  int attnum;
1589 
1590  for (attnum = 1; attnum <= tupdesc->natts; attnum++)
1591  {
1592  Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
1593 
1594  if (!attr->attisdropped)
1595  targetAttrs = lappend_int(targetAttrs, attnum);
1596  }
1597  }
1598  else if (operation == CMD_UPDATE)
1599  {
1600  int col;
1601 
1602  col = -1;
1603  while ((col = bms_next_member(rte->updatedCols, col)) >= 0)
1604  {
1605  /* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
1607 
1608  if (attno <= InvalidAttrNumber) /* shouldn't happen */
1609  elog(ERROR, "system-column update is not supported");
1610  targetAttrs = lappend_int(targetAttrs, attno);
1611  }
1612  }
1613 
1614  /*
1615  * Extract the relevant RETURNING list if any.
1616  */
1617  if (plan->returningLists)
1618  returningList = (List *) list_nth(plan->returningLists, subplan_index);
1619 
1620  /*
1621  * ON CONFLICT DO UPDATE and DO NOTHING case with inference specification
1622  * should have already been rejected in the optimizer, as presently there
1623  * is no way to recognize an arbiter index on a foreign table. Only DO
1624  * NOTHING is supported without an inference specification.
1625  */
1626  if (plan->onConflictAction == ONCONFLICT_NOTHING)
1627  doNothing = true;
1628  else if (plan->onConflictAction != ONCONFLICT_NONE)
1629  elog(ERROR, "unexpected ON CONFLICT specification: %d",
1630  (int) plan->onConflictAction);
1631 
1632  /*
1633  * Construct the SQL command string.
1634  */
1635  switch (operation)
1636  {
1637  case CMD_INSERT:
1638  deparseInsertSql(&sql, root, resultRelation, rel,
1639  targetAttrs, doNothing, returningList,
1640  &retrieved_attrs);
1641  break;
1642  case CMD_UPDATE:
1643  deparseUpdateSql(&sql, root, resultRelation, rel,
1644  targetAttrs, returningList,
1645  &retrieved_attrs);
1646  break;
1647  case CMD_DELETE:
1648  deparseDeleteSql(&sql, root, resultRelation, rel,
1649  returningList,
1650  &retrieved_attrs);
1651  break;
1652  default:
1653  elog(ERROR, "unexpected operation: %d", (int) operation);
1654  break;
1655  }
1656 
1657  heap_close(rel, NoLock);
1658 
1659  /*
1660  * Build the fdw_private list that will be available to the executor.
1661  * Items in the list must match enum FdwModifyPrivateIndex, above.
1662  */
1663  return list_make4(makeString(sql.data),
1664  targetAttrs,
1665  makeInteger((retrieved_attrs != NIL)),
1666  retrieved_attrs);
1667 }
1668 
1669 /*
1670  * postgresBeginForeignModify
1671  * Begin an insert/update/delete operation on a foreign table
1672  */
1673 static void
1675  ResultRelInfo *resultRelInfo,
1676  List *fdw_private,
1677  int subplan_index,
1678  int eflags)
1679 {
1680  PgFdwModifyState *fmstate;
1681  EState *estate = mtstate->ps.state;
1682  CmdType operation = mtstate->operation;
1683  Relation rel = resultRelInfo->ri_RelationDesc;
1684  RangeTblEntry *rte;
1685  Oid userid;
1686  ForeignTable *table;
1687  UserMapping *user;
1688  AttrNumber n_params;
1689  Oid typefnoid;
1690  bool isvarlena;
1691  ListCell *lc;
1693 
1694  /*
1695  * Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState
1696  * stays NULL.
1697  */
1698  if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1699  return;
1700 
1701  /* Begin constructing PgFdwModifyState. */
1702  fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
1703  fmstate->rel = rel;
1704 
1705  /*
1706  * Identify which user to do the remote access as. This should match what
1707  * ExecCheckRTEPerms() does.
1708  */
1709  rte = rt_fetch(resultRelInfo->ri_RangeTableIndex, estate->es_range_table);
1710  userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
1711 
1712  /* Get info about foreign table. */
1713  table = GetForeignTable(RelationGetRelid(rel));
1714  user = GetUserMapping(userid, table->serverid);
1715 
1716  /* Open connection; report that we'll create a prepared statement. */
1717  fmstate->conn = GetConnection(user, true);
1718  fmstate->p_name = NULL; /* prepared statement not made yet */
1719 
1720  /* Deconstruct fdw_private data. */
1721  fmstate->query = strVal(list_nth(fdw_private,
1723  fmstate->target_attrs = (List *) list_nth(fdw_private,
1725  fmstate->has_returning = intVal(list_nth(fdw_private,
1727  fmstate->retrieved_attrs = (List *) list_nth(fdw_private,
1729 
1730  /* Create context for per-tuple temp workspace. */
1731  fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
1732  "postgres_fdw temporary data",
1734 
1735  /* Prepare for input conversion of RETURNING results. */
1736  if (fmstate->has_returning)
1737  fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
1738 
1739  /* Prepare for output conversion of parameters used in prepared stmt. */
1740  n_params = list_length(fmstate->target_attrs) + 1;
1741  fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
1742  fmstate->p_nums = 0;
1743 
1744  if (operation == CMD_UPDATE || operation == CMD_DELETE)
1745  {
1746  /* Find the ctid resjunk column in the subplan's result */
1747  Plan *subplan = mtstate->mt_plans[subplan_index]->plan;
1748 
1750  "ctid");
1751  if (!AttributeNumberIsValid(fmstate->ctidAttno))
1752  elog(ERROR, "could not find junk ctid column");
1753 
1754  /* First transmittable parameter will be ctid */
1755  getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
1756  fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
1757  fmstate->p_nums++;
1758  }
1759 
1760  if (operation == CMD_INSERT || operation == CMD_UPDATE)
1761  {
1762  /* Set up for remaining transmittable parameters */
1763  foreach(lc, fmstate->target_attrs)
1764  {
1765  int attnum = lfirst_int(lc);
1766  Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
1767 
1768  Assert(!attr->attisdropped);
1769 
1770  getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
1771  fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
1772  fmstate->p_nums++;
1773  }
1774  }
1775 
1776  Assert(fmstate->p_nums <= n_params);
1777 
1778  resultRelInfo->ri_FdwState = fmstate;
1779 }
1780 
1781 /*
1782  * postgresExecForeignInsert
1783  * Insert one row into a foreign table
1784  */
1785 static TupleTableSlot *
1787  ResultRelInfo *resultRelInfo,
1788  TupleTableSlot *slot,
1789  TupleTableSlot *planSlot)
1790 {
1791  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1792  const char **p_values;
1793  PGresult *res;
1794  int n_rows;
1795 
1796  /* Set up the prepared statement on the remote server, if we didn't yet */
1797  if (!fmstate->p_name)
1798  prepare_foreign_modify(fmstate);
1799 
1800  /* Convert parameters needed by prepared statement to text form */
1801  p_values = convert_prep_stmt_params(fmstate, NULL, slot);
1802 
1803  /*
1804  * Execute the prepared statement.
1805  */
1806  if (!PQsendQueryPrepared(fmstate->conn,
1807  fmstate->p_name,
1808  fmstate->p_nums,
1809  p_values,
1810  NULL,
1811  NULL,
1812  0))
1813  pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
1814 
1815  /*
1816  * Get the result, and check for success.
1817  *
1818  * We don't use a PG_TRY block here, so be careful not to throw error
1819  * without releasing the PGresult.
1820  */
1821  res = pgfdw_get_result(fmstate->conn, fmstate->query);
1822  if (PQresultStatus(res) !=
1824  pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
1825 
1826  /* Check number of rows affected, and fetch RETURNING tuple if any */
1827  if (fmstate->has_returning)
1828  {
1829  n_rows = PQntuples(res);
1830  if (n_rows > 0)
1831  store_returning_result(fmstate, slot, res);
1832  }
1833  else
1834  n_rows = atoi(PQcmdTuples(res));
1835 
1836  /* And clean up */
1837  PQclear(res);
1838 
1839  MemoryContextReset(fmstate->temp_cxt);
1840 
1841  /* Return NULL if nothing was inserted on the remote end */
1842  return (n_rows > 0) ? slot : NULL;
1843 }
1844 
1845 /*
1846  * postgresExecForeignUpdate
1847  * Update one row in a foreign table
1848  */
1849 static TupleTableSlot *
1851  ResultRelInfo *resultRelInfo,
1852  TupleTableSlot *slot,
1853  TupleTableSlot *planSlot)
1854 {
1855  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1856  Datum datum;
1857  bool isNull;
1858  const char **p_values;
1859  PGresult *res;
1860  int n_rows;
1861 
1862  /* Set up the prepared statement on the remote server, if we didn't yet */
1863  if (!fmstate->p_name)
1864  prepare_foreign_modify(fmstate);
1865 
1866  /* Get the ctid that was passed up as a resjunk column */
1867  datum = ExecGetJunkAttribute(planSlot,
1868  fmstate->ctidAttno,
1869  &isNull);
1870  /* shouldn't ever get a null result... */
1871  if (isNull)
1872  elog(ERROR, "ctid is NULL");
1873 
1874  /* Convert parameters needed by prepared statement to text form */
1875  p_values = convert_prep_stmt_params(fmstate,
1876  (ItemPointer) DatumGetPointer(datum),
1877  slot);
1878 
1879  /*
1880  * Execute the prepared statement.
1881  */
1882  if (!PQsendQueryPrepared(fmstate->conn,
1883  fmstate->p_name,
1884  fmstate->p_nums,
1885  p_values,
1886  NULL,
1887  NULL,
1888  0))
1889  pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
1890 
1891  /*
1892  * Get the result, and check for success.
1893  *
1894  * We don't use a PG_TRY block here, so be careful not to throw error
1895  * without releasing the PGresult.
1896  */
1897  res = pgfdw_get_result(fmstate->conn, fmstate->query);
1898  if (PQresultStatus(res) !=
1900  pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
1901 
1902  /* Check number of rows affected, and fetch RETURNING tuple if any */
1903  if (fmstate->has_returning)
1904  {
1905  n_rows = PQntuples(res);
1906  if (n_rows > 0)
1907  store_returning_result(fmstate, slot, res);
1908  }
1909  else
1910  n_rows = atoi(PQcmdTuples(res));
1911 
1912  /* And clean up */
1913  PQclear(res);
1914 
1915  MemoryContextReset(fmstate->temp_cxt);
1916 
1917  /* Return NULL if nothing was updated on the remote end */
1918  return (n_rows > 0) ? slot : NULL;
1919 }
1920 
1921 /*
1922  * postgresExecForeignDelete
1923  * Delete one row from a foreign table
1924  */
1925 static TupleTableSlot *
1927  ResultRelInfo *resultRelInfo,
1928  TupleTableSlot *slot,
1929  TupleTableSlot *planSlot)
1930 {
1931  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1932  Datum datum;
1933  bool isNull;
1934  const char **p_values;
1935  PGresult *res;
1936  int n_rows;
1937 
1938  /* Set up the prepared statement on the remote server, if we didn't yet */
1939  if (!fmstate->p_name)
1940  prepare_foreign_modify(fmstate);
1941 
1942  /* Get the ctid that was passed up as a resjunk column */
1943  datum = ExecGetJunkAttribute(planSlot,
1944  fmstate->ctidAttno,
1945  &isNull);
1946  /* shouldn't ever get a null result... */
1947  if (isNull)
1948  elog(ERROR, "ctid is NULL");
1949 
1950  /* Convert parameters needed by prepared statement to text form */
1951  p_values = convert_prep_stmt_params(fmstate,
1952  (ItemPointer) DatumGetPointer(datum),
1953  NULL);
1954 
1955  /*
1956  * Execute the prepared statement.
1957  */
1958  if (!PQsendQueryPrepared(fmstate->conn,
1959  fmstate->p_name,
1960  fmstate->p_nums,
1961  p_values,
1962  NULL,
1963  NULL,
1964  0))
1965  pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
1966 
1967  /*
1968  * Get the result, and check for success.
1969  *
1970  * We don't use a PG_TRY block here, so be careful not to throw error
1971  * without releasing the PGresult.
1972  */
1973  res = pgfdw_get_result(fmstate->conn, fmstate->query);
1974  if (PQresultStatus(res) !=
1976  pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
1977 
1978  /* Check number of rows affected, and fetch RETURNING tuple if any */
1979  if (fmstate->has_returning)
1980  {
1981  n_rows = PQntuples(res);
1982  if (n_rows > 0)
1983  store_returning_result(fmstate, slot, res);
1984  }
1985  else
1986  n_rows = atoi(PQcmdTuples(res));
1987 
1988  /* And clean up */
1989  PQclear(res);
1990 
1991  MemoryContextReset(fmstate->temp_cxt);
1992 
1993  /* Return NULL if nothing was deleted on the remote end */
1994  return (n_rows > 0) ? slot : NULL;
1995 }
1996 
1997 /*
1998  * postgresEndForeignModify
1999  * Finish an insert/update/delete operation on a foreign table
2000  */
2001 static void
2003  ResultRelInfo *resultRelInfo)
2004 {
2005  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
2006 
2007  /* If fmstate is NULL, we are in EXPLAIN; nothing to do */
2008  if (fmstate == NULL)
2009  return;
2010 
2011  /* If we created a prepared statement, destroy it */
2012  if (fmstate->p_name)
2013  {
2014  char sql[64];
2015  PGresult *res;
2016 
2017  snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
2018 
2019  /*
2020  * We don't use a PG_TRY block here, so be careful not to throw error
2021  * without releasing the PGresult.
2022  */
2023  res = pgfdw_exec_query(fmstate->conn, sql);
2024  if (PQresultStatus(res) != PGRES_COMMAND_OK)
2025  pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
2026  PQclear(res);
2027  fmstate->p_name = NULL;
2028  }
2029 
2030  /* Release remote connection */
2031  ReleaseConnection(fmstate->conn);
2032  fmstate->conn = NULL;
2033 }
2034 
2035 /*
2036  * postgresIsForeignRelUpdatable
2037  * Determine whether a foreign table supports INSERT, UPDATE and/or
2038  * DELETE.
2039  */
2040 static int
2042 {
2043  bool updatable;
2044  ForeignTable *table;
2045  ForeignServer *server;
2046  ListCell *lc;
2047 
2048  /*
2049  * By default, all postgres_fdw foreign tables are assumed updatable. This
2050  * can be overridden by a per-server setting, which in turn can be
2051  * overridden by a per-table setting.
2052  */
2053  updatable = true;
2054 
2055  table = GetForeignTable(RelationGetRelid(rel));
2056  server = GetForeignServer(table->serverid);
2057 
2058  foreach(lc, server->options)
2059  {
2060  DefElem *def = (DefElem *) lfirst(lc);
2061 
2062  if (strcmp(def->defname, "updatable") == 0)
2063  updatable = defGetBoolean(def);
2064  }
2065  foreach(lc, table->options)
2066  {
2067  DefElem *def = (DefElem *) lfirst(lc);
2068 
2069  if (strcmp(def->defname, "updatable") == 0)
2070  updatable = defGetBoolean(def);
2071  }
2072 
2073  /*
2074  * Currently "updatable" means support for INSERT, UPDATE and DELETE.
2075  */
2076  return updatable ?
2077  (1 << CMD_INSERT) | (1 << CMD_UPDATE) | (1 << CMD_DELETE) : 0;
2078 }
2079 
2080 /*
2081  * postgresRecheckForeignScan
2082  * Execute a local join execution plan for a foreign join
2083  */
2084 static bool
2086 {
2087  Index scanrelid = ((Scan *) node->ss.ps.plan)->scanrelid;
2089  TupleTableSlot *result;
2090 
2091  /* For base foreign relations, it suffices to set fdw_recheck_quals */
2092  if (scanrelid > 0)
2093  return true;
2094 
2095  Assert(outerPlan != NULL);
2096 
2097  /* Execute a local join execution plan */
2098  result = ExecProcNode(outerPlan);
2099  if (TupIsNull(result))
2100  return false;
2101 
2102  /* Store result in the given slot */
2103  ExecCopySlot(slot, result);
2104 
2105  return true;
2106 }
2107 
2108 /*
2109  * postgresPlanDirectModify
2110  * Consider a direct foreign table modification
2111  *
2112  * Decide whether it is safe to modify a foreign table directly, and if so,
2113  * rewrite subplan accordingly.
2114  */
2115 static bool
2117  ModifyTable *plan,
2118  Index resultRelation,
2119  int subplan_index)
2120 {
2121  CmdType operation = plan->operation;
2122  Plan *subplan;
2123  RelOptInfo *foreignrel;
2124  RangeTblEntry *rte;
2125  PgFdwRelationInfo *fpinfo;
2126  Relation rel;
2127  StringInfoData sql;
2128  ForeignScan *fscan;
2129  List *targetAttrs = NIL;
2130  List *remote_exprs;
2131  List *params_list = NIL;
2132  List *returningList = NIL;
2134 
2135  /*
2136  * Decide whether it is safe to modify a foreign table directly.
2137  */
2138 
2139  /*
2140  * The table modification must be an UPDATE or DELETE.
2141  */
2142  if (operation != CMD_UPDATE && operation != CMD_DELETE)
2143  return false;
2144 
2145  /*
2146  * It's unsafe to modify a foreign table directly if there are any local
2147  * joins needed.
2148  */
2149  subplan = (Plan *) list_nth(plan->plans, subplan_index);
2150  if (!IsA(subplan, ForeignScan))
2151  return false;
2152  fscan = (ForeignScan *) subplan;
2153 
2154  /*
2155  * It's unsafe to modify a foreign table directly if there are any quals
2156  * that should be evaluated locally.
2157  */
2158  if (subplan->qual != NIL)
2159  return false;
2160 
2161  /* Safe to fetch data about the target foreign rel */
2162  if (fscan->scan.scanrelid == 0)
2163  {
2164  foreignrel = find_join_rel(root, fscan->fs_relids);
2165  /* We should have a rel for this foreign join. */
2166  Assert(foreignrel);
2167  }
2168  else
2169  foreignrel = root->simple_rel_array[resultRelation];
2170  rte = root->simple_rte_array[resultRelation];
2171  fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
2172 
2173  /*
2174  * It's unsafe to update a foreign table directly, if any expressions to
2175  * assign to the target columns are unsafe to evaluate remotely.
2176  */
2177  if (operation == CMD_UPDATE)
2178  {
2179  int col;
2180 
2181  /*
2182  * We transmit only columns that were explicitly targets of the
2183  * UPDATE, so as to avoid unnecessary data transmission.
2184  */
2185  col = -1;
2186  while ((col = bms_next_member(rte->updatedCols, col)) >= 0)
2187  {
2188  /* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
2190  TargetEntry *tle;
2191 
2192  if (attno <= InvalidAttrNumber) /* shouldn't happen */
2193  elog(ERROR, "system-column update is not supported");
2194 
2195  tle = get_tle_by_resno(subplan->targetlist, attno);
2196 
2197  if (!tle)
2198  elog(ERROR, "attribute number %d not found in subplan targetlist",
2199  attno);
2200 
2201  if (!is_foreign_expr(root, foreignrel, (Expr *) tle->expr))
2202  return false;
2203 
2204  targetAttrs = lappend_int(targetAttrs, attno);
2205  }
2206  }
2207 
2208  /*
2209  * Ok, rewrite subplan so as to modify the foreign table directly.
2210  */
2211  initStringInfo(&sql);
2212 
2213  /*
2214  * Core code already has some lock on each rel being planned, so we can
2215  * use NoLock here.
2216  */
2217  rel = heap_open(rte->relid, NoLock);
2218 
2219  /*
2220  * Recall the qual clauses that must be evaluated remotely. (These are
2221  * bare clauses not RestrictInfos, but deparse.c's appendConditions()
2222  * doesn't care.)
2223  */
2224  remote_exprs = fpinfo->final_remote_exprs;
2225 
2226  /*
2227  * Extract the relevant RETURNING list if any.
2228  */
2229  if (plan->returningLists)
2230  {
2231  returningList = (List *) list_nth(plan->returningLists, subplan_index);
2232 
2233  /*
2234  * When performing an UPDATE/DELETE .. RETURNING on a join directly,
2235  * we fetch from the foreign server any Vars specified in RETURNING
2236  * that refer not only to the target relation but to non-target
2237  * relations. So we'll deparse them into the RETURNING clause of the
2238  * remote query; use a targetlist consisting of them instead, which
2239  * will be adjusted to be new fdw_scan_tlist of the foreign-scan plan
2240  * node below.
2241  */
2242  if (fscan->scan.scanrelid == 0)
2243  returningList = build_remote_returning(resultRelation, rel,
2244  returningList);
2245  }
2246 
2247  /*
2248  * Construct the SQL command string.
2249  */
2250  switch (operation)
2251  {
2252  case CMD_UPDATE:
2253  deparseDirectUpdateSql(&sql, root, resultRelation, rel,
2254  foreignrel,
2255  ((Plan *) fscan)->targetlist,
2256  targetAttrs,
2257  remote_exprs, &params_list,
2258  returningList, &retrieved_attrs);
2259  break;
2260  case CMD_DELETE:
2261  deparseDirectDeleteSql(&sql, root, resultRelation, rel,
2262  foreignrel,
2263  remote_exprs, &params_list,
2264  returningList, &retrieved_attrs);
2265  break;
2266  default:
2267  elog(ERROR, "unexpected operation: %d", (int) operation);
2268  break;
2269  }
2270 
2271  /*
2272  * Update the operation info.
2273  */
2274  fscan->operation = operation;
2275 
2276  /*
2277  * Update the fdw_exprs list that will be available to the executor.
2278  */
2279  fscan->fdw_exprs = params_list;
2280 
2281  /*
2282  * Update the fdw_private list that will be available to the executor.
2283  * Items in the list must match enum FdwDirectModifyPrivateIndex, above.
2284  */
2285  fscan->fdw_private = list_make4(makeString(sql.data),
2286  makeInteger((retrieved_attrs != NIL)),
2287  retrieved_attrs,
2288  makeInteger(plan->canSetTag));
2289 
2290  /*
2291  * Update the foreign-join-related fields.
2292  */
2293  if (fscan->scan.scanrelid == 0)
2294  {
2295  /* No need for the outer subplan. */
2296  fscan->scan.plan.lefttree = NULL;
2297 
2298  /* Build new fdw_scan_tlist if UPDATE/DELETE .. RETURNING. */
2299  if (returningList)
2300  rebuild_fdw_scan_tlist(fscan, returningList);
2301  }
2302 
2303  heap_close(rel, NoLock);
2304  return true;
2305 }
2306 
2307 /*
2308  * postgresBeginDirectModify
2309  * Prepare a direct foreign table modification
2310  */
2311 static void
2313 {
2314  ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
2315  EState *estate = node->ss.ps.state;
2316  PgFdwDirectModifyState *dmstate;
2317  Index rtindex;
2318  RangeTblEntry *rte;
2319  Oid userid;
2320  ForeignTable *table;
2321  UserMapping *user;
2322  int numParams;
2323 
2324  /*
2325  * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
2326  */
2327  if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
2328  return;
2329 
2330  /*
2331  * We'll save private state in node->fdw_state.
2332  */
2333  dmstate = (PgFdwDirectModifyState *) palloc0(sizeof(PgFdwDirectModifyState));
2334  node->fdw_state = (void *) dmstate;
2335 
2336  /*
2337  * Identify which user to do the remote access as. This should match what
2338  * ExecCheckRTEPerms() does.
2339  */
2340  rtindex = estate->es_result_relation_info->ri_RangeTableIndex;
2341  rte = rt_fetch(rtindex, estate->es_range_table);
2342  userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
2343 
2344  /* Get info about foreign table. */
2345  if (fsplan->scan.scanrelid == 0)
2346  dmstate->rel = ExecOpenScanRelation(estate, rtindex, eflags);
2347  else
2348  dmstate->rel = node->ss.ss_currentRelation;
2349  table = GetForeignTable(RelationGetRelid(dmstate->rel));
2350  user = GetUserMapping(userid, table->serverid);
2351 
2352  /*
2353  * Get connection to the foreign server. Connection manager will
2354  * establish new connection if necessary.
2355  */
2356  dmstate->conn = GetConnection(user, false);
2357 
2358  /* Update the foreign-join-related fields. */
2359  if (fsplan->scan.scanrelid == 0)
2360  {
2361  /* Save info about foreign table. */
2362  dmstate->resultRel = dmstate->rel;
2363 
2364  /*
2365  * Set dmstate->rel to NULL to teach get_returning_data() and
2366  * make_tuple_from_result_row() that columns fetched from the remote
2367  * server are described by fdw_scan_tlist of the foreign-scan plan
2368  * node, not the tuple descriptor for the target relation.
2369  */
2370  dmstate->rel = NULL;
2371  }
2372 
2373  /* Initialize state variable */
2374  dmstate->num_tuples = -1; /* -1 means not set yet */
2375 
2376  /* Get private info created by planner functions. */
2377  dmstate->query = strVal(list_nth(fsplan->fdw_private,
2379  dmstate->has_returning = intVal(list_nth(fsplan->fdw_private,
2381  dmstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
2383  dmstate->set_processed = intVal(list_nth(fsplan->fdw_private,
2385 
2386  /* Create context for per-tuple temp workspace. */
2387  dmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
2388  "postgres_fdw temporary data",
2390 
2391  /* Prepare for input conversion of RETURNING results. */
2392  if (dmstate->has_returning)
2393  {
2395 
2396  if (fsplan->scan.scanrelid == 0)
2397  tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
2398  else
2399  tupdesc = RelationGetDescr(dmstate->rel);
2400 
2401  dmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
2402 
2403  /*
2404  * When performing an UPDATE/DELETE .. RETURNING on a join directly,
2405  * initialize a filter to extract an updated/deleted tuple from a scan
2406  * tuple.
2407  */
2408  if (fsplan->scan.scanrelid == 0)
2409  init_returning_filter(dmstate, fsplan->fdw_scan_tlist, rtindex);
2410  }
2411 
2412  /*
2413  * Prepare for processing of parameters used in remote query, if any.
2414  */
2415  numParams = list_length(fsplan->fdw_exprs);
2416  dmstate->numParams = numParams;
2417  if (numParams > 0)
2419  fsplan->fdw_exprs,
2420  numParams,
2421  &dmstate->param_flinfo,
2422  &dmstate->param_exprs,
2423  &dmstate->param_values);
2424 }
2425 
2426 /*
2427  * postgresIterateDirectModify
2428  * Execute a direct foreign table modification
2429  */
2430 static TupleTableSlot *
2432 {
2434  EState *estate = node->ss.ps.state;
2435  ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
2436 
2437  /*
2438  * If this is the first call after Begin, execute the statement.
2439  */
2440  if (dmstate->num_tuples == -1)
2441  execute_dml_stmt(node);
2442 
2443  /*
2444  * If the local query doesn't specify RETURNING, just clear tuple slot.
2445  */
2446  if (!resultRelInfo->ri_projectReturning)
2447  {
2448  TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
2449  Instrumentation *instr = node->ss.ps.instrument;
2450 
2451  Assert(!dmstate->has_returning);
2452 
2453  /* Increment the command es_processed count if necessary. */
2454  if (dmstate->set_processed)
2455  estate->es_processed += dmstate->num_tuples;
2456 
2457  /* Increment the tuple count for EXPLAIN ANALYZE if necessary. */
2458  if (instr)
2459  instr->tuplecount += dmstate->num_tuples;
2460 
2461  return ExecClearTuple(slot);
2462  }
2463 
2464  /*
2465  * Get the next RETURNING tuple.
2466  */
2467  return get_returning_data(node);
2468 }
2469 
2470 /*
2471  * postgresEndDirectModify
2472  * Finish a direct foreign table modification
2473  */
2474 static void
2476 {
2478 
2479  /* if dmstate is NULL, we are in EXPLAIN; nothing to do */
2480  if (dmstate == NULL)
2481  return;
2482 
2483  /* Release PGresult */
2484  if (dmstate->result)
2485  PQclear(dmstate->result);
2486 
2487  /* Release remote connection */
2488  ReleaseConnection(dmstate->conn);
2489  dmstate->conn = NULL;
2490 
2491  /* close the target relation. */
2492  if (dmstate->resultRel)
2493  ExecCloseScanRelation(dmstate->resultRel);
2494 
2495  /* MemoryContext will be deleted automatically. */
2496 }
2497 
2498 /*
2499  * postgresExplainForeignScan
2500  * Produce extra output for EXPLAIN of a ForeignScan on a foreign table
2501  */
2502 static void
2504 {
2505  List *fdw_private;
2506  char *sql;
2507  char *relations;
2508 
2509  fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
2510 
2511  /*
2512  * Add names of relation handled by the foreign scan when the scan is a
2513  * join
2514  */
2515  if (list_length(fdw_private) > FdwScanPrivateRelations)
2516  {
2517  relations = strVal(list_nth(fdw_private, FdwScanPrivateRelations));
2518  ExplainPropertyText("Relations", relations, es);
2519  }
2520 
2521  /*
2522  * Add remote query, when VERBOSE option is specified.
2523  */
2524  if (es->verbose)
2525  {
2526  sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
2527  ExplainPropertyText("Remote SQL", sql, es);
2528  }
2529 }
2530 
2531 /*
2532  * postgresExplainForeignModify
2533  * Produce extra output for EXPLAIN of a ModifyTable on a foreign table
2534  */
2535 static void
2537  ResultRelInfo *rinfo,
2538  List *fdw_private,
2539  int subplan_index,
2540  ExplainState *es)
2541 {
2542  if (es->verbose)
2543  {
2544  char *sql = strVal(list_nth(fdw_private,
2546 
2547  ExplainPropertyText("Remote SQL", sql, es);
2548  }
2549 }
2550 
2551 /*
2552  * postgresExplainDirectModify
2553  * Produce extra output for EXPLAIN of a ForeignScan that modifies a
2554  * foreign table directly
2555  */
2556 static void
2558 {
2559  List *fdw_private;
2560  char *sql;
2561 
2562  if (es->verbose)
2563  {
2564  fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
2565  sql = strVal(list_nth(fdw_private, FdwDirectModifyPrivateUpdateSql));
2566  ExplainPropertyText("Remote SQL", sql, es);
2567  }
2568 }
2569 
2570 
2571 /*
2572  * estimate_path_cost_size
2573  * Get cost and size estimates for a foreign scan on given foreign relation
2574  * either a base relation or a join between foreign relations or an upper
2575  * relation containing foreign relations.
2576  *
2577  * param_join_conds are the parameterization clauses with outer relations.
2578  * pathkeys specify the expected sort order if any for given path being costed.
2579  *
2580  * The function returns the cost and size estimates in p_row, p_width,
2581  * p_startup_cost and p_total_cost variables.
2582  */
2583 static void
2585  RelOptInfo *foreignrel,
2586  List *param_join_conds,
2587  List *pathkeys,
2588  double *p_rows, int *p_width,
2589  Cost *p_startup_cost, Cost *p_total_cost)
2590 {
2591  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
2592  double rows;
2593  double retrieved_rows;
2594  int width;
2595  Cost startup_cost;
2596  Cost total_cost;
2597  Cost cpu_per_tuple;
2598 
2599  /*
2600  * If the table or the server is configured to use remote estimates,
2601  * connect to the foreign server and execute EXPLAIN to estimate the
2602  * number of rows selected by the restriction+join clauses. Otherwise,
2603  * estimate rows using whatever statistics we have locally, in a way
2604  * similar to ordinary tables.
2605  */
2606  if (fpinfo->use_remote_estimate)
2607  {
2608  List *remote_param_join_conds;
2609  List *local_param_join_conds;
2610  StringInfoData sql;
2611  PGconn *conn;
2612  Selectivity local_sel;
2613  QualCost local_cost;
2614  List *fdw_scan_tlist = NIL;
2615  List *remote_conds;
2616 
2617  /* Required only to be passed to deparseSelectStmtForRel */
2619 
2620  /*
2621  * param_join_conds might contain both clauses that are safe to send
2622  * across, and clauses that aren't.
2623  */
2624  classifyConditions(root, foreignrel, param_join_conds,
2625  &remote_param_join_conds, &local_param_join_conds);
2626 
2627  /* Build the list of columns to be fetched from the foreign server. */
2628  if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
2629  fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
2630  else
2631  fdw_scan_tlist = NIL;
2632 
2633  /*
2634  * The complete list of remote conditions includes everything from
2635  * baserestrictinfo plus any extra join_conds relevant to this
2636  * particular path.
2637  */
2638  remote_conds = list_concat(list_copy(remote_param_join_conds),
2639  fpinfo->remote_conds);
2640 
2641  /*
2642  * Construct EXPLAIN query including the desired SELECT, FROM, and
2643  * WHERE clauses. Params and other-relation Vars are replaced by dummy
2644  * values, so don't request params_list.
2645  */
2646  initStringInfo(&sql);
2647  appendStringInfoString(&sql, "EXPLAIN ");
2648  deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
2649  remote_conds, pathkeys, false,
2650  &retrieved_attrs, NULL);
2651 
2652  /* Get the remote estimate */
2653  conn = GetConnection(fpinfo->user, false);
2654  get_remote_estimate(sql.data, conn, &rows, &width,
2655  &startup_cost, &total_cost);
2656  ReleaseConnection(conn);
2657 
2658  retrieved_rows = rows;
2659 
2660  /* Factor in the selectivity of the locally-checked quals */
2661  local_sel = clauselist_selectivity(root,
2662  local_param_join_conds,
2663  foreignrel->relid,
2664  JOIN_INNER,
2665  NULL);
2666  local_sel *= fpinfo->local_conds_sel;
2667 
2668  rows = clamp_row_est(rows * local_sel);
2669 
2670  /* Add in the eval cost of the locally-checked quals */
2671  startup_cost += fpinfo->local_conds_cost.startup;
2672  total_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
2673  cost_qual_eval(&local_cost, local_param_join_conds, root);
2674  startup_cost += local_cost.startup;
2675  total_cost += local_cost.per_tuple * retrieved_rows;
2676  }
2677  else
2678  {
2679  Cost run_cost = 0;
2680 
2681  /*
2682  * We don't support join conditions in this mode (hence, no
2683  * parameterized paths can be made).
2684  */
2685  Assert(param_join_conds == NIL);
2686 
2687  /*
2688  * Use rows/width estimates made by set_baserel_size_estimates() for
2689  * base foreign relations and set_joinrel_size_estimates() for join
2690  * between foreign relations.
2691  */
2692  rows = foreignrel->rows;
2693  width = foreignrel->reltarget->width;
2694 
2695  /* Back into an estimate of the number of retrieved rows. */
2696  retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
2697 
2698  /*
2699  * We will come here again and again with different set of pathkeys
2700  * that caller wants to cost. We don't need to calculate the cost of
2701  * bare scan each time. Instead, use the costs if we have cached them
2702  * already.
2703  */
2704  if (fpinfo->rel_startup_cost > 0 && fpinfo->rel_total_cost > 0)
2705  {
2706  startup_cost = fpinfo->rel_startup_cost;
2707  run_cost = fpinfo->rel_total_cost - fpinfo->rel_startup_cost;
2708  }
2709  else if (IS_JOIN_REL(foreignrel))
2710  {
2711  PgFdwRelationInfo *fpinfo_i;
2712  PgFdwRelationInfo *fpinfo_o;
2713  QualCost join_cost;
2714  QualCost remote_conds_cost;
2715  double nrows;
2716 
2717  /* For join we expect inner and outer relations set */
2718  Assert(fpinfo->innerrel && fpinfo->outerrel);
2719 
2720  fpinfo_i = (PgFdwRelationInfo *) fpinfo->innerrel->fdw_private;
2721  fpinfo_o = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
2722 
2723  /* Estimate of number of rows in cross product */
2724  nrows = fpinfo_i->rows * fpinfo_o->rows;
2725  /* Clamp retrieved rows estimate to at most size of cross product */
2726  retrieved_rows = Min(retrieved_rows, nrows);
2727 
2728  /*
2729  * The cost of foreign join is estimated as cost of generating
2730  * rows for the joining relations + cost for applying quals on the
2731  * rows.
2732  */
2733 
2734  /*
2735  * Calculate the cost of clauses pushed down to the foreign server
2736  */
2737  cost_qual_eval(&remote_conds_cost, fpinfo->remote_conds, root);
2738  /* Calculate the cost of applying join clauses */
2739  cost_qual_eval(&join_cost, fpinfo->joinclauses, root);
2740 
2741  /*
2742  * Startup cost includes startup cost of joining relations and the
2743  * startup cost for join and other clauses. We do not include the
2744  * startup cost specific to join strategy (e.g. setting up hash
2745  * tables) since we do not know what strategy the foreign server
2746  * is going to use.
2747  */
2748  startup_cost = fpinfo_i->rel_startup_cost + fpinfo_o->rel_startup_cost;
2749  startup_cost += join_cost.startup;
2750  startup_cost += remote_conds_cost.startup;
2751  startup_cost += fpinfo->local_conds_cost.startup;
2752 
2753  /*
2754  * Run time cost includes:
2755  *
2756  * 1. Run time cost (total_cost - startup_cost) of relations being
2757  * joined
2758  *
2759  * 2. Run time cost of applying join clauses on the cross product
2760  * of the joining relations.
2761  *
2762  * 3. Run time cost of applying pushed down other clauses on the
2763  * result of join
2764  *
2765  * 4. Run time cost of applying nonpushable other clauses locally
2766  * on the result fetched from the foreign server.
2767  */
2768  run_cost = fpinfo_i->rel_total_cost - fpinfo_i->rel_startup_cost;
2769  run_cost += fpinfo_o->rel_total_cost - fpinfo_o->rel_startup_cost;
2770  run_cost += nrows * join_cost.per_tuple;
2771  nrows = clamp_row_est(nrows * fpinfo->joinclause_sel);
2772  run_cost += nrows * remote_conds_cost.per_tuple;
2773  run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
2774  }
2775  else if (IS_UPPER_REL(foreignrel))
2776  {
2777  PgFdwRelationInfo *ofpinfo;
2778  PathTarget *ptarget = root->upper_targets[UPPERREL_GROUP_AGG];
2779  AggClauseCosts aggcosts;
2780  double input_rows;
2781  int numGroupCols;
2782  double numGroups = 1;
2783 
2784  /*
2785  * This cost model is mixture of costing done for sorted and
2786  * hashed aggregates in cost_agg(). We are not sure which
2787  * strategy will be considered at remote side, thus for
2788  * simplicity, we put all startup related costs in startup_cost
2789  * and all finalization and run cost are added in total_cost.
2790  *
2791  * Also, core does not care about costing HAVING expressions and
2792  * adding that to the costs. So similarly, here too we are not
2793  * considering remote and local conditions for costing.
2794  */
2795 
2796  ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
2797 
2798  /* Get rows and width from input rel */
2799  input_rows = ofpinfo->rows;
2800  width = ofpinfo->width;
2801 
2802  /* Collect statistics about aggregates for estimating costs. */
2803  MemSet(&aggcosts, 0, sizeof(AggClauseCosts));
2804  if (root->parse->hasAggs)
2805  {
2806  get_agg_clause_costs(root, (Node *) fpinfo->grouped_tlist,
2807  AGGSPLIT_SIMPLE, &aggcosts);
2808  get_agg_clause_costs(root, (Node *) root->parse->havingQual,
2809  AGGSPLIT_SIMPLE, &aggcosts);
2810  }
2811 
2812  /* Get number of grouping columns and possible number of groups */
2813  numGroupCols = list_length(root->parse->groupClause);
2814  numGroups = estimate_num_groups(root,
2816  fpinfo->grouped_tlist),
2817  input_rows, NULL);
2818 
2819  /*
2820  * Number of rows expected from foreign server will be same as
2821  * that of number of groups.
2822  */
2823  rows = retrieved_rows = numGroups;
2824 
2825  /*-----
2826  * Startup cost includes:
2827  * 1. Startup cost for underneath input * relation
2828  * 2. Cost of performing aggregation, per cost_agg()
2829  * 3. Startup cost for PathTarget eval
2830  *-----
2831  */
2832  startup_cost = ofpinfo->rel_startup_cost;
2833  startup_cost += aggcosts.transCost.startup;
2834  startup_cost += aggcosts.transCost.per_tuple * input_rows;
2835  startup_cost += (cpu_operator_cost * numGroupCols) * input_rows;
2836  startup_cost += ptarget->cost.startup;
2837 
2838  /*-----
2839  * Run time cost includes:
2840  * 1. Run time cost of underneath input relation
2841  * 2. Run time cost of performing aggregation, per cost_agg()
2842  * 3. PathTarget eval cost for each output row
2843  *-----
2844  */
2845  run_cost = ofpinfo->rel_total_cost - ofpinfo->rel_startup_cost;
2846  run_cost += aggcosts.finalCost * numGroups;
2847  run_cost += cpu_tuple_cost * numGroups;
2848  run_cost += ptarget->cost.per_tuple * numGroups;
2849  }
2850  else
2851  {
2852  /* Clamp retrieved rows estimates to at most foreignrel->tuples. */
2853  retrieved_rows = Min(retrieved_rows, foreignrel->tuples);
2854 
2855  /*
2856  * Cost as though this were a seqscan, which is pessimistic. We
2857  * effectively imagine the local_conds are being evaluated
2858  * remotely, too.
2859  */
2860  startup_cost = 0;
2861  run_cost = 0;
2862  run_cost += seq_page_cost * foreignrel->pages;
2863 
2864  startup_cost += foreignrel->baserestrictcost.startup;
2865  cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple;
2866  run_cost += cpu_per_tuple * foreignrel->tuples;
2867  }
2868 
2869  /*
2870  * Without remote estimates, we have no real way to estimate the cost
2871  * of generating sorted output. It could be free if the query plan
2872  * the remote side would have chosen generates properly-sorted output
2873  * anyway, but in most cases it will cost something. Estimate a value
2874  * high enough that we won't pick the sorted path when the ordering
2875  * isn't locally useful, but low enough that we'll err on the side of
2876  * pushing down the ORDER BY clause when it's useful to do so.
2877  */
2878  if (pathkeys != NIL)
2879  {
2880  startup_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
2881  run_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
2882  }
2883 
2884  total_cost = startup_cost + run_cost;
2885  }
2886 
2887  /*
2888  * Cache the costs for scans without any pathkeys or parameterization
2889  * before adding the costs for transferring data from the foreign server.
2890  * These costs are useful for costing the join between this relation and
2891  * another foreign relation or to calculate the costs of paths with
2892  * pathkeys for this relation, when the costs can not be obtained from the
2893  * foreign server. This function will be called at least once for every
2894  * foreign relation without pathkeys and parameterization.
2895  */
2896  if (pathkeys == NIL && param_join_conds == NIL)
2897  {
2898  fpinfo->rel_startup_cost = startup_cost;
2899  fpinfo->rel_total_cost = total_cost;
2900  }
2901 
2902  /*
2903  * Add some additional cost factors to account for connection overhead
2904  * (fdw_startup_cost), transferring data across the network
2905  * (fdw_tuple_cost per retrieved row), and local manipulation of the data
2906  * (cpu_tuple_cost per retrieved row).
2907  */
2908  startup_cost += fpinfo->fdw_startup_cost;
2909  total_cost += fpinfo->fdw_startup_cost;
2910  total_cost += fpinfo->fdw_tuple_cost * retrieved_rows;
2911  total_cost += cpu_tuple_cost * retrieved_rows;
2912 
2913  /* Return results. */
2914  *p_rows = rows;
2915  *p_width = width;
2916  *p_startup_cost = startup_cost;
2917  *p_total_cost = total_cost;
2918 }
2919 
2920 /*
2921  * Estimate costs of executing a SQL statement remotely.
2922  * The given "sql" must be an EXPLAIN command.
2923  */
2924 static void
2925 get_remote_estimate(const char *sql, PGconn *conn,
2926  double *rows, int *width,
2927  Cost *startup_cost, Cost *total_cost)
2928 {
2929  PGresult *volatile res = NULL;
2930 
2931  /* PGresult must be released before leaving this function. */
2932  PG_TRY();
2933  {
2934  char *line;
2935  char *p;
2936  int n;
2937 
2938  /*
2939  * Execute EXPLAIN remotely.
2940  */
2941  res = pgfdw_exec_query(conn, sql);
2942  if (PQresultStatus(res) != PGRES_TUPLES_OK)
2943  pgfdw_report_error(ERROR, res, conn, false, sql);
2944 
2945  /*
2946  * Extract cost numbers for topmost plan node. Note we search for a
2947  * left paren from the end of the line to avoid being confused by
2948  * other uses of parentheses.
2949  */
2950  line = PQgetvalue(res, 0, 0);
2951  p = strrchr(line, '(');
2952  if (p == NULL)
2953  elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
2954  n = sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)",
2955  startup_cost, total_cost, rows, width);
2956  if (n != 4)
2957  elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
2958 
2959  PQclear(res);
2960  res = NULL;
2961  }
2962  PG_CATCH();
2963  {
2964  if (res)
2965  PQclear(res);
2966  PG_RE_THROW();
2967  }
2968  PG_END_TRY();
2969 }
2970 
2971 /*
2972  * Detect whether we want to process an EquivalenceClass member.
2973  *
2974  * This is a callback for use by generate_implied_equalities_for_column.
2975  */
2976 static bool
2979  void *arg)
2980 {
2982  Expr *expr = em->em_expr;
2983 
2984  /*
2985  * If we've identified what we're processing in the current scan, we only
2986  * want to match that expression.
2987  */
2988  if (state->current != NULL)
2989  return equal(expr, state->current);
2990 
2991  /*
2992  * Otherwise, ignore anything we've already processed.
2993  */
2994  if (list_member(state->already_used, expr))
2995  return false;
2996 
2997  /* This is the new target to process. */
2998  state->current = expr;
2999  return true;
3000 }
3001 
3002 /*
3003  * Create cursor for node's query with current parameter values.
 *
 * Steps: convert the parameter expressions to text when numParams > 0,
 * send "DECLARE c<cursor_number> CURSOR FOR <query>" with those values,
 * wait for the result, and finally reset the fetch bookkeeping in fsstate
 * so that no tuples are considered retrieved yet.
 *
 * NOTE(review): listing lines 3006 and 3013 are absent from this copy; the
 * function name/parameter line and the declaration of "buf" (used below)
 * are therefore not visible here.
3004  */
3005 static void
3007 {
3008  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
3009  ExprContext *econtext = node->ss.ps.ps_ExprContext;
3010  int numParams = fsstate->numParams;
3011  const char **values = fsstate->param_values;
3012  PGconn *conn = fsstate->conn;
3014  PGresult *res;
3015
3016  /*
3017  * Construct array of query parameter values in text format. We do the
3018  * conversions in the short-lived per-tuple context, so as not to cause a
3019  * memory leak over repeated scans.
3020  */
3021  if (numParams > 0)
3022  {
3023  MemoryContext oldcontext;
3024
3025  oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
3026
3027  process_query_params(econtext,
3028  fsstate->param_flinfo,
3029  fsstate->param_exprs,
3030  values);
3031
3032  MemoryContextSwitchTo(oldcontext);
3033  }
3034
3035  /* Construct the DECLARE CURSOR command */
3036  initStringInfo(&buf);
3037  appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
3038  fsstate->cursor_number, fsstate->query);
3039
3040  /*
3041  * Notice that we pass NULL for paramTypes, thus forcing the remote server
3042  * to infer types for all parameters. Since we explicitly cast every
3043  * parameter (see deparse.c), the "inference" is trivial and will produce
3044  * the desired result. This allows us to avoid assuming that the remote
3045  * server has the same OIDs we do for the parameters' types.
3046  */
3047  if (!PQsendQueryParams(conn, buf.data, numParams,
3048  NULL, values, NULL, NULL, 0))
3049  pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
3050
3051  /*
3052  * Get the result, and check for success.
3053  *
3054  * We don't use a PG_TRY block here, so be careful not to throw error
3055  * without releasing the PGresult.
 *
 * Note that a failed result is reported against fsstate->query (the
 * underlying SELECT) rather than the DECLARE text held in buf.
3056  */
3057  res = pgfdw_get_result(conn, buf.data);
3058  if (PQresultStatus(res) != PGRES_COMMAND_OK)
3059  pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
3060  PQclear(res);
3061
3062  /* Mark the cursor as created, and show no tuples have been retrieved */
3063  fsstate->cursor_exists = true;
3064  fsstate->tuples = NULL;
3065  fsstate->num_tuples = 0;
3066  fsstate->next_tuple = 0;
3067  fsstate->fetch_ct_2 = 0;
3068  fsstate->eof_reached = false;
3069
3070  /* Clean up */
3071  pfree(buf.data);
3072 }
3073 
3074 /*
3075  * Fetch some more rows from the node's cursor.
 *
 * The previous batch is discarded by resetting batch_cxt; then
 * "FETCH <fetch_size> FROM c<cursor_number>" is executed and every result
 * row is converted into a HeapTuple stored in fsstate->tuples, which is
 * allocated while batch_cxt is the current context. eof_reached is set
 * when fewer than fetch_size rows come back.
 *
 * NOTE(review): listing lines 3078 and 3119 are absent from this copy, so
 * the function name/parameter line and the first line of the per-row
 * conversion call (its callee and leading arguments) are not visible here.
3076  */
3077 static void
3079 {
3080  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
3081  PGresult *volatile res = NULL; /* set in PG_TRY, read in PG_CATCH */
3082  MemoryContext oldcontext;
3083
3084  /*
3085  * We'll store the tuples in the batch_cxt. First, flush the previous
3086  * batch.
3087  */
3088  fsstate->tuples = NULL;
3089  MemoryContextReset(fsstate->batch_cxt);
3090  oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
3091
3092  /* PGresult must be released before leaving this function. */
3093  PG_TRY();
3094  {
3095  PGconn *conn = fsstate->conn;
3096  char sql[64];
3097  int numrows;
3098  int i;
3099
3100  snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
3101  fsstate->fetch_size, fsstate->cursor_number);
3102
3103  res = pgfdw_exec_query(conn, sql);
3104  /* On error, report the original query, not the FETCH. */
3105  if (PQresultStatus(res) != PGRES_TUPLES_OK)
3106  pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
3107
3108  /* Convert the data into HeapTuples */
3109  numrows = PQntuples(res);
3110  fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
3111  fsstate->num_tuples = numrows;
3112  fsstate->next_tuple = 0;
3113
3114  for (i = 0; i < numrows; i++)
3115  {
3116  Assert(IsA(node->ss.ps.plan, ForeignScan));
3117
3118  fsstate->tuples[i] =
3120  fsstate->rel,
3121  fsstate->attinmeta,
3122  fsstate->retrieved_attrs,
3123  node,
3124  fsstate->temp_cxt);
3125  }
3126
3127  /* Update fetch_ct_2 (a fetch counter that saturates at 2) */
3128  if (fsstate->fetch_ct_2 < 2)
3129  fsstate->fetch_ct_2++;
3130
3131  /* Must be EOF if we didn't get as many tuples as we asked for. */
3132  fsstate->eof_reached = (numrows < fsstate->fetch_size);
3133
3134  PQclear(res);
3135  res = NULL; /* keeps PG_CATCH from clearing it a second time */
3136  }
3137  PG_CATCH();
3138  {
3139  if (res)
3140  PQclear(res);
3141  PG_RE_THROW();
3142  }
3143  PG_END_TRY();
3144
3145  MemoryContextSwitchTo(oldcontext);
3146 }
3147 
3148 /*
3149  * Force assorted GUC parameters to settings that ensure that we'll output
3150  * data values in a form that is unambiguous to the remote server.
3151  *
3152  * This is rather expensive and annoying to do once per row, but there's
3153  * little choice if we want to be sure values are transmitted accurately;
3154  * we can't leave the settings in place between rows for fear of affecting
3155  * user-visible computations.
3156  *
3157  * We use the equivalent of a function SET option to allow the settings to
3158  * persist only until the caller calls reset_transmission_modes(). If an
3159  * error is thrown in between, guc.c will take care of undoing the settings.
3160  *
3161  * The return value is the nestlevel that must be passed to
3162  * reset_transmission_modes() to undo things.
 *
 * The settings touched are datestyle ("ISO"), intervalstyle ("postgres")
 * and extra_float_digits ("3"); datestyle and extra_float_digits are only
 * changed when their current values differ from / fall below the target.
 *
 * NOTE(review): listing lines 3165, 3175, 3177, 3179 and 3183 are absent
 * from this copy: the function name line, one argument line of each
 * set_config_option() call, and the line preceding the intervalstyle call
 * (presumably a guard like those on the other two settings -- confirm
 * against the upstream file).
3163  */
3164 int
3166 {
3167  int nestlevel = NewGUCNestLevel();
3168
3169  /*
3170  * The values set here should match what pg_dump does. See also
3171  * configure_remote_session in connection.c.
3172  */
3173  if (DateStyle != USE_ISO_DATES)
3174  (void) set_config_option("datestyle", "ISO",
3176  GUC_ACTION_SAVE, true, 0, false);
3178  (void) set_config_option("intervalstyle", "postgres",
3180  GUC_ACTION_SAVE, true, 0, false);
3181  if (extra_float_digits < 3)
3182  (void) set_config_option("extra_float_digits", "3",
3184  GUC_ACTION_SAVE, true, 0, false);
3185
3186  return nestlevel;
3187 }
3188 
3189 /*
3190  * Undo the effects of set_transmission_modes().
 *
 * nestlevel is handed straight to AtEOXact_GUC(); it is expected to be the
 * value previously returned by set_transmission_modes().
 *
 * NOTE(review): listing line 3193 (the function name and parameter list)
 * is absent from this copy.
3191  */
3192 void
3194 {
3195  AtEOXact_GUC(true, nestlevel);
3196 }
3197 
3198 /*
3199  * Utility routine to close a cursor.
 *
 * Runs "CLOSE c<cursor_number>" on conn and raises an ERROR, quoting that
 * CLOSE command, when the result status is not PGRES_COMMAND_OK.
 *
 * NOTE(review): listing line 3202 (the function name and parameter list)
 * is absent from this copy; "conn" and "cursor_number" used below are
 * presumably its parameters -- confirm against the upstream file.
3200  */
3201 static void
3203 {
3204  char sql[64];
3205  PGresult *res;
3206
3207  snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);
3208
3209  /*
3210  * We don't use a PG_TRY block here, so be careful not to throw error
3211  * without releasing the PGresult.
3212  */
3213  res = pgfdw_exec_query(conn, sql);
3214  if (PQresultStatus(res) != PGRES_COMMAND_OK)
3215  pgfdw_report_error(ERROR, res, conn, true, sql);
3216  PQclear(res);
3217 }
3218 
3219 /*
3220  * prepare_foreign_modify
3221  * Establish a prepared statement for execution of INSERT/UPDATE/DELETE
 *
 * A statement name of the form "pgsql_fdw_prep_<N>" is generated, with N
 * obtained from GetPrepStmtNumber() for the connection, and
 * fmstate->query is prepared under that name. fmstate->p_name is assigned
 * only after the remote PREPARE has succeeded, so a non-NULL p_name
 * indicates that the statement exists.
 *
 * NOTE(review): listing line 3224 (the function name and parameter list)
 * is absent from this copy.
3222  */
3223 static void
3225 {
3226  char prep_name[NAMEDATALEN];
3227  char *p_name;
3228  PGresult *res;
3229
3230  /* Construct name we'll use for the prepared statement. */
3231  snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
3232  GetPrepStmtNumber(fmstate->conn));
3233  p_name = pstrdup(prep_name); /* copy that outlives the stack buffer */
3234
3235  /*
3236  * We intentionally do not specify parameter types here, but leave the
3237  * remote server to derive them by default. This avoids possible problems
3238  * with the remote server using different type OIDs than we do. All of
3239  * the prepared statements we use in this module are simple enough that
3240  * the remote server will make the right choices.
3241  */
3242  if (!PQsendPrepare(fmstate->conn,
3243  p_name,
3244  fmstate->query,
3245  0,
3246  NULL))
3247  pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
3248
3249  /*
3250  * Get the result, and check for success.
3251  *
3252  * We don't use a PG_TRY block here, so be careful not to throw error
3253  * without releasing the PGresult.
3254  */
3255  res = pgfdw_get_result(fmstate->conn, fmstate->query);
3256  if (PQresultStatus(res) != PGRES_COMMAND_OK)
3257  pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
3258  PQclear(res);
3259
3260  /* This action shows that the prepare has been done. */
3261  fmstate->p_name = p_name;
3262 }
3263 
3264 /*
3265  * convert_prep_stmt_params
3266  * Create array of text strings representing parameter values
3267  *
3268  * tupleid is ctid to send, or NULL if none
3269  * slot is slot to get remaining parameters from, or NULL if none
3270  *
3271  * Data is constructed in temp_cxt; caller should reset that after use.
 *
 * The returned array has fmstate->p_nums entries. When tupleid is given it
 * occupies the first entry; the attributes listed in
 * fmstate->target_attrs follow in list order. A NULL attribute value is
 * represented by a NULL pointer in the array.
 *
 * NOTE(review): listing line 3274 (the function name and first parameter)
 * is absent from this copy.
3272  */
3273 static const char **
3275  ItemPointer tupleid,
3276  TupleTableSlot *slot)
3277 {
3278  const char **p_values;
3279  int pindex = 0;
3280  MemoryContext oldcontext;
3281
3282  oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
3283
3284  p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums);
3285
3286  /* 1st parameter should be ctid, if it's in use */
3287  if (tupleid != NULL)
3288  {
3289  /* don't need set_transmission_modes for TID output */
3290  p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
3291  PointerGetDatum(tupleid));
3292  pindex++;
3293  }
3294
3295  /* get following parameters from slot */
3296  if (slot != NULL && fmstate->target_attrs != NIL)
3297  {
3298  int nestlevel;
3299  ListCell *lc;
3300
3301  nestlevel = set_transmission_modes();
3302
3303  foreach(lc, fmstate->target_attrs)
3304  {
3305  int attnum = lfirst_int(lc);
3306  Datum value;
3307  bool isnull;
3308
3309  value = slot_getattr(slot, attnum, &isnull);
3310  if (isnull)
3311  p_values[pindex] = NULL;
3312  else
3313  p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
3314  value);
3315  pindex++;
3316  }
3317
3318  reset_transmission_modes(nestlevel);
3319  }
3320
3321  Assert(pindex == fmstate->p_nums); /* every entry must have been filled */
3322
3323  MemoryContextSwitchTo(oldcontext);
3324
3325  return p_values;
3326 }
3327 
3328 /*
3329  * store_returning_result
3330  * Store the result of a RETURNING clause
3331  *
3332  * On error, be sure to release the PGresult on the way out. Callers do not
3333  * have PG_TRY blocks to ensure this happens.
 *
 * Row 0 of res is converted into a HeapTuple and stored in slot. On
 * success res is left untouched (it is not cleared here); it is cleared
 * only on the error path, before the error is re-thrown.
 *
 * NOTE(review): listing line 3336 (the function name and first parameter)
 * is absent from this copy.
3334  */
3335 static void
3337  TupleTableSlot *slot, PGresult *res)
3338 {
3339  PG_TRY();
3340  {
3341  HeapTuple newtup;
3342
3343  newtup = make_tuple_from_result_row(res, 0,
3344  fmstate->rel,
3345  fmstate->attinmeta,
3346  fmstate->retrieved_attrs,
3347  NULL,
3348  fmstate->temp_cxt);
3349  /* tuple will be deleted when it is cleared from the slot */
3350  ExecStoreTuple(newtup, slot, InvalidBuffer, true);
3351  }
3352  PG_CATCH();
3353  {
3354  if (res)
3355  PQclear(res);
3356  PG_RE_THROW();
3357  }
3358  PG_END_TRY();
3359 }
3360 
3361 /*
3362  * build_remote_returning
3363  * Build a RETURNING targetlist of a remote query for performing an
3364  * UPDATE/DELETE .. RETURNING on a join directly
 *
 * rtindex identifies the target relation, rel is that relation, and
 * returningList is the (non-empty) local RETURNING list. The result is a
 * list of TargetEntry nodes with consecutive resnos starting at 1: first
 * every non-dropped column of rel when returningList contains a whole-row
 * reference to the target, then the remaining Vars/PlaceHolderVars pulled
 * from returningList that are needed and not already present.
 *
 * NOTE(review): listing lines 3397 and 3437-3438 are absent from this
 * copy: the declaration of "tupdesc" and the tail of the skip condition in
 * the second loop. Going by the comment above that condition, the missing
 * part presumably exempts ctid and oid from being skipped -- confirm
 * against the upstream file.
3365  */
3366 static List *
3367 build_remote_returning(Index rtindex, Relation rel, List *returningList)
3368 {
3369  bool have_wholerow = false;
3370  List *tlist = NIL;
3371  List *vars;
3372  ListCell *lc;
3373
3374  Assert(returningList);
3375
3376  vars = pull_var_clause((Node *) returningList, PVC_INCLUDE_PLACEHOLDERS);
3377
3378  /*
3379  * If there's a whole-row reference to the target relation, then we'll
3380  * need all the columns of the relation.
3381  */
3382  foreach(lc, vars)
3383  {
3384  Var *var = (Var *) lfirst(lc);
3385
3386  if (IsA(var, Var) &&
3387  var->varno == rtindex &&
3388  var->varattno == InvalidAttrNumber)
3389  {
3390  have_wholerow = true;
3391  break;
3392  }
3393  }
3394
3395  if (have_wholerow)
3396  {
3398  int i;
3399
3400  for (i = 1; i <= tupdesc->natts; i++) /* i is a 1-based attribute number */
3401  {
3402  Form_pg_attribute attr = TupleDescAttr(tupdesc, i - 1);
3403  Var *var;
3404
3405  /* Ignore dropped attributes. */
3406  if (attr->attisdropped)
3407  continue;
3408
3409  var = makeVar(rtindex,
3410  i,
3411  attr->atttypid,
3412  attr->atttypmod,
3413  attr->attcollation,
3414  0);
3415
3416  tlist = lappend(tlist,
3417  makeTargetEntry((Expr *) var,
3418  list_length(tlist) + 1,
3419  NULL,
3420  false));
3421  }
3422  }
3423
3424  /* Now add any remaining columns to tlist. */
3425  foreach(lc, vars)
3426  {
3427  Var *var = (Var *) lfirst(lc);
3428
3429  /*
3430  * No need for whole-row references to the target relation. We don't
3431  * need system columns other than ctid and oid either, since those are
3432  * set locally.
3433  */
3434  if (IsA(var, Var) &&
3435  var->varno == rtindex &&
3436  var->varattno <= InvalidAttrNumber &&
3439  continue; /* don't need it */
3440
3441  if (tlist_member((Expr *) var, tlist))
3442  continue; /* already got it */
3443
3444  tlist = lappend(tlist,
3445  makeTargetEntry((Expr *) var,
3446  list_length(tlist) + 1,
3447  NULL,
3448  false));
3449  }
3450
3451  list_free(vars); /* frees only the list cells, not the nodes now in tlist */
3452
3453  return tlist;
3454 }
3455 
3456 /*
3457  * rebuild_fdw_scan_tlist
3458  * Build new fdw_scan_tlist of given foreign-scan plan node from given
3459  * tlist
3460  *
3461  * There might be columns that the fdw_scan_tlist of the given foreign-scan
3462  * plan node contains that the given tlist doesn't. The fdw_scan_tlist would
3463  * have contained resjunk columns such as 'ctid' of the target relation and
3464  * 'wholerow' of non-target relations, but the tlist might not contain them,
3465  * for example. So, adjust the tlist so it contains all the columns specified
3466  * in the fdw_scan_tlist; else setrefs.c will get confused.
 *
 * The given tlist is extended in place (entries are appended to it with
 * lappend) and then installed as fscan->fdw_scan_tlist. Appended entries
 * are created as non-resjunk entries with no column name.
 *
 * NOTE(review): listing line 3469 (the function name and parameter list)
 * is absent from this copy; "fscan" and "tlist" used below are presumably
 * its parameters -- confirm against the upstream file.
3467  */
3468 static void
3470 {
3471  List *new_tlist = tlist;
3472  List *old_tlist = fscan->fdw_scan_tlist;
3473  ListCell *lc;
3474
3475  foreach(lc, old_tlist)
3476  {
3477  TargetEntry *tle = (TargetEntry *) lfirst(lc);
3478
3479  if (tlist_member(tle->expr, new_tlist))
3480  continue; /* already got it */
3481
3482  new_tlist = lappend(new_tlist,
3483  makeTargetEntry(tle->expr,
3484  list_length(new_tlist) + 1,
3485  NULL,
3486  false));
3487  }
3488  fscan->fdw_scan_tlist = new_tlist;
3489 }
3490 
3491 /*
3492  * Execute a direct UPDATE/DELETE statement.
 *
 * Sends dmstate->query with the current parameter values, stores the
 * PGresult in dmstate->result (it is not cleared here), and records the
 * number of affected rows in dmstate->num_tuples: the result's tuple count
 * when the remote query has a RETURNING clause, otherwise the count
 * reported in the command status.
 *
 * NOTE(review): listing lines 3495, 3497 and 3530 are absent from this
 * copy: the function name/parameter line, the declaration of "dmstate",
 * and the right-hand side of the result-status comparison.
3493  */
3494 static void
3496 {
3498  ExprContext *econtext = node->ss.ps.ps_ExprContext;
3499  int numParams = dmstate->numParams;
3500  const char **values = dmstate->param_values;
3501
3502  /*
3503  * Construct array of query parameter values in text format.
 *
 * No memory context switch is made here; the converted strings are
 * allocated in whatever context is current.
3504  */
3505  if (numParams > 0)
3506  process_query_params(econtext,
3507  dmstate->param_flinfo,
3508  dmstate->param_exprs,
3509  values);
3510
3511  /*
3512  * Notice that we pass NULL for paramTypes, thus forcing the remote server
3513  * to infer types for all parameters. Since we explicitly cast every
3514  * parameter (see deparse.c), the "inference" is trivial and will produce
3515  * the desired result. This allows us to avoid assuming that the remote
3516  * server has the same OIDs we do for the parameters' types.
3517  */
3518  if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
3519  NULL, values, NULL, NULL, 0))
3520  pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
3521
3522  /*
3523  * Get the result, and check for success.
3524  *
3525  * We don't use a PG_TRY block here, so be careful not to throw error
3526  * without releasing the PGresult.
3527  */
3528  dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
3529  if (PQresultStatus(dmstate->result) !=
3531  pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
3532  dmstate->query);
3533
3534  /* Get the number of rows affected. */
3535  if (dmstate->has_returning)
3536  dmstate->num_tuples = PQntuples(dmstate->result);
3537  else
3538  dmstate->num_tuples = atoi(PQcmdTuples(dmstate->result));
3539 }
3540 
3541 /*
3542  * Get the result of a RETURNING clause.
 *
 * Each call consumes one row (index dmstate->next_tuple) of the stored
 * result. When all dmstate->num_tuples rows have been consumed, the scan
 * slot is cleared and returned to signal end of data. Otherwise the scan
 * slot is filled and returned; the slot to be used for evaluating the
 * local RETURNING list (resultSlot) is published separately through
 * ri_projectReturning's expression context.
 *
 * NOTE(review): listing lines 3545 and 3547 are absent from this copy: the
 * function name/parameter line and the declaration of "dmstate".
3543  */
3544 static TupleTableSlot *
3546 {
3548  EState *estate = node->ss.ps.state;
3549  ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
3550  TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
3551  TupleTableSlot *resultSlot;
3552
3553  Assert(resultRelInfo->ri_projectReturning);
3554
3555  /* If we didn't get any tuples, must be end of data. */
3556  if (dmstate->next_tuple >= dmstate->num_tuples)
3557  return ExecClearTuple(slot);
3558
3559  /* Increment the command es_processed count if necessary. */
3560  if (dmstate->set_processed)
3561  estate->es_processed += 1;
3562
3563  /*
3564  * Store a RETURNING tuple. If has_returning is false, just emit a dummy
3565  * tuple. (has_returning is false when the local query is of the form
3566  * "UPDATE/DELETE .. RETURNING 1" for example.)
3567  */
3568  if (!dmstate->has_returning)
3569  {
3570  ExecStoreAllNullTuple(slot);
3571  resultSlot = slot;
3572  }
3573  else
3574  {
3575  /*
3576  * On error, be sure to release the PGresult on the way out. Callers
3577  * do not have PG_TRY blocks to ensure this happens.
 *
 * NOTE(review): dmstate->result is not reset to NULL after the
 * PQclear() in the catch path below; confirm that no later cleanup
 * path clears the same PGresult again.
3578  */
3579  PG_TRY();
3580  {
3581  HeapTuple newtup;
3582
3583  newtup = make_tuple_from_result_row(dmstate->result,
3584  dmstate->next_tuple,
3585  dmstate->rel,
3586  dmstate->attinmeta,
3587  dmstate->retrieved_attrs,
3588  node,
3589  dmstate->temp_cxt);
3590  ExecStoreTuple(newtup, slot, InvalidBuffer, false);
3591  }
3592  PG_CATCH();
3593  {
3594  if (dmstate->result)
3595  PQclear(dmstate->result);
3596  PG_RE_THROW();
3597  }
3598  PG_END_TRY();
3599
3600  /*
 * Get the updated/deleted tuple. When dmstate->rel is set the scan
 * tuple is used as is; otherwise it is run through
 * apply_returning_filter() to build the result tuple.
 */
3601  if (dmstate->rel)
3602  resultSlot = slot;
3603  else
3604  resultSlot = apply_returning_filter(dmstate, slot, estate);
3605  }
3606  dmstate->next_tuple++;
3607
3608  /* Make slot available for evaluation of the local query RETURNING list. */
3609  resultRelInfo->ri_projectReturning->pi_exprContext->ecxt_scantuple =
3610  resultSlot;
3611
3612  return slot; /* the scan slot, not resultSlot */
3613 }
3614 
3615 /*
3616  * Initialize a filter to extract an updated/deleted tuple from a scan tuple.
 *
 * Fills in dmstate->attnoMap, dmstate->ctidAttno, dmstate->oidAttno and
 * dmstate->hasSystemCols from fdw_scan_tlist. Only entries that are Vars
 * of the relation identified by rtindex and whose 1-based position in
 * fdw_scan_tlist appears in dmstate->retrieved_attrs are considered.
 *
 * NOTE(review): listing line 3619 (the function name and first parameter)
 * is absent from this copy.
3617  */
3618 static void
3620  List *fdw_scan_tlist,
3621  Index rtindex)
3622 {
3623  TupleDesc resultTupType = RelationGetDescr(dmstate->resultRel);
3624  ListCell *lc;
3625  int i;
3626
3627  /*
3628  * Calculate the mapping between the fdw_scan_tlist's entries and the
3629  * result tuple's attributes.
3630  *
3631  * The "map" is an array of indexes of the result tuple's attributes in
3632  * fdw_scan_tlist, i.e., one entry for every attribute of the result
3633  * tuple. We store zero for any attributes that don't have the
3634  * corresponding entries in that list, marking that a NULL is needed in
3635  * the result tuple.
3636  *
3637  * Also get the indexes of the entries for ctid and oid if any.
3638  */
3639  dmstate->attnoMap = (AttrNumber *)
3640  palloc0(resultTupType->natts * sizeof(AttrNumber));
3641
3642  dmstate->ctidAttno = dmstate->oidAttno = 0;
3643
3644  i = 1; /* 1-based position of the current fdw_scan_tlist entry */
3645  dmstate->hasSystemCols = false;
3646  foreach(lc, fdw_scan_tlist)
3647  {
3648  TargetEntry *tle = (TargetEntry *) lfirst(lc);
3649  Var *var = (Var *) tle->expr;
3650
3651  Assert(IsA(var, Var));
3652
3653  /*
3654  * If the Var is a column of the target relation to be retrieved from
3655  * the foreign server, get the index of the entry.
3656  */
3657  if (var->varno == rtindex &&
3658  list_member_int(dmstate->retrieved_attrs, i))
3659  {
3660  int attrno = var->varattno;
3661
3662  if (attrno < 0)
3663  {
3664  /*
3665  * We don't retrieve system columns other than ctid and oid.
3666  */
3667  if (attrno == SelfItemPointerAttributeNumber)
3668  dmstate->ctidAttno = i;
3669  else if (attrno == ObjectIdAttributeNumber)
3670  dmstate->oidAttno = i;
3671  else
3672  Assert(false);
3673  dmstate->hasSystemCols = true;
3674  }
3675  else
3676  {
3677  /*
3678  * We don't retrieve whole-row references to the target
3679  * relation either.
3680  */
3681  Assert(attrno > 0);
3682
3683  dmstate->attnoMap[attrno - 1] = i; /* map is 0-based, values 1-based */
3684  }
3685  }
3686  i++;
3687  }
3688 }
3689 
3690 /*
3691  * Extract and return an updated/deleted tuple from a scan tuple.
 *
 * The result is built in estate->es_trig_tuple_slot, whose descriptor is
 * switched to that of dmstate->resultRel if necessary. User columns are
 * copied from the scan tuple according to dmstate->attnoMap (a zero entry
 * produces a NULL); ctid and oid are installed afterwards when
 * dmstate->hasSystemCols is set.
 *
 * NOTE(review): listing lines 3694 and 3786-3788 are absent from this
 * copy: the function name/first parameter line, and the statements that
 * followed the "And remaining columns" comment.
3692  */
3693 static TupleTableSlot *
3695  TupleTableSlot *slot,
3696  EState *estate)
3697 {
3698  TupleDesc resultTupType = RelationGetDescr(dmstate->resultRel);
3699  TupleTableSlot *resultSlot;
3700  Datum *values;
3701  bool *isnull;
3702  Datum *old_values;
3703  bool *old_isnull;
3704  int i;
3705
3706  /*
3707  * Use the trigger tuple slot as a place to store the result tuple.
3708  */
3709  resultSlot = estate->es_trig_tuple_slot;
3710  if (resultSlot->tts_tupleDescriptor != resultTupType)
3711  ExecSetSlotDescriptor(resultSlot, resultTupType);
3712
3713  /*
3714  * Extract all the values of the scan tuple.
3715  */
3716  slot_getallattrs(slot);
3717  old_values = slot->tts_values;
3718  old_isnull = slot->tts_isnull;
3719
3720  /*
3721  * Prepare to build the result tuple.
3722  */
3723  ExecClearTuple(resultSlot);
3724  values = resultSlot->tts_values;
3725  isnull = resultSlot->tts_isnull;
3726
3727  /*
3728  * Transpose data into proper fields of the result tuple.
3729  */
3730  for (i = 0; i < resultTupType->natts; i++)
3731  {
3732  int j = dmstate->attnoMap[i]; /* 1-based scan-tuple column, or 0 */
3733
3734  if (j == 0)
3735  {
3736  values[i] = (Datum) 0;
3737  isnull[i] = true;
3738  }
3739  else
3740  {
3741  values[i] = old_values[j - 1];
3742  isnull[i] = old_isnull[j - 1];
3743  }
3744  }
3745
3746  /*
3747  * Build the virtual tuple.
3748  */
3749  ExecStoreVirtualTuple(resultSlot);
3750
3751  /*
3752  * If we have any system columns to return, install them.
3753  */
3754  if (dmstate->hasSystemCols)
3755  {
3756  HeapTuple resultTup = ExecMaterializeSlot(resultSlot);
3757
3758  /* ctid */
3759  if (dmstate->ctidAttno)
3760  {
3761  ItemPointer ctid = NULL;
3762
3763  ctid = (ItemPointer) DatumGetPointer(old_values[dmstate->ctidAttno - 1]);
3764  resultTup->t_self = *ctid;
3765  }
3766
3767  /* oid */
3768  if (dmstate->oidAttno)
3769  {
3770  Oid oid = InvalidOid;
3771
3772  oid = DatumGetObjectId(old_values[dmstate->oidAttno - 1]);
3773  HeapTupleSetOid(resultTup, oid);
3774  }
3775
3776  /*
3777  * And remaining columns
3778  *
3779  * Note: since we currently don't allow the target relation to appear
3780  * on the nullable side of an outer join, any system columns wouldn't
3781  * go to NULL.
3782  *
3783  * Note: no need to care about tableoid here because it will be
3784  * initialized in ExecProcessReturning().
3785  */
3789  }
3790
3791  /*
3792  * And return the result tuple.
3793  */
3794  return resultSlot;
3795 }
3796 
3797 /*
3798  * Prepare for processing of parameters used in remote query.
 *
 * Outputs, all allocated here:
 * *param_flinfo: array of numParams FmgrInfo entries, one type output
 * function per expression in fdw_exprs, in list order;
 * *param_exprs: expression states built from fdw_exprs;
 * *param_values: zero-filled array of numParams string pointers that
 * will later receive the text form of each parameter.
 *
 * numParams must be greater than zero.
 *
 * NOTE(review): listing lines 3801 and 3804 are absent from this copy: the
 * function name/first parameter line (presumably declaring "node", which
 * is used below) and the declaration of the param_flinfo parameter --
 * confirm against the upstream file.
3799  */
3800 static void
3802  List *fdw_exprs,
3803  int numParams,
3805  List **param_exprs,
3806  const char ***param_values)
3807 {
3808  int i;
3809  ListCell *lc;
3810
3811  Assert(numParams > 0);
3812
3813  /* Prepare for output conversion of parameters used in remote query. */
3814  *param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * numParams);
3815
3816  i = 0;
3817  foreach(lc, fdw_exprs)
3818  {
3819  Node *param_expr = (Node *) lfirst(lc);
3820  Oid typefnoid;
3821  bool isvarlena;
3822
3823  getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena);
3824  fmgr_info(typefnoid, &(*param_flinfo)[i]);
3825  i++;
3826  }
3827
3828  /*
3829  * Prepare remote-parameter expressions for evaluation. (Note: in
3830  * practice, we expect that all these expressions will be just Params, so
3831  * we could possibly do something more efficient than using the full
3832  * expression-eval machinery for this. But probably there would be little
3833  * benefit, and it'd require postgres_fdw to know more than is desirable
3834  * about Param evaluation.)
3835  */
3836  *param_exprs = ExecInitExprList(fdw_exprs, node);
3837
3838  /* Allocate buffer for text form of query parameters. */
3839  *param_values = (const char **) palloc0(numParams * sizeof(char *));
3840 }
3841 
3842 /*
3843  * Construct array of query parameter values in text format.
 *
 * Each expression state in param_exprs is evaluated in econtext and the
 * result is written to the matching entry of param_values: NULL for a
 * null value, otherwise the string produced by the corresponding output
 * function in param_flinfo. The conversions run with the transmission
 * GUC settings in force, which are undone before returning. Strings are
 * allocated in the memory context current at call time.
 *
 * NOTE(review): listing lines 3846-3847 are absent from this copy: the
 * function name and the declarations of its leading parameters
 * (presumably "econtext" and "param_flinfo", both used below -- confirm
 * against the upstream file).
3844  */
3845 static void
3848  List *param_exprs,
3849  const char **param_values)
3850 {
3851  int nestlevel;
3852  int i;
3853  ListCell *lc;
3854
3855  nestlevel = set_transmission_modes();
3856
3857  i = 0;
3858  foreach(lc, param_exprs)
3859  {
3860  ExprState *expr_state = (ExprState *) lfirst(lc);
3861  Datum expr_value;
3862  bool isNull;
3863
3864  /* Evaluate the parameter expression */
3865  expr_value = ExecEvalExpr(expr_state, econtext, &isNull);
3866
3867  /*
3868  * Get string representation of each parameter value by invoking
3869  * type-specific output function, unless the value is null.
3870  */
3871  if (isNull)
3872  param_values[i] = NULL;
3873  else
3874  param_values[i] = OutputFunctionCall(&param_flinfo[i], expr_value);
3875
3876  i++;
3877  }
3878
3879  reset_transmission_modes(nestlevel);
3880 }
3881 
3882 /*
3883  * postgresAnalyzeForeignTable
3884  * Test whether analyzing this foreign table is supported
 *
 * Always returns true. As a side effect, the remote page count of the
 * relation is fetched with the query built by deparseAnalyzeSizeSql() and
 * stored in *totalpages.
 *
 * NOTE(review): listing lines 3887 and 3898 are absent from this copy: the
 * function name/first parameter line (presumably declaring "relation")
 * and the statement under "Return the row-analysis function pointer"
 * (presumably the assignment to *func) -- confirm against the upstream
 * file.
3885  */
3886 static bool
3888  AcquireSampleRowsFunc *func,
3889  BlockNumber *totalpages)
3890 {
3891  ForeignTable *table;
3892  UserMapping *user;
3893  PGconn *conn;
3894  StringInfoData sql;
3895  PGresult *volatile res = NULL; /* set in PG_TRY, read in PG_CATCH */
3896
3897  /* Return the row-analysis function pointer */
3899
3900  /*
3901  * Now we have to get the number of pages. It's annoying that the ANALYZE
3902  * API requires us to return that now, because it forces some duplication
3903  * of effort between this routine and postgresAcquireSampleRowsFunc. But
3904  * it's probably not worth redefining that API at this point.
3905  */
3906
3907  /*
3908  * Get the connection to use. We do the remote access as the table's
3909  * owner, even if the ANALYZE was started by some other user.
3910  */
3911  table = GetForeignTable(RelationGetRelid(relation));
3912  user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
3913  conn = GetConnection(user, false);
3914
3915  /*
3916  * Construct command to get page count for relation.
3917  */
3918  initStringInfo(&sql);
3919  deparseAnalyzeSizeSql(&sql, relation);
3920
3921  /* In what follows, do not risk leaking any PGresults. */
3922  PG_TRY();
3923  {
3924  res = pgfdw_exec_query(conn, sql.data);
3925  if (PQresultStatus(res) != PGRES_TUPLES_OK)
3926  pgfdw_report_error(ERROR, res, conn, false, sql.data);
3927
3928  if (PQntuples(res) != 1 || PQnfields(res) != 1)
3929  elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query");
3930  *totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10); /* no parse-error check */
3931
3932  PQclear(res);
3933  res = NULL;
3934  }
3935  PG_CATCH();
3936  {
3937  if (res)
3938  PQclear(res);
3939  PG_RE_THROW();
3940  }
3941  PG_END_TRY();
3942
3943  ReleaseConnection(conn);
3944
3945  return true;
3946 }
3947 
3948 /*
3949  * Acquire a random sample of rows from foreign table managed by postgres_fdw.
3950  *
3951  * We fetch the whole table from the remote side and pick out some sample rows.
3952  *
3953  * Selected rows are returned in the caller-allocated array rows[],
3954  * which must have at least targrows entries.
3955  * The actual number of rows selected is returned as the function result.
3956  * We also count the total number of rows in the table and return it into
3957  * *totalrows. Note that *totaldeadrows is always set to 0.
3958  *
3959  * Note that the returned list of rows is not always in order by physical
3960  * position in the table. Therefore, correlation estimates derived later
3961  * may be meaningless, but it's OK because we don't use the estimates
3962  * currently (the planner only pays attention to correlation for indexscans).
 *
 * The rows are read through a remote cursor in batches of fetch_size rows
 * (100 unless a "fetch_size" server or table option is present; the table
 * option is examined last and therefore wins). Every row is passed to
 * analyze_row_processor(), which maintains the sample in astate.
 *
 * NOTE(review): listing lines 3965, 3981, 3992, 3994 and 4032 are absent
 * from this copy: the function name/leading parameters (presumably
 * declaring "relation" and "elevel"), one statement in the workspace
 * initialization, the first and last lines of the statement that creates
 * the per-tuple temp context, and the statement under "Allow users to
 * cancel long query" -- confirm against the upstream file.
3963  */
3964 static int
3966  HeapTuple *rows, int targrows,
3967  double *totalrows,
3968  double *totaldeadrows)
3969 {
3970  PgFdwAnalyzeState astate;
3971  ForeignTable *table;
3972  ForeignServer *server;
3973  UserMapping *user;
3974  PGconn *conn;
3975  unsigned int cursor_number;
3976  StringInfoData sql;
3977  PGresult *volatile res = NULL; /* set in PG_TRY, read in PG_CATCH */
3978
3979  /* Initialize workspace state */
3980  astate.rel = relation;
3982
3983  astate.rows = rows;
3984  astate.targrows = targrows;
3985  astate.numrows = 0;
3986  astate.samplerows = 0;
3987  astate.rowstoskip = -1; /* -1 means not set yet */
3988  reservoir_init_selection_state(&astate.rstate, targrows);
3989
3990  /* Remember ANALYZE context, and create a per-tuple temp context */
3991  astate.anl_cxt = CurrentMemoryContext;
3993  "postgres_fdw temporary data",
3995
3996  /*
3997  * Get the connection to use. We do the remote access as the table's
3998  * owner, even if the ANALYZE was started by some other user.
3999  */
4000  table = GetForeignTable(RelationGetRelid(relation));
4001  server = GetForeignServer(table->serverid);
4002  user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
4003  conn = GetConnection(user, false);
4004
4005  /*
4006  * Construct cursor that retrieves whole rows from remote.
4007  */
4008  cursor_number = GetCursorNumber(conn);
4009  initStringInfo(&sql);
4010  appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number);
4011  deparseAnalyzeSql(&sql, relation, &astate.retrieved_attrs);
4012
4013  /* In what follows, do not risk leaking any PGresults. */
4014  PG_TRY();
4015  {
4016  res = pgfdw_exec_query(conn, sql.data);
4017  if (PQresultStatus(res) != PGRES_COMMAND_OK)
4018  pgfdw_report_error(ERROR, res, conn, false, sql.data);
4019  PQclear(res);
4020  res = NULL;
4021
4022  /* Retrieve and process rows a batch at a time. */
4023  for (;;)
4024  {
4025  char fetch_sql[64];
4026  int fetch_size;
4027  int numrows;
4028  int i;
4029  ListCell *lc;
4030
4031  /* Allow users to cancel long query */
4033
4034  /*
4035  * XXX possible future improvement: if rowstoskip is large, we
4036  * could issue a MOVE rather than physically fetching the rows,
4037  * then just adjust rowstoskip and samplerows appropriately.
4038  */
4039
4040  /*
 * The fetch size is arbitrary, but shouldn't be enormous.
 *
 * NOTE(review): this option lookup is repeated on every loop
 * iteration although nothing in the loop modifies server->options
 * or table->options; it looks like a candidate for hoisting above
 * the loop.
 */
4041  fetch_size = 100;
4042  foreach(lc, server->options)
4043  {
4044  DefElem *def = (DefElem *) lfirst(lc);
4045
4046  if (strcmp(def->defname, "fetch_size") == 0)
4047  {
4048  fetch_size = strtol(defGetString(def), NULL, 10);
4049  break;
4050  }
4051  }
4052  foreach(lc, table->options)
4053  {
4054  DefElem *def = (DefElem *) lfirst(lc);
4055
4056  if (strcmp(def->defname, "fetch_size") == 0)
4057  {
4058  fetch_size = strtol(defGetString(def), NULL, 10);
4059  break;
4060  }
4061  }
4062
4063  /* Fetch some rows */
4064  snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
4065  fetch_size, cursor_number);
4066
4067  res = pgfdw_exec_query(conn, fetch_sql);
4068  /* On error, report the original query, not the FETCH. */
4069  if (PQresultStatus(res) != PGRES_TUPLES_OK)
4070  pgfdw_report_error(ERROR, res, conn, false, sql.data);
4071
4072  /* Process whatever we got. */
4073  numrows = PQntuples(res);
4074  for (i = 0; i < numrows; i++)
4075  analyze_row_processor(res, i, &astate);
4076
4077  PQclear(res);
4078  res = NULL;
4079
4080  /* Must be EOF if we didn't get all the rows requested. */
4081  if (numrows < fetch_size)
4082  break;
4083  }
4084
4085  /* Close the cursor, just to be tidy. */
4086  close_cursor(conn, cursor_number);
4087  }
4088  PG_CATCH();
4089  {
4090  if (res)
4091  PQclear(res);
4092  PG_RE_THROW();
4093  }
4094  PG_END_TRY();
4095
4096  ReleaseConnection(conn);
4097
4098  /* We assume that there are no dead tuples. */
4099  *totaldeadrows = 0.0;
4100
4101  /* We've retrieved all living tuples from foreign server. */
4102  *totalrows = astate.samplerows;
4103
4104  /*
4105  * Emit some interesting relation info
4106  */
4107  ereport(elevel,
4108  (errmsg("\"%s\": table contains %.0f rows, %d rows in sample",
4109  RelationGetRelationName(relation),
4110  astate.samplerows, astate.numrows)));
4111
4112  return astate.numrows;
4113 }
4114 
4115 /*
4116  * Collect sample rows from the result of query.
4117  * - Use all tuples in sample until target # of samples are collected.
4118  * - Subsequently, replace already-sampled tuples randomly.
 *
 * Called once per result row. astate->samplerows counts every row seen;
 * astate->numrows counts the rows currently held in astate->rows[] and
 * never exceeds astate->targrows. Once the sample is full,
 * astate->rowstoskip holds the number of rows still to be skipped before
 * the next replacement (a negative value means it must be recomputed).
 *
 * NOTE(review): listing line 4121 (the function name and parameter list)
 * is absent from this copy; "res", "row" and "astate" used below are
 * presumably its parameters -- confirm against the upstream file.
4119  */
4120 static void
4122 {
4123  int targrows = astate->targrows;
4124  int pos; /* array index to store tuple in */
4125  MemoryContext oldcontext;
4126
4127  /* Always increment sample row counter. */
4128  astate->samplerows += 1;
4129
4130  /*
4131  * Determine the slot where this sample row should be stored. Set pos to
4132  * negative value to indicate the row should be skipped.
4133  */
4134  if (astate->numrows < targrows)
4135  {
4136  /* First targrows rows are always included into the sample */
4137  pos = astate->numrows++;
4138  }
4139  else
4140  {
4141  /*
4142  * Now we start replacing tuples in the sample until we reach the end
4143  * of the relation. Same algorithm as in acquire_sample_rows in
4144  * analyze.c; see Jeff Vitter's paper.
4145  */
4146  if (astate->rowstoskip < 0)
4147  astate->rowstoskip = reservoir_get_next_S(&astate->rstate, astate->samplerows, targrows);
4148
4149  if (astate->rowstoskip <= 0)
4150  {
4151  /* Choose a random reservoir element to replace. */
4152  pos = (int) (targrows * sampler_random_fract(astate->rstate.randstate));
4153  Assert(pos >= 0 && pos < targrows);
4154  heap_freetuple(astate->rows[pos]); /* release the tuple being replaced */
4155  }
4156  else
4157  {
4158  /* Skip this tuple. */
4159  pos = -1;
4160  }
4161
4162  astate->rowstoskip -= 1; /* goes negative after a replacement, forcing a recompute */
4163  }
4164
4165  if (pos >= 0)
4166  {
4167  /*
4168  * Create sample tuple from current result row, and store it in the
4169  * position determined above. The tuple has to be created in anl_cxt.
4170  */
4171  oldcontext = MemoryContextSwitchTo(astate->anl_cxt);
4172
4173  astate->rows[pos] = make_tuple_from_result_row(res, row,
4174  astate->rel,
4175  astate->attinmeta,
4176  astate->retrieved_attrs,
4177  NULL,
4178  astate->temp_cxt);
4179
4180  MemoryContextSwitchTo(oldcontext);
4181  }
4182 }
4183 
4184 /*
4185  * Import a foreign schema
 *
 * Queries the remote server's catalogs for the tables in stmt->remote_schema
 * and returns a List of palloc'd strings, each one a complete
 * "CREATE FOREIGN TABLE ..." command for one remote table.  The statement
 * options import_collate, import_default and import_not_null control whether
 * COLLATE, DEFAULT and NOT NULL clauses are emitted; any other option name
 * raises ERRCODE_FDW_INVALID_OPTION_NAME.  An error is also raised if the
 * remote schema does not exist or a remote query fails.
 *
 * NOTE(review): this listing has gaps in its line numbering (4188, 4197,
 * 4274, 4295, 4311, 4313-4317, 4327), so some source lines are not shown.
 * Presumably these are the function name/parameter list (the body uses
 * "stmt" and "serverOid"), the declaration of "buf", the
 * appendStringInfoString(&buf, ...) calls that open the query strings, the
 * relkind list, and the second half of the list_type test.  Confirm against
 * the real file before relying on this text.
4186  */
4187 static List *
4189 {
4190  List *commands = NIL;
4191  bool import_collate = true;
4192  bool import_default = false;
4193  bool import_not_null = true;
4194  ForeignServer *server;
4195  UserMapping *mapping;
4196  PGconn *conn;
4198  PGresult *volatile res = NULL;
4199  int numrows,
4200  i;
4201  ListCell *lc;
4202 
4203  /* Parse statement options */
4204  foreach(lc, stmt->options)
4205  {
4206  DefElem *def = (DefElem *) lfirst(lc);
4207 
4208  if (strcmp(def->defname, "import_collate") == 0)
4209  import_collate = defGetBoolean(def);
4210  else if (strcmp(def->defname, "import_default") == 0)
4211  import_default = defGetBoolean(def);
4212  else if (strcmp(def->defname, "import_not_null") == 0)
4213  import_not_null = defGetBoolean(def);
4214  else
4215  ereport(ERROR,
4216  (errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
4217  errmsg("invalid option \"%s\"", def->defname)));
4218  }
4219 
4220  /*
4221  * Get connection to the foreign server. Connection manager will
4222  * establish new connection if necessary.
4223  */
4224  server = GetForeignServer(serverOid);
4225  mapping = GetUserMapping(GetUserId(), server->serverid);
4226  conn = GetConnection(mapping, false);
4227 
4228  /* Don't attempt to import collation if remote server hasn't got it */
4229  if (PQserverVersion(conn) < 90100)
4230  import_collate = false;
4231 
4232  /* Create workspace for strings */
4233  initStringInfo(&buf);
4234 
4235  /* In what follows, do not risk leaking any PGresults. */
4236  PG_TRY();
4237  {
4238  /* Check that the schema really exists */
4239  appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
4240  deparseStringLiteral(&buf, stmt->remote_schema);
4241 
4242  res = pgfdw_exec_query(conn, buf.data);
4243  if (PQresultStatus(res) != PGRES_TUPLES_OK)
4244  pgfdw_report_error(ERROR, res, conn, false, buf.data);
4245 
4246  if (PQntuples(res) != 1)
4247  ereport(ERROR,
4248  (errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND),
4249  errmsg("schema \"%s\" is not present on foreign server \"%s\"",
4250  stmt->remote_schema, server->servername)));
4251 
 /* Reset res to NULL after clearing so the PG_CATCH block cannot clear it twice. */
4252  PQclear(res);
4253  res = NULL;
4254  resetStringInfo(&buf);
4255 
4256  /*
4257  * Fetch all table data from this schema, possibly restricted by
4258  * EXCEPT or LIMIT TO. (We don't actually need to pay any attention
4259  * to EXCEPT/LIMIT TO here, because the core code will filter the
4260  * statements we return according to those lists anyway. But it
4261  * should save a few cycles to not process excluded tables in the
4262  * first place.)
4263  *
4264  * Ignore table data for partitions and only include the definitions
4265  * of the root partitioned tables to allow access to the complete
4266  * remote data set locally in the schema imported.
4267  *
4268  * Note: because we run the connection with search_path restricted to
4269  * pg_catalog, the format_type() and pg_get_expr() outputs will always
4270  * include a schema name for types/functions in other schemas, which
4271  * is what we want.
 *
 * Both query variants return the same seven columns (0..6): relname,
 * attname, type text, attnotnull, default expression, collation name,
 * collation schema.  The non-collation variant supplies NULLs for the
 * last two.
4272  */
4273  if (import_collate)
4275  "SELECT relname, "
4276  " attname, "
4277  " format_type(atttypid, atttypmod), "
4278  " attnotnull, "
4279  " pg_get_expr(adbin, adrelid), "
4280  " collname, "
4281  " collnsp.nspname "
4282  "FROM pg_class c "
4283  " JOIN pg_namespace n ON "
4284  " relnamespace = n.oid "
4285  " LEFT JOIN pg_attribute a ON "
4286  " attrelid = c.oid AND attnum > 0 "
4287  " AND NOT attisdropped "
4288  " LEFT JOIN pg_attrdef ad ON "
4289  " adrelid = c.oid AND adnum = attnum "
4290  " LEFT JOIN pg_collation coll ON "
4291  " coll.oid = attcollation "
4292  " LEFT JOIN pg_namespace collnsp ON "
4293  " collnsp.oid = collnamespace ");
4294  else
4296  "SELECT relname, "
4297  " attname, "
4298  " format_type(atttypid, atttypmod), "
4299  " attnotnull, "
4300  " pg_get_expr(adbin, adrelid), "
4301  " NULL, NULL "
4302  "FROM pg_class c "
4303  " JOIN pg_namespace n ON "
4304  " relnamespace = n.oid "
4305  " LEFT JOIN pg_attribute a ON "
4306  " attrelid = c.oid AND attnum > 0 "
4307  " AND NOT attisdropped "
4308  " LEFT JOIN pg_attrdef ad ON "
4309  " adrelid = c.oid AND adnum = attnum ");
4310 
4312  "WHERE c.relkind IN ("
4318  " AND n.nspname = ");
4319  deparseStringLiteral(&buf, stmt->remote_schema);
4320 
4321  /* Partitions are supported since Postgres 10 */
4322  if (PQserverVersion(conn) >= 100000)
4323  appendStringInfoString(&buf, " AND NOT c.relispartition ");
4324 
4325  /* Apply restrictions for LIMIT TO and EXCEPT */
4326  if (stmt->list_type == FDW_IMPORT_SCHEMA_LIMIT_TO ||
4328  {
4329  bool first_item = true;
4330 
4331  appendStringInfoString(&buf, " AND c.relname ");
4332  if (stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
4333  appendStringInfoString(&buf, "NOT ");
4334  appendStringInfoString(&buf, "IN (");
4335 
4336  /* Append list of table names within IN clause */
4337  foreach(lc, stmt->table_list)
4338  {
4339  RangeVar *rv = (RangeVar *) lfirst(lc);
4340 
4341  if (first_item)
4342  first_item = false;
4343  else
4344  appendStringInfoString(&buf, ", ");
4345  deparseStringLiteral(&buf, rv->relname);
4346  }
4347  appendStringInfoChar(&buf, ')');
4348  }
4349 
4350  /* Append ORDER BY at the end of query to ensure output ordering */
 /* The grouping loop below relies on all rows of one table being adjacent. */
4351  appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");
4352 
4353  /* Fetch the data */
4354  res = pgfdw_exec_query(conn, buf.data);
4355  if (PQresultStatus(res) != PGRES_TUPLES_OK)
4356  pgfdw_report_error(ERROR, res, conn, false, buf.data);
4357 
4358  /* Process results */
4359  numrows = PQntuples(res);
4360  /* note: incrementation of i happens in inner loop's while() test */
4361  for (i = 0; i < numrows;)
4362  {
4363  char *tablename = PQgetvalue(res, i, 0);
4364  bool first_item = true;
4365 
 /* buf is reused here: from now on it holds the command being built, not the query. */
4366  resetStringInfo(&buf);
4367  appendStringInfo(&buf, "CREATE FOREIGN TABLE %s (\n",
4368  quote_identifier(tablename));
4369 
4370  /* Scan all rows for this table */
4371  do
4372  {
4373  char *attname;
4374  char *typename;
4375  char *attnotnull;
4376  char *attdefault;
4377  char *collname;
4378  char *collnamespace;
4379 
4380  /* If table has no columns, we'll see nulls here */
 /*
  * "continue" inside do/while jumps to the while() test below, so i
  * is still advanced and the loop cannot spin on this row.
  */
4381  if (PQgetisnull(res, i, 1))
4382  continue;
4383 
4384  attname = PQgetvalue(res, i, 1);
4385  typename = PQgetvalue(res, i, 2);
4386  attnotnull = PQgetvalue(res, i, 3);
4387  attdefault = PQgetisnull(res, i, 4) ? (char *) NULL :
4388  PQgetvalue(res, i, 4);
4389  collname = PQgetisnull(res, i, 5) ? (char *) NULL :
4390  PQgetvalue(res, i, 5);
4391  collnamespace = PQgetisnull(res, i, 6) ? (char *) NULL :
4392  PQgetvalue(res, i, 6);
4393 
4394  if (first_item)
4395  first_item = false;
4396  else
4397  appendStringInfoString(&buf, ",\n");
4398 
4399  /* Print column name and type */
 /* The type text is emitted as returned by the remote format_type(), without quoting. */
4400  appendStringInfo(&buf, " %s %s",
4401  quote_identifier(attname),
4402  typename);
4403 
4404  /*
4405  * Add column_name option so that renaming the foreign table's
4406  * column doesn't break the association to the underlying
4407  * column.
4408  */
4409  appendStringInfoString(&buf, " OPTIONS (column_name ");
4410  deparseStringLiteral(&buf, attname);
4411  appendStringInfoChar(&buf, ')');
4412 
4413  /* Add COLLATE if needed */
4414  if (import_collate && collname != NULL && collnamespace != NULL)
4415  appendStringInfo(&buf, " COLLATE %s.%s",
4416  quote_identifier(collnamespace),
4417  quote_identifier(collname));
4418 
4419  /* Add DEFAULT if needed */
 /* The default expression is the remote pg_get_expr() text, inserted verbatim. */
4420  if (import_default && attdefault != NULL)
4421  appendStringInfo(&buf, " DEFAULT %s", attdefault);
4422 
4423  /* Add NOT NULL if needed */
4424  if (import_not_null && attnotnull[0] == 't')
4425  appendStringInfoString(&buf, " NOT NULL");
4426  }
4427  while (++i < numrows &&
4428  strcmp(PQgetvalue(res, i, 0), tablename) == 0);
4429 
4430  /*
4431  * Add server name and table-level options. We specify remote
4432  * schema and table name as options (the latter to ensure that
4433  * renaming the foreign table doesn't break the association).
4434  */
4435  appendStringInfo(&buf, "\n) SERVER %s\nOPTIONS (",
4436  quote_identifier(server->servername));
4437 
4438  appendStringInfoString(&buf, "schema_name ");
4439  deparseStringLiteral(&buf, stmt->remote_schema);
4440  appendStringInfoString(&buf, ", table_name ");
4441  deparseStringLiteral(&buf, tablename);
4442 
4443  appendStringInfoString(&buf, ");");
4444 
 /* Copy the text out of buf, since buf is reset for the next table. */
4445  commands = lappend(commands, pstrdup(buf.data));
4446  }
4447 
4448  /* Clean up */
4449  PQclear(res);
4450  res = NULL;
4451  }
4452  PG_CATCH();
4453  {
4454  if (res)
4455  PQclear(res);
4456  PG_RE_THROW();
4457  }
4458  PG_END_TRY();
4459 
4460  ReleaseConnection(conn);
4461 
4462  return commands;
4463 }
4464 
4465 /*
4466  * Assess whether the join between inner and outer relations can be pushed down
4467  * to the foreign server. As a side effect, save information we obtain in this
4468  * function to PgFdwRelationInfo passed in.
 *
 * Returns false as soon as any check fails, true otherwise.  On success the
 * PgFdwRelationInfo hanging off joinrel->fdw_private has been filled in
 * (join clauses, remote/local conditions, subquery flags, user mapping,
 * relation name and index) and marked pushdown_safe.  On failure that struct
 * may have been partially modified; see the note before the restrictlist
 * loop.
 *
 * NOTE(review): this listing has gaps in its line numbering (4471, 4596,
 * 4655, 4662, 4724), so some source lines are not shown -- presumably the
 * function name with its leading parameters (the body uses "root",
 * "joinrel" and "jointype"), and the calls that compute
 * lower_subquery_rels and relation_index.  Confirm against the real file.
4469  */
4470 static bool
4472  RelOptInfo *outerrel, RelOptInfo *innerrel,
4473  JoinPathExtraData *extra)
4474 {
4475  PgFdwRelationInfo *fpinfo;
4476  PgFdwRelationInfo *fpinfo_o;
4477  PgFdwRelationInfo *fpinfo_i;
4478  ListCell *lc;
4479  List *joinclauses;
4480 
4481  /*
4482  * We support pushing down INNER, LEFT, RIGHT and FULL OUTER joins.
4483  * Constructing queries representing SEMI and ANTI joins is hard, hence
4484  * not considered right now.
4485  */
4486  if (jointype != JOIN_INNER && jointype != JOIN_LEFT &&
4487  jointype != JOIN_RIGHT && jointype != JOIN_FULL)
4488  return false;
4489 
4490  /*
4491  * If either of the joining relations is marked as unsafe to pushdown, the
4492  * join can not be pushed down.
4493  */
4494  fpinfo = (PgFdwRelationInfo *) joinrel->fdw_private;
4495  fpinfo_o = (PgFdwRelationInfo *) outerrel->fdw_private;
4496  fpinfo_i = (PgFdwRelationInfo *) innerrel->fdw_private;
4497  if (!fpinfo_o || !fpinfo_o->pushdown_safe ||
4498  !fpinfo_i || !fpinfo_i->pushdown_safe)
4499  return false;
4500 
4501  /*
4502  * If joining relations have local conditions, those conditions are
4503  * required to be applied before joining the relations. Hence the join can
4504  * not be pushed down.
4505  */
4506  if (fpinfo_o->local_conds || fpinfo_i->local_conds)
4507  return false;
4508 
4509  /*
4510  * Merge FDW options. We might be tempted to do this after we have deemed
4511  * the foreign join to be OK. But we must do this beforehand so that we
4512  * know which quals can be evaluated on the foreign server, which might
4513  * depend on shippable_extensions.
4514  */
4515  fpinfo->server = fpinfo_o->server;
4516  merge_fdw_options(fpinfo, fpinfo_o, fpinfo_i);
4517 
4518  /*
4519  * Separate restrict list into join quals and pushed-down (other) quals.
4520  *
4521  * Join quals belonging to an outer join must all be shippable, else we
4522  * cannot execute the join remotely. Add such quals to 'joinclauses'.
4523  *
4524  * Add other quals to fpinfo->remote_conds if they are shippable, else to
4525  * fpinfo->local_conds. In an inner join it's okay to execute conditions
4526  * either locally or remotely; the same is true for pushed-down conditions
4527  * at an outer join.
4528  *
4529  * Note we might return failure after having already scribbled on
4530  * fpinfo->remote_conds and fpinfo->local_conds. That's okay because we
4531  * won't consult those lists again if we deem the join unshippable.
4532  */
4533  joinclauses = NIL;
4534  foreach(lc, extra->restrictlist)
4535  {
4536  RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
4537  bool is_remote_clause = is_foreign_expr(root, joinrel,
4538  rinfo->clause);
4539 
4540  if (IS_OUTER_JOIN(jointype) && !rinfo->is_pushed_down)
4541  {
4542  if (!is_remote_clause)
4543  return false;
4544  joinclauses = lappend(joinclauses, rinfo);
4545  }
4546  else
4547  {
4548  if (is_remote_clause)
4549  fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo);
4550  else
4551  fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo);
4552  }
4553  }
4554 
4555  /*
4556  * deparseExplicitTargetList() isn't smart enough to handle anything other
4557  * than a Var. In particular, if there's some PlaceHolderVar that would
4558  * need to be evaluated within this join tree (because there's an upper
4559  * reference to a quantity that may go to NULL as a result of an outer
4560  * join), then we can't try to push the join down because we'll fail when
4561  * we get to deparseExplicitTargetList(). However, a PlaceHolderVar that
4562  * needs to be evaluated *at the top* of this join tree is OK, because we
4563  * can do that locally after fetching the results from the remote side.
4564  */
4565  foreach(lc, root->placeholder_list)
4566  {
4567  PlaceHolderInfo *phinfo = lfirst(lc);
4568  Relids relids;
4569 
4570  /* PlaceHolderInfo refers to parent relids, not child relids. */
4571  relids = IS_OTHER_REL(joinrel) ?
4572  joinrel->top_parent_relids : joinrel->relids;
4573 
 /*
  * Reject only when ph_eval_at lies within relids but does not cover
  * all of it, i.e. the placeholder is evaluated below the join's top.
  */
4574  if (bms_is_subset(phinfo->ph_eval_at, relids) &&
4575  bms_nonempty_difference(relids, phinfo->ph_eval_at))
4576  return false;
4577  }
4578 
4579  /* Save the join clauses, for later use. */
4580  fpinfo->joinclauses = joinclauses;
4581 
4582  fpinfo->outerrel = outerrel;
4583  fpinfo->innerrel = innerrel;
4584  fpinfo->jointype = jointype;
4585 
4586  /*
4587  * By default, both the input relations are not required to be deparsed as
4588  * subqueries, but there might be some relations covered by the input
4589  * relations that are required to be deparsed as subqueries, so save the
4590  * relids of those relations for later use by the deparser.
4591  */
4592  fpinfo->make_outerrel_subquery = false;
4593  fpinfo->make_innerrel_subquery = false;
4594  Assert(bms_is_subset(fpinfo_o->lower_subquery_rels, outerrel->relids));
4595  Assert(bms_is_subset(fpinfo_i->lower_subquery_rels, innerrel->relids));
4597  fpinfo_i->lower_subquery_rels);
4598 
4599  /*
4600  * Pull the other remote conditions from the joining relations into join
4601  * clauses or other remote clauses (remote_conds) of this relation
4602  * wherever possible. This avoids building subqueries at every join step.
4603  *
4604  * For an inner join, clauses from both the relations are added to the
4605  * other remote clauses. For LEFT and RIGHT OUTER join, the clauses from
4606  * the outer side are added to remote_conds since those can be evaluated
4607  * after the join is evaluated. The clauses from inner side are added to
4608  * the joinclauses, since they need to be evaluated while constructing the
4609  * join.
4610  *
4611  * For a FULL OUTER JOIN, the other clauses from either relation can not
4612  * be added to the joinclauses or remote_conds, since each relation acts
4613  * as an outer relation for the other.
4614  *
4615  * The joining sides can not have local conditions, thus no need to test
4616  * shippability of the clauses being pulled up.
 *
 * The input lists are passed through list_copy() before list_concat(),
 * so the lists owned by fpinfo_o and fpinfo_i are never handed to
 * list_concat() directly.
4617  */
4618  switch (jointype)
4619  {
4620  case JOIN_INNER:
4621  fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4622  list_copy(fpinfo_i->remote_conds));
4623  fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4624  list_copy(fpinfo_o->remote_conds));
4625  break;
4626 
4627  case JOIN_LEFT:
4628  fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
4629  list_copy(fpinfo_i->remote_conds));
4630  fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4631  list_copy(fpinfo_o->remote_conds));
4632  break;
4633 
4634  case JOIN_RIGHT:
4635  fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
4636  list_copy(fpinfo_o->remote_conds));
4637  fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4638  list_copy(fpinfo_i->remote_conds));
4639  break;
4640 
4641  case JOIN_FULL:
4642 
4643  /*
4644  * In this case, if any of the input relations has conditions, we
4645  * need to deparse that relation as a subquery so that the
4646  * conditions can be evaluated before the join. Remember it in
4647  * the fpinfo of this relation so that the deparser can take
4648  * appropriate action. Also, save the relids of base relations
4649  * covered by that relation for later use by the deparser.
4650  */
4651  if (fpinfo_o->remote_conds)
4652  {
4653  fpinfo->make_outerrel_subquery = true;
4654  fpinfo->lower_subquery_rels =
4656  outerrel->relids);
4657  }
4658  if (fpinfo_i->remote_conds)
4659  {
4660  fpinfo->make_innerrel_subquery = true;
4661  fpinfo->lower_subquery_rels =
4663  innerrel->relids);
4664  }
4665  break;
4666 
4667  default:
4668  /* Should not happen, we have just checked this above */
4669  elog(ERROR, "unsupported join type %d", jointype);
4670  }
4671 
4672  /*
4673  * For an inner join, all restrictions can be treated alike. Treating the
4674  * pushed down conditions as join conditions allows a top level full outer
4675  * join to be deparsed without requiring subqueries.
 *
 * The Assert holds because, for JOIN_INNER, the restrictlist loop above
 * never appends to joinclauses and the switch only appends to
 * remote_conds.
4676  */
4677  if (jointype == JOIN_INNER)
4678  {
4679  Assert(!fpinfo->joinclauses);
4680  fpinfo->joinclauses = fpinfo->remote_conds;
4681  fpinfo->remote_conds = NIL;
4682  }
4683 
4684  /* Mark that this join can be pushed down safely */
4685  fpinfo->pushdown_safe = true;
4686 
4687  /* Get user mapping */
 /*
  * Only kept when remote estimates are in use: take the outer side's user
  * if that side uses remote estimates, otherwise the inner side's.
  */
4688  if (fpinfo->use_remote_estimate)
4689  {
4690  if (fpinfo_o->use_remote_estimate)
4691  fpinfo->user = fpinfo_o->user;
4692  else
4693  fpinfo->user = fpinfo_i->user;
4694  }
4695  else
4696  fpinfo->user = NULL;
4697 
4698  /*
4699  * Set cached relation costs to some negative value, so that we can detect
4700  * when they are set to some sensible costs, during one (usually the
4701  * first) of the calls to estimate_path_cost_size().
4702  */
4703  fpinfo->rel_startup_cost = -1;
4704  fpinfo->rel_total_cost = -1;
4705 
4706  /*
4707  * Set the string describing this join relation to be used in EXPLAIN
4708  * output of corresponding ForeignScan.
4709  */
4710  fpinfo->relation_name = makeStringInfo();
4711  appendStringInfo(fpinfo->relation_name, "(%s) %s JOIN (%s)",
4712  fpinfo_o->relation_name->data,
4713  get_jointype_name(fpinfo->jointype),
4714  fpinfo_i->relation_name->data);
4715 
4716  /*
4717  * Set the relation index. This is defined as the position of this
4718  * joinrel in the join_rel_list list plus the length of the rtable list.
4719  * Note that since this joinrel is at the end of the join_rel_list list
4720  * when we are called, we can get the position by list_length.
4721  */
4722  Assert(fpinfo->relation_index == 0); /* shouldn't be set yet */
4723  fpinfo->relation_index =
4725 
4726  return true;
4727 }
4728 
/*
 * For every pathkey list returned by get_useful_pathkeys_for_relation(),
 * estimate the cost of a foreign scan that delivers that ordering and add a
 * matching ForeignPath to "rel".
 *
 * "epq_path" may be NULL.  When it is given and its pathkeys do not already
 * satisfy the wanted ordering, the path attached to the new ForeignPath is a
 * Sort path built on top of it; otherwise epq_path is attached unchanged.
 *
 * NOTE(review): this listing skips original line 4730, which presumably
 * held the function name and the leading parameters (the body uses "root"
 * and "rel"; the caller invokes add_paths_with_pathkeys_for_rel(root,
 * joinrel, epq_path)).  Confirm against the real file.
 */
4729 static void
4731  Path *epq_path)
4732 {
4733  List *useful_pathkeys_list = NIL; /* List of all pathkeys */
4734  ListCell *lc;
4735 
4736  useful_pathkeys_list = get_useful_pathkeys_for_relation(root, rel);
4737 
4738  /* Create one path for each set of pathkeys we found above. */
4739  foreach(lc, useful_pathkeys_list)
4740  {
4741  double rows;
4742  int width;
4743  Cost startup_cost;
4744  Cost total_cost;
4745  List *useful_pathkeys = lfirst(lc);
4746  Path *sorted_epq_path;
4747 
 /* width is filled in by the estimator but not used further in this function. */
4748  estimate_path_cost_size(root, rel, NIL, useful_pathkeys,
4749  &rows, &width, &startup_cost, &total_cost);
4750 
4751  /*
4752  * The EPQ path must be at least as well sorted as the path itself,
4753  * in case it gets used as input to a mergejoin.
4754  */
4755  sorted_epq_path = epq_path;
4756  if (sorted_epq_path != NULL &&
4757  !pathkeys_contained_in(useful_pathkeys,
4758  sorted_epq_path->pathkeys))
4759  sorted_epq_path = (Path *)
4760  create_sort_path(root,
4761  rel,
4762  sorted_epq_path,
4763  useful_pathkeys,
4764  -1.0);
4765 
4766  add_path(rel, (Path *)
4767  create_foreignscan_path(root, rel,
4768  NULL,
4769  rows,
4770  startup_cost,
4771  total_cost,
4772  useful_pathkeys,
4773  NULL,
4774  sorted_epq_path,
4775  NIL));
4776  }
4777 }
4778 
4779 /*
4780  * Parse options from foreign server and apply them to fpinfo.
4781  *
4782  * New options might also require tweaking merge_fdw_options().
 *
 * Walks fpinfo->server->options and overwrites the matching fpinfo fields
 * (use_remote_estimate, fdw_startup_cost, fdw_tuple_cost,
 * shippable_extensions, fetch_size).  Options with any other name are
 * ignored here, and fields whose option is absent are left untouched.
 *
 * The strtod()/strtol() conversions are done without an end pointer, so
 * malformed values are not detected in this function.
 * NOTE(review): presumably the values were validated when the options were
 * set -- confirm.
 *
 * NOTE(review): this listing skips original line 4785, which presumably
 * held the function name and its parameter (the body uses "fpinfo").
 * Confirm against the real file.
4783  */
4784 static void
4786 {
4787  ListCell *lc;
4788 
4789  foreach(lc, fpinfo->server->options)
4790  {
4791  DefElem *def = (DefElem *) lfirst(lc);
4792 
4793  if (strcmp(def->defname, "use_remote_estimate") == 0)
4794  fpinfo->use_remote_estimate = defGetBoolean(def);
4795  else if (strcmp(def->defname, "fdw_startup_cost") == 0)
4796  fpinfo->fdw_startup_cost = strtod(defGetString(def), NULL);
4797  else if (strcmp(def->defname, "fdw_tuple_cost") == 0)
4798  fpinfo->fdw_tuple_cost = strtod(defGetString(def), NULL);
4799  else if (strcmp(def->defname, "extensions") == 0)
4800  fpinfo->shippable_extensions =
4801  ExtractExtensionList(defGetString(def), false);
4802  else if (strcmp(def->defname, "fetch_size") == 0)
4803  fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
4804  }
4805 }
4806 
4807 /*
4808  * Parse options from foreign table and apply them to fpinfo.
4809  *
4810  * New options might also require tweaking merge_fdw_options().
 *
 * Walks fpinfo->table->options; only use_remote_estimate and fetch_size are
 * recognized, and each one found overwrites the value fpinfo currently
 * holds.  Other option names are ignored here.
 *
 * fetch_size is converted with strtol() without an end pointer, so a
 * malformed value is not detected in this function.
 * NOTE(review): presumably validated when the option was set, and presumably
 * this runs after the server-level options so that table settings win --
 * confirm both against the callers.
 *
 * NOTE(review): this listing skips original line 4813, which presumably
 * held the function name and its parameter (the body uses "fpinfo").
 * Confirm against the real file.
4811  */
4812 static void
4814 {
4815  ListCell *lc;
4816 
4817  foreach(lc, fpinfo->table->options)
4818  {
4819  DefElem *def = (DefElem *) lfirst(lc);
4820 
4821  if (strcmp(def->defname, "use_remote_estimate") == 0)
4822  fpinfo->use_remote_estimate = defGetBoolean(def);
4823  else if (strcmp(def->defname, "fetch_size") == 0)
4824  fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
4825  }
4826 }
4827 
4828 /*
4829  * Merge FDW options from input relations into a new set of options for a join
4830  * or an upper rel.
4831  *
4832  * For a join relation, FDW-specific information about the inner and outer
4833  * relations is provided using fpinfo_i and fpinfo_o. For an upper relation,
4834  * fpinfo_o provides the information for the input relation; fpinfo_i is
4835  * expected to be NULL.
 *
 * The result is written into the first argument, "fpinfo"; the two input
 * structs are const and are only read.  fdw_startup_cost, fdw_tuple_cost and
 * shippable_extensions are always taken from fpinfo_o.  use_remote_estimate
 * and fetch_size start out as fpinfo_o's values and, when fpinfo_i is given,
 * become the logical OR and the maximum of the two sides respectively.
 *
 * NOTE(review): this listing skips original line 4838, which presumably
 * held the function name and the first parameter (the body writes through
 * "fpinfo").  Confirm against the real file.
4836  */
4837 static void
4839  const PgFdwRelationInfo *fpinfo_o,
4840  const PgFdwRelationInfo *fpinfo_i)
4841 {
4842  /* We must always have fpinfo_o. */
4843  Assert(fpinfo_o);
4844 
4845  /* fpinfo_i may be NULL, but if present the servers must both match. */
4846  Assert(!fpinfo_i ||
4847  fpinfo_i->server->serverid == fpinfo_o->server->serverid);
4848 
4849  /*
4850  * Copy the server specific FDW options. (For a join, both relations come
4851  * from the same server, so the server options should have the same value
4852  * for both relations.)
4853  */
4854  fpinfo->fdw_startup_cost = fpinfo_o->fdw_startup_cost;
4855  fpinfo->fdw_tuple_cost = fpinfo_o->fdw_tuple_cost;
4856  fpinfo->shippable_extensions = fpinfo_o->shippable_extensions;
4857  fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate;
4858  fpinfo->fetch_size = fpinfo_o->fetch_size;
4859 
4860  /* Merge the table level options from either side of the join. */
4861  if (fpinfo_i)
4862  {
4863  /*
4864  * We'll prefer to use remote estimates for this join if any table
4865  * from either side of the join is using remote estimates. This is
4866  * most likely going to be preferred since they're already willing to
4867  * pay the price of a round trip to get the remote EXPLAIN. In any
4868  * case it's not entirely clear how we might otherwise handle this
4869  * best.
4870  */
4871  fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate ||
4872  fpinfo_i->use_remote_estimate;
4873 
4874  /*
4875  * Set fetch size to maximum of the joining sides, since we are
4876  * expecting the rows returned by the join to be proportional to the
4877  * relation sizes.
4878  */
4879  fpinfo->fetch_size = Max(fpinfo_o->fetch_size, fpinfo_i->fetch_size);
4880  }
4881 }
4882 
4883 /*
4884  * postgresGetForeignJoinPaths
4885  * Add possible ForeignPath to joinrel, if join is safe to push down.
 *
 * Returns nothing.  Side effects on joinrel: fdw_private is set to a new
 * PgFdwRelationInfo on the first call for a given joinrel (later calls see
 * it and return immediately), and, if the join is pushable, rows and
 * reltarget->width are overwritten with the estimates and one unsorted
 * ForeignPath plus any pathkey-ordered variants are added.
 *
 * NOTE(review): this listing skips original line 4888, which presumably
 * held the function name and the first parameter (the body uses "root").
 * Confirm against the real file.
4886  */
4887 static void
4889  RelOptInfo *joinrel,
4890  RelOptInfo *outerrel,
4891  RelOptInfo *innerrel,
4892  JoinType jointype,
4893  JoinPathExtraData *extra)
4894 {
4895  PgFdwRelationInfo *fpinfo;
4896  ForeignPath *joinpath;
4897  double rows;
4898  int width;
4899  Cost startup_cost;
4900  Cost total_cost;
4901  Path *epq_path; /* Path to create plan to be executed when
4902  * EvalPlanQual gets triggered. */
4903 
4904  /*
4905  * Skip if this join combination has been considered already.
4906  */
4907  if (joinrel->fdw_private)
4908  return;
4909 
4910  /*
4911  * Create unfinished PgFdwRelationInfo entry which is used to indicate
4912  * that the join relation is already considered, so that we won't waste
4913  * time in judging safety of join pushdown and adding the same paths again
4914  * if found safe. Once we know that this join can be pushed down, we fill
4915  * the entry.
4916  */
4917  fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
4918  fpinfo->pushdown_safe = false;
4919  joinrel->fdw_private = fpinfo;
4920  /* attrs_used is only for base relations. */
4921  fpinfo->attrs_used = NULL;
4922 
4923  /*
4924  * If there is a possibility that EvalPlanQual will be executed, we need
4925  * to be able to reconstruct the row using scans of the base relations.
4926  * GetExistingLocalJoinPath will find a suitable path for this purpose in
4927  * the path list of the joinrel, if one exists. We must be careful to
4928  * call it before adding any ForeignPath, since the ForeignPath might
4929  * dominate the only suitable local path available. We also do it before
4930  * calling foreign_join_ok(), since that function updates fpinfo and marks
4931  * it as pushable if the join is found to be pushable.
4932  */
4933  if (root->parse->commandType == CMD_DELETE ||
4934  root->parse->commandType == CMD_UPDATE ||
4935  root->rowMarks)
4936  {
4937  epq_path = GetExistingLocalJoinPath(joinrel);
4938  if (!epq_path)
4939  {
4940  elog(DEBUG3, "could not push down foreign join because a local path suitable for EPQ checks was not found");
4941  return;
4942  }
4943  }
4944  else
4945  epq_path = NULL;
4946 
4947  if (!foreign_join_ok(root, joinrel, jointype, outerrel, innerrel, extra))
4948  {
4949  /* Free path required for EPQ if we copied one; we don't need it now */
 /*
  * NOTE(review): this pfree assumes GetExistingLocalJoinPath() returned
  * a separately allocated copy rather than a path still linked into
  * joinrel's path list -- confirm.
  */
4950  if (epq_path)
4951  pfree(epq_path);
4952  return;
4953  }
4954 
4955  /*
4956  * Compute the selectivity and cost of the local_conds, so we don't have
4957  * to do it over again for each path. The best we can do for these
4958  * conditions is to estimate selectivity on the basis of local statistics.
4959  * The local conditions are applied after the join has been computed on
4960  * the remote side like quals in WHERE clause, so pass jointype as
4961  * JOIN_INNER.
4962  */
4963  fpinfo->local_conds_sel = clauselist_selectivity(root,
4964  fpinfo->local_conds,
4965  0,
4966  JOIN_INNER,
4967  NULL);
4968  cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
4969 
4970  /*
4971  * If we are going to estimate costs locally, estimate the join clause
4972  * selectivity here while we have special join info.
4973  */
4974  if (!fpinfo->use_remote_estimate)
4975  fpinfo->joinclause_sel = clauselist_selectivity(root, fpinfo->joinclauses,
4976  0, fpinfo->jointype,
4977  extra->sjinfo);
4978 
4979  /* Estimate costs for bare join relation */
4980  estimate_path_cost_size(root, joinrel, NIL, NIL, &rows,
4981  &width, &startup_cost, &total_cost);
4982  /* Now update this information in the joinrel */
4983  joinrel->rows = rows;
4984  joinrel->reltarget->width = width;
4985  fpinfo->rows = rows;
4986  fpinfo->width = width;
4987  fpinfo->startup_cost = startup_cost;
4988  fpinfo->total_cost = total_cost;
4989 
4990  /*
4991  * Create a new join path and add it to the joinrel which represents a
4992  * join between foreign tables.
4993  */
4994  joinpath = create_foreignscan_path(root,
4995  joinrel,
4996  NULL, /* default pathtarget */
4997  rows,
4998  startup_cost,
4999  total_cost,
5000  NIL, /* no pathkeys */
5001  NULL, /* no required_outer */
5002  epq_path,
5003  NIL); /* no fdw_private */
5004 
5005  /* Add generated path into joinrel by add_path(). */
5006  add_path(joinrel, (Path *) joinpath);
5007 
5008  /* Consider pathkeys for the join relation */
5009  add_paths_with_pathkeys_for_rel(root, joinrel, epq_path);
5010 
5011  /* XXX Consider parameterized paths for the join relation */
5012 }
5013 
5014 /*
5015  * Assess whether the aggregation, grouping and having operations can be pushed
5016  * down to the foreign server. As a side effect, save information we obtain in
5017  * this function to PgFdwRelationInfo of the input relation.
5018  */
5019 static bool
5021 {
5022  Query *query = root->parse;
5023  PathTarget *grouping_target = root->upper_targets[UPPERREL_GROUP_AGG];
5024  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) grouped_rel->fdw_private;
5025  PgFdwRelationInfo *ofpinfo;
5026  List *aggvars;
5027  ListCell *lc;
5028  int i;
5029  List *tlist = NIL;
5030 
5031  /* We currently don't support pushing Grouping Sets. */
5032  if (query->groupingSets)
5033  return false;
5034 
5035  /* Get the fpinfo of the underlying scan relation. */
5036  ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
5037 
5038  /*
5039  * If underlying scan relation has any local conditions, those conditions
5040  * are required to be applied before performing aggregation. Hence the
5041  * aggregate cannot be pushed down.
5042  */
5043  if (ofpinfo->local_conds)
5044  return false;
5045 
5046  /*
5047  * Examine grouping expressions, as well as other expressions we'd need to
5048  * compute, and check whether they are safe to push down to the foreign
5049  * server. All GROUP BY expressions will be part of the grouping target
5050  * and thus there is no need to search for them separately. Add grouping
5051  * expressions into target list which will be passed to foreign server.
5052  */
5053  i = 0;
5054  foreach(lc, grouping_target->exprs)
5055  {
5056  Expr *expr = (Expr *) lfirst(lc);
5057  Index sgref = get_pathtarget_sortgroupref(grouping_target, i);
5058  ListCell *l;
5059 
5060  /* Check whether this expression is part of GROUP BY clause */
5061  if (sgref && get_sortgroupref_clause_noerr(sgref, query->groupClause))
5062  {
5063  TargetEntry *tle;
5064 
5065  /*
5066  * If any GROUP BY expression is not shippable, then we cannot
5067  * push down aggregation to the foreign server.
5068  */
5069  if (!is_foreign_expr(root, grouped_rel, expr))
5070  return false;
5071 
5072  /*
5073  * Pushable, so add to tlist. We need to create a TLE for this
5074  * expression and apply the sortgroupref to it. We cannot use
5075  * add_to_flat_tlist() here because that avoids making duplicate
5076  * entries in the tlist. If there are duplicate entries with
5077  * distinct sortgrouprefs, we have to duplicate that situation in
5078  * the output tlist.
5079  */
5080  tle = makeTargetEntry(expr, list_length(tlist) + 1, NULL, false);
5081  tle->ressortgroupref = sgref;
5082  tlist = lappend(tlist, tle);
5083  }
5084  else
5085  {
5086  /*
5087  * Non-grouping expression we need to compute. Is it shippable?
5088  */
5089  if (is_foreign_expr(root, grouped_rel, expr))
5090  {
5091  /* Yes, so add to tlist as-is; OK to suppress duplicates */
5092  tlist = add_to_flat_tlist(tlist, list_make1(expr));
5093  }
5094  else
5095  {
5096  /* Not pushable as a whole; extract its Vars and aggregates */
5097  aggvars = pull_var_clause((Node *) expr,
5099 
5100  /*
5101  * If any aggregate expression is not shippable, then we
5102  * cannot push down aggregation to the foreign server.
5103  */
5104  if (!is_foreign_expr(root, grouped_rel, (Expr *) aggvars))
5105  return false;
5106 
5107  /*
5108  * Add aggregates, if any, into the targetlist. Plain Vars
5109  * outside an aggregate can be ignored, because they should be
5110  * either same as some GROUP BY column or part of some GROUP
5111  * BY expression. In either case, they are already part of
5112  * the targetlist and thus no need to add them again. In fact
5113  * including plain Vars in the tlist when they do not match a
5114  * GROUP BY column would cause the foreign server to complain
5115  * that the shipped query is invalid.
5116  */
5117  foreach(l, aggvars)
5118  {
5119  Expr *expr = (Expr *) lfirst(l);
5120 
5121  if (IsA(expr, Aggref))
5122  tlist = add_to_flat_tlist(tlist, list_make1(expr));
5123  }
5124  }
5125  }
5126 
5127  i++;
5128  }
5129 
5130  /*
5131  * Classify the pushable and non-pushable HAVING clauses and save them in
5132  * remote_conds and local_conds of the grouped rel's fpinfo.
5133  */
5134  if (root->hasHavingQual && query->havingQual)
5135  {
5136  ListCell *lc;
5137 
5138  foreach(lc, (List *) query->havingQual)
5139  {
5140  Expr *expr = (Expr *) lfirst(lc);
5141  RestrictInfo *rinfo;
5142 
5143  /*
5144  * Currently, the core code doesn't wrap havingQuals in
5145  * RestrictInfos, so we must make our own.
5146  */
5147  Assert(!IsA(expr, RestrictInfo));
5148  rinfo = make_restrictinfo(expr,
5149  true,
5150  false,
5151  false,
5152  root->qual_security_level,
5153  grouped_rel->relids,
5154  NULL,
5155  NULL);
5156  if (is_foreign_expr(root, grouped_rel, expr))
5157  fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo);
5158  else
5159  fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo);
5160  }
5161  }
5162 
5163  /*
5164  * If there are any local conditions, pull Vars and aggregates from it and
5165  * check whether they are safe to pushdown or not.
5166  */
5167  if (fpinfo->local_conds)
5168  {
5169  List *aggvars = NIL;
5170  ListCell *lc;
5171 
5172  foreach(lc, fpinfo->local_conds)
5173  {
5174  RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
5175 
5176  aggvars = list_concat(aggvars,
5177  pull_var_clause((Node *) rinfo->clause,
5179  }
5180 
5181  foreach(lc, aggvars)
5182  {
5183  Expr *expr = (Expr *) lfirst(lc);
5184 
5185  /*
5186  * If aggregates within local conditions are not safe to push
5187  * down, then we cannot push down the query. Vars are already
5188  * part of GROUP BY clause which are checked above, so no need to
5189  * access them again here.
5190  */
5191  if (IsA(expr, Aggref))
5192  {
5193  if (!is_foreign_expr(root, grouped_rel, expr))
5194  return false;
5195 
5196  tlist = add_to_flat_tlist(tlist, list_make1(expr));
5197  }
5198  }
5199  }
5200 
5201  /* Store generated targetlist */
5202  fpinfo->grouped_tlist = tlist;
5203 
5204  /* Safe to pushdown */
5205  fpinfo->pushdown_safe = true;
5206 
5207  /*
5208  * Set cached relation costs to some negative value, so that we can detect
5209  * when they are set to some sensible costs, during one (usually the
5210  * first) of the calls to estimate_path_cost_size().
5211  */
5212  fpinfo->rel_startup_cost = -1;
5213  fpinfo->rel_total_cost = -1;
5214 
5215  /*
5216  * Set the string describing this grouped relation to be used in EXPLAIN
5217  * output of corresponding ForeignScan.
5218  */
5219  fpinfo->relation_name = makeStringInfo();
5220  appendStringInfo(fpinfo->relation_name, "Aggregate on (%s)",
5221  ofpinfo->relation_name->data);
5222 
5223  return true;
5224 }
5225 
5226 /*
5227  * postgresGetForeignUpperPaths
5228  * Add paths for post-join operations like aggregation, grouping etc. if
5229  * corresponding operations are safe to push down.
5230  *
5231  * Right now, we only support aggregate, grouping and having clause pushdown.
5232  */
5233 static void
5235  RelOptInfo *input_rel, RelOptInfo *output_rel)
5236 {
5237  PgFdwRelationInfo *fpinfo;
5238 
5239  /*
5240  * If input rel is not safe to pushdown, then simply return as we cannot
5241  * perform any post-join operations on the foreign server.
5242  */
5243  if (!input_rel->fdw_private ||
5244  !((PgFdwRelationInfo *) input_rel->fdw_private)->pushdown_safe)
5245  return;
5246 
5247  /* Ignore stages we don't support; and skip any duplicate calls. */
5248  if (stage != UPPERREL_GROUP_AGG || output_rel->fdw_private)
5249  return;
5250 
5251  fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
5252  fpinfo->pushdown_safe = false;
5253  output_rel->fdw_private = fpinfo;
5254 
5255  add_foreign_grouping_paths(root, input_rel, output_rel);
5256 }
5257 
5258 /*
5259  * add_foreign_grouping_paths
5260  * Add foreign path for grouping and/or aggregation.
5261  *
5262  * Given input_rel represents the underlying scan. The paths are added to the
5263  * given grouped_rel.
5264  */
5265 static void
5267  RelOptInfo *grouped_rel)
5268 {
5269  Query *parse = root->parse;
5270  PgFdwRelationInfo *ifpinfo = input_rel->fdw_private;
5271  PgFdwRelationInfo *fpinfo = grouped_rel->fdw_private;
5272  ForeignPath *grouppath;
5273  PathTarget *grouping_target;
5274  double rows;
5275  int width;
5276  Cost startup_cost;
5277  Cost total_cost;
5278 
5279  /* Nothing to be done, if there is no grouping or aggregation required. */
5280  if (!parse->groupClause && !parse->groupingSets && !parse->hasAggs &&
5281  !root->hasHavingQual)
5282  return;
5283 
5284  grouping_target = root->upper_targets[UPPERREL_GROUP_AGG];
5285 
5286  /* save the input_rel as outerrel in fpinfo */
5287  fpinfo->outerrel = input_rel;
5288 
5289  /*
5290  * Copy foreign table, foreign server, user mapping, FDW options etc.
5291  * details from the input relation's fpinfo.
5292  */
5293  fpinfo->table = ifpinfo->table;
5294  fpinfo->server = ifpinfo->server;
5295  fpinfo->user = ifpinfo->user;
5296  merge_fdw_options(fpinfo, ifpinfo, NULL);
5297 
5298  /* Assess if it is safe to push down aggregation and grouping. */
5299  if (!foreign_grouping_ok(root, grouped_rel))
5300  return;
5301 
5302  /* Estimate the cost of push down */
5303  estimate_path_cost_size(root, grouped_rel, NIL, NIL, &rows,
5304  &width, &startup_cost, &total_cost);
5305 
5306  /* Now update this information in the fpinfo */
5307  fpinfo->rows = rows;
5308  fpinfo->width = width;
5309  fpinfo->startup_cost = startup_cost;
5310  fpinfo->total_cost = total_cost;
5311 
5312  /* Create and add foreign path to the grouping relation. */
5313  grouppath = create_foreignscan_path(root,
5314  grouped_rel,
5315  grouping_target,
5316  rows,
5317  startup_cost,
5318  total_cost,
5319  NIL, /* no pathkeys */
5320  NULL, /* no required_outer */
5321  NULL,
5322  NIL); /* no fdw_private */
5323 
5324  /* Add generated path into grouped_rel by add_path(). */
5325  add_path(grouped_rel, (Path *) grouppath);
5326 }
5327 
5328 /*
5329  * Create a tuple from the specified row of the PGresult.
5330  *
5331  * rel is the local representation of the foreign table, attinmeta is
5332  * conversion data for the rel's tupdesc, and retrieved_attrs is an
5333  * integer list of the table column numbers present in the PGresult.
5334  * temp_context is a working context that can be reset after each tuple.
 *
 * If rel is NULL, the tuple descriptor is taken from fsstate's scan tuple
 * slot instead, so fsstate must be supplied in that case.
 *
 * Entries of retrieved_attrs are consumed in order, one per PGresult
 * column.  Positive entries are 1-based attribute numbers; the entries
 * SelfItemPointerAttributeNumber and ObjectIdAttributeNumber cause the
 * remote ctid / oid to be installed in the returned tuple.  Attributes that
 * are not listed are returned as NULL.
 *
 * The tuple is formed in the memory context that was current on entry;
 * temp_context is reset before returning.
5335  */
5336 static HeapTuple
5338  int row,
5339  Relation rel,
5342  ForeignScanState *fsstate,
5343  MemoryContext temp_context)
5344 {
5345  HeapTuple tuple;
5347  Datum *values;
5348  bool *nulls;
5349  ItemPointer ctid = NULL;
5350  Oid oid = InvalidOid;
5351  ConversionLocation errpos;
5352  ErrorContextCallback errcallback;
5353  MemoryContext oldcontext;
5354  ListCell *lc;
5355  int j;
5356 
5357  Assert(row < PQntuples(res));
5358 
5359  /*
5360  * Do the following work in a temp context that we reset after each tuple.
5361  * This cleans up not only the data we have direct access to, but any
5362  * cruft the I/O functions might leak.
5363  */
5364  oldcontext = MemoryContextSwitchTo(temp_context);
5365 
 /* Pick the descriptor: the table's own, or the scan slot's when rel is NULL */
5366  if (rel)
5367  tupdesc = RelationGetDescr(rel);
5368  else
5369  {
5370  Assert(fsstate);
5371  tupdesc = fsstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
5372  }
5373 
 /* Both work arrays are allocated while temp_context is current */
5374  values = (Datum *) palloc0(tupdesc->natts * sizeof(Datum));
5375  nulls = (bool *) palloc(tupdesc->natts * sizeof(bool));
5376  /* Initialize to nulls for any columns not present in result */
5377  memset(nulls, true, tupdesc->natts * sizeof(bool));
5378 
5379  /*
5380  * Set up and install callback to report where conversion error occurs.
 * errpos.cur_attno is updated around each column conversion below so the
 * callback can tell which column was being processed.
5381  */
5382  errpos.rel = rel;
5383  errpos.cur_attno = 0;
5384  errpos.fsstate = fsstate;
5385  errcallback.callback = conversion_error_callback;
5386  errcallback.arg = (void *) &errpos;
5387  errcallback.previous = error_context_stack;
5388  error_context_stack = &errcallback;
5389 
5390  /*
5391  * i indexes columns in the relation, j indexes columns in the PGresult.
5392  */
5393  j = 0;
5394  foreach(lc, retrieved_attrs)
5395  {
5396  int i = lfirst_int(lc);
5397  char *valstr;
5398 
5399  /* fetch next column's textual value */
5400  if (PQgetisnull(res, row, j))
5401  valstr = NULL;
5402  else
5403  valstr = PQgetvalue(res, row, j);
5404 
5405  /*
5406  * convert value to internal representation
5407  *
5408  * Note: we ignore system columns other than ctid and oid in result
5409  */
5410  errpos.cur_attno = i;
5411  if (i > 0)
5412  {
5413  /* ordinary column */
5414  Assert(i <= tupdesc->natts);
5415  nulls[i - 1] = (valstr == NULL);
5416  /* Apply the input function even to nulls, to support domains */
5417  values[i - 1] = InputFunctionCall(&attinmeta->attinfuncs[i - 1],
5418  valstr,
5419  attinmeta->attioparams[i - 1],
5420  attinmeta->atttypmods[i - 1]);
5421  }
5422  else if (i == SelfItemPointerAttributeNumber)
5423  {
5424  /* ctid */
5425  if (valstr != NULL)
5426  {
5427  Datum datum;
5428 
 /* only the pointer is kept here; it is copied by value into the tuple below */
5429  datum = DirectFunctionCall1(tidin, CStringGetDatum(valstr));
5430  ctid = (ItemPointer) DatumGetPointer(datum);
5431  }
5432  }
5433  else if (i == ObjectIdAttributeNumber)
5434  {
5435  /* oid */
5436  if (valstr != NULL)
5437  {
5438  Datum datum;
5439 
5440  datum = DirectFunctionCall1(oidin, CStringGetDatum(valstr));
5441  oid = DatumGetObjectId(datum);
5442  }
5443  }
 /* conversion of this column is done; no column is in progress */
5444  errpos.cur_attno = 0;
5445 
5446  j++;
5447  }
5448 
5449  /* Uninstall error context callback. */
5450  error_context_stack = errcallback.previous;
5451 
5452  /*
5453  * Check we got the expected number of columns. Note: j == 0 and
5454  * PQnfields == 1 is expected, since deparse emits a NULL if no columns.
5455  */
5456  if (j > 0 && j != PQnfields(res))
5457  elog(ERROR, "remote query result does not match the foreign table");
5458 
5459  /*
5460  * Build the result tuple in caller's memory context.
5461  */
5462  MemoryContextSwitchTo(oldcontext);
5463 
5464  tuple = heap_form_tuple(tupdesc, values, nulls);
5465 
5466  /*
5467  * If we have a CTID to return, install it in both t_self and t_ctid.
5468  * t_self is the normal place, but if the tuple is converted to a
5469  * composite Datum, t_self will be lost; setting t_ctid allows CTID to be
5470  * preserved during EvalPlanQual re-evaluations (see ROW_MARK_COPY code).
5471  */
5472  if (ctid)
5473  tuple->t_self = tuple->t_data->t_ctid = *ctid;
5474 
5475  /*
5476  * Stomp on the xmin, xmax, and cmin fields from the tuple created by
5477  * heap_form_tuple. heap_form_tuple actually creates the tuple with
5478  * DatumTupleFields, not HeapTupleFields, but the executor expects
5479  * HeapTupleFields and will happily extract system columns on that
5480  * assumption. If we don't do this then, for example, the tuple length
5481  * ends up in the xmin field, which isn't what we want.
 *
 * NOTE(review): listing lines 5483-5485, which presumably hold the
 * statements doing this, are absent from this rendering -- confirm
 * against the real source file.
5482  */
5486 
5487  /*
5488  * If we have an OID to return, install it.
5489  */
5490  if (OidIsValid(oid))
5491  HeapTupleSetOid(tuple, oid);
5492 
5493  /* Clean up */
 /* values, nulls and ctid must not be used past this point */
5494  MemoryContextReset(temp_context);
5495 
5496  return tuple;
5497 }
5498 
5499 /*
5500  * Callback function which is called when error occurs during column value
5501  * conversion. Print names of column and relation.
5502  */
5503 static void
5505 {
5506  const char *attname = NULL;
5507  const char *relname = NULL;
5508  bool is_wholerow = false;
5509  ConversionLocation *errpos = (ConversionLocation *) arg;
5510 
5511  if (errpos->rel)
5512  {
5513  /* error occurred in a scan against a foreign table */
5515  Form_pg_attribute attr = TupleDescAttr(tupdesc, errpos->cur_attno - 1);
5516 
5517  if (errpos->cur_attno > 0 && errpos->cur_attno <= tupdesc->natts)
5518  attname = NameStr(attr->attname);
5519  else if (errpos->cur_attno == SelfItemPointerAttributeNumber)
5520  attname = "ctid";
5521  else if (errpos->cur_attno == ObjectIdAttributeNumber)
5522  attname = "oid";
5523 
5524  relname = RelationGetRelationName(errpos->rel);
5525  }
5526  else
5527  {
5528  /* error occurred in a scan against a foreign join */
5529  ForeignScanState *fsstate = errpos->fsstate;
5530  ForeignScan *fsplan = castNode(ForeignScan, fsstate->ss.ps.plan);
5531  EState *estate = fsstate->ss.ps.state;
5532  TargetEntry *tle;
5533 
5534  tle = list_nth_node(TargetEntry, fsplan->fdw_scan_tlist,
5535  errpos->cur_attno - 1);
5536 
5537  /*
5538  * Target list can have Vars and expressions. For Vars, we can get
5539  * its relation, however for expressions we can't. Thus for
5540  * expressions, just show generic context message.
5541  */
5542  if (IsA(tle->expr, Var))
5543  {
5544  RangeTblEntry *rte;
5545  Var *var = (Var *) tle->expr;
5546 
5547  rte = rt_fetch(var->varno, estate->es_range_table);
5548 
5549  if (var->varattno == 0)
5550  is_wholerow = true;
5551  else
5552  attname = get_attname(rte->relid, var->varattno, false);
5553 
5554  relname = get_rel_name(rte->relid);
5555  }
5556  else
5557  errcontext("processing expression at position %d in select list",
5558  errpos->cur_attno);
5559  }
5560 
5561  if (relname)
5562  {
5563  if (is_wholerow)
5564  errcontext("whole-row reference to foreign table \"%s\"", relname);
5565  else if (attname)
5566  errcontext("column \"%s\" of foreign table \"%s\"", attname, relname);
5567  }
5568 }
5569 
5570 /*
5571  * Find an equivalence class member expression, all of whose Vars, come from
5572  * the indicated relation.
5573  */
5574 Expr *
5576 {
5577  ListCell *lc_em;
5578 
5579  foreach(lc_em, ec->ec_members)
5580  {
5581  EquivalenceMember *em = lfirst(lc_em);
5582 
5583  if (bms_is_subset(em->em_relids, rel->relids))
5584  {
5585  /*
5586  * If there is more than one equivalence member whose Vars are
5587  * taken entirely from this relation, we'll be content to choose
5588  * any one of those.
5589  */
5590  return em->em_expr;
5591  }
5592  }
5593 
5594  /* We didn't find any suitable equivalence class expression */
5595  return NULL;
5596 }
GetForeignPlan_function GetForeignPlan
Definition: fdwapi.h:182
bool has_eclass_joins
Definition: relation.h:651
Value * makeString(char *str)
Definition: value.c:53
#define list_make3(x1, x2, x3)
Definition: pg_list.h:141
BeginForeignScan_function BeginForeignScan
Definition: fdwapi.h:183
GetForeignUpperPaths_function GetForeignUpperPaths
Definition: fdwapi.h:197
ExecForeignDelete_function ExecForeignDelete
Definition: fdwapi.h:205
#define PG_RETURN_POINTER(x)
Definition: fmgr.h:321
EndDirectModify_function EndDirectModify
Definition: fdwapi.h:211
#define NIL
Definition: pg_list.h:69
List * rowMarks
Definition: relation.h:256
ScanState ss
Definition: execnodes.h:1575
void deparseUpdateSql(StringInfo buf, PlannerInfo *root, Index rtindex, Relation rel, List *targetAttrs, List *returningList, List **retrieved_attrs)
Definition: deparse.c:1715
TupleTableSlot * ExecStoreTuple(HeapTuple tuple, TupleTableSlot *slot, Buffer buffer, bool shouldFree)
Definition: execTuples.c:356
Datum postgres_fdw_handler(PG_FUNCTION_ARGS)
Definition: postgres_fdw.c:443
static List * get_useful_pathkeys_for_relation(PlannerInfo *root, RelOptInfo *rel)
Definition: postgres_fdw.c:792
Definition: fmgr.h:56
List * qual
Definition: plannodes.h:145
int PQnfields(const PGresult *res)
Definition: fe-exec.c:2732
double estimate_num_groups(PlannerInfo *root, List *groupExprs, double input_rows, List **pgset)
Definition: selfuncs.c:3398
#define SizeofHeapTupleHeader
Definition: htup_details.h:175
FdwModifyPrivateIndex
Definition: postgres_fdw.c:89
Relation ri_RelationDesc
Definition: execnodes.h:368
Plan plan
Definition: plannodes.h:330
#define IsA(nodeptr, _type_)
Definition: nodes.h:564
static void postgresExplainForeignModify(ModifyTableState *mtstate, ResultRelInfo *rinfo, List *fdw_private, int subplan_index, ExplainState *es)
Query * parse
Definition: relation.h:155
void getTypeOutputInfo(Oid type, Oid *typOutput, bool *typIsVarlena)
Definition: lsyscache.c:2650
void add_path(RelOptInfo *parent_rel, Path *new_path)
Definition: pathnode.c:422