PostgreSQL Source Code  git master
postgres_fdw.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * postgres_fdw.c
4  * Foreign-data wrapper for remote PostgreSQL servers
5  *
6  * Portions Copyright (c) 2012-2018, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * contrib/postgres_fdw/postgres_fdw.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "postgres_fdw.h"
16 
17 #include "access/htup_details.h"
18 #include "access/sysattr.h"
19 #include "catalog/pg_class.h"
20 #include "commands/defrem.h"
21 #include "commands/explain.h"
22 #include "commands/vacuum.h"
23 #include "foreign/fdwapi.h"
24 #include "funcapi.h"
25 #include "miscadmin.h"
26 #include "nodes/makefuncs.h"
27 #include "nodes/nodeFuncs.h"
28 #include "optimizer/cost.h"
29 #include "optimizer/clauses.h"
30 #include "optimizer/pathnode.h"
31 #include "optimizer/paths.h"
32 #include "optimizer/planmain.h"
33 #include "optimizer/restrictinfo.h"
34 #include "optimizer/var.h"
35 #include "optimizer/tlist.h"
36 #include "parser/parsetree.h"
37 #include "utils/builtins.h"
38 #include "utils/guc.h"
39 #include "utils/lsyscache.h"
40 #include "utils/memutils.h"
41 #include "utils/rel.h"
42 #include "utils/sampling.h"
43 #include "utils/selfuncs.h"
44 
46 
47 /* Default CPU cost to start up a foreign query. */
48 #define DEFAULT_FDW_STARTUP_COST 100.0
49 
50 /* Default CPU cost to process 1 row (above and beyond cpu_tuple_cost). */
51 #define DEFAULT_FDW_TUPLE_COST 0.01
52 
53 /* If no remote estimates, assume a sort costs 20% extra (i.e. a factor of 1.2) */
54 #define DEFAULT_FDW_SORT_MULTIPLIER 1.2
55 
56 /*
57  * Indexes of FDW-private information stored in fdw_private lists.
58  *
59  * These items are indexed with the enum FdwScanPrivateIndex, so an item
60  * can be fetched with list_nth(). For example, to get the SELECT statement:
61  * sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
62  */
64 {
65  /* SQL statement to execute remotely (as a String node) */
67  /* Integer list of attribute numbers retrieved by the SELECT */
69  /* Integer representing the desired fetch_size */
71 
72  /*
73  * String describing the join, i.e. the names of the relations being joined
74  * and the types of join; added when the scan is a join
75  */
77 };
78 
79 /*
80  * Similarly, this enum describes what's kept in the fdw_private list for
81  * a ModifyTable node referencing a postgres_fdw foreign table. We store:
82  *
83  * 1) INSERT/UPDATE/DELETE statement text to be sent to the remote server
84  * 2) Integer list of target attribute numbers for INSERT/UPDATE
85  * (NIL for a DELETE)
86  * 3) Boolean flag showing if the remote query has a RETURNING clause
87  * 4) Integer list of attribute numbers retrieved by RETURNING, if any
88  */
90 {
91  /* SQL statement to execute remotely (as a String node) */
93  /* Integer list of target attribute numbers for INSERT/UPDATE */
95  /* has-returning flag (as an integer Value node) */
97  /* Integer list of attribute numbers retrieved by RETURNING */
99 };
100 
101 /*
102  * Similarly, this enum describes what's kept in the fdw_private list for
103  * a ForeignScan node that modifies a foreign table directly. We store:
104  *
105  * 1) UPDATE/DELETE statement text to be sent to the remote server
106  * 2) Boolean flag showing if the remote query has a RETURNING clause
107  * 3) Integer list of attribute numbers retrieved by RETURNING, if any
108  * 4) Boolean flag showing if we set the command es_processed
109  */
111 {
112  /* SQL statement to execute remotely (as a String node) */
114  /* has-returning flag (as an integer Value node) */
116  /* Integer list of attribute numbers retrieved by RETURNING */
118  /* set-processed flag (as an integer Value node) */
120 };
121 
122 /*
123  * Execution state of a foreign scan using postgres_fdw.
    *
    * The members are grouped as: the relation and its conversion metadata,
    * data extracted from fdw_private, state for running the remote query,
    * the buffer of fetched tuples, batch-level bookkeeping, working memory
    * contexts, and the fetch size.
    *
    * NOTE(review): the line closing this typedef is not present in this
    * listing (the embedded numbering skips 159) -- confirm against upstream.
124  */
125 typedef struct PgFdwScanState
126 {
127  Relation rel; /* relcache entry for the foreign table. NULL
128  * for a foreign join scan. */
129  TupleDesc tupdesc; /* tuple descriptor of scan */
130  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
131 
132  /* extracted fdw_private data */
133  char *query; /* text of SELECT command */
134  List *retrieved_attrs; /* list of retrieved attribute numbers */
135 
136  /* for remote query execution */
137  PGconn *conn; /* connection for the scan */
138  unsigned int cursor_number; /* quasi-unique ID for this scan's cursor */
139  bool cursor_exists; /* have we created the cursor? */
140  int numParams; /* number of parameters passed to query */
141  FmgrInfo *param_flinfo; /* output conversion functions for them */
142  List *param_exprs; /* executable expressions for param values */
143  const char **param_values; /* textual values of query parameters */
144 
145  /* for storing result tuples */
146  HeapTuple *tuples; /* array of currently-retrieved tuples */
147  int num_tuples; /* # of tuples in array */
148  int next_tuple; /* index of next one to return */
149 
150  /* batch-level state, for optimizing rewinds and avoiding useless fetch */
151  int fetch_ct_2; /* Min(# of fetches done, 2), i.e. the fetch
    * count capped at 2 */
152  bool eof_reached; /* true if last fetch reached EOF */
153 
154  /* working memory contexts */
155  MemoryContext batch_cxt; /* context holding current batch of tuples */
156  MemoryContext temp_cxt; /* context for per-tuple temporary data */
157 
158  int fetch_size; /* number of tuples per fetch */
160 
161 /*
162  * Execution state of a foreign insert/update/delete operation.
    *
    * The members are grouped as: the target relation and its conversion
    * metadata, state for running the remote statement, data extracted from
    * fdw_private, parameter information for the prepared statement, and a
    * working memory context.
    *
    * NOTE(review): the line closing this typedef is not present in this
    * listing (the embedded numbering skips 186) -- confirm against upstream.
163  */
164 typedef struct PgFdwModifyState
165 {
166  Relation rel; /* relcache entry for the foreign table */
167  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
168 
169  /* for remote query execution */
170  PGconn *conn; /* connection used to run the remote statement */
171  char *p_name; /* name of prepared statement, if created */
172 
173  /* extracted fdw_private data */
174  char *query; /* text of INSERT/UPDATE/DELETE command */
175  List *target_attrs; /* list of target attribute numbers */
176  bool has_returning; /* is there a RETURNING clause? */
177  List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
178 
179  /* info about parameters for prepared statement */
180  AttrNumber ctidAttno; /* attnum of input resjunk ctid column */
181  int p_nums; /* number of parameters to transmit */
182  FmgrInfo *p_flinfo; /* output conversion functions for them */
183 
184  /* working memory context */
185  MemoryContext temp_cxt; /* context for per-tuple temporary data */
187 
188 /*
189  * Execution state of a foreign scan that modifies a foreign table directly.
    *
    * The members are grouped as: the relation and its conversion metadata,
    * data extracted from fdw_private, state for running the remote query,
    * state for returning result tuples, and a working memory context.
    *
    * NOTE(review): both the typedef header line and the closing line are
    * missing from this listing (the embedded numbering skips 191 and 221);
    * the forward declarations elsewhere in this file refer to this type as
    * PgFdwDirectModifyState -- confirm against upstream.
190  */
192 {
193  Relation rel; /* relcache entry for the foreign table */
194  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
195 
196  /* extracted fdw_private data */
197  char *query; /* text of UPDATE/DELETE command */
198  bool has_returning; /* is there a RETURNING clause? */
199  List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
200  bool set_processed; /* do we set the command es_processed? */
201 
202  /* for remote query execution */
203  PGconn *conn; /* connection for the update */
204  int numParams; /* number of parameters passed to query */
205  FmgrInfo *param_flinfo; /* output conversion functions for them */
206  List *param_exprs; /* executable expressions for param values */
207  const char **param_values; /* textual values of query parameters */
208 
209  /* for storing result tuples */
210  PGresult *result; /* result for query */
211  int num_tuples; /* # of result tuples */
212  int next_tuple; /* index of next one to return */
213  Relation resultRel; /* relcache entry for the target relation */
214  AttrNumber *attnoMap; /* array of attnums of input user columns */
215  AttrNumber ctidAttno; /* attnum of input ctid column */
216  AttrNumber oidAttno; /* attnum of input oid column */
217  bool hasSystemCols; /* are there system columns of resultRel? */
218 
219  /* working memory context */
220  MemoryContext temp_cxt; /* context for per-tuple temporary data */
222 
223 /*
224  * Workspace for analyzing a foreign table.
    *
    * Holds the relation and its conversion metadata, the array of sample
    * rows collected so far, the counters and reservoir state used for random
    * sampling, and the working memory contexts.
    *
    * NOTE(review): the line closing this typedef is not present in this
    * listing (the embedded numbering skips 245) -- confirm against upstream.
225  */
226 typedef struct PgFdwAnalyzeState
227 {
228  Relation rel; /* relcache entry for the foreign table */
229  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
230  List *retrieved_attrs; /* attr numbers retrieved by query */
231 
232  /* collected sample rows */
233  HeapTuple *rows; /* array of size targrows */
234  int targrows; /* target # of sample rows */
235  int numrows; /* # of sample rows collected */
236 
237  /* for random sampling */
238  double samplerows; /* # of rows fetched */
239  double rowstoskip; /* # of rows to skip before next sample */
240  ReservoirStateData rstate; /* state for reservoir sampling */
241 
242  /* working memory contexts */
243  MemoryContext anl_cxt; /* context for per-analyze lifespan data */
244  MemoryContext temp_cxt; /* context for per-tuple temporary data */
246 
247 /*
248  * Identify the attribute where data conversion fails.
    *
    * NOTE(review): the member that the trailing comment below describes, and
    * the line closing this typedef, are missing from this listing (the
    * embedded numbering skips 261-262) -- confirm against upstream.
249  */
250 typedef struct ConversionLocation
251 {
252  Relation rel; /* foreign table's relcache entry. */
253  AttrNumber cur_attno; /* attribute number being processed, or 0 */
254 
255  /*
256  * In case of foreign join push down, fdw_scan_tlist is used to identify
257  * the Var node corresponding to the error location and
258  * fsstate->ss.ps.state gives access to the RTEs of corresponding relation
259  * to get the relation name and attribute name.
260  */
263 
264 /*
    * Callback argument for ec_member_matches_foreign
    *
    * NOTE(review): the line closing this typedef (and hence the type's name)
    * is missing from this listing (the embedded numbering skips 269) --
    * confirm against upstream.
    */
265 typedef struct
266 {
267  Expr *current; /* current expr, or NULL if not yet found */
268  List *already_used; /* expressions already dealt with */
270 
271 /*
272  * SQL functions
273  */
275 
276 /*
277  * FDW callback routines
278  */
279 static void postgresGetForeignRelSize(PlannerInfo *root,
280  RelOptInfo *baserel,
281  Oid foreigntableid);
282 static void postgresGetForeignPaths(PlannerInfo *root,
283  RelOptInfo *baserel,
284  Oid foreigntableid);
286  RelOptInfo *foreignrel,
287  Oid foreigntableid,
288  ForeignPath *best_path,
289  List *tlist,
290  List *scan_clauses,
291  Plan *outer_plan);
292 static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
295 static void postgresEndForeignScan(ForeignScanState *node);
296 static void postgresAddForeignUpdateTargets(Query *parsetree,
297  RangeTblEntry *target_rte,
298  Relation target_relation);
300  ModifyTable *plan,
301  Index resultRelation,
302  int subplan_index);
303 static void postgresBeginForeignModify(ModifyTableState *mtstate,
304  ResultRelInfo *resultRelInfo,
305  List *fdw_private,
306  int subplan_index,
307  int eflags);
309  ResultRelInfo *resultRelInfo,
310  TupleTableSlot *slot,
311  TupleTableSlot *planSlot);
313  ResultRelInfo *resultRelInfo,
314  TupleTableSlot *slot,
315  TupleTableSlot *planSlot);
317  ResultRelInfo *resultRelInfo,
318  TupleTableSlot *slot,
319  TupleTableSlot *planSlot);
320 static void postgresEndForeignModify(EState *estate,
321  ResultRelInfo *resultRelInfo);
322 static void postgresBeginForeignInsert(ModifyTableState *mtstate,
323  ResultRelInfo *resultRelInfo);
324 static void postgresEndForeignInsert(EState *estate,
325  ResultRelInfo *resultRelInfo);
327 static bool postgresPlanDirectModify(PlannerInfo *root,
328  ModifyTable *plan,
329  Index resultRelation,
330  int subplan_index);
331 static void postgresBeginDirectModify(ForeignScanState *node, int eflags);
333 static void postgresEndDirectModify(ForeignScanState *node);
335  ExplainState *es);
337  ResultRelInfo *rinfo,
338  List *fdw_private,
339  int subplan_index,
340  ExplainState *es);
342  ExplainState *es);
343 static bool postgresAnalyzeForeignTable(Relation relation,
344  AcquireSampleRowsFunc *func,
345  BlockNumber *totalpages);
347  Oid serverOid);
348 static void postgresGetForeignJoinPaths(PlannerInfo *root,
349  RelOptInfo *joinrel,
350  RelOptInfo *outerrel,
351  RelOptInfo *innerrel,
352  JoinType jointype,
353  JoinPathExtraData *extra);
355  TupleTableSlot *slot);
356 static void postgresGetForeignUpperPaths(PlannerInfo *root,
357  UpperRelationKind stage,
358  RelOptInfo *input_rel,
359  RelOptInfo *output_rel,
360  void *extra);
361 
362 /*
363  * Helper functions
364  */
365 static void estimate_path_cost_size(PlannerInfo *root,
366  RelOptInfo *foreignrel,
367  List *param_join_conds,
368  List *pathkeys,
369  double *p_rows, int *p_width,
370  Cost *p_startup_cost, Cost *p_total_cost);
371 static void get_remote_estimate(const char *sql,
372  PGconn *conn,
373  double *rows,
374  int *width,
375  Cost *startup_cost,
376  Cost *total_cost);
379  void *arg);
380 static void create_cursor(ForeignScanState *node);
381 static void fetch_more_data(ForeignScanState *node);
382 static void close_cursor(PGconn *conn, unsigned int cursor_number);
384  ResultRelInfo *resultRelInfo,
385  CmdType operation,
386  Plan *subplan,
387  char *query,
388  List *target_attrs,
389  bool has_returning,
391 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
392 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
393  ItemPointer tupleid,
394  TupleTableSlot *slot);
395 static void store_returning_result(PgFdwModifyState *fmstate,
396  TupleTableSlot *slot, PGresult *res);
397 static void finish_foreign_modify(PgFdwModifyState *fmstate);
398 static List *build_remote_returning(Index rtindex, Relation rel,
399  List *returningList);
400 static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist);
401 static void execute_dml_stmt(ForeignScanState *node);
403 static void init_returning_filter(PgFdwDirectModifyState *dmstate,
404  List *fdw_scan_tlist,
405  Index rtindex);
407  TupleTableSlot *slot,
408  EState *estate);
409 static void prepare_query_params(PlanState *node,
410  List *fdw_exprs,
411  int numParams,
413  List **param_exprs,
414  const char ***param_values);
415 static void process_query_params(ExprContext *econtext,
417  List *param_exprs,
418  const char **param_values);
419 static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
420  HeapTuple *rows, int targrows,
421  double *totalrows,
422  double *totaldeadrows);
423 static void analyze_row_processor(PGresult *res, int row,
424  PgFdwAnalyzeState *astate);
426  int row,
427  Relation rel,
430  ForeignScanState *fsstate,
431  MemoryContext temp_context);
432 static void conversion_error_callback(void *arg);
433 static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel,
434  JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel,
435  JoinPathExtraData *extra);
436 static bool foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
437  Node *havingQual);
439  RelOptInfo *rel);
442  Path *epq_path);
443 static void add_foreign_grouping_paths(PlannerInfo *root,
444  RelOptInfo *input_rel,
445  RelOptInfo *grouped_rel,
446  GroupPathExtraData *extra);
447 static void apply_server_options(PgFdwRelationInfo *fpinfo);
448 static void apply_table_options(PgFdwRelationInfo *fpinfo);
449 static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
450  const PgFdwRelationInfo *fpinfo_o,
451  const PgFdwRelationInfo *fpinfo_i);
452 
453 
454 /*
455  * Foreign-data wrapper handler function: return a struct with pointers
456  * to my callback routines.
    *
    * Allocates an FdwRoutine node with makeNode() and hands it back to the
    * caller through PG_RETURN_POINTER().
    *
    * NOTE(review): the line carrying the function name/arguments and every
    * assignment that populates the routine struct are missing from this
    * listing (the embedded numbering skips 459, 464-470, 473-486, 489,
    * 491-493, 496, 499, 502 and 505); only the section comments survive.
    * Confirm the actual callback assignments against upstream.
457  */
458 Datum
460 {
461  FdwRoutine *routine = makeNode(FdwRoutine);
462 
463  /* Functions for scanning foreign tables */
471 
472  /* Functions for updating foreign tables */
487 
488  /* Function for EvalPlanQual rechecks */
490  /* Support functions for EXPLAIN */
494 
495  /* Support functions for ANALYZE */
497 
498  /* Support functions for IMPORT FOREIGN SCHEMA */
500 
501  /* Support functions for join push-down */
503 
504  /* Support functions for upper relation push-down */
506 
507  PG_RETURN_POINTER(routine);
508 }
509 
510 /*
511  * postgresGetForeignRelSize
512  * Estimate # of rows and width of the result of the scan
513  *
514  * We should consider the effect of all baserestrictinfo clauses here, but
515  * not any join clauses.
    *
    * baserel: the foreign table's RelOptInfo. A zeroed PgFdwRelationInfo is
    * allocated and attached to baserel->fdw_private; with remote estimates
    * baserel->rows and baserel->reltarget->width are set here directly,
    * otherwise baserel->pages/tuples may be given defaults before
    * set_baserel_size_estimates() is called.
    * foreigntableid: OID used to look up the foreign table, its server and
    * its schema-qualified name.
    *
    * Steps, in order: look up catalog info; set option defaults and apply
    * server then table options; pick the user mapping (only when remote
    * estimates are enabled); split baserestrictinfo into remote and local
    * conditions; collect the attributes to fetch; cost the local conditions;
    * estimate size and cost; build the relation name used by EXPLAIN.
    *
    * NOTE(review): several lines of this function are missing from this
    * listing (the embedded numbering skips 518, 548-549, 602, 656 and 683),
    * including the line with the function name and first parameter. The
    * body refers to that parameter as "root". Confirm against upstream.
516  */
517 static void
519  RelOptInfo *baserel,
520  Oid foreigntableid)
521 {
522  PgFdwRelationInfo *fpinfo;
523  ListCell *lc;
524  RangeTblEntry *rte = planner_rt_fetch(baserel->relid, root);
525  const char *namespace;
526  const char *relname;
527  const char *refname;
528 
529  /*
530  * We use PgFdwRelationInfo to pass various information to subsequent
531  * functions.
532  */
533  fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
534  baserel->fdw_private = (void *) fpinfo;
535 
536  /* Base foreign tables need to be pushed down always. */
537  fpinfo->pushdown_safe = true;
538 
539  /* Look up foreign-table catalog info. */
540  fpinfo->table = GetForeignTable(foreigntableid);
541  fpinfo->server = GetForeignServer(fpinfo->table->serverid);
542 
543  /*
544  * Extract user-settable option values. Note that per-table setting of
545  * use_remote_estimate overrides per-server setting.
    *
    * Defaults are assigned first; server options are applied before table
    * options, so the table-level call runs last.
546  */
547  fpinfo->use_remote_estimate = false;
550  fpinfo->shippable_extensions = NIL;
551  fpinfo->fetch_size = 100;
552 
553  apply_server_options(fpinfo);
554  apply_table_options(fpinfo);
555 
556  /*
557  * If the table or the server is configured to use remote estimates,
558  * identify which user to do remote access as during planning. This
559  * should match what ExecCheckRTEPerms() does. If we fail due to lack of
560  * permissions, the query would have failed at runtime anyway.
561  */
562  if (fpinfo->use_remote_estimate)
563  {
    /* Fall back to the current user when the RTE names no checkAsUser. */
564  Oid userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
565 
566  fpinfo->user = GetUserMapping(userid, fpinfo->server->serverid);
567  }
568  else
569  fpinfo->user = NULL;
570 
571  /*
572  * Identify which baserestrictinfo clauses can be sent to the remote
573  * server and which can't.
574  */
575  classifyConditions(root, baserel, baserel->baserestrictinfo,
576  &fpinfo->remote_conds, &fpinfo->local_conds);
577 
578  /*
579  * Identify which attributes will need to be retrieved from the remote
580  * server. These include all attrs needed for joins or final output, plus
581  * all attrs used in the local_conds. (Note: if we end up using a
582  * parameterized scan, it's possible that some of the join clauses will be
583  * sent to the remote and thus we wouldn't really need to retrieve the
584  * columns used in them. Doesn't seem worth detecting that case though.)
585  */
586  fpinfo->attrs_used = NULL;
587  pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid,
588  &fpinfo->attrs_used);
589  foreach(lc, fpinfo->local_conds)
590  {
591  RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
592 
593  pull_varattnos((Node *) rinfo->clause, baserel->relid,
594  &fpinfo->attrs_used);
595  }
596 
597  /*
598  * Compute the selectivity and cost of the local_conds, so we don't have
599  * to do it over again for each path. The best we can do for these
600  * conditions is to estimate selectivity on the basis of local statistics.
601  */
603  fpinfo->local_conds,
604  baserel->relid,
605  JOIN_INNER,
606  NULL);
607 
608  cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
609 
610  /*
611  * Set cached relation costs to some negative value, so that we can detect
612  * when they are set to some sensible costs during one (usually the first)
613  * of the calls to estimate_path_cost_size().
614  */
615  fpinfo->rel_startup_cost = -1;
616  fpinfo->rel_total_cost = -1;
617 
618  /*
619  * If the table or the server is configured to use remote estimates,
620  * connect to the foreign server and execute EXPLAIN to estimate the
621  * number of rows selected by the restriction clauses, as well as the
622  * average row width. Otherwise, estimate using whatever statistics we
623  * have locally, in a way similar to ordinary tables.
624  */
625  if (fpinfo->use_remote_estimate)
626  {
627  /*
628  * Get cost/size estimates with help of remote server. Save the
629  * values in fpinfo so we don't need to do it again to generate the
630  * basic foreign path.
631  */
632  estimate_path_cost_size(root, baserel, NIL, NIL,
633  &fpinfo->rows, &fpinfo->width,
634  &fpinfo->startup_cost, &fpinfo->total_cost);
635 
636  /* Report estimated baserel size to planner. */
637  baserel->rows = fpinfo->rows;
638  baserel->reltarget->width = fpinfo->width;
639  }
640  else
641  {
642  /*
643  * If the foreign table has never been ANALYZEd, it will have relpages
644  * and reltuples equal to zero, which most likely has nothing to do
645  * with reality. We can't do a whole lot about that if we're not
646  * allowed to consult the remote server, but we can use a hack similar
647  * to plancat.c's treatment of empty relations: use a minimum size
648  * estimate of 10 pages, and divide by the column-datatype-based width
649  * estimate to get the corresponding number of tuples.
650  */
651  if (baserel->pages == 0 && baserel->tuples == 0)
652  {
653  baserel->pages = 10;
654  baserel->tuples =
655  (10 * BLCKSZ) / (baserel->reltarget->width +
657  }
658 
659  /* Estimate baserel size as best we can with local statistics. */
660  set_baserel_size_estimates(root, baserel);
661 
662  /* Fill in basically-bogus cost estimates for use later. */
663  estimate_path_cost_size(root, baserel, NIL, NIL,
664  &fpinfo->rows, &fpinfo->width,
665  &fpinfo->startup_cost, &fpinfo->total_cost);
666  }
667 
668  /*
669  * Set the name of relation in fpinfo, while we are constructing it here.
670  * It will be used to build the string describing the join relation in
671  * EXPLAIN output. We can't know whether VERBOSE option is specified or
672  * not, so always schema-qualify the foreign table name.
673  */
674  fpinfo->relation_name = makeStringInfo();
675  namespace = get_namespace_name(get_rel_namespace(foreigntableid));
676  relname = get_rel_name(foreigntableid);
677  refname = rte->eref->aliasname;
678  appendStringInfo(fpinfo->relation_name, "%s.%s",
679  quote_identifier(namespace),
680  quote_identifier(relname));
    /* Append a further name only when the alias is non-empty and differs. */
681  if (*refname && strcmp(refname, relname) != 0)
682  appendStringInfo(fpinfo->relation_name, " %s",
684 
685  /* No outer and inner relations. */
686  fpinfo->make_outerrel_subquery = false;
687  fpinfo->make_innerrel_subquery = false;
688  fpinfo->lower_subquery_rels = NULL;
689  /* Set the relation index. */
690  fpinfo->relation_index = baserel->relid;
691 }
692 
693 /*
694  * get_useful_ecs_for_relation
695  * Determine which EquivalenceClasses might be involved in useful
696  * orderings of this relation.
697  *
698  * This function is in some respects a mirror image of the core function
699  * pathkeys_useful_for_merging: for a regular table, we know what indexes
700  * we have and want to test whether any of them are useful. For a foreign
701  * table, we don't know what indexes are present on the remote side but
702  * want to speculate about which ones we'd like to use if they existed.
703  *
704  * This function returns a list of potentially-useful equivalence classes,
705  * but it does not guarantee that an EquivalenceMember exists which contains
706  * Vars only from the given relation. For example, given ft1 JOIN t1 ON
707  * ft1.x + t1.x = 0, this function will say that the equivalence class
708  * containing ft1.x + t1.x is potentially useful. Supposing ft1 is remote and
709  * t1 is local (or on a different server), it will turn out that no useful
710  * ORDER BY clause can be generated. It's not our job to figure that out
711  * here; we're only interested in identifying relevant ECs.
    *
    * The result is built in two passes: ECs from root->eq_classes that
    * eclass_useful_for_merging() accepts (only when rel->has_eclass_joins),
    * then ECs taken from the mergejoinable clauses in rel->joininfo. The
    * second pass appends with list_append_unique_ptr(), so it never adds a
    * pointer already in the list.
    *
    * NOTE(review): the line carrying the function name and parameter list is
    * missing from this listing (the embedded numbering skips 714), as is one
    * line inside the IS_OTHER_REL branch (746). The body refers to the
    * parameters as "root" and "rel". Confirm against upstream.
712  */
713 static List *
715 {
716  List *useful_eclass_list = NIL;
717  ListCell *lc;
718  Relids relids;
719 
720  /*
721  * First, consider whether any active EC is potentially useful for a merge
722  * join against this relation.
723  */
724  if (rel->has_eclass_joins)
725  {
726  foreach(lc, root->eq_classes)
727  {
728  EquivalenceClass *cur_ec = (EquivalenceClass *) lfirst(lc);
729 
730  if (eclass_useful_for_merging(root, cur_ec, rel))
731  useful_eclass_list = lappend(useful_eclass_list, cur_ec);
732  }
733  }
734 
735  /*
736  * Next, consider whether there are any non-EC derivable join clauses that
737  * are merge-joinable. If the joininfo list is empty, we can exit
738  * quickly.
739  */
740  if (rel->joininfo == NIL)
741  return useful_eclass_list;
742 
743  /* If this is a child rel, we must use the topmost parent rel to search. */
744  if (IS_OTHER_REL(rel))
745  {
747  relids = rel->top_parent_relids;
748  }
749  else
750  relids = rel->relids;
751 
752  /* Check each join clause in turn. */
753  foreach(lc, rel->joininfo)
754  {
755  RestrictInfo *restrictinfo = (RestrictInfo *) lfirst(lc);
756 
757  /* Consider only mergejoinable clauses */
758  if (restrictinfo->mergeopfamilies == NIL)
759  continue;
760 
761  /* Make sure we've got canonical ECs. */
762  update_mergeclause_eclasses(root, restrictinfo);
763 
764  /*
765  * restrictinfo->mergeopfamilies != NIL is sufficient to guarantee
766  * that left_ec and right_ec will be initialized, per comments in
767  * distribute_qual_to_rels.
768  *
769  * We want to identify which side of this merge-joinable clause
770  * contains columns from the relation produced by this RelOptInfo. We
771  * test for overlap, not containment, because there could be extra
772  * relations on either side. For example, suppose we've got something
773  * like ((A JOIN B ON A.x = B.x) JOIN C ON A.y = C.y) LEFT JOIN D ON
774  * A.y = D.y. The input rel might be the joinrel between A and B, and
775  * we'll consider the join clause A.y = D.y. relids contains a
776  * relation not involved in the join class (B) and the equivalence
777  * class for the left-hand side of the clause contains a relation not
778  * involved in the input rel (C). Despite the fact that we have only
779  * overlap and not containment in either direction, A.y is potentially
780  * useful as a sort column.
781  *
782  * Note that it's even possible that relids overlaps neither side of
783  * the join clause. For example, consider A LEFT JOIN B ON A.x = B.x
784  * AND A.x = 1. The clause A.x = 1 will appear in B's joininfo list,
785  * but overlaps neither side of B. In that case, we just skip this
786  * join clause, since it doesn't suggest a useful sort order for this
787  * relation.
    *
    * The right-hand EC is tested first and the left-hand one only in the
    * else branch, so at most one EC is added per clause.
788  */
789  if (bms_overlap(relids, restrictinfo->right_ec->ec_relids))
790  useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
791  restrictinfo->right_ec);
792  else if (bms_overlap(relids, restrictinfo->left_ec->ec_relids))
793  useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
794  restrictinfo->left_ec);
795  }
796 
797  return useful_eclass_list;
798 }
799 
800 /*
801  * get_useful_pathkeys_for_relation
802  * Determine which orderings of a relation might be useful.
803  *
804  * Getting data in sorted order can be useful either because the requested
805  * order matches the final output ordering for the overall query we're
806  * planning, or because it enables an efficient merge join. Here, we try
807  * to figure out which pathkeys to consider.
    *
    * Returns a list whose elements are themselves pathkey lists: first
    * (optionally) a copy of root->query_pathkeys, then one single-element
    * list per additional useful EquivalenceClass. The second part is only
    * produced when fpinfo->use_remote_estimate is set.
    *
    * NOTE(review): the line carrying the function name and parameter list is
    * missing from this listing (the embedded numbering skips 810), as are
    * one declaration (814; the body uses "fpinfo" with no visible
    * declaration) and one argument of the make_canonical_pathkey() call
    * (903). The body refers to the parameters as "root" and "rel". Confirm
    * against upstream.
808  */
809 static List *
811 {
812  List *useful_pathkeys_list = NIL;
813  List *useful_eclass_list;
815  EquivalenceClass *query_ec = NULL;
816  ListCell *lc;
817 
818  /*
819  * Pushing the query_pathkeys to the remote server is always worth
820  * considering, because it might let us avoid a local sort.
821  */
822  if (root->query_pathkeys)
823  {
824  bool query_pathkeys_ok = true;
825 
826  foreach(lc, root->query_pathkeys)
827  {
828  PathKey *pathkey = (PathKey *) lfirst(lc);
829  EquivalenceClass *pathkey_ec = pathkey->pk_eclass;
830  Expr *em_expr;
831 
832  /*
833  * The planner and executor don't have any clever strategy for
834  * taking data sorted by a prefix of the query's pathkeys and
835  * getting it to be sorted by all of those pathkeys. We'll just
836  * end up resorting the entire data set. So, unless we can push
837  * down all of the query pathkeys, forget it.
838  *
839  * is_foreign_expr would detect volatile expressions as well, but
840  * checking ec_has_volatile here saves some cycles.
841  */
842  if (pathkey_ec->ec_has_volatile ||
843  !(em_expr = find_em_expr_for_rel(pathkey_ec, rel)) ||
844  !is_foreign_expr(root, rel, em_expr))
845  {
846  query_pathkeys_ok = false;
847  break;
848  }
849  }
850 
    /* All-or-nothing: the whole pathkey list is copied, or none of it. */
851  if (query_pathkeys_ok)
852  useful_pathkeys_list = list_make1(list_copy(root->query_pathkeys));
853  }
854 
855  /*
856  * Even if we're not using remote estimates, having the remote side do the
857  * sort generally won't be any worse than doing it locally, and it might
858  * be much better if the remote side can generate data in the right order
859  * without needing a sort at all. However, what we're going to do next is
860  * try to generate pathkeys that seem promising for possible merge joins,
861  * and that's more speculative. A wrong choice might hurt quite a bit, so
862  * bail out if we can't use remote estimates.
863  */
864  if (!fpinfo->use_remote_estimate)
865  return useful_pathkeys_list;
866 
867  /* Get the list of interesting EquivalenceClasses. */
868  useful_eclass_list = get_useful_ecs_for_relation(root, rel);
869 
870  /*
    * Extract unique EC for query, if any, so we don't consider it again.
    * Only a single-element query_pathkeys list is matched here; with any
    * other length query_ec stays NULL and nothing is skipped below.
    */
871  if (list_length(root->query_pathkeys) == 1)
872  {
873  PathKey *query_pathkey = linitial(root->query_pathkeys);
874 
875  query_ec = query_pathkey->pk_eclass;
876  }
877 
878  /*
879  * As a heuristic, the only pathkeys we consider here are those of length
880  * one. It's surely possible to consider more, but since each one we
881  * choose to consider will generate a round-trip to the remote side, we
882  * need to be a bit cautious here. It would sure be nice to have a local
883  * cache of information about remote index definitions...
884  */
885  foreach(lc, useful_eclass_list)
886  {
887  EquivalenceClass *cur_ec = lfirst(lc);
888  Expr *em_expr;
889  PathKey *pathkey;
890 
891  /* If redundant with what we did above, skip it. */
892  if (cur_ec == query_ec)
893  continue;
894 
895  /* If no pushable expression for this rel, skip it. */
896  em_expr = find_em_expr_for_rel(cur_ec, rel);
897  if (em_expr == NULL || !is_foreign_expr(root, rel, em_expr))
898  continue;
899 
900  /* Looks like we can generate a pathkey, so let's do it. */
901  pathkey = make_canonical_pathkey(root, cur_ec,
902  linitial_oid(cur_ec->ec_opfamilies),
904  false);
905  useful_pathkeys_list = lappend(useful_pathkeys_list,
906  list_make1(pathkey));
907  }
908 
909  return useful_pathkeys_list;
910 }
911 
912 /*
913  * postgresGetForeignPaths
914  * Create possible scan paths for a scan on the foreign table
915  */
916 static void
918  RelOptInfo *baserel,
919  Oid foreigntableid)
920 {
921  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
922  ForeignPath *path;
923  List *ppi_list;
924  ListCell *lc;
925 
926  /*
927  * Create simplest ForeignScan path node and add it to baserel. This path
928  * corresponds to SeqScan path of regular tables (though depending on what
929  * baserestrict conditions we were able to send to remote, there might
930  * actually be an indexscan happening there). We already did all the work
931  * to estimate cost and size of this path.
932  */
933  path = create_foreignscan_path(root, baserel,
934  NULL, /* default pathtarget */
935  fpinfo->rows,
936  fpinfo->startup_cost,
937  fpinfo->total_cost,
938  NIL, /* no pathkeys */
939  NULL, /* no outer rel either */
940  NULL, /* no extra plan */
941  NIL); /* no fdw_private list */
942  add_path(baserel, (Path *) path);
943 
944  /* Add paths with pathkeys */
945  add_paths_with_pathkeys_for_rel(root, baserel, NULL);
946 
947  /*
948  * If we're not using remote estimates, stop here. We have no way to
949  * estimate whether any join clauses would be worth sending across, so
950  * don't bother building parameterized paths.
951  */
952  if (!fpinfo->use_remote_estimate)
953  return;
954 
955  /*
956  * Thumb through all join clauses for the rel to identify which outer
957  * relations could supply one or more safe-to-send-to-remote join clauses.
958  * We'll build a parameterized path for each such outer relation.
959  *
960  * It's convenient to manage this by representing each candidate outer
961  * relation by the ParamPathInfo node for it. We can then use the
962  * ppi_clauses list in the ParamPathInfo node directly as a list of the
963  * interesting join clauses for that rel. This takes care of the
964  * possibility that there are multiple safe join clauses for such a rel,
965  * and also ensures that we account for unsafe join clauses that we'll
966  * still have to enforce locally (since the parameterized-path machinery
967  * insists that we handle all movable clauses).
968  */
969  ppi_list = NIL;
970  foreach(lc, baserel->joininfo)
971  {
972  RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
973  Relids required_outer;
974  ParamPathInfo *param_info;
975 
976  /* Check if clause can be moved to this rel */
977  if (!join_clause_is_movable_to(rinfo, baserel))
978  continue;
979 
980  /* See if it is safe to send to remote */
981  if (!is_foreign_expr(root, baserel, rinfo->clause))
982  continue;
983 
984  /* Calculate required outer rels for the resulting path */
985  required_outer = bms_union(rinfo->clause_relids,
986  baserel->lateral_relids);
987  /* We do not want the foreign rel itself listed in required_outer */
988  required_outer = bms_del_member(required_outer, baserel->relid);
989 
990  /*
991  * required_outer probably can't be empty here, but if it were, we
992  * couldn't make a parameterized path.
993  */
994  if (bms_is_empty(required_outer))
995  continue;
996 
997  /* Get the ParamPathInfo */
998  param_info = get_baserel_parampathinfo(root, baserel,
999  required_outer);
1000  Assert(param_info != NULL);
1001 
1002  /*
1003  * Add it to list unless we already have it. Testing pointer equality
1004  * is OK since get_baserel_parampathinfo won't make duplicates.
1005  */
1006  ppi_list = list_append_unique_ptr(ppi_list, param_info);
1007  }
1008 
1009  /*
1010  * The above scan examined only "generic" join clauses, not those that
1011  * were absorbed into EquivalenceClauses. See if we can make anything out
1012  * of EquivalenceClauses.
1013  */
1014  if (baserel->has_eclass_joins)
1015  {
1016  /*
1017  * We repeatedly scan the eclass list looking for column references
1018  * (or expressions) belonging to the foreign rel. Each time we find
1019  * one, we generate a list of equivalence joinclauses for it, and then
1020  * see if any are safe to send to the remote. Repeat till there are
1021  * no more candidate EC members.
1022  */
1024 
1025  arg.already_used = NIL;
1026  for (;;)
1027  {
1028  List *clauses;
1029 
1030  /* Make clauses, skipping any that join to lateral_referencers */
1031  arg.current = NULL;
1033  baserel,
1035  (void *) &arg,
1036  baserel->lateral_referencers);
1037 
1038  /* Done if there are no more expressions in the foreign rel */
1039  if (arg.current == NULL)
1040  {
1041  Assert(clauses == NIL);
1042  break;
1043  }
1044 
1045  /* Scan the extracted join clauses */
1046  foreach(lc, clauses)
1047  {
1048  RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
1049  Relids required_outer;
1050  ParamPathInfo *param_info;
1051 
1052  /* Check if clause can be moved to this rel */
1053  if (!join_clause_is_movable_to(rinfo, baserel))
1054  continue;
1055 
1056  /* See if it is safe to send to remote */
1057  if (!is_foreign_expr(root, baserel, rinfo->clause))
1058  continue;
1059 
1060  /* Calculate required outer rels for the resulting path */
1061  required_outer = bms_union(rinfo->clause_relids,
1062  baserel->lateral_relids);
1063  required_outer = bms_del_member(required_outer, baserel->relid);
1064  if (bms_is_empty(required_outer))
1065  continue;
1066 
1067  /* Get the ParamPathInfo */
1068  param_info = get_baserel_parampathinfo(root, baserel,
1069  required_outer);
1070  Assert(param_info != NULL);
1071 
1072  /* Add it to list unless we already have it */
1073  ppi_list = list_append_unique_ptr(ppi_list, param_info);
1074  }
1075 
1076  /* Try again, now ignoring the expression we found this time */
1077  arg.already_used = lappend(arg.already_used, arg.current);
1078  }
1079  }
1080 
1081  /*
1082  * Now build a path for each useful outer relation.
1083  */
1084  foreach(lc, ppi_list)
1085  {
1086  ParamPathInfo *param_info = (ParamPathInfo *) lfirst(lc);
1087  double rows;
1088  int width;
1089  Cost startup_cost;
1090  Cost total_cost;
1091 
1092  /* Get a cost estimate from the remote */
1093  estimate_path_cost_size(root, baserel,
1094  param_info->ppi_clauses, NIL,
1095  &rows, &width,
1096  &startup_cost, &total_cost);
1097 
1098  /*
1099  * ppi_rows currently won't get looked at by anything, but still we
1100  * may as well ensure that it matches our idea of the rowcount.
1101  */
1102  param_info->ppi_rows = rows;
1103 
1104  /* Make the path */
1105  path = create_foreignscan_path(root, baserel,
1106  NULL, /* default pathtarget */
1107  rows,
1108  startup_cost,
1109  total_cost,
1110  NIL, /* no pathkeys */
1111  param_info->ppi_req_outer,
1112  NULL,
1113  NIL); /* no fdw_private list */
1114  add_path(baserel, (Path *) path);
1115  }
1116 }
1117 
1118 /*
1119  * postgresGetForeignPlan
1120  * Create ForeignScan plan node which implements selected best path
1121  */
1122 static ForeignScan *
1124  RelOptInfo *foreignrel,
1125  Oid foreigntableid,
1126  ForeignPath *best_path,
1127  List *tlist,
1128  List *scan_clauses,
1129  Plan *outer_plan)
1130 {
1131  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
1132  Index scan_relid;
1133  List *fdw_private;
1134  List *remote_exprs = NIL;
1135  List *local_exprs = NIL;
1136  List *params_list = NIL;
1137  List *fdw_scan_tlist = NIL;
1138  List *fdw_recheck_quals = NIL;
1140  StringInfoData sql;
1141  ListCell *lc;
1142 
1143  if (IS_SIMPLE_REL(foreignrel))
1144  {
1145  /*
1146  * For base relations, set scan_relid as the relid of the relation.
1147  */
1148  scan_relid = foreignrel->relid;
1149 
1150  /*
1151  * In a base-relation scan, we must apply the given scan_clauses.
1152  *
1153  * Separate the scan_clauses into those that can be executed remotely
1154  * and those that can't. baserestrictinfo clauses that were
1155  * previously determined to be safe or unsafe by classifyConditions
1156  * are found in fpinfo->remote_conds and fpinfo->local_conds. Anything
1157  * else in the scan_clauses list will be a join clause, which we have
1158  * to check for remote-safety.
1159  *
1160  * Note: the join clauses we see here should be the exact same ones
1161  * previously examined by postgresGetForeignPaths. Possibly it'd be
1162  * worth passing forward the classification work done then, rather
1163  * than repeating it here.
1164  *
1165  * This code must match "extract_actual_clauses(scan_clauses, false)"
1166  * except for the additional decision about remote versus local
1167  * execution.
1168  */
1169  foreach(lc, scan_clauses)
1170  {
1171  RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
1172 
1173  /* Ignore any pseudoconstants, they're dealt with elsewhere */
1174  if (rinfo->pseudoconstant)
1175  continue;
1176 
1177  if (list_member_ptr(fpinfo->remote_conds, rinfo))
1178  remote_exprs = lappend(remote_exprs, rinfo->clause);
1179  else if (list_member_ptr(fpinfo->local_conds, rinfo))
1180  local_exprs = lappend(local_exprs, rinfo->clause);
1181  else if (is_foreign_expr(root, foreignrel, rinfo->clause))
1182  remote_exprs = lappend(remote_exprs, rinfo->clause);
1183  else
1184  local_exprs = lappend(local_exprs, rinfo->clause);
1185  }
1186 
1187  /*
1188  * For a base-relation scan, we have to support EPQ recheck, which
1189  * should recheck all the remote quals.
1190  */
1191  fdw_recheck_quals = remote_exprs;
1192  }
1193  else
1194  {
1195  /*
1196  * Join relation or upper relation - set scan_relid to 0.
1197  */
1198  scan_relid = 0;
1199 
1200  /*
1201  * For a join rel, baserestrictinfo is NIL and we are not considering
1202  * parameterization right now, so there should be no scan_clauses for
1203  * a joinrel or an upper rel either.
1204  */
1205  Assert(!scan_clauses);
1206 
1207  /*
1208  * Instead we get the conditions to apply from the fdw_private
1209  * structure.
1210  */
1211  remote_exprs = extract_actual_clauses(fpinfo->remote_conds, false);
1212  local_exprs = extract_actual_clauses(fpinfo->local_conds, false);
1213 
1214  /*
1215  * We leave fdw_recheck_quals empty in this case, since we never need
1216  * to apply EPQ recheck clauses. In the case of a joinrel, EPQ
1217  * recheck is handled elsewhere --- see postgresGetForeignJoinPaths().
1218  * If we're planning an upperrel (ie, remote grouping or aggregation)
1219  * then there's no EPQ to do because SELECT FOR UPDATE wouldn't be
1220  * allowed, and indeed we *can't* put the remote clauses into
1221  * fdw_recheck_quals because the unaggregated Vars won't be available
1222  * locally.
1223  */
1224 
1225  /* Build the list of columns to be fetched from the foreign server. */
1226  fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
1227 
1228  /*
1229  * Ensure that the outer plan produces a tuple whose descriptor
1230  * matches our scan tuple slot. This is safe because all scans and
1231  * joins support projection, so we never need to insert a Result node.
1232  * Also, remove the local conditions from outer plan's quals, lest
1233  * they will be evaluated twice, once by the local plan and once by
1234  * the scan.
1235  */
1236  if (outer_plan)
1237  {
1238  ListCell *lc;
1239 
1240  /*
1241  * Right now, we only consider grouping and aggregation beyond
1242  * joins. Queries involving aggregates or grouping do not require
1243  * EPQ mechanism, hence should not have an outer plan here.
1244  */
1245  Assert(!IS_UPPER_REL(foreignrel));
1246 
1247  outer_plan->targetlist = fdw_scan_tlist;
1248 
1249  foreach(lc, local_exprs)
1250  {
1251  Join *join_plan = (Join *) outer_plan;
1252  Node *qual = lfirst(lc);
1253 
1254  outer_plan->qual = list_delete(outer_plan->qual, qual);
1255 
1256  /*
1257  * For an inner join the local conditions of foreign scan plan
1258  * can be part of the joinquals as well.
1259  */
1260  if (join_plan->jointype == JOIN_INNER)
1261  join_plan->joinqual = list_delete(join_plan->joinqual,
1262  qual);
1263  }
1264  }
1265  }
1266 
1267  /*
1268  * Build the query string to be sent for execution, and identify
1269  * expressions to be sent as parameters.
1270  */
1271  initStringInfo(&sql);
1272  deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
1273  remote_exprs, best_path->path.pathkeys,
1274  false, &retrieved_attrs, &params_list);
1275 
1276  /* Remember remote_exprs for possible use by postgresPlanDirectModify */
1277  fpinfo->final_remote_exprs = remote_exprs;
1278 
1279  /*
1280  * Build the fdw_private list that will be available to the executor.
1281  * Items in the list must match order in enum FdwScanPrivateIndex.
1282  */
1283  fdw_private = list_make3(makeString(sql.data),
1285  makeInteger(fpinfo->fetch_size));
1286  if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
1287  fdw_private = lappend(fdw_private,
1288  makeString(fpinfo->relation_name->data));
1289 
1290  /*
1291  * Create the ForeignScan node for the given relation.
1292  *
1293  * Note that the remote parameter expressions are stored in the fdw_exprs
1294  * field of the finished plan node; we can't keep them in private state
1295  * because then they wouldn't be subject to later planner processing.
1296  */
1297  return make_foreignscan(tlist,
1298  local_exprs,
1299  scan_relid,
1300  params_list,
1301  fdw_private,
1302  fdw_scan_tlist,
1303  fdw_recheck_quals,
1304  outer_plan);
1305 }
1306 
1307 /*
1308  * postgresBeginForeignScan
1309  * Initiate an executor scan of a foreign PostgreSQL table.
1310  */
1311 static void
1313 {
1314  ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
1315  EState *estate = node->ss.ps.state;
1316  PgFdwScanState *fsstate;
1317  RangeTblEntry *rte;
1318  Oid userid;
1319  ForeignTable *table;
1320  UserMapping *user;
1321  int rtindex;
1322  int numParams;
1323 
1324  /*
1325  * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
1326  */
1327  if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1328  return;
1329 
1330  /*
1331  * We'll save private state in node->fdw_state.
1332  */
1333  fsstate = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState));
1334  node->fdw_state = (void *) fsstate;
1335 
1336  /*
1337  * Identify which user to do the remote access as. This should match what
1338  * ExecCheckRTEPerms() does. In case of a join or aggregate, use the
1339  * lowest-numbered member RTE as a representative; we would get the same
1340  * result from any.
1341  */
1342  if (fsplan->scan.scanrelid > 0)
1343  rtindex = fsplan->scan.scanrelid;
1344  else
1345  rtindex = bms_next_member(fsplan->fs_relids, -1);
1346  rte = rt_fetch(rtindex, estate->es_range_table);
1347  userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
1348 
1349  /* Get info about foreign table. */
1350  table = GetForeignTable(rte->relid);
1351  user = GetUserMapping(userid, table->serverid);
1352 
1353  /*
1354  * Get connection to the foreign server. Connection manager will
1355  * establish new connection if necessary.
1356  */
1357  fsstate->conn = GetConnection(user, false);
1358 
1359  /* Assign a unique ID for my cursor */
1360  fsstate->cursor_number = GetCursorNumber(fsstate->conn);
1361  fsstate->cursor_exists = false;
1362 
1363  /* Get private info created by planner functions. */
1364  fsstate->query = strVal(list_nth(fsplan->fdw_private,
1366  fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
1368  fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private,
1370 
1371  /* Create contexts for batches of tuples and per-tuple temp workspace. */
1372  fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
1373  "postgres_fdw tuple data",
1375  fsstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
1376  "postgres_fdw temporary data",
1378 
1379  /*
1380  * Get info we'll need for converting data fetched from the foreign server
1381  * into local representation and error reporting during that process.
1382  */
1383  if (fsplan->scan.scanrelid > 0)
1384  {
1385  fsstate->rel = node->ss.ss_currentRelation;
1386  fsstate->tupdesc = RelationGetDescr(fsstate->rel);
1387  }
1388  else
1389  {
1390  fsstate->rel = NULL;
1391  fsstate->tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
1392  }
1393 
1394  fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc);
1395 
1396  /*
1397  * Prepare for processing of parameters used in remote query, if any.
1398  */
1399  numParams = list_length(fsplan->fdw_exprs);
1400  fsstate->numParams = numParams;
1401  if (numParams > 0)
1403  fsplan->fdw_exprs,
1404  numParams,
1405  &fsstate->param_flinfo,
1406  &fsstate->param_exprs,
1407  &fsstate->param_values);
1408 }
1409 
1410 /*
1411  * postgresIterateForeignScan
1412  * Retrieve next row from the result set, or clear tuple slot to indicate
1413  * EOF.
1414  */
1415 static TupleTableSlot *
1417 {
1418  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1419  TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
1420 
1421  /*
1422  * If this is the first call after Begin or ReScan, we need to create the
1423  * cursor on the remote side.
1424  */
1425  if (!fsstate->cursor_exists)
1426  create_cursor(node);
1427 
1428  /*
1429  * Get some more tuples, if we've run out.
1430  */
1431  if (fsstate->next_tuple >= fsstate->num_tuples)
1432  {
1433  /* No point in another fetch if we already detected EOF, though. */
1434  if (!fsstate->eof_reached)
1435  fetch_more_data(node);
1436  /* If we didn't get any tuples, must be end of data. */
1437  if (fsstate->next_tuple >= fsstate->num_tuples)
1438  return ExecClearTuple(slot);
1439  }
1440 
1441  /*
1442  * Return the next tuple.
1443  */
1444  ExecStoreTuple(fsstate->tuples[fsstate->next_tuple++],
1445  slot,
1446  InvalidBuffer,
1447  false);
1448 
1449  return slot;
1450 }
1451 
1452 /*
1453  * postgresReScanForeignScan
1454  * Restart the scan.
1455  */
1456 static void
1458 {
1459  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1460  char sql[64];
1461  PGresult *res;
1462 
1463  /* If we haven't created the cursor yet, nothing to do. */
1464  if (!fsstate->cursor_exists)
1465  return;
1466 
1467  /*
1468  * If any internal parameters affecting this node have changed, we'd
1469  * better destroy and recreate the cursor. Otherwise, rewinding it should
1470  * be good enough. If we've only fetched zero or one batch, we needn't
1471  * even rewind the cursor, just rescan what we have.
1472  */
1473  if (node->ss.ps.chgParam != NULL)
1474  {
1475  fsstate->cursor_exists = false;
1476  snprintf(sql, sizeof(sql), "CLOSE c%u",
1477  fsstate->cursor_number);
1478  }
1479  else if (fsstate->fetch_ct_2 > 1)
1480  {
1481  snprintf(sql, sizeof(sql), "MOVE BACKWARD ALL IN c%u",
1482  fsstate->cursor_number);
1483  }
1484  else
1485  {
1486  /* Easy: just rescan what we already have in memory, if anything */
1487  fsstate->next_tuple = 0;
1488  return;
1489  }
1490 
1491  /*
1492  * We don't use a PG_TRY block here, so be careful not to throw error
1493  * without releasing the PGresult.
1494  */
1495  res = pgfdw_exec_query(fsstate->conn, sql);
1496  if (PQresultStatus(res) != PGRES_COMMAND_OK)
1497  pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
1498  PQclear(res);
1499 
1500  /* Now force a fresh FETCH. */
1501  fsstate->tuples = NULL;
1502  fsstate->num_tuples = 0;
1503  fsstate->next_tuple = 0;
1504  fsstate->fetch_ct_2 = 0;
1505  fsstate->eof_reached = false;
1506 }
1507 
1508 /*
1509  * postgresEndForeignScan
1510  * Finish scanning foreign table and dispose objects used for this scan
1511  */
1512 static void
1514 {
1515  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1516 
1517  /* if fsstate is NULL, we are in EXPLAIN; nothing to do */
1518  if (fsstate == NULL)
1519  return;
1520 
1521  /* Close the cursor if open, to prevent accumulation of cursors */
1522  if (fsstate->cursor_exists)
1523  close_cursor(fsstate->conn, fsstate->cursor_number);
1524 
1525  /* Release remote connection */
1526  ReleaseConnection(fsstate->conn);
1527  fsstate->conn = NULL;
1528 
1529  /* MemoryContexts will be deleted automatically. */
1530 }
1531 
1532 /*
1533  * postgresAddForeignUpdateTargets
1534  * Add resjunk column(s) needed for update/delete on a foreign table
1535  */
1536 static void
1538  RangeTblEntry *target_rte,
1539  Relation target_relation)
1540 {
1541  Var *var;
1542  const char *attrname;
1543  TargetEntry *tle;
1544 
1545  /*
1546  * In postgres_fdw, what we need is the ctid, same as for a regular table.
1547  */
1548 
1549  /* Make a Var representing the desired value */
1550  var = makeVar(parsetree->resultRelation,
1552  TIDOID,
1553  -1,
1554  InvalidOid,
1555  0);
1556 
1557  /* Wrap it in a resjunk TLE with the right name ... */
1558  attrname = "ctid";
1559 
1560  tle = makeTargetEntry((Expr *) var,
1561  list_length(parsetree->targetList) + 1,
1562  pstrdup(attrname),
1563  true);
1564 
1565  /* ... and add it to the query's targetlist */
1566  parsetree->targetList = lappend(parsetree->targetList, tle);
1567 }
1568 
1569 /*
1570  * postgresPlanForeignModify
1571  * Plan an insert/update/delete operation on a foreign table
1572  */
1573 static List *
1575  ModifyTable *plan,
1576  Index resultRelation,
1577  int subplan_index)
1578 {
1579  CmdType operation = plan->operation;
1580  RangeTblEntry *rte = planner_rt_fetch(resultRelation, root);
1581  Relation rel;
1582  StringInfoData sql;
1583  List *targetAttrs = NIL;
1584  List *returningList = NIL;
1586  bool doNothing = false;
1587 
1588  initStringInfo(&sql);
1589 
1590  /*
1591  * Core code already has some lock on each rel being planned, so we can
1592  * use NoLock here.
1593  */
1594  rel = heap_open(rte->relid, NoLock);
1595 
1596  /*
1597  * In an INSERT, we transmit all columns that are defined in the foreign
1598  * table. In an UPDATE, we transmit only columns that were explicitly
1599  * targets of the UPDATE, so as to avoid unnecessary data transmission.
1600  * (We can't do that for INSERT since we would miss sending default values
1601  * for columns not listed in the source statement.)
1602  */
1603  if (operation == CMD_INSERT)
1604  {
1606  int attnum;
1607 
1608  for (attnum = 1; attnum <= tupdesc->natts; attnum++)
1609  {
1610  Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
1611 
1612  if (!attr->attisdropped)
1613  targetAttrs = lappend_int(targetAttrs, attnum);
1614  }
1615  }
1616  else if (operation == CMD_UPDATE)
1617  {
1618  int col;
1619 
1620  col = -1;
1621  while ((col = bms_next_member(rte->updatedCols, col)) >= 0)
1622  {
1623  /* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
1625 
1626  if (attno <= InvalidAttrNumber) /* shouldn't happen */
1627  elog(ERROR, "system-column update is not supported");
1628  targetAttrs = lappend_int(targetAttrs, attno);
1629  }
1630  }
1631 
1632  /*
1633  * Extract the relevant RETURNING list if any.
1634  */
1635  if (plan->returningLists)
1636  returningList = (List *) list_nth(plan->returningLists, subplan_index);
1637 
1638  /*
1639  * ON CONFLICT DO UPDATE and DO NOTHING case with inference specification
1640  * should have already been rejected in the optimizer, as presently there
1641  * is no way to recognize an arbiter index on a foreign table. Only DO
1642  * NOTHING is supported without an inference specification.
1643  */
1644  if (plan->onConflictAction == ONCONFLICT_NOTHING)
1645  doNothing = true;
1646  else if (plan->onConflictAction != ONCONFLICT_NONE)
1647  elog(ERROR, "unexpected ON CONFLICT specification: %d",
1648  (int) plan->onConflictAction);
1649 
1650  /*
1651  * Construct the SQL command string.
1652  */
1653  switch (operation)
1654  {
1655  case CMD_INSERT:
1656  deparseInsertSql(&sql, root, resultRelation, rel,
1657  targetAttrs, doNothing, returningList,
1658  &retrieved_attrs);
1659  break;
1660  case CMD_UPDATE:
1661  deparseUpdateSql(&sql, root, resultRelation, rel,
1662  targetAttrs, returningList,
1663  &retrieved_attrs);
1664  break;
1665  case CMD_DELETE:
1666  deparseDeleteSql(&sql, root, resultRelation, rel,
1667  returningList,
1668  &retrieved_attrs);
1669  break;
1670  default:
1671  elog(ERROR, "unexpected operation: %d", (int) operation);
1672  break;
1673  }
1674 
1675  heap_close(rel, NoLock);
1676 
1677  /*
1678  * Build the fdw_private list that will be available to the executor.
1679  * Items in the list must match enum FdwModifyPrivateIndex, above.
1680  */
1681  return list_make4(makeString(sql.data),
1682  targetAttrs,
1683  makeInteger((retrieved_attrs != NIL)),
1684  retrieved_attrs);
1685 }
1686 
1687 /*
1688  * postgresBeginForeignModify
1689  * Begin an insert/update/delete operation on a foreign table
1690  */
1691 static void
1693  ResultRelInfo *resultRelInfo,
1694  List *fdw_private,
1695  int subplan_index,
1696  int eflags)
1697 {
1698  PgFdwModifyState *fmstate;
1699  char *query;
1700  List *target_attrs;
1701  bool has_returning;
1703 
1704  /*
1705  * Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState
1706  * stays NULL.
1707  */
1708  if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1709  return;
1710 
1711  /* Deconstruct fdw_private data. */
1712  query = strVal(list_nth(fdw_private,
1714  target_attrs = (List *) list_nth(fdw_private,
1716  has_returning = intVal(list_nth(fdw_private,
1718  retrieved_attrs = (List *) list_nth(fdw_private,
1720 
1721  /* Construct an execution state. */
1722  fmstate = create_foreign_modify(mtstate->ps.state,
1723  resultRelInfo,
1724  mtstate->operation,
1725  mtstate->mt_plans[subplan_index]->plan,
1726  query,
1727  target_attrs,
1728  has_returning,
1729  retrieved_attrs);
1730 
1731  resultRelInfo->ri_FdwState = fmstate;
1732 }
1733 
1734 /*
1735  * postgresExecForeignInsert
1736  * Insert one row into a foreign table
1737  */
1738 static TupleTableSlot *
1740  ResultRelInfo *resultRelInfo,
1741  TupleTableSlot *slot,
1742  TupleTableSlot *planSlot)
1743 {
1744  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1745  const char **p_values;
1746  PGresult *res;
1747  int n_rows;
1748 
1749  /* Set up the prepared statement on the remote server, if we didn't yet */
1750  if (!fmstate->p_name)
1751  prepare_foreign_modify(fmstate);
1752 
1753  /* Convert parameters needed by prepared statement to text form */
1754  p_values = convert_prep_stmt_params(fmstate, NULL, slot);
1755 
1756  /*
1757  * Execute the prepared statement.
1758  */
1759  if (!PQsendQueryPrepared(fmstate->conn,
1760  fmstate->p_name,
1761  fmstate->p_nums,
1762  p_values,
1763  NULL,
1764  NULL,
1765  0))
1766  pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
1767 
1768  /*
1769  * Get the result, and check for success.
1770  *
1771  * We don't use a PG_TRY block here, so be careful not to throw error
1772  * without releasing the PGresult.
1773  */
1774  res = pgfdw_get_result(fmstate->conn, fmstate->query);
1775  if (PQresultStatus(res) !=
1777  pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
1778 
1779  /* Check number of rows affected, and fetch RETURNING tuple if any */
1780  if (fmstate->has_returning)
1781  {
1782  n_rows = PQntuples(res);
1783  if (n_rows > 0)
1784  store_returning_result(fmstate, slot, res);
1785  }
1786  else
1787  n_rows = atoi(PQcmdTuples(res));
1788 
1789  /* And clean up */
1790  PQclear(res);
1791 
1792  MemoryContextReset(fmstate->temp_cxt);
1793 
1794  /* Return NULL if nothing was inserted on the remote end */
1795  return (n_rows > 0) ? slot : NULL;
1796 }
1797 
1798 /*
1799  * postgresExecForeignUpdate
1800  * Update one row in a foreign table
1801  */
1802 static TupleTableSlot *
1804  ResultRelInfo *resultRelInfo,
1805  TupleTableSlot *slot,
1806  TupleTableSlot *planSlot)
1807 {
1808  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1809  Datum datum;
1810  bool isNull;
1811  const char **p_values;
1812  PGresult *res;
1813  int n_rows;
1814 
1815  /* Set up the prepared statement on the remote server, if we didn't yet */
1816  if (!fmstate->p_name)
1817  prepare_foreign_modify(fmstate);
1818 
1819  /* Get the ctid that was passed up as a resjunk column */
1820  datum = ExecGetJunkAttribute(planSlot,
1821  fmstate->ctidAttno,
1822  &isNull);
1823  /* shouldn't ever get a null result... */
1824  if (isNull)
1825  elog(ERROR, "ctid is NULL");
1826 
1827  /* Convert parameters needed by prepared statement to text form */
1828  p_values = convert_prep_stmt_params(fmstate,
1829  (ItemPointer) DatumGetPointer(datum),
1830  slot);
1831 
1832  /*
1833  * Execute the prepared statement.
1834  */
1835  if (!PQsendQueryPrepared(fmstate->conn,
1836  fmstate->p_name,
1837  fmstate->p_nums,
1838  p_values,
1839  NULL,
1840  NULL,
1841  0))
1842  pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
1843 
1844  /*
1845  * Get the result, and check for success.
1846  *
1847  * We don't use a PG_TRY block here, so be careful not to throw error
1848  * without releasing the PGresult.
1849  */
1850  res = pgfdw_get_result(fmstate->conn, fmstate->query);
1851  if (PQresultStatus(res) !=
1853  pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
1854 
1855  /* Check number of rows affected, and fetch RETURNING tuple if any */
1856  if (fmstate->has_returning)
1857  {
1858  n_rows = PQntuples(res);
1859  if (n_rows > 0)
1860  store_returning_result(fmstate, slot, res);
1861  }
1862  else
1863  n_rows = atoi(PQcmdTuples(res));
1864 
1865  /* And clean up */
1866  PQclear(res);
1867 
1868  MemoryContextReset(fmstate->temp_cxt);
1869 
1870  /* Return NULL if nothing was updated on the remote end */
1871  return (n_rows > 0) ? slot : NULL;
1872 }
1873 
1874 /*
1875  * postgresExecForeignDelete
1876  * Delete one row from a foreign table
1877  */
1878 static TupleTableSlot *
1880  ResultRelInfo *resultRelInfo,
1881  TupleTableSlot *slot,
1882  TupleTableSlot *planSlot)
1883 {
1884  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1885  Datum datum;
1886  bool isNull;
1887  const char **p_values;
1888  PGresult *res;
1889  int n_rows;
1890 
1891  /* Set up the prepared statement on the remote server, if we didn't yet */
1892  if (!fmstate->p_name)
1893  prepare_foreign_modify(fmstate);
1894 
1895  /* Get the ctid that was passed up as a resjunk column */
1896  datum = ExecGetJunkAttribute(planSlot,
1897  fmstate->ctidAttno,
1898  &isNull);
1899  /* shouldn't ever get a null result... */
1900  if (isNull)
1901  elog(ERROR, "ctid is NULL");
1902 
1903  /* Convert parameters needed by prepared statement to text form */
1904  p_values = convert_prep_stmt_params(fmstate,
1905  (ItemPointer) DatumGetPointer(datum),
1906  NULL);
1907 
1908  /*
1909  * Execute the prepared statement.
1910  */
1911  if (!PQsendQueryPrepared(fmstate->conn,
1912  fmstate->p_name,
1913  fmstate->p_nums,
1914  p_values,
1915  NULL,
1916  NULL,
1917  0))
1918  pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
1919 
1920  /*
1921  * Get the result, and check for success.
1922  *
1923  * We don't use a PG_TRY block here, so be careful not to throw error
1924  * without releasing the PGresult.
1925  */
1926  res = pgfdw_get_result(fmstate->conn, fmstate->query);
1927  if (PQresultStatus(res) !=
1929  pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
1930 
1931  /* Check number of rows affected, and fetch RETURNING tuple if any */
1932  if (fmstate->has_returning)
1933  {
1934  n_rows = PQntuples(res);
1935  if (n_rows > 0)
1936  store_returning_result(fmstate, slot, res);
1937  }
1938  else
1939  n_rows = atoi(PQcmdTuples(res));
1940 
1941  /* And clean up */
1942  PQclear(res);
1943 
1944  MemoryContextReset(fmstate->temp_cxt);
1945 
1946  /* Return NULL if nothing was deleted on the remote end */
1947  return (n_rows > 0) ? slot : NULL;
1948 }
1949 
1950 /*
1951  * postgresEndForeignModify
1952  * Finish an insert/update/delete operation on a foreign table
1953  */
1954 static void
1956  ResultRelInfo *resultRelInfo)
1957 {
1958  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1959 
1960  /* If fmstate is NULL, we are in EXPLAIN; nothing to do */
1961  if (fmstate == NULL)
1962  return;
1963 
1964  /* Destroy the execution state */
1965  finish_foreign_modify(fmstate);
1966 }
1967 
1968 /*
1969  * postgresBeginForeignInsert
1970  * Begin an insert operation on a foreign table
1971  */
1972 static void
1974  ResultRelInfo *resultRelInfo)
1975 {
1976  PgFdwModifyState *fmstate;
1977  Plan *plan = mtstate->ps.plan;
1978  Relation rel = resultRelInfo->ri_RelationDesc;
1979  RangeTblEntry *rte;
1980  Query *query;
1981  PlannerInfo *root;
1983  int attnum;
1984  StringInfoData sql;
1985  List *targetAttrs = NIL;
1987  bool doNothing = false;
1988 
1989  initStringInfo(&sql);
1990 
1991  /* Set up largely-dummy planner state. */
1992  rte = makeNode(RangeTblEntry);
1993  rte->rtekind = RTE_RELATION;
1994  rte->relid = RelationGetRelid(rel);
1995  rte->relkind = RELKIND_FOREIGN_TABLE;
1996  query = makeNode(Query);
1997  query->commandType = CMD_INSERT;
1998  query->resultRelation = 1;
1999  query->rtable = list_make1(rte);
2000  root = makeNode(PlannerInfo);
2001  root->parse = query;
2002 
2003  /* We transmit all columns that are defined in the foreign table. */
2004  for (attnum = 1; attnum <= tupdesc->natts; attnum++)
2005  {
2006  Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
2007 
2008  if (!attr->attisdropped)
2009  targetAttrs = lappend_int(targetAttrs, attnum);
2010  }
2011 
2012  /* Check if we add the ON CONFLICT clause to the remote query. */
2013  if (plan)
2014  {
2015  OnConflictAction onConflictAction = ((ModifyTable *) plan)->onConflictAction;
2016 
2017  /* We only support DO NOTHING without an inference specification. */
2018  if (onConflictAction == ONCONFLICT_NOTHING)
2019  doNothing = true;
2020  else if (onConflictAction != ONCONFLICT_NONE)
2021  elog(ERROR, "unexpected ON CONFLICT specification: %d",
2022  (int) onConflictAction);
2023  }
2024 
2025  /* Construct the SQL command string. */
2026  deparseInsertSql(&sql, root, 1, rel, targetAttrs, doNothing,
2027  resultRelInfo->ri_returningList, &retrieved_attrs);
2028 
2029  /* Construct an execution state. */
2030  fmstate = create_foreign_modify(mtstate->ps.state,
2031  resultRelInfo,
2032  CMD_INSERT,
2033  NULL,
2034  sql.data,
2035  targetAttrs,
2036  retrieved_attrs != NIL,
2037  retrieved_attrs);
2038 
2039  resultRelInfo->ri_FdwState = fmstate;
2040 }
2041 
2042 /*
2043  * postgresEndForeignInsert
2044  * Finish an insert operation on a foreign table
2045  */
2046 static void
2048  ResultRelInfo *resultRelInfo)
2049 {
2050  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
2051 
2052  Assert(fmstate != NULL);
2053 
2054  /* Destroy the execution state */
2055  finish_foreign_modify(fmstate);
2056 }
2057 
2058 /*
2059  * postgresIsForeignRelUpdatable
2060  * Determine whether a foreign table supports INSERT, UPDATE and/or
2061  * DELETE.
2062  */
2063 static int
2065 {
2066  bool updatable;
2067  ForeignTable *table;
2068  ForeignServer *server;
2069  ListCell *lc;
2070 
2071  /*
2072  * By default, all postgres_fdw foreign tables are assumed updatable. This
2073  * can be overridden by a per-server setting, which in turn can be
2074  * overridden by a per-table setting.
2075  */
2076  updatable = true;
2077 
2078  table = GetForeignTable(RelationGetRelid(rel));
2079  server = GetForeignServer(table->serverid);
2080 
2081  foreach(lc, server->options)
2082  {
2083  DefElem *def = (DefElem *) lfirst(lc);
2084 
2085  if (strcmp(def->defname, "updatable") == 0)
2086  updatable = defGetBoolean(def);
2087  }
2088  foreach(lc, table->options)
2089  {
2090  DefElem *def = (DefElem *) lfirst(lc);
2091 
2092  if (strcmp(def->defname, "updatable") == 0)
2093  updatable = defGetBoolean(def);
2094  }
2095 
2096  /*
2097  * Currently "updatable" means support for INSERT, UPDATE and DELETE.
2098  */
2099  return updatable ?
2100  (1 << CMD_INSERT) | (1 << CMD_UPDATE) | (1 << CMD_DELETE) : 0;
2101 }
2102 
2103 /*
2104  * postgresRecheckForeignScan
2105  * Execute a local join execution plan for a foreign join
2106  */
2107 static bool
2109 {
2110  Index scanrelid = ((Scan *) node->ss.ps.plan)->scanrelid;
2112  TupleTableSlot *result;
2113 
2114  /* For base foreign relations, it suffices to set fdw_recheck_quals */
2115  if (scanrelid > 0)
2116  return true;
2117 
2118  Assert(outerPlan != NULL);
2119 
2120  /* Execute a local join execution plan */
2121  result = ExecProcNode(outerPlan);
2122  if (TupIsNull(result))
2123  return false;
2124 
2125  /* Store result in the given slot */
2126  ExecCopySlot(slot, result);
2127 
2128  return true;
2129 }
2130 
2131 /*
2132  * postgresPlanDirectModify
2133  * Consider a direct foreign table modification
2134  *
2135  * Decide whether it is safe to modify a foreign table directly, and if so,
2136  * rewrite subplan accordingly.
2137  */
2138 static bool
2140  ModifyTable *plan,
2141  Index resultRelation,
2142  int subplan_index)
2143 {
2144  CmdType operation = plan->operation;
2145  Plan *subplan;
2146  RelOptInfo *foreignrel;
2147  RangeTblEntry *rte;
2148  PgFdwRelationInfo *fpinfo;
2149  Relation rel;
2150  StringInfoData sql;
2151  ForeignScan *fscan;
2152  List *targetAttrs = NIL;
2153  List *remote_exprs;
2154  List *params_list = NIL;
2155  List *returningList = NIL;
2157 
2158  /*
2159  * Decide whether it is safe to modify a foreign table directly.
2160  */
2161 
2162  /*
2163  * The table modification must be an UPDATE or DELETE.
2164  */
2165  if (operation != CMD_UPDATE && operation != CMD_DELETE)
2166  return false;
2167 
2168  /*
2169  * It's unsafe to modify a foreign table directly if there are any local
2170  * joins needed.
2171  */
2172  subplan = (Plan *) list_nth(plan->plans, subplan_index);
2173  if (!IsA(subplan, ForeignScan))
2174  return false;
2175  fscan = (ForeignScan *) subplan;
2176 
2177  /*
2178  * It's unsafe to modify a foreign table directly if there are any quals
2179  * that should be evaluated locally.
2180  */
2181  if (subplan->qual != NIL)
2182  return false;
2183 
2184  /* Safe to fetch data about the target foreign rel */
2185  if (fscan->scan.scanrelid == 0)
2186  {
2187  foreignrel = find_join_rel(root, fscan->fs_relids);
2188  /* We should have a rel for this foreign join. */
2189  Assert(foreignrel);
2190  }
2191  else
2192  foreignrel = root->simple_rel_array[resultRelation];
2193  rte = root->simple_rte_array[resultRelation];
2194  fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
2195 
2196  /*
2197  * It's unsafe to update a foreign table directly, if any expressions to
2198  * assign to the target columns are unsafe to evaluate remotely.
2199  */
2200  if (operation == CMD_UPDATE)
2201  {
2202  int col;
2203 
2204  /*
2205  * We transmit only columns that were explicitly targets of the
2206  * UPDATE, so as to avoid unnecessary data transmission.
2207  */
2208  col = -1;
2209  while ((col = bms_next_member(rte->updatedCols, col)) >= 0)
2210  {
2211  /* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
2213  TargetEntry *tle;
2214 
2215  if (attno <= InvalidAttrNumber) /* shouldn't happen */
2216  elog(ERROR, "system-column update is not supported");
2217 
2218  tle = get_tle_by_resno(subplan->targetlist, attno);
2219 
2220  if (!tle)
2221  elog(ERROR, "attribute number %d not found in subplan targetlist",
2222  attno);
2223 
2224  if (!is_foreign_expr(root, foreignrel, (Expr *) tle->expr))
2225  return false;
2226 
2227  targetAttrs = lappend_int(targetAttrs, attno);
2228  }
2229  }
2230 
2231  /*
2232  * Ok, rewrite subplan so as to modify the foreign table directly.
2233  */
2234  initStringInfo(&sql);
2235 
2236  /*
2237  * Core code already has some lock on each rel being planned, so we can
2238  * use NoLock here.
2239  */
2240  rel = heap_open(rte->relid, NoLock);
2241 
2242  /*
2243  * Recall the qual clauses that must be evaluated remotely. (These are
2244  * bare clauses not RestrictInfos, but deparse.c's appendConditions()
2245  * doesn't care.)
2246  */
2247  remote_exprs = fpinfo->final_remote_exprs;
2248 
2249  /*
2250  * Extract the relevant RETURNING list if any.
2251  */
2252  if (plan->returningLists)
2253  {
2254  returningList = (List *) list_nth(plan->returningLists, subplan_index);
2255 
2256  /*
2257  * When performing an UPDATE/DELETE .. RETURNING on a join directly,
2258  * we fetch from the foreign server any Vars specified in RETURNING
2259  * that refer not only to the target relation but to non-target
2260  * relations. So we'll deparse them into the RETURNING clause of the
2261  * remote query; use a targetlist consisting of them instead, which
2262  * will be adjusted to be new fdw_scan_tlist of the foreign-scan plan
2263  * node below.
2264  */
2265  if (fscan->scan.scanrelid == 0)
2266  returningList = build_remote_returning(resultRelation, rel,
2267  returningList);
2268  }
2269 
2270  /*
2271  * Construct the SQL command string.
2272  */
2273  switch (operation)
2274  {
2275  case CMD_UPDATE:
2276  deparseDirectUpdateSql(&sql, root, resultRelation, rel,
2277  foreignrel,
2278  ((Plan *) fscan)->targetlist,
2279  targetAttrs,
2280  remote_exprs, &params_list,
2281  returningList, &retrieved_attrs);
2282  break;
2283  case CMD_DELETE:
2284  deparseDirectDeleteSql(&sql, root, resultRelation, rel,
2285  foreignrel,
2286  remote_exprs, &params_list,
2287  returningList, &retrieved_attrs);
2288  break;
2289  default:
2290  elog(ERROR, "unexpected operation: %d", (int) operation);
2291  break;
2292  }
2293 
2294  /*
2295  * Update the operation info.
2296  */
2297  fscan->operation = operation;
2298 
2299  /*
2300  * Update the fdw_exprs list that will be available to the executor.
2301  */
2302  fscan->fdw_exprs = params_list;
2303 
2304  /*
2305  * Update the fdw_private list that will be available to the executor.
2306  * Items in the list must match enum FdwDirectModifyPrivateIndex, above.
2307  */
2308  fscan->fdw_private = list_make4(makeString(sql.data),
2309  makeInteger((retrieved_attrs != NIL)),
2310  retrieved_attrs,
2311  makeInteger(plan->canSetTag));
2312 
2313  /*
2314  * Update the foreign-join-related fields.
2315  */
2316  if (fscan->scan.scanrelid == 0)
2317  {
2318  /* No need for the outer subplan. */
2319  fscan->scan.plan.lefttree = NULL;
2320 
2321  /* Build new fdw_scan_tlist if UPDATE/DELETE .. RETURNING. */
2322  if (returningList)
2323  rebuild_fdw_scan_tlist(fscan, returningList);
2324  }
2325 
2326  heap_close(rel, NoLock);
2327  return true;
2328 }
2329 
2330 /*
2331  * postgresBeginDirectModify
2332  * Prepare a direct foreign table modification
2333  */
2334 static void
2336 {
2337  ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
2338  EState *estate = node->ss.ps.state;
2339  PgFdwDirectModifyState *dmstate;
2340  Index rtindex;
2341  RangeTblEntry *rte;
2342  Oid userid;
2343  ForeignTable *table;
2344  UserMapping *user;
2345  int numParams;
2346 
2347  /*
2348  * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
2349  */
2350  if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
2351  return;
2352 
2353  /*
2354  * We'll save private state in node->fdw_state.
2355  */
2356  dmstate = (PgFdwDirectModifyState *) palloc0(sizeof(PgFdwDirectModifyState));
2357  node->fdw_state = (void *) dmstate;
2358 
2359  /*
2360  * Identify which user to do the remote access as. This should match what
2361  * ExecCheckRTEPerms() does.
2362  */
2363  rtindex = estate->es_result_relation_info->ri_RangeTableIndex;
2364  rte = rt_fetch(rtindex, estate->es_range_table);
2365  userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
2366 
2367  /* Get info about foreign table. */
2368  if (fsplan->scan.scanrelid == 0)
2369  dmstate->rel = ExecOpenScanRelation(estate, rtindex, eflags);
2370  else
2371  dmstate->rel = node->ss.ss_currentRelation;
2372  table = GetForeignTable(RelationGetRelid(dmstate->rel));
2373  user = GetUserMapping(userid, table->serverid);
2374 
2375  /*
2376  * Get connection to the foreign server. Connection manager will
2377  * establish new connection if necessary.
2378  */
2379  dmstate->conn = GetConnection(user, false);
2380 
2381  /* Update the foreign-join-related fields. */
2382  if (fsplan->scan.scanrelid == 0)
2383  {
2384  /* Save info about foreign table. */
2385  dmstate->resultRel = dmstate->rel;
2386 
2387  /*
2388  * Set dmstate->rel to NULL to teach get_returning_data() and
2389  * make_tuple_from_result_row() that columns fetched from the remote
2390  * server are described by fdw_scan_tlist of the foreign-scan plan
2391  * node, not the tuple descriptor for the target relation.
2392  */
2393  dmstate->rel = NULL;
2394  }
2395 
2396  /* Initialize state variable */
2397  dmstate->num_tuples = -1; /* -1 means not set yet */
2398 
2399  /* Get private info created by planner functions. */
2400  dmstate->query = strVal(list_nth(fsplan->fdw_private,
2402  dmstate->has_returning = intVal(list_nth(fsplan->fdw_private,
2404  dmstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
2406  dmstate->set_processed = intVal(list_nth(fsplan->fdw_private,
2408 
2409  /* Create context for per-tuple temp workspace. */
2410  dmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
2411  "postgres_fdw temporary data",
2413 
2414  /* Prepare for input conversion of RETURNING results. */
2415  if (dmstate->has_returning)
2416  {
2418 
2419  if (fsplan->scan.scanrelid == 0)
2420  tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
2421  else
2422  tupdesc = RelationGetDescr(dmstate->rel);
2423 
2424  dmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
2425 
2426  /*
2427  * When performing an UPDATE/DELETE .. RETURNING on a join directly,
2428  * initialize a filter to extract an updated/deleted tuple from a scan
2429  * tuple.
2430  */
2431  if (fsplan->scan.scanrelid == 0)
2432  init_returning_filter(dmstate, fsplan->fdw_scan_tlist, rtindex);
2433  }
2434 
2435  /*
2436  * Prepare for processing of parameters used in remote query, if any.
2437  */
2438  numParams = list_length(fsplan->fdw_exprs);
2439  dmstate->numParams = numParams;
2440  if (numParams > 0)
2442  fsplan->fdw_exprs,
2443  numParams,
2444  &dmstate->param_flinfo,
2445  &dmstate->param_exprs,
2446  &dmstate->param_values);
2447 }
2448 
2449 /*
2450  * postgresIterateDirectModify
2451  * Execute a direct foreign table modification
2452  */
2453 static TupleTableSlot *
2455 {
2457  EState *estate = node->ss.ps.state;
2458  ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
2459 
2460  /*
2461  * If this is the first call after Begin, execute the statement.
2462  */
2463  if (dmstate->num_tuples == -1)
2464  execute_dml_stmt(node);
2465 
2466  /*
2467  * If the local query doesn't specify RETURNING, just clear tuple slot.
2468  */
2469  if (!resultRelInfo->ri_projectReturning)
2470  {
2471  TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
2472  Instrumentation *instr = node->ss.ps.instrument;
2473 
2474  Assert(!dmstate->has_returning);
2475 
2476  /* Increment the command es_processed count if necessary. */
2477  if (dmstate->set_processed)
2478  estate->es_processed += dmstate->num_tuples;
2479 
2480  /* Increment the tuple count for EXPLAIN ANALYZE if necessary. */
2481  if (instr)
2482  instr->tuplecount += dmstate->num_tuples;
2483 
2484  return ExecClearTuple(slot);
2485  }
2486 
2487  /*
2488  * Get the next RETURNING tuple.
2489  */
2490  return get_returning_data(node);
2491 }
2492 
2493 /*
2494  * postgresEndDirectModify
2495  * Finish a direct foreign table modification
2496  */
2497 static void
2499 {
2501 
2502  /* if dmstate is NULL, we are in EXPLAIN; nothing to do */
2503  if (dmstate == NULL)
2504  return;
2505 
2506  /* Release PGresult */
2507  if (dmstate->result)
2508  PQclear(dmstate->result);
2509 
2510  /* Release remote connection */
2511  ReleaseConnection(dmstate->conn);
2512  dmstate->conn = NULL;
2513 
2514  /* close the target relation. */
2515  if (dmstate->resultRel)
2516  ExecCloseScanRelation(dmstate->resultRel);
2517 
2518  /* MemoryContext will be deleted automatically. */
2519 }
2520 
2521 /*
2522  * postgresExplainForeignScan
2523  * Produce extra output for EXPLAIN of a ForeignScan on a foreign table
2524  */
2525 static void
2527 {
2528  List *fdw_private;
2529  char *sql;
2530  char *relations;
2531 
2532  fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
2533 
2534  /*
2535  * Add names of relation handled by the foreign scan when the scan is a
2536  * join
2537  */
2538  if (list_length(fdw_private) > FdwScanPrivateRelations)
2539  {
2540  relations = strVal(list_nth(fdw_private, FdwScanPrivateRelations));
2541  ExplainPropertyText("Relations", relations, es);
2542  }
2543 
2544  /*
2545  * Add remote query, when VERBOSE option is specified.
2546  */
2547  if (es->verbose)
2548  {
2549  sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
2550  ExplainPropertyText("Remote SQL", sql, es);
2551  }
2552 }
2553 
2554 /*
2555  * postgresExplainForeignModify
2556  * Produce extra output for EXPLAIN of a ModifyTable on a foreign table
2557  */
2558 static void
2560  ResultRelInfo *rinfo,
2561  List *fdw_private,
2562  int subplan_index,
2563  ExplainState *es)
2564 {
2565  if (es->verbose)
2566  {
2567  char *sql = strVal(list_nth(fdw_private,
2569 
2570  ExplainPropertyText("Remote SQL", sql, es);
2571  }
2572 }
2573 
2574 /*
2575  * postgresExplainDirectModify
2576  * Produce extra output for EXPLAIN of a ForeignScan that modifies a
2577  * foreign table directly
2578  */
2579 static void
2581 {
2582  List *fdw_private;
2583  char *sql;
2584 
2585  if (es->verbose)
2586  {
2587  fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
2588  sql = strVal(list_nth(fdw_private, FdwDirectModifyPrivateUpdateSql));
2589  ExplainPropertyText("Remote SQL", sql, es);
2590  }
2591 }
2592 
2593 
2594 /*
2595  * estimate_path_cost_size
2596  * Get cost and size estimates for a foreign scan on given foreign relation
2597  * either a base relation or a join between foreign relations or an upper
2598  * relation containing foreign relations.
2599  *
2600  * param_join_conds are the parameterization clauses with outer relations.
2601  * pathkeys specify the expected sort order if any for given path being costed.
2602  *
2603  * The function returns the cost and size estimates in p_row, p_width,
2604  * p_startup_cost and p_total_cost variables.
2605  */
2606 static void
2608  RelOptInfo *foreignrel,
2609  List *param_join_conds,
2610  List *pathkeys,
2611  double *p_rows, int *p_width,
2612  Cost *p_startup_cost, Cost *p_total_cost)
2613 {
2614  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
2615  double rows;
2616  double retrieved_rows;
2617  int width;
2618  Cost startup_cost;
2619  Cost total_cost;
2620  Cost cpu_per_tuple;
2621 
2622  /*
2623  * If the table or the server is configured to use remote estimates,
2624  * connect to the foreign server and execute EXPLAIN to estimate the
2625  * number of rows selected by the restriction+join clauses. Otherwise,
2626  * estimate rows using whatever statistics we have locally, in a way
2627  * similar to ordinary tables.
2628  */
2629  if (fpinfo->use_remote_estimate)
2630  {
2631  List *remote_param_join_conds;
2632  List *local_param_join_conds;
2633  StringInfoData sql;
2634  PGconn *conn;
2635  Selectivity local_sel;
2636  QualCost local_cost;
2637  List *fdw_scan_tlist = NIL;
2638  List *remote_conds;
2639 
2640  /* Required only to be passed to deparseSelectStmtForRel */
2642 
2643  /*
2644  * param_join_conds might contain both clauses that are safe to send
2645  * across, and clauses that aren't.
2646  */
2647  classifyConditions(root, foreignrel, param_join_conds,
2648  &remote_param_join_conds, &local_param_join_conds);
2649 
2650  /* Build the list of columns to be fetched from the foreign server. */
2651  if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
2652  fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
2653  else
2654  fdw_scan_tlist = NIL;
2655 
2656  /*
2657  * The complete list of remote conditions includes everything from
2658  * baserestrictinfo plus any extra join_conds relevant to this
2659  * particular path.
2660  */
2661  remote_conds = list_concat(list_copy(remote_param_join_conds),
2662  fpinfo->remote_conds);
2663 
2664  /*
2665  * Construct EXPLAIN query including the desired SELECT, FROM, and
2666  * WHERE clauses. Params and other-relation Vars are replaced by dummy
2667  * values, so don't request params_list.
2668  */
2669  initStringInfo(&sql);
2670  appendStringInfoString(&sql, "EXPLAIN ");
2671  deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
2672  remote_conds, pathkeys, false,
2673  &retrieved_attrs, NULL);
2674 
2675  /* Get the remote estimate */
2676  conn = GetConnection(fpinfo->user, false);
2677  get_remote_estimate(sql.data, conn, &rows, &width,
2678  &startup_cost, &total_cost);
2679  ReleaseConnection(conn);
2680 
2681  retrieved_rows = rows;
2682 
2683  /* Factor in the selectivity of the locally-checked quals */
2684  local_sel = clauselist_selectivity(root,
2685  local_param_join_conds,
2686  foreignrel->relid,
2687  JOIN_INNER,
2688  NULL);
2689  local_sel *= fpinfo->local_conds_sel;
2690 
2691  rows = clamp_row_est(rows * local_sel);
2692 
2693  /* Add in the eval cost of the locally-checked quals */
2694  startup_cost += fpinfo->local_conds_cost.startup;
2695  total_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
2696  cost_qual_eval(&local_cost, local_param_join_conds, root);
2697  startup_cost += local_cost.startup;
2698  total_cost += local_cost.per_tuple * retrieved_rows;
2699  }
2700  else
2701  {
2702  Cost run_cost = 0;
2703 
2704  /*
2705  * We don't support join conditions in this mode (hence, no
2706  * parameterized paths can be made).
2707  */
2708  Assert(param_join_conds == NIL);
2709 
2710  /*
2711  * Use rows/width estimates made by set_baserel_size_estimates() for
2712  * base foreign relations and set_joinrel_size_estimates() for join
2713  * between foreign relations.
2714  */
2715  rows = foreignrel->rows;
2716  width = foreignrel->reltarget->width;
2717 
2718  /* Back into an estimate of the number of retrieved rows. */
2719  retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
2720 
2721  /*
2722  * We will come here again and again with different set of pathkeys
2723  * that caller wants to cost. We don't need to calculate the cost of
2724  * bare scan each time. Instead, use the costs if we have cached them
2725  * already.
2726  */
2727  if (fpinfo->rel_startup_cost > 0 && fpinfo->rel_total_cost > 0)
2728  {
2729  startup_cost = fpinfo->rel_startup_cost;
2730  run_cost = fpinfo->rel_total_cost - fpinfo->rel_startup_cost;
2731  }
2732  else if (IS_JOIN_REL(foreignrel))
2733  {
2734  PgFdwRelationInfo *fpinfo_i;
2735  PgFdwRelationInfo *fpinfo_o;
2736  QualCost join_cost;
2737  QualCost remote_conds_cost;
2738  double nrows;
2739 
2740  /* For join we expect inner and outer relations set */
2741  Assert(fpinfo->innerrel && fpinfo->outerrel);
2742 
2743  fpinfo_i = (PgFdwRelationInfo *) fpinfo->innerrel->fdw_private;
2744  fpinfo_o = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
2745 
2746  /* Estimate of number of rows in cross product */
2747  nrows = fpinfo_i->rows * fpinfo_o->rows;
2748  /* Clamp retrieved rows estimate to at most size of cross product */
2749  retrieved_rows = Min(retrieved_rows, nrows);
2750 
2751  /*
2752  * The cost of foreign join is estimated as cost of generating
2753  * rows for the joining relations + cost for applying quals on the
2754  * rows.
2755  */
2756 
2757  /*
2758  * Calculate the cost of clauses pushed down to the foreign server
2759  */
2760  cost_qual_eval(&remote_conds_cost, fpinfo->remote_conds, root);
2761  /* Calculate the cost of applying join clauses */
2762  cost_qual_eval(&join_cost, fpinfo->joinclauses, root);
2763 
2764  /*
2765  * Startup cost includes startup cost of joining relations and the
2766  * startup cost for join and other clauses. We do not include the
2767  * startup cost specific to join strategy (e.g. setting up hash
2768  * tables) since we do not know what strategy the foreign server
2769  * is going to use.
2770  */
2771  startup_cost = fpinfo_i->rel_startup_cost + fpinfo_o->rel_startup_cost;
2772  startup_cost += join_cost.startup;
2773  startup_cost += remote_conds_cost.startup;
2774  startup_cost += fpinfo->local_conds_cost.startup;
2775 
2776  /*
2777  * Run time cost includes:
2778  *
2779  * 1. Run time cost (total_cost - startup_cost) of relations being
2780  * joined
2781  *
2782  * 2. Run time cost of applying join clauses on the cross product
2783  * of the joining relations.
2784  *
2785  * 3. Run time cost of applying pushed down other clauses on the
2786  * result of join
2787  *
2788  * 4. Run time cost of applying nonpushable other clauses locally
2789  * on the result fetched from the foreign server.
2790  */
2791  run_cost = fpinfo_i->rel_total_cost - fpinfo_i->rel_startup_cost;
2792  run_cost += fpinfo_o->rel_total_cost - fpinfo_o->rel_startup_cost;
2793  run_cost += nrows * join_cost.per_tuple;
2794  nrows = clamp_row_est(nrows * fpinfo->joinclause_sel);
2795  run_cost += nrows * remote_conds_cost.per_tuple;
2796  run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
2797  }
2798  else if (IS_UPPER_REL(foreignrel))
2799  {
2800  PgFdwRelationInfo *ofpinfo;
2801  PathTarget *ptarget = foreignrel->reltarget;
2802  AggClauseCosts aggcosts;
2803  double input_rows;
2804  int numGroupCols;
2805  double numGroups = 1;
2806 
2807  /* Make sure the core code set the pathtarget. */
2808  Assert(ptarget != NULL);
2809 
2810  /*
2811  * This cost model is mixture of costing done for sorted and
2812  * hashed aggregates in cost_agg(). We are not sure which
2813  * strategy will be considered at remote side, thus for
2814  * simplicity, we put all startup related costs in startup_cost
2815  * and all finalization and run cost are added in total_cost.
2816  *
2817  * Also, core does not care about costing HAVING expressions and
2818  * adding that to the costs. So similarly, here too we are not
2819  * considering remote and local conditions for costing.
2820  */
2821 
2822  ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
2823 
2824  /* Get rows and width from input rel */
2825  input_rows = ofpinfo->rows;
2826  width = ofpinfo->width;
2827 
2828  /* Collect statistics about aggregates for estimating costs. */
2829  MemSet(&aggcosts, 0, sizeof(AggClauseCosts));
2830  if (root->parse->hasAggs)
2831  {
2832  get_agg_clause_costs(root, (Node *) fpinfo->grouped_tlist,
2833  AGGSPLIT_SIMPLE, &aggcosts);
2834 
2835  /*
2836  * The cost of aggregates in the HAVING qual will be the same
2837  * for each child as it is for the parent, so there's no need
2838  * to use a translated version of havingQual.
2839  */
2840  get_agg_clause_costs(root, (Node *) root->parse->havingQual,
2841  AGGSPLIT_SIMPLE, &aggcosts);
2842  }
2843 
2844  /* Get number of grouping columns and possible number of groups */
2845  numGroupCols = list_length(root->parse->groupClause);
2846  numGroups = estimate_num_groups(root,
2848  fpinfo->grouped_tlist),
2849  input_rows, NULL);
2850 
2851  /*
2852  * Number of rows expected from foreign server will be same as
2853  * that of number of groups.
2854  */
2855  rows = retrieved_rows = numGroups;
2856 
2857  /*-----
2858  * Startup cost includes:
2859  * 1. Startup cost for underneath input * relation
2860  * 2. Cost of performing aggregation, per cost_agg()
2861  * 3. Startup cost for PathTarget eval
2862  *-----
2863  */
2864  startup_cost = ofpinfo->rel_startup_cost;
2865  startup_cost += aggcosts.transCost.startup;
2866  startup_cost += aggcosts.transCost.per_tuple * input_rows;
2867  startup_cost += (cpu_operator_cost * numGroupCols) * input_rows;
2868  startup_cost += ptarget->cost.startup;
2869 
2870  /*-----
2871  * Run time cost includes:
2872  * 1. Run time cost of underneath input relation
2873  * 2. Run time cost of performing aggregation, per cost_agg()
2874  * 3. PathTarget eval cost for each output row
2875  *-----
2876  */
2877  run_cost = ofpinfo->rel_total_cost - ofpinfo->rel_startup_cost;
2878  run_cost += aggcosts.finalCost * numGroups;
2879  run_cost += cpu_tuple_cost * numGroups;
2880  run_cost += ptarget->cost.per_tuple * numGroups;
2881  }
2882  else
2883  {
2884  /* Clamp retrieved rows estimates to at most foreignrel->tuples. */
2885  retrieved_rows = Min(retrieved_rows, foreignrel->tuples);
2886 
2887  /*
2888  * Cost as though this were a seqscan, which is pessimistic. We
2889  * effectively imagine the local_conds are being evaluated
2890  * remotely, too.
2891  */
2892  startup_cost = 0;
2893  run_cost = 0;
2894  run_cost += seq_page_cost * foreignrel->pages;
2895 
2896  startup_cost += foreignrel->baserestrictcost.startup;
2897  cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple;
2898  run_cost += cpu_per_tuple * foreignrel->tuples;
2899  }
2900 
2901  /*
2902  * Without remote estimates, we have no real way to estimate the cost
2903  * of generating sorted output. It could be free if the query plan
2904  * the remote side would have chosen generates properly-sorted output
2905  * anyway, but in most cases it will cost something. Estimate a value
2906  * high enough that we won't pick the sorted path when the ordering
2907  * isn't locally useful, but low enough that we'll err on the side of
2908  * pushing down the ORDER BY clause when it's useful to do so.
2909  */
2910  if (pathkeys != NIL)
2911  {
2912  startup_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
2913  run_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
2914  }
2915 
2916  total_cost = startup_cost + run_cost;
2917  }
2918 
2919  /*
2920  * Cache the costs for scans without any pathkeys or parameterization
2921  * before adding the costs for transferring data from the foreign server.
2922  * These costs are useful for costing the join between this relation and
2923  * another foreign relation or to calculate the costs of paths with
2924  * pathkeys for this relation, when the costs can not be obtained from the
2925  * foreign server. This function will be called at least once for every
2926  * foreign relation without pathkeys and parameterization.
2927  */
2928  if (pathkeys == NIL && param_join_conds == NIL)
2929  {
2930  fpinfo->rel_startup_cost = startup_cost;
2931  fpinfo->rel_total_cost = total_cost;
2932  }
2933 
2934  /*
2935  * Add some additional cost factors to account for connection overhead
2936  * (fdw_startup_cost), transferring data across the network
2937  * (fdw_tuple_cost per retrieved row), and local manipulation of the data
2938  * (cpu_tuple_cost per retrieved row).
2939  */
2940  startup_cost += fpinfo->fdw_startup_cost;
2941  total_cost += fpinfo->fdw_startup_cost;
2942  total_cost += fpinfo->fdw_tuple_cost * retrieved_rows;
2943  total_cost += cpu_tuple_cost * retrieved_rows;
2944 
2945  /* Return results. */
2946  *p_rows = rows;
2947  *p_width = width;
2948  *p_startup_cost = startup_cost;
2949  *p_total_cost = total_cost;
2950 }
2951 
2952 /*
2953  * Estimate costs of executing a SQL statement remotely.
2954  * The given "sql" must be an EXPLAIN command.
2955  */
2956 static void
2957 get_remote_estimate(const char *sql, PGconn *conn,
2958  double *rows, int *width,
2959  Cost *startup_cost, Cost *total_cost)
2960 {
2961  PGresult *volatile res = NULL;
2962 
2963  /* PGresult must be released before leaving this function. */
2964  PG_TRY();
2965  {
2966  char *line;
2967  char *p;
2968  int n;
2969 
2970  /*
2971  * Execute EXPLAIN remotely.
2972  */
2973  res = pgfdw_exec_query(conn, sql);
2974  if (PQresultStatus(res) != PGRES_TUPLES_OK)
2975  pgfdw_report_error(ERROR, res, conn, false, sql);
2976 
2977  /*
2978  * Extract cost numbers for topmost plan node. Note we search for a
2979  * left paren from the end of the line to avoid being confused by
2980  * other uses of parentheses.
2981  */
2982  line = PQgetvalue(res, 0, 0);
2983  p = strrchr(line, '(');
2984  if (p == NULL)
2985  elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
2986  n = sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)",
2987  startup_cost, total_cost, rows, width);
2988  if (n != 4)
2989  elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
2990 
2991  PQclear(res);
2992  res = NULL;
2993  }
2994  PG_CATCH();
2995  {
2996  if (res)
2997  PQclear(res);
2998  PG_RE_THROW();
2999  }
3000  PG_END_TRY();
3001 }
3002 
3003 /*
3004  * Detect whether we want to process an EquivalenceClass member.
3005  *
3006  * This is a callback for use by generate_implied_equalities_for_column.
3007  */
3008 static bool
3011  void *arg)
3012 {
3014  Expr *expr = em->em_expr;
3015 
3016  /*
3017  * If we've identified what we're processing in the current scan, we only
3018  * want to match that expression.
3019  */
3020  if (state->current != NULL)
3021  return equal(expr, state->current);
3022 
3023  /*
3024  * Otherwise, ignore anything we've already processed.
3025  */
3026  if (list_member(state->already_used, expr))
3027  return false;
3028 
3029  /* This is the new target to process. */
3030  state->current = expr;
3031  return true;
3032 }
3033 
3034 /*
3035  * Create cursor for node's query with current parameter values.
3036  */
3037 static void
3039 {
3040  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
3041  ExprContext *econtext = node->ss.ps.ps_ExprContext;
3042  int numParams = fsstate->numParams;
3043  const char **values = fsstate->param_values;
3044  PGconn *conn = fsstate->conn;
3046  PGresult *res;
3047 
3048  /*
3049  * Construct array of query parameter values in text format. We do the
3050  * conversions in the short-lived per-tuple context, so as not to cause a
3051  * memory leak over repeated scans.
3052  */
3053  if (numParams > 0)
3054  {
3055  MemoryContext oldcontext;
3056 
3057  oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
3058 
3059  process_query_params(econtext,
3060  fsstate->param_flinfo,
3061  fsstate->param_exprs,
3062  values);
3063 
3064  MemoryContextSwitchTo(oldcontext);
3065  }
3066 
3067  /* Construct the DECLARE CURSOR command */
3068  initStringInfo(&buf);
3069  appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
3070  fsstate->cursor_number, fsstate->query);
3071 
3072  /*
3073  * Notice that we pass NULL for paramTypes, thus forcing the remote server
3074  * to infer types for all parameters. Since we explicitly cast every
3075  * parameter (see deparse.c), the "inference" is trivial and will produce
3076  * the desired result. This allows us to avoid assuming that the remote
3077  * server has the same OIDs we do for the parameters' types.
3078  */
3079  if (!PQsendQueryParams(conn, buf.data, numParams,
3080  NULL, values, NULL, NULL, 0))
3081  pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
3082 
3083  /*
3084  * Get the result, and check for success.
3085  *
3086  * We don't use a PG_TRY block here, so be careful not to throw error
3087  * without releasing the PGresult.
3088  */
3089  res = pgfdw_get_result(conn, buf.data);
3090  if (PQresultStatus(res) != PGRES_COMMAND_OK)
3091  pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
3092  PQclear(res);
3093 
3094  /* Mark the cursor as created, and show no tuples have been retrieved */
3095  fsstate->cursor_exists = true;
3096  fsstate->tuples = NULL;
3097  fsstate->num_tuples = 0;
3098  fsstate->next_tuple = 0;
3099  fsstate->fetch_ct_2 = 0;
3100  fsstate->eof_reached = false;
3101 
3102  /* Clean up */
3103  pfree(buf.data);
3104 }
3105 
3106 /*
3107  * Fetch some more rows from the node's cursor.
3108  */
3109 static void
3111 {
3112  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
3113  PGresult *volatile res = NULL;
3114  MemoryContext oldcontext;
3115 
3116  /*
3117  * We'll store the tuples in the batch_cxt. First, flush the previous
3118  * batch.
3119  */
3120  fsstate->tuples = NULL;
3121  MemoryContextReset(fsstate->batch_cxt);
3122  oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
3123 
3124  /* PGresult must be released before leaving this function. */
3125  PG_TRY();
3126  {
3127  PGconn *conn = fsstate->conn;
3128  char sql[64];
3129  int numrows;
3130  int i;
3131 
3132  snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
3133  fsstate->fetch_size, fsstate->cursor_number);
3134 
3135  res = pgfdw_exec_query(conn, sql);
3136  /* On error, report the original query, not the FETCH. */
3137  if (PQresultStatus(res) != PGRES_TUPLES_OK)
3138  pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
3139 
3140  /* Convert the data into HeapTuples */
3141  numrows = PQntuples(res);
3142  fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
3143  fsstate->num_tuples = numrows;
3144  fsstate->next_tuple = 0;
3145 
3146  for (i = 0; i < numrows; i++)
3147  {
3148  Assert(IsA(node->ss.ps.plan, ForeignScan));
3149 
3150  fsstate->tuples[i] =
3152  fsstate->rel,
3153  fsstate->attinmeta,
3154  fsstate->retrieved_attrs,
3155  node,
3156  fsstate->temp_cxt);
3157  }
3158 
3159  /* Update fetch_ct_2 */
3160  if (fsstate->fetch_ct_2 < 2)
3161  fsstate->fetch_ct_2++;
3162 
3163  /* Must be EOF if we didn't get as many tuples as we asked for. */
3164  fsstate->eof_reached = (numrows < fsstate->fetch_size);
3165 
3166  PQclear(res);
3167  res = NULL;
3168  }
3169  PG_CATCH();
3170  {
3171  if (res)
3172  PQclear(res);
3173  PG_RE_THROW();
3174  }
3175  PG_END_TRY();
3176 
3177  MemoryContextSwitchTo(oldcontext);
3178 }
3179 
3180 /*
3181  * Force assorted GUC parameters to settings that ensure that we'll output
3182  * data values in a form that is unambiguous to the remote server.
3183  *
3184  * This is rather expensive and annoying to do once per row, but there's
3185  * little choice if we want to be sure values are transmitted accurately;
3186  * we can't leave the settings in place between rows for fear of affecting
3187  * user-visible computations.
3188  *
3189  * We use the equivalent of a function SET option to allow the settings to
3190  * persist only until the caller calls reset_transmission_modes(). If an
3191  * error is thrown in between, guc.c will take care of undoing the settings.
3192  *
3193  * The return value is the nestlevel that must be passed to
3194  * reset_transmission_modes() to undo things.
3195  */
3196 int
3198 {
3199  int nestlevel = NewGUCNestLevel();
3200 
3201  /*
3202  * The values set here should match what pg_dump does. See also
3203  * configure_remote_session in connection.c.
3204  */
3205  if (DateStyle != USE_ISO_DATES)
3206  (void) set_config_option("datestyle", "ISO",
3208  GUC_ACTION_SAVE, true, 0, false);
3210  (void) set_config_option("intervalstyle", "postgres",
3212  GUC_ACTION_SAVE, true, 0, false);
3213  if (extra_float_digits < 3)
3214  (void) set_config_option("extra_float_digits", "3",
3216  GUC_ACTION_SAVE, true, 0, false);
3217 
3218  return nestlevel;
3219 }
3220 
3221 /*
3222  * Undo the effects of set_transmission_modes().
3223  */
3224 void
3226 {
3227  AtEOXact_GUC(true, nestlevel);
3228 }
3229 
3230 /*
3231  * Utility routine to close a cursor.
3232  */
3233 static void
3235 {
3236  char sql[64];
3237  PGresult *res;
3238 
3239  snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);
3240 
3241  /*
3242  * We don't use a PG_TRY block here, so be careful not to throw error
3243  * without releasing the PGresult.
3244  */
3245  res = pgfdw_exec_query(conn, sql);
3246  if (PQresultStatus(res) != PGRES_COMMAND_OK)
3247  pgfdw_report_error(ERROR, res, conn, true, sql);
3248  PQclear(res);
3249 }
3250 
3251 /*
3252  * create_foreign_modify
3253  * Construct an execution state of a foreign insert/update/delete
3254  * operation
3255  */
3256 static PgFdwModifyState *
3258  ResultRelInfo *resultRelInfo,
3259  CmdType operation,
3260  Plan *subplan,
3261  char *query,
3262  List *target_attrs,
3263  bool has_returning,
3265 {
3266  PgFdwModifyState *fmstate;
3267  Relation rel = resultRelInfo->ri_RelationDesc;
3269  RangeTblEntry *rte;
3270  Oid userid;
3271  ForeignTable *table;
3272  UserMapping *user;
3273  AttrNumber n_params;
3274  Oid typefnoid;
3275  bool isvarlena;
3276  ListCell *lc;
3277 
3278  /* Begin constructing PgFdwModifyState. */
3279  fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
3280  fmstate->rel = rel;
3281 
3282  /*
3283  * Identify which user to do the remote access as. This should match what
3284  * ExecCheckRTEPerms() does.
3285  */
3286  rte = rt_fetch(resultRelInfo->ri_RangeTableIndex, estate->es_range_table);
3287  userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
3288 
3289  /* Get info about foreign table. */
3290  table = GetForeignTable(RelationGetRelid(rel));
3291  user = GetUserMapping(userid, table->serverid);
3292 
3293  /* Open connection; report that we'll create a prepared statement. */
3294  fmstate->conn = GetConnection(user, true);
3295  fmstate->p_name = NULL; /* prepared statement not made yet */
3296 
3297  /* Set up remote query information. */
3298  fmstate->query = query;
3299  fmstate->target_attrs = target_attrs;
3300  fmstate->has_returning = has_returning;
3301  fmstate->retrieved_attrs = retrieved_attrs;
3302 
3303  /* Create context for per-tuple temp workspace. */
3304  fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
3305  "postgres_fdw temporary data",
3307 
3308  /* Prepare for input conversion of RETURNING results. */
3309  if (fmstate->has_returning)
3310  fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
3311 
3312  /* Prepare for output conversion of parameters used in prepared stmt. */
3313  n_params = list_length(fmstate->target_attrs) + 1;
3314  fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
3315  fmstate->p_nums = 0;
3316 
3317  if (operation == CMD_UPDATE || operation == CMD_DELETE)
3318  {
3319  Assert(subplan != NULL);
3320 
3321  /* Find the ctid resjunk column in the subplan's result */
3323  "ctid");
3324  if (!AttributeNumberIsValid(fmstate->ctidAttno))
3325  elog(ERROR, "could not find junk ctid column");
3326 
3327  /* First transmittable parameter will be ctid */
3328  getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
3329  fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
3330  fmstate->p_nums++;
3331  }
3332 
3333  if (operation == CMD_INSERT || operation == CMD_UPDATE)
3334  {
3335  /* Set up for remaining transmittable parameters */
3336  foreach(lc, fmstate->target_attrs)
3337  {
3338  int attnum = lfirst_int(lc);
3339  Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
3340 
3341  Assert(!attr->attisdropped);
3342 
3343  getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
3344  fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
3345  fmstate->p_nums++;
3346  }
3347  }
3348 
3349  Assert(fmstate->p_nums <= n_params);
3350 
3351  return fmstate;
3352 }
3353 
3354 /*
3355  * prepare_foreign_modify
3356  * Establish a prepared statement for execution of INSERT/UPDATE/DELETE
3357  */
3358 static void
3360 {
3361  char prep_name[NAMEDATALEN];
3362  char *p_name;
3363  PGresult *res;
3364 
3365  /* Construct name we'll use for the prepared statement. */
3366  snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
3367  GetPrepStmtNumber(fmstate->conn));
3368  p_name = pstrdup(prep_name);
3369 
3370  /*
3371  * We intentionally do not specify parameter types here, but leave the
3372  * remote server to derive them by default. This avoids possible problems
3373  * with the remote server using different type OIDs than we do. All of
3374  * the prepared statements we use in this module are simple enough that
3375  * the remote server will make the right choices.
3376  */
3377  if (!PQsendPrepare(fmstate->conn,
3378  p_name,
3379  fmstate->query,
3380  0,
3381  NULL))
3382  pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
3383 
3384  /*
3385  * Get the result, and check for success.
3386  *
3387  * We don't use a PG_TRY block here, so be careful not to throw error
3388  * without releasing the PGresult.
3389  */
3390  res = pgfdw_get_result(fmstate->conn, fmstate->query);
3391  if (PQresultStatus(res) != PGRES_COMMAND_OK)
3392  pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
3393  PQclear(res);
3394 
3395  /* This action shows that the prepare has been done. */
3396  fmstate->p_name = p_name;
3397 }
3398 
3399 /*
3400  * convert_prep_stmt_params
3401  * Create array of text strings representing parameter values
3402  *
3403  * tupleid is ctid to send, or NULL if none
3404  * slot is slot to get remaining parameters from, or NULL if none
3405  *
3406  * Data is constructed in temp_cxt; caller should reset that after use.
3407  */
3408 static const char **
3410  ItemPointer tupleid,
3411  TupleTableSlot *slot)
3412 {
3413  const char **p_values;
3414  int pindex = 0;
3415  MemoryContext oldcontext;
3416 
3417  oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
3418 
3419  p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums);
3420 
3421  /* 1st parameter should be ctid, if it's in use */
3422  if (tupleid != NULL)
3423  {
3424  /* don't need set_transmission_modes for TID output */
3425  p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
3426  PointerGetDatum(tupleid));
3427  pindex++;
3428  }
3429 
3430  /* get following parameters from slot */
3431  if (slot != NULL && fmstate->target_attrs != NIL)
3432  {
3433  int nestlevel;
3434  ListCell *lc;
3435 
3436  nestlevel = set_transmission_modes();
3437 
3438  foreach(lc, fmstate->target_attrs)
3439  {
3440  int attnum = lfirst_int(lc);
3441  Datum value;
3442  bool isnull;
3443 
3444  value = slot_getattr(slot, attnum, &isnull);
3445  if (isnull)
3446  p_values[pindex] = NULL;
3447  else
3448  p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
3449  value);
3450  pindex++;
3451  }
3452 
3453  reset_transmission_modes(nestlevel);
3454  }
3455 
3456  Assert(pindex == fmstate->p_nums);
3457 
3458  MemoryContextSwitchTo(oldcontext);
3459 
3460  return p_values;
3461 }
3462 
3463 /*
3464  * store_returning_result
3465  * Store the result of a RETURNING clause
3466  *
3467  * On error, be sure to release the PGresult on the way out. Callers do not
3468  * have PG_TRY blocks to ensure this happens.
3469  */
3470 static void
3472  TupleTableSlot *slot, PGresult *res)
3473 {
3474  PG_TRY();
3475  {
3476  HeapTuple newtup;
3477 
3478  newtup = make_tuple_from_result_row(res, 0,
3479  fmstate->rel,
3480  fmstate->attinmeta,
3481  fmstate->retrieved_attrs,
3482  NULL,
3483  fmstate->temp_cxt);
3484  /* tuple will be deleted when it is cleared from the slot */
3485  ExecStoreTuple(newtup, slot, InvalidBuffer, true);
3486  }
3487  PG_CATCH();
3488  {
3489  if (res)
3490  PQclear(res);
3491  PG_RE_THROW();
3492  }
3493  PG_END_TRY();
3494 }
3495 
3496 /*
3497  * finish_foreign_modify
3498  * Release resources for a foreign insert/update/delete operation
3499  */
3500 static void
3502 {
3503  Assert(fmstate != NULL);
3504 
3505  /* If we created a prepared statement, destroy it */
3506  if (fmstate->p_name)
3507  {
3508  char sql[64];
3509  PGresult *res;
3510 
3511  snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
3512 
3513  /*
3514  * We don't use a PG_TRY block here, so be careful not to throw error
3515  * without releasing the PGresult.
3516  */
3517  res = pgfdw_exec_query(fmstate->conn, sql);
3518  if (PQresultStatus(res) != PGRES_COMMAND_OK)
3519  pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
3520  PQclear(res);
3521  fmstate->p_name = NULL;
3522  }
3523 
3524  /* Release remote connection */
3525  ReleaseConnection(fmstate->conn);
3526  fmstate->conn = NULL;
3527 }
3528 
3529 /*
3530  * build_remote_returning
3531  * Build a RETURNING targetlist of a remote query for performing an
3532  * UPDATE/DELETE .. RETURNING on a join directly
3533  */
3534 static List *
3535 build_remote_returning(Index rtindex, Relation rel, List *returningList)
3536 {
3537  bool have_wholerow = false;
3538  List *tlist = NIL;
3539  List *vars;
3540  ListCell *lc;
3541 
3542  Assert(returningList);
3543 
3544  vars = pull_var_clause((Node *) returningList, PVC_INCLUDE_PLACEHOLDERS);
3545 
3546  /*
3547  * If there's a whole-row reference to the target relation, then we'll
3548  * need all the columns of the relation.
3549  */
3550  foreach(lc, vars)
3551  {
3552  Var *var = (Var *) lfirst(lc);
3553 
3554  if (IsA(var, Var) &&
3555  var->varno == rtindex &&
3556  var->varattno == InvalidAttrNumber)
3557  {
3558  have_wholerow = true;
3559  break;
3560  }
3561  }
3562 
3563  if (have_wholerow)
3564  {
3566  int i;
3567 
3568  for (i = 1; i <= tupdesc->natts; i++)
3569  {
3570  Form_pg_attribute attr = TupleDescAttr(tupdesc, i - 1);
3571  Var *var;
3572 
3573  /* Ignore dropped attributes. */
3574  if (attr->attisdropped)
3575  continue;
3576 
3577  var = makeVar(rtindex,
3578  i,
3579  attr->atttypid,
3580  attr->atttypmod,
3581  attr->attcollation,
3582  0);
3583 
3584  tlist = lappend(tlist,
3585  makeTargetEntry((Expr *) var,
3586  list_length(tlist) + 1,
3587  NULL,
3588  false));
3589  }
3590  }
3591 
3592  /* Now add any remaining columns to tlist. */
3593  foreach(lc, vars)
3594  {
3595  Var *var = (Var *) lfirst(lc);
3596 
3597  /*
3598  * No need for whole-row references to the target relation. We don't
3599  * need system columns other than ctid and oid either, since those are
3600  * set locally.
3601  */
3602  if (IsA(var, Var) &&
3603  var->varno == rtindex &&
3604  var->varattno <= InvalidAttrNumber &&
3607  continue; /* don't need it */
3608 
3609  if (tlist_member((Expr *) var, tlist))
3610  continue; /* already got it */
3611 
3612  tlist = lappend(tlist,
3613  makeTargetEntry((Expr *) var,
3614  list_length(tlist) + 1,
3615  NULL,
3616  false));
3617  }
3618 
3619  list_free(vars);
3620 
3621  return tlist;
3622 }
3623 
3624 /*
3625  * rebuild_fdw_scan_tlist
3626  * Build new fdw_scan_tlist of given foreign-scan plan node from given
3627  * tlist
3628  *
3629  * There might be columns that the fdw_scan_tlist of the given foreign-scan
3630  * plan node contains that the given tlist doesn't. The fdw_scan_tlist would
3631  * have contained resjunk columns such as 'ctid' of the target relation and
3632  * 'wholerow' of non-target relations, but the tlist might not contain them,
3633  * for example. So, adjust the tlist so it contains all the columns specified
3634  * in the fdw_scan_tlist; else setrefs.c will get confused.
3635  */
3636 static void
3638 {
3639  List *new_tlist = tlist;
3640  List *old_tlist = fscan->fdw_scan_tlist;
3641  ListCell *lc;
3642 
3643  foreach(lc, old_tlist)
3644  {
3645  TargetEntry *tle = (TargetEntry *) lfirst(lc);
3646 
3647  if (tlist_member(tle->expr, new_tlist))
3648  continue; /* already got it */
3649 
3650  new_tlist = lappend(new_tlist,
3651  makeTargetEntry(tle->expr,
3652  list_length(new_tlist) + 1,
3653  NULL,
3654  false));
3655  }
3656  fscan->fdw_scan_tlist = new_tlist;
3657 }
3658 
3659 /*
3660  * Execute a direct UPDATE/DELETE statement.
3661  */
3662 static void
3664 {
3666  ExprContext *econtext = node->ss.ps.ps_ExprContext;
3667  int numParams = dmstate->numParams;
3668  const char **values = dmstate->param_values;
3669 
3670  /*
3671  * Construct array of query parameter values in text format.
3672  */
3673  if (numParams > 0)
3674  process_query_params(econtext,
3675  dmstate->param_flinfo,
3676  dmstate->param_exprs,
3677  values);
3678 
3679  /*
3680  * Notice that we pass NULL for paramTypes, thus forcing the remote server
3681  * to infer types for all parameters. Since we explicitly cast every
3682  * parameter (see deparse.c), the "inference" is trivial and will produce
3683  * the desired result. This allows us to avoid assuming that the remote
3684  * server has the same OIDs we do for the parameters' types.
3685  */
3686  if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
3687  NULL, values, NULL, NULL, 0))
3688  pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
3689 
3690  /*
3691  * Get the result, and check for success.
3692  *
3693  * We don't use a PG_TRY block here, so be careful not to throw error
3694  * without releasing the PGresult.
3695  */
3696  dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
3697  if (PQresultStatus(dmstate->result) !=
3699  pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
3700  dmstate->query);
3701 
3702  /* Get the number of rows affected. */
3703  if (dmstate->has_returning)
3704  dmstate->num_tuples = PQntuples(dmstate->result);
3705  else
3706  dmstate->num_tuples = atoi(PQcmdTuples(dmstate->result));
3707 }
3708 
3709 /*
3710  * Get the result of a RETURNING clause.
3711  */
3712 static TupleTableSlot *
3714 {
3716  EState *estate = node->ss.ps.state;
3717  ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
3718  TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
3719  TupleTableSlot *resultSlot;
3720 
3721  Assert(resultRelInfo->ri_projectReturning);
3722 
3723  /* If we didn't get any tuples, must be end of data. */
3724  if (dmstate->next_tuple >= dmstate->num_tuples)
3725  return ExecClearTuple(slot);
3726 
3727  /* Increment the command es_processed count if necessary. */
3728  if (dmstate->set_processed)
3729  estate->es_processed += 1;
3730 
3731  /*
3732  * Store a RETURNING tuple. If has_returning is false, just emit a dummy
3733  * tuple. (has_returning is false when the local query is of the form
3734  * "UPDATE/DELETE .. RETURNING 1" for example.)
3735  */
3736  if (!dmstate->has_returning)
3737  {
3738  ExecStoreAllNullTuple(slot);
3739  resultSlot = slot;
3740  }
3741  else
3742  {
3743  /*
3744  * On error, be sure to release the PGresult on the way out. Callers
3745  * do not have PG_TRY blocks to ensure this happens.
3746  */
3747  PG_TRY();
3748  {
3749  HeapTuple newtup;
3750 
3751  newtup = make_tuple_from_result_row(dmstate->result,
3752  dmstate->next_tuple,
3753  dmstate->rel,
3754  dmstate->attinmeta,
3755  dmstate->retrieved_attrs,
3756  node,
3757  dmstate->temp_cxt);
3758  ExecStoreTuple(newtup, slot, InvalidBuffer, false);
3759  }
3760  PG_CATCH();
3761  {
3762  if (dmstate->result)
3763  PQclear(dmstate->result);
3764  PG_RE_THROW();
3765  }
3766  PG_END_TRY();
3767 
3768  /* Get the updated/deleted tuple. */
3769  if (dmstate->rel)
3770  resultSlot = slot;
3771  else
3772  resultSlot = apply_returning_filter(dmstate, slot, estate);
3773  }
3774  dmstate->next_tuple++;
3775 
3776  /* Make slot available for evaluation of the local query RETURNING list. */
3777  resultRelInfo->ri_projectReturning->pi_exprContext->ecxt_scantuple =
3778  resultSlot;
3779 
3780  return slot;
3781 }
3782 
3783 /*
3784  * Initialize a filter to extract an updated/deleted tuple from a scan tuple.
3785  */
3786 static void
3788  List *fdw_scan_tlist,
3789  Index rtindex)
3790 {
3791  TupleDesc resultTupType = RelationGetDescr(dmstate->resultRel);
3792  ListCell *lc;
3793  int i;
3794 
3795  /*
3796  * Calculate the mapping between the fdw_scan_tlist's entries and the
3797  * result tuple's attributes.
3798  *
3799  * The "map" is an array of indexes of the result tuple's attributes in
3800  * fdw_scan_tlist, i.e., one entry for every attribute of the result
3801  * tuple. We store zero for any attributes that don't have the
3802  * corresponding entries in that list, marking that a NULL is needed in
3803  * the result tuple.
3804  *
3805  * Also get the indexes of the entries for ctid and oid if any.
3806  */
3807  dmstate->attnoMap = (AttrNumber *)
3808  palloc0(resultTupType->natts * sizeof(AttrNumber));
3809 
3810  dmstate->ctidAttno = dmstate->oidAttno = 0;
3811 
3812  i = 1;
3813  dmstate->hasSystemCols = false;
3814  foreach(lc, fdw_scan_tlist)
3815  {
3816  TargetEntry *tle = (TargetEntry *) lfirst(lc);
3817  Var *var = (Var *) tle->expr;
3818 
3819  Assert(IsA(var, Var));
3820 
3821  /*
3822  * If the Var is a column of the target relation to be retrieved from
3823  * the foreign server, get the index of the entry.
3824  */
3825  if (var->varno == rtindex &&
3826  list_member_int(dmstate->retrieved_attrs, i))
3827  {
3828  int attrno = var->varattno;
3829 
3830  if (attrno < 0)
3831  {
3832  /*
3833  * We don't retrieve system columns other than ctid and oid.
3834  */
3835  if (attrno == SelfItemPointerAttributeNumber)
3836  dmstate->ctidAttno = i;
3837  else if (attrno == ObjectIdAttributeNumber)
3838  dmstate->oidAttno = i;
3839  else
3840  Assert(false);
3841  dmstate->hasSystemCols = true;
3842  }
3843  else
3844  {
3845  /*
3846  * We don't retrieve whole-row references to the target
3847  * relation either.
3848  */
3849  Assert(attrno > 0);
3850 
3851  dmstate->attnoMap[attrno - 1] = i;
3852  }
3853  }
3854  i++;
3855  }
3856 }
3857 
3858 /*
3859  * Extract and return an updated/deleted tuple from a scan tuple.
3860  */
3861 static TupleTableSlot *
3863  TupleTableSlot *slot,
3864  EState *estate)
3865 {
3866  TupleDesc resultTupType = RelationGetDescr(dmstate->resultRel);
3867  TupleTableSlot *resultSlot;
3868  Datum *values;
3869  bool *isnull;
3870  Datum *old_values;
3871  bool *old_isnull;
3872  int i;
3873 
3874  /*
3875  * Use the trigger tuple slot as a place to store the result tuple.
3876  */
3877  resultSlot = estate->es_trig_tuple_slot;
3878  if (resultSlot->tts_tupleDescriptor != resultTupType)
3879  ExecSetSlotDescriptor(resultSlot, resultTupType);
3880 
3881  /*
3882  * Extract all the values of the scan tuple.
3883  */
3884  slot_getallattrs(slot);
3885  old_values = slot->tts_values;
3886  old_isnull = slot->tts_isnull;
3887 
3888  /*
3889  * Prepare to build the result tuple.
3890  */
3891  ExecClearTuple(resultSlot);
3892  values = resultSlot->tts_values;
3893  isnull = resultSlot->tts_isnull;
3894 
3895  /*
3896  * Transpose data into proper fields of the result tuple.
3897  */
3898  for (i = 0; i < resultTupType->natts; i++)
3899  {
3900  int j = dmstate->attnoMap[i];
3901 
3902  if (j == 0)
3903  {
3904  values[i] = (Datum) 0;
3905  isnull[i] = true;
3906  }
3907  else
3908  {
3909  values[i] = old_values[j - 1];
3910  isnull[i] = old_isnull[j - 1];
3911  }
3912  }
3913 
3914  /*
3915  * Build the virtual tuple.
3916  */
3917  ExecStoreVirtualTuple(resultSlot);
3918 
3919  /*
3920  * If we have any system columns to return, install them.
3921  */
3922  if (dmstate->hasSystemCols)
3923  {
3924  HeapTuple resultTup = ExecMaterializeSlot(resultSlot);
3925 
3926  /* ctid */
3927  if (dmstate->ctidAttno)
3928  {
3929  ItemPointer ctid = NULL;
3930 
3931  ctid = (ItemPointer) DatumGetPointer(old_values[dmstate->ctidAttno - 1]);
3932  resultTup->t_self = *ctid;
3933  }
3934 
3935  /* oid */
3936  if (dmstate->oidAttno)
3937  {
3938  Oid oid = InvalidOid;
3939 
3940  oid = DatumGetObjectId(old_values[dmstate->oidAttno - 1]);
3941  HeapTupleSetOid(resultTup, oid);
3942  }
3943 
3944  /*
3945  * And remaining columns
3946  *
3947  * Note: since we currently don't allow the target relation to appear
3948  * on the nullable side of an outer join, any system columns wouldn't
3949  * go to NULL.
3950  *
3951  * Note: no need to care about tableoid here because it will be
3952  * initialized in ExecProcessReturning().
3953  */
3957  }
3958 
3959  /*
3960  * And return the result tuple.
3961  */
3962  return resultSlot;
3963 }
3964 
3965 /*
3966  * Prepare for processing of parameters used in remote query.
3967  */
3968 static void
3970  List *fdw_exprs,
3971  int numParams,
3973  List **param_exprs,
3974  const char ***param_values)
3975 {
3976  int i;
3977  ListCell *lc;
3978 
3979  Assert(numParams > 0);
3980 
3981  /* Prepare for output conversion of parameters used in remote query. */
3982  *param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * numParams);
3983 
3984  i = 0;
3985  foreach(lc, fdw_exprs)
3986  {
3987  Node *param_expr = (Node *) lfirst(lc);
3988  Oid typefnoid;
3989  bool isvarlena;
3990 
3991  getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena);
3992  fmgr_info(typefnoid, &(*param_flinfo)[i]);
3993  i++;
3994  }
3995 
3996  /*
3997  * Prepare remote-parameter expressions for evaluation. (Note: in
3998  * practice, we expect that all these expressions will be just Params, so
3999  * we could possibly do something more efficient than using the full
4000  * expression-eval machinery for this. But probably there would be little
4001  * benefit, and it'd require postgres_fdw to know more than is desirable
4002  * about Param evaluation.)
4003  */
4004  *param_exprs = ExecInitExprList(fdw_exprs, node);
4005 
4006  /* Allocate buffer for text form of query parameters. */
4007  *param_values = (const char **) palloc0(numParams * sizeof(char *));
4008 }
4009 
4010 /*
4011  * Construct array of query parameter values in text format.
4012  */
4013 static void
4016  List *param_exprs,
4017  const char **param_values)
4018 {
4019  int nestlevel;
4020  int i;
4021  ListCell *lc;
4022 
4023  nestlevel = set_transmission_modes();
4024 
4025  i = 0;
4026  foreach(lc, param_exprs)
4027  {
4028  ExprState *expr_state = (ExprState *) lfirst(lc);
4029  Datum expr_value;
4030  bool isNull;
4031 
4032  /* Evaluate the parameter expression */
4033  expr_value = ExecEvalExpr(expr_state, econtext, &isNull);
4034 
4035  /*
4036  * Get string representation of each parameter value by invoking
4037  * type-specific output function, unless the value is null.
4038  */
4039  if (isNull)
4040  param_values[i] = NULL;
4041  else
4042  param_values[i] = OutputFunctionCall(&param_flinfo[i], expr_value);
4043 
4044  i++;
4045  }
4046 
4047  reset_transmission_modes(nestlevel);
4048 }
4049 
4050 /*
4051  * postgresAnalyzeForeignTable
4052  * Test whether analyzing this foreign table is supported
4053  */
4054 static bool
4056  AcquireSampleRowsFunc *func,
4057  BlockNumber *totalpages)
4058 {
4059  ForeignTable *table;
4060  UserMapping *user;
4061  PGconn *conn;
4062  StringInfoData sql;
4063  PGresult *volatile res = NULL;
4064 
4065  /* Return the row-analysis function pointer */
4067 
4068  /*
4069  * Now we have to get the number of pages. It's annoying that the ANALYZE
4070  * API requires us to return that now, because it forces some duplication
4071  * of effort between this routine and postgresAcquireSampleRowsFunc. But
4072  * it's probably not worth redefining that API at this point.
4073  */
4074 
4075  /*
4076  * Get the connection to use. We do the remote access as the table's
4077  * owner, even if the ANALYZE was started by some other user.
4078  */
4079  table = GetForeignTable(RelationGetRelid(relation));
4080  user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
4081  conn = GetConnection(user, false);
4082 
4083  /*
4084  * Construct command to get page count for relation.
4085  */
4086  initStringInfo(&sql);
4087  deparseAnalyzeSizeSql(&sql, relation);
4088 
4089  /* In what follows, do not risk leaking any PGresults. */
4090  PG_TRY();
4091  {
4092  res = pgfdw_exec_query(conn, sql.data);
4093  if (PQresultStatus(res) != PGRES_TUPLES_OK)
4094  pgfdw_report_error(ERROR, res, conn, false, sql.data);
4095 
4096  if (PQntuples(res) != 1 || PQnfields(res) != 1)
4097  elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query");
4098  *totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
4099 
4100  PQclear(res);
4101  res = NULL;
4102  }
4103  PG_CATCH();
4104  {
4105  if (res)
4106  PQclear(res);
4107  PG_RE_THROW();
4108  }
4109  PG_END_TRY();
4110 
4111  ReleaseConnection(conn);
4112 
4113  return true;
4114 }
4115 
4116 /*
4117  * Acquire a random sample of rows from foreign table managed by postgres_fdw.
4118  *
4119  * We fetch the whole table from the remote side and pick out some sample rows.
4120  *
4121  * Selected rows are returned in the caller-allocated array rows[],
4122  * which must have at least targrows entries.
4123  * The actual number of rows selected is returned as the function result.
4124  * We also count the total number of rows in the table and return it into
4125  * *totalrows. Note that *totaldeadrows is always set to 0.
4126  *
4127  * Note that the returned list of rows is not always in order by physical
4128  * position in the table. Therefore, correlation estimates derived later
4129  * may be meaningless, but it's OK because we don't use the estimates
4130  * currently (the planner only pays attention to correlation for indexscans).
4131  */
4132 static int
4134  HeapTuple *rows, int targrows,
4135  double *totalrows,
4136  double *totaldeadrows)
4137 {
4138  PgFdwAnalyzeState astate;
4139  ForeignTable *table;
4140  ForeignServer *server;
4141  UserMapping *user;
4142  PGconn *conn;
4143  unsigned int cursor_number;
4144  StringInfoData sql;
4145  PGresult *volatile res = NULL;
4146 
4147  /* Initialize workspace state */
4148  astate.rel = relation;
4150 
4151  astate.rows = rows;
4152  astate.targrows = targrows;
4153  astate.numrows = 0;
4154  astate.samplerows = 0;
4155  astate.rowstoskip = -1; /* -1 means not set yet */
4156  reservoir_init_selection_state(&astate.rstate, targrows);
4157 
4158  /* Remember ANALYZE context, and create a per-tuple temp context */
4159  astate.anl_cxt = CurrentMemoryContext;
4161  "postgres_fdw temporary data",
4163 
4164  /*
4165  * Get the connection to use. We do the remote access as the table's
4166  * owner, even if the ANALYZE was started by some other user.
4167  */
4168  table = GetForeignTable(RelationGetRelid(relation));
4169  server = GetForeignServer(table->serverid);
4170  user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
4171  conn = GetConnection(user, false);
4172 
4173  /*
4174  * Construct cursor that retrieves whole rows from remote.
4175  */
4176  cursor_number = GetCursorNumber(conn);
4177  initStringInfo(&sql);
4178  appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number);
4179  deparseAnalyzeSql(&sql, relation, &astate.retrieved_attrs);
4180 
4181  /* In what follows, do not risk leaking any PGresults. */
4182  PG_TRY();
4183  {
4184  res = pgfdw_exec_query(conn, sql.data);
4185  if (PQresultStatus(res) != PGRES_COMMAND_OK)
4186  pgfdw_report_error(ERROR, res, conn, false, sql.data);
4187  PQclear(res);
4188  res = NULL;
4189 
4190  /* Retrieve and process rows a batch at a time. */
4191  for (;;)
4192  {
4193  char fetch_sql[64];
4194  int fetch_size;
4195  int numrows;
4196  int i;
4197  ListCell *lc;
4198 
4199  /* Allow users to cancel long query */
4201 
4202  /*
4203  * XXX possible future improvement: if rowstoskip is large, we
4204  * could issue a MOVE rather than physically fetching the rows,
4205  * then just adjust rowstoskip and samplerows appropriately.
4206  */
4207 
4208  /* The fetch size is arbitrary, but shouldn't be enormous. */
4209  fetch_size = 100;
4210  foreach(lc, server->options)
4211  {
4212  DefElem *def = (DefElem *) lfirst(lc);
4213 
4214  if (strcmp(def->defname, "fetch_size") == 0)
4215  {
4216  fetch_size = strtol(defGetString(def), NULL, 10);
4217  break;
4218  }
4219  }
4220  foreach(lc, table->options)
4221  {
4222  DefElem *def = (DefElem *) lfirst(lc);
4223 
4224  if (strcmp(def->defname, "fetch_size") == 0)
4225  {
4226  fetch_size = strtol(defGetString(def), NULL, 10);
4227  break;
4228  }
4229  }
4230 
4231  /* Fetch some rows */
4232  snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
4233  fetch_size, cursor_number);
4234 
4235  res = pgfdw_exec_query(conn, fetch_sql);
4236  /* On error, report the original query, not the FETCH. */
4237  if (PQresultStatus(res) != PGRES_TUPLES_OK)
4238  pgfdw_report_error(ERROR, res, conn, false, sql.data);
4239 
4240  /* Process whatever we got. */
4241  numrows = PQntuples(res);
4242  for (i = 0; i < numrows; i++)
4243  analyze_row_processor(res, i, &astate);
4244 
4245  PQclear(res);
4246  res = NULL;
4247 
4248  /* Must be EOF if we didn't get all the rows requested. */
4249  if (numrows < fetch_size)
4250  break;
4251  }
4252 
4253  /* Close the cursor, just to be tidy. */
4254  close_cursor(conn, cursor_number);
4255  }
4256  PG_CATCH();
4257  {
4258  if (res)
4259  PQclear(res);
4260  PG_RE_THROW();
4261  }
4262  PG_END_TRY();
4263 
4264  ReleaseConnection(conn);
4265 
4266  /* We assume that we have no dead tuple. */
4267  *totaldeadrows = 0.0;
4268 
4269  /* We've retrieved all living tuples from foreign server. */
4270  *totalrows = astate.samplerows;
4271 
4272  /*
4273  * Emit some interesting relation info
4274  */
4275  ereport(elevel,
4276  (errmsg("\"%s\": table contains %.0f rows, %d rows in sample",
4277  RelationGetRelationName(relation),
4278  astate.samplerows, astate.numrows)));
4279 
4280  return astate.numrows;
4281 }
4282 
4283 /*
4284  * Collect sample rows from the result of query.
4285  * - Use all tuples in sample until target # of samples are collected.
4286  * - Subsequently, replace already-sampled tuples randomly.
4287  */
4288 static void
4290 {
4291  int targrows = astate->targrows;
4292  int pos; /* array index to store tuple in */
4293  MemoryContext oldcontext;
4294 
4295  /* Always increment sample row counter. */
4296  astate->samplerows += 1;
4297 
4298  /*
4299  * Determine the slot where this sample row should be stored. Set pos to
4300  * negative value to indicate the row should be skipped.
4301  */
4302  if (astate->numrows < targrows)
4303  {
4304  /* First targrows rows are always included into the sample */
4305  pos = astate->numrows++;
4306  }
4307  else
4308  {
4309  /*
4310  * Now we start replacing tuples in the sample until we reach the end
4311  * of the relation. Same algorithm as in acquire_sample_rows in
4312  * analyze.c; see Jeff Vitter's paper.
4313  */
4314  if (astate->rowstoskip < 0)
4315  astate->rowstoskip = reservoir_get_next_S(&astate->rstate, astate->samplerows, targrows);
4316 
4317  if (astate->rowstoskip <= 0)
4318  {
4319  /* Choose a random reservoir element to replace. */
4320  pos = (int) (targrows * sampler_random_fract(astate->rstate.randstate));
4321  Assert(pos >= 0 && pos < targrows);
4322  heap_freetuple(astate->rows[pos]);
4323  }
4324  else
4325  {
4326  /* Skip this tuple. */
4327  pos = -1;
4328  }
4329 
4330  astate->rowstoskip -= 1;
4331  }
4332 
4333  if (pos >= 0)
4334  {
4335  /*
4336  * Create sample tuple from current result row, and store it in the
4337  * position determined above. The tuple has to be created in anl_cxt.
4338  */
4339  oldcontext = MemoryContextSwitchTo(astate->anl_cxt);
4340 
4341  astate->rows[pos] = make_tuple_from_result_row(res, row,
4342  astate->rel,
4343  astate->attinmeta,
4344  astate->retrieved_attrs,
4345  NULL,
4346  astate->temp_cxt);
4347 
4348  MemoryContextSwitchTo(oldcontext);
4349  }
4350 }
4351 
4352 /*
4353  * Import a foreign schema
4354  */
4355 static List *
4357 {
4358  List *commands = NIL;
4359  bool import_collate = true;
4360  bool import_default = false;
4361  bool import_not_null = true;
4362  ForeignServer *server;
4363  UserMapping *mapping;
4364  PGconn *conn;
4366  PGresult *volatile res = NULL;
4367  int numrows,
4368  i;
4369  ListCell *lc;
4370 
4371  /* Parse statement options */
4372  foreach(lc, stmt->options)
4373  {
4374  DefElem *def = (DefElem *) lfirst(lc);
4375 
4376  if (strcmp(def->defname, "import_collate") == 0)
4377  import_collate = defGetBoolean(def);
4378  else if (strcmp(def->defname, "import_default") == 0)
4379  import_default = defGetBoolean(def);
4380  else if (strcmp(def->defname, "import_not_null") == 0)
4381  import_not_null = defGetBoolean(def);
4382  else
4383  ereport(ERROR,
4384  (errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
4385  errmsg("invalid option \"%s\"", def->defname)));
4386  }
4387 
4388  /*
4389  * Get connection to the foreign server. Connection manager will
4390  * establish new connection if necessary.
4391  */
4392  server = GetForeignServer(serverOid);
4393  mapping = GetUserMapping(GetUserId(), server->serverid);
4394  conn = GetConnection(mapping, false);
4395 
4396  /* Don't attempt to import collation if remote server hasn't got it */
4397  if (PQserverVersion(conn) < 90100)
4398  import_collate = false;
4399 
4400  /* Create workspace for strings */
4401  initStringInfo(&buf);
4402 
4403  /* In what follows, do not risk leaking any PGresults. */
4404  PG_TRY();
4405  {
4406  /* Check that the schema really exists */
4407  appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
4408  deparseStringLiteral(&buf, stmt->remote_schema);
4409 
4410  res = pgfdw_exec_query(conn, buf.data);
4411  if (PQresultStatus(res) != PGRES_TUPLES_OK)
4412  pgfdw_report_error(ERROR, res, conn, false, buf.data);
4413 
4414  if (PQntuples(res) != 1)
4415  ereport(ERROR,
4416  (errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND),
4417  errmsg("schema \"%s\" is not present on foreign server \"%s\"",
4418  stmt->remote_schema, server->servername)));
4419 
4420  PQclear(res);
4421  res = NULL;
4422  resetStringInfo(&buf);
4423 
4424  /*
4425  * Fetch all table data from this schema, possibly restricted by
4426  * EXCEPT or LIMIT TO. (We don't actually need to pay any attention
4427  * to EXCEPT/LIMIT TO here, because the core code will filter the
4428  * statements we return according to those lists anyway. But it
4429  * should save a few cycles to not process excluded tables in the
4430  * first place.)
4431  *
4432  * Ignore table data for partitions and only include the definitions
4433  * of the root partitioned tables to allow access to the complete
4434  * remote data set locally in the schema imported.
4435  *
4436  * Note: because we run the connection with search_path restricted to
4437  * pg_catalog, the format_type() and pg_get_expr() outputs will always
4438  * include a schema name for types/functions in other schemas, which
4439  * is what we want.
4440  */
4441  if (import_collate)
4443  "SELECT relname, "
4444  " attname, "
4445  " format_type(atttypid, atttypmod), "
4446  " attnotnull, "
4447  " pg_get_expr(adbin, adrelid), "
4448  " collname, "
4449  " collnsp.nspname "
4450  "FROM pg_class c "
4451  " JOIN pg_namespace n ON "
4452  " relnamespace = n.oid "
4453  " LEFT JOIN pg_attribute a ON "
4454  " attrelid = c.oid AND attnum > 0 "
4455  " AND NOT attisdropped "
4456  " LEFT JOIN pg_attrdef ad ON "
4457  " adrelid = c.oid AND adnum = attnum "
4458  " LEFT JOIN pg_collation coll ON "
4459  " coll.oid = attcollation "
4460  " LEFT JOIN pg_namespace collnsp ON "
4461  " collnsp.oid = collnamespace ");
4462  else
4464  "SELECT relname, "
4465  " attname, "
4466  " format_type(atttypid, atttypmod), "
4467  " attnotnull, "
4468  " pg_get_expr(adbin, adrelid), "
4469  " NULL, NULL "
4470  "FROM pg_class c "
4471  " JOIN pg_namespace n ON "
4472  " relnamespace = n.oid "
4473  " LEFT JOIN pg_attribute a ON "
4474  " attrelid = c.oid AND attnum > 0 "
4475  " AND NOT attisdropped "
4476  " LEFT JOIN pg_attrdef ad ON "
4477  " adrelid = c.oid AND adnum = attnum ");
4478 
4480  "WHERE c.relkind IN ("
4481  CppAsString2(RELKIND_RELATION) ","
4482  CppAsString2(RELKIND_VIEW) ","
4483  CppAsString2(RELKIND_FOREIGN_TABLE) ","
4484  CppAsString2(RELKIND_MATVIEW) ","
4485  CppAsString2(RELKIND_PARTITIONED_TABLE) ") "
4486  " AND n.nspname = ");
4487  deparseStringLiteral(&buf, stmt->remote_schema);
4488 
4489  /* Partitions are supported since Postgres 10 */
4490  if (PQserverVersion(conn) >= 100000)
4491  appendStringInfoString(&buf, " AND NOT c.relispartition ");
4492 
4493  /* Apply restrictions for LIMIT TO and EXCEPT */
4494  if (stmt->list_type == FDW_IMPORT_SCHEMA_LIMIT_TO ||
4496  {
4497  bool first_item = true;
4498 
4499  appendStringInfoString(&buf, " AND c.relname ");
4500  if (stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
4501  appendStringInfoString(&buf, "NOT ");
4502  appendStringInfoString(&buf, "IN (");
4503 
4504  /* Append list of table names within IN clause */
4505  foreach(lc, stmt->table_list)
4506  {
4507  RangeVar *rv = (RangeVar *) lfirst(lc);
4508 
4509  if (first_item)
4510  first_item = false;
4511  else
4512  appendStringInfoString(&buf, ", ");
4513  deparseStringLiteral(&buf, rv->relname);
4514  }
4515  appendStringInfoChar(&buf, ')');
4516  }
4517 
4518  /* Append ORDER BY at the end of query to ensure output ordering */
4519  appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");
4520 
4521  /* Fetch the data */
4522  res = pgfdw_exec_query(conn, buf.data);
4523  if (PQresultStatus(res) != PGRES_TUPLES_OK)
4524  pgfdw_report_error(ERROR, res, conn, false, buf.data);
4525 
4526  /* Process results */
4527  numrows = PQntuples(res);
4528  /* note: incrementation of i happens in inner loop's while() test */
4529  for (i = 0; i < numrows;)
4530  {
4531  char *tablename = PQgetvalue(res, i, 0);
4532  bool first_item = true;
4533 
4534  resetStringInfo(&buf);
4535  appendStringInfo(&buf, "CREATE FOREIGN TABLE %s (\n",
4536  quote_identifier(tablename));
4537 
4538  /* Scan all rows for this table */
4539  do
4540  {
4541  char *attname;
4542  char *typename;
4543  char *attnotnull;
4544  char *attdefault;
4545  char *collname;
4546  char *collnamespace;
4547 
4548  /* If table has no columns, we'll see nulls here */
4549  if (PQgetisnull(res, i, 1))
4550  continue;
4551 
4552  attname = PQgetvalue(res, i, 1);
4553  typename = PQgetvalue(res, i, 2);
4554  attnotnull = PQgetvalue(res, i, 3);
4555  attdefault = PQgetisnull(res, i, 4) ? (char *) NULL :
4556  PQgetvalue(res, i, 4);
4557  collname = PQgetisnull(res, i, 5) ? (char *) NULL :
4558  PQgetvalue(res, i, 5);
4559  collnamespace = PQgetisnull(res, i, 6) ? (char *) NULL :
4560  PQgetvalue(res, i, 6);
4561 
4562  if (first_item)
4563  first_item = false;
4564  else
4565  appendStringInfoString(&buf, ",\n");
4566 
4567  /* Print column name and type */
4568  appendStringInfo(&buf, " %s %s",
4569  quote_identifier(attname),
4570  typename);
4571 
4572  /*
4573  * Add column_name option so that renaming the foreign table's
4574  * column doesn't break the association to the underlying
4575  * column.
4576  */
4577  appendStringInfoString(&buf, " OPTIONS (column_name ");
4578  deparseStringLiteral(&buf, attname);
4579  appendStringInfoChar(&buf, ')');
4580 
4581  /* Add COLLATE if needed */
4582  if (import_collate && collname != NULL && collnamespace != NULL)
4583  appendStringInfo(&buf, " COLLATE %s.%s",
4584  quote_identifier(collnamespace),
4585  quote_identifier(collname));
4586 
4587  /* Add DEFAULT if needed */
4588  if (import_default && attdefault != NULL)
4589  appendStringInfo(&buf, " DEFAULT %s", attdefault);
4590 
4591  /* Add NOT NULL if needed */
4592  if (import_not_null && attnotnull[0] == 't')
4593  appendStringInfoString(&buf, " NOT NULL");
4594  }
4595  while (++i < numrows &&
4596  strcmp(PQgetvalue(res, i, 0), tablename) == 0);
4597 
4598  /*
4599  * Add server name and table-level options. We specify remote
4600  * schema and table name as options (the latter to ensure that
4601  * renaming the foreign table doesn't break the association).
4602  */
4603  appendStringInfo(&buf, "\n) SERVER %s\nOPTIONS (",
4604  quote_identifier(server->servername));
4605 
4606  appendStringInfoString(&buf, "schema_name ");
4607  deparseStringLiteral(&buf, stmt->remote_schema);
4608  appendStringInfoString(&buf, ", table_name ");
4609  deparseStringLiteral(&buf, tablename);
4610 
4611  appendStringInfoString(&buf, ");");
4612 
4613  commands = lappend(commands, pstrdup(buf.data));
4614  }
4615 
4616  /* Clean up */
4617  PQclear(res);
4618  res = NULL;
4619  }
4620  PG_CATCH();
4621  {
4622  if (res)
4623  PQclear(res);
4624  PG_RE_THROW();
4625  }
4626  PG_END_TRY();
4627 
4628  ReleaseConnection(conn);
4629 
4630  return commands;
4631 }
4632 
4633 /*
4634  * Assess whether the join between inner and outer relations can be pushed down
4635  * to the foreign server. As a side effect, save information we obtain in this
4636  * function to PgFdwRelationInfo passed in.
4637  */
4638 static bool
4640  RelOptInfo *outerrel, RelOptInfo *innerrel,
4641  JoinPathExtraData *extra)
4642 {
4643  PgFdwRelationInfo *fpinfo;
4644  PgFdwRelationInfo *fpinfo_o;
4645  PgFdwRelationInfo *fpinfo_i;
4646  ListCell *lc;
4647  List *joinclauses;
4648 
4649  /*
4650  * We support pushing down INNER, LEFT, RIGHT and FULL OUTER joins.
4651  * Constructing queries representing SEMI and ANTI joins is hard, hence
4652  * not considered right now.
4653  */
4654  if (jointype != JOIN_INNER && jointype != JOIN_LEFT &&
4655  jointype != JOIN_RIGHT && jointype != JOIN_FULL)
4656  return false;
4657 
4658  /*
4659  * If either of the joining relations is marked as unsafe to pushdown, the
4660  * join can not be pushed down.
4661  */
4662  fpinfo = (PgFdwRelationInfo *) joinrel->fdw_private;
4663  fpinfo_o = (PgFdwRelationInfo *) outerrel->fdw_private;
4664  fpinfo_i = (PgFdwRelationInfo *) innerrel->fdw_private;
4665  if (!fpinfo_o || !fpinfo_o->pushdown_safe ||
4666  !fpinfo_i || !fpinfo_i->pushdown_safe)
4667  return false;
4668 
4669  /*
4670  * If joining relations have local conditions, those conditions are
4671  * required to be applied before joining the relations. Hence the join can
4672  * not be pushed down.
4673  */
4674  if (fpinfo_o->local_conds || fpinfo_i->local_conds)
4675  return false;
4676 
4677  /*
4678  * Merge FDW options. We might be tempted to do this after we have deemed
4679  * the foreign join to be OK. But we must do this beforehand so that we
4680  * know which quals can be evaluated on the foreign server, which might
4681  * depend on shippable_extensions.
4682  */
4683  fpinfo->server = fpinfo_o->server;
4684  merge_fdw_options(fpinfo, fpinfo_o, fpinfo_i);
4685 
4686  /*
4687  * Separate restrict list into join quals and pushed-down (other) quals.
4688  *
4689  * Join quals belonging to an outer join must all be shippable, else we
4690  * cannot execute the join remotely. Add such quals to 'joinclauses'.
4691  *
4692  * Add other quals to fpinfo->remote_conds if they are shippable, else to
4693  * fpinfo->local_conds. In an inner join it's okay to execute conditions
4694  * either locally or remotely; the same is true for pushed-down conditions
4695  * at an outer join.
4696  *
4697  * Note we might return failure after having already scribbled on
4698  * fpinfo->remote_conds and fpinfo->local_conds. That's okay because we
4699  * won't consult those lists again if we deem the join unshippable.
4700  */
4701  joinclauses = NIL;
4702  foreach(lc, extra->restrictlist)
4703  {
4704  RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
4705  bool is_remote_clause = is_foreign_expr(root, joinrel,
4706  rinfo->clause);
4707 
4708  if (IS_OUTER_JOIN(jointype) &&
4709  !RINFO_IS_PUSHED_DOWN(rinfo, joinrel->relids))
4710  {
4711  if (!is_remote_clause)
4712  return false;
4713  joinclauses = lappend(joinclauses, rinfo);
4714  }
4715  else
4716  {
4717  if (is_remote_clause)
4718  fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo);
4719  else
4720  fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo);
4721  }
4722  }
4723 
4724  /*
4725  * deparseExplicitTargetList() isn't smart enough to handle anything other
4726  * than a Var. In particular, if there's some PlaceHolderVar that would
4727  * need to be evaluated within this join tree (because there's an upper
4728  * reference to a quantity that may go to NULL as a result of an outer
4729  * join), then we can't try to push the join down because we'll fail when
4730  * we get to deparseExplicitTargetList(). However, a PlaceHolderVar that
4731  * needs to be evaluated *at the top* of this join tree is OK, because we
4732  * can do that locally after fetching the results from the remote side.
4733  */
4734  foreach(lc, root->placeholder_list)
4735  {
4736  PlaceHolderInfo *phinfo = lfirst(lc);
4737  Relids relids;
4738 
4739  /* PlaceHolderInfo refers to parent relids, not child relids. */
4740  relids = IS_OTHER_REL(joinrel) ?
4741  joinrel->top_parent_relids : joinrel->relids;
4742 
4743  if (bms_is_subset(phinfo->ph_eval_at, relids) &&
4744  bms_nonempty_difference(relids, phinfo->ph_eval_at))
4745  return false;
4746  }
4747 
4748  /* Save the join clauses, for later use. */
4749  fpinfo->joinclauses = joinclauses;
4750 
4751  fpinfo->outerrel = outerrel;
4752  fpinfo->innerrel = innerrel;
4753  fpinfo->jointype = jointype;
4754 
4755  /*
4756  * By default, both the input relations are not required to be deparsed as
4757  * subqueries, but there might be some relations covered by the input
4758  * relations that are required to be deparsed as subqueries, so save the
4759  * relids of those relations for later use by the deparser.
4760  */
4761  fpinfo->make_outerrel_subquery = false;
4762  fpinfo->make_innerrel_subquery = false;
4763  Assert(bms_is_subset(fpinfo_o->lower_subquery_rels, outerrel->relids));
4764  Assert(bms_is_subset(fpinfo_i->lower_subquery_rels, innerrel->relids));
4766  fpinfo_i->lower_subquery_rels);
4767 
4768  /*
4769  * Pull the other remote conditions from the joining relations into join
4770  * clauses or other remote clauses (remote_conds) of this relation
4771  * wherever possible. This avoids building subqueries at every join step.
4772  *
4773  * For an inner join, clauses from both the relations are added to the
4774  * other remote clauses. For LEFT and RIGHT OUTER join, the clauses from
4775  * the outer side are added to remote_conds since those can be evaluated
4776  * after the join is evaluated. The clauses from inner side are added to
4777  * the joinclauses, since they need to be evaluated while constructing the
4778  * join.
4779  *
4780  * For a FULL OUTER JOIN, the other clauses from either relation can not
4781  * be added to the joinclauses or remote_conds, since each relation acts
4782  * as an outer relation for the other.
4783  *
4784  * The joining sides can not have local conditions, thus no need to test
4785  * shippability of the clauses being pulled up.
4786  */
4787  switch (jointype)
4788  {
4789  case JOIN_INNER:
4790  fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4791  list_copy(fpinfo_i->remote_conds));
4792  fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4793  list_copy(fpinfo_o->remote_conds));
4794  break;
4795 
4796  case JOIN_LEFT:
4797  fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
4798  list_copy(fpinfo_i->remote_conds));
4799  fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4800  list_copy(fpinfo_o->remote_conds));
4801  break;
4802 
4803  case JOIN_RIGHT:
4804  fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
4805  list_copy(fpinfo_o->remote_conds));
4806  fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4807  list_copy(fpinfo_i->remote_conds));
4808  break;
4809 
4810  case JOIN_FULL:
4811 
4812  /*
4813  * In this case, if any of the input relations has conditions, we
4814  * need to deparse that relation as a subquery so that the
4815  * conditions can be evaluated before the join. Remember it in
4816  * the fpinfo of this relation so that the deparser can take
4817  * appropriate action. Also, save the relids of base relations
4818  * covered by that relation for later use by the deparser.
4819  */
4820  if (fpinfo_o->remote_conds)
4821  {
4822  fpinfo->make_outerrel_subquery = true;
4823  fpinfo->lower_subquery_rels =
4825  outerrel->relids);
4826  }
4827  if (fpinfo_i->remote_conds)
4828  {
4829  fpinfo->make_innerrel_subquery = true;
4830  fpinfo->lower_subquery_rels =
4832  innerrel->relids);
4833  }
4834  break;
4835 
4836  default:
4837  /* Should not happen, we have just checked this above */
4838  elog(ERROR, "unsupported join type %d", jointype);
4839  }
4840 
4841  /*
4842  * For an inner join, all restrictions can be treated alike. Treating the
4843  * pushed down conditions as join conditions allows a top level full outer
4844  * join to be deparsed without requiring subqueries.
4845  */
4846  if (jointype == JOIN_INNER)
4847  {
4848  Assert(!fpinfo->joinclauses);
4849  fpinfo->joinclauses = fpinfo->remote_conds;
4850  fpinfo->remote_conds = NIL;
4851  }
4852 
4853  /* Mark that this join can be pushed down safely */
4854  fpinfo->pushdown_safe = true;
4855 
4856  /* Get user mapping */
4857  if (fpinfo->use_remote_estimate)
4858  {
4859  if (fpinfo_o->use_remote_estimate)
4860  fpinfo->user = fpinfo_o->user;
4861  else
4862  fpinfo->user = fpinfo_i->user;
4863  }
4864  else
4865  fpinfo->user = NULL;
4866 
4867  /*
4868  * Set cached relation costs to some negative value, so that we can detect
4869  * when they are set to some sensible costs, during one (usually the
4870  * first) of the calls to estimate_path_cost_size().
4871  */
4872  fpinfo->rel_startup_cost = -1;
4873  fpinfo->rel_total_cost = -1;
4874 
4875  /*
4876  * Set the string describing this join relation to be used in EXPLAIN
4877  * output of corresponding ForeignScan.
4878  */
4879  fpinfo->relation_name = makeStringInfo();
4880  appendStringInfo(fpinfo->relation_name, "(%s) %s JOIN (%s)",
4881  fpinfo_o->relation_name->data,
4882  get_jointype_name(fpinfo->jointype),
4883  fpinfo_i->relation_name->data);
4884 
4885  /*
4886  * Set the relation index. This is defined as the position of this
4887  * joinrel in the join_rel_list list plus the length of the rtable list.
4888  * Note that since this joinrel is at the end of the join_rel_list list
4889  * when we are called, we can get the position by list_length.
4890  */
4891  Assert(fpinfo->relation_index == 0); /* shouldn't be set yet */
4892  fpinfo->relation_index =
4894 
4895  return true;
4896 }
4897 
4898 static void
4900  Path *epq_path)
4901 {
4902  List *useful_pathkeys_list = NIL; /* List of all pathkeys */
4903  ListCell *lc;
4904 
4905  useful_pathkeys_list = get_useful_pathkeys_for_relation(root, rel);
4906 
4907  /* Create one path for each set of pathkeys we found above. */
4908  foreach(lc, useful_pathkeys_list)
4909  {
4910  double rows;
4911  int width;
4912  Cost startup_cost;
4913  Cost total_cost;
4914  List *useful_pathkeys = lfirst(lc);
4915  Path *sorted_epq_path;
4916 
4917  estimate_path_cost_size(root, rel, NIL, useful_pathkeys,
4918  &rows, &width, &startup_cost, &total_cost);
4919 
4920  /*
4921  * The EPQ path must be at least as well sorted as the path itself,
4922  * in case it gets used as input to a mergejoin.
4923  */
4924  sorted_epq_path = epq_path;
4925  if (sorted_epq_path != NULL &&
4926  !pathkeys_contained_in(useful_pathkeys,
4927  sorted_epq_path->pathkeys))
4928  sorted_epq_path = (Path *)
4929  create_sort_path(root,
4930  rel,
4931  sorted_epq_path,
4932  useful_pathkeys,
4933  -1.0);
4934 
4935  add_path(rel, (Path *)
4936  create_foreignscan_path(root, rel,
4937  NULL,
4938  rows,
4939  startup_cost,
4940  total_cost,
4941  useful_pathkeys,
4942  NULL,
4943  sorted_epq_path,
4944  NIL));
4945  }
4946 }
4947 
4948 /*
4949  * Parse options from foreign server and apply them to fpinfo.
4950  *
4951  * New options might also require tweaking merge_fdw_options().
4952  */
4953 static void
4955 {
4956  ListCell *lc;
4957 
4958  foreach(lc, fpinfo->server->options)
4959  {
4960  DefElem *def = (DefElem *) lfirst(lc);
4961 
4962  if (strcmp(def->defname, "use_remote_estimate") == 0)
4963  fpinfo->use_remote_estimate = defGetBoolean(def);
4964  else if (strcmp(def->defname, "fdw_startup_cost") == 0)
4965  fpinfo->fdw_startup_cost = strtod(defGetString(def), NULL);
4966  else if (strcmp(def->defname, "fdw_tuple_cost") == 0)
4967  fpinfo->fdw_tuple_cost = strtod(defGetString(def), NULL);
4968  else if (strcmp(def->defname, "extensions") == 0)
4969  fpinfo->shippable_extensions =
4970  ExtractExtensionList(defGetString(def), false);
4971  else if (strcmp(def->defname, "fetch_size") == 0)
4972  fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
4973  }
4974 }
4975 
4976 /*
4977  * Parse options from foreign table and apply them to fpinfo.
4978  *
4979  * New options might also require tweaking merge_fdw_options().
4980  */
4981 static void
4983 {
4984  ListCell *lc;
4985 
4986  foreach(lc, fpinfo->table->options)
4987  {
4988  DefElem *def = (DefElem *) lfirst(lc);
4989 
4990  if (strcmp(def->defname, "use_remote_estimate") == 0)
4991  fpinfo->use_remote_estimate = defGetBoolean(def);
4992  else if (strcmp(def->defname, "fetch_size") == 0)
4993  fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
4994  }
4995 }
4996 
4997 /*
4998  * Merge FDW options from input relations into a new set of options for a join
4999  * or an upper rel.
5000  *
5001  * For a join relation, FDW-specific information about the inner and outer
5002  * relations is provided using fpinfo_i and fpinfo_o. For an upper relation,
5003  * fpinfo_o provides the information for the input relation; fpinfo_i is
5004  * expected to NULL.
5005  */
5006 static void
5008  const PgFdwRelationInfo *fpinfo_o,
5009  const PgFdwRelationInfo *fpinfo_i)
5010 {
5011  /* We must always have fpinfo_o. */
5012  Assert(fpinfo_o);
5013 
5014  /* fpinfo_i may be NULL, but if present the servers must both match. */
5015  Assert(!fpinfo_i ||
5016  fpinfo_i->server->serverid == fpinfo_o->server->serverid);
5017 
5018  /*
5019  * Copy the server specific FDW options. (For a join, both relations come
5020  * from the same server, so the server options should have the same value
5021  * for both relations.)
5022  */
5023  fpinfo->fdw_startup_cost = fpinfo_o->fdw_startup_cost;
5024  fpinfo->fdw_tuple_cost = fpinfo_o->fdw_tuple_cost;
5025  fpinfo->shippable_extensions = fpinfo_o->shippable_extensions;
5026  fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate;
5027  fpinfo->fetch_size = fpinfo_o->fetch_size;
5028 
5029  /* Merge the table level options from either side of the join. */
5030  if (fpinfo_i)
5031  {
5032  /*
5033  * We'll prefer to use remote estimates for this join if any table
5034  * from either side of the join is using remote estimates. This is
5035  * most likely going to be preferred since they're already willing to
5036  * pay the price of a round trip to get the remote EXPLAIN. In any
5037  * case it's not entirely clear how we might otherwise handle this
5038  * best.
5039  */
5040  fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate ||
5041  fpinfo_i->use_remote_estimate;
5042 
5043  /*
5044  * Set fetch size to maximum of the joining sides, since we are
5045  * expecting the rows returned by the join to be proportional to the
5046  * relation sizes.
5047  */
5048  fpinfo->fetch_size = Max(fpinfo_o->fetch_size, fpinfo_i->fetch_size);
5049  }
5050 }
5051 
5052 /*
5053  * postgresGetForeignJoinPaths
5054  * Add possible ForeignPath to joinrel, if join is safe to push down.
5055  */
5056 static void
5058  RelOptInfo *joinrel,
5059  RelOptInfo *outerrel,
5060  RelOptInfo *innerrel,
5061  JoinType jointype,
5062  JoinPathExtraData *extra)
5063 {
5064  PgFdwRelationInfo *fpinfo;
5065  ForeignPath *joinpath;
5066  double rows;
5067  int width;
5068  Cost startup_cost;
5069  Cost total_cost;
5070  Path *epq_path; /* Path to create plan to be executed when
5071  * EvalPlanQual gets triggered. */
5072 
5073  /*
5074  * Skip if this join combination has been considered already.
5075  */
5076  if (joinrel->fdw_private)
5077  return;
5078 
5079  /*
5080  * Create unfinished PgFdwRelationInfo entry which is used to indicate
5081  * that the join relation is already considered, so that we won't waste
5082  * time in judging safety of join pushdown and adding the same paths again
5083  * if found safe. Once we know that this join can be pushed down, we fill
5084  * the entry.
5085  */
5086  fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
5087  fpinfo->pushdown_safe = false;
5088  joinrel->fdw_private = fpinfo;
5089  /* attrs_used is only for base relations. */
5090  fpinfo->attrs_used = NULL;
5091 
5092  /*
5093  * If there is a possibility that EvalPlanQual will be executed, we need
5094  * to be able to reconstruct the row using scans of the base relations.
5095  * GetExistingLocalJoinPath will find a suitable path for this purpose in
5096  * the path list of the joinrel, if one exists. We must be careful to
5097  * call it before adding any ForeignPath, since the ForeignPath might
5098  * dominate the only suitable local path available. We also do it before
5099  * calling foreign_join_ok(), since that function updates fpinfo and marks
5100  * it as pushable if the join is found to be pushable.
5101  */
5102  if (root->parse->commandType == CMD_DELETE ||
5103  root->parse->commandType == CMD_UPDATE ||
5104  root->rowMarks)
5105  {
5106  epq_path = GetExistingLocalJoinPath(joinrel);
5107  if (!epq_path)
5108  {
5109  elog(DEBUG3, "could not push down foreign join because a local path suitable for EPQ checks was not found");
5110  return;
5111  }
5112  }
5113  else
5114  epq_path = NULL;
5115 
5116  if (!foreign_join_ok(root, joinrel, jointype, outerrel, innerrel, extra))
5117  {
5118  /* Free path required for EPQ if we copied one; we don't need it now */
5119  if (epq_path)
5120  pfree(epq_path);
5121  return;
5122  }
5123 
5124  /*
5125  * Compute the selectivity and cost of the local_conds, so we don't have
5126  * to do it over again for each path. The best we can do for these
5127  * conditions is to estimate selectivity on the basis of local statistics.
5128  * The local conditions are applied after the join has been computed on
5129  * the remote side like quals in WHERE clause, so pass jointype as
5130  * JOIN_INNER.
5131  */
5132  fpinfo->local_conds_sel = clauselist_selectivity(root,
5133  fpinfo->local_conds,
5134  0,
5135  JOIN_INNER,
5136  NULL);
5137  cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
5138 
5139  /*
5140  * If we are going to estimate costs locally, estimate the join clause
5141  * selectivity here while we have special join info.
5142  */
5143  if (!fpinfo->use_remote_estimate)
5144  fpinfo->joinclause_sel = clauselist_selectivity(root, fpinfo->joinclauses,
5145  0, fpinfo->jointype,
5146  extra->sjinfo);
5147 
5148  /* Estimate costs for bare join relation */
5149  estimate_path_cost_size(root, joinrel, NIL, NIL, &rows,
5150  &width, &startup_cost, &total_cost);
5151  /* Now update this information in the joinrel */
5152  joinrel->rows = rows;
5153  joinrel->reltarget->width = width;
5154  fpinfo->rows = rows;
5155  fpinfo->width = width;
5156  fpinfo->startup_cost = startup_cost;
5157  fpinfo->total_cost = total_cost;
5158 
5159  /*
5160  * Create a new join path and add it to the joinrel which represents a
5161  * join between foreign tables.
5162  */
5163  joinpath = create_foreignscan_path(root,
5164  joinrel,
5165  NULL, /* default pathtarget */
5166  rows,
5167  startup_cost,
5168  total_cost,
5169  NIL, /* no pathkeys */
5170  NULL, /* no required_outer */
5171  epq_path,
5172  NIL); /* no fdw_private */
5173 
5174  /* Add generated path into joinrel by add_path(). */
5175  add_path(joinrel, (Path *) joinpath);
5176 
5177  /* Consider pathkeys for the join relation */
5178  add_paths_with_pathkeys_for_rel(root, joinrel, epq_path);
5179 
5180  /* XXX Consider parameterized paths for the join relation */
5181 }
5182 
5183 /*
5184  * Assess whether the aggregation, grouping and having operations can be pushed
5185  * down to the foreign server. As a side effect, save information we obtain in
5186  * this function to PgFdwRelationInfo of the input relation.
      *
      * "Input relation" here means grouped_rel: everything is recorded in
      * grouped_rel->fdw_private, which must already be allocated and must have
      * its outerrel field pointing at the underlying scan relation.
      *
      * Returns true if pushdown is considered safe. In that case the fpinfo
      * has grouped_tlist, pushdown_safe, rel_startup_cost, rel_total_cost and
      * relation_name filled in, and the HAVING clauses have been split between
      * remote_conds and local_conds.
      *
      * Returns false if grouping sets are present, if the underlying scan
      * relation has local conditions, or if a GROUP BY expression or a needed
      * aggregate is not shippable. Note that the HAVING clauses are appended to
      * remote_conds/local_conds before the last of these checks runs, so those
      * lists may already have been modified when false is returned.
      *
      * NOTE(review): in this listing the line carrying the function name and
      * its leading parameters is missing (the numbering jumps from 5188 to
      * 5190). The body uses root, grouped_rel and havingQual, and the caller
      * later in this file invokes it as
      * foreign_grouping_ok(root, grouped_rel, extra->havingQual).
5187  */
5188 static bool
5190  Node *havingQual)
5191 {
5192  Query *query = root->parse;
5193  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) grouped_rel->fdw_private;
5194  PathTarget *grouping_target = grouped_rel->reltarget;
5195  PgFdwRelationInfo *ofpinfo;
5196  List *aggvars;
5197  ListCell *lc;
5198  int i;
5199  List *tlist = NIL;
5200 
5201  /* We currently don't support pushing Grouping Sets. */
5202  if (query->groupingSets)
5203  return false;
5204 
5205  /* Get the fpinfo of the underlying scan relation. */
5206  ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
5207 
5208  /*
5209  * If underlying scan relation has any local conditions, those conditions
5210  * are required to be applied before performing aggregation. Hence the
5211  * aggregate cannot be pushed down.
5212  */
5213  if (ofpinfo->local_conds)
5214  return false;
5215 
5216  /*
5217  * Examine grouping expressions, as well as other expressions we'd need to
5218  * compute, and check whether they are safe to push down to the foreign
5219  * server. All GROUP BY expressions will be part of the grouping target
5220  * and thus there is no need to search for them separately. Add grouping
5221  * expressions into target list which will be passed to foreign server.
      *
      * i counts the position within grouping_target->exprs so that the
      * matching sortgroupref can be looked up for each expression; it is
      * advanced at the bottom of the loop.
5222  */
5223  i = 0;
5224  foreach(lc, grouping_target->exprs)
5225  {
5226  Expr *expr = (Expr *) lfirst(lc);
5227  Index sgref = get_pathtarget_sortgroupref(grouping_target, i);
5228  ListCell *l;
5229 
5230  /* Check whether this expression is part of GROUP BY clause */
5231  if (sgref && get_sortgroupref_clause_noerr(sgref, query->groupClause))
5232  {
5233  TargetEntry *tle;
5234 
5235  /*
5236  * If any GROUP BY expression is not shippable, then we cannot
5237  * push down aggregation to the foreign server.
5238  */
5239  if (!is_foreign_expr(root, grouped_rel, expr))
5240  return false;
5241 
5242  /*
5243  * Pushable, so add to tlist. We need to create a TLE for this
5244  * expression and apply the sortgroupref to it. We cannot use
5245  * add_to_flat_tlist() here because that avoids making duplicate
5246  * entries in the tlist. If there are duplicate entries with
5247  * distinct sortgrouprefs, we have to duplicate that situation in
5248  * the output tlist.
5249  */
5250  tle = makeTargetEntry(expr, list_length(tlist) + 1, NULL, false);
5251  tle->ressortgroupref = sgref;
5252  tlist = lappend(tlist, tle);
5253  }
5254  else
5255  {
5256  /*
5257  * Non-grouping expression we need to compute. Is it shippable?
5258  */
5259  if (is_foreign_expr(root, grouped_rel, expr))
5260  {
5261  /* Yes, so add to tlist as-is; OK to suppress duplicates */
5262  tlist = add_to_flat_tlist(tlist, list_make1(expr));
5263  }
5264  else
5265  {
5266  /* Not pushable as a whole; extract its Vars and aggregates */
      /*
       * NOTE(review): the continuation of the call below (its remaining
       * argument and the closing parenthesis) is missing from this
       * listing; the numbering jumps from 5267 to 5269.
       */
5267  aggvars = pull_var_clause((Node *) expr,
5269 
5270  /*
5271  * If any aggregate expression is not shippable, then we
5272  * cannot push down aggregation to the foreign server.
      *
      * The whole aggvars list is handed over in a single call by
      * casting the List pointer to Expr *; this presumably relies
      * on is_foreign_expr() being able to walk a List node --
      * TODO confirm.
5273  */
5274  if (!is_foreign_expr(root, grouped_rel, (Expr *) aggvars))
5275  return false;
5276 
5277  /*
5278  * Add aggregates, if any, into the targetlist. Plain Vars
5279  * outside an aggregate can be ignored, because they should be
5280  * either same as some GROUP BY column or part of some GROUP
5281  * BY expression. In either case, they are already part of
5282  * the targetlist and thus no need to add them again. In fact
5283  * including plain Vars in the tlist when they do not match a
5284  * GROUP BY column would cause the foreign server to complain
5285  * that the shipped query is invalid.
5286  */
5287  foreach(l, aggvars)
5288  {
      /* This expr shadows the loop-level expr declared above. */
5289  Expr *expr = (Expr *) lfirst(l);
5290 
5291  if (IsA(expr, Aggref))
5292  tlist = add_to_flat_tlist(tlist, list_make1(expr));
5293  }
5294  }
5295  }
5296 
5297  i++;
5298  }
5299 
5300  /*
5301  * Classify the pushable and non-pushable HAVING clauses and save them in
5302  * remote_conds and local_conds of the grouped rel's fpinfo.
      *
      * havingQual is declared as Node * but is iterated as a List here, one
      * clause per list element.
5303  */
5304  if (havingQual)
5305  {
      /* Shadows the function-level lc, which is no longer needed here. */
5306  ListCell *lc;
5307 
5308  foreach(lc, (List *) havingQual)
5309  {
5310  Expr *expr = (Expr *) lfirst(lc);
5311  RestrictInfo *rinfo;
5312 
5313  /*
5314  * Currently, the core code doesn't wrap havingQuals in
5315  * RestrictInfos, so we must make our own.
      *
      * NOTE(review): the three boolean arguments are positional;
      * presumably is_pushed_down = true, outerjoin_delayed = false,
      * pseudoconstant = false -- confirm against the
      * make_restrictinfo() prototype.
5316  */
5317  Assert(!IsA(expr, RestrictInfo));
5318  rinfo = make_restrictinfo(expr,
5319  true,
5320  false,
5321  false,
5322  root->qual_security_level,
5323  grouped_rel->relids,
5324  NULL,
5325  NULL);
5326  if (is_foreign_expr(root, grouped_rel, expr))
5327  fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo);
5328  else
5329  fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo);
5330  }
5331  }
5332 
5333  /*
5334  * If there are any local conditions, pull Vars and aggregates from it and
5335  * check whether they are safe to pushdown or not.
      *
      * Aggregates found here are added to tlist as well, so that the
      * generated target list covers what the local conditions reference.
5336  */
5337  if (fpinfo->local_conds)
5338  {
      /* Both locals below shadow the function-level aggvars and lc. */
5339  List *aggvars = NIL;
5340  ListCell *lc;
5341 
5342  foreach(lc, fpinfo->local_conds)
5343  {
5344  RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
5345 
      /*
       * NOTE(review): the continuation of the call below (its remaining
       * argument and the closing parentheses) is missing from this
       * listing; the numbering jumps from 5347 to 5349.
       */
5346  aggvars = list_concat(aggvars,
5347  pull_var_clause((Node *) rinfo->clause,
5349  }
5350 
5351  foreach(lc, aggvars)
5352  {
5353  Expr *expr = (Expr *) lfirst(lc);
5354 
5355  /*
5356  * If aggregates within local conditions are not safe to push
5357  * down, then we cannot push down the query. Vars are already
5358  * part of GROUP BY clause which are checked above, so no need to
5359  * access them again here.
5360  */
5361  if (IsA(expr, Aggref))
5362  {
5363  if (!is_foreign_expr(root, grouped_rel, expr))
5364  return false;
5365 
5366  tlist = add_to_flat_tlist(tlist, list_make1(expr));
5367  }
5368  }
5369  }
5370 
5371  /* Store generated targetlist */
5372  fpinfo->grouped_tlist = tlist;
5373 
5374  /* Safe to pushdown */
5375  fpinfo->pushdown_safe = true;
5376 
5377  /*
5378  * Set cached relation costs to some negative value, so that we can detect
5379  * when they are set to some sensible costs, during one (usually the
5380  * first) of the calls to estimate_path_cost_size().
5381  */
5382  fpinfo->rel_startup_cost = -1;
5383  fpinfo->rel_total_cost = -1;
5384 
5385  /*
5386  * Set the string describing this grouped relation to be used in EXPLAIN
5387  * output of corresponding ForeignScan.
      *
      * The name wraps the underlying scan relation's own relation_name.
5388  */
5389  fpinfo->relation_name = makeStringInfo();
5390  appendStringInfo(fpinfo->relation_name, "Aggregate on (%s)",
5391  ofpinfo->relation_name->data);
5392 
5393  return true;
5394 }
5395 
5396 /*
5397  * postgresGetForeignUpperPaths
5398  * Add paths for post-join operations like aggregation, grouping etc. if
5399  * corresponding operations are safe to push down.
5400  *
5401  * Right now, we only support aggregate, grouping and having clause pushdown.
      *
      * The function returns without doing anything unless all of the
      * following hold: input_rel has a PgFdwRelationInfo marked
      * pushdown_safe, stage is UPPERREL_GROUP_AGG, and output_rel has no
      * fdw_private yet. Otherwise it attaches a fresh PgFdwRelationInfo to
      * output_rel and delegates to add_foreign_grouping_paths().
      *
      * NOTE(review): in this listing the line carrying the function name and
      * its leading parameters is missing (the numbering jumps from 5403 to
      * 5405). The body uses root and stage in addition to the parameters
      * shown below.
5402  */
5403 static void
5405  RelOptInfo *input_rel, RelOptInfo *output_rel,
5406  void *extra)
5407 {
5408  PgFdwRelationInfo *fpinfo;
5409 
5410  /*
5411  * If input rel is not safe to pushdown, then simply return as we cannot
5412  * perform any post-join operations on the foreign server.
5413  */
5414  if (!input_rel->fdw_private ||
5415  !((PgFdwRelationInfo *) input_rel->fdw_private)->pushdown_safe)
5416  return;
5417 
5418  /* Ignore stages we don't support; and skip any duplicate calls. */
5419  if (stage != UPPERREL_GROUP_AGG || output_rel->fdw_private)
5420  return;
5421 
      /*
       * Attach a zeroed fpinfo, explicitly marked not pushdown-safe, before
       * doing any work. Its mere presence is what makes the duplicate-call
       * check above succeed on a later invocation for the same output_rel,
       * whether or not any path ends up being added.
       */
5422  fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
5423  fpinfo->pushdown_safe = false;
5424  output_rel->fdw_private = fpinfo;
5425 
      /*
       * Only UPPERREL_GROUP_AGG reaches this point, so extra is interpreted
       * as a GroupPathExtraData pointer.
       */
5426  add_foreign_grouping_paths(root, input_rel, output_rel,
5427  (GroupPathExtraData *) extra);
5428 }
5429 
5430 /*
5431  * add_foreign_grouping_paths
5432  * Add foreign path for grouping and/or aggregation.
5433  *
5434  * Given input_rel represents the underlying scan. The paths are added to the
5435  * given grouped_rel.
      *
      * Both input_rel->fdw_private and grouped_rel->fdw_private must already
      * point to PgFdwRelationInfo structs; the latter is filled in here.
      * No path is added when the query needs no grouping or aggregation, or
      * when foreign_grouping_ok() reports that pushdown is not safe. Otherwise
      * a single ForeignPath without pathkeys or parameterization is added.
      *
      * NOTE(review): in this listing the line carrying the function name and
      * its leading parameters is missing (the numbering jumps from 5437 to
      * 5439). The body uses root and input_rel in addition to the parameters
      * shown below.
5436  */
5437 static void
5439  RelOptInfo *grouped_rel,
5440  GroupPathExtraData *extra)
5441 {
5442  Query *parse = root->parse;
5443  PgFdwRelationInfo *ifpinfo = input_rel->fdw_private;
5444  PgFdwRelationInfo *fpinfo = grouped_rel->fdw_private;
5445  ForeignPath *grouppath;
5446  double rows;
5447  int width;
5448  Cost startup_cost;
5449  Cost total_cost;
5450 
5451  /* Nothing to be done, if there is no grouping or aggregation required. */
5452  if (!parse->groupClause && !parse->groupingSets && !parse->hasAggs &&
5453  !root->hasHavingQual)
5454  return;
5455 
      /*
       * NOTE(review): lines 5456-5457 of the original file are absent from
       * this listing, so whatever they contain is not shown here.
       */
5458 
5459  /* save the input_rel as outerrel in fpinfo */
5460  fpinfo->outerrel = input_rel;
5461 
5462  /*
5463  * Copy foreign table, foreign server, user mapping, FDW options etc.
5464  * details from the input relation's fpinfo.
      *
      * NOTE(review): the NULL third argument to merge_fdw_options()
      * presumably means there is no second input relation to merge from --
      * confirm against that function's definition.
5465  */
5466  fpinfo->table = ifpinfo->table;
5467  fpinfo->server = ifpinfo->server;
5468  fpinfo->user = ifpinfo->user;
5469  merge_fdw_options(fpinfo, ifpinfo, NULL);
5470 
5471  /*
5472  * Assess if it is safe to push down aggregation and grouping.
5473  *
5474  * Use HAVING qual from extra. In case of child partition, it will have
5475  * translated Vars.
5476  */
5477  if (!foreign_grouping_ok(root, grouped_rel, extra->havingQual))
5478  return;
5479 
5480  /* Estimate the cost of push down */
5481  estimate_path_cost_size(root, grouped_rel, NIL, NIL, &rows,
5482  &width, &startup_cost, &total_cost);
5483 
5484  /* Now update this information in the fpinfo */
5485  fpinfo->rows = rows;
5486  fpinfo->width = width;
5487  fpinfo->startup_cost = startup_cost;
5488  fpinfo->total_cost = total_cost;
5489 
5490  /* Create and add foreign path to the grouping relation. */
      /*
       * The path uses grouped_rel's own reltarget as its output target. The
       * ninth argument is NULL here; the join-path code earlier in this file
       * passes its EPQ path in that position.
       */
5491  grouppath = create_foreignscan_path(root,
5492  grouped_rel,
5493  grouped_rel->reltarget,
5494  rows,
5495  startup_cost,
5496  total_cost,
5497  NIL, /* no pathkeys */
5498  NULL, /* no required_outer */
5499  NULL,
5500  NIL); /* no fdw_private */
5501 
5502  /* Add generated path into grouped_rel by add_path(). */
5503  add_path(grouped_rel, (Path *) grouppath);
5504 }
5505 
5506 /*
5507  * Create a tuple from the specified row of the PGresult.
5508  *
5509  * rel is the local representation of the foreign table, attinmeta is
5510  * conversion data for the rel's tupdesc, and retrieved_attrs is an
5511  * integer list of the table column numbers present in the PGresult.
5512  * temp_context is a working context that can be reset after each tuple.
      *
      * res is the PGresult to read from and row is the zero-based row index
      * within it; row must be less than PQntuples(res).
      *
      * rel may be NULL. In that case fsstate must be non-NULL and the tuple
      * descriptor is taken from fsstate's scan tuple slot instead of from rel.
      *
      * The returned tuple is built in the memory context that was current on
      * entry, not in temp_context. All intermediate data is allocated in
      * temp_context, which is reset before returning.
      * NOTE(review): this implies the caller must not have temp_context as
      * its current context, or the returned tuple would presumably be
      * released by that reset -- confirm with callers.
      *
      * An ERROR is raised if the number of retrieved columns is non-zero and
      * does not match the number of fields in res.
      *
      * NOTE(review): several lines are missing from this listing, as shown by
      * jumps in the numbering: 5515 (function name and first parameter),
      * 5518-5519 (two parameters), 5524 (a local declaration; tupdesc is used
      * below without a visible declaration) and 5661-5663.
5513  */
5514 static HeapTuple
5516  int row,
5517  Relation rel,
5520  ForeignScanState *fsstate,
5521  MemoryContext temp_context)
5522 {
5523  HeapTuple tuple;
5525  Datum *values;
5526  bool *nulls;
5527  ItemPointer ctid = NULL;
5528  Oid oid = InvalidOid;
5529  ConversionLocation errpos;
5530  ErrorContextCallback errcallback;
5531  MemoryContext oldcontext;
5532  ListCell *lc;
5533  int j;
5534 
5535  Assert(row < PQntuples(res));
5536 
5537  /*
5538  * Do the following work in a temp context that we reset after each tuple.
5539  * This cleans up not only the data we have direct access to, but any
5540  * cruft the I/O functions might leak.
5541  */
5542  oldcontext = MemoryContextSwitchTo(temp_context);
5543 
      /* Pick the tuple descriptor: from rel if given, else from the scan slot. */
5544  if (rel)
5545  tupdesc = RelationGetDescr(rel);
5546  else
5547  {
5548  Assert(fsstate);
5549  tupdesc = fsstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
5550  }
5551 
5552  values = (Datum *) palloc0(tupdesc->natts * sizeof(Datum));
5553  nulls = (bool *) palloc(tupdesc->natts * sizeof(bool));
5554  /* Initialize to nulls for any columns not present in result */
5555  memset(nulls, true, tupdesc->natts * sizeof(bool));
5556 
5557  /*
5558  * Set up and install callback to report where conversion error occurs.
      *
      * errpos lives on this function's stack; the callback is removed from
      * error_context_stack again once the conversion loop has finished.
5559  */
5560  errpos.rel = rel;
5561  errpos.cur_attno = 0;
5562  errpos.fsstate = fsstate;
5563  errcallback.callback = conversion_error_callback;
5564  errcallback.arg = (void *) &errpos;
5565  errcallback.previous = error_context_stack;
5566  error_context_stack = &errcallback;
5567 
5568  /*
5569  * i indexes columns in the relation, j indexes columns in the PGresult.
      *
      * i is a one-based attribute number for ordinary columns and a
      * non-positive value for system columns; j is zero-based.
5570  */
5571  j = 0;
5572  foreach(lc, retrieved_attrs)
5573  {
5574  int i = lfirst_int(lc);
5575  char *valstr;
5576 
5577  /* fetch next column's textual value */
5578  if (PQgetisnull(res, row, j))
5579  valstr = NULL;
5580  else
5581  valstr = PQgetvalue(res, row, j);
5582 
5583  /*
5584  * convert value to internal representation
5585  *
5586  * Note: we ignore system columns other than ctid and oid in result
      *
      * cur_attno is set for the duration of the conversion and cleared
      * afterwards, presumably so that the error callback can tell which
      * column was being converted -- see conversion_error_callback.
5587  */
5588  errpos.cur_attno = i;
5589  if (i > 0)
5590  {
5591  /* ordinary column */
5592  Assert(i <= tupdesc->natts);
5593  nulls[i - 1] = (valstr == NULL);
5594  /* Apply the input function even to nulls, to support domains */
5595  values[i - 1] = InputFunctionCall(&attinmeta->attinfuncs[i - 1],
5596  valstr,
5597  attinmeta->attioparams[i - 1],
5598  attinmeta->atttypmods[i - 1]);
5599  }
5600  else if (i == SelfItemPointerAttributeNumber)
5601  {
5602  /* ctid */
      /* A NULL ctid value leaves ctid unset, so nothing is installed later. */
5603  if (valstr != NULL)
5604  {
5605  Datum datum;
5606 
5607  datum = DirectFunctionCall1(tidin, CStringGetDatum(valstr));
5608  ctid = (ItemPointer) DatumGetPointer(datum);
5609  }
5610  }
5611  else if (i == ObjectIdAttributeNumber)
5612  {
5613  /* oid */
      /* A NULL oid value leaves oid as InvalidOid, so nothing is installed later. */
5614  if (valstr != NULL)
5615  {
5616  Datum datum;
5617 
5618  datum = DirectFunctionCall1(oidin, CStringGetDatum(valstr));
5619  oid = DatumGetObjectId(datum);
5620  }
5621  }
5622  errpos.cur_attno = 0;
5623 
5624  j++;
5625  }
5626 
5627  /* Uninstall error context callback. */
5628  error_context_stack = errcallback.previous;
5629 
5630  /*
5631  * Check we got the expected number of columns. Note: j == 0 and
5632  * PQnfields == 1 is expected, since deparse emits a NULL if no columns.
5633  */
5634  if (j > 0 && j != PQnfields(res))
5635  elog(ERROR, "remote query result does not match the foreign table");
5636 
5637  /*
5638  * Build the result tuple in caller's memory context.
5639  */
5640  MemoryContextSwitchTo(oldcontext);
5641 
5642  tuple = heap_form_tuple(tupdesc, values, nulls);
5643 
5644  /*
5645  * If we have a CTID to return, install it in both t_self and t_ctid.
5646  * t_self is the normal place, but if the tuple is converted to a
5647  * composite Datum, t_self will be lost; setting t_ctid allows CTID to be
5648  * preserved during EvalPlanQual re-evaluations (see ROW_MARK_COPY code).
      *
      * The item pointer is copied by value here, before temp_context (where
      * it was produced) is reset below.
5649  */
5650  if (ctid)
5651  tuple->t_self = tuple->t_data->t_ctid = *ctid;
5652 
5653  /*
5654  * Stomp on the xmin, xmax, and cmin fields from the tuple created by
5655  * heap_form_tuple. heap_form_tuple actually creates the tuple with
5656  * DatumTupleFields, not HeapTupleFields, but the executor expects
5657  * HeapTupleFields and will happily extract system columns on that
5658  * assumption. If we don't do this then, for example, the tuple length
5659  * ends up in the xmin field, which isn't what we want.
      *
      * NOTE(review): the statements this comment describes (original lines
      * 5661-5663) are missing from this listing.
5660  */
5664 
5665  /*
5666  * If we have an OID to return, install it.
5667  */
5668  if (OidIsValid(oid))
5669  HeapTupleSetOid(tuple, oid);
5670 
5671  /* Clean up */
5672  MemoryContextReset(temp_context);
5673 
5674  return tuple;
5675 }
5676 
5677 /*
5678  * Callback function which is called when error occurs during column value
5679  * conversion. Print names of column and relation.
5680  */
5681 static void
5683 {
5684  const char *attname = NULL;
5685  const char *relname = NULL;
5686  bool