PostgreSQL Source Code  git master
postgres_fdw.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * postgres_fdw.c
4  * Foreign-data wrapper for remote PostgreSQL servers
5  *
6  * Portions Copyright (c) 2012-2018, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * contrib/postgres_fdw/postgres_fdw.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "postgres_fdw.h"
16 
17 #include "access/htup_details.h"
18 #include "access/sysattr.h"
19 #include "catalog/pg_class.h"
20 #include "commands/defrem.h"
21 #include "commands/explain.h"
22 #include "commands/vacuum.h"
23 #include "foreign/fdwapi.h"
24 #include "funcapi.h"
25 #include "miscadmin.h"
26 #include "nodes/makefuncs.h"
27 #include "nodes/nodeFuncs.h"
28 #include "optimizer/cost.h"
29 #include "optimizer/clauses.h"
30 #include "optimizer/pathnode.h"
31 #include "optimizer/paths.h"
32 #include "optimizer/planmain.h"
33 #include "optimizer/restrictinfo.h"
34 #include "optimizer/var.h"
35 #include "optimizer/tlist.h"
36 #include "parser/parsetree.h"
37 #include "utils/builtins.h"
38 #include "utils/guc.h"
39 #include "utils/lsyscache.h"
40 #include "utils/memutils.h"
41 #include "utils/rel.h"
42 #include "utils/sampling.h"
43 #include "utils/selfuncs.h"
44 
46 
47 /* Default CPU cost to start up a foreign query. */
48 #define DEFAULT_FDW_STARTUP_COST 100.0
49 
50 /* Default CPU cost to process 1 row (above and beyond cpu_tuple_cost). */
51 #define DEFAULT_FDW_TUPLE_COST 0.01
52 
53 /* If no remote estimates, assume a sort costs 20% extra */
54 #define DEFAULT_FDW_SORT_MULTIPLIER 1.2
55 
56 /*
57  * Indexes of FDW-private information stored in fdw_private lists.
58  *
59  * These items are indexed with the enum FdwScanPrivateIndex, so an item
60  * can be fetched with list_nth(). For example, to get the SELECT statement:
61  * sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
62  */
64 {
65  /* SQL statement to execute remotely (as a String node) */
67  /* Integer list of attribute numbers retrieved by the SELECT */
69  /* Integer representing the desired fetch_size */
71 
72  /*
73  * String describing join i.e. names of relations being joined and types
74  * of join, added when the scan is join
75  */
77 };
78 
79 /*
80  * Similarly, this enum describes what's kept in the fdw_private list for
81  * a ModifyTable node referencing a postgres_fdw foreign table. We store:
82  *
83  * 1) INSERT/UPDATE/DELETE statement text to be sent to the remote server
84  * 2) Integer list of target attribute numbers for INSERT/UPDATE
85  * (NIL for a DELETE)
86  * 3) Boolean flag showing if the remote query has a RETURNING clause
87  * 4) Integer list of attribute numbers retrieved by RETURNING, if any
88  */
90 {
91  /* SQL statement to execute remotely (as a String node) */
93  /* Integer list of target attribute numbers for INSERT/UPDATE */
95  /* has-returning flag (as an integer Value node) */
97  /* Integer list of attribute numbers retrieved by RETURNING */
99 };
100 
101 /*
102  * Similarly, this enum describes what's kept in the fdw_private list for
103  * a ForeignScan node that modifies a foreign table directly. We store:
104  *
105  * 1) UPDATE/DELETE statement text to be sent to the remote server
106  * 2) Boolean flag showing if the remote query has a RETURNING clause
107  * 3) Integer list of attribute numbers retrieved by RETURNING, if any
108  * 4) Boolean flag showing if we set the command es_processed
109  */
111 {
112  /* SQL statement to execute remotely (as a String node) */
114  /* has-returning flag (as an integer Value node) */
116  /* Integer list of attribute numbers retrieved by RETURNING */
118  /* set-processed flag (as an integer Value node) */
120 };
121 
122 /*
123  * Execution state of a foreign scan using postgres_fdw.
124  */
125 typedef struct PgFdwScanState
126 {
127  Relation rel; /* relcache entry for the foreign table. NULL
128  * for a foreign join scan. */
129  TupleDesc tupdesc; /* tuple descriptor of scan */
130  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
131 
132  /* extracted fdw_private data */
133  char *query; /* text of SELECT command */
134  List *retrieved_attrs; /* list of retrieved attribute numbers */
135 
136  /* for remote query execution */
137  PGconn *conn; /* connection for the scan */
138  unsigned int cursor_number; /* quasi-unique ID for my cursor */
139  bool cursor_exists; /* have we created the cursor? */
140  int numParams; /* number of parameters passed to query */
141  FmgrInfo *param_flinfo; /* output conversion functions for them */
142  List *param_exprs; /* executable expressions for param values */
143  const char **param_values; /* textual values of query parameters */
144 
145  /* for storing result tuples */
146  HeapTuple *tuples; /* array of currently-retrieved tuples */
147  int num_tuples; /* # of tuples in array */
148  int next_tuple; /* index of next one to return */
149 
150  /* batch-level state, for optimizing rewinds and avoiding useless fetch */
151  int fetch_ct_2; /* Min(# of fetches done, 2) */
152  bool eof_reached; /* true if last fetch reached EOF */
153 
154  /* working memory contexts */
155  MemoryContext batch_cxt; /* context holding current batch of tuples */
156  MemoryContext temp_cxt; /* context for per-tuple temporary data */
157 
158  int fetch_size; /* number of tuples per fetch */
160 
161 /*
162  * Execution state of a foreign insert/update/delete operation.
163  */
164 typedef struct PgFdwModifyState
165 {
166  Relation rel; /* relcache entry for the foreign table */
167  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
168 
169  /* for remote query execution */
170  PGconn *conn; /* connection for the scan */
171  char *p_name; /* name of prepared statement, if created */
172 
173  /* extracted fdw_private data */
174  char *query; /* text of INSERT/UPDATE/DELETE command */
175  List *target_attrs; /* list of target attribute numbers */
176  bool has_returning; /* is there a RETURNING clause? */
177  List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
178 
179  /* info about parameters for prepared statement */
180  AttrNumber ctidAttno; /* attnum of input resjunk ctid column */
181  int p_nums; /* number of parameters to transmit */
182  FmgrInfo *p_flinfo; /* output conversion functions for them */
183 
184  /* working memory context */
185  MemoryContext temp_cxt; /* context for per-tuple temporary data */
187 
188 /*
189  * Execution state of a foreign scan that modifies a foreign table directly.
190  */
192 {
193  Relation rel; /* relcache entry for the foreign table */
194  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
195 
196  /* extracted fdw_private data */
197  char *query; /* text of UPDATE/DELETE command */
198  bool has_returning; /* is there a RETURNING clause? */
199  List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
200  bool set_processed; /* do we set the command es_processed? */
201 
202  /* for remote query execution */
203  PGconn *conn; /* connection for the update */
204  int numParams; /* number of parameters passed to query */
205  FmgrInfo *param_flinfo; /* output conversion functions for them */
206  List *param_exprs; /* executable expressions for param values */
207  const char **param_values; /* textual values of query parameters */
208 
209  /* for storing result tuples */
210  PGresult *result; /* result for query */
211  int num_tuples; /* # of result tuples */
212  int next_tuple; /* index of next one to return */
213  Relation resultRel; /* relcache entry for the target relation */
214  AttrNumber *attnoMap; /* array of attnums of input user columns */
215  AttrNumber ctidAttno; /* attnum of input ctid column */
216  AttrNumber oidAttno; /* attnum of input oid column */
217  bool hasSystemCols; /* are there system columns of resultRel? */
218 
219  /* working memory context */
220  MemoryContext temp_cxt; /* context for per-tuple temporary data */
222 
223 /*
224  * Workspace for analyzing a foreign table.
225  */
226 typedef struct PgFdwAnalyzeState
227 {
228  Relation rel; /* relcache entry for the foreign table */
229  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
230  List *retrieved_attrs; /* attr numbers retrieved by query */
231 
232  /* collected sample rows */
233  HeapTuple *rows; /* array of size targrows */
234  int targrows; /* target # of sample rows */
235  int numrows; /* # of sample rows collected */
236 
237  /* for random sampling */
238  double samplerows; /* # of rows fetched */
239  double rowstoskip; /* # of rows to skip before next sample */
240  ReservoirStateData rstate; /* state for reservoir sampling */
241 
242  /* working memory contexts */
243  MemoryContext anl_cxt; /* context for per-analyze lifespan data */
244  MemoryContext temp_cxt; /* context for per-tuple temporary data */
246 
247 /*
248  * Identify the attribute where data conversion fails.
249  */
250 typedef struct ConversionLocation
251 {
252  Relation rel; /* foreign table's relcache entry. */
253  AttrNumber cur_attno; /* attribute number being processed, or 0 */
254 
255  /*
256  * In case of foreign join push down, fdw_scan_tlist is used to identify
257  * the Var node corresponding to the error location and
258  * fsstate->ss.ps.state gives access to the RTEs of corresponding relation
259  * to get the relation name and attribute name.
260  */
263 
264 /* Callback argument for ec_member_matches_foreign */
265 typedef struct
266 {
267  Expr *current; /* current expr, or NULL if not yet found */
268  List *already_used; /* expressions already dealt with */
270 
271 /*
272  * SQL functions
273  */
275 
276 /*
277  * FDW callback routines
278  */
279 static void postgresGetForeignRelSize(PlannerInfo *root,
280  RelOptInfo *baserel,
281  Oid foreigntableid);
282 static void postgresGetForeignPaths(PlannerInfo *root,
283  RelOptInfo *baserel,
284  Oid foreigntableid);
286  RelOptInfo *foreignrel,
287  Oid foreigntableid,
288  ForeignPath *best_path,
289  List *tlist,
290  List *scan_clauses,
291  Plan *outer_plan);
292 static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
295 static void postgresEndForeignScan(ForeignScanState *node);
296 static void postgresAddForeignUpdateTargets(Query *parsetree,
297  RangeTblEntry *target_rte,
298  Relation target_relation);
300  ModifyTable *plan,
301  Index resultRelation,
302  int subplan_index);
303 static void postgresBeginForeignModify(ModifyTableState *mtstate,
304  ResultRelInfo *resultRelInfo,
305  List *fdw_private,
306  int subplan_index,
307  int eflags);
309  ResultRelInfo *resultRelInfo,
310  TupleTableSlot *slot,
311  TupleTableSlot *planSlot);
313  ResultRelInfo *resultRelInfo,
314  TupleTableSlot *slot,
315  TupleTableSlot *planSlot);
317  ResultRelInfo *resultRelInfo,
318  TupleTableSlot *slot,
319  TupleTableSlot *planSlot);
320 static void postgresEndForeignModify(EState *estate,
321  ResultRelInfo *resultRelInfo);
322 static void postgresBeginForeignInsert(ModifyTableState *mtstate,
323  ResultRelInfo *resultRelInfo);
324 static void postgresEndForeignInsert(EState *estate,
325  ResultRelInfo *resultRelInfo);
327 static bool postgresPlanDirectModify(PlannerInfo *root,
328  ModifyTable *plan,
329  Index resultRelation,
330  int subplan_index);
331 static void postgresBeginDirectModify(ForeignScanState *node, int eflags);
333 static void postgresEndDirectModify(ForeignScanState *node);
335  ExplainState *es);
337  ResultRelInfo *rinfo,
338  List *fdw_private,
339  int subplan_index,
340  ExplainState *es);
342  ExplainState *es);
343 static bool postgresAnalyzeForeignTable(Relation relation,
344  AcquireSampleRowsFunc *func,
345  BlockNumber *totalpages);
347  Oid serverOid);
348 static void postgresGetForeignJoinPaths(PlannerInfo *root,
349  RelOptInfo *joinrel,
350  RelOptInfo *outerrel,
351  RelOptInfo *innerrel,
352  JoinType jointype,
353  JoinPathExtraData *extra);
355  TupleTableSlot *slot);
356 static void postgresGetForeignUpperPaths(PlannerInfo *root,
357  UpperRelationKind stage,
358  RelOptInfo *input_rel,
359  RelOptInfo *output_rel,
360  void *extra);
361 
362 /*
363  * Helper functions
364  */
365 static void estimate_path_cost_size(PlannerInfo *root,
366  RelOptInfo *foreignrel,
367  List *param_join_conds,
368  List *pathkeys,
369  double *p_rows, int *p_width,
370  Cost *p_startup_cost, Cost *p_total_cost);
371 static void get_remote_estimate(const char *sql,
372  PGconn *conn,
373  double *rows,
374  int *width,
375  Cost *startup_cost,
376  Cost *total_cost);
379  void *arg);
380 static void create_cursor(ForeignScanState *node);
381 static void fetch_more_data(ForeignScanState *node);
382 static void close_cursor(PGconn *conn, unsigned int cursor_number);
384  RangeTblEntry *rte,
385  ResultRelInfo *resultRelInfo,
386  CmdType operation,
387  Plan *subplan,
388  char *query,
389  List *target_attrs,
390  bool has_returning,
392 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
393 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
394  ItemPointer tupleid,
395  TupleTableSlot *slot);
396 static void store_returning_result(PgFdwModifyState *fmstate,
397  TupleTableSlot *slot, PGresult *res);
398 static void finish_foreign_modify(PgFdwModifyState *fmstate);
399 static List *build_remote_returning(Index rtindex, Relation rel,
400  List *returningList);
401 static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist);
402 static void execute_dml_stmt(ForeignScanState *node);
404 static void init_returning_filter(PgFdwDirectModifyState *dmstate,
405  List *fdw_scan_tlist,
406  Index rtindex);
408  TupleTableSlot *slot,
409  EState *estate);
410 static void prepare_query_params(PlanState *node,
411  List *fdw_exprs,
412  int numParams,
414  List **param_exprs,
415  const char ***param_values);
416 static void process_query_params(ExprContext *econtext,
418  List *param_exprs,
419  const char **param_values);
420 static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
421  HeapTuple *rows, int targrows,
422  double *totalrows,
423  double *totaldeadrows);
424 static void analyze_row_processor(PGresult *res, int row,
425  PgFdwAnalyzeState *astate);
427  int row,
428  Relation rel,
431  ForeignScanState *fsstate,
432  MemoryContext temp_context);
433 static void conversion_error_callback(void *arg);
434 static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel,
435  JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel,
436  JoinPathExtraData *extra);
437 static bool foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
438  Node *havingQual);
440  RelOptInfo *rel);
443  Path *epq_path);
444 static void add_foreign_grouping_paths(PlannerInfo *root,
445  RelOptInfo *input_rel,
446  RelOptInfo *grouped_rel,
447  GroupPathExtraData *extra);
448 static void apply_server_options(PgFdwRelationInfo *fpinfo);
449 static void apply_table_options(PgFdwRelationInfo *fpinfo);
450 static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
451  const PgFdwRelationInfo *fpinfo_o,
452  const PgFdwRelationInfo *fpinfo_i);
453 
454 
455 /*
456  * Foreign-data wrapper handler function: return a struct with pointers
457  * to my callback routines.
458  */
459 Datum
461 {
462  FdwRoutine *routine = makeNode(FdwRoutine);
463 
464  /* Functions for scanning foreign tables */
472 
473  /* Functions for updating foreign tables */
488 
489  /* Function for EvalPlanQual rechecks */
491  /* Support functions for EXPLAIN */
495 
496  /* Support functions for ANALYZE */
498 
499  /* Support functions for IMPORT FOREIGN SCHEMA */
501 
502  /* Support functions for join push-down */
504 
505  /* Support functions for upper relation push-down */
507 
508  PG_RETURN_POINTER(routine);
509 }
510 
511 /*
512  * postgresGetForeignRelSize
513  * Estimate # of rows and width of the result of the scan
514  *
515  * We should consider the effect of all baserestrictinfo clauses here, but
516  * not any join clauses.
517  */
518 static void
520  RelOptInfo *baserel,
521  Oid foreigntableid)
522 {
523  PgFdwRelationInfo *fpinfo;
524  ListCell *lc;
525  RangeTblEntry *rte = planner_rt_fetch(baserel->relid, root);
526  const char *namespace;
527  const char *relname;
528  const char *refname;
529 
530  /*
531  * We use PgFdwRelationInfo to pass various information to subsequent
532  * functions.
533  */
534  fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
535  baserel->fdw_private = (void *) fpinfo;
536 
537  /* Base foreign tables need to be pushed down always. */
538  fpinfo->pushdown_safe = true;
539 
540  /* Look up foreign-table catalog info. */
541  fpinfo->table = GetForeignTable(foreigntableid);
542  fpinfo->server = GetForeignServer(fpinfo->table->serverid);
543 
544  /*
545  * Extract user-settable option values. Note that per-table setting of
546  * use_remote_estimate overrides per-server setting.
547  */
548  fpinfo->use_remote_estimate = false;
551  fpinfo->shippable_extensions = NIL;
552  fpinfo->fetch_size = 100;
553 
554  apply_server_options(fpinfo);
555  apply_table_options(fpinfo);
556 
557  /*
558  * If the table or the server is configured to use remote estimates,
559  * identify which user to do remote access as during planning. This
560  * should match what ExecCheckRTEPerms() does. If we fail due to lack of
561  * permissions, the query would have failed at runtime anyway.
562  */
563  if (fpinfo->use_remote_estimate)
564  {
565  Oid userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
566 
567  fpinfo->user = GetUserMapping(userid, fpinfo->server->serverid);
568  }
569  else
570  fpinfo->user = NULL;
571 
572  /*
573  * Identify which baserestrictinfo clauses can be sent to the remote
574  * server and which can't.
575  */
576  classifyConditions(root, baserel, baserel->baserestrictinfo,
577  &fpinfo->remote_conds, &fpinfo->local_conds);
578 
579  /*
580  * Identify which attributes will need to be retrieved from the remote
581  * server. These include all attrs needed for joins or final output, plus
582  * all attrs used in the local_conds. (Note: if we end up using a
583  * parameterized scan, it's possible that some of the join clauses will be
584  * sent to the remote and thus we wouldn't really need to retrieve the
585  * columns used in them. Doesn't seem worth detecting that case though.)
586  */
587  fpinfo->attrs_used = NULL;
588  pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid,
589  &fpinfo->attrs_used);
590  foreach(lc, fpinfo->local_conds)
591  {
592  RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
593 
594  pull_varattnos((Node *) rinfo->clause, baserel->relid,
595  &fpinfo->attrs_used);
596  }
597 
598  /*
599  * Compute the selectivity and cost of the local_conds, so we don't have
600  * to do it over again for each path. The best we can do for these
601  * conditions is to estimate selectivity on the basis of local statistics.
602  */
604  fpinfo->local_conds,
605  baserel->relid,
606  JOIN_INNER,
607  NULL);
608 
609  cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
610 
611  /*
612  * Set cached relation costs to some negative value, so that we can detect
613  * when they are set to some sensible costs during one (usually the first)
614  * of the calls to estimate_path_cost_size().
615  */
616  fpinfo->rel_startup_cost = -1;
617  fpinfo->rel_total_cost = -1;
618 
619  /*
620  * If the table or the server is configured to use remote estimates,
621  * connect to the foreign server and execute EXPLAIN to estimate the
622  * number of rows selected by the restriction clauses, as well as the
623  * average row width. Otherwise, estimate using whatever statistics we
624  * have locally, in a way similar to ordinary tables.
625  */
626  if (fpinfo->use_remote_estimate)
627  {
628  /*
629  * Get cost/size estimates with help of remote server. Save the
630  * values in fpinfo so we don't need to do it again to generate the
631  * basic foreign path.
632  */
633  estimate_path_cost_size(root, baserel, NIL, NIL,
634  &fpinfo->rows, &fpinfo->width,
635  &fpinfo->startup_cost, &fpinfo->total_cost);
636 
637  /* Report estimated baserel size to planner. */
638  baserel->rows = fpinfo->rows;
639  baserel->reltarget->width = fpinfo->width;
640  }
641  else
642  {
643  /*
644  * If the foreign table has never been ANALYZEd, it will have relpages
645  * and reltuples equal to zero, which most likely has nothing to do
646  * with reality. We can't do a whole lot about that if we're not
647  * allowed to consult the remote server, but we can use a hack similar
648  * to plancat.c's treatment of empty relations: use a minimum size
649  * estimate of 10 pages, and divide by the column-datatype-based width
650  * estimate to get the corresponding number of tuples.
651  */
652  if (baserel->pages == 0 && baserel->tuples == 0)
653  {
654  baserel->pages = 10;
655  baserel->tuples =
656  (10 * BLCKSZ) / (baserel->reltarget->width +
658  }
659 
660  /* Estimate baserel size as best we can with local statistics. */
661  set_baserel_size_estimates(root, baserel);
662 
663  /* Fill in basically-bogus cost estimates for use later. */
664  estimate_path_cost_size(root, baserel, NIL, NIL,
665  &fpinfo->rows, &fpinfo->width,
666  &fpinfo->startup_cost, &fpinfo->total_cost);
667  }
668 
669  /*
670  * Set the name of relation in fpinfo, while we are constructing it here.
671  * It will be used to build the string describing the join relation in
672  * EXPLAIN output. We can't know whether VERBOSE option is specified or
673  * not, so always schema-qualify the foreign table name.
674  */
675  fpinfo->relation_name = makeStringInfo();
676  namespace = get_namespace_name(get_rel_namespace(foreigntableid));
677  relname = get_rel_name(foreigntableid);
678  refname = rte->eref->aliasname;
679  appendStringInfo(fpinfo->relation_name, "%s.%s",
680  quote_identifier(namespace),
681  quote_identifier(relname));
682  if (*refname && strcmp(refname, relname) != 0)
683  appendStringInfo(fpinfo->relation_name, " %s",
685 
686  /* No outer and inner relations. */
687  fpinfo->make_outerrel_subquery = false;
688  fpinfo->make_innerrel_subquery = false;
689  fpinfo->lower_subquery_rels = NULL;
690  /* Set the relation index. */
691  fpinfo->relation_index = baserel->relid;
692 }
693 
694 /*
695  * get_useful_ecs_for_relation
696  * Determine which EquivalenceClasses might be involved in useful
697  * orderings of this relation.
698  *
699  * This function is in some respects a mirror image of the core function
700  * pathkeys_useful_for_merging: for a regular table, we know what indexes
701  * we have and want to test whether any of them are useful. For a foreign
702  * table, we don't know what indexes are present on the remote side but
703  * want to speculate about which ones we'd like to use if they existed.
704  *
705  * This function returns a list of potentially-useful equivalence classes,
706  * but it does not guarantee that an EquivalenceMember exists which contains
707  * Vars only from the given relation. For example, given ft1 JOIN t1 ON
708  * ft1.x + t1.x = 0, this function will say that the equivalence class
709  * containing ft1.x + t1.x is potentially useful. Supposing ft1 is remote and
710  * t1 is local (or on a different server), it will turn out that no useful
711  * ORDER BY clause can be generated. It's not our job to figure that out
712  * here; we're only interested in identifying relevant ECs.
713  */
714 static List *
716 {
717  List *useful_eclass_list = NIL;
718  ListCell *lc;
719  Relids relids;
720 
721  /*
722  * First, consider whether any active EC is potentially useful for a merge
723  * join against this relation.
724  */
725  if (rel->has_eclass_joins)
726  {
727  foreach(lc, root->eq_classes)
728  {
729  EquivalenceClass *cur_ec = (EquivalenceClass *) lfirst(lc);
730 
731  if (eclass_useful_for_merging(root, cur_ec, rel))
732  useful_eclass_list = lappend(useful_eclass_list, cur_ec);
733  }
734  }
735 
736  /*
737  * Next, consider whether there are any non-EC derivable join clauses that
738  * are merge-joinable. If the joininfo list is empty, we can exit
739  * quickly.
740  */
741  if (rel->joininfo == NIL)
742  return useful_eclass_list;
743 
744  /* If this is a child rel, we must use the topmost parent rel to search. */
745  if (IS_OTHER_REL(rel))
746  {
748  relids = rel->top_parent_relids;
749  }
750  else
751  relids = rel->relids;
752 
753  /* Check each join clause in turn. */
754  foreach(lc, rel->joininfo)
755  {
756  RestrictInfo *restrictinfo = (RestrictInfo *) lfirst(lc);
757 
758  /* Consider only mergejoinable clauses */
759  if (restrictinfo->mergeopfamilies == NIL)
760  continue;
761 
762  /* Make sure we've got canonical ECs. */
763  update_mergeclause_eclasses(root, restrictinfo);
764 
765  /*
766  * restrictinfo->mergeopfamilies != NIL is sufficient to guarantee
767  * that left_ec and right_ec will be initialized, per comments in
768  * distribute_qual_to_rels.
769  *
770  * We want to identify which side of this merge-joinable clause
771  * contains columns from the relation produced by this RelOptInfo. We
772  * test for overlap, not containment, because there could be extra
773  * relations on either side. For example, suppose we've got something
774  * like ((A JOIN B ON A.x = B.x) JOIN C ON A.y = C.y) LEFT JOIN D ON
775  * A.y = D.y. The input rel might be the joinrel between A and B, and
776  * we'll consider the join clause A.y = D.y. relids contains a
777  * relation not involved in the join class (B) and the equivalence
778  * class for the left-hand side of the clause contains a relation not
779  * involved in the input rel (C). Despite the fact that we have only
780  * overlap and not containment in either direction, A.y is potentially
781  * useful as a sort column.
782  *
783  * Note that it's even possible that relids overlaps neither side of
784  * the join clause. For example, consider A LEFT JOIN B ON A.x = B.x
785  * AND A.x = 1. The clause A.x = 1 will appear in B's joininfo list,
786  * but overlaps neither side of B. In that case, we just skip this
787  * join clause, since it doesn't suggest a useful sort order for this
788  * relation.
789  */
790  if (bms_overlap(relids, restrictinfo->right_ec->ec_relids))
791  useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
792  restrictinfo->right_ec);
793  else if (bms_overlap(relids, restrictinfo->left_ec->ec_relids))
794  useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
795  restrictinfo->left_ec);
796  }
797 
798  return useful_eclass_list;
799 }
800 
801 /*
802  * get_useful_pathkeys_for_relation
803  * Determine which orderings of a relation might be useful.
804  *
805  * Getting data in sorted order can be useful either because the requested
806  * order matches the final output ordering for the overall query we're
807  * planning, or because it enables an efficient merge join. Here, we try
808  * to figure out which pathkeys to consider.
809  */
810 static List *
812 {
813  List *useful_pathkeys_list = NIL;
814  List *useful_eclass_list;
816  EquivalenceClass *query_ec = NULL;
817  ListCell *lc;
818 
819  /*
820  * Pushing the query_pathkeys to the remote server is always worth
821  * considering, because it might let us avoid a local sort.
822  */
823  if (root->query_pathkeys)
824  {
825  bool query_pathkeys_ok = true;
826 
827  foreach(lc, root->query_pathkeys)
828  {
829  PathKey *pathkey = (PathKey *) lfirst(lc);
830  EquivalenceClass *pathkey_ec = pathkey->pk_eclass;
831  Expr *em_expr;
832 
833  /*
834  * The planner and executor don't have any clever strategy for
835  * taking data sorted by a prefix of the query's pathkeys and
836  * getting it to be sorted by all of those pathkeys. We'll just
837  * end up resorting the entire data set. So, unless we can push
838  * down all of the query pathkeys, forget it.
839  *
840  * is_foreign_expr would detect volatile expressions as well, but
841  * checking ec_has_volatile here saves some cycles.
842  */
843  if (pathkey_ec->ec_has_volatile ||
844  !(em_expr = find_em_expr_for_rel(pathkey_ec, rel)) ||
845  !is_foreign_expr(root, rel, em_expr))
846  {
847  query_pathkeys_ok = false;
848  break;
849  }
850  }
851 
852  if (query_pathkeys_ok)
853  useful_pathkeys_list = list_make1(list_copy(root->query_pathkeys));
854  }
855 
856  /*
857  * Even if we're not using remote estimates, having the remote side do the
858  * sort generally won't be any worse than doing it locally, and it might
859  * be much better if the remote side can generate data in the right order
860  * without needing a sort at all. However, what we're going to do next is
861  * try to generate pathkeys that seem promising for possible merge joins,
862  * and that's more speculative. A wrong choice might hurt quite a bit, so
863  * bail out if we can't use remote estimates.
864  */
865  if (!fpinfo->use_remote_estimate)
866  return useful_pathkeys_list;
867 
868  /* Get the list of interesting EquivalenceClasses. */
869  useful_eclass_list = get_useful_ecs_for_relation(root, rel);
870 
871  /* Extract unique EC for query, if any, so we don't consider it again. */
872  if (list_length(root->query_pathkeys) == 1)
873  {
874  PathKey *query_pathkey = linitial(root->query_pathkeys);
875 
876  query_ec = query_pathkey->pk_eclass;
877  }
878 
879  /*
880  * As a heuristic, the only pathkeys we consider here are those of length
881  * one. It's surely possible to consider more, but since each one we
882  * choose to consider will generate a round-trip to the remote side, we
883  * need to be a bit cautious here. It would sure be nice to have a local
884  * cache of information about remote index definitions...
885  */
886  foreach(lc, useful_eclass_list)
887  {
888  EquivalenceClass *cur_ec = lfirst(lc);
889  Expr *em_expr;
890  PathKey *pathkey;
891 
892  /* If redundant with what we did above, skip it. */
893  if (cur_ec == query_ec)
894  continue;
895 
896  /* If no pushable expression for this rel, skip it. */
897  em_expr = find_em_expr_for_rel(cur_ec, rel);
898  if (em_expr == NULL || !is_foreign_expr(root, rel, em_expr))
899  continue;
900 
901  /* Looks like we can generate a pathkey, so let's do it. */
902  pathkey = make_canonical_pathkey(root, cur_ec,
903  linitial_oid(cur_ec->ec_opfamilies),
905  false);
906  useful_pathkeys_list = lappend(useful_pathkeys_list,
907  list_make1(pathkey));
908  }
909 
910  return useful_pathkeys_list;
911 }
912 
913 /*
914  * postgresGetForeignPaths
915  * Create possible scan paths for a scan on the foreign table
916  */
917 static void
919  RelOptInfo *baserel,
920  Oid foreigntableid)
921 {
922  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
923  ForeignPath *path;
924  List *ppi_list;
925  ListCell *lc;
926 
927  /*
928  * Create simplest ForeignScan path node and add it to baserel. This path
929  * corresponds to SeqScan path of regular tables (though depending on what
930  * baserestrict conditions we were able to send to remote, there might
931  * actually be an indexscan happening there). We already did all the work
932  * to estimate cost and size of this path.
933  */
934  path = create_foreignscan_path(root, baserel,
935  NULL, /* default pathtarget */
936  fpinfo->rows,
937  fpinfo->startup_cost,
938  fpinfo->total_cost,
939  NIL, /* no pathkeys */
940  NULL, /* no outer rel either */
941  NULL, /* no extra plan */
942  NIL); /* no fdw_private list */
943  add_path(baserel, (Path *) path);
944 
945  /* Add paths with pathkeys */
946  add_paths_with_pathkeys_for_rel(root, baserel, NULL);
947 
948  /*
949  * If we're not using remote estimates, stop here. We have no way to
950  * estimate whether any join clauses would be worth sending across, so
951  * don't bother building parameterized paths.
952  */
953  if (!fpinfo->use_remote_estimate)
954  return;
955 
956  /*
957  * Thumb through all join clauses for the rel to identify which outer
958  * relations could supply one or more safe-to-send-to-remote join clauses.
959  * We'll build a parameterized path for each such outer relation.
960  *
961  * It's convenient to manage this by representing each candidate outer
962  * relation by the ParamPathInfo node for it. We can then use the
963  * ppi_clauses list in the ParamPathInfo node directly as a list of the
964  * interesting join clauses for that rel. This takes care of the
965  * possibility that there are multiple safe join clauses for such a rel,
966  * and also ensures that we account for unsafe join clauses that we'll
967  * still have to enforce locally (since the parameterized-path machinery
968  * insists that we handle all movable clauses).
969  */
970  ppi_list = NIL;
971  foreach(lc, baserel->joininfo)
972  {
973  RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
974  Relids required_outer;
975  ParamPathInfo *param_info;
976 
977  /* Check if clause can be moved to this rel */
978  if (!join_clause_is_movable_to(rinfo, baserel))
979  continue;
980 
981  /* See if it is safe to send to remote */
982  if (!is_foreign_expr(root, baserel, rinfo->clause))
983  continue;
984 
985  /* Calculate required outer rels for the resulting path */
986  required_outer = bms_union(rinfo->clause_relids,
987  baserel->lateral_relids);
988  /* We do not want the foreign rel itself listed in required_outer */
989  required_outer = bms_del_member(required_outer, baserel->relid);
990 
991  /*
992  * required_outer probably can't be empty here, but if it were, we
993  * couldn't make a parameterized path.
994  */
995  if (bms_is_empty(required_outer))
996  continue;
997 
998  /* Get the ParamPathInfo */
999  param_info = get_baserel_parampathinfo(root, baserel,
1000  required_outer);
1001  Assert(param_info != NULL);
1002 
1003  /*
1004  * Add it to list unless we already have it. Testing pointer equality
1005  * is OK since get_baserel_parampathinfo won't make duplicates.
1006  */
1007  ppi_list = list_append_unique_ptr(ppi_list, param_info);
1008  }
1009 
1010  /*
1011  * The above scan examined only "generic" join clauses, not those that
1012  * were absorbed into EquivalenceClauses. See if we can make anything out
1013  * of EquivalenceClauses.
1014  */
1015  if (baserel->has_eclass_joins)
1016  {
1017  /*
1018  * We repeatedly scan the eclass list looking for column references
1019  * (or expressions) belonging to the foreign rel. Each time we find
1020  * one, we generate a list of equivalence joinclauses for it, and then
1021  * see if any are safe to send to the remote. Repeat till there are
1022  * no more candidate EC members.
1023  */
1025 
1026  arg.already_used = NIL;
1027  for (;;)
1028  {
1029  List *clauses;
1030 
1031  /* Make clauses, skipping any that join to lateral_referencers */
1032  arg.current = NULL;
1034  baserel,
1036  (void *) &arg,
1037  baserel->lateral_referencers);
1038 
1039  /* Done if there are no more expressions in the foreign rel */
1040  if (arg.current == NULL)
1041  {
1042  Assert(clauses == NIL);
1043  break;
1044  }
1045 
1046  /* Scan the extracted join clauses */
1047  foreach(lc, clauses)
1048  {
1049  RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
1050  Relids required_outer;
1051  ParamPathInfo *param_info;
1052 
1053  /* Check if clause can be moved to this rel */
1054  if (!join_clause_is_movable_to(rinfo, baserel))
1055  continue;
1056 
1057  /* See if it is safe to send to remote */
1058  if (!is_foreign_expr(root, baserel, rinfo->clause))
1059  continue;
1060 
1061  /* Calculate required outer rels for the resulting path */
1062  required_outer = bms_union(rinfo->clause_relids,
1063  baserel->lateral_relids);
1064  required_outer = bms_del_member(required_outer, baserel->relid);
1065  if (bms_is_empty(required_outer))
1066  continue;
1067 
1068  /* Get the ParamPathInfo */
1069  param_info = get_baserel_parampathinfo(root, baserel,
1070  required_outer);
1071  Assert(param_info != NULL);
1072 
1073  /* Add it to list unless we already have it */
1074  ppi_list = list_append_unique_ptr(ppi_list, param_info);
1075  }
1076 
1077  /* Try again, now ignoring the expression we found this time */
1078  arg.already_used = lappend(arg.already_used, arg.current);
1079  }
1080  }
1081 
1082  /*
1083  * Now build a path for each useful outer relation.
1084  */
1085  foreach(lc, ppi_list)
1086  {
1087  ParamPathInfo *param_info = (ParamPathInfo *) lfirst(lc);
1088  double rows;
1089  int width;
1090  Cost startup_cost;
1091  Cost total_cost;
1092 
1093  /* Get a cost estimate from the remote */
1094  estimate_path_cost_size(root, baserel,
1095  param_info->ppi_clauses, NIL,
1096  &rows, &width,
1097  &startup_cost, &total_cost);
1098 
1099  /*
1100  * ppi_rows currently won't get looked at by anything, but still we
1101  * may as well ensure that it matches our idea of the rowcount.
1102  */
1103  param_info->ppi_rows = rows;
1104 
1105  /* Make the path */
1106  path = create_foreignscan_path(root, baserel,
1107  NULL, /* default pathtarget */
1108  rows,
1109  startup_cost,
1110  total_cost,
1111  NIL, /* no pathkeys */
1112  param_info->ppi_req_outer,
1113  NULL,
1114  NIL); /* no fdw_private list */
1115  add_path(baserel, (Path *) path);
1116  }
1117 }
1118 
1119 /*
1120  * postgresGetForeignPlan
1121  * Create ForeignScan plan node which implements selected best path
1122  */
1123 static ForeignScan *
1125  RelOptInfo *foreignrel,
1126  Oid foreigntableid,
1127  ForeignPath *best_path,
1128  List *tlist,
1129  List *scan_clauses,
1130  Plan *outer_plan)
1131 {
1132  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
1133  Index scan_relid;
1134  List *fdw_private;
1135  List *remote_exprs = NIL;
1136  List *local_exprs = NIL;
1137  List *params_list = NIL;
1138  List *fdw_scan_tlist = NIL;
1139  List *fdw_recheck_quals = NIL;
1141  StringInfoData sql;
1142  ListCell *lc;
1143 
1144  if (IS_SIMPLE_REL(foreignrel))
1145  {
1146  /*
1147  * For base relations, set scan_relid as the relid of the relation.
1148  */
1149  scan_relid = foreignrel->relid;
1150 
1151  /*
1152  * In a base-relation scan, we must apply the given scan_clauses.
1153  *
1154  * Separate the scan_clauses into those that can be executed remotely
1155  * and those that can't. baserestrictinfo clauses that were
1156  * previously determined to be safe or unsafe by classifyConditions
1157  * are found in fpinfo->remote_conds and fpinfo->local_conds. Anything
1158  * else in the scan_clauses list will be a join clause, which we have
1159  * to check for remote-safety.
1160  *
1161  * Note: the join clauses we see here should be the exact same ones
1162  * previously examined by postgresGetForeignPaths. Possibly it'd be
1163  * worth passing forward the classification work done then, rather
1164  * than repeating it here.
1165  *
1166  * This code must match "extract_actual_clauses(scan_clauses, false)"
1167  * except for the additional decision about remote versus local
1168  * execution.
1169  */
1170  foreach(lc, scan_clauses)
1171  {
1172  RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
1173 
1174  /* Ignore any pseudoconstants, they're dealt with elsewhere */
1175  if (rinfo->pseudoconstant)
1176  continue;
1177 
1178  if (list_member_ptr(fpinfo->remote_conds, rinfo))
1179  remote_exprs = lappend(remote_exprs, rinfo->clause);
1180  else if (list_member_ptr(fpinfo->local_conds, rinfo))
1181  local_exprs = lappend(local_exprs, rinfo->clause);
1182  else if (is_foreign_expr(root, foreignrel, rinfo->clause))
1183  remote_exprs = lappend(remote_exprs, rinfo->clause);
1184  else
1185  local_exprs = lappend(local_exprs, rinfo->clause);
1186  }
1187 
1188  /*
1189  * For a base-relation scan, we have to support EPQ recheck, which
1190  * should recheck all the remote quals.
1191  */
1192  fdw_recheck_quals = remote_exprs;
1193  }
1194  else
1195  {
1196  /*
1197  * Join relation or upper relation - set scan_relid to 0.
1198  */
1199  scan_relid = 0;
1200 
1201  /*
1202  * For a join rel, baserestrictinfo is NIL and we are not considering
1203  * parameterization right now, so there should be no scan_clauses for
1204  * a joinrel or an upper rel either.
1205  */
1206  Assert(!scan_clauses);
1207 
1208  /*
1209  * Instead we get the conditions to apply from the fdw_private
1210  * structure.
1211  */
1212  remote_exprs = extract_actual_clauses(fpinfo->remote_conds, false);
1213  local_exprs = extract_actual_clauses(fpinfo->local_conds, false);
1214 
1215  /*
1216  * We leave fdw_recheck_quals empty in this case, since we never need
1217  * to apply EPQ recheck clauses. In the case of a joinrel, EPQ
1218  * recheck is handled elsewhere --- see postgresGetForeignJoinPaths().
1219  * If we're planning an upperrel (ie, remote grouping or aggregation)
1220  * then there's no EPQ to do because SELECT FOR UPDATE wouldn't be
1221  * allowed, and indeed we *can't* put the remote clauses into
1222  * fdw_recheck_quals because the unaggregated Vars won't be available
1223  * locally.
1224  */
1225 
1226  /* Build the list of columns to be fetched from the foreign server. */
1227  fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
1228 
1229  /*
1230  * Ensure that the outer plan produces a tuple whose descriptor
1231  * matches our scan tuple slot. This is safe because all scans and
1232  * joins support projection, so we never need to insert a Result node.
1233  * Also, remove the local conditions from outer plan's quals, lest
1234  * they will be evaluated twice, once by the local plan and once by
1235  * the scan.
1236  */
1237  if (outer_plan)
1238  {
1239  ListCell *lc;
1240 
1241  /*
1242  * Right now, we only consider grouping and aggregation beyond
1243  * joins. Queries involving aggregates or grouping do not require
1244  * EPQ mechanism, hence should not have an outer plan here.
1245  */
1246  Assert(!IS_UPPER_REL(foreignrel));
1247 
1248  outer_plan->targetlist = fdw_scan_tlist;
1249 
1250  foreach(lc, local_exprs)
1251  {
1252  Join *join_plan = (Join *) outer_plan;
1253  Node *qual = lfirst(lc);
1254 
1255  outer_plan->qual = list_delete(outer_plan->qual, qual);
1256 
1257  /*
1258  * For an inner join the local conditions of foreign scan plan
1259  * can be part of the joinquals as well.
1260  */
1261  if (join_plan->jointype == JOIN_INNER)
1262  join_plan->joinqual = list_delete(join_plan->joinqual,
1263  qual);
1264  }
1265  }
1266  }
1267 
1268  /*
1269  * Build the query string to be sent for execution, and identify
1270  * expressions to be sent as parameters.
1271  */
1272  initStringInfo(&sql);
1273  deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
1274  remote_exprs, best_path->path.pathkeys,
1275  false, &retrieved_attrs, &params_list);
1276 
1277  /* Remember remote_exprs for possible use by postgresPlanDirectModify */
1278  fpinfo->final_remote_exprs = remote_exprs;
1279 
1280  /*
1281  * Build the fdw_private list that will be available to the executor.
1282  * Items in the list must match order in enum FdwScanPrivateIndex.
1283  */
1284  fdw_private = list_make3(makeString(sql.data),
1286  makeInteger(fpinfo->fetch_size));
1287  if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
1288  fdw_private = lappend(fdw_private,
1289  makeString(fpinfo->relation_name->data));
1290 
1291  /*
1292  * Create the ForeignScan node for the given relation.
1293  *
1294  * Note that the remote parameter expressions are stored in the fdw_exprs
1295  * field of the finished plan node; we can't keep them in private state
1296  * because then they wouldn't be subject to later planner processing.
1297  */
1298  return make_foreignscan(tlist,
1299  local_exprs,
1300  scan_relid,
1301  params_list,
1302  fdw_private,
1303  fdw_scan_tlist,
1304  fdw_recheck_quals,
1305  outer_plan);
1306 }
1307 
1308 /*
1309  * postgresBeginForeignScan
1310  * Initiate an executor scan of a foreign PostgreSQL table.
1311  */
1312 static void
1314 {
1315  ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
1316  EState *estate = node->ss.ps.state;
1317  PgFdwScanState *fsstate;
1318  RangeTblEntry *rte;
1319  Oid userid;
1320  ForeignTable *table;
1321  UserMapping *user;
1322  int rtindex;
1323  int numParams;
1324 
1325  /*
1326  * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
1327  */
1328  if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1329  return;
1330 
1331  /*
1332  * We'll save private state in node->fdw_state.
1333  */
1334  fsstate = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState));
1335  node->fdw_state = (void *) fsstate;
1336 
1337  /*
1338  * Identify which user to do the remote access as. This should match what
1339  * ExecCheckRTEPerms() does. In case of a join or aggregate, use the
1340  * lowest-numbered member RTE as a representative; we would get the same
1341  * result from any.
1342  */
1343  if (fsplan->scan.scanrelid > 0)
1344  rtindex = fsplan->scan.scanrelid;
1345  else
1346  rtindex = bms_next_member(fsplan->fs_relids, -1);
1347  rte = rt_fetch(rtindex, estate->es_range_table);
1348  userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
1349 
1350  /* Get info about foreign table. */
1351  table = GetForeignTable(rte->relid);
1352  user = GetUserMapping(userid, table->serverid);
1353 
1354  /*
1355  * Get connection to the foreign server. Connection manager will
1356  * establish new connection if necessary.
1357  */
1358  fsstate->conn = GetConnection(user, false);
1359 
1360  /* Assign a unique ID for my cursor */
1361  fsstate->cursor_number = GetCursorNumber(fsstate->conn);
1362  fsstate->cursor_exists = false;
1363 
1364  /* Get private info created by planner functions. */
1365  fsstate->query = strVal(list_nth(fsplan->fdw_private,
1367  fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
1369  fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private,
1371 
1372  /* Create contexts for batches of tuples and per-tuple temp workspace. */
1373  fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
1374  "postgres_fdw tuple data",
1376  fsstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
1377  "postgres_fdw temporary data",
1379 
1380  /*
1381  * Get info we'll need for converting data fetched from the foreign server
1382  * into local representation and error reporting during that process.
1383  */
1384  if (fsplan->scan.scanrelid > 0)
1385  {
1386  fsstate->rel = node->ss.ss_currentRelation;
1387  fsstate->tupdesc = RelationGetDescr(fsstate->rel);
1388  }
1389  else
1390  {
1391  fsstate->rel = NULL;
1392  fsstate->tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
1393  }
1394 
1395  fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc);
1396 
1397  /*
1398  * Prepare for processing of parameters used in remote query, if any.
1399  */
1400  numParams = list_length(fsplan->fdw_exprs);
1401  fsstate->numParams = numParams;
1402  if (numParams > 0)
1404  fsplan->fdw_exprs,
1405  numParams,
1406  &fsstate->param_flinfo,
1407  &fsstate->param_exprs,
1408  &fsstate->param_values);
1409 }
1410 
1411 /*
1412  * postgresIterateForeignScan
1413  * Retrieve next row from the result set, or clear tuple slot to indicate
1414  * EOF.
1415  */
1416 static TupleTableSlot *
1418 {
1419  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1420  TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
1421 
1422  /*
1423  * If this is the first call after Begin or ReScan, we need to create the
1424  * cursor on the remote side.
1425  */
1426  if (!fsstate->cursor_exists)
1427  create_cursor(node);
1428 
1429  /*
1430  * Get some more tuples, if we've run out.
1431  */
1432  if (fsstate->next_tuple >= fsstate->num_tuples)
1433  {
1434  /* No point in another fetch if we already detected EOF, though. */
1435  if (!fsstate->eof_reached)
1436  fetch_more_data(node);
1437  /* If we didn't get any tuples, must be end of data. */
1438  if (fsstate->next_tuple >= fsstate->num_tuples)
1439  return ExecClearTuple(slot);
1440  }
1441 
1442  /*
1443  * Return the next tuple.
1444  */
1445  ExecStoreTuple(fsstate->tuples[fsstate->next_tuple++],
1446  slot,
1447  InvalidBuffer,
1448  false);
1449 
1450  return slot;
1451 }
1452 
1453 /*
1454  * postgresReScanForeignScan
1455  * Restart the scan.
1456  */
1457 static void
1459 {
1460  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1461  char sql[64];
1462  PGresult *res;
1463 
1464  /* If we haven't created the cursor yet, nothing to do. */
1465  if (!fsstate->cursor_exists)
1466  return;
1467 
1468  /*
1469  * If any internal parameters affecting this node have changed, we'd
1470  * better destroy and recreate the cursor. Otherwise, rewinding it should
1471  * be good enough. If we've only fetched zero or one batch, we needn't
1472  * even rewind the cursor, just rescan what we have.
1473  */
1474  if (node->ss.ps.chgParam != NULL)
1475  {
1476  fsstate->cursor_exists = false;
1477  snprintf(sql, sizeof(sql), "CLOSE c%u",
1478  fsstate->cursor_number);
1479  }
1480  else if (fsstate->fetch_ct_2 > 1)
1481  {
1482  snprintf(sql, sizeof(sql), "MOVE BACKWARD ALL IN c%u",
1483  fsstate->cursor_number);
1484  }
1485  else
1486  {
1487  /* Easy: just rescan what we already have in memory, if anything */
1488  fsstate->next_tuple = 0;
1489  return;
1490  }
1491 
1492  /*
1493  * We don't use a PG_TRY block here, so be careful not to throw error
1494  * without releasing the PGresult.
1495  */
1496  res = pgfdw_exec_query(fsstate->conn, sql);
1497  if (PQresultStatus(res) != PGRES_COMMAND_OK)
1498  pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
1499  PQclear(res);
1500 
1501  /* Now force a fresh FETCH. */
1502  fsstate->tuples = NULL;
1503  fsstate->num_tuples = 0;
1504  fsstate->next_tuple = 0;
1505  fsstate->fetch_ct_2 = 0;
1506  fsstate->eof_reached = false;
1507 }
1508 
1509 /*
1510  * postgresEndForeignScan
1511  * Finish scanning foreign table and dispose objects used for this scan
1512  */
1513 static void
1515 {
1516  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1517 
1518  /* if fsstate is NULL, we are in EXPLAIN; nothing to do */
1519  if (fsstate == NULL)
1520  return;
1521 
1522  /* Close the cursor if open, to prevent accumulation of cursors */
1523  if (fsstate->cursor_exists)
1524  close_cursor(fsstate->conn, fsstate->cursor_number);
1525 
1526  /* Release remote connection */
1527  ReleaseConnection(fsstate->conn);
1528  fsstate->conn = NULL;
1529 
1530  /* MemoryContexts will be deleted automatically. */
1531 }
1532 
1533 /*
1534  * postgresAddForeignUpdateTargets
1535  * Add resjunk column(s) needed for update/delete on a foreign table
1536  */
1537 static void
1539  RangeTblEntry *target_rte,
1540  Relation target_relation)
1541 {
1542  Var *var;
1543  const char *attrname;
1544  TargetEntry *tle;
1545 
1546  /*
1547  * In postgres_fdw, what we need is the ctid, same as for a regular table.
1548  */
1549 
1550  /* Make a Var representing the desired value */
1551  var = makeVar(parsetree->resultRelation,
1553  TIDOID,
1554  -1,
1555  InvalidOid,
1556  0);
1557 
1558  /* Wrap it in a resjunk TLE with the right name ... */
1559  attrname = "ctid";
1560 
1561  tle = makeTargetEntry((Expr *) var,
1562  list_length(parsetree->targetList) + 1,
1563  pstrdup(attrname),
1564  true);
1565 
1566  /* ... and add it to the query's targetlist */
1567  parsetree->targetList = lappend(parsetree->targetList, tle);
1568 }
1569 
1570 /*
1571  * postgresPlanForeignModify
1572  * Plan an insert/update/delete operation on a foreign table
1573  */
1574 static List *
1576  ModifyTable *plan,
1577  Index resultRelation,
1578  int subplan_index)
1579 {
1580  CmdType operation = plan->operation;
1581  RangeTblEntry *rte = planner_rt_fetch(resultRelation, root);
1582  Relation rel;
1583  StringInfoData sql;
1584  List *targetAttrs = NIL;
1585  List *returningList = NIL;
1587  bool doNothing = false;
1588 
1589  initStringInfo(&sql);
1590 
1591  /*
1592  * Core code already has some lock on each rel being planned, so we can
1593  * use NoLock here.
1594  */
1595  rel = heap_open(rte->relid, NoLock);
1596 
1597  /*
1598  * In an INSERT, we transmit all columns that are defined in the foreign
1599  * table. In an UPDATE, we transmit only columns that were explicitly
1600  * targets of the UPDATE, so as to avoid unnecessary data transmission.
1601  * (We can't do that for INSERT since we would miss sending default values
1602  * for columns not listed in the source statement.)
1603  */
1604  if (operation == CMD_INSERT)
1605  {
1607  int attnum;
1608 
1609  for (attnum = 1; attnum <= tupdesc->natts; attnum++)
1610  {
1611  Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
1612 
1613  if (!attr->attisdropped)
1614  targetAttrs = lappend_int(targetAttrs, attnum);
1615  }
1616  }
1617  else if (operation == CMD_UPDATE)
1618  {
1619  int col;
1620 
1621  col = -1;
1622  while ((col = bms_next_member(rte->updatedCols, col)) >= 0)
1623  {
1624  /* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
1626 
1627  if (attno <= InvalidAttrNumber) /* shouldn't happen */
1628  elog(ERROR, "system-column update is not supported");
1629  targetAttrs = lappend_int(targetAttrs, attno);
1630  }
1631  }
1632 
1633  /*
1634  * Extract the relevant RETURNING list if any.
1635  */
1636  if (plan->returningLists)
1637  returningList = (List *) list_nth(plan->returningLists, subplan_index);
1638 
1639  /*
1640  * ON CONFLICT DO UPDATE and DO NOTHING case with inference specification
1641  * should have already been rejected in the optimizer, as presently there
1642  * is no way to recognize an arbiter index on a foreign table. Only DO
1643  * NOTHING is supported without an inference specification.
1644  */
1645  if (plan->onConflictAction == ONCONFLICT_NOTHING)
1646  doNothing = true;
1647  else if (plan->onConflictAction != ONCONFLICT_NONE)
1648  elog(ERROR, "unexpected ON CONFLICT specification: %d",
1649  (int) plan->onConflictAction);
1650 
1651  /*
1652  * Construct the SQL command string.
1653  */
1654  switch (operation)
1655  {
1656  case CMD_INSERT:
1657  deparseInsertSql(&sql, rte, resultRelation, rel,
1658  targetAttrs, doNothing, returningList,
1659  &retrieved_attrs);
1660  break;
1661  case CMD_UPDATE:
1662  deparseUpdateSql(&sql, rte, resultRelation, rel,
1663  targetAttrs, returningList,
1664  &retrieved_attrs);
1665  break;
1666  case CMD_DELETE:
1667  deparseDeleteSql(&sql, rte, resultRelation, rel,
1668  returningList,
1669  &retrieved_attrs);
1670  break;
1671  default:
1672  elog(ERROR, "unexpected operation: %d", (int) operation);
1673  break;
1674  }
1675 
1676  heap_close(rel, NoLock);
1677 
1678  /*
1679  * Build the fdw_private list that will be available to the executor.
1680  * Items in the list must match enum FdwModifyPrivateIndex, above.
1681  */
1682  return list_make4(makeString(sql.data),
1683  targetAttrs,
1684  makeInteger((retrieved_attrs != NIL)),
1685  retrieved_attrs);
1686 }
1687 
1688 /*
1689  * postgresBeginForeignModify
1690  * Begin an insert/update/delete operation on a foreign table
1691  */
1692 static void
1694  ResultRelInfo *resultRelInfo,
1695  List *fdw_private,
1696  int subplan_index,
1697  int eflags)
1698 {
1699  PgFdwModifyState *fmstate;
1700  char *query;
1701  List *target_attrs;
1702  bool has_returning;
1704  RangeTblEntry *rte;
1705 
1706  /*
1707  * Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState
1708  * stays NULL.
1709  */
1710  if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1711  return;
1712 
1713  /* Deconstruct fdw_private data. */
1714  query = strVal(list_nth(fdw_private,
1716  target_attrs = (List *) list_nth(fdw_private,
1718  has_returning = intVal(list_nth(fdw_private,
1720  retrieved_attrs = (List *) list_nth(fdw_private,
1722 
1723  /* Find RTE. */
1724  rte = rt_fetch(resultRelInfo->ri_RangeTableIndex,
1725  mtstate->ps.state->es_range_table);
1726 
1727  /* Construct an execution state. */
1728  fmstate = create_foreign_modify(mtstate->ps.state,
1729  rte,
1730  resultRelInfo,
1731  mtstate->operation,
1732  mtstate->mt_plans[subplan_index]->plan,
1733  query,
1734  target_attrs,
1735  has_returning,
1736  retrieved_attrs);
1737 
1738  resultRelInfo->ri_FdwState = fmstate;
1739 }
1740 
1741 /*
1742  * postgresExecForeignInsert
1743  * Insert one row into a foreign table
1744  */
1745 static TupleTableSlot *
1747  ResultRelInfo *resultRelInfo,
1748  TupleTableSlot *slot,
1749  TupleTableSlot *planSlot)
1750 {
1751  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1752  const char **p_values;
1753  PGresult *res;
1754  int n_rows;
1755 
1756  /* Set up the prepared statement on the remote server, if we didn't yet */
1757  if (!fmstate->p_name)
1758  prepare_foreign_modify(fmstate);
1759 
1760  /* Convert parameters needed by prepared statement to text form */
1761  p_values = convert_prep_stmt_params(fmstate, NULL, slot);
1762 
1763  /*
1764  * Execute the prepared statement.
1765  */
1766  if (!PQsendQueryPrepared(fmstate->conn,
1767  fmstate->p_name,
1768  fmstate->p_nums,
1769  p_values,
1770  NULL,
1771  NULL,
1772  0))
1773  pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
1774 
1775  /*
1776  * Get the result, and check for success.
1777  *
1778  * We don't use a PG_TRY block here, so be careful not to throw error
1779  * without releasing the PGresult.
1780  */
1781  res = pgfdw_get_result(fmstate->conn, fmstate->query);
1782  if (PQresultStatus(res) !=
1784  pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
1785 
1786  /* Check number of rows affected, and fetch RETURNING tuple if any */
1787  if (fmstate->has_returning)
1788  {
1789  n_rows = PQntuples(res);
1790  if (n_rows > 0)
1791  store_returning_result(fmstate, slot, res);
1792  }
1793  else
1794  n_rows = atoi(PQcmdTuples(res));
1795 
1796  /* And clean up */
1797  PQclear(res);
1798 
1799  MemoryContextReset(fmstate->temp_cxt);
1800 
1801  /* Return NULL if nothing was inserted on the remote end */
1802  return (n_rows > 0) ? slot : NULL;
1803 }
1804 
1805 /*
1806  * postgresExecForeignUpdate
1807  * Update one row in a foreign table
1808  */
1809 static TupleTableSlot *
1811  ResultRelInfo *resultRelInfo,
1812  TupleTableSlot *slot,
1813  TupleTableSlot *planSlot)
1814 {
1815  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1816  Datum datum;
1817  bool isNull;
1818  const char **p_values;
1819  PGresult *res;
1820  int n_rows;
1821 
1822  /* Set up the prepared statement on the remote server, if we didn't yet */
1823  if (!fmstate->p_name)
1824  prepare_foreign_modify(fmstate);
1825 
1826  /* Get the ctid that was passed up as a resjunk column */
1827  datum = ExecGetJunkAttribute(planSlot,
1828  fmstate->ctidAttno,
1829  &isNull);
1830  /* shouldn't ever get a null result... */
1831  if (isNull)
1832  elog(ERROR, "ctid is NULL");
1833 
1834  /* Convert parameters needed by prepared statement to text form */
1835  p_values = convert_prep_stmt_params(fmstate,
1836  (ItemPointer) DatumGetPointer(datum),
1837  slot);
1838 
1839  /*
1840  * Execute the prepared statement.
1841  */
1842  if (!PQsendQueryPrepared(fmstate->conn,
1843  fmstate->p_name,
1844  fmstate->p_nums,
1845  p_values,
1846  NULL,
1847  NULL,
1848  0))
1849  pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
1850 
1851  /*
1852  * Get the result, and check for success.
1853  *
1854  * We don't use a PG_TRY block here, so be careful not to throw error
1855  * without releasing the PGresult.
1856  */
1857  res = pgfdw_get_result(fmstate->conn, fmstate->query);
1858  if (PQresultStatus(res) !=
1860  pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
1861 
1862  /* Check number of rows affected, and fetch RETURNING tuple if any */
1863  if (fmstate->has_returning)
1864  {
1865  n_rows = PQntuples(res);
1866  if (n_rows > 0)
1867  store_returning_result(fmstate, slot, res);
1868  }
1869  else
1870  n_rows = atoi(PQcmdTuples(res));
1871 
1872  /* And clean up */
1873  PQclear(res);
1874 
1875  MemoryContextReset(fmstate->temp_cxt);
1876 
1877  /* Return NULL if nothing was updated on the remote end */
1878  return (n_rows > 0) ? slot : NULL;
1879 }
1880 
1881 /*
1882  * postgresExecForeignDelete
1883  * Delete one row from a foreign table
1884  */
1885 static TupleTableSlot *
1887  ResultRelInfo *resultRelInfo,
1888  TupleTableSlot *slot,
1889  TupleTableSlot *planSlot)
1890 {
1891  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1892  Datum datum;
1893  bool isNull;
1894  const char **p_values;
1895  PGresult *res;
1896  int n_rows;
1897 
1898  /* Set up the prepared statement on the remote server, if we didn't yet */
1899  if (!fmstate->p_name)
1900  prepare_foreign_modify(fmstate);
1901 
1902  /* Get the ctid that was passed up as a resjunk column */
1903  datum = ExecGetJunkAttribute(planSlot,
1904  fmstate->ctidAttno,
1905  &isNull);
1906  /* shouldn't ever get a null result... */
1907  if (isNull)
1908  elog(ERROR, "ctid is NULL");
1909 
1910  /* Convert parameters needed by prepared statement to text form */
1911  p_values = convert_prep_stmt_params(fmstate,
1912  (ItemPointer) DatumGetPointer(datum),
1913  NULL);
1914 
1915  /*
1916  * Execute the prepared statement.
1917  */
1918  if (!PQsendQueryPrepared(fmstate->conn,
1919  fmstate->p_name,
1920  fmstate->p_nums,
1921  p_values,
1922  NULL,
1923  NULL,
1924  0))
1925  pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
1926 
1927  /*
1928  * Get the result, and check for success.
1929  *
1930  * We don't use a PG_TRY block here, so be careful not to throw error
1931  * without releasing the PGresult.
1932  */
1933  res = pgfdw_get_result(fmstate->conn, fmstate->query);
1934  if (PQresultStatus(res) !=
1936  pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
1937 
1938  /* Check number of rows affected, and fetch RETURNING tuple if any */
1939  if (fmstate->has_returning)
1940  {
1941  n_rows = PQntuples(res);
1942  if (n_rows > 0)
1943  store_returning_result(fmstate, slot, res);
1944  }
1945  else
1946  n_rows = atoi(PQcmdTuples(res));
1947 
1948  /* And clean up */
1949  PQclear(res);
1950 
1951  MemoryContextReset(fmstate->temp_cxt);
1952 
1953  /* Return NULL if nothing was deleted on the remote end */
1954  return (n_rows > 0) ? slot : NULL;
1955 }
1956 
1957 /*
1958  * postgresEndForeignModify
1959  * Finish an insert/update/delete operation on a foreign table
1960  */
1961 static void
1963  ResultRelInfo *resultRelInfo)
1964 {
1965  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1966 
1967  /* If fmstate is NULL, we are in EXPLAIN; nothing to do */
1968  if (fmstate == NULL)
1969  return;
1970 
1971  /* Destroy the execution state */
1972  finish_foreign_modify(fmstate);
1973 }
1974 
1975 /*
1976  * postgresBeginForeignInsert
1977  * Begin an insert operation on a foreign table
 *
 * Deparses a remote INSERT that transmits every non-dropped column of the
 * foreign table, adding ON CONFLICT DO NOTHING when the plan requests it and
 * passing resultRelInfo->ri_returningList along to deparseInsertSql().  The
 * execution state built by create_foreign_modify() is stored in
 * resultRelInfo->ri_FdwState.  Any ON CONFLICT action other than NONE or
 * NOTHING raises ERROR.
1978  */
1979 static void
1981  ResultRelInfo *resultRelInfo)
1982 {
1983  PgFdwModifyState *fmstate;
1984  ModifyTable *plan = castNode(ModifyTable, mtstate->ps.plan);
1985  EState *estate = mtstate->ps.state;
1986  Index resultRelation = resultRelInfo->ri_RangeTableIndex;
1987  Relation rel = resultRelInfo->ri_RelationDesc;
1988  RangeTblEntry *rte;
1990  int attnum;
1991  StringInfoData sql;
1992  List *targetAttrs = NIL;
1994  bool doNothing = false;
1995 
1996  initStringInfo(&sql);
1997 
1998  /* We transmit all columns that are defined in the foreign table. */
 /* Attribute numbers are 1-based; TupleDescAttr() takes a 0-based index. */
1999  for (attnum = 1; attnum <= tupdesc->natts; attnum++)
2000  {
2001  Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
2002 
2003  if (!attr->attisdropped)
2004  targetAttrs = lappend_int(targetAttrs, attnum);
2005  }
2006 
2007  /* Check if we add the ON CONFLICT clause to the remote query. */
 /*
  * NOTE(review): plan may be NULL here (hence the test); presumably that
  * happens when no ModifyTable node drives the insert -- confirm.
  */
2008  if (plan)
2009  {
2010  OnConflictAction onConflictAction = plan->onConflictAction;
2011 
2012  /* We only support DO NOTHING without an inference specification. */
2013  if (onConflictAction == ONCONFLICT_NOTHING)
2014  doNothing = true;
2015  else if (onConflictAction != ONCONFLICT_NONE)
2016  elog(ERROR, "unexpected ON CONFLICT specification: %d",
2017  (int) onConflictAction);
2018  }
2019 
2020  /*
2021  * If the foreign table is a partition, we need to create a new RTE
2022  * describing the foreign table for use by deparseInsertSql and
2023  * create_foreign_modify() below, after first copying the parent's
2024  * RTE and modifying some fields to describe the foreign partition to
2025  * work on. However, if this is invoked by UPDATE, the existing RTE
2026  * may already correspond to this partition if it is one of the
2027  * UPDATE subplan target rels; in that case, we can just use the
2028  * existing RTE as-is.
2029  */
2030  rte = list_nth(estate->es_range_table, resultRelation - 1);
2031  if (rte->relid != RelationGetRelid(rel))
2032  {
2033  rte = copyObject(rte);
2034  rte->relid = RelationGetRelid(rel);
2035  rte->relkind = RELKIND_FOREIGN_TABLE;
2036 
2037  /*
2038  * For UPDATE, we must use the RT index of the first subplan
2039  * target rel's RTE, because the core code would have built
2040  * expressions for the partition, such as RETURNING, using that
2041  * RT index as varno of Vars contained in those expressions.
2042  */
2043  if (plan && plan->operation == CMD_UPDATE &&
2044  resultRelation == plan->nominalRelation)
2045  resultRelation = mtstate->resultRelInfo[0].ri_RangeTableIndex;
2046  }
2047 
2048  /* Construct the SQL command string. */
2049  deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing,
2050  resultRelInfo->ri_returningList, &retrieved_attrs);
2051 
2052  /* Construct an execution state. */
 /* A non-empty retrieved_attrs list is passed as the has-returning flag. */
2053  fmstate = create_foreign_modify(mtstate->ps.state,
2054  rte,
2055  resultRelInfo,
2056  CMD_INSERT,
2057  NULL,
2058  sql.data,
2059  targetAttrs,
2060  retrieved_attrs != NIL,
2061  retrieved_attrs);
2062 
2063  resultRelInfo->ri_FdwState = fmstate;
2064 }
2065 
2066 /*
2067  * postgresEndForeignInsert
2068  * Finish an insert operation on a foreign table
 *
 * Destroys the PgFdwModifyState stored in resultRelInfo->ri_FdwState with
 * finish_foreign_modify().  A NULL state is not expected here; it is only
 * asserted against, not handled.
2069  */
2070 static void
2072  ResultRelInfo *resultRelInfo)
2073 {
2074  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
2075 
2076  Assert(fmstate != NULL);
2077 
2078  /* Destroy the execution state */
2079  finish_foreign_modify(fmstate);
2080 }
2081 
2082 /*
2083  * postgresIsForeignRelUpdatable
2084  * Determine whether a foreign table supports INSERT, UPDATE and/or
2085  * DELETE.
 *
 * Returns a bitmask with (1 << CMD_INSERT), (1 << CMD_UPDATE) and
 * (1 << CMD_DELETE) all set when the table is updatable, and 0 otherwise.
 * The "updatable" option is read from the server's options first and then
 * from the table's, so a table-level setting overrides the server-level one.
 * Within a single options list, the last occurrence wins.
2086  */
2087 static int
2089 {
2090  bool updatable;
2091  ForeignTable *table;
2092  ForeignServer *server;
2093  ListCell *lc;
2094 
2095  /*
2096  * By default, all postgres_fdw foreign tables are assumed updatable. This
2097  * can be overridden by a per-server setting, which in turn can be
2098  * overridden by a per-table setting.
2099  */
2100  updatable = true;
2101 
2102  table = GetForeignTable(RelationGetRelid(rel));
2103  server = GetForeignServer(table->serverid);
2104 
2105  foreach(lc, server->options)
2106  {
2107  DefElem *def = (DefElem *) lfirst(lc);
2108 
2109  if (strcmp(def->defname, "updatable") == 0)
2110  updatable = defGetBoolean(def);
2111  }
 /* Table options are scanned last so that they take precedence. */
2112  foreach(lc, table->options)
2113  {
2114  DefElem *def = (DefElem *) lfirst(lc);
2115 
2116  if (strcmp(def->defname, "updatable") == 0)
2117  updatable = defGetBoolean(def);
2118  }
2119 
2120  /*
2121  * Currently "updatable" means support for INSERT, UPDATE and DELETE.
2122  */
2123  return updatable ?
2124  (1 << CMD_INSERT) | (1 << CMD_UPDATE) | (1 << CMD_DELETE) : 0;
2125 }
2126 
2127 /*
2128  * postgresRecheckForeignScan
2129  * Execute a local join execution plan for a foreign join
 *
 * For a base relation (scanrelid > 0), setting fdw_recheck_quals suffices and
 * true is returned at once.  For a pushed-down join (scanrelid == 0),
 * outerPlan -- the local join execution plan -- is run once.  If it yields a
 * tuple, that tuple is copied into "slot" and true is returned; otherwise
 * false is returned.
2130  */
2131 static bool
2133 {
2134  Index scanrelid = ((Scan *) node->ss.ps.plan)->scanrelid;
2136  TupleTableSlot *result;
2137 
2138  /* For base foreign relations, it suffices to set fdw_recheck_quals */
2139  if (scanrelid > 0)
2140  return true;
2141 
2142  Assert(outerPlan != NULL);
2143 
2144  /* Execute a local join execution plan */
2145  result = ExecProcNode(outerPlan);
2146  if (TupIsNull(result))
2147  return false;
2148 
2149  /* Store result in the given slot */
2150  ExecCopySlot(slot, result);
2151 
2152  return true;
2153 }
2154 
2155 /*
2156  * postgresPlanDirectModify
2157  * Consider a direct foreign table modification
2158  *
2159  * Decide whether it is safe to modify a foreign table directly, and if so,
2160  * rewrite subplan accordingly.
 *
 * Returns false, leaving the plan untouched, in any of these cases:
 *  - the command is not UPDATE or DELETE;
 *  - the subplan is not a ForeignScan;
 *  - the subplan has quals that must be evaluated locally;
 *  - for UPDATE, an assigned expression is not safe to evaluate remotely.
 *
 * Otherwise the ForeignScan is rewritten in place and true is returned: its
 * operation, fdw_exprs and fdw_private are replaced.  For a join
 * (scanrelid == 0), the outer subplan is also dropped and, if there is a
 * RETURNING list, fdw_scan_tlist is rebuilt.
2161  */
2162 static bool
2164  ModifyTable *plan,
2165  Index resultRelation,
2166  int subplan_index)
2167 {
2168  CmdType operation = plan->operation;
2169  Plan *subplan;
2170  RelOptInfo *foreignrel;
2171  RangeTblEntry *rte;
2172  PgFdwRelationInfo *fpinfo;
2173  Relation rel;
2174  StringInfoData sql;
2175  ForeignScan *fscan;
2176  List *targetAttrs = NIL;
2177  List *remote_exprs;
2178  List *params_list = NIL;
2179  List *returningList = NIL;
2181 
2182  /*
2183  * Decide whether it is safe to modify a foreign table directly.
2184  */
2185 
2186  /*
2187  * The table modification must be an UPDATE or DELETE.
2188  */
2189  if (operation != CMD_UPDATE && operation != CMD_DELETE)
2190  return false;
2191 
2192  /*
2193  * It's unsafe to modify a foreign table directly if there are any local
2194  * joins needed.
2195  */
2196  subplan = (Plan *) list_nth(plan->plans, subplan_index);
2197  if (!IsA(subplan, ForeignScan))
2198  return false;
2199  fscan = (ForeignScan *) subplan;
2200 
2201  /*
2202  * It's unsafe to modify a foreign table directly if there are any quals
2203  * that should be evaluated locally.
2204  */
2205  if (subplan->qual != NIL)
2206  return false;
2207 
2208  /* Safe to fetch data about the target foreign rel */
2209  if (fscan->scan.scanrelid == 0)
2210  {
2211  foreignrel = find_join_rel(root, fscan->fs_relids);
2212  /* We should have a rel for this foreign join. */
2213  Assert(foreignrel);
2214  }
2215  else
2216  foreignrel = root->simple_rel_array[resultRelation];
 /* The RTE is always the target relation's, even when foreignrel is a join. */
2217  rte = root->simple_rte_array[resultRelation];
2218  fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
2219 
2220  /*
2221  * It's unsafe to update a foreign table directly, if any expressions to
2222  * assign to the target columns are unsafe to evaluate remotely.
2223  */
2224  if (operation == CMD_UPDATE)
2225  {
2226  int col;
2227 
2228  /*
2229  * We transmit only columns that were explicitly targets of the
2230  * UPDATE, so as to avoid unnecessary data transmission.
2231  */
2232  col = -1;
2233  while ((col = bms_next_member(rte->updatedCols, col)) >= 0)
2234  {
2235  /* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
2237  TargetEntry *tle;
2238 
2239  if (attno <= InvalidAttrNumber) /* shouldn't happen */
2240  elog(ERROR, "system-column update is not supported");
2241 
2242  tle = get_tle_by_resno(subplan->targetlist, attno);
2243 
2244  if (!tle)
2245  elog(ERROR, "attribute number %d not found in subplan targetlist",
2246  attno);
2247 
2248  if (!is_foreign_expr(root, foreignrel, (Expr *) tle->expr))
2249  return false;
2250 
2251  targetAttrs = lappend_int(targetAttrs, attno);
2252  }
2253  }
2254 
2255  /*
2256  * Ok, rewrite subplan so as to modify the foreign table directly.
2257  */
2258  initStringInfo(&sql);
2259 
2260  /*
2261  * Core code already has some lock on each rel being planned, so we can
2262  * use NoLock here.
2263  */
2264  rel = heap_open(rte->relid, NoLock);
2265 
2266  /*
2267  * Recall the qual clauses that must be evaluated remotely. (These are
2268  * bare clauses not RestrictInfos, but deparse.c's appendConditions()
2269  * doesn't care.)
2270  */
2271  remote_exprs = fpinfo->final_remote_exprs;
2272 
2273  /*
2274  * Extract the relevant RETURNING list if any.
2275  */
2276  if (plan->returningLists)
2277  {
2278  returningList = (List *) list_nth(plan->returningLists, subplan_index);
2279 
2280  /*
2281  * When performing an UPDATE/DELETE .. RETURNING on a join directly,
2282  * we fetch from the foreign server any Vars specified in RETURNING
2283  * that refer not only to the target relation but to non-target
2284  * relations. So we'll deparse them into the RETURNING clause of the
2285  * remote query; use a targetlist consisting of them instead, which
2286  * will be adjusted to be new fdw_scan_tlist of the foreign-scan plan
2287  * node below.
2288  */
2289  if (fscan->scan.scanrelid == 0)
2290  returningList = build_remote_returning(resultRelation, rel,
2291  returningList);
2292  }
2293 
2294  /*
2295  * Construct the SQL command string.
2296  */
2297  switch (operation)
2298  {
2299  case CMD_UPDATE:
2300  deparseDirectUpdateSql(&sql, root, resultRelation, rel,
2301  foreignrel,
2302  ((Plan *) fscan)->targetlist,
2303  targetAttrs,
2304  remote_exprs, &params_list,
2305  returningList, &retrieved_attrs);
2306  break;
2307  case CMD_DELETE:
2308  deparseDirectDeleteSql(&sql, root, resultRelation, rel,
2309  foreignrel,
2310  remote_exprs, &params_list,
2311  returningList, &retrieved_attrs);
2312  break;
2313  default:
2314  elog(ERROR, "unexpected operation: %d", (int) operation);
2315  break;
2316  }
2317 
2318  /*
2319  * Update the operation info.
2320  */
2321  fscan->operation = operation;
2322 
2323  /*
2324  * Update the fdw_exprs list that will be available to the executor.
2325  */
2326  fscan->fdw_exprs = params_list;
2327 
2328  /*
2329  * Update the fdw_private list that will be available to the executor.
2330  * Items in the list must match enum FdwDirectModifyPrivateIndex, above.
2331  */
2332  fscan->fdw_private = list_make4(makeString(sql.data),
2333  makeInteger((retrieved_attrs != NIL)),
2334  retrieved_attrs,
2335  makeInteger(plan->canSetTag));
2336 
2337  /*
2338  * Update the foreign-join-related fields.
2339  */
2340  if (fscan->scan.scanrelid == 0)
2341  {
2342  /* No need for the outer subplan. */
2343  fscan->scan.plan.lefttree = NULL;
2344 
2345  /* Build new fdw_scan_tlist if UPDATE/DELETE .. RETURNING. */
2346  if (returningList)
2347  rebuild_fdw_scan_tlist(fscan, returningList);
2348  }
2349 
2350  heap_close(rel, NoLock);
2351  return true;
2352 }
2353 
2354 /*
2355  * postgresBeginDirectModify
2356  * Prepare a direct foreign table modification
 *
 * Allocates a PgFdwDirectModifyState in node->fdw_state (left NULL under
 * EXPLAIN without ANALYZE).  It then:
 *  - obtains a connection as the user derived from the result relation's
 *    RTE;
 *  - copies the planner's fdw_private items into the state;
 *  - when needed, prepares input conversion for RETURNING results and for
 *    the remote query's parameters.
 *
 * For a pushed-down join (scanrelid == 0), the target relation is opened
 * here with ExecOpenScanRelation() and kept in dmstate->resultRel, while
 * dmstate->rel is reset to NULL.
2357  */
2358 static void
2360 {
2361  ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
2362  EState *estate = node->ss.ps.state;
2363  PgFdwDirectModifyState *dmstate;
2364  Index rtindex;
2365  RangeTblEntry *rte;
2366  Oid userid;
2367  ForeignTable *table;
2368  UserMapping *user;
2369  int numParams;
2370 
2371  /*
2372  * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
2373  */
2374  if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
2375  return;
2376 
2377  /*
2378  * We'll save private state in node->fdw_state.
2379  */
2380  dmstate = (PgFdwDirectModifyState *) palloc0(sizeof(PgFdwDirectModifyState));
2381  node->fdw_state = (void *) dmstate;
2382 
2383  /*
2384  * Identify which user to do the remote access as. This should match what
2385  * ExecCheckRTEPerms() does.
2386  */
2387  rtindex = estate->es_result_relation_info->ri_RangeTableIndex;
2388  rte = rt_fetch(rtindex, estate->es_range_table);
2389  userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
2390 
2391  /* Get info about foreign table. */
 /* A join scan has no ss_currentRelation of its own, so open the target. */
2392  if (fsplan->scan.scanrelid == 0)
2393  dmstate->rel = ExecOpenScanRelation(estate, rtindex, eflags);
2394  else
2395  dmstate->rel = node->ss.ss_currentRelation;
2396  table = GetForeignTable(RelationGetRelid(dmstate->rel));
2397  user = GetUserMapping(userid, table->serverid);
2398 
2399  /*
2400  * Get connection to the foreign server. Connection manager will
2401  * establish new connection if necessary.
2402  */
2403  dmstate->conn = GetConnection(user, false);
2404 
2405  /* Update the foreign-join-related fields. */
2406  if (fsplan->scan.scanrelid == 0)
2407  {
2408  /* Save info about foreign table. */
2409  dmstate->resultRel = dmstate->rel;
2410 
2411  /*
2412  * Set dmstate->rel to NULL to teach get_returning_data() and
2413  * make_tuple_from_result_row() that columns fetched from the remote
2414  * server are described by fdw_scan_tlist of the foreign-scan plan
2415  * node, not the tuple descriptor for the target relation.
2416  */
2417  dmstate->rel = NULL;
2418  }
2419 
2420  /* Initialize state variable */
2421  dmstate->num_tuples = -1; /* -1 means not set yet */
2422 
2423  /* Get private info created by planner functions. */
2424  dmstate->query = strVal(list_nth(fsplan->fdw_private,
2426  dmstate->has_returning = intVal(list_nth(fsplan->fdw_private,
2428  dmstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
2430  dmstate->set_processed = intVal(list_nth(fsplan->fdw_private,
2432 
2433  /* Create context for per-tuple temp workspace. */
2434  dmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
2435  "postgres_fdw temporary data",
2437 
2438  /* Prepare for input conversion of RETURNING results. */
2439  if (dmstate->has_returning)
2440  {
2442 
 /* For a join, remote columns are described by the scan slot's tupdesc. */
2443  if (fsplan->scan.scanrelid == 0)
2444  tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
2445  else
2446  tupdesc = RelationGetDescr(dmstate->rel);
2447 
2448  dmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
2449 
2450  /*
2451  * When performing an UPDATE/DELETE .. RETURNING on a join directly,
2452  * initialize a filter to extract an updated/deleted tuple from a scan
2453  * tuple.
2454  */
2455  if (fsplan->scan.scanrelid == 0)
2456  init_returning_filter(dmstate, fsplan->fdw_scan_tlist, rtindex);
2457  }
2458 
2459  /*
2460  * Prepare for processing of parameters used in remote query, if any.
2461  */
2462  numParams = list_length(fsplan->fdw_exprs);
2463  dmstate->numParams = numParams;
2464  if (numParams > 0)
2466  fsplan->fdw_exprs,
2467  numParams,
2468  &dmstate->param_flinfo,
2469  &dmstate->param_exprs,
2470  &dmstate->param_values);
2471 }
2472 
2473 /*
2474  * postgresIterateDirectModify
2475  * Execute a direct foreign table modification
 *
 * On the first call (num_tuples == -1), the remote statement is executed.
 *
 * If the local query has no RETURNING, this branch returns the scan slot
 * cleared, after adding the affected-row count to:
 *  - estate->es_processed, when set_processed is true;
 *  - the EXPLAIN ANALYZE tuple count, when instrumentation is active.
 *
 * Otherwise the next RETURNING tuple is produced by get_returning_data().
2476  */
2477 static TupleTableSlot *
2479 {
2481  EState *estate = node->ss.ps.state;
2482  ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
2483 
2484  /*
2485  * If this is the first call after Begin, execute the statement.
2486  */
2487  if (dmstate->num_tuples == -1)
2488  execute_dml_stmt(node);
2489 
2490  /*
2491  * If the local query doesn't specify RETURNING, just clear tuple slot.
2492  */
 /*
  * NOTE(review): the counts below are added on every call that reaches
  * this branch; this presumably relies on the caller stopping at the
  * first empty slot -- confirm.
  */
2493  if (!resultRelInfo->ri_projectReturning)
2494  {
2495  TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
2496  Instrumentation *instr = node->ss.ps.instrument;
2497 
2498  Assert(!dmstate->has_returning);
2499 
2500  /* Increment the command es_processed count if necessary. */
2501  if (dmstate->set_processed)
2502  estate->es_processed += dmstate->num_tuples;
2503 
2504  /* Increment the tuple count for EXPLAIN ANALYZE if necessary. */
2505  if (instr)
2506  instr->tuplecount += dmstate->num_tuples;
2507 
2508  return ExecClearTuple(slot);
2509  }
2510 
2511  /*
2512  * Get the next RETURNING tuple.
2513  */
2514  return get_returning_data(node);
2515 }
2516 
2517 /*
2518  * postgresEndDirectModify
2519  * Finish a direct foreign table modification
 *
 * Performs the following cleanup:
 *  - releases the saved PGresult, if any;
 *  - releases the remote connection;
 *  - closes dmstate->resultRel when it is set.
 *
 * A NULL fdw_state means we are in EXPLAIN, and nothing is done.
 * Per-tuple memory is not freed here (see the final comment).
2520  */
2521 static void
2523 {
2525 
2526  /* if dmstate is NULL, we are in EXPLAIN; nothing to do */
2527  if (dmstate == NULL)
2528  return;
2529 
2530  /* Release PGresult */
2531  if (dmstate->result)
2532  PQclear(dmstate->result);
2533 
2534  /* Release remote connection */
2535  ReleaseConnection(dmstate->conn);
2536  dmstate->conn = NULL;
2537 
2538  /* close the target relation. */
2539  if (dmstate->resultRel)
2540  ExecCloseScanRelation(dmstate->resultRel);
2541 
2542  /* MemoryContext will be deleted automatically. */
2543 }
2544 
2545 /*
2546  * postgresExplainForeignScan
2547  * Produce extra output for EXPLAIN of a ForeignScan on a foreign table
 *
 * Emits a "Relations" property when fdw_private is long enough to carry the
 * FdwScanPrivateRelations item (the case for joins, per the comment below).
 * With VERBOSE, it also emits the remote query as "Remote SQL".
2548  */
2549 static void
2551 {
2552  List *fdw_private;
2553  char *sql;
2554  char *relations;
2555 
2556  fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
2557 
2558  /*
2559  * Add names of relations handled by the foreign scan when the scan is a
2560  * join
2561  */
2562  if (list_length(fdw_private) > FdwScanPrivateRelations)
2563  {
2564  relations = strVal(list_nth(fdw_private, FdwScanPrivateRelations));
2565  ExplainPropertyText("Relations", relations, es);
2566  }
2567 
2568  /*
2569  * Add remote query, when VERBOSE option is specified.
2570  */
2571  if (es->verbose)
2572  {
2573  sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
2574  ExplainPropertyText("Remote SQL", sql, es);
2575  }
2576 }
2577 
2578 /*
2579  * postgresExplainForeignModify
2580  * Produce extra output for EXPLAIN of a ModifyTable on a foreign table
 *
 * Only with VERBOSE: shows the SQL string stored in fdw_private as the
 * "Remote SQL" property.  Nothing is emitted otherwise.
2581  */
2582 static void
2584  ResultRelInfo *rinfo,
2585  List *fdw_private,
2586  int subplan_index,
2587  ExplainState *es)
2588 {
2589  if (es->verbose)
2590  {
2591  char *sql = strVal(list_nth(fdw_private,
2593 
2594  ExplainPropertyText("Remote SQL", sql, es);
2595  }
2596 }
2597 
2598 /*
2599  * postgresExplainDirectModify
2600  * Produce extra output for EXPLAIN of a ForeignScan that modifies a
2601  * foreign table directly
 *
 * Only with VERBOSE: shows the FdwDirectModifyPrivateUpdateSql item of the
 * plan's fdw_private as the "Remote SQL" property.
2602  */
2603 static void
2605 {
2606  List *fdw_private;
2607  char *sql;
2608 
2609  if (es->verbose)
2610  {
2611  fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
2612  sql = strVal(list_nth(fdw_private, FdwDirectModifyPrivateUpdateSql));
2613  ExplainPropertyText("Remote SQL", sql, es);
2614  }
2615 }
2616 
2617 
2618 /*
2619  * estimate_path_cost_size
2620  * Get cost and size estimates for a foreign scan on the given foreign
2621  * relation, which is either a base relation, a join between foreign
2622  * relations, or an upper relation containing foreign relations.
2623  *
2624  * param_join_conds are the parameterization clauses with outer relations.
2625  * pathkeys specify the expected sort order if any for given path being costed.
2626  *
2627  * The function returns the cost and size estimates in p_rows, p_width,
2628  * p_startup_cost and p_total_cost variables.
 *
 * With use_remote_estimate, the estimates come from running EXPLAIN on the
 * foreign server.  Otherwise they are computed locally, from one of:
 *  - the planner's size estimates;
 *  - previously cached bare-scan costs;
 *  - the join, upper-rel or base-rel cost models below.
 *
 * When called with no pathkeys and no param_join_conds, the costs are cached
 * in fpinfo->rel_startup_cost and fpinfo->rel_total_cost.  They are cached
 * before the fdw_startup_cost, fdw_tuple_cost and cpu_tuple_cost transfer
 * overheads are added.
2629  */
2630 static void
2632  RelOptInfo *foreignrel,
2633  List *param_join_conds,
2634  List *pathkeys,
2635  double *p_rows, int *p_width,
2636  Cost *p_startup_cost, Cost *p_total_cost)
2637 {
2638  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
2639  double rows;
2640  double retrieved_rows;
2641  int width;
2642  Cost startup_cost;
2643  Cost total_cost;
2644  Cost cpu_per_tuple;
2645 
2646  /*
2647  * If the table or the server is configured to use remote estimates,
2648  * connect to the foreign server and execute EXPLAIN to estimate the
2649  * number of rows selected by the restriction+join clauses. Otherwise,
2650  * estimate rows using whatever statistics we have locally, in a way
2651  * similar to ordinary tables.
2652  */
2653  if (fpinfo->use_remote_estimate)
2654  {
2655  List *remote_param_join_conds;
2656  List *local_param_join_conds;
2657  StringInfoData sql;
2658  PGconn *conn;
2659  Selectivity local_sel;
2660  QualCost local_cost;
2661  List *fdw_scan_tlist = NIL;
2662  List *remote_conds;
2663 
2664  /* Required only to be passed to deparseSelectStmtForRel */
2666 
2667  /*
2668  * param_join_conds might contain both clauses that are safe to send
2669  * across, and clauses that aren't.
2670  */
2671  classifyConditions(root, foreignrel, param_join_conds,
2672  &remote_param_join_conds, &local_param_join_conds);
2673 
2674  /* Build the list of columns to be fetched from the foreign server. */
2675  if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
2676  fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
2677  else
2678  fdw_scan_tlist = NIL;
2679 
2680  /*
2681  * The complete list of remote conditions includes everything from
2682  * baserestrictinfo plus any extra join_conds relevant to this
2683  * particular path.
2684  */
2685  remote_conds = list_concat(list_copy(remote_param_join_conds),
2686  fpinfo->remote_conds);
2687 
2688  /*
2689  * Construct EXPLAIN query including the desired SELECT, FROM, and
2690  * WHERE clauses. Params and other-relation Vars are replaced by dummy
2691  * values, so don't request params_list.
2692  */
2693  initStringInfo(&sql);
2694  appendStringInfoString(&sql, "EXPLAIN ");
2695  deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
2696  remote_conds, pathkeys, false,
2697  &retrieved_attrs, NULL);
2698 
2699  /* Get the remote estimate */
2700  conn = GetConnection(fpinfo->user, false);
2701  get_remote_estimate(sql.data, conn, &rows, &width,
2702  &startup_cost, &total_cost);
2703  ReleaseConnection(conn);
2704 
 /* Keep the remote row count: local quals are evaluated on every row fetched. */
2705  retrieved_rows = rows;
2706 
2707  /* Factor in the selectivity of the locally-checked quals */
2708  local_sel = clauselist_selectivity(root,
2709  local_param_join_conds,
2710  foreignrel->relid,
2711  JOIN_INNER,
2712  NULL);
2713  local_sel *= fpinfo->local_conds_sel;
2714 
2715  rows = clamp_row_est(rows * local_sel);
2716 
2717  /* Add in the eval cost of the locally-checked quals */
2718  startup_cost += fpinfo->local_conds_cost.startup;
2719  total_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
2720  cost_qual_eval(&local_cost, local_param_join_conds, root);
2721  startup_cost += local_cost.startup;
2722  total_cost += local_cost.per_tuple * retrieved_rows;
2723  }
2724  else
2725  {
2726  Cost run_cost = 0;
2727 
2728  /*
2729  * We don't support join conditions in this mode (hence, no
2730  * parameterized paths can be made).
2731  */
2732  Assert(param_join_conds == NIL);
2733 
2734  /*
2735  * Use rows/width estimates made by set_baserel_size_estimates() for
2736  * base foreign relations and set_joinrel_size_estimates() for join
2737  * between foreign relations.
2738  */
2739  rows = foreignrel->rows;
2740  width = foreignrel->reltarget->width;
2741 
2742  /* Back into an estimate of the number of retrieved rows. */
2743  retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
2744 
2745  /*
2746  * We will come here again and again with different set of pathkeys
2747  * that caller wants to cost. We don't need to calculate the cost of
2748  * bare scan each time. Instead, use the costs if we have cached them
2749  * already.
2750  */
 /* NOTE(review): a cached cost of exactly 0 is treated as not cached. */
2751  if (fpinfo->rel_startup_cost > 0 && fpinfo->rel_total_cost > 0)
2752  {
2753  startup_cost = fpinfo->rel_startup_cost;
2754  run_cost = fpinfo->rel_total_cost - fpinfo->rel_startup_cost;
2755  }
2756  else if (IS_JOIN_REL(foreignrel))
2757  {
2758  PgFdwRelationInfo *fpinfo_i;
2759  PgFdwRelationInfo *fpinfo_o;
2760  QualCost join_cost;
2761  QualCost remote_conds_cost;
2762  double nrows;
2763 
2764  /* For join we expect inner and outer relations set */
2765  Assert(fpinfo->innerrel && fpinfo->outerrel);
2766 
2767  fpinfo_i = (PgFdwRelationInfo *) fpinfo->innerrel->fdw_private;
2768  fpinfo_o = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
2769 
2770  /* Estimate of number of rows in cross product */
2771  nrows = fpinfo_i->rows * fpinfo_o->rows;
2772  /* Clamp retrieved rows estimate to at most size of cross product */
2773  retrieved_rows = Min(retrieved_rows, nrows);
2774 
2775  /*
2776  * The cost of foreign join is estimated as cost of generating
2777  * rows for the joining relations + cost for applying quals on the
2778  * rows.
2779  */
2780 
2781  /*
2782  * Calculate the cost of clauses pushed down to the foreign server
2783  */
2784  cost_qual_eval(&remote_conds_cost, fpinfo->remote_conds, root);
2785  /* Calculate the cost of applying join clauses */
2786  cost_qual_eval(&join_cost, fpinfo->joinclauses, root);
2787 
2788  /*
2789  * Startup cost includes startup cost of joining relations and the
2790  * startup cost for join and other clauses. We do not include the
2791  * startup cost specific to join strategy (e.g. setting up hash
2792  * tables) since we do not know what strategy the foreign server
2793  * is going to use.
2794  */
2795  startup_cost = fpinfo_i->rel_startup_cost + fpinfo_o->rel_startup_cost;
2796  startup_cost += join_cost.startup;
2797  startup_cost += remote_conds_cost.startup;
2798  startup_cost += fpinfo->local_conds_cost.startup;
2799 
2800  /*
2801  * Run time cost includes:
2802  *
2803  * 1. Run time cost (total_cost - startup_cost) of relations being
2804  * joined
2805  *
2806  * 2. Run time cost of applying join clauses on the cross product
2807  * of the joining relations.
2808  *
2809  * 3. Run time cost of applying pushed down other clauses on the
2810  * result of join
2811  *
2812  * 4. Run time cost of applying nonpushable other clauses locally
2813  * on the result fetched from the foreign server.
2814  */
2815  run_cost = fpinfo_i->rel_total_cost - fpinfo_i->rel_startup_cost;
2816  run_cost += fpinfo_o->rel_total_cost - fpinfo_o->rel_startup_cost;
2817  run_cost += nrows * join_cost.per_tuple;
2818  nrows = clamp_row_est(nrows * fpinfo->joinclause_sel);
2819  run_cost += nrows * remote_conds_cost.per_tuple;
2820  run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
2821  }
2822  else if (IS_UPPER_REL(foreignrel))
2823  {
2824  PgFdwRelationInfo *ofpinfo;
2825  PathTarget *ptarget = foreignrel->reltarget;
2826  AggClauseCosts aggcosts;
2827  double input_rows;
2828  int numGroupCols;
2829  double numGroups = 1;
2830 
2831  /* Make sure the core code set the pathtarget. */
2832  Assert(ptarget != NULL);
2833 
2834  /*
2835  * This cost model is mixture of costing done for sorted and
2836  * hashed aggregates in cost_agg(). We are not sure which
2837  * strategy will be considered at remote side, thus for
2838  * simplicity, we put all startup related costs in startup_cost
2839  * and all finalization and run cost are added in total_cost.
2840  *
2841  * Also, core does not care about costing HAVING expressions and
2842  * adding that to the costs. So similarly, here too we are not
2843  * considering remote and local conditions for costing.
2844  */
2845 
2846  ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
2847 
2848  /* Get rows and width from input rel */
2849  input_rows = ofpinfo->rows;
 /*
  * NOTE(review): this replaces the width taken from
  * foreignrel->reltarget above with the input relation's width.  The
  * grouped output's width may differ -- confirm this is intended.
  */
2850  width = ofpinfo->width;
2851 
2852  /* Collect statistics about aggregates for estimating costs. */
2853  MemSet(&aggcosts, 0, sizeof(AggClauseCosts));
2854  if (root->parse->hasAggs)
2855  {
2856  get_agg_clause_costs(root, (Node *) fpinfo->grouped_tlist,
2857  AGGSPLIT_SIMPLE, &aggcosts);
2858 
2859  /*
2860  * The cost of aggregates in the HAVING qual will be the same
2861  * for each child as it is for the parent, so there's no need
2862  * to use a translated version of havingQual.
2863  */
2864  get_agg_clause_costs(root, (Node *) root->parse->havingQual,
2865  AGGSPLIT_SIMPLE, &aggcosts);
2866  }
2867 
2868  /* Get number of grouping columns and possible number of groups */
2869  numGroupCols = list_length(root->parse->groupClause);
2870  numGroups = estimate_num_groups(root,
2872  fpinfo->grouped_tlist),
2873  input_rows, NULL);
2874 
2875  /*
2876  * Number of rows expected from foreign server will be same as
2877  * that of number of groups.
2878  */
2879  rows = retrieved_rows = numGroups;
2880 
2881  /*-----
2882  * Startup cost includes:
2883  * 1. Startup cost for underneath input relation
2884  * 2. Cost of performing aggregation, per cost_agg()
2885  * 3. Startup cost for PathTarget eval
2886  *-----
2887  */
2888  startup_cost = ofpinfo->rel_startup_cost;
2889  startup_cost += aggcosts.transCost.startup;
2890  startup_cost += aggcosts.transCost.per_tuple * input_rows;
2891  startup_cost += (cpu_operator_cost * numGroupCols) * input_rows;
2892  startup_cost += ptarget->cost.startup;
2893 
2894  /*-----
2895  * Run time cost includes:
2896  * 1. Run time cost of underneath input relation
2897  * 2. Run time cost of performing aggregation, per cost_agg()
2898  * 3. PathTarget eval cost for each output row
2899  *-----
2900  */
2901  run_cost = ofpinfo->rel_total_cost - ofpinfo->rel_startup_cost;
2902  run_cost += aggcosts.finalCost * numGroups;
2903  run_cost += cpu_tuple_cost * numGroups;
2904  run_cost += ptarget->cost.per_tuple * numGroups;
2905  }
2906  else
2907  {
2908  /* Clamp retrieved rows estimates to at most foreignrel->tuples. */
2909  retrieved_rows = Min(retrieved_rows, foreignrel->tuples);
2910 
2911  /*
2912  * Cost as though this were a seqscan, which is pessimistic. We
2913  * effectively imagine the local_conds are being evaluated
2914  * remotely, too.
2915  */
2916  startup_cost = 0;
2917  run_cost = 0;
2918  run_cost += seq_page_cost * foreignrel->pages;
2919 
2920  startup_cost += foreignrel->baserestrictcost.startup;
2921  cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple;
2922  run_cost += cpu_per_tuple * foreignrel->tuples;
2923  }
2924 
2925  /*
2926  * Without remote estimates, we have no real way to estimate the cost
2927  * of generating sorted output. It could be free if the query plan
2928  * the remote side would have chosen generates properly-sorted output
2929  * anyway, but in most cases it will cost something. Estimate a value
2930  * high enough that we won't pick the sorted path when the ordering
2931  * isn't locally useful, but low enough that we'll err on the side of
2932  * pushing down the ORDER BY clause when it's useful to do so.
2933  */
2934  if (pathkeys != NIL)
2935  {
2936  startup_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
2937  run_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
2938  }
2939 
2940  total_cost = startup_cost + run_cost;
2941  }
2942 
2943  /*
2944  * Cache the costs for scans without any pathkeys or parameterization
2945  * before adding the costs for transferring data from the foreign server.
2946  * These costs are useful for costing the join between this relation and
2947  * another foreign relation or to calculate the costs of paths with
2948  * pathkeys for this relation, when the costs cannot be obtained from the
2949  * foreign server. This function will be called at least once for every
2950  * foreign relation without pathkeys and parameterization.
2951  */
2952  if (pathkeys == NIL && param_join_conds == NIL)
2953  {
2954  fpinfo->rel_startup_cost = startup_cost;
2955  fpinfo->rel_total_cost = total_cost;
2956  }
2957 
2958  /*
2959  * Add some additional cost factors to account for connection overhead
2960  * (fdw_startup_cost), transferring data across the network
2961  * (fdw_tuple_cost per retrieved row), and local manipulation of the data
2962  * (cpu_tuple_cost per retrieved row).
2963  */
2964  startup_cost += fpinfo->fdw_startup_cost;
2965  total_cost += fpinfo->fdw_startup_cost;
2966  total_cost += fpinfo->fdw_tuple_cost * retrieved_rows;
2967  total_cost += cpu_tuple_cost * retrieved_rows;
2968 
2969  /* Return results. */
2970  *p_rows = rows;
2971  *p_width = width;
2972  *p_startup_cost = startup_cost;
2973  *p_total_cost = total_cost;
2974 }
2975 
2976 /*
2977  * Estimate costs of executing a SQL statement remotely.
2978  * The given "sql" must be an EXPLAIN command.
2979  */
2980 static void
2981 get_remote_estimate(const char *sql, PGconn *conn,
2982  double *rows, int *width,
2983  Cost *startup_cost, Cost *total_cost)
2984 {
2985  PGresult *volatile res = NULL;
2986 
2987  /* PGresult must be released before leaving this function. */
2988  PG_TRY();
2989  {
2990  char *line;
2991  char *p;
2992  int n;
2993 
2994  /*
2995  * Execute EXPLAIN remotely.
2996  */
2997  res = pgfdw_exec_query(conn, sql);
2998  if (PQresultStatus(res) != PGRES_TUPLES_OK)
2999  pgfdw_report_error(ERROR, res, conn, false, sql);
3000 
3001  /*
3002  * Extract cost numbers for topmost plan node. Note we search for a
3003  * left paren from the end of the line to avoid being confused by
3004  * other uses of parentheses.
3005  */
3006  line = PQgetvalue(res, 0, 0);
3007  p = strrchr(line, '(');
3008  if (p == NULL)
3009  elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
3010  n = sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)",
3011  startup_cost, total_cost, rows, width);
3012  if (n != 4)
3013  elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
3014 
3015  PQclear(res);
3016  res = NULL;
3017  }
3018  PG_CATCH();
3019  {
3020  if (res)
3021  PQclear(res);
3022  PG_RE_THROW();
3023  }
3024  PG_END_TRY();
3025 }
3026 
3027 /*
3028  * Detect whether we want to process an EquivalenceClass member.
3029  *
3030  * This is a callback for use by generate_implied_equalities_for_column.
 *
 * Returns true only for the expression currently being processed
 * (state->current).  If none has been chosen yet, the first member whose
 * expression is not in state->already_used becomes the current one and
 * true is returned for it.
3031  */
3032 static bool
3035  void *arg)
3036 {
3038  Expr *expr = em->em_expr;
3039 
3040  /*
3041  * If we've identified what we're processing in the current scan, we only
3042  * want to match that expression.
3043  */
3044  if (state->current != NULL)
3045  return equal(expr, state->current);
3046 
3047  /*
3048  * Otherwise, ignore anything we've already processed.
3049  */
3050  if (list_member(state->already_used, expr))
3051  return false;
3052 
3053  /* This is the new target to process. */
3054  state->current = expr;
3055  return true;
3056 }
3057 
3058 /*
3059  * Create cursor for node's query with current parameter values.
 *
 * Sends "DECLARE c<cursor_number> CURSOR FOR <query>" with the parameter
 * values rendered as text, then marks the cursor as existing and resets the
 * local batch state (tuples, num_tuples, next_tuple, fetch_ct_2,
 * eof_reached).  Remote failures are reported with ERROR.
3060  */
3061 static void
3063 {
3064  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
3065  ExprContext *econtext = node->ss.ps.ps_ExprContext;
3066  int numParams = fsstate->numParams;
3067  const char **values = fsstate->param_values;
3068  PGconn *conn = fsstate->conn;
3070  PGresult *res;
3071 
3072  /*
3073  * Construct array of query parameter values in text format. We do the
3074  * conversions in the short-lived per-tuple context, so as not to cause a
3075  * memory leak over repeated scans.
3076  */
3077  if (numParams > 0)
3078  {
3079  MemoryContext oldcontext;
3080 
3081  oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
3082 
3083  process_query_params(econtext,
3084  fsstate->param_flinfo,
3085  fsstate->param_exprs,
3086  values);
3087 
3088  MemoryContextSwitchTo(oldcontext);
3089  }
3090 
3091  /* Construct the DECLARE CURSOR command */
3092  initStringInfo(&buf);
3093  appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
3094  fsstate->cursor_number, fsstate->query);
3095 
3096  /*
3097  * Notice that we pass NULL for paramTypes, thus forcing the remote server
3098  * to infer types for all parameters. Since we explicitly cast every
3099  * parameter (see deparse.c), the "inference" is trivial and will produce
3100  * the desired result. This allows us to avoid assuming that the remote
3101  * server has the same OIDs we do for the parameters' types.
3102  */
3103  if (!PQsendQueryParams(conn, buf.data, numParams,
3104  NULL, values, NULL, NULL, 0))
3105  pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
3106 
3107  /*
3108  * Get the result, and check for success.
3109  *
3110  * We don't use a PG_TRY block here, so be careful not to throw error
3111  * without releasing the PGresult.
3112  */
3113  res = pgfdw_get_result(conn, buf.data);
3114  if (PQresultStatus(res) != PGRES_COMMAND_OK)
3115  pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
3116  PQclear(res);
3117 
3118  /* Mark the cursor as created, and show no tuples have been retrieved */
3119  fsstate->cursor_exists = true;
3120  fsstate->tuples = NULL;
3121  fsstate->num_tuples = 0;
3122  fsstate->next_tuple = 0;
3123  fsstate->fetch_ct_2 = 0;
3124  fsstate->eof_reached = false;
3125 
3126  /* Clean up */
3127  pfree(buf.data);
3128 }
3129 
3130 /*
3131  * Fetch some more rows from the node's cursor.
 *
 * Issues "FETCH <fetch_size> FROM c<cursor_number>" and replaces the
 * previous batch (batch_cxt is reset first) with the converted HeapTuples.
 * eof_reached is set when fewer than fetch_size rows come back.
3132  */
3133 static void
3135 {
3136  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
3137  PGresult *volatile res = NULL;
3138  MemoryContext oldcontext;
3139 
3140  /*
3141  * We'll store the tuples in the batch_cxt. First, flush the previous
3142  * batch.
3143  */
3144  fsstate->tuples = NULL;
3145  MemoryContextReset(fsstate->batch_cxt);
3146  oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
3147 
3148  /* PGresult must be released before leaving this function. */
3149  PG_TRY();
3150  {
3151  PGconn *conn = fsstate->conn;
3152  char sql[64];
3153  int numrows;
3154  int i;
3155 
3156  snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
3157  fsstate->fetch_size, fsstate->cursor_number);
3158 
3159  res = pgfdw_exec_query(conn, sql);
3160  /* On error, report the original query, not the FETCH. */
3161  if (PQresultStatus(res) != PGRES_TUPLES_OK)
3162  pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
3163 
3164  /* Convert the data into HeapTuples */
3165  numrows = PQntuples(res);
3166  fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
3167  fsstate->num_tuples = numrows;
3168  fsstate->next_tuple = 0;
3169 
3170  for (i = 0; i < numrows; i++)
3171  {
3172  Assert(IsA(node->ss.ps.plan, ForeignScan));
3173 
3174  fsstate->tuples[i] =
3176  fsstate->rel,
3177  fsstate->attinmeta,
3178  fsstate->retrieved_attrs,
3179  node,
3180  fsstate->temp_cxt);
3181  }
3182 
3183  /* Update fetch_ct_2 (a fetch counter that stops at 2) */
3184  if (fsstate->fetch_ct_2 < 2)
3185  fsstate->fetch_ct_2++;
3186 
3187  /* Must be EOF if we didn't get as many tuples as we asked for. */
3188  fsstate->eof_reached = (numrows < fsstate->fetch_size);
3189 
3190  PQclear(res);
3191  res = NULL;
3192  }
3193  PG_CATCH();
3194  {
3195  if (res)
3196  PQclear(res);
3197  PG_RE_THROW();
3198  }
3199  PG_END_TRY();
3200 
3201  MemoryContextSwitchTo(oldcontext);
3202 }
3203 
3204 /*
3205  * Force assorted GUC parameters to settings that ensure that we'll output
3206  * data values in a form that is unambiguous to the remote server.
3207  *
3208  * This is rather expensive and annoying to do once per row, but there's
3209  * little choice if we want to be sure values are transmitted accurately;
3210  * we can't leave the settings in place between rows for fear of affecting
3211  * user-visible computations.
3212  *
3213  * We use the equivalent of a function SET option to allow the settings to
3214  * persist only until the caller calls reset_transmission_modes(). If an
3215  * error is thrown in between, guc.c will take care of undoing the settings.
3216  *
3217  * The return value is the nestlevel that must be passed to
3218  * reset_transmission_modes() to undo things.
 *
 * Settings are only overridden when the current value is not already
 * suitable (e.g. DateStyle != USE_ISO_DATES, extra_float_digits < 3), and
 * every change is made with GUC_ACTION_SAVE.
3219  */
3220 int
3222 {
3223  int nestlevel = NewGUCNestLevel();
3224 
3225  /*
3226  * The values set here should match what pg_dump does. See also
3227  * configure_remote_session in connection.c.
3228  */
3229  if (DateStyle != USE_ISO_DATES)
3230  (void) set_config_option("datestyle", "ISO",
3232  GUC_ACTION_SAVE, true, 0, false);
3234  (void) set_config_option("intervalstyle", "postgres",
3236  GUC_ACTION_SAVE, true, 0, false);
3237  if (extra_float_digits < 3)
3238  (void) set_config_option("extra_float_digits", "3",
3240  GUC_ACTION_SAVE, true, 0, false);
3241 
3242  return nestlevel;
3243 }
3244 
3245 /*
3246  * Undo the effects of set_transmission_modes().
 *
 * nestlevel must be the value returned by the matching
 * set_transmission_modes() call.  AtEOXact_GUC(true, nestlevel) then
 * restores the settings that were saved at that nesting level.
3247  */
3248 void
3250 {
3251  AtEOXact_GUC(true, nestlevel);
3252 }
3253 
3254 /*
3255  * Utility routine to close a cursor.
 *
 * Sends "CLOSE c<cursor_number>" on conn.  If the command does not complete
 * successfully, ERROR is raised.
3256  */
3257 static void
3259 {
3260  char sql[64];
3261  PGresult *res;
3262 
3263  snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);
3264 
3265  /*
3266  * We don't use a PG_TRY block here, so be careful not to throw error
3267  * without releasing the PGresult.
3268  */
3269  res = pgfdw_exec_query(conn, sql);
3270  if (PQresultStatus(res) != PGRES_COMMAND_OK)
3271  pgfdw_report_error(ERROR, res, conn, true, sql);
3272  PQclear(res);
3273 }
3274 
3275 /*
3276  * create_foreign_modify
3277  * Construct an execution state of a foreign insert/update/delete
3278  * operation
 *
 * Connects as rte->checkAsUser (or the current user when that is unset),
 * records the remote query and attribute lists, and sets up output
 * functions for the prepared-statement parameters: ctid first for
 * UPDATE/DELETE, then each target attribute for INSERT/UPDATE.  The remote
 * statement is not prepared here; p_name stays NULL until that happens.
3279  */
3280 static PgFdwModifyState *
3282  RangeTblEntry *rte,
3283  ResultRelInfo *resultRelInfo,
3284  CmdType operation,
3285  Plan *subplan,
3286  char *query,
3287  List *target_attrs,
3288  bool has_returning,
3290 {
3291  PgFdwModifyState *fmstate;
3292  Relation rel = resultRelInfo->ri_RelationDesc;
3294  Oid userid;
3295  ForeignTable *table;
3296  UserMapping *user;
3297  AttrNumber n_params;
3298  Oid typefnoid;
3299  bool isvarlena;
3300  ListCell *lc;
3301 
3302  /* Begin constructing PgFdwModifyState. */
3303  fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
3304  fmstate->rel = rel;
3305 
3306  /*
3307  * Identify which user to do the remote access as. This should match what
3308  * ExecCheckRTEPerms() does.
3309  */
3310  userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
3311 
3312  /* Get info about foreign table. */
3313  table = GetForeignTable(RelationGetRelid(rel));
3314  user = GetUserMapping(userid, table->serverid);
3315 
3316  /* Open connection; report that we'll create a prepared statement. */
3317  fmstate->conn = GetConnection(user, true);
3318  fmstate->p_name = NULL; /* prepared statement not made yet */
3319 
3320  /* Set up remote query information. */
3321  fmstate->query = query;
3322  fmstate->target_attrs = target_attrs;
3323  fmstate->has_returning = has_returning;
3324  fmstate->retrieved_attrs = retrieved_attrs;
3325 
3326  /* Create context for per-tuple temp workspace. */
3327  fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
3328  "postgres_fdw temporary data",
3330 
3331  /* Prepare for input conversion of RETURNING results. */
3332  if (fmstate->has_returning)
3333  fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
3334 
3335  /* Prepare for output conversion of parameters used in prepared stmt. */
 /* One entry per target attribute, plus one for ctid (UPDATE/DELETE). */
3336  n_params = list_length(fmstate->target_attrs) + 1;
3337  fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
3338  fmstate->p_nums = 0;
3339 
3340  if (operation == CMD_UPDATE || operation == CMD_DELETE)
3341  {
3342  Assert(subplan != NULL);
3343 
3344  /* Find the ctid resjunk column in the subplan's result */
3346  "ctid");
3347  if (!AttributeNumberIsValid(fmstate->ctidAttno))
3348  elog(ERROR, "could not find junk ctid column");
3349 
3350  /* First transmittable parameter will be ctid */
3351  getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
3352  fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
3353  fmstate->p_nums++;
3354  }
3355 
3356  if (operation == CMD_INSERT || operation == CMD_UPDATE)
3357  {
3358  /* Set up for remaining transmittable parameters */
3359  foreach(lc, fmstate->target_attrs)
3360  {
3361  int attnum = lfirst_int(lc);
3362  Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
3363 
3364  Assert(!attr->attisdropped);
3365 
3366  getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
3367  fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
3368  fmstate->p_nums++;
3369  }
3370  }
3371 
3372  Assert(fmstate->p_nums <= n_params);
3373 
3374  return fmstate;
3375 }
3376 
3377 /*
3378  * prepare_foreign_modify
3379  * Establish a prepared statement for execution of INSERT/UPDATE/DELETE
 *
 * The statement is named "pgsql_fdw_prep_<n>", where n comes from
 * GetPrepStmtNumber() on the connection.  fmstate->p_name is set only after
 * the remote PREPARE succeeds, so a NULL p_name means there is no remote
 * statement to deallocate.
3380  */
3381 static void
3383 {
3384  char prep_name[NAMEDATALEN];
3385  char *p_name;
3386  PGresult *res;
3387 
3388  /* Construct name we'll use for the prepared statement. */
3389  snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
3390  GetPrepStmtNumber(fmstate->conn));
3391  p_name = pstrdup(prep_name);
3392 
3393  /*
3394  * We intentionally do not specify parameter types here, but leave the
3395  * remote server to derive them by default. This avoids possible problems
3396  * with the remote server using different type OIDs than we do. All of
3397  * the prepared statements we use in this module are simple enough that
3398  * the remote server will make the right choices.
3399  */
3400  if (!PQsendPrepare(fmstate->conn,
3401  p_name,
3402  fmstate->query,
3403  0,
3404  NULL))
3405  pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
3406 
3407  /*
3408  * Get the result, and check for success.
3409  *
3410  * We don't use a PG_TRY block here, so be careful not to throw error
3411  * without releasing the PGresult.
3412  */
3413  res = pgfdw_get_result(fmstate->conn, fmstate->query);
3414  if (PQresultStatus(res) != PGRES_COMMAND_OK)
3415  pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
3416  PQclear(res);
3417 
3418  /* This action shows that the prepare has been done. */
3419  fmstate->p_name = p_name;
3420 }
3421 
3422 /*
3423  * convert_prep_stmt_params
3424  * Create array of text strings representing parameter values
3425  *
3426  * tupleid is ctid to send, or NULL if none
3427  * slot is slot to get remaining parameters from, or NULL if none
3428  *
3429  * Data is constructed in temp_cxt; caller should reset that after use.
 *
 * Returns an array of fmstate->p_nums strings in prepared-statement
 * parameter order.  A NULL entry stands for an SQL NULL value.
3430  */
3431 static const char **
3433  ItemPointer tupleid,
3434  TupleTableSlot *slot)
3435 {
3436  const char **p_values;
3437  int pindex = 0;
3438  MemoryContext oldcontext;
3439 
3440  oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
3441 
3442  p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums);
3443 
3444  /* 1st parameter should be ctid, if it's in use */
3445  if (tupleid != NULL)
3446  {
3447  /* don't need set_transmission_modes for TID output */
3448  p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
3449  PointerGetDatum(tupleid));
3450  pindex++;
3451  }
3452 
3453  /* get following parameters from slot */
3454  if (slot != NULL && fmstate->target_attrs != NIL)
3455  {
3456  int nestlevel;
3457  ListCell *lc;
3458 
3459  nestlevel = set_transmission_modes();
3460 
3461  foreach(lc, fmstate->target_attrs)
3462  {
3463  int attnum = lfirst_int(lc);
3464  Datum value;
3465  bool isnull;
3466 
3467  value = slot_getattr(slot, attnum, &isnull);
3468  if (isnull)
3469  p_values[pindex] = NULL;
3470  else
3471  p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
3472  value);
3473  pindex++;
3474  }
3475 
3476  reset_transmission_modes(nestlevel);
3477  }
3478 
3479  Assert(pindex == fmstate->p_nums);
3480 
3481  MemoryContextSwitchTo(oldcontext);
3482 
3483  return p_values;
3484 }
3485 
3486 /*
3487  * store_returning_result
3488  * Store the result of a RETURNING clause
3489  *
3490  * On error, be sure to release the PGresult on the way out. Callers do not
3491  * have PG_TRY blocks to ensure this happens.
 *
 * Only row 0 of res is converted.  On success res is not cleared, so the
 * caller still owns it.  The new tuple is stored in slot with shouldFree
 * set.
3492  */
3493 static void
3495  TupleTableSlot *slot, PGresult *res)
3496 {
3497  PG_TRY();
3498  {
3499  HeapTuple newtup;
3500 
3501  newtup = make_tuple_from_result_row(res, 0,
3502  fmstate->rel,
3503  fmstate->attinmeta,
3504  fmstate->retrieved_attrs,
3505  NULL,
3506  fmstate->temp_cxt);
3507  /* tuple will be deleted when it is cleared from the slot */
3508  ExecStoreTuple(newtup, slot, InvalidBuffer, true);
3509  }
3510  PG_CATCH();
3511  {
3512  if (res)
3513  PQclear(res);
3514  PG_RE_THROW();
3515  }
3516  PG_END_TRY();
3517 }
3518 
3519 /*
3520  * finish_foreign_modify
3521  * Release resources for a foreign insert/update/delete operation
 *
 * Deallocates the remote prepared statement if one was created (p_name is
 * non-NULL), then releases the connection and clears fmstate->conn.
3522  */
3523 static void
3525 {
3526  Assert(fmstate != NULL);
3527 
3528  /* If we created a prepared statement, destroy it */
3529  if (fmstate->p_name)
3530  {
3531  char sql[64];
3532  PGresult *res;
3533 
3534  snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
3535 
3536  /*
3537  * We don't use a PG_TRY block here, so be careful not to throw error
3538  * without releasing the PGresult.
3539  */
3540  res = pgfdw_exec_query(fmstate->conn, sql);
3541  if (PQresultStatus(res) != PGRES_COMMAND_OK)
3542  pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
3543  PQclear(res);
3544  fmstate->p_name = NULL;
3545  }
3546 
3547  /* Release remote connection */
3548  ReleaseConnection(fmstate->conn);
3549  fmstate->conn = NULL;
3550 }
3551 
3552 /*
3553  * build_remote_returning
3554  * Build a RETURNING targetlist of a remote query for performing an
3555  * UPDATE/DELETE .. RETURNING on a join directly
 *
 * If returningList contains a whole-row reference to the target relation
 * (rtindex), every non-dropped column of rel is emitted first.  The
 * remaining Vars/PlaceHolderVars found in returningList are then appended.
 * Duplicates and the target-relation references filtered in the second
 * loop are skipped.
3556  */
3557 static List *
3558 build_remote_returning(Index rtindex, Relation rel, List *returningList)
3559 {
3560  bool have_wholerow = false;
3561  List *tlist = NIL;
3562  List *vars;
3563  ListCell *lc;
3564 
3565  Assert(returningList);
3566 
3567  vars = pull_var_clause((Node *) returningList, PVC_INCLUDE_PLACEHOLDERS);
3568 
3569  /*
3570  * If there's a whole-row reference to the target relation, then we'll
3571  * need all the columns of the relation.
3572  */
3573  foreach(lc, vars)
3574  {
3575  Var *var = (Var *) lfirst(lc);
3576 
3577  if (IsA(var, Var) &&
3578  var->varno == rtindex &&
3579  var->varattno == InvalidAttrNumber)
3580  {
3581  have_wholerow = true;
3582  break;
3583  }
3584  }
3585 
3586  if (have_wholerow)
3587  {
3589  int i;
3590 
3591  for (i = 1; i <= tupdesc->natts; i++)
3592  {
3593  Form_pg_attribute attr = TupleDescAttr(tupdesc, i - 1);
3594  Var *var;
3595 
3596  /* Ignore dropped attributes. */
3597  if (attr->attisdropped)
3598  continue;
3599 
3600  var = makeVar(rtindex,
3601  i,
3602  attr->atttypid,
3603  attr->atttypmod,
3604  attr->attcollation,
3605  0);
3606 
3607  tlist = lappend(tlist,
3608  makeTargetEntry((Expr *) var,
3609  list_length(tlist) + 1,
3610  NULL,
3611  false));
3612  }
3613  }
3614 
3615  /* Now add any remaining columns to tlist. */
3616  foreach(lc, vars)
3617  {
3618  Var *var = (Var *) lfirst(lc);
3619 
3620  /*
3621  * No need for whole-row references to the target relation. We don't
3622  * need system columns other than ctid and oid either, since those are
3623  * set locally.
3624  */
3625  if (IsA(var, Var) &&
3626  var->varno == rtindex &&
3627  var->varattno <= InvalidAttrNumber &&
3630  continue; /* don't need it */
3631 
3632  if (tlist_member((Expr *) var, tlist))
3633  continue; /* already got it */
3634 
3635  tlist = lappend(tlist,
3636  makeTargetEntry((Expr *) var,
3637  list_length(tlist) + 1,
3638  NULL,
3639  false));
3640  }
3641 
3642  list_free(vars);
3643 
3644  return tlist;
3645 }
3646 
3647 /*
3648  * rebuild_fdw_scan_tlist
3649  * Build new fdw_scan_tlist of given foreign-scan plan node from given
3650  * tlist
3651  *
3652  * There might be columns that the fdw_scan_tlist of the given foreign-scan
3653  * plan node contains that the given tlist doesn't. The fdw_scan_tlist would
3654  * have contained resjunk columns such as 'ctid' of the target relation and
3655  * 'wholerow' of non-target relations, but the tlist might not contain them,
3656  * for example. So, adjust the tlist so it contains all the columns specified
3657  * in the fdw_scan_tlist; else setrefs.c will get confused.
 *
 * Missing entries are lappend'ed onto tlist itself, which then becomes the
 * node's fdw_scan_tlist.  Callers should not assume tlist is left unchanged.
3658  */
3659 static void
3661 {
3662  List *new_tlist = tlist;
3663  List *old_tlist = fscan->fdw_scan_tlist;
3664  ListCell *lc;
3665 
3666  foreach(lc, old_tlist)
3667  {
3668  TargetEntry *tle = (TargetEntry *) lfirst(lc);
3669 
3670  if (tlist_member(tle->expr, new_tlist))
3671  continue; /* already got it */
3672 
3673  new_tlist = lappend(new_tlist,
3674  makeTargetEntry(tle->expr,
3675  list_length(new_tlist) + 1,
3676  NULL,
3677  false));
3678  }
3679  fscan->fdw_scan_tlist = new_tlist;
3680 }
3681 
3682 /*
3683  * Execute a direct UPDATE/DELETE statement.
 *
 * The remote PGresult is kept in dmstate->result.  dmstate->num_tuples is
 * set to the number of rows returned when has_returning is true, and
 * otherwise to the affected-row count parsed from the command tag.
3684  */
3685 static void
3687 {
3689  ExprContext *econtext = node->ss.ps.ps_ExprContext;
3690  int numParams = dmstate->numParams;
3691  const char **values = dmstate->param_values;
3692 
3693  /*
3694  * Construct array of query parameter values in text format.
 *
 * NOTE(review): create_cursor does this conversion in the per-tuple
 * context, but here it runs in the current memory context.  That is
 * presumably fine if this runs once per statement; confirm.
3695  */
3696  if (numParams > 0)
3697  process_query_params(econtext,
3698  dmstate->param_flinfo,
3699  dmstate->param_exprs,
3700  values);
3701 
3702  /*
3703  * Notice that we pass NULL for paramTypes, thus forcing the remote server
3704  * to infer types for all parameters. Since we explicitly cast every
3705  * parameter (see deparse.c), the "inference" is trivial and will produce
3706  * the desired result. This allows us to avoid assuming that the remote
3707  * server has the same OIDs we do for the parameters' types.
3708  */
3709  if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
3710  NULL, values, NULL, NULL, 0))
3711  pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
3712 
3713  /*
3714  * Get the result, and check for success.
3715  *
3716  * We don't use a PG_TRY block here, so be careful not to throw error
3717  * without releasing the PGresult.
3718  */
3719  dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
3720  if (PQresultStatus(dmstate->result) !=
3722  pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
3723  dmstate->query);
3724 
3725  /* Get the number of rows affected. */
3726  if (dmstate->has_returning)
3727  dmstate->num_tuples = PQntuples(dmstate->result);
3728  else
3729  dmstate->num_tuples = atoi(PQcmdTuples(dmstate->result));
3730 }
3731 
3732 /*
3733  * Get the result of a RETURNING clause.
 *
 * Each call converts row dmstate->next_tuple of dmstate->result into the
 * scan slot and advances next_tuple.  When set_processed is true it also
 * bumps es_processed.  If dmstate->rel is NULL, the updated/deleted tuple
 * is extracted from the scan tuple via apply_returning_filter().  The slot
 * to use for the local RETURNING projection is installed as that
 * projection's ecxt_scantuple, and the scan slot itself is returned.
3734  */
3735 static TupleTableSlot *
3737 {
3739  EState *estate = node->ss.ps.state;
3740  ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
3741  TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
3742  TupleTableSlot *resultSlot;
3743 
3744  Assert(resultRelInfo->ri_projectReturning);
3745 
3746  /* If we didn't get any tuples, must be end of data. */
3747  if (dmstate->next_tuple >= dmstate->num_tuples)
3748  return ExecClearTuple(slot);
3749 
3750  /* Increment the command es_processed count if necessary. */
3751  if (dmstate->set_processed)
3752  estate->es_processed += 1;
3753 
3754  /*
3755  * Store a RETURNING tuple. If has_returning is false, just emit a dummy
3756  * tuple. (has_returning is false when the local query is of the form
3757  * "UPDATE/DELETE .. RETURNING 1" for example.)
3758  */
3759  if (!dmstate->has_returning)
3760  {
3761  ExecStoreAllNullTuple(slot);
3762  resultSlot = slot;
3763  }
3764  else
3765  {
3766  /*
3767  * On error, be sure to release the PGresult on the way out. Callers
3768  * do not have PG_TRY blocks to ensure this happens.
3769  */
3770  PG_TRY();
3771  {
3772  HeapTuple newtup;
3773 
3774  newtup = make_tuple_from_result_row(dmstate->result,
3775  dmstate->next_tuple,
3776  dmstate->rel,
3777  dmstate->attinmeta,
3778  dmstate->retrieved_attrs,
3779  node,
3780  dmstate->temp_cxt);
3781  ExecStoreTuple(newtup, slot, InvalidBuffer, false);
3782  }
3783  PG_CATCH();
3784  {
 /*
 * NOTE(review): dmstate->result is cleared here but not reset to
 * NULL.  Confirm that no later cleanup path can PQclear it again.
 */
3785  if (dmstate->result)
3786  PQclear(dmstate->result);
3787  PG_RE_THROW();
3788  }
3789  PG_END_TRY();
3790 
3791  /* Get the updated/deleted tuple. */
3792  if (dmstate->rel)
3793  resultSlot = slot;
3794  else
3795  resultSlot = apply_returning_filter(dmstate, slot, estate);
3796  }
3797  dmstate->next_tuple++;
3798 
3799  /* Make slot available for evaluation of the local query RETURNING list. */
3800  resultRelInfo->ri_projectReturning->pi_exprContext->ecxt_scantuple =
3801  resultSlot;
3802 
3803  return slot;
3804 }
3805 
3806 /*
3807  * Initialize a filter to extract an updated/deleted tuple from a scan tuple.
 *
 * Fills dmstate->attnoMap, ctidAttno, oidAttno and hasSystemCols.  Every
 * stored index is a 1-based position in fdw_scan_tlist, and 0 means "not
 * present".  Only entries whose Var belongs to rtindex and whose position
 * is listed in dmstate->retrieved_attrs are considered.
3808  */
3809 static void
3811  List *fdw_scan_tlist,
3812  Index rtindex)
3813 {
3814  TupleDesc resultTupType = RelationGetDescr(dmstate->resultRel);
3815  ListCell *lc;
3816  int i;
3817 
3818  /*
3819  * Calculate the mapping between the fdw_scan_tlist's entries and the
3820  * result tuple's attributes.
3821  *
3822  * The "map" is an array of indexes of the result tuple's attributes in
3823  * fdw_scan_tlist, i.e., one entry for every attribute of the result
3824  * tuple. We store zero for any attributes that don't have the
3825  * corresponding entries in that list, marking that a NULL is needed in
3826  * the result tuple.
3827  *
3828  * Also get the indexes of the entries for ctid and oid if any.
3829  */
3830  dmstate->attnoMap = (AttrNumber *)
3831  palloc0(resultTupType->natts * sizeof(AttrNumber));
3832 
3833  dmstate->ctidAttno = dmstate->oidAttno = 0;
3834 
3835  i = 1;
3836  dmstate->hasSystemCols = false;
3837  foreach(lc, fdw_scan_tlist)
3838  {
3839  TargetEntry *tle = (TargetEntry *) lfirst(lc);
3840  Var *var = (Var *) tle->expr;
3841 
3842  Assert(IsA(var, Var));
3843 
3844  /*
3845  * If the Var is a column of the target relation to be retrieved from
3846  * the foreign server, get the index of the entry.
3847  */
3848  if (var->varno == rtindex &&
3849  list_member_int(dmstate->retrieved_attrs, i))
3850  {
3851  int attrno = var->varattno;
3852 
3853  if (attrno < 0)
3854  {
3855  /*
3856  * We don't retrieve system columns other than ctid and oid.
3857  */
3858  if (attrno == SelfItemPointerAttributeNumber)
3859  dmstate->ctidAttno = i;
3860  else if (attrno == ObjectIdAttributeNumber)
3861  dmstate->oidAttno = i;
3862  else
3863  Assert(false);
3864  dmstate->hasSystemCols = true;
3865  }
3866  else
3867  {
3868  /*
3869  * We don't retrieve whole-row references to the target
3870  * relation either.
3871  */
3872  Assert(attrno > 0);
3873 
3874  dmstate->attnoMap[attrno - 1] = i;
3875  }
3876  }
3877  i++;
3878  }
3879 }
3880 
3881 /*
3882  * Extract and return an updated/deleted tuple from a scan tuple.
 *
 * The result is built in estate->es_trig_tuple_slot, whose descriptor is
 * switched to the result relation's if needed, using dmstate->attnoMap.
 * Unmapped attributes are NULL.  ctid/oid are copied from the scan tuple
 * when hasSystemCols is set.
3883  */
3884 static TupleTableSlot *
3886  TupleTableSlot *slot,
3887  EState *estate)
3888 {
3889  TupleDesc resultTupType = RelationGetDescr(dmstate->resultRel);
3890  TupleTableSlot *resultSlot;
3891  Datum *values;
3892  bool *isnull;
3893  Datum *old_values;
3894  bool *old_isnull;
3895  int i;
3896 
3897  /*
3898  * Use the trigger tuple slot as a place to store the result tuple.
3899  */
3900  resultSlot = estate->es_trig_tuple_slot;
3901  if (resultSlot->tts_tupleDescriptor != resultTupType)
3902  ExecSetSlotDescriptor(resultSlot, resultTupType);
3903 
3904  /*
3905  * Extract all the values of the scan tuple.
3906  */
3907  slot_getallattrs(slot);
3908  old_values = slot->tts_values;
3909  old_isnull = slot->tts_isnull;
3910 
3911  /*
3912  * Prepare to build the result tuple.
3913  */
3914  ExecClearTuple(resultSlot);
3915  values = resultSlot->tts_values;
3916  isnull = resultSlot->tts_isnull;
3917 
3918  /*
3919  * Transpose data into proper fields of the result tuple.
3920  */
3921  for (i = 0; i < resultTupType->natts; i++)
3922  {
3923  int j = dmstate->attnoMap[i];
3924 
3925  if (j == 0)
3926  {
3927  values[i] = (Datum) 0;
3928  isnull[i] = true;
3929  }
3930  else
3931  {
3932  values[i] = old_values[j - 1];
3933  isnull[i] = old_isnull[j - 1];
3934  }
3935  }
3936 
3937  /*
3938  * Build the virtual tuple.
3939  */
3940  ExecStoreVirtualTuple(resultSlot);
3941 
3942  /*
3943  * If we have any system columns to return, install them.
3944  */
3945  if (dmstate->hasSystemCols)
3946  {
3947  HeapTuple resultTup = ExecMaterializeSlot(resultSlot);
3948 
3949  /* ctid */
3950  if (dmstate->ctidAttno)
3951  {
3952  ItemPointer ctid = NULL;
3953 
3954  ctid = (ItemPointer) DatumGetPointer(old_values[dmstate->ctidAttno - 1]);
3955  resultTup->t_self = *ctid;
3956  }
3957 
3958  /* oid */
3959  if (dmstate->oidAttno)
3960  {
3961  Oid oid = InvalidOid;
3962 
3963  oid = DatumGetObjectId(old_values[dmstate->oidAttno - 1]);
3964  HeapTupleSetOid(resultTup, oid);
3965  }
3966 
3967  /*
3968  * And remaining columns
3969  *
3970  * Note: since we currently don't allow the target relation to appear
3971  * on the nullable side of an outer join, any system columns wouldn't
3972  * go to NULL.
3973  *
3974  * Note: no need to care about tableoid here because it will be
3975  * initialized in ExecProcessReturning().
3976  */
3980  }
3981 
3982  /*
3983  * And return the result tuple.
3984  */
3985  return resultSlot;
3986 }
3987 
3988 /*
3989  * Prepare for processing of parameters used in remote query.
 *
 * Allocates *param_flinfo (one output function per fdw_exprs entry),
 * *param_exprs (ExecInitExprList of fdw_exprs) and a zeroed *param_values
 * array of numParams entries, all in the current memory context.
 * NOTE(review): the flinfo loop indexes by position in fdw_exprs, so
 * numParams is presumably list_length(fdw_exprs); confirm at call sites.
3990  */
3991 static void
3993  List *fdw_exprs,
3994  int numParams,
3996  List **param_exprs,
3997  const char ***param_values)
3998 {
3999  int i;
4000  ListCell *lc;
4001 
4002  Assert(numParams > 0);
4003 
4004  /* Prepare for output conversion of parameters used in remote query. */
4005  *param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * numParams);
4006 
4007  i = 0;
4008  foreach(lc, fdw_exprs)
4009  {
4010  Node *param_expr = (Node *) lfirst(lc);
4011  Oid typefnoid;
4012  bool isvarlena;
4013 
4014  getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena);
4015  fmgr_info(typefnoid, &(*param_flinfo)[i]);
4016  i++;
4017  }
4018 
4019  /*
4020  * Prepare remote-parameter expressions for evaluation. (Note: in
4021  * practice, we expect that all these expressions will be just Params, so
4022  * we could possibly do something more efficient than using the full
4023  * expression-eval machinery for this. But probably there would be little
4024  * benefit, and it'd require postgres_fdw to know more than is desirable
4025  * about Param evaluation.)
4026  */
4027  *param_exprs = ExecInitExprList(fdw_exprs, node);
4028 
4029  /* Allocate buffer for text form of query parameters. */
4030  *param_values = (const char **) palloc0(numParams * sizeof(char *));
4031 }
4032 
4033 /*
4034  * Construct array of query parameter values in text format.
 *
 * Evaluates each ExprState in param_exprs in econtext and converts the
 * value with the matching param_flinfo entry.  NULL values become NULL
 * pointers in param_values.  Transmission-mode GUCs are in effect for the
 * duration of the conversion.  Output strings are allocated in the
 * caller's current memory context.
4035  */
4036 static void
4039  List *param_exprs,
4040  const char **param_values)
4041 {
4042  int nestlevel;
4043  int i;
4044  ListCell *lc;
4045 
4046  nestlevel = set_transmission_modes();
4047 
4048  i = 0;
4049  foreach(lc, param_exprs)
4050  {
4051  ExprState *expr_state = (ExprState *) lfirst(lc);
4052  Datum expr_value;
4053  bool isNull;
4054 
4055  /* Evaluate the parameter expression */
4056  expr_value = ExecEvalExpr(expr_state, econtext, &isNull);
4057 
4058  /*
4059  * Get string representation of each parameter value by invoking
4060  * type-specific output function, unless the value is null.
4061  */
4062  if (isNull)
4063  param_values[i] = NULL;
4064  else
4065  param_values[i] = OutputFunctionCall(&param_flinfo[i], expr_value);
4066 
4067  i++;
4068  }
4069 
4070  reset_transmission_modes(nestlevel);
4071 }
4072 
4073 /*
4074  * postgresAnalyzeForeignTable
4075  * Test whether analyzing this foreign table is supported
 *
 * Always returns true.  *totalpages is obtained by running the query built
 * by deparseAnalyzeSizeSql(), using the table owner's user mapping.
4076  */
4077 static bool
4079  AcquireSampleRowsFunc *func,
4080  BlockNumber *totalpages)
4081 {
4082  ForeignTable *table;
4083  UserMapping *user;
4084  PGconn *conn;
4085  StringInfoData sql;
4086  PGresult *volatile res = NULL;
4087 
4088  /* Return the row-analysis function pointer */
4090 
4091  /*
4092  * Now we have to get the number of pages. It's annoying that the ANALYZE
4093  * API requires us to return that now, because it forces some duplication
4094  * of effort between this routine and postgresAcquireSampleRowsFunc. But
4095  * it's probably not worth redefining that API at this point.
4096  */
4097 
4098  /*
4099  * Get the connection to use. We do the remote access as the table's
4100  * owner, even if the ANALYZE was started by some other user.
4101  */
4102  table = GetForeignTable(RelationGetRelid(relation));
4103  user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
4104  conn = GetConnection(user, false);
4105 
4106  /*
4107  * Construct command to get page count for relation.
4108  */
4109  initStringInfo(&sql);
4110  deparseAnalyzeSizeSql(&sql, relation);
4111 
4112  /* In what follows, do not risk leaking any PGresults. */
4113  PG_TRY();
4114  {
4115  res = pgfdw_exec_query(conn, sql.data);
4116  if (PQresultStatus(res) != PGRES_TUPLES_OK)
4117  pgfdw_report_error(ERROR, res, conn, false, sql.data);
4118 
4119  if (PQntuples(res) != 1 || PQnfields(res) != 1)
4120  elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query");
 /* NOTE(review): strtoul's result is not checked for a parse failure. */
4121  *totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
4122 
4123  PQclear(res);
4124  res = NULL;
4125  }
4126  PG_CATCH();
4127  {
4128  if (res)
4129  PQclear(res);
4130  PG_RE_THROW();
4131  }
4132  PG_END_TRY();
4133 
4134  ReleaseConnection(conn);
4135 
4136  return true;
4137 }
4138 
4139 /*
4140  * Acquire a random sample of rows from foreign table managed by postgres_fdw.
4141  *
4142  * We fetch the whole table from the remote side and pick out some sample rows.
      * The rows are read through a remote cursor ("DECLARE c<N> CURSOR FOR ...")
      * in batches of fetch_size rows and handed one at a time to
      * analyze_row_processor(), which maintains the reservoir sample.
      * fetch_size defaults to 100 and can be overridden by a "fetch_size" option
      * on the foreign server or, taking precedence, on the foreign table.
      *
      * The remote access is done under the table owner's user mapping.  Sampled
      * tuples are allocated in the memory context that is current on entry
      * (remembered as astate.anl_cxt).  A one-line summary of the row counts is
      * reported at message level elevel.
4143  *
4144  * Selected rows are returned in the caller-allocated array rows[],
4145  * which must have at least targrows entries.
4146  * The actual number of rows selected is returned as the function result.
4147  * We also count the total number of rows in the table and return it into
4148  * *totalrows. Note that *totaldeadrows is always set to 0.
4149  *
4150  * Note that the returned list of rows is not always in order by physical
4151  * position in the table. Therefore, correlation estimates derived later
4152  * may be meaningless, but it's OK because we don't use the estimates
4153  * currently (the planner only pays attention to correlation for indexscans).
4154  */
4155 static int
4157  HeapTuple *rows, int targrows,
4158  double *totalrows,
4159  double *totaldeadrows)
4160 {
4161  PgFdwAnalyzeState astate;
4162  ForeignTable *table;
4163  ForeignServer *server;
4164  UserMapping *user;
4165  PGconn *conn;
4166  unsigned int cursor_number;
4167  StringInfoData sql;
      /* volatile because it is assigned inside PG_TRY() and read again in PG_CATCH() */
4168  PGresult *volatile res = NULL;
4169 
4170  /* Initialize workspace state */
4171  astate.rel = relation;
4173 
4174  astate.rows = rows;
4175  astate.targrows = targrows;
4176  astate.numrows = 0;
4177  astate.samplerows = 0;
4178  astate.rowstoskip = -1; /* -1 means not set yet */
4179  reservoir_init_selection_state(&astate.rstate, targrows);
4180 
4181  /* Remember ANALYZE context, and create a per-tuple temp context */
4182  astate.anl_cxt = CurrentMemoryContext;
4184  "postgres_fdw temporary data",
4186 
4187  /*
4188  * Get the connection to use. We do the remote access as the table's
4189  * owner, even if the ANALYZE was started by some other user.
4190  */
4191  table = GetForeignTable(RelationGetRelid(relation));
4192  server = GetForeignServer(table->serverid);
4193  user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
4194  conn = GetConnection(user, false);
4195 
4196  /*
4197  * Construct cursor that retrieves whole rows from remote.
4198  */
4199  cursor_number = GetCursorNumber(conn);
4200  initStringInfo(&sql);
4201  appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number);
4202  deparseAnalyzeSql(&sql, relation, &astate.retrieved_attrs);
4203 
4204  /* In what follows, do not risk leaking any PGresults. */
4205  PG_TRY();
4206  {
4207  res = pgfdw_exec_query(conn, sql.data);
4208  if (PQresultStatus(res) != PGRES_COMMAND_OK)
4209  pgfdw_report_error(ERROR, res, conn, false, sql.data);
4210  PQclear(res);
4211  res = NULL;
4212 
4213  /* Retrieve and process rows a batch at a time. */
4214  for (;;)
4215  {
4216  char fetch_sql[64];
4217  int fetch_size;
4218  int numrows;
4219  int i;
4220  ListCell *lc;
4221 
4222  /* Allow users to cancel long query */
4224 
4225  /*
4226  * XXX possible future improvement: if rowstoskip is large, we
4227  * could issue a MOVE rather than physically fetching the rows,
4228  * then just adjust rowstoskip and samplerows appropriately.
4229  */
4230 
4231  /* The fetch size is arbitrary, but shouldn't be enormous. */
      /*
       * The table-level option is scanned after the server-level one, so it
       * wins when both are set.  NOTE(review): nothing in this lookup depends
       * on the current batch, so it could be done once before the loop.
       */
4232  fetch_size = 100;
4233  foreach(lc, server->options)
4234  {
4235  DefElem *def = (DefElem *) lfirst(lc);
4236 
4237  if (strcmp(def->defname, "fetch_size") == 0)
4238  {
4239  fetch_size = strtol(defGetString(def), NULL, 10);
4240  break;
4241  }
4242  }
4243  foreach(lc, table->options)
4244  {
4245  DefElem *def = (DefElem *) lfirst(lc);
4246 
4247  if (strcmp(def->defname, "fetch_size") == 0)
4248  {
4249  fetch_size = strtol(defGetString(def), NULL, 10);
4250  break;
4251  }
4252  }
4253 
4254  /* Fetch some rows */
4255  snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
4256  fetch_size, cursor_number);
4257 
4258  res = pgfdw_exec_query(conn, fetch_sql);
4259  /* On error, report the original query, not the FETCH. */
4260  if (PQresultStatus(res) != PGRES_TUPLES_OK)
4261  pgfdw_report_error(ERROR, res, conn, false, sql.data);
4262 
4263  /* Process whatever we got. */
4264  numrows = PQntuples(res);
4265  for (i = 0; i < numrows; i++)
4266  analyze_row_processor(res, i, &astate);
4267 
4268  PQclear(res);
4269  res = NULL;
4270 
4271  /* Must be EOF if we didn't get all the rows requested. */
4272  if (numrows < fetch_size)
4273  break;
4274  }
4275 
4276  /* Close the cursor, just to be tidy. */
4277  close_cursor(conn, cursor_number);
4278  }
4279  PG_CATCH();
4280  {
      /*
       * Release any PGresult still held before re-throwing.  The remote cursor
       * is not closed on this path; presumably cleanup of the remote
       * transaction on error takes care of it -- TODO confirm.
       */
4281  if (res)
4282  PQclear(res);
4283  PG_RE_THROW();
4284  }
4285  PG_END_TRY();
4286 
4287  ReleaseConnection(conn);
4288 
4289  /* We assume that we have no dead tuple. */
4290  *totaldeadrows = 0.0;
4291 
4292  /* We've retrieved all living tuples from foreign server. */
      /* analyze_row_processor() incremented samplerows once per fetched row. */
4293  *totalrows = astate.samplerows;
4294 
4295  /*
4296  * Emit some interesting relation info
4297  */
4298  ereport(elevel,
4299  (errmsg("\"%s\": table contains %.0f rows, %d rows in sample",
4300  RelationGetRelationName(relation),
4301  astate.samplerows, astate.numrows)));
4302 
4303  return astate.numrows;
4304 }
4305 
4306 /*
4307  * Collect sample rows from the result of query.
4308  * - Use all tuples in sample until target # of samples are collected.
4309  * - Subsequently, replace already-sampled tuples randomly.
      *
      * "res" holds a batch of rows fetched from the remote cursor and "row" is
      * the index within it of the row to consider.  "astate" carries the
      * sampling state: astate->samplerows is incremented on every call,
      * astate->numrows grows until it reaches astate->targrows, and
      * astate->rows[] is updated in place.  A tuple evicted from the sample is
      * freed with heap_freetuple() before its slot is reused.
4310  */
4311 static void
4313 {
4314  int targrows = astate->targrows;
4315  int pos; /* array index to store tuple in */
4316  MemoryContext oldcontext;
4317 
4318  /* Always increment sample row counter. */
4319  astate->samplerows += 1;
4320 
4321  /*
4322  * Determine the slot where this sample row should be stored. Set pos to
4323  * negative value to indicate the row should be skipped.
4324  */
4325  if (astate->numrows < targrows)
4326  {
4327  /* First targrows rows are always included into the sample */
4328  pos = astate->numrows++;
4329  }
4330  else
4331  {
4332  /*
4333  * Now we start replacing tuples in the sample until we reach the end
4334  * of the relation. Same algorithm as in acquire_sample_rows in
4335  * analyze.c; see Jeff Vitter's paper.
4336  */
      /*
       * rowstoskip counts the rows still to be skipped before the next
       * replacement.  A negative value means the previous skip distance has
       * been used up (or none was drawn yet), so draw a new one.
       */
4337  if (astate->rowstoskip < 0)
4338  astate->rowstoskip = reservoir_get_next_S(&astate->rstate, astate->samplerows, targrows);
4339 
4340  if (astate->rowstoskip <= 0)
4341  {
4342  /* Choose a random reservoir element to replace. */
4343  pos = (int) (targrows * sampler_random_fract(astate->rstate.randstate));
4344  Assert(pos >= 0 && pos < targrows);
4345  heap_freetuple(astate->rows[pos]);
4346  }
4347  else
4348  {
4349  /* Skip this tuple. */
4350  pos = -1;
4351  }
4352 
4353  astate->rowstoskip -= 1;
4354  }
4355 
4356  if (pos >= 0)
4357  {
4358  /*
4359  * Create sample tuple from current result row, and store it in the
4360  * position determined above. The tuple has to be created in anl_cxt.
4361  */
      /*
       * astate->temp_cxt (the per-tuple temp context) is passed along,
       * presumably for scratch allocations made while converting the row --
       * TODO confirm against make_tuple_from_result_row().
       */
4362  oldcontext = MemoryContextSwitchTo(astate->anl_cxt);
4363 
4364  astate->rows[pos] = make_tuple_from_result_row(res, row,
4365  astate->rel,
4366  astate->attinmeta,
4367  astate->retrieved_attrs,
4368  NULL,
4369  astate->temp_cxt);
4370 
4371  MemoryContextSwitchTo(oldcontext);
4372  }
4373 }
4374 
4375 /*
4376  * Import a foreign schema
      *
      * Builds one CREATE FOREIGN TABLE command (as a pstrdup'd string) for each
      * ordinary table, view, foreign table, materialized view and partitioned
      * table found in stmt->remote_schema on the server serverOid, and returns
      * the list of commands.  Each command names every column with a
      * column_name option and carries schema_name/table_name table options.
      * Remote partitions are skipped (when the remote server is version 10 or
      * later) so that only partition roots are imported.
      *
      * Supported options in stmt->options: import_collate (default true;
      * forced off for remote servers older than 9.1), import_default (default
      * false) and import_not_null (default true).  Any other option raises
      * ERRCODE_FDW_INVALID_OPTION_NAME; a missing remote schema raises
      * ERRCODE_FDW_SCHEMA_NOT_FOUND.  The remote catalogs are queried through
      * the current user's user mapping.
4377  */
4378 static List *
4380 {
4381  List *commands = NIL;
4382  bool import_collate = true;
4383  bool import_default = false;
4384  bool import_not_null = true;
4385  ForeignServer *server;
4386  UserMapping *mapping;
4387  PGconn *conn;
      /* volatile because it is assigned inside PG_TRY() and read again in PG_CATCH() */
4389  PGresult *volatile res = NULL;
4390  int numrows,
4391  i;
4392  ListCell *lc;
4393 
4394  /* Parse statement options */
4395  foreach(lc, stmt->options)
4396  {
4397  DefElem *def = (DefElem *) lfirst(lc);
4398 
4399  if (strcmp(def->defname, "import_collate") == 0)
4400  import_collate = defGetBoolean(def);
4401  else if (strcmp(def->defname, "import_default") == 0)
4402  import_default = defGetBoolean(def);
4403  else if (strcmp(def->defname, "import_not_null") == 0)
4404  import_not_null = defGetBoolean(def);
4405  else
4406  ereport(ERROR,
4407  (errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
4408  errmsg("invalid option \"%s\"", def->defname)));
4409  }
4410 
4411  /*
4412  * Get connection to the foreign server. Connection manager will
4413  * establish new connection if necessary.
4414  */
4415  server = GetForeignServer(serverOid);
4416  mapping = GetUserMapping(GetUserId(), server->serverid);
4417  conn = GetConnection(mapping, false);
4418 
4419  /* Don't attempt to import collation if remote server hasn't got it */
4420  if (PQserverVersion(conn) < 90100)
4421  import_collate = false;
4422 
4423  /* Create workspace for strings */
4424  initStringInfo(&buf);
4425 
4426  /* In what follows, do not risk leaking any PGresults. */
4427  PG_TRY();
4428  {
4429  /* Check that the schema really exists */
4430  appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
4431  deparseStringLiteral(&buf, stmt->remote_schema);
4432 
4433  res = pgfdw_exec_query(conn, buf.data);
4434  if (PQresultStatus(res) != PGRES_TUPLES_OK)
4435  pgfdw_report_error(ERROR, res, conn, false, buf.data);
4436 
4437  if (PQntuples(res) != 1)
4438  ereport(ERROR,
4439  (errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND),
4440  errmsg("schema \"%s\" is not present on foreign server \"%s\"",
4441  stmt->remote_schema, server->servername)));
4442 
4443  PQclear(res);
4444  res = NULL;
4445  resetStringInfo(&buf);
4446 
4447  /*
4448  * Fetch all table data from this schema, possibly restricted by
4449  * EXCEPT or LIMIT TO. (We don't actually need to pay any attention
4450  * to EXCEPT/LIMIT TO here, because the core code will filter the
4451  * statements we return according to those lists anyway. But it
4452  * should save a few cycles to not process excluded tables in the
4453  * first place.)
4454  *
4455  * Ignore table data for partitions and only include the definitions
4456  * of the root partitioned tables to allow access to the complete
4457  * remote data set locally in the schema imported.
4458  *
4459  * Note: because we run the connection with search_path restricted to
4460  * pg_catalog, the format_type() and pg_get_expr() outputs will always
4461  * include a schema name for types/functions in other schemas, which
4462  * is what we want.
4463  */
4464  if (import_collate)
4466  "SELECT relname, "
4467  " attname, "
4468  " format_type(atttypid, atttypmod), "
4469  " attnotnull, "
4470  " pg_get_expr(adbin, adrelid), "
4471  " collname, "
4472  " collnsp.nspname "
4473  "FROM pg_class c "
4474  " JOIN pg_namespace n ON "
4475  " relnamespace = n.oid "
4476  " LEFT JOIN pg_attribute a ON "
4477  " attrelid = c.oid AND attnum > 0 "
4478  " AND NOT attisdropped "
4479  " LEFT JOIN pg_attrdef ad ON "
4480  " adrelid = c.oid AND adnum = attnum "
4481  " LEFT JOIN pg_collation coll ON "
4482  " coll.oid = attcollation "
4483  " LEFT JOIN pg_namespace collnsp ON "
4484  " collnsp.oid = collnamespace ");
4485  else
4487  "SELECT relname, "
4488  " attname, "
4489  " format_type(atttypid, atttypmod), "
4490  " attnotnull, "
4491  " pg_get_expr(adbin, adrelid), "
4492  " NULL, NULL "
4493  "FROM pg_class c "
4494  " JOIN pg_namespace n ON "
4495  " relnamespace = n.oid "
4496  " LEFT JOIN pg_attribute a ON "
4497  " attrelid = c.oid AND attnum > 0 "
4498  " AND NOT attisdropped "
4499  " LEFT JOIN pg_attrdef ad ON "
4500  " adrelid = c.oid AND adnum = attnum ");
4501 
4503  "WHERE c.relkind IN ("
4504  CppAsString2(RELKIND_RELATION) ","
4505  CppAsString2(RELKIND_VIEW) ","
4506  CppAsString2(RELKIND_FOREIGN_TABLE) ","
4507  CppAsString2(RELKIND_MATVIEW) ","
4508  CppAsString2(RELKIND_PARTITIONED_TABLE) ") "
4509  " AND n.nspname = ");
4510  deparseStringLiteral(&buf, stmt->remote_schema);
4511 
4512  /* Partitions are supported since Postgres 10 */
4513  if (PQserverVersion(conn) >= 100000)
4514  appendStringInfoString(&buf, " AND NOT c.relispartition ");
4515 
4516  /* Apply restrictions for LIMIT TO and EXCEPT */
4517  if (stmt->list_type == FDW_IMPORT_SCHEMA_LIMIT_TO ||
4519  {
4520  bool first_item = true;
4521 
4522  appendStringInfoString(&buf, " AND c.relname ");
4523  if (stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
4524  appendStringInfoString(&buf, "NOT ");
4525  appendStringInfoString(&buf, "IN (");
4526 
4527  /* Append list of table names within IN clause */
4528  foreach(lc, stmt->table_list)
4529  {
4530  RangeVar *rv = (RangeVar *) lfirst(lc);
4531 
4532  if (first_item)
4533  first_item = false;
4534  else
4535  appendStringInfoString(&buf, ", ");
4536  deparseStringLiteral(&buf, rv->relname);
4537  }
4538  appendStringInfoChar(&buf, ')');
4539  }
4540 
4541  /* Append ORDER BY at the end of query to ensure output ordering */
4542  appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");
4543 
4544  /* Fetch the data */
4545  res = pgfdw_exec_query(conn, buf.data);
4546  if (PQresultStatus(res) != PGRES_TUPLES_OK)
4547  pgfdw_report_error(ERROR, res, conn, false, buf.data);
4548 
4549  /* Process results */
4550  numrows = PQntuples(res);
4551  /* note: incrementation of i happens in inner loop's while() test */
4552  for (i = 0; i < numrows;)
4553  {
4554  char *tablename = PQgetvalue(res, i, 0);
4555  bool first_item = true;
4556 
4557  resetStringInfo(&buf);
4558  appendStringInfo(&buf, "CREATE FOREIGN TABLE %s (\n",
4559  quote_identifier(tablename));
4560 
4561  /* Scan all rows for this table */
4562  do
4563  {
4564  char *attname;
4565  char *typename;
4566  char *attnotnull;
4567  char *attdefault;
4568  char *collname;
4569  char *collnamespace;
4570 
4571  /* If table has no columns, we'll see nulls here */
      /*
       * Rows for one table are contiguous thanks to the ORDER BY above.
       * Inside this do-while, "continue" jumps to the while() test below,
       * so the row index is still advanced past the all-null row.
       */
4572  if (PQgetisnull(res, i, 1))
4573  continue;
4574 
4575  attname = PQgetvalue(res, i, 1);
4576  typename = PQgetvalue(res, i, 2);
4577  attnotnull = PQgetvalue(res, i, 3);
4578  attdefault = PQgetisnull(res, i, 4) ? (char *) NULL :
4579  PQgetvalue(res, i, 4);
4580  collname = PQgetisnull(res, i, 5) ? (char *) NULL :
4581  PQgetvalue(res, i, 5);
4582  collnamespace = PQgetisnull(res, i, 6) ? (char *) NULL :
4583  PQgetvalue(res, i, 6);
4584 
4585  if (first_item)
4586  first_item = false;
4587  else
4588  appendStringInfoString(&buf, ",\n");
4589 
4590  /* Print column name and type */
4591  appendStringInfo(&buf, " %s %s",
4592  quote_identifier(attname),
4593  typename);
4594 
4595  /*
4596  * Add column_name option so that renaming the foreign table's
4597  * column doesn't break the association to the underlying
4598  * column.
4599  */
4600  appendStringInfoString(&buf, " OPTIONS (column_name ");
4601  deparseStringLiteral(&buf, attname);
4602  appendStringInfoChar(&buf, ')');
4603 
4604  /* Add COLLATE if needed */
4605  if (import_collate && collname != NULL && collnamespace != NULL)
4606  appendStringInfo(&buf, " COLLATE %s.%s",
4607  quote_identifier(collnamespace),
4608  quote_identifier(collname));
4609 
4610  /* Add DEFAULT if needed */
4611  if (import_default && attdefault != NULL)
4612  appendStringInfo(&buf, " DEFAULT %s", attdefault);
4613 
4614  /* Add NOT NULL if needed */
      /* attnotnull arrives as a text-format boolean ("t" or "f") */
4615  if (import_not_null && attnotnull[0] == 't')
4616  appendStringInfoString(&buf, " NOT NULL");
4617  }
4618  while (++i < numrows &&
4619  strcmp(PQgetvalue(res, i, 0), tablename) == 0);
4620 
4621  /*
4622  * Add server name and table-level options. We specify remote
4623  * schema and table name as options (the latter to ensure that
4624  * renaming the foreign table doesn't break the association).
4625  */
4626  appendStringInfo(&buf, "\n) SERVER %s\nOPTIONS (",
4627  quote_identifier(server->servername));
4628 
4629  appendStringInfoString(&buf, "schema_name ");
4630  deparseStringLiteral(&buf, stmt->remote_schema);
4631  appendStringInfoString(&buf, ", table_name ");
4632  deparseStringLiteral(&buf, tablename);
4633 
4634  appendStringInfoString(&buf, ");");
4635 
4636  commands = lappend(commands, pstrdup(buf.data));
4637  }
4638 
4639  /* Clean up */
4640  PQclear(res);
4641  res = NULL;
4642  }
4643  PG_CATCH();
4644  {
4645  if (res)
4646  PQclear(res);
4647  PG_RE_THROW();
4648  }
4649  PG_END_TRY();
4650 
4651  ReleaseConnection(conn);
4652 
4653  return commands;
4654 }
4655 
4656 /*
4657  * Assess whether the join between inner and outer relations can be pushed down
4658  * to the foreign server. As a side effect, save information we obtain in this
4659  * function to PgFdwRelationInfo passed in.
      *
      * Returns true if the join is safe to push down.  In that case the
      * PgFdwRelationInfo hanging off joinrel->fdw_private is filled in (join
      * clauses, remote/local conditions, input relations, join type, user
      * mapping, EXPLAIN relation name and relation index) and marked
      * pushdown_safe.  Returns false for unsupported join types, when either
      * input has no fdw_private or is not pushdown-safe, when either input has
      * local conditions, when an outer-join clause is not shippable, or when a
      * PlaceHolderVar would have to be evaluated strictly below the top of
      * this join.  Note that fpinfo->server and the merged FDW options are set
      * before some of these checks, so they may be filled in even on failure.
4660  */
4661 static bool
4663  RelOptInfo *outerrel, RelOptInfo *innerrel,
4664  JoinPathExtraData *extra)
4665 {
4666  PgFdwRelationInfo *fpinfo;
4667  PgFdwRelationInfo *fpinfo_o;
4668  PgFdwRelationInfo *fpinfo_i;
4669  ListCell *lc;
4670  List *joinclauses;
4671 
4672  /*
4673  * We support pushing down INNER, LEFT, RIGHT and FULL OUTER joins.
4674  * Constructing queries representing SEMI and ANTI joins is hard, hence
4675  * not considered right now.
4676  */
4677  if (jointype != JOIN_INNER && jointype != JOIN_LEFT &&
4678  jointype != JOIN_RIGHT && jointype != JOIN_FULL)
4679  return false;
4680 
4681  /*
4682  * If either of the joining relations is marked as unsafe to pushdown, the
4683  * join can not be pushed down.
4684  */
4685  fpinfo = (PgFdwRelationInfo *) joinrel->fdw_private;
4686  fpinfo_o = (PgFdwRelationInfo *) outerrel->fdw_private;
4687  fpinfo_i = (PgFdwRelationInfo *) innerrel->fdw_private;
4688  if (!fpinfo_o || !fpinfo_o->pushdown_safe ||
4689  !fpinfo_i || !fpinfo_i->pushdown_safe)
4690  return false;
4691 
4692  /*
4693  * If joining relations have local conditions, those conditions are
4694  * required to be applied before joining the relations. Hence the join can
4695  * not be pushed down.
4696  */
4697  if (fpinfo_o->local_conds || fpinfo_i->local_conds)
4698  return false;
4699 
4700  /*
4701  * Merge FDW options. We might be tempted to do this after we have deemed
4702  * the foreign join to be OK. But we must do this beforehand so that we
4703  * know which quals can be evaluated on the foreign server, which might
4704  * depend on shippable_extensions.
4705  */
4706  fpinfo->server = fpinfo_o->server;
4707  merge_fdw_options(fpinfo, fpinfo_o, fpinfo_i);
4708 
4709  /*
4710  * Separate restrict list into join quals and pushed-down (other) quals.
4711  *
4712  * Join quals belonging to an outer join must all be shippable, else we
4713  * cannot execute the join remotely. Add such quals to 'joinclauses'.
4714  *
4715  * Add other quals to fpinfo->remote_conds if they are shippable, else to
4716  * fpinfo->local_conds. In an inner join it's okay to execute conditions
4717  * either locally or remotely; the same is true for pushed-down conditions
4718  * at an outer join.
4719  *
4720  * Note we might return failure after having already scribbled on
4721  * fpinfo->remote_conds and fpinfo->local_conds. That's okay because we
4722  * won't consult those lists again if we deem the join unshippable.
4723  */
4724  joinclauses = NIL;
4725  foreach(lc, extra->restrictlist)
4726  {
4727  RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
4728  bool is_remote_clause = is_foreign_expr(root, joinrel,
4729  rinfo->clause);
4730 
4731  if (IS_OUTER_JOIN(jointype) &&
4732  !RINFO_IS_PUSHED_DOWN(rinfo, joinrel->relids))
4733  {
4734  if (!is_remote_clause)
4735  return false;
4736  joinclauses = lappend(joinclauses, rinfo);
4737  }
4738  else
4739  {
4740  if (is_remote_clause)
4741  fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo);
4742  else
4743  fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo);
4744  }
4745  }
4746 
4747  /*
4748  * deparseExplicitTargetList() isn't smart enough to handle anything other
4749  * than a Var. In particular, if there's some PlaceHolderVar that would
4750  * need to be evaluated within this join tree (because there's an upper
4751  * reference to a quantity that may go to NULL as a result of an outer
4752  * join), then we can't try to push the join down because we'll fail when
4753  * we get to deparseExplicitTargetList(). However, a PlaceHolderVar that
4754  * needs to be evaluated *at the top* of this join tree is OK, because we
4755  * can do that locally after fetching the results from the remote side.
4756  */
4757  foreach(lc, root->placeholder_list)
4758  {
4759  PlaceHolderInfo *phinfo = lfirst(lc);
4760  Relids relids;
4761 
4762  /* PlaceHolderInfo refers to parent relids, not child relids. */
4763  relids = IS_OTHER_REL(joinrel) ?
4764  joinrel->top_parent_relids : joinrel->relids;
4765 
      /*
       * Reject when ph_eval_at is a proper subset of this join's relids, i.e.
       * the PHV must be evaluated somewhere strictly inside this join tree.
       */
4766  if (bms_is_subset(phinfo->ph_eval_at, relids) &&
4767  bms_nonempty_difference(relids, phinfo->ph_eval_at))
4768  return false;
4769  }
4770 
4771  /* Save the join clauses, for later use. */
4772  fpinfo->joinclauses = joinclauses;
4773 
4774  fpinfo->outerrel = outerrel;
4775  fpinfo->innerrel = innerrel;
4776  fpinfo->jointype = jointype;
4777 
4778  /*
4779  * By default, both the input relations are not required to be deparsed as
4780  * subqueries, but there might be some relations covered by the input
4781  * relations that are required to be deparsed as subqueries, so save the
4782  * relids of those relations for later use by the deparser.
4783  */
4784  fpinfo->make_outerrel_subquery = false;
4785  fpinfo->make_innerrel_subquery = false;
4786  Assert(bms_is_subset(fpinfo_o->lower_subquery_rels, outerrel->relids));
4787  Assert(bms_is_subset(fpinfo_i->lower_subquery_rels, innerrel->relids));
4789  fpinfo_i->lower_subquery_rels);
4790 
4791  /*
4792  * Pull the other remote conditions from the joining relations into join
4793  * clauses or other remote clauses (remote_conds) of this relation
4794  * wherever possible. This avoids building subqueries at every join step.
4795  *
4796  * For an inner join, clauses from both the relations are added to the
4797  * other remote clauses. For LEFT and RIGHT OUTER join, the clauses from
4798  * the outer side are added to remote_conds since those can be evaluated
4799  * after the join is evaluated. The clauses from inner side are added to
4800  * the joinclauses, since they need to be evaluated while constructing the
4801  * join.
4802  *
4803  * For a FULL OUTER JOIN, the other clauses from either relation can not
4804  * be added to the joinclauses or remote_conds, since each relation acts
4805  * as an outer relation for the other.
4806  *
4807  * The joining sides can not have local conditions, thus no need to test
4808  * shippability of the clauses being pulled up.
4809  */
4810  switch (jointype)
4811  {
4812  case JOIN_INNER:
4813  fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4814  list_copy(fpinfo_i->remote_conds));
4815  fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4816  list_copy(fpinfo_o->remote_conds));
4817  break;
4818 
4819  case JOIN_LEFT:
4820  fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
4821  list_copy(fpinfo_i->remote_conds));
4822  fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4823  list_copy(fpinfo_o->remote_conds));
4824  break;
4825 
4826  case JOIN_RIGHT:
4827  fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
4828  list_copy(fpinfo_o->remote_conds));
4829  fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4830  list_copy(fpinfo_i->remote_conds));
4831  break;
4832 
4833  case JOIN_FULL:
4834 
4835  /*
4836  * In this case, if any of the input relations has conditions, we
4837  * need to deparse that relation as a subquery so that the
4838  * conditions can be evaluated before the join. Remember it in
4839  * the fpinfo of this relation so that the deparser can take
4840  * appropriate action. Also, save the relids of base relations
4841  * covered by that relation for later use by the deparser.
4842  */
4843  if (fpinfo_o->remote_conds)
4844  {
4845  fpinfo->make_outerrel_subquery = true;
4846  fpinfo->lower_subquery_rels =
4848  outerrel->relids);
4849  }
4850  if (fpinfo_i->remote_conds)
4851  {
4852  fpinfo->make_innerrel_subquery = true;
4853  fpinfo->lower_subquery_rels =
4855  innerrel->relids);
4856  }
4857  break;
4858 
4859  default:
4860  /* Should not happen, we have just checked this above */
4861  elog(ERROR, "unsupported join type %d", jointype);
4862  }
4863 
4864  /*
4865  * For an inner join, all restrictions can be treated alike. Treating the
4866  * pushed down conditions as join conditions allows a top level full outer
4867  * join to be deparsed without requiring subqueries.
4868  */
4869  if (jointype == JOIN_INNER)
4870  {
4871  Assert(!fpinfo->joinclauses);
4872  fpinfo->joinclauses = fpinfo->remote_conds;
4873  fpinfo->remote_conds = NIL;
4874  }
4875 
4876  /* Mark that this join can be pushed down safely */
4877  fpinfo->pushdown_safe = true;
4878 
4879  /* Get user mapping */
      /*
       * A user mapping is recorded only when remote estimates are used: the
       * outer side's if it uses remote estimates, otherwise the inner side's
       * (which then must be the one using them).
       */
4880  if (fpinfo->use_remote_estimate)
4881  {
4882  if (fpinfo_o->use_remote_estimate)
4883  fpinfo->user = fpinfo_o->user;
4884  else
4885  fpinfo->user = fpinfo_i->user;
4886  }
4887  else
4888  fpinfo->user = NULL;
4889 
4890  /*
4891  * Set cached relation costs to some negative value, so that we can detect
4892  * when they are set to some sensible costs, during one (usually the
4893  * first) of the calls to estimate_path_cost_size().
4894  */
4895  fpinfo->rel_startup_cost = -1;
4896  fpinfo->rel_total_cost = -1;
4897 
4898  /*
4899  * Set the string describing this join relation to be used in EXPLAIN
4900  * output of corresponding ForeignScan.
4901  */
4902  fpinfo->relation_name = makeStringInfo();
4903  appendStringInfo(fpinfo->relation_name, "(%s) %s JOIN (%s)",
4904  fpinfo_o->relation_name->data,
4905  get_jointype_name(fpinfo->jointype),
4906  fpinfo_i->relation_name->data);
4907 
4908  /*
4909  * Set the relation index. This is defined as the position of this
4910  * joinrel in the join_rel_list list plus the length of the rtable list.
4911  * Note that since this joinrel is at the end of the join_rel_list list
4912  * when we are called, we can get the position by list_length.
4913  */
4914  Assert(fpinfo->relation_index == 0); /* shouldn't be set yet */
4915  fpinfo->relation_index =
4917 
4918  return true;
4919 }
4920 
/*
 * Add a ForeignPath to rel for each set of useful pathkeys returned by
 * get_useful_pathkeys_for_relation(), costing each one with
 * estimate_path_cost_size().  epq_path, if not NULL, is attached to every
 * ForeignPath as its EPQ path; it is wrapped in a sort path whenever its
 * ordering does not already satisfy the pathkeys of the path being added.
 */
4921 static void
4923  Path *epq_path)
4924 {
4925  List *useful_pathkeys_list = NIL; /* List of all pathkeys */
4926  ListCell *lc;
4927 
4928  useful_pathkeys_list = get_useful_pathkeys_for_relation(root, rel);
4929 
4930  /* Create one path for each set of pathkeys we found above. */
4931  foreach(lc, useful_pathkeys_list)
4932  {
4933  double rows;
4934  int width;
4935  Cost startup_cost;
4936  Cost total_cost;
4937  List *useful_pathkeys = lfirst(lc);
4938  Path *sorted_epq_path;
4939 
      /* NOTE(review): width is filled in here but not used afterwards. */
4940  estimate_path_cost_size(root, rel, NIL, useful_pathkeys,
4941  &rows, &width, &startup_cost, &total_cost);
4942 
4943  /*
4944  * The EPQ path must be at least as well sorted as the path itself, in
4945  * case it gets used as input to a mergejoin.
4946  */
4947  sorted_epq_path = epq_path;
4948  if (sorted_epq_path != NULL &&
4949  !pathkeys_contained_in(useful_pathkeys,
4950  sorted_epq_path->pathkeys))
4951  sorted_epq_path = (Path *)
4952  create_sort_path(root,
4953  rel,
4954  sorted_epq_path,
4955  useful_pathkeys,
4956  -1.0);
4957 
4958  add_path(rel, (Path *)
4959  create_foreignscan_path(root, rel,
4960  NULL,
4961  rows,
4962  startup_cost,
4963  total_cost,
4964  useful_pathkeys,
4965  NULL,
4966  sorted_epq_path,
4967  NIL));
4968  }
4969 }
4970 
4971 /*
4972  * Parse options from foreign server and apply them to fpinfo.
      *
      * Recognized options are use_remote_estimate, fdw_startup_cost,
      * fdw_tuple_cost, extensions and fetch_size; any other server option is
      * ignored here.  Numeric values are converted with strtod()/strtol()
      * without error checking -- NOTE(review): this presumably relies on the
      * values having been validated when the option was set; verify.
4973  *
4974  * New options might also require tweaking merge_fdw_options().
4975  */
4976 static void
4978 {
4979  ListCell *lc;
4980 
4981  foreach(lc, fpinfo->server->options)
4982  {
4983  DefElem *def = (DefElem *) lfirst(lc);
4984 
4985  if (strcmp(def->defname, "use_remote_estimate") == 0)
4986  fpinfo->use_remote_estimate = defGetBoolean(def);
4987  else if (strcmp(def->defname, "fdw_startup_cost") == 0)
4988  fpinfo->fdw_startup_cost = strtod(defGetString(def), NULL);
4989  else if (strcmp(def->defname, "fdw_tuple_cost") == 0)
4990  fpinfo->fdw_tuple_cost = strtod(defGetString(def), NULL);
4991  else if (strcmp(def->defname, "extensions") == 0)
4992  fpinfo->shippable_extensions =
4993  ExtractExtensionList(defGetString(def), false);
4994  else if (strcmp(def->defname, "fetch_size") == 0)
4995  fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
4996  }
4997 }
4998 
4999 /*
5000  * Parse options from foreign table and apply them to fpinfo.
      *
      * Only use_remote_estimate and fetch_size are recognized here; any other
      * table option is ignored.  Matching options overwrite whatever fpinfo
      * already holds, so if this runs after the server options have been
      * applied the table-level settings take precedence -- TODO confirm the
      * call order at the call site.
5001  *
5002  * New options might also require tweaking merge_fdw_options().
5003  */
5004 static void
5006 {
5007  ListCell *lc;
5008 
5009  foreach(lc, fpinfo->table->options)
5010  {
5011  DefElem *def = (DefElem *) lfirst(lc);
5012 
5013  if (strcmp(def->defname, "use_remote_estimate") == 0)
5014  fpinfo->use_remote_estimate = defGetBoolean(def);
5015  else if (strcmp(def->defname, "fetch_size") == 0)
5016  fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
5017  }
5018 }
5019 
5020 /*
5021  * Merge FDW options from input relations into a new set of options for a join
5022  * or an upper rel.
5023  *
5024  * For a join relation, FDW-specific information about the inner and outer
5025  * relations is provided using fpinfo_i and fpinfo_o. For an upper relation,
5026  * fpinfo_o provides the information for the input relation; fpinfo_i is
5027  * expected to be NULL.
      *
      * Server-level settings (startup/tuple costs, shippable extensions) and the
      * starting values of use_remote_estimate and fetch_size are copied from
      * fpinfo_o.  For a join, use_remote_estimate becomes true if either side
      * uses it, and fetch_size becomes the larger of the two sides' values.
      * fpinfo->server itself is not set here.
5028  */
5029 static void
5031  const PgFdwRelationInfo *fpinfo_o,
5032  const PgFdwRelationInfo *fpinfo_i)
5033 {
5034  /* We must always have fpinfo_o. */
5035  Assert(fpinfo_o);
5036 
5037  /* fpinfo_i may be NULL, but if present the servers must both match. */
5038  Assert(!fpinfo_i ||
5039  fpinfo_i->server->serverid == fpinfo_o->server->serverid);
5040 
5041  /*
5042  * Copy the server specific FDW options. (For a join, both relations come
5043  * from the same server, so the server options should have the same value
5044  * for both relations.)
5045  */
5046  fpinfo->fdw_startup_cost = fpinfo_o->fdw_startup_cost;
5047  fpinfo->fdw_tuple_cost = fpinfo_o->fdw_tuple_cost;
5048  fpinfo->shippable_extensions = fpinfo_o->shippable_extensions;
5049  fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate;
5050  fpinfo->fetch_size = fpinfo_o->fetch_size;
5051 
5052  /* Merge the table level options from either side of the join. */
5053  if (fpinfo_i)
5054  {
5055  /*
5056  * We'll prefer to use remote estimates for this join if any table
5057  * from either side of the join is using remote estimates. This is
5058  * most likely going to be preferred since they're already willing to
5059  * pay the price of a round trip to get the remote EXPLAIN. In any
5060  * case it's not entirely clear how we might otherwise handle this
5061  * best.
5062  */
5063  fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate ||
5064  fpinfo_i->use_remote_estimate;
5065 
5066  /*
5067  * Set fetch size to maximum of the joining sides, since we are
5068  * expecting the rows returned by the join to be proportional to the
5069  * relation sizes.
5070  */
5071  fpinfo->fetch_size = Max(fpinfo_o->fetch_size, fpinfo_i->fetch_size);
5072  }
5073 }
5074 
5075 /*
5076  * postgresGetForeignJoinPaths
5077  * Add possible ForeignPath to joinrel, if join is safe to push down.
5078  */
5079 static void
5081  RelOptInfo *joinrel,
5082  RelOptInfo *outerrel,
5083  RelOptInfo *innerrel,
5084  JoinType jointype,
5085  JoinPathExtraData *extra)
5086 {
5087  PgFdwRelationInfo *fpinfo;
5088  ForeignPath *joinpath;
5089  double rows;
5090  int width;
5091  Cost startup_cost;
5092  Cost total_cost;
5093  Path *epq_path; /* Path to create plan to be executed when
5094  * EvalPlanQual gets triggered. */
5095 
5096  /*
5097  * Skip if this join combination has been considered already.
5098  */
5099  if (joinrel->fdw_private)
5100  return;
5101 
5102  /*
5103  * Create unfinished PgFdwRelationInfo entry which is used to indicate
5104  * that the join relation is already considered, so that we won't waste
5105  * time in judging safety of join pushdown and adding the same paths again
5106  * if found safe. Once we know that this join can be pushed down, we fill
5107  * the entry.
5108  */
5109  fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
5110  fpinfo->pushdown_safe = false;
5111  joinrel->fdw_private = fpinfo;
5112  /* attrs_used is only for base relations. */
5113  fpinfo->attrs_used = NULL;
5114 
5115  /*
5116  * If there is a possibility that EvalPlanQual will be executed, we need
5117  * to be able to reconstruct the row using scans of the base relations.
5118  * GetExistingLocalJoinPath will find a suitable path for this purpose in
5119  * the path list of the joinrel, if one exists. We must be careful to
5120  * call it before adding any ForeignPath, since the ForeignPath might
5121  * dominate the only suitable local path available. We also do it before
5122  * calling foreign_join_ok(), since that function updates fpinfo and marks
5123  * it as pushable if the join is found to be pushable.
5124  */
5125  if (root->parse->commandType == CMD_DELETE ||
5126  root->parse->commandType == CMD_UPDATE ||
5127  root->rowMarks)
5128  {
5129  epq_path = GetExistingLocalJoinPath(joinrel);
5130  if (!epq_path)
5131  {
5132  elog(DEBUG3, "could not push down foreign join because a local path suitable for EPQ checks was not found");
5133  return;
5134  }
5135  }
5136  else
5137  epq_path = NULL;
5138 
5139  if (!foreign_join_ok(root, joinrel, jointype, outerrel, innerrel, extra))
5140  {
5141  /* Free path required for EPQ if we copied one; we don't need it now */
5142  if (epq_path)
5143  pfree(epq_path);
5144  return;
5145  }
5146 
5147  /*
5148  * Compute the selectivity and cost of the local_conds, so we don't have
5149  * to do it over again for each path. The best we can do for these
5150  * conditions is to estimate selectivity on the basis of local statistics.
5151  * The local conditions are applied after the join has been computed on
5152  * the remote side like quals in WHERE clause, so pass jointype as
5153  * JOIN_INNER.
5154  */
5155  fpinfo->local_conds_sel = clauselist_selectivity(root,
5156  fpinfo->local_conds,
5157  0,
5158  JOIN_INNER,
5159  NULL);
5160  cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
5161 
5162  /*
5163  * If we are going to estimate costs locally, estimate the join clause
5164  * selectivity here while we have special join info.
5165  */
5166  if (!fpinfo->use_remote_estimate)
5167  fpinfo->joinclause_sel = clauselist_selectivity(root, fpinfo->joinclauses,
5168  0, fpinfo->jointype,
5169  extra->sjinfo);
5170 
5171  /* Estimate costs for bare join relation */
5172  estimate_path_cost_size(root, joinrel, NIL, NIL, &rows,
5173  &width, &startup_cost, &total_cost);
5174  /* Now update this information in the joinrel */
5175  joinrel->rows = rows;
5176  joinrel->reltarget->width = width;
5177  fpinfo->rows = rows;
5178  fpinfo->width = width;
5179  fpinfo->startup_cost = startup_cost;
5180  fpinfo->total_cost = total_cost;
5181 
5182  /*
5183  * Create a new join path and add it to the joinrel which represents a
5184  * join between foreign tables.
5185  */
5186  joinpath = create_foreignscan_path(root,
5187  joinrel,
5188  NULL, /* default pathtarget */
5189  rows,
5190  startup_cost,
5191  total_cost,
5192  NIL, /* no pathkeys */
5193  NULL, /* no required_outer */
5194  epq_path,
5195  NIL); /* no fdw_private */
5196 
5197  /* Add generated path into joinrel by add_path(). */
5198  add_path(joinrel, (Path *) joinpath);
5199 
5200  /* Consider pathkeys for the join relation */
5201  add_paths_with_pathkeys_for_rel(root, joinrel, epq_path);
5202 
5203  /* XXX Consider parameterized paths for the join relation */
5204 }
5205 
5206 /*
5207  * Assess whether the aggregation, grouping and having operations can be pushed
5208  * down to the foreign server. As a side effect, save information we obtain in
5209  * this function to PgFdwRelationInfo of the input relation.
5210  */
5211 static bool
5213  Node *havingQual)
5214 {
5215  Query *query = root->parse;
5216  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) grouped_rel->fdw_private;
5217  PathTarget *grouping_target = grouped_rel->reltarget;
5218  PgFdwRelationInfo *ofpinfo;
5219  List *aggvars;
5220  ListCell *lc;
5221  int i;
5222  List *tlist = NIL;
5223 
5224  /* We currently don't support pushing Grouping Sets. */
5225  if (query->groupingSets)
5226  return false;
5227 
5228  /* Get the fpinfo of the underlying scan relation. */
5229  ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
5230 
5231  /*
5232  * If underlying scan relation has any local conditions, those conditions
5233  * are required to be applied before performing aggregation. Hence the
5234  * aggregate cannot be pushed down.
5235  */
5236  if (ofpinfo->local_conds)
5237  return false;
5238 
5239  /*
5240  * Examine grouping expressions, as well as other expressions we'd need to
5241  * compute, and check whether they are safe to push down to the foreign
5242  * server. All GROUP BY expressions will be part of the grouping target
5243  * and thus there is no need to search for them separately. Add grouping
5244  * expressions into target list which will be passed to foreign server.
5245  */
5246  i = 0;
5247  foreach(lc, grouping_target->exprs)
5248  {
5249  Expr *expr = (Expr *) lfirst(lc);
5250  Index sgref = get_pathtarget_sortgroupref(grouping_target, i);
5251  ListCell *l;
5252 
5253  /* Check whether this expression is part of GROUP BY clause */
5254  if (sgref && get_sortgroupref_clause_noerr(sgref, query->groupClause))
5255  {
5256  TargetEntry *tle;
5257 
5258  /*
5259  * If any GROUP BY expression is not shippable, then we cannot
5260  * push down aggregation to the foreign server.
5261  */
5262  if (!is_foreign_expr(root, grouped_rel, expr))
5263  return false;
5264 
5265  /*
5266  * Pushable, so add to tlist. We need to create a TLE for this
5267  * expression and apply the sortgroupref to it. We cannot use
5268  * add_to_flat_tlist() here because that avoids making duplicate
5269  * entries in the tlist. If there are duplicate entries with
5270  * distinct sortgrouprefs, we have to duplicate that situation in
5271  * the output tlist.
5272  */
5273  tle = makeTargetEntry(expr, list_length(tlist) + 1, NULL, false);
5274  tle->ressortgroupref = sgref;
5275  tlist = lappend(tlist, tle);
5276  }
5277  else
5278  {
5279  /*
5280  * Non-grouping expression we need to compute. Is it shippable?
5281  */
5282  if (is_foreign_expr(root, grouped_rel, expr))
5283  {
5284  /* Yes, so add to tlist as-is; OK to suppress duplicates */
5285  tlist = add_to_flat_tlist(tlist, list_make1(expr));
5286  }
5287  else
5288  {
5289  /* Not pushable as a whole; extract its Vars and aggregates */
5290  aggvars = pull_var_clause((Node *) expr,
5292 
5293  /*
5294  * If any aggregate expression is not shippable, then we
5295  * cannot push down aggregation to the foreign server.
5296  */
5297  if (!is_foreign_expr(root, grouped_rel, (Expr *) aggvars))
5298  return false;
5299 
5300  /*
5301  * Add aggregates, if any, into the targetlist. Plain Vars
5302  * outside an aggregate can be ignored, because they should be
5303  * either same as some GROUP BY column or part of some GROUP
5304  * BY expression. In either case, they are already part of
5305  * the targetlist and thus no need to add them again. In fact
5306  * including plain Vars in the tlist when they do not match a
5307  * GROUP BY column would cause the foreign server to complain
5308  * that the shipped query is invalid.
5309  */
5310  foreach(l, aggvars)
5311  {
5312  Expr *expr = (Expr *) lfirst(l);
5313 
5314  if (IsA(expr, Aggref))
5315  tlist = add_to_flat_tlist(tlist, list_make1(expr));
5316  }
5317  }
5318  }
5319 
5320  i++;
5321  }
5322 
5323  /*
5324  * Classify the pushable and non-pushable HAVING clauses and save them in
5325  * remote_conds and local_conds of the grouped rel's fpinfo.
5326  */
5327  if (havingQual)
5328  {
5329  ListCell *lc;
5330 
5331  foreach(lc, (List *) havingQual)
5332  {
5333  Expr *expr = (Expr *) lfirst(lc);
5334  RestrictInfo *rinfo;
5335 
5336  /*
5337  * Currently, the core code doesn't wrap havingQuals in
5338  * RestrictInfos, so we must make our own.
5339  */
5340  Assert(!IsA(expr, RestrictInfo));
5341  rinfo = make_restrictinfo(expr,
5342  true,
5343  false,
5344  false,
5345  root->qual_security_level,
5346  grouped_rel->relids,
5347  NULL,
5348  NULL);
5349  if (is_foreign_expr(root, grouped_rel, expr))
5350  fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo);
5351  else
5352  fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo);
5353  }
5354  }
5355 
5356  /*
5357  * If there are any local conditions, pull Vars and aggregates from it and
5358  * check whether they are safe to pushdown or not.
5359  */
5360  if (fpinfo->local_conds)
5361  {
5362  List *aggvars = NIL;
5363  ListCell *lc;
5364 
5365  foreach(lc, fpinfo->local_conds)
5366  {
5367  RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
5368 
5369  aggvars = list_concat(aggvars,
5370  pull_var_clause((Node *) rinfo->clause,
5372  }
5373 
5374  foreach(lc, aggvars)
5375  {
5376  Expr *expr = (Expr *) lfirst(lc);
5377 
5378  /*
5379  * If aggregates within local conditions are not safe to push
5380  * down, then we cannot push down the query. Vars are already
5381  * part of GROUP BY clause which are checked above, so no need to
5382  * access them again here.
5383  */
5384  if (IsA(expr, Aggref))
5385  {
5386  if (!is_foreign_expr(root, grouped_rel, expr))
5387  return false;
5388 
5389  tlist = add_to_flat_tlist(tlist, list_make1(expr));
5390  }
5391  }
5392  }
5393 
5394  /* Store generated targetlist */
5395  fpinfo->grouped_tlist = tlist;
5396 
5397  /* Safe to pushdown */
5398  fpinfo->pushdown_safe = true;
5399 
5400  /*
5401  * Set cached relation costs to some negative value, so that we can detect
5402  * when they are set to some sensible costs, during one (usually the
5403  * first) of the calls to estimate_path_cost_size().
5404  */
5405  fpinfo->rel_startup_cost = -1;
5406  fpinfo->rel_total_cost = -1;
5407 
5408  /*
5409  * Set the string describing this grouped relation to be used in EXPLAIN
5410  * output of corresponding ForeignScan.
5411  */
5412  fpinfo->relation_name = makeStringInfo();
5413  appendStringInfo(fpinfo->relation_name, "Aggregate on (%s)",
5414  ofpinfo->relation_name->data);
5415 
5416  return true;
5417 }
5418 
5419 /*
5420  * postgresGetForeignUpperPaths
5421  * Add paths for post-join operations like aggregation, grouping etc. if
5422  * corresponding operations are safe to push down.
5423  *
5424  * Right now, we only support aggregate, grouping and having clause pushdown.
5425  */
5426 static void
5428  RelOptInfo *input_rel, RelOptInfo *output_rel,
5429  void *extra)
5430 {
5431  PgFdwRelationInfo *fpinfo;
5432 
5433  /*
5434  * If input rel is not safe to pushdown, then simply return as we cannot
5435  * perform any post-join operations on the foreign server.
5436  */
5437  if (!input_rel->fdw_private ||
5438  !((PgFdwRelationInfo *) input_rel->fdw_private)->pushdown_safe)
5439  return;
5440 
5441  /* Ignore stages we don't support; and skip any duplicate calls. */
5442  if (stage != UPPERREL_GROUP_AGG || output_rel->fdw_private)
5443  return;
5444 
5445  fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
5446  fpinfo->pushdown_safe = false;
5447  output_rel->fdw_private = fpinfo;
5448 
5449  add_foreign_grouping_paths(root, input_rel, output_rel,
5450  (GroupPathExtraData *) extra);
5451 }
5452 
5453 /*
5454  * add_foreign_grouping_paths
5455  * Add foreign path for grouping and/or aggregation.
5456  *
5457  * Given input_rel represents the underlying scan. The paths are added to the
5458  * given grouped_rel.
5459  */
5460 static void
5462  RelOptInfo *grouped_rel,
5463  GroupPathExtraData *extra)
5464 {
5465  Query *parse = root->parse;
5466  PgFdwRelationInfo *ifpinfo = input_rel->fdw_private;
5467  PgFdwRelationInfo *fpinfo = grouped_rel->fdw_private;
5468  ForeignPath *grouppath;
5469  double rows;
5470  int width;
5471  Cost startup_cost;
5472  Cost total_cost;
5473 
5474  /* Nothing to be done, if there is no grouping or aggregation required. */
5475  if (!parse->groupClause && !parse->groupingSets && !parse->hasAggs &&
5476  !root->hasHavingQual)
5477  return;
5478 
5481 
5482  /* save the input_rel as outerrel in fpinfo */
5483  fpinfo->outerrel = input_rel;
5484 
5485  /*
5486  * Copy foreign table, foreign server, user mapping, FDW options etc.
5487  * details from the input relation's fpinfo.
5488  */
5489  fpinfo->table = ifpinfo->table;
5490  fpinfo->server = ifpinfo->server;
5491  fpinfo->user = ifpinfo->user;
5492  merge_fdw_options(fpinfo, ifpinfo, NULL);
5493 
5494  /*
5495  * Assess if it is safe to push down aggregation and grouping.
5496  *
5497  * Use HAVING qual from extra. In case of child partition, it will have
5498  * translated Vars.
5499  */
5500  if (!foreign_grouping_ok(root, grouped_rel, extra->havingQual))
5501  return;
5502 
5503  /* Estimate the cost of push down */
5504  estimate_path_cost_size(root, grouped_rel, NIL, NIL, &rows,
5505  &width, &startup_cost, &total_cost);
5506 
5507  /* Now update this information in the fpinfo */
5508  fpinfo->rows = rows;
5509  fpinfo->width = width;
5510  fpinfo->startup_cost = startup_cost;
5511  fpinfo->total_cost = total_cost;
5512 
5513  /* Create and add foreign path to the grouping relation. */
5514  grouppath = create_foreignscan_path(root,
5515  grouped_rel,
5516  grouped_rel->reltarget,
5517  rows,
5518  startup_cost,
5519  total_cost,
5520  NIL, /* no pathkeys */
5521  NULL, /* no required_outer */
5522  NULL,
5523  NIL); /* no fdw_private */
5524 
5525  /* Add generated path into grouped_rel by add_path(). */
5526  add_path(grouped_rel, (Path *) grouppath);
5527 }
5528 
5529 /*
5530  * Create a tuple from the specified row of the PGresult.
5531  *
5532  * rel is the local representation of the foreign table, attinmeta is
5533  * conversion data for the rel's tupdesc, and retrieved_attrs is an
5534  * integer list of the table column numbers present in the PGresult.
5535  * temp_context is a working context that can be reset after each tuple.
5536  */
5537 static HeapTuple
5539  int row,
5540  Relation rel,
5543  ForeignScanState *fsstate,
5544  MemoryContext temp_context)
5545 {
5546  HeapTuple tuple;
5548  Datum *values;
5549  bool *nulls;
5550  ItemPointer ctid = NULL;
5551  Oid oid = InvalidOid;
5552  ConversionLocation errpos;
5553  ErrorContextCallback errcallback;
5554  MemoryContext oldcontext;
5555  ListCell *lc;
5556  int j;
5557 
5558  Assert(row < PQntuples(res));
5559 
5560  /*
5561  * Do the following work in a temp context that we reset after each tuple.
5562  * This cleans up not only the data we have direct access to, but any
5563  * cruft the I/O functions might leak.
5564  */
5565  oldcontext = MemoryContextSwitchTo(temp_context);
5566 
5567  if (rel)
5568  tupdesc = RelationGetDescr(rel);
5569  else
5570  {
5571  Assert(fsstate);
5572  tupdesc = fsstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
5573  }
5574 
5575  values = (Datum *) palloc0(tupdesc->natts * sizeof(Datum));
5576  nulls = (bool *) palloc(tupdesc->natts * sizeof(bool));
5577  /* Initialize to nulls for any columns not present in result */
5578  memset(nulls, true, tupdesc->natts * sizeof(bool));
5579 
5580  /*
5581  * Set up and install callback to report where conversion error occurs.
5582  */
5583  errpos.rel = rel;
5584  errpos.cur_attno = 0;
5585  errpos.fsstate = fsstate;
5586  errcallback.callback = conversion_error_callback;
5587  errcallback.arg = (void *) &errpos;
5588  errcallback.previous = error_context_stack;
5589  error_context_stack = &errcallback;
5590 
5591  /*
5592  * i indexes columns in the relation, j indexes columns in the PGresult.
5593  */
5594  j = 0;
5595  foreach(lc, retrieved_attrs)
5596  {
5597  int i = lfirst_int(lc);
5598  char *valstr;
5599 
5600  /* fetch next column's textual value */
5601  if (PQgetisnull(res, row, j))
5602  valstr = NULL;
5603  else
5604  valstr = PQgetvalue(res, row, j);
5605 
5606  /*
5607  * convert value to internal representation
5608  *
5609  * Note: we ignore system columns other than ctid and oid in result
5610  */
5611  errpos.cur_attno = i;
5612  if (i > 0)
5613  {
5614  /* ordinary column */
5615  Assert(i <= tupdesc->natts);
5616  nulls[i - 1] = (valstr == NULL);
5617  /* Apply the input function even to nulls, to support domains */
5618  values[i - 1] = InputFunctionCall(&attinmeta->attinfuncs[i - 1],
5619  valstr,
5620  attinmeta->attioparams[i - 1],
5621  attinmeta->atttypmods[i - 1]);
5622  }
5623  else if (i == SelfItemPointerAttributeNumber)
5624  {
5625  /* ctid */
5626  if (valstr != NULL)
5627  {
5628  Datum datum;
5629 
5630  datum = DirectFunctionCall1(tidin, CStringGetDatum(valstr));
5631  ctid = (ItemPointer) DatumGetPointer(datum);
5632  }
5633  }
5634  else if (i == ObjectIdAttributeNumber)
5635  {
5636  /* oid */
5637  if (valstr != NULL)
5638  {
5639  Datum datum;
5640 
5641  datum = DirectFunctionCall1(oidin, CStringGetDatum(valstr));
5642  oid = DatumGetObjectId(datum);
5643  }
5644  }
5645  errpos.cur_attno = 0;
5646 
5647  j++;
5648  }
5649 
5650  /* Uninstall error context callback. */
5651  error_context_stack = errcallback.previous;
5652 
5653  /*
5654  * Check we got the expected number of columns. Note: j == 0 and
5655  * PQnfields == 1 is expected, since deparse emits a NULL if no columns.
5656  */
5657  if (j > 0 && j != PQnfields(res))
5658  elog(ERROR, "remote query result does not match the foreign table");
5659 
5660  /*
5661  * Build the result tuple in caller's memory context.
5662  */
5663  MemoryContextSwitchTo(oldcontext);
5664 
5665  tuple = heap_form_tuple(tupdesc, values, nulls);
5666 
5667  /*
5668  * If we have a CTID to return, install it in both t_self and t_ctid.
5669  * t_self is the normal place, but if the tuple is converted to a
5670  * composite Datum, t_self will be lost; setting t_ctid allows CTID to be
5671  * preserved during EvalPlanQual re-evaluations (see ROW_MARK_COPY code).
5672  */
5673  if (ctid)
5674  tuple->t_self = tuple->t_data->t_ctid = *ctid;
5675 
5676  /*
5677  * Stomp on the xmin, xmax, and cmin fields from the tuple created by
5678  * heap_form_tuple. heap_form_tuple actually creates the tuple with
5679  * DatumTupleFields, not HeapTupleFields, but the executor expects
5680  * HeapTupleFields and will happily extract system columns on that
5681  * assumption. If we don't do this then, for example, the tuple length
5682  * ends up in the xmin field, which isn't what we want.
5683  */